Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package compact
6 :
7 : import (
8 : "encoding/binary"
9 : "fmt"
10 : "io"
11 : "slices"
12 : "strconv"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/bytealloc"
17 : "github.com/cockroachdb/pebble/internal/invariants"
18 : "github.com/cockroachdb/pebble/internal/keyspan"
19 : "github.com/cockroachdb/pebble/internal/rangekey"
20 : "github.com/cockroachdb/redact"
21 : )
22 :
23 : // Iter provides a forward-only iterator that encapsulates the logic for
24 : // collapsing entries during compaction. It wraps an internal iterator and
25 : // collapses entries that are no longer necessary because they are shadowed by
26 : // newer entries. The simplest example of this is when the internal iterator
27 : // contains two keys: a.PUT.2 and a.PUT.1. Instead of returning both entries,
28 : // compact.Iter collapses the second entry because it is no longer necessary.
29 : // The high-level structure for compact.Iter is to iterate over its internal
30 : // iterator and output 1 entry for every user-key. There are four complications
31 : // to this story.
32 : //
33 : // 1. Eliding Deletion Tombstones
34 : //
35 : // Consider the entries a.DEL.2 and a.PUT.1. These entries collapse to
36 : // a.DEL.2. Do we have to output the entry a.DEL.2? Only if a.DEL.2 possibly
37 : // shadows an entry at a lower level. If we're compacting to the base-level in
38 : // the LSM tree then a.DEL.2 is definitely not shadowing an entry at a lower
39 : // level and can be elided.
40 : //
41 : // We can do slightly better than only eliding deletion tombstones at the base
42 : // level by observing that we can elide a deletion tombstone if there are no
43 : // sstables that contain the entry's key. This check is performed by
44 : // elideTombstone.
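//
// A rough sketch of the callback's contract (lowerLevelsMayContainKey is a
// hypothetical helper; the real check is wired up by the compaction code that
// builds the IterConfig):
//
//	cfg.ElideTombstone = func(key []byte) bool {
//		// Elide only when no sstable below the compaction's output level
//		// could contain this user key.
//		return !lowerLevelsMayContainKey(key)
//	}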
45 : //
46 : // 2. Merges
47 : //
48 : // The MERGE operation merges the value for an entry with the existing value
49 : // for an entry. The logical value of an entry can be composed of a series of
50 : // merge operations. When compact.Iter sees a MERGE, it scans forward in its
51 : // internal iterator collapsing MERGE operations for the same key until it
52 : // encounters a SET or DELETE operation. For example, the keys a.MERGE.4,
53 : // a.MERGE.3, a.MERGE.2 will be collapsed to a.MERGE.4 and the values will be
54 : // merged using the specified Merger.
55 : //
56 : // An interesting case here occurs when MERGE is combined with SET. Consider
57 : // the entries a.MERGE.3 and a.SET.2. The collapsed key will be a.SET.3. The
58 : // kind is changed to SET because the SET operation acts as a barrier
59 : // preventing further merging. This can be seen better in the
60 : // scenario a.MERGE.3, a.SET.2, a.MERGE.1. The entry a.MERGE.1 may be at lower
61 : // (older) level and not involved in the compaction. If the compaction of
62 : // a.MERGE.3 and a.SET.2 produced a.MERGE.3, a subsequent compaction with
63 : // a.MERGE.1 would merge the values together incorrectly.
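//
// As a rough sketch of how the entries above are combined (merge, aKey,
// mergeOperandSeq3 and setValueSeq2 are hypothetical stand-ins; the
// signatures are those of base.Merge and base.ValueMerger, with error
// handling elided):
//
//	m, _ := merge(aKey, mergeOperandSeq3) // a.MERGE.3 starts the merge
//	_ = m.MergeOlder(setValueSeq2)        // a.SET.2 is folded in as the base
//	v, closer, _ := m.Finish(true /* includesBase */)
//	// v is emitted under a.SET.3; closer, if non-nil, is closed later.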
64 : //
65 : // 3. Snapshots
66 : //
67 : // Snapshots are lightweight point-in-time views of the DB state. At its core,
68 : // a snapshot is a sequence number along with a guarantee from Pebble that it
69 : // will maintain the view of the database at that sequence number. Part of this
70 : // guarantee is relatively straightforward to achieve. When reading from the
71 : // database Pebble will ignore sequence numbers that are larger than the
72 : // snapshot sequence number. The primary complexity with snapshots occurs
73 : // during compaction: the collapsing of entries that are shadowed by newer
74 : // entries is at odds with the guarantee that Pebble will maintain the view of
75 : // the database at the snapshot sequence number. Rather than collapsing entries
76 : // up to the next user key, compact.Iter can only collapse entries up to the
77 : // next snapshot boundary. That is, every snapshot boundary potentially causes
78 : // another entry for the same user-key to be emitted. Another way to view this
79 : // is that snapshots define stripes and entries are collapsed within stripes,
80 : // but not across stripes. Consider the following scenario:
81 : //
82 : // a.PUT.9
83 : // a.DEL.8
84 : // a.PUT.7
85 : // a.DEL.6
86 : // a.PUT.5
87 : //
88 : // In the absence of snapshots these entries would be collapsed to
89 : // a.PUT.9. What if there is a snapshot at sequence number 7? The entries can
90 : // be divided into two stripes and collapsed within the stripes:
91 : //
92 : // a.PUT.9 a.PUT.9
93 : // a.DEL.8 --->
94 : // a.PUT.7
95 : // -- --
96 : // a.DEL.6 ---> a.DEL.6
97 : // a.PUT.5
98 : //
99 : // All of the rules described earlier still apply, but they are confined to
100 : // operate within a snapshot stripe. Snapshots only affect compaction when the
101 : // snapshot sequence number lies within the range of sequence numbers being
102 : // compacted. In the above example, a snapshot at sequence number 10 or at
103 : // sequence number 5 would not have any effect.
104 : //
105 : // 4. Range Deletions
106 : //
107 : // Range deletions provide the ability to delete all of the keys (and values)
108 : // in a contiguous range. Range deletions are stored indexed by their start
109 : // key. The end key of the range is stored in the value. In order to support
110 : // lookup of the range deletions which overlap with a particular key, the range
111 : // deletion tombstones need to be fragmented whenever they overlap. This
112 : // fragmentation is performed by keyspan.Fragmenter. The fragments are then
113 : // subject to the rules for snapshots. For example, consider the two range
114 : // tombstones [a,e)#1 and [c,g)#2:
115 : //
116 : // 2: c-------g
117 : // 1: a-------e
118 : //
119 : // These tombstones will be fragmented into:
120 : //
121 : // 2: c---e---g
122 : // 1: a---c---e
123 : //
124 : // Do we output the fragment [c,e)#1? Since it is covered by [c,e)#2, the answer
125 : // depends on whether it is in a new snapshot stripe.
126 : //
127 : // In addition to the fragmentation of range tombstones, compaction also needs
128 : // to take the range tombstones into consideration when outputting normal
129 : // keys. Just as with point deletions, a range deletion covering an entry can
130 : // cause the entry to be elided.
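//
// A rough sketch of producing the fragments above with keyspan.Fragmenter
// (cmp and collect are hypothetical; spans must be added in start-key order,
// and the exact Span construction here is illustrative only):
//
//	frag := keyspan.Fragmenter{Cmp: cmp, Format: base.DefaultFormatter, Emit: collect}
//	frag.Add(keyspan.Span{Start: []byte("a"), End: []byte("e"),
//		Keys: []keyspan.Key{{Trailer: base.MakeTrailer(1, base.InternalKeyKindRangeDelete)}}})
//	frag.Add(keyspan.Span{Start: []byte("c"), End: []byte("g"),
//		Keys: []keyspan.Key{{Trailer: base.MakeTrailer(2, base.InternalKeyKindRangeDelete)}}})
//	frag.Finish() // collect sees [a,c)#1, [c,e)#{2,1}, and [e,g)#2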
131 : //
132 : // A note on the stability of keys and values.
133 : //
134 : // The stability guarantees of keys and values returned by the iterator tree
135 : // that backs a compact.Iter are nuanced, and care must be taken when
136 : // referencing any returned items.
137 : //
138 : // Keys and values returned by exported functions (i.e. First, Next, etc.) have
139 : // lifetimes that fall into two categories:
140 : //
141 : // Lifetime valid for duration of compaction. Range deletion keys and values are
142 : // stable for the duration of the compaction, due to the way in which a
143 : // compact.Iter is typically constructed (i.e. via (*compaction).newInputIter,
144 : // which wraps the iterator over the range deletion block in a noCloseIter,
145 : // preventing the release of the backing memory until the compaction is
146 : // finished).
147 : //
148 : // Lifetime limited to duration of sstable block liveness. Point keys (SET, DEL,
149 : // etc.) and values must be cloned / copied following the return from the
150 : // exported function, and before a subsequent call to Next advances the iterator
151 : // and mutates the contents of the returned key and value.
152 : type Iter struct {
153 : cmp base.Compare
154 :
155 : cfg IterConfig
156 :
157 : iter base.InternalIterator
158 : err error
159 : // `key.UserKey` is set to `keyBuf` as a result of saving `i.iterKV.K.UserKey`,
160 : // and `key.Trailer` is set to `i.iterKV.K.Trailer`. This is the
161 : // case on return from all public methods -- these methods return `key`.
162 : // Additionally, it is the internal state when the code is moving to the
163 : // next key so it can determine whether the user key has changed from
164 : // the previous key.
165 : key base.InternalKey
166 : // keyTrailer is updated when `i.key` is updated and holds the key's
167 : // original trailer (eg, before any sequence-number zeroing or changes to
168 : // key kind).
169 : keyTrailer uint64
170 : value []byte
171 : valueCloser io.Closer
172 : // Temporary buffer used for storing the previous user key in order to
173 : // determine when iteration has advanced to a new user key and thus a new
174 : // snapshot stripe.
175 : keyBuf []byte
176 : // Temporary buffer used for storing the previous value, which may be an
177 : // unsafe, i.iter-owned slice that could be altered when the iterator is
178 : // advanced.
179 : valueBuf []byte
180 : // Is the current entry valid?
181 : valid bool
182 : iterKV *base.InternalKV
183 : iterValue []byte
184 : iterStripeChange stripeChangeType
185 : // skip indicates whether the remaining entries in the current snapshot
186 : // stripe should be skipped or processed. `skip` has no effect when `pos ==
187 : // iterPosNext`.
188 : skip bool
189 : // pos indicates the iterator position at the top of Next():
190 : // - iterPosCurForward: the iterator is at the last key returned.
191 : // - iterPosNext: the iterator has already been advanced to the next
192 : // candidate key. For example, this happens when processing merge operands,
193 : // where we advance the iterator all the way into the next stripe or next
194 : // user key to ensure we've seen all mergeable operands.
195 : pos iterPos
196 : // snapshotPinned indicates whether the last point key returned by the
197 : // compaction iterator was only returned because an open snapshot prevents
198 : // its elision. This field only applies to point keys, and not to range
199 : // deletions or range keys.
200 : snapshotPinned bool
201 : // forceObsoleteDueToRangeDel is set to true in a subset of the cases that
202 : // snapshotPinned is true. This value is true when the point is obsolete due
203 : // to a RANGEDEL but could not be deleted due to a snapshot.
204 : //
205 : // NB: it may seem that the additional cases that snapshotPinned captures
206 : // are harmless in that they can also be used to mark a point as obsolete
207 : // (it is merely a duplication of some logic that happens in
208 : // Writer.AddWithForceObsolete), but that is not quite accurate as of this
209 : // writing -- snapshotPinned originated in stats collection and for a
210 : // sequence MERGE, SET, where the MERGE cannot merge with the (older) SET
211 : // due to a snapshot, the snapshotPinned value for the SET is true.
212 : //
213 : // TODO(sumeer,jackson): improve the logic of snapshotPinned and reconsider
214 : // whether we need forceObsoleteDueToRangeDel.
215 : forceObsoleteDueToRangeDel bool
216 : // The index of the snapshot for the current key within the snapshots slice.
217 : curSnapshotIdx int
218 : curSnapshotSeqNum uint64
219 : // frontiers holds a heap of user keys that affect compaction behavior when
220 : // they're exceeded. Before a new key is returned, the compaction iterator
221 : // advances the frontier, notifying any code that subscribed to be notified
222 : // when a key was reached. The primary use today is within the
223 : // implementation of compactionOutputSplitters in compaction.go. Many of
224 : // these splitters wait for the compaction iterator to call Advance(k) when
225 : // it's returning a new key. If the key that they're waiting for is
226 : // surpassed, these splitters update internal state recording that they
227 : // should request a compaction split next time they're asked in
228 : // [shouldSplitBefore].
229 : frontiers Frontiers
230 : // The fragmented tombstones.
231 : tombstones []keyspan.Span
232 : // The fragmented range keys.
233 : rangeKeys []keyspan.Span
234 : // Byte allocator for the tombstone keys.
235 : alloc bytealloc.A
236 : stats IterStats
237 : }
238 :
239 : // IterConfig contains the parameters necessary to create a compaction iterator.
240 : type IterConfig struct {
241 : Cmp base.Compare
242 : Equal base.Equal
243 : Merge base.Merge
244 :
245 : // The snapshot sequence numbers that need to be maintained. These sequence
246 : // numbers define the snapshot stripes.
247 : Snapshots Snapshots
248 :
249 : // AllowZeroSeqNum allows the sequence number of KVs in the bottom snapshot
250 : // stripe to be simplified to 0 (which improves compression and enables an
251 : // optimization during forward iteration). This can be enabled if there are no
252 : // tables overlapping the output at lower levels (than the output) in the LSM.
253 : AllowZeroSeqNum bool
254 :
255 : // ElideTombstone returns true if it is ok to elide a tombstone for the
256 : // specified key. A return value of true guarantees that there are no
257 : // key/value pairs at a lower level (than the output) that possibly contain
258 : // the specified user key. The keys in multiple invocations to ElideTombstone
259 : // must be supplied in order.
260 : ElideTombstone func(key []byte) bool
261 :
262 : // ElideRangeTombstone returns true if it is ok to elide the specified range
263 : // tombstone. A return value of true guarantees that there are no key/value
264 : // pairs at a lower level (than the output) that possibly overlap the
265 : // specified tombstone.
266 : ElideRangeTombstone func(start, end []byte) bool
267 :
268 : // IneffectualSingleDeleteCallback is called if a SINGLEDEL is being elided
269 : // without deleting a point set/merge.
270 : IneffectualSingleDeleteCallback func(userKey []byte)
271 :
272 : // SingleDeleteInvariantViolationCallback is called in compactions/flushes if any
273 : // single delete has consumed a Set/Merge, and there is another immediately older
274 : // Set/SetWithDelete/Merge. The user of Pebble has violated the invariant under
275 : // which SingleDelete can be used correctly.
276 : SingleDeleteInvariantViolationCallback func(userKey []byte)
277 : }
278 :
279 2 : func (c *IterConfig) ensureDefaults() {
280 2 : if c.IneffectualSingleDeleteCallback == nil {
281 2 : c.IneffectualSingleDeleteCallback = func(userKey []byte) {}
282 : }
283 : }
284 :
285 : // IterStats are statistics produced by the compaction iterator.
286 : type IterStats struct {
287 : // Count of DELSIZED keys that were missized.
288 : CountMissizedDels uint64
289 : }
290 :
291 : type iterPos int8
292 :
293 : const (
294 : iterPosCurForward iterPos = 0
295 : iterPosNext iterPos = 1
296 : )
297 :
298 : // NewIter creates a new compaction iterator. See the comment for Iter for a
299 : // detailed description.
300 2 : func NewIter(cfg IterConfig, iter base.InternalIterator) *Iter {
301 2 : cfg.ensureDefaults()
302 2 : i := &Iter{
303 2 : cmp: cfg.Cmp,
304 2 : cfg: cfg,
305 2 : iter: iter,
306 2 : }
307 2 : i.frontiers.Init(cfg.Cmp)
308 2 : return i
309 2 : }
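
// A minimal construction-and-iteration sketch, assuming pointIter is some
// base.InternalIterator over the compaction inputs (the real wiring,
// including interleaving of range deletions and range keys, happens in the
// compaction code that uses this package):
//
//	it := NewIter(IterConfig{
//		Cmp:                 base.DefaultComparer.Compare,
//		Equal:               base.DefaultComparer.Equal,
//		Merge:               base.DefaultMerger.Merge,
//		Snapshots:           nil, // no open snapshots
//		ElideTombstone:      func([]byte) bool { return true },
//		ElideRangeTombstone: func(start, end []byte) bool { return true },
//	}, pointIter)
//	for key, val := it.First(); key != nil; key, val = it.Next() {
//		// Point keys and values are only stable until the next call to
//		// Next; clone them if they must outlive this iteration.
//		_ = key.Clone()
//		_ = append([]byte(nil), val...)
//	}
//	if err := it.Close(); err != nil {
//		// Close also surfaces any error encountered during iteration.
//	}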
310 :
311 : // Frontiers returns the frontiers for the compaction iterator.
312 2 : func (i *Iter) Frontiers() *Frontiers {
313 2 : return &i.frontiers
314 2 : }
315 :
316 : // SnapshotPinned returns whether the last point key returned by the compaction
317 : // iterator was only returned because an open snapshot prevents its elision.
318 : // This field only applies to point keys, and not to range deletions or range
319 : // keys.
320 2 : func (i *Iter) SnapshotPinned() bool {
321 2 : return i.snapshotPinned
322 2 : }
323 :
324 : // ForceObsoleteDueToRangeDel returns true in a subset of the cases when
325 : // SnapshotPinned returns true. This value is true when the point is obsolete
326 : // due to a RANGEDEL but could not be deleted due to a snapshot.
327 2 : func (i *Iter) ForceObsoleteDueToRangeDel() bool {
328 2 : return i.forceObsoleteDueToRangeDel
329 2 : }
330 :
331 : // Stats returns the compaction iterator stats.
332 2 : func (i *Iter) Stats() IterStats {
333 2 : return i.stats
334 2 : }
335 :
336 : // First has the same semantics as InternalIterator.First.
337 2 : func (i *Iter) First() (*base.InternalKey, []byte) {
338 2 : if i.err != nil {
339 0 : return nil, nil
340 0 : }
341 2 : i.iterKV = i.iter.First()
342 2 : if i.iterKV != nil {
343 2 : i.iterValue, _, i.err = i.iterKV.Value(nil)
344 2 : if i.err != nil {
345 0 : return nil, nil
346 0 : }
347 2 : i.curSnapshotIdx, i.curSnapshotSeqNum = i.cfg.Snapshots.IndexAndSeqNum(i.iterKV.SeqNum())
348 : }
349 2 : i.pos = iterPosNext
350 2 : i.iterStripeChange = newStripeNewKey
351 2 : return i.Next()
352 : }
353 :
354 : // Next has the same semantics as InternalIterator.Next.
355 2 : func (i *Iter) Next() (*base.InternalKey, []byte) {
356 2 : if i.err != nil {
357 1 : return nil, nil
358 1 : }
359 :
360 : // Close the closer for the current value if one was open.
361 2 : if i.closeValueCloser() != nil {
362 0 : return nil, nil
363 0 : }
364 :
365 : // Prior to this call to `Next()` we are in one of three situations with
366 : // respect to `iterKV` and related state:
367 : //
368 : // - `!skip && pos == iterPosNext`: `iterKV` is already at the next key.
369 : // - `!skip && pos == iterPosCurForward`: We are at the key that has been returned.
370 : // To move forward we advance by one key, even if that lands us in the same
371 : // snapshot stripe.
372 : // - `skip && pos == iterPosCurForward`: We are at the key that has been returned.
373 : // To move forward we skip skippable entries in the stripe.
374 2 : if i.pos == iterPosCurForward {
375 2 : if i.skip {
376 2 : i.skipInStripe()
377 2 : } else {
378 2 : i.nextInStripe()
379 2 : }
380 2 : } else if i.skip {
381 0 : panic(errors.AssertionFailedf("compaction iterator has skip=true, but iterator is at iterPosNext"))
382 : }
383 :
384 2 : i.pos = iterPosCurForward
385 2 : i.valid = false
386 2 :
387 2 : for i.iterKV != nil {
388 2 : // If we entered a new snapshot stripe with the same key, any key we
389 2 : // return on this iteration is only returned because the open snapshot
390 2 : // prevented it from being elided or merged with the key returned for
391 2 : // the previous stripe. Mark it as pinned so that the compaction loop
392 2 : // can correctly populate output tables' pinned statistics. We might
393 2 : // also set snapshotPinned=true down below if we observe that the key is
394 2 : // deleted by a range deletion in a higher stripe or that this key is a
395 2 : // tombstone that could be elided if only it were in the last snapshot
396 2 : // stripe.
397 2 : i.snapshotPinned = i.iterStripeChange == newStripeSameKey
398 2 :
399 2 : if i.iterKV.Kind() == base.InternalKeyKindRangeDelete || rangekey.IsRangeKey(i.iterKV.Kind()) {
400 2 : // Return the span so the compaction can use it for file truncation and add
401 2 : // it to the relevant fragmenter. In the case of range deletions, we do not
402 2 : // set `skip` to true before returning as there may be any number of point
403 2 : // keys with the same user key and sequence numbers ≥ the range deletion's
404 2 : // sequence number. Such point keys must be visible (i.e., not skipped
405 2 : // over) since we promise point keys are not deleted by range tombstones at
406 2 : // the same sequence number (or higher).
407 2 : //
408 2 : // Note that `skip` must already be false here, because range keys and range
409 2 : // deletions are interleaved at the maximal sequence numbers and neither will
410 2 : // set `skip`=true.
411 2 : if i.skip {
412 0 : panic(errors.AssertionFailedf("pebble: compaction iterator: skip unexpectedly true"))
413 : }
414 :
415 : // NOTE: there is a subtle invariant violation here in that calling
416 : // saveKey and returning a reference to the temporary slice violates
417 : // the stability guarantee for range deletion keys. A potential
418 : // mediation could return the original iterKey and iterValue
419 : // directly, as the backing memory is guaranteed to be stable until
420 : // the compaction completes. The violation here is only minor in
421 : // that the caller immediately clones the range deletion InternalKey
422 : // when passing the key to the deletion fragmenter (see the
423 : // call-site in compaction.go).
424 : // TODO(travers): address this violation by removing the call to
425 : // saveKey and instead return the original iterKV.K and iterValue.
426 : // This goes against the comment on i.key in the struct, and
427 : // therefore warrants some investigation.
428 2 : i.saveKey()
429 2 : // TODO(jackson): Handle tracking pinned statistics for range keys
430 2 : // and range deletions. This would require updating
431 2 : // emitRangeDelChunk and rangeKeyCompactionTransform to update
432 2 : // statistics when they apply their own snapshot striping logic.
433 2 : i.snapshotPinned = false
434 2 : i.value = i.iterValue
435 2 : i.valid = true
436 2 : return &i.key, i.value
437 : }
438 :
439 : // Check if the last tombstone covers the key.
440 : // TODO(sumeer): we could avoid calling tombstoneCovers if
441 2 : // i.iterStripeChange == sameStripe since that check has already been
442 : // done in nextInStripeHelper. However, we also need to handle the case of
443 : // CoversInvisibly below.
444 2 : switch i.tombstoneCovers(i.iterKV.K, i.curSnapshotSeqNum) {
445 2 : case coversVisibly:
446 2 : // A pending range deletion deletes this key. Skip it.
447 2 : i.saveKey()
448 2 : i.skipInStripe()
449 2 : continue
450 :
451 2 : case coversInvisibly:
452 2 : // i.iterKV would be deleted by a range deletion if there weren't any open
453 2 : // snapshots. Mark it as pinned.
454 2 : //
455 2 : // NB: there are multiple places in this file where we check for a
456 2 : // covering tombstone and this is the only one where we are writing to
457 2 : // i.snapshotPinned. Those other cases occur in mergeNext where the caller
458 2 : // is deciding whether the value should be merged or not, and the key is
459 2 : // in the same snapshot stripe. Hence, snapshotPinned is by definition
460 2 : // false in those cases.
461 2 : i.snapshotPinned = true
462 2 : i.forceObsoleteDueToRangeDel = true
463 :
464 2 : default:
465 2 : i.forceObsoleteDueToRangeDel = false
466 : }
467 :
468 2 : switch i.iterKV.Kind() {
469 2 : case base.InternalKeyKindDelete, base.InternalKeyKindSingleDelete, base.InternalKeyKindDeleteSized:
470 2 : if i.cfg.ElideTombstone(i.iterKV.K.UserKey) {
471 2 : if i.curSnapshotIdx == 0 {
472 2 : // If we're at the last snapshot stripe and the tombstone
473 2 : // can be elided skip skippable keys in the same stripe.
474 2 : i.saveKey()
475 2 : if i.key.Kind() == base.InternalKeyKindSingleDelete {
476 2 : i.skipDueToSingleDeleteElision()
477 2 : } else {
478 2 : i.skipInStripe()
479 2 : if !i.skip && i.iterStripeChange != newStripeNewKey {
480 0 : panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe disabled skip without advancing to new key"))
481 : }
482 : }
483 2 : if i.iterStripeChange == newStripeSameKey {
484 0 : panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe found a new stripe within the same key"))
485 : }
486 2 : continue
487 2 : } else {
488 2 : // We're not at the last snapshot stripe, so the tombstone
489 2 : // can NOT yet be elided. Mark it as pinned, so that it's
490 2 : // included in table statistics appropriately.
491 2 : i.snapshotPinned = true
492 2 : }
493 : }
494 :
495 2 : switch i.iterKV.Kind() {
496 2 : case base.InternalKeyKindDelete:
497 2 : i.saveKey()
498 2 : i.value = i.iterValue
499 2 : i.valid = true
500 2 : i.skip = true
501 2 : return &i.key, i.value
502 :
503 2 : case base.InternalKeyKindDeleteSized:
504 2 : // We may skip subsequent keys because of this tombstone. Scan
505 2 : // ahead to see just how much data this tombstone drops and if
506 2 : // the tombstone's value should be updated accordingly.
507 2 : return i.deleteSizedNext()
508 :
509 2 : case base.InternalKeyKindSingleDelete:
510 2 : if i.singleDeleteNext() {
511 2 : return &i.key, i.value
512 2 : } else if i.err != nil {
513 0 : return nil, nil
514 0 : }
515 2 : continue
516 :
517 0 : default:
518 0 : panic(errors.AssertionFailedf(
519 0 : "unexpected kind %s", redact.SafeString(i.iterKV.Kind().String())))
520 : }
521 :
522 2 : case base.InternalKeyKindSet, base.InternalKeyKindSetWithDelete:
523 2 : // The key we emit for this entry is a function of the current key
524 2 : // kind, and whether this entry is followed by a DEL/SINGLEDEL
525 2 : // entry. setNext() does the work to move the iterator forward,
526 2 : // preserving the original value, and potentially mutating the key
527 2 : // kind.
528 2 : i.setNext()
529 2 : if i.err != nil {
530 1 : return nil, nil
531 1 : }
532 2 : return &i.key, i.value
533 :
534 2 : case base.InternalKeyKindMerge:
535 2 : // Record the snapshot index before mergeNext as merging
536 2 : // advances the iterator, adjusting curSnapshotIdx.
537 2 : origSnapshotIdx := i.curSnapshotIdx
538 2 : var valueMerger base.ValueMerger
539 2 : valueMerger, i.err = i.cfg.Merge(i.iterKV.K.UserKey, i.iterValue)
540 2 : if i.err == nil {
541 2 : i.mergeNext(valueMerger)
542 2 : }
543 2 : var needDelete bool
544 2 : if i.err == nil {
545 2 : // includesBase is true whenever we've transformed the MERGE record
546 2 : // into a SET.
547 2 : var includesBase bool
548 2 : switch i.key.Kind() {
549 2 : case base.InternalKeyKindSet, base.InternalKeyKindSetWithDelete:
550 2 : includesBase = true
551 2 : case base.InternalKeyKindMerge:
552 0 : default:
553 0 : panic(errors.AssertionFailedf(
554 0 : "unexpected kind %s", redact.SafeString(i.key.Kind().String())))
555 : }
556 2 : i.value, needDelete, i.valueCloser, i.err = finishValueMerger(valueMerger, includesBase)
557 : }
558 2 : if i.err == nil {
559 2 : if needDelete {
560 1 : i.valid = false
561 1 : if i.closeValueCloser() != nil {
562 0 : return nil, nil
563 0 : }
564 1 : continue
565 : }
566 :
567 2 : i.maybeZeroSeqnum(origSnapshotIdx)
568 2 : return &i.key, i.value
569 : }
570 1 : if i.err != nil {
571 1 : i.valid = false
572 1 : // TODO(sumeer): why is MarkCorruptionError only being called for
573 1 : // MERGE?
574 1 : i.err = base.MarkCorruptionError(i.err)
575 1 : }
576 1 : return nil, nil
577 :
578 1 : default:
579 1 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
580 1 : i.valid = false
581 1 : return nil, nil
582 : }
583 : }
584 :
585 2 : return nil, nil
586 : }
587 :
588 2 : func (i *Iter) closeValueCloser() error {
589 2 : if i.valueCloser == nil {
590 2 : return nil
591 2 : }
592 :
593 0 : i.err = i.valueCloser.Close()
594 0 : i.valueCloser = nil
595 0 : if i.err != nil {
596 0 : i.valid = false
597 0 : }
598 0 : return i.err
599 : }
600 :
601 : // skipInStripe skips over skippable keys in the same stripe and user key. It
602 : // may set i.err, in which case i.iterKV will be nil.
603 2 : func (i *Iter) skipInStripe() {
604 2 : i.skip = true
605 2 : // TODO(sumeer): we can avoid the overhead of calling i.tombstoneCovers
606 2 : // from nextInStripe in this case, since we are skipping all of them anyway.
607 2 : for i.nextInStripe() == sameStripe {
608 2 : if i.err != nil {
609 0 : panic(i.err)
610 : }
611 : }
612 : // We landed outside the original stripe, so reset skip.
613 2 : i.skip = false
614 : }
615 :
616 2 : func (i *Iter) iterNext() bool {
617 2 : i.iterKV = i.iter.Next()
618 2 : if i.iterKV != nil {
619 2 : i.iterValue, _, i.err = i.iterKV.Value(nil)
620 2 : if i.err != nil {
621 0 : i.iterKV = nil
622 0 : }
623 : }
624 2 : return i.iterKV != nil
625 : }
626 :
627 : // stripeChangeType indicates how the snapshot stripe changed relative to the
628 : // previous key. If the snapshot stripe changed, it also indicates whether the
629 : // new stripe was entered because the iterator progressed onto an entirely new
630 : // key or entered a new stripe within the same key.
631 : type stripeChangeType int
632 :
633 : const (
634 : newStripeNewKey stripeChangeType = iota
635 : newStripeSameKey
636 : sameStripe
637 : )
638 :
639 : // nextInStripe advances the iterator and returns one of the above const ints
640 : // indicating how its state changed.
641 : //
642 : // All sameStripe keys that are covered by a RANGEDEL will be skipped and not
643 : // returned.
644 : //
645 : // Calls to nextInStripe must be preceded by a call to saveKey to retain a
646 : // temporary reference to the original key, so that forward iteration can
647 : // proceed with a reference to the original key. Care should be taken to avoid
648 : // overwriting or mutating the saved key or value before they have been returned
649 : // to the caller of the exported function (i.e. the caller of Next, First, etc.)
650 : //
651 : // nextInStripe may set i.err, in which case the return value will be
652 : // newStripeNewKey, and i.iterKV will be nil.
653 2 : func (i *Iter) nextInStripe() stripeChangeType {
654 2 : i.iterStripeChange = i.nextInStripeHelper()
655 2 : return i.iterStripeChange
656 2 : }
657 :
658 : // nextInStripeHelper is an internal helper for nextInStripe; callers should use
659 : // nextInStripe and not call nextInStripeHelper.
660 2 : func (i *Iter) nextInStripeHelper() stripeChangeType {
661 2 : origSnapshotIdx := i.curSnapshotIdx
662 2 : for {
663 2 : if !i.iterNext() {
664 2 : return newStripeNewKey
665 2 : }
666 2 : kv := i.iterKV
667 2 :
668 2 : // Is this a new key? There are two cases:
669 2 : //
670 2 : // 1. The new key has a different user key.
671 2 : // 2. The previous key was an interleaved range deletion or range key
672 2 : // boundary. These keys are interleaved in the same input iterator
673 2 : // stream as point keys, but they do not obey the ordinary sequence
674 2 : // number ordering within a user key. If the previous key was one
675 2 : // of these keys, we consider the new key a `newStripeNewKey` to
676 2 : // reflect that it's the beginning of a new stream of point keys.
677 2 : if i.key.IsExclusiveSentinel() || !i.cfg.Equal(i.key.UserKey, kv.K.UserKey) {
678 2 : i.curSnapshotIdx, i.curSnapshotSeqNum = i.cfg.Snapshots.IndexAndSeqNum(kv.SeqNum())
679 2 : return newStripeNewKey
680 2 : }
681 :
682 : // If i.key and key have the same user key, then
683 : // 1. i.key must not have had a zero sequence number (or it would've been the last
684 : // key with its user key).
685 : // 2. i.key must have a strictly larger sequence number
686 : // There's an exception in that either key may be a range delete. Range
687 : // deletes may share a sequence number with a point key if the keys were
688 : // ingested together. Range keys may also share the sequence number if they
689 : // were ingested, but range keys are interleaved into the compaction
690 : // iterator's input iterator at the maximal sequence number so their
691 : // original sequence number will not be observed here.
692 2 : if prevSeqNum := base.SeqNumFromTrailer(i.keyTrailer); (prevSeqNum == 0 || prevSeqNum <= kv.SeqNum()) &&
693 2 : i.key.Kind() != base.InternalKeyKindRangeDelete && kv.Kind() != base.InternalKeyKindRangeDelete {
694 0 : prevKey := i.key
695 0 : prevKey.Trailer = i.keyTrailer
696 0 : panic(errors.AssertionFailedf("pebble: invariant violation: %s and %s out of order", prevKey, kv.K))
697 : }
698 :
699 2 : i.curSnapshotIdx, i.curSnapshotSeqNum = i.cfg.Snapshots.IndexAndSeqNum(kv.SeqNum())
700 2 : switch kv.Kind() {
701 : case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete,
702 0 : base.InternalKeyKindRangeDelete:
703 0 : // Range tombstones and range keys are interleaved at the max
704 0 : // sequence number for a given user key, and the first key after one
705 0 : // is always considered a newStripeNewKey, so we should never reach
706 0 : // this.
707 0 : panic("unreachable")
708 : case base.InternalKeyKindDelete, base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindSingleDelete,
709 2 : base.InternalKeyKindSetWithDelete, base.InternalKeyKindDeleteSized:
710 : // Fall through
711 1 : default:
712 1 : kind := i.iterKV.Kind()
713 1 : i.iterKV = nil
714 1 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(kind))
715 1 : i.valid = false
716 1 : return newStripeNewKey
717 : }
718 2 : if i.curSnapshotIdx == origSnapshotIdx {
719 2 : // Same snapshot.
720 2 : if i.tombstoneCovers(i.iterKV.K, i.curSnapshotSeqNum) == coversVisibly {
721 2 : continue
722 : }
723 2 : return sameStripe
724 : }
725 2 : return newStripeSameKey
726 : }
727 : }
728 :
729 2 : func (i *Iter) setNext() {
730 2 : // Save the current key.
731 2 : i.saveKey()
732 2 : i.value = i.iterValue
733 2 : i.valid = true
734 2 : i.maybeZeroSeqnum(i.curSnapshotIdx)
735 2 :
736 2 : // If this key is already a SETWITHDEL we can early return and skip the remaining
737 2 : // records in the stripe:
738 2 : if i.iterKV.Kind() == base.InternalKeyKindSetWithDelete {
739 2 : i.skip = true
740 2 : return
741 2 : }
742 :
743 : // We are iterating forward. Save the current value.
744 2 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
745 2 : i.value = i.valueBuf
746 2 :
747 2 : // Else, we continue to loop through entries in the stripe looking for a
748 2 : // DEL. Note that we may stop *before* encountering a DEL, if one exists.
749 2 : //
750 2 : // NB: nextInStripe will skip sameStripe keys that are visibly covered by a
751 2 : // RANGEDEL. This can include DELs -- this is fine since such DELs don't
752 2 : // need to be combined with SET to make SETWITHDEL.
753 2 : for {
754 2 : switch i.nextInStripe() {
755 2 : case newStripeNewKey, newStripeSameKey:
756 2 : i.pos = iterPosNext
757 2 : return
758 2 : case sameStripe:
759 2 : // We're still in the same stripe. If this is a
760 2 : // DEL/SINGLEDEL/DELSIZED, we stop looking and emit a SETWITHDEL.
761 2 : // Subsequent keys are eligible for skipping.
762 2 : switch i.iterKV.Kind() {
763 2 : case base.InternalKeyKindDelete, base.InternalKeyKindSingleDelete, base.InternalKeyKindDeleteSized:
764 2 : i.key.SetKind(base.InternalKeyKindSetWithDelete)
765 2 : i.skip = true
766 2 : return
767 2 : case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindSetWithDelete:
768 : // Do nothing
769 0 : default:
770 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
771 0 : i.valid = false
772 : }
773 0 : default:
774 0 : panic("pebble: unexpected stripeChangeType: " + strconv.Itoa(int(i.iterStripeChange)))
775 : }
776 : }
777 : }
778 :
779 2 : func (i *Iter) mergeNext(valueMerger base.ValueMerger) {
780 2 : // Save the current key.
781 2 : i.saveKey()
782 2 : i.valid = true
783 2 :
784 2 : // Loop looking for older values in the current snapshot stripe and merge
785 2 : // them.
786 2 : for {
787 2 : if i.nextInStripe() != sameStripe {
788 2 : i.pos = iterPosNext
789 2 : return
790 2 : }
791 2 : if i.err != nil {
792 0 : panic(i.err)
793 : }
794 : // NB: MERGE#10+RANGEDEL#9 stays a MERGE, since nextInStripe skips
795 : // sameStripe keys that are visibly covered by a RANGEDEL. There may be
796 : // MERGE#7 that is invisibly covered and will be preserved, but there is
797 : // no risk that MERGE#10 and MERGE#7 will get merged in the future as
798 : // the RANGEDEL still exists and will be used in user-facing reads that
799 : // see MERGE#10, and will also eventually cause MERGE#7 to be deleted in
800 : // a compaction.
801 2 : key := i.iterKV
802 2 : switch key.Kind() {
803 2 : case base.InternalKeyKindDelete, base.InternalKeyKindSingleDelete, base.InternalKeyKindDeleteSized:
804 2 : // We've hit a deletion tombstone. Return everything up to this point and
805 2 : // then skip entries until the next snapshot stripe. We change the kind
806 2 : // of the result key to a Set so that it shadows keys in lower
807 2 : // levels. That is, MERGE+DEL -> SETWITHDEL.
808 2 : //
809 2 : // We do the same for SingleDelete since SingleDelete is only
810 2 : // permitted (with deterministic behavior) for keys that have been
811 2 : // set once since the last SingleDelete/Delete, so everything
812 2 : // older is acceptable to shadow. Note that this is slightly
813 2 : // different from singleDeleteNext() which implements stricter
814 2 : // semantics in terms of applying the SingleDelete to the single
815 2 : // next Set. But those stricter semantics are not observable to
816 2 : // the end-user since Iterator interprets SingleDelete as Delete.
817 2 : // We could do something more complicated here and consume only a
818 2 : // single Set, and then merge in any following Sets, but that is
819 2 : // complicated wrt code and unnecessary given the narrow permitted
820 2 : // use of SingleDelete.
821 2 : i.key.SetKind(base.InternalKeyKindSetWithDelete)
822 2 : i.skip = true
823 2 : return
824 :
825 2 : case base.InternalKeyKindSet, base.InternalKeyKindSetWithDelete:
826 2 : // We've hit a Set or SetWithDel value. Merge with the existing
827 2 : // value and return. We change the kind of the resulting key to a
828 2 : // Set so that it shadows keys in lower levels. That is:
829 2 : // MERGE + (SET*) -> SET.
830 2 : i.err = valueMerger.MergeOlder(i.iterValue)
831 2 : if i.err != nil {
832 0 : i.valid = false
833 0 : return
834 0 : }
835 2 : i.key.SetKind(base.InternalKeyKindSet)
836 2 : i.skip = true
837 2 : return
838 :
839 2 : case base.InternalKeyKindMerge:
840 2 : // We've hit another Merge value. Merge with the existing value and
841 2 : // continue looping.
842 2 : i.err = valueMerger.MergeOlder(i.iterValue)
843 2 : if i.err != nil {
844 0 : i.valid = false
845 0 : return
846 0 : }
847 :
848 0 : default:
849 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
850 0 : i.valid = false
851 0 : return
852 : }
853 : }
854 : }
855 :
856 : // singleDeleteNext processes a SingleDelete point tombstone. A SingleDelete, or
857 : // SINGLEDEL, is unique in that it deletes exactly 1 internal key. It's a
858 : // performance optimization when the client knows a user key has not been
859 : // overwritten, allowing the elision of the tombstone earlier, avoiding write
860 : // amplification.
861 : //
862 : // singleDeleteNext returns a boolean indicating whether or not the caller
863 : // should yield the SingleDelete key to the consumer of the Iter. If
864 : // singleDeleteNext returns false, the caller may consume/elide the
865 : // SingleDelete.
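//
// For context, a rough sketch of the client-side pattern that SINGLEDEL
// assumes (db is a hypothetical *pebble.DB; SingleDelete is only correct if
// the key has been set at most once since the last DEL/SINGLEDEL):
//
//	_ = db.Set(k, v, pebble.Sync)       // exactly one SET of k
//	_ = db.SingleDelete(k, pebble.Sync) // deletes that single SET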
866 2 : func (i *Iter) singleDeleteNext() bool {
867 2 : // Save the current key.
868 2 : i.saveKey()
869 2 : i.value = i.iterValue
870 2 : i.valid = true
871 2 :
872 2 : // Loop until we find a key to be passed to the next level.
873 2 : for {
874 2 : // If we find a key that can't be skipped, return true so that the
875 2 : // caller yields the SingleDelete to the caller.
876 2 : if i.nextInStripe() != sameStripe {
877 2 : // This defers additional error checking regarding single delete
878 2 : // invariants to the compaction where the keys with the same user key as
879 2 : // the single delete are in the same stripe.
880 2 : i.pos = iterPosNext
881 2 : return i.err == nil
882 2 : }
883 2 : if i.err != nil {
884 0 : panic(i.err)
885 : }
886 : // INVARIANT: sameStripe.
887 2 : key := i.iterKV
888 2 : kind := key.Kind()
889 2 : switch kind {
890 2 : case base.InternalKeyKindDelete, base.InternalKeyKindSetWithDelete, base.InternalKeyKindDeleteSized:
891 2 : if kind == base.InternalKeyKindDelete || kind == base.InternalKeyKindDeleteSized {
892 2 : i.cfg.IneffectualSingleDeleteCallback(i.key.UserKey)
893 2 : }
894 : // We've hit a Delete, DeleteSized, or SetWithDelete; transform
895 : // the SingleDelete into a full Delete.
896 2 : i.key.SetKind(base.InternalKeyKindDelete)
897 2 : i.skip = true
898 2 : return true
899 :
900 2 : case base.InternalKeyKindSet, base.InternalKeyKindMerge:
901 2 : // This SingleDelete deletes the Set/Merge, and we can now elide the
902 2 : // SingleDel as well. We advance past the Set and return false to
903 2 : // indicate to the main compaction loop that we should NOT yield the
904 2 : // current SingleDel key to the compaction loop.
905 2 : //
906 2 : // NB: singleDeleteNext was called with i.pos == iterPosCurForward, and
907 2 : // after the call to nextInStripe, we are still at iterPosCurForward,
908 2 : // since we are at the key after the Set/Merge that was single deleted.
909 2 : change := i.nextInStripe()
910 2 : switch change {
911 2 : case sameStripe, newStripeSameKey:
912 2 : // On the same user key.
913 2 : nextKind := i.iterKV.Kind()
914 2 : switch nextKind {
915 2 : case base.InternalKeyKindSet, base.InternalKeyKindSetWithDelete, base.InternalKeyKindMerge:
916 2 : if i.cfg.SingleDeleteInvariantViolationCallback != nil {
917 1 : // sameStripe keys returned by nextInStripe() are already
918 1 : // known to not be covered by a RANGEDEL, so it is an invariant
919 1 : // violation. The rare case is newStripeSameKey, where it is a
920 1 : // violation if not covered by a RANGEDEL.
921 1 : if change == sameStripe ||
922 1 : i.tombstoneCovers(i.iterKV.K, i.curSnapshotSeqNum) == noCover {
923 1 : i.cfg.SingleDeleteInvariantViolationCallback(i.key.UserKey)
924 1 : }
925 : }
926 2 : case base.InternalKeyKindDelete, base.InternalKeyKindDeleteSized, base.InternalKeyKindSingleDelete:
927 0 : default:
928 0 : panic(errors.AssertionFailedf(
929 0 : "unexpected internal key kind: %d", errors.Safe(i.iterKV.Kind())))
930 : }
931 2 : case newStripeNewKey:
932 0 : default:
933 0 : panic("unreachable")
934 : }
935 2 : i.valid = false
936 2 : return false
937 :
938 2 : case base.InternalKeyKindSingleDelete:
939 2 : // Two single deletes met in a compaction. The first single delete is
940 2 : // ineffectual.
941 2 : i.cfg.IneffectualSingleDeleteCallback(i.key.UserKey)
942 2 : // Continue to apply the second single delete.
943 2 : continue
944 :
945 0 : default:
946 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
947 0 : i.valid = false
948 0 : return false
949 : }
950 : }
951 : }
952 :
953 : // skipDueToSingleDeleteElision is called when the SingleDelete is being
954 : // elided because it is in the final snapshot stripe and there are no keys
955 : // with the same user key in lower levels in the LSM (below the files in this
956 : // compaction).
957 : //
958 : // TODO(sumeer): the only difference between singleDeleteNext and
959 : // skipDueToSingleDeleteElision is the fact that the caller knows it will be
960 : // eliding the single delete in the latter case. There are some similar things
961 : // happening in both implementations. My first attempt at combining them into
962 : // a single method was hard to comprehend. Try again.
963 2 : func (i *Iter) skipDueToSingleDeleteElision() {
964 2 : for {
965 2 : stripeChange := i.nextInStripe()
966 2 : if i.err != nil {
967 0 : panic(i.err)
968 : }
969 2 : switch stripeChange {
970 2 : case newStripeNewKey:
971 2 : // The single delete is only now being elided, meaning it did not elide
972 2 : // any keys earlier in its descent down the LSM. We stepped onto a new
973 2 : // user key, meaning that even now at its moment of elision, it still
974 2 : // hasn't elided any other keys. The single delete was ineffectual (a
975 2 : // no-op).
976 2 : i.cfg.IneffectualSingleDeleteCallback(i.key.UserKey)
977 2 : i.skip = false
978 2 : return
979 0 : case newStripeSameKey:
980 0 : // This should be impossible. If we're eliding a single delete, we
981 0 : // determined that the tombstone is in the final snapshot stripe, but we
982 0 : // stepped into a new stripe of the same key.
983 0 : panic(errors.AssertionFailedf("eliding single delete followed by same key in new stripe"))
984 2 : case sameStripe:
985 2 : kind := i.iterKV.Kind()
986 2 : switch kind {
987 2 : case base.InternalKeyKindDelete, base.InternalKeyKindDeleteSized, base.InternalKeyKindSingleDelete:
988 2 : i.cfg.IneffectualSingleDeleteCallback(i.key.UserKey)
989 2 : switch kind {
990 2 : case base.InternalKeyKindDelete, base.InternalKeyKindDeleteSized:
991 2 : i.skipInStripe()
992 2 : return
993 2 : case base.InternalKeyKindSingleDelete:
994 2 : // Repeat the same with this SingleDelete. We don't want to simply
995 2 : // call skipInStripe(), since it increases the strength of the
996 2 : // SingleDel, which hides bugs in the use of single delete.
997 2 : continue
998 0 : default:
999 0 : panic(errors.AssertionFailedf(
1000 0 : "unexpected internal key kind: %d", errors.Safe(i.iterKV.Kind())))
1001 : }
1002 2 : case base.InternalKeyKindSetWithDelete:
1003 2 : // The SingleDelete should behave like a Delete.
1004 2 : i.skipInStripe()
1005 2 : return
1006 2 : case base.InternalKeyKindSet, base.InternalKeyKindMerge:
1007 2 : // This SingleDelete deletes the Set/Merge, and we are eliding the
1008 2 : // SingleDel as well. Step to the next key (this is not deleted by the
1009 2 : // SingleDelete).
1010 2 : //
1011 2 : // NB: skipDueToSingleDeleteElision was called with i.pos ==
1012 2 : // iterPosCurForward, and after the call to nextInStripe, we are still
1013 2 : // at iterPosCurForward, since we are at the key after the Set/Merge
1014 2 : // that was single deleted.
1015 2 : change := i.nextInStripe()
1016 2 : if i.err != nil {
1017 0 : panic(i.err)
1018 : }
1019 2 : switch change {
1020 0 : case newStripeSameKey:
1021 0 : panic(errors.AssertionFailedf("eliding single delete followed by same key in new stripe"))
1022 2 : case newStripeNewKey:
1023 2 : case sameStripe:
1024 2 : // On the same key.
1025 2 : nextKind := i.iterKV.Kind()
1026 2 : switch nextKind {
1027 0 : case base.InternalKeyKindSet, base.InternalKeyKindSetWithDelete, base.InternalKeyKindMerge:
1028 0 : if i.cfg.SingleDeleteInvariantViolationCallback != nil {
1029 0 : i.cfg.SingleDeleteInvariantViolationCallback(i.key.UserKey)
1030 0 : }
1031 2 : case base.InternalKeyKindDelete, base.InternalKeyKindDeleteSized, base.InternalKeyKindSingleDelete:
1032 0 : default:
1033 0 : panic(errors.AssertionFailedf(
1034 0 : "unexpected internal key kind: %d", errors.Safe(i.iterKV.Kind())))
1035 : }
1036 0 : default:
1037 0 : panic("unreachable")
1038 : }
1039 : // Whether in same stripe or new stripe, this key is not consumed by
1040 : // the SingleDelete.
1041 2 : i.skip = false
1042 2 : return
1043 0 : default:
1044 0 : panic(errors.AssertionFailedf(
1045 0 : "unexpected internal key kind: %d", errors.Safe(i.iterKV.Kind())))
1046 : }
1047 0 : default:
1048 0 : panic("unreachable")
1049 : }
1050 : }
1051 : }
1052 :
1053 : // deleteSizedNext processes a DELSIZED point tombstone. Unlike ordinary DELs,
1054 : // these tombstones carry a value that's a varint indicating the size of the
1055 : // entry (len(key)+len(value)) that the tombstone is expected to delete.
1056 : //
1057 : // When a deleteSizedNext is encountered, we skip ahead to see which keys, if
1058 : // any, are elided as a result of the tombstone.
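//
// For illustration, a sketch of the value a DELSIZED for key k (whose live
// SET has value v) carries, and of the decoding this function performs
// (standard library only; this is not the Pebble write path):
//
//	expected := uint64(len(k) + len(v))
//	value := binary.AppendUvarint(nil, expected) // what the DELSIZED stores
//	got, n := binary.Uvarint(value)              // what deleteSizedNext reads
//	_ = got == expected && n == len(value)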
1059 2 : func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
1060 2 : i.saveKey()
1061 2 : i.valid = true
1062 2 : i.skip = true
1063 2 :
1064 2 : // The DELSIZED tombstone may have no value at all. This happens when the
1065 2 : // tombstone has already deleted the key that the user originally predicted.
1066 2 : // In this case, we still peek forward in case there's another DELSIZED key
1067 2 : // with a lower sequence number, in which case we'll adopt its value.
1068 2 : if len(i.iterValue) == 0 {
1069 2 : i.value = i.valueBuf[:0]
1070 2 : } else {
1071 2 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
1072 2 : i.value = i.valueBuf
1073 2 : }
1074 :
1075 : // Loop through all the keys within this stripe that are skippable.
1076 2 : i.pos = iterPosNext
1077 2 : for i.nextInStripe() == sameStripe {
1078 2 : if i.err != nil {
1079 0 : panic(i.err)
1080 : }
1081 2 : switch i.iterKV.Kind() {
1082 2 : case base.InternalKeyKindDelete, base.InternalKeyKindDeleteSized, base.InternalKeyKindSingleDelete:
1083 2 : // We encountered a tombstone (DEL, or DELSIZED) that's deleted by
1084 2 : // the original DELSIZED tombstone. This can happen in two cases:
1085 2 : //
1086 2 : // (1) These tombstones were intended to delete two distinct values,
1087 2 : // and this DELSIZED has already dropped the relevant key. For
1088 2 : // example:
1089 2 : //
1090 2 : // a.DELSIZED.9 a.SET.7 a.DELSIZED.5 a.SET.4
1091 2 : //
1092 2 : // If a.DELSIZED.9 has already deleted a.SET.7, its size has
1093 2 : // already been zeroed out. In this case, we want to adopt the
1094 2 : // value of the DELSIZED with the lower sequence number, in
1095 2 : // case the a.SET.4 key has not yet been elided.
1096 2 : //
1097 2 : // (2) This DELSIZED was missized. The user thought they were
1098 2 : // deleting a key with this user key, but this user key had
1099 2 : // already been deleted.
1100 2 : //
1101 2 : // We can differentiate these two cases by examining the length of
1102 2 : // the DELSIZED's value. A DELSIZED's value holds the size of both
1103 2 : // the user key and value that it intends to delete. For any user
1104 2 : // key with a length > 0, a DELSIZED that has not deleted a key must
1105 2 : // have a value with a length > 0.
1106 2 : //
1107 2 : // We treat both cases the same functionally, adopting the identity
1108 2 : // of the lower-sequence numbered tombstone. However in the second
1109 2 : // case, we also increment the stat counting missized tombstones.
1110 2 : if len(i.value) > 0 {
1111 2 : // The original DELSIZED key was missized. The key that the user
1112 2 : // thought they were deleting does not exist.
1113 2 : i.stats.CountMissizedDels++
1114 2 : }
1115 2 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
1116 2 : i.value = i.valueBuf
1117 2 : if i.iterKV.Kind() != base.InternalKeyKindDeleteSized {
1118 2 : // Convert the DELSIZED to a DEL. The DEL/SINGLEDEL we're eliding
1119 2 : // may not have deleted the key(s) it was intended to yet. The
1120 2 : // ordinary DEL compaction heuristics are better suited at that,
1121 2 : // plus we don't want to count it as a missized DEL. We early
1122 2 : // exit in this case, after skipping the remainder of the
1123 2 : // snapshot stripe.
1124 2 : i.key.SetKind(base.InternalKeyKindDelete)
1125 2 : // NB: We skipInStripe now, rather than leaving i.skip=true and
1126 2 : // returning early, because Next() requires that i.skip=true
1127 2 : // only if i.pos == iterPosCurForward.
1128 2 : //
1129 2 : // Ignore any error caused by skipInStripe since it does not affect
1130 2 : // the key/value being returned here, and the next call to Next() will
1131 2 : // expose it.
1132 2 : i.skipInStripe()
1133 2 : return &i.key, i.value
1134 2 : }
1135 : // Continue, in case we uncover another DELSIZED or a key this
1136 : // DELSIZED deletes.
1137 :
1138 2 : case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindSetWithDelete:
1139 2 : // If the DELSIZED is value-less, it already deleted the key that it
1140 2 : // was intended to delete. This is possible with a sequence like:
1141 2 : //
1142 2 : // DELSIZED.8 SET.7 SET.3
1143 2 : //
1144 2 : // The DELSIZED only describes the size of the SET.7, which in this
1145 2 : // case has already been elided. We don't count it as a missizing,
1146 2 : // instead converting the DELSIZED to a DEL. Skip the remainder of
1147 2 : // the snapshot stripe and return.
1148 2 : if len(i.value) == 0 {
1149 2 : i.key.SetKind(base.InternalKeyKindDelete)
1150 2 : // NB: We skipInStripe now, rather than leaving i.skip=true and
1151 2 : // returning early, because Next() requires that i.skip=true
1152 2 : // only if i.pos == iterPosCurForward.
1153 2 : //
1154 2 : // Ignore any error caused by skipInStripe since it does not affect
1155 2 : // the key/value being returned here, and the next call to Next() will
1156 2 : // expose it.
1157 2 : i.skipInStripe()
1158 2 : return &i.key, i.value
1159 2 : }
1160 : // The deleted key is not a DEL, SINGLEDEL, or DELSIZED, and the
1161 : // DELSIZED in i.key has a positive size.
1162 2 : expectedSize, n := binary.Uvarint(i.value)
1163 2 : if n != len(i.value) {
1164 1 : i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(i.value))
1165 1 : i.valid = false
1166 1 : return nil, nil
1167 1 : }
1168 2 : elidedSize := uint64(len(i.iterKV.K.UserKey)) + uint64(len(i.iterValue))
1169 2 : if elidedSize != expectedSize {
1170 2 : // The original DELSIZED key was missized. It's unclear what to
1171 2 : // do. The user-provided size was wrong, so it's unlikely to be
1172 2 : // accurate or meaningful. We could:
1173 2 : //
1174 2 : // 1. return the DELSIZED with the original user-provided size unmodified
1175 2 : // 2. return the DELSIZED with a zeroed size to reflect that a key was
1176 2 : // elided, even if it wasn't the anticipated size.
1177 2 : // 3. subtract the elided size from the estimate and re-encode.
1178 2 : // 4. convert the DELSIZED into a value-less DEL, so that
1179 2 : // ordinary DEL heuristics apply.
1180 2 : //
1181 2 : // We opt for (4) under the rationale that we can't rely on the
1182 2 : // user-provided size for accuracy, so ordinary DEL heuristics
1183 2 : // are safer.
1184 2 : i.stats.CountMissizedDels++
1185 2 : i.key.SetKind(base.InternalKeyKindDelete)
1186 2 : i.value = i.valueBuf[:0]
1187 2 : // NB: We skipInStripe now, rather than leaving i.skip=true and
1188 2 : // returning early, because Next() requires that i.skip=true
1189 2 : // only if i.pos == iterPosCurForward.
1190 2 : //
1191 2 : // Ignore any error caused by skipInStripe since it does not affect
1192 2 : // the key/value being returned here, and the next call to Next() will
1193 2 : // expose it.
1194 2 : i.skipInStripe()
1195 2 : return &i.key, i.value
1196 2 : }
1197 : // NB: We remove the value regardless of whether the key was sized
1198 : // appropriately. The size encoded is 'consumed' the first time it
1199 : // meets a key that it deletes.
1200 2 : i.value = i.valueBuf[:0]
1201 :
1202 0 : default:
1203 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
1204 0 : i.valid = false
1205 0 : return nil, nil
1206 : }
1207 : }
1208 :
1209 2 : if i.iterStripeChange == sameStripe {
1210 0 : panic(errors.AssertionFailedf("unexpectedly found iter stripe change = %d", i.iterStripeChange))
1211 : }
1212 : // We landed outside the original stripe. Reset skip.
1213 2 : i.skip = false
1214 2 : if i.err != nil {
1215 0 : return nil, nil
1216 0 : }
1217 2 : return &i.key, i.value
1218 : }
1219 :
1220 2 : func (i *Iter) saveKey() {
1221 2 : i.keyBuf = append(i.keyBuf[:0], i.iterKV.K.UserKey...)
1222 2 : i.key = base.InternalKey{
1223 2 : UserKey: i.keyBuf,
1224 2 : Trailer: i.iterKV.K.Trailer,
1225 2 : }
1226 2 : i.keyTrailer = i.key.Trailer
1227 2 : i.frontiers.Advance(i.key.UserKey)
1228 2 : }
1229 :
1230 2 : func (i *Iter) cloneKey(key []byte) []byte {
1231 2 : i.alloc, key = i.alloc.Copy(key)
1232 2 : return key
1233 2 : }
1234 :
1235 : // Key returns the current key.
1236 1 : func (i *Iter) Key() base.InternalKey {
1237 1 : return i.key
1238 1 : }
1239 :
1240 : // Value returns the current value.
1241 1 : func (i *Iter) Value() []byte {
1242 1 : return i.value
1243 1 : }
1244 :
1245 : // Valid returns whether the iterator is positioned at a valid key/value pair.
1246 1 : func (i *Iter) Valid() bool {
1247 1 : return i.valid
1248 1 : }
1249 :
1250 : // Error returns any error encountered.
1251 1 : func (i *Iter) Error() error {
1252 1 : return i.err
1253 1 : }
1254 :
1255 : // Close the iterator.
1256 2 : func (i *Iter) Close() error {
1257 2 : err := i.iter.Close()
1258 2 : if i.err == nil {
1259 2 : i.err = err
1260 2 : }
1261 :
1262 : // Close the closer for the current value if one was open.
1263 2 : if i.valueCloser != nil {
1264 0 : i.err = errors.CombineErrors(i.err, i.valueCloser.Close())
1265 0 : i.valueCloser = nil
1266 0 : }
1267 :
1268 2 : return i.err
1269 : }
1270 :
1271 : // AddTombstoneSpan adds a copy of a span of range dels, to be later returned by
1272 : // TombstonesUpTo.
1273 : //
1274 : // The spans must be non-overlapping and ordered. Empty spans are ignored.
1275 2 : func (i *Iter) AddTombstoneSpan(span *keyspan.Span) {
1276 2 : i.tombstones = i.appendSpan(i.tombstones, span)
1277 2 : }
1278 :
1279 : // cover is returned by tombstoneCovers and describes a span's relationship to
1280 : // a key at a particular snapshot.
1281 : type cover int8
1282 :
1283 : const (
1284 : // noCover indicates the tested key does not fall within the span's bounds,
1285 : // or the span contains no keys with sequence numbers higher than the key's.
1286 : noCover cover = iota
1287 :
1288 : // coversInvisibly indicates the tested key does fall within the span's
1289 : // bounds and the span contains at least one key with a higher sequence
1290 : // number, but none visible at the provided snapshot.
1291 : coversInvisibly
1292 :
1293 : // coversVisibly indicates the tested key does fall within the span's
1294 : // bounds, and the span contains at least one key with a sequence number
1295 : // higher than the key's sequence number that is visible at the provided
1296 : // snapshot.
1297 : coversVisibly
1298 : )
1299 :
1300 : // tombstoneCovers returns whether the key is covered by a tombstone, and if
1301 : // so, whether that tombstone is visible at the given snapshot.
1302 : //
1303 : // The key's UserKey must be greater or equal to the last span Start key passed
1304 : // to AddTombstoneSpan.
1305 2 : func (i *Iter) tombstoneCovers(key base.InternalKey, snapshot uint64) cover {
1306 2 : if len(i.tombstones) == 0 {
1307 2 : return noCover
1308 2 : }
1309 2 : last := &i.tombstones[len(i.tombstones)-1]
1310 2 : if invariants.Enabled && i.cmp(key.UserKey, last.Start) < 0 {
1311 0 : panic(errors.AssertionFailedf("invalid key %q, last span %s", key, last))
1312 : }
1313 2 : if i.cmp(key.UserKey, last.End) < 0 && last.Covers(key.SeqNum()) {
1314 2 : if last.CoversAt(snapshot, key.SeqNum()) {
1315 2 : return coversVisibly
1316 2 : }
1317 2 : return coversInvisibly
1318 : }
1319 2 : return noCover
1320 : }
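
// The three cover values above can be illustrated with a small, self-contained
// sketch. It assumes the point key already lies within the span's [Start, End)
// bounds and simplifies the span to a single tombstone sequence number (the
// real keyspan.Span.Covers/CoversAt operate on a whole set of keys). It uses
// pebble's visibility rule that a sequence number is visible at a snapshot iff
// it is strictly smaller than the snapshot's sequence number. The function and
// its names are illustrative only, not part of this package.
func exampleCover(tombSeq, keySeq, snapshot uint64) string {
	if tombSeq <= keySeq {
		// The tombstone is not newer than the key, so it cannot delete it.
		return "noCover"
	}
	if tombSeq < snapshot {
		// The tombstone is newer than the key and visible at the snapshot.
		return "coversVisibly"
	}
	// The tombstone is newer than the key, but a reader at this snapshot
	// cannot see it, so the key must still be retained for that reader.
	return "coversInvisibly"
}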
1321 :
1322 : // TombstonesUpTo returns a list of pending range tombstones up to the
1323 : // specified key, or all pending range tombstones if key = nil.
1324 : //
1325 : // If a key is specified, it must be greater than the last span Start key passed
1326 : // to AddTombstoneSpan.
1327 2 : func (i *Iter) TombstonesUpTo(key []byte) []keyspan.Span {
1328 2 : var toReturn []keyspan.Span
1329 2 : toReturn, i.tombstones = i.splitSpans(i.tombstones, key)
1330 2 :
1331 2 : result := toReturn[:0]
1332 2 : for _, span := range toReturn {
1333 2 : // Apply the snapshot stripe rules, keeping only the latest tombstone for
1334 2 : // each snapshot stripe.
1335 2 : currentIdx := -1
1336 2 : keys := make([]keyspan.Key, 0, min(len(span.Keys), len(i.cfg.Snapshots)+1))
1337 2 : for _, k := range span.Keys {
1338 2 : idx := i.cfg.Snapshots.Index(k.SeqNum())
1339 2 : if currentIdx == idx {
1340 2 : continue
1341 : }
1342 2 : if idx == 0 && i.cfg.ElideRangeTombstone(span.Start, span.End) {
1343 2 : // This is the last snapshot stripe and the range tombstone
1344 2 : // can be elided.
1345 2 : break
1346 : }
1347 :
1348 2 : keys = append(keys, k)
1349 2 : if idx == 0 {
1350 2 : // This is the last snapshot stripe.
1351 2 : break
1352 : }
1353 2 : currentIdx = idx
1354 : }
1355 2 : if len(keys) > 0 {
1356 2 : result = append(result, keyspan.Span{
1357 2 : Start: i.cloneKey(span.Start),
1358 2 : End: i.cloneKey(span.End),
1359 2 : Keys: keys,
1360 2 : })
1361 2 : }
1362 : }
1363 :
1364 2 : return result
1365 : }
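
// The per-stripe filtering in TombstonesUpTo can be sketched in isolation. The
// snippet below uses plain uint64 sequence numbers ordered newest to oldest
// (as span.Keys are), a hypothetical stripeIndex helper standing in for
// i.cfg.Snapshots.Index, and a boolean elide standing in for the
// ElideRangeTombstone callback. It keeps only the newest tombstone in each
// snapshot stripe and drops the bottommost stripe entirely when the tombstone
// cannot shadow anything below.
func exampleCollapsePerStripe(seqNums []uint64, stripeIndex func(uint64) int, elide bool) []uint64 {
	var kept []uint64
	currentIdx := -1
	for _, seq := range seqNums {
		idx := stripeIndex(seq)
		if idx == currentIdx {
			// An older tombstone in the same stripe is shadowed by the one
			// already kept for that stripe.
			continue
		}
		if idx == 0 && elide {
			// Bottommost stripe with nothing below to shadow: drop it.
			break
		}
		kept = append(kept, seq)
		if idx == 0 {
			// Nothing can be visible below the bottommost stripe.
			break
		}
		currentIdx = idx
	}
	return kept
}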
1366 :
1367 : // FirstTombstoneStart returns the start key of the first pending tombstone (the
1368 : // first span that would be returned by TombstonesUpTo). Returns nil if
1369 : // there are no pending tombstones.
1370 2 : func (i *Iter) FirstTombstoneStart() []byte {
1371 2 : if len(i.tombstones) == 0 {
1372 2 : return nil
1373 2 : }
1374 2 : return i.tombstones[0].Start
1375 : }
1376 :
1377 : // AddRangeKeySpan adds a copy of a span of range keys, to be later returned by
1378 : // RangeKeysUpTo. The span is shallow cloned.
1379 : //
1380 : // The spans must be non-overlapping and ordered. Empty spans are ignored.
1381 2 : func (i *Iter) AddRangeKeySpan(span *keyspan.Span) {
1382 2 : i.rangeKeys = i.appendSpan(i.rangeKeys, span)
1383 2 : }
1384 :
1385 : // RangeKeysUpTo returns a list of pending range keys up to the specified key,
1386 : // or all pending range keys if key = nil.
1387 : //
1388 : // If a key is specified, it must be greater than the last span Start key passed
1389 : // to AddRangeKeySpan.
1390 2 : func (i *Iter) RangeKeysUpTo(key []byte) []keyspan.Span {
1391 2 : var result []keyspan.Span
1392 2 : result, i.rangeKeys = i.splitSpans(i.rangeKeys, key)
1393 2 : // Elision of snapshot stripes happens in rangeKeyCompactionTransform, so no need to
1394 2 : // do that here.
1395 2 : return result
1396 2 : }
1397 :
1398 : // FirstRangeKeyStart returns the start key of the first pending range key span
1399 : // (the first span that would be returned by RangeKeysUpTo). Returns nil
1400 : // if there are no pending range keys.
1401 2 : func (i *Iter) FirstRangeKeyStart() []byte {
1402 2 : if len(i.rangeKeys) == 0 {
1403 2 : return nil
1404 2 : }
1405 2 : return i.rangeKeys[0].Start
1406 : }
1407 :
1408 : // maybeZeroSeqnum attempts to set the seqnum for the current key to 0. Doing
1409 : // so improves compression and enables an optimization during forward iteration
1410 : // to skip some key comparisons. The seqnum for an entry can be zeroed if the
1411 : // entry is on the bottom snapshot stripe and on the bottom level of the LSM.
1412 2 : func (i *Iter) maybeZeroSeqnum(snapshotIdx int) {
1413 2 : if !i.cfg.AllowZeroSeqNum {
1414 2 : // TODO(peter): allowZeroSeqNum applies to the entire compaction. We could
1415 2 : // make the determination on a key-by-key basis, similar to what is done
1416 2 : // for elideTombstone. Need to add a benchmark for Iter to verify
1417 2 : // that it isn't too expensive.
1418 2 : return
1419 2 : }
1420 2 : if snapshotIdx > 0 {
1421 2 : // This is not the last snapshot stripe.
1422 2 : return
1423 2 : }
1424 2 : i.key.SetSeqNum(base.SeqNumZero)
1425 : }
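
// A minimal sketch of the zeroing decision above, using the trailer layout
// pebble encodes internal keys with ((seqnum << 8) | kind) so the effect on
// the stored key is visible. The function name and parameters are
// illustrative only.
func exampleMaybeZeroSeqNum(trailer uint64, allowZeroSeqNum bool, snapshotIdx int) uint64 {
	if !allowZeroSeqNum || snapshotIdx > 0 {
		// Either the output is not at the bottom of the LSM, or an open
		// snapshot still needs to distinguish this version from older ones.
		return trailer
	}
	// Zero the sequence number (upper bits) while preserving the kind byte.
	return trailer & 0xff
}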
1426 :
1427 : // appendSpan appends a span to a list of ordered, non-overlapping spans.
1428 : // The span is shallow cloned.
1429 2 : func (i *Iter) appendSpan(spans []keyspan.Span, span *keyspan.Span) []keyspan.Span {
1430 2 : if span.Empty() {
1431 0 : return spans
1432 0 : }
1433 2 : if invariants.Enabled && len(spans) > 0 {
1434 2 : if last := spans[len(spans)-1]; i.cmp(last.End, span.Start) > 0 {
1435 0 : panic(errors.AssertionFailedf("overlapping spans: %s and %s", last, span))
1436 : }
1437 : }
1438 2 : return append(spans, keyspan.Span{
1439 2 : Start: i.cloneKey(span.Start),
1440 2 : End: i.cloneKey(span.End),
1441 2 : Keys: slices.Clone(span.Keys),
1442 2 : })
1443 : }
1444 :
1445 : // splitSpans takes a list of ordered, non-overlapping spans and a key and
1446 : // returns two lists of spans: one that ends at or before the key, and one that
1447 : // starts at or after the key.
1448 : //
1449 : // If the key is nil, returns all spans. If not nil, the key must not be smaller
1450 : // than the start of the last span in the list.
1451 2 : func (i *Iter) splitSpans(spans []keyspan.Span, key []byte) (before, after []keyspan.Span) {
1452 2 : n := len(spans)
1453 2 : if n == 0 || key == nil || i.cmp(spans[n-1].End, key) <= 0 {
1454 2 : // Common case: return all spans (retaining any extra allocated space).
1455 2 : return spans[:n:n], spans[n:]
1456 2 : }
1457 2 : if c := i.cmp(key, spans[n-1].Start); c <= 0 {
1458 1 : if c < 0 {
1459 0 : panic(fmt.Sprintf("invalid key %q, last span %s", key, spans[n-1]))
1460 : }
1461 : // The last span starts exactly at key.
1462 1 : return spans[: n-1 : n-1], spans[n-1:]
1463 : }
1464 : // We have to split the last span.
1465 2 : key = i.cloneKey(key)
1466 2 : before = spans[:n:n]
1467 2 : after = append(spans[n:], keyspan.Span{
1468 2 : Start: key,
1469 2 : End: spans[n-1].End,
1470 2 : Keys: slices.Clone(spans[n-1].Keys),
1471 2 : })
1472 2 : before[n-1].End = key
1473 2 : return before, after
1474 : }
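
// The interesting case in splitSpans is when the cutoff key falls strictly
// inside the last span: that span is cut into [Start, key) and [key, End)
// (with its keys duplicated on both sides in the real code). A self-contained
// sketch of that behavior, using a simplified span of plain strings rather
// than keyspan.Span; names here are illustrative only.
type exampleSpan struct {
	start, end string
}

func exampleSplitLast(spans []exampleSpan, key string) (before, after []exampleSpan) {
	n := len(spans)
	if n == 0 || spans[n-1].end <= key {
		// All spans already end at or before key.
		return spans, nil
	}
	if spans[n-1].start == key {
		// The last span starts exactly at key; no splitting needed.
		return spans[:n-1], spans[n-1:]
	}
	// Cut the last span at key.
	before = append(append(before, spans[:n-1]...), exampleSpan{spans[n-1].start, key})
	after = []exampleSpan{{key, spans[n-1].end}}
	return before, after
}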
1475 :
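// finishValueMerger finishes the supplied value merger. If the merger
// implements base.DeletableValueMerger, DeletableFinish is used so that the
// merge result can indicate via needDelete that the key should be deleted;
// otherwise the plain Finish is used and needDelete remains false.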
1476 : func finishValueMerger(
1477 : valueMerger base.ValueMerger, includesBase bool,
1478 2 : ) (value []byte, needDelete bool, closer io.Closer, err error) {
1479 2 : if valueMerger2, ok := valueMerger.(base.DeletableValueMerger); ok {
1480 1 : value, needDelete, closer, err = valueMerger2.DeletableFinish(includesBase)
1481 2 : } else {
1482 2 : value, closer, err = valueMerger.Finish(includesBase)
1483 2 : }
1484 2 : return
1485 : }