Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "encoding/binary"
9 : "io"
10 : "sort"
11 : "strconv"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/bytealloc"
16 : "github.com/cockroachdb/pebble/internal/compact"
17 : "github.com/cockroachdb/pebble/internal/keyspan"
18 : "github.com/cockroachdb/pebble/internal/rangekey"
19 : "github.com/cockroachdb/redact"
20 : )
21 :
22 : // compactionIter provides a forward-only iterator that encapsulates the logic
23 : // for collapsing entries during compaction. It wraps an internal iterator and
24 : // collapses entries that are no longer necessary because they are shadowed by
25 : // newer entries. The simplest example of this is when the internal iterator
26 : // contains two keys: a.PUT.2 and a.PUT.1. Instead of returning both entries,
27 : // compactionIter collapses the second entry because it is no longer
28 : // necessary. The high-level structure for compactionIter is to iterate over
29 : // its internal iterator and output 1 entry for every user-key. There are four
30 : // complications to this story.
31 : //
32 : // 1. Eliding Deletion Tombstones
33 : //
34 : // Consider the entries a.DEL.2 and a.PUT.1. These entries collapse to
35 : // a.DEL.2. Do we have to output the entry a.DEL.2? Only if a.DEL.2 possibly
36 : // shadows an entry at a lower level. If we're compacting to the base-level in
37 : // the LSM tree then a.DEL.2 is definitely not shadowing an entry at a lower
38 : // level and can be elided.
39 : //
40 : // We can do slightly better than only eliding deletion tombstones at the base
41 : // level by observing that we can elide a deletion tombstone if there are no
42 : // sstables that contain the entry's key. This check is performed by
43 : // elideTombstone.
44 : //
45 : // 2. Merges
46 : //
47 : // The MERGE operation merges the value for an entry with the existing value
48 : // for an entry. The logical value of an entry can be composed of a series of
49 : // merge operations. When compactionIter sees a MERGE, it scans forward in its
50 : // internal iterator collapsing MERGE operations for the same key until it
51 : // encounters a SET or DELETE operation. For example, the keys a.MERGE.4,
52 : // a.MERGE.3, a.MERGE.2 will be collapsed to a.MERGE.4 and the values will be
53 : // merged using the specified Merger.
54 : //
55 : // An interesting case here occurs when MERGE is combined with SET. Consider
56 : // the entries a.MERGE.3 and a.SET.2. The collapsed key will be a.SET.3. The
57 : // reason that the kind is changed to SET is because the SET operation acts as
58 : // a barrier preventing further merging. This can be seen better in the
59 : // scenario a.MERGE.3, a.SET.2, a.MERGE.1. The entry a.MERGE.1 may be at lower
60 : // (older) level and not involved in the compaction. If the compaction of
61 : // a.MERGE.3 and a.SET.2 produced a.MERGE.3, a subsequent compaction with
62 : // a.MERGE.1 would merge the values together incorrectly.
63 : //
64 : // 3. Snapshots
65 : //
66 : // Snapshots are lightweight point-in-time views of the DB state. At its core,
67 : // a snapshot is a sequence number along with a guarantee from Pebble that it
68 : // will maintain the view of the database at that sequence number. Part of this
69 : // guarantee is relatively straightforward to achieve. When reading from the
70 : // database Pebble will ignore sequence numbers that are larger than the
71 : // snapshot sequence number. The primary complexity with snapshots occurs
72 : // during compaction: the collapsing of entries that are shadowed by newer
73 : // entries is at odds with the guarantee that Pebble will maintain the view of
74 : // the database at the snapshot sequence number. Rather than collapsing entries
75 : // up to the next user key, compactionIter can only collapse entries up to the
76 : // next snapshot boundary. That is, every snapshot boundary potentially causes
77 : // another entry for the same user-key to be emitted. Another way to view this
78 : // is that snapshots define stripes and entries are collapsed within stripes,
79 : // but not across stripes. Consider the following scenario:
80 : //
81 : // a.PUT.9
82 : // a.DEL.8
83 : // a.PUT.7
84 : // a.DEL.6
85 : // a.PUT.5
86 : //
87 : // In the absence of snapshots these entries would be collapsed to
88 : // a.PUT.9. What if there is a snapshot at sequence number 7? The entries can
89 : // be divided into two stripes and collapsed within the stripes:
90 : //
91 : // a.PUT.9 a.PUT.9
92 : // a.DEL.8 --->
93 : // a.PUT.7
94 : // -- --
95 : // a.DEL.6 ---> a.DEL.6
96 : // a.PUT.5
97 : //
98 : // All of the rules described earlier still apply, but they are confined to
99 : // operate within a snapshot stripe. Snapshots only affect compaction when the
100 : // snapshot sequence number lies within the range of sequence numbers being
101 : // compacted. In the above example, a snapshot at sequence number 10 or at
102 : // sequence number 5 would not have any effect.
103 : //
104 : // 4. Range Deletions
105 : //
106 : // Range deletions provide the ability to delete all of the keys (and values)
107 : // in a contiguous range. Range deletions are stored indexed by their start
108 : // key. The end key of the range is stored in the value. In order to support
109 : // lookup of the range deletions which overlap with a particular key, the range
110 : // deletion tombstones need to be fragmented whenever they overlap. This
111 : // fragmentation is performed by keyspan.Fragmenter. The fragments are then
112 : // subject to the rules for snapshots. For example, consider the two range
113 : // tombstones [a,e)#1 and [c,g)#2:
114 : //
115 : // 2: c-------g
116 : // 1: a-------e
117 : //
118 : // These tombstones will be fragmented into:
119 : //
120 : // 2: c---e---g
121 : // 1: a---c---e
122 : //
123 : // Do we output the fragment [c,e)#1? Since it is covered by [c,e)#2 the answer
124 : // depends on whether it is in a new snapshot stripe.
125 : //
126 : // In addition to the fragmentation of range tombstones, compaction also needs
127 : // to take the range tombstones into consideration when outputting normal
128 : // keys. Just as with point deletions, a range deletion covering an entry can
129 : // cause the entry to be elided.
130 : //
131 : // A note on the stability of keys and values.
132 : //
133 : // The stability guarantees of keys and values returned by the iterator tree
134 : // that backs a compactionIter are nuanced and care must be taken when
135 : // referencing any returned items.
136 : //
137 : // Keys and values returned by exported functions (i.e. First, Next, etc.) have
138 : // lifetimes that fall into two categories:
139 : //
140 : // Lifetime valid for duration of compaction. Range deletion keys and values are
141 : // stable for the duration of the compaction, due to the way in which a
142 : // compactionIter is typically constructed (i.e. via (*compaction).newInputIter,
143 : // which wraps the iterator over the range deletion block in a noCloseIter,
144 : // preventing the release of the backing memory until the compaction is
145 : // finished).
146 : //
147 : // Lifetime limited to duration of sstable block liveness. Point keys (SET, DEL,
148 : // etc.) and values must be cloned / copied following the return from the
149 : // exported function, and before a subsequent call to Next advances the iterator
150 : // and mutates the contents of the returned key and value.
type compactionIter struct {
	// equal is an equality predicate for user keys.
	equal Equal
	// merge constructs a ValueMerger used to collapse MERGE operands.
	merge Merge
	// iter is the input iterator whose entries are collapsed.
	iter internalIterator
	// err records the first error encountered; once set, the exported
	// methods return (nil, nil).
	err error
	// `key.UserKey` points into `keyBuf`, which holds a saved copy of
	// `i.iterKey.UserKey`, and `key.Trailer` is set from
	// `i.iterKey.Trailer`. This is the case on return from all public
	// methods -- these methods return `key`. Additionally, it is the
	// internal state when the code is moving to the next key so it can
	// determine whether the user key has changed from the previous key.
	key InternalKey
	// keyTrailer is updated when `i.key` is updated and holds the key's
	// original trailer (eg, before any sequence-number zeroing or changes to
	// key kind).
	keyTrailer uint64
	// value is the value returned alongside `key` by the exported methods.
	value []byte
	// valueCloser, if non-nil, releases resources backing `value` (e.g. a
	// merged value) and is closed before the next entry is returned.
	valueCloser io.Closer
	// Temporary buffer used for storing the previous user key in order to
	// determine when iteration has advanced to a new user key and thus a new
	// snapshot stripe.
	keyBuf []byte
	// Temporary buffer used for storing the previous value, which may be an
	// unsafe, i.iter-owned slice that could be altered when the iterator is
	// advanced.
	valueBuf []byte
	// Is the current entry valid?
	valid bool
	// iterKey and iterValue reflect the current position of `iter`.
	iterKey   *InternalKey
	iterValue []byte
	// iterStripeChange records how the most recent call to nextInStripe
	// changed the snapshot stripe, if at all.
	iterStripeChange stripeChangeType
	// `skip` indicates whether the remaining entries in the current snapshot
	// stripe should be skipped or processed. `skip` has no effect when `pos ==
	// iterPosNext`.
	skip bool
	// `pos` indicates the iterator position at the top of `Next()`. Its type's
	// (`iterPos`) values take on the following meanings in the context of
	// `compactionIter`.
	//
	// - `iterPosCur`: the iterator is at the last key returned.
	// - `iterPosNext`: the iterator has already been advanced to the next
	//   candidate key. For example, this happens when processing merge operands,
	//   where we advance the iterator all the way into the next stripe or next
	//   user key to ensure we've seen all mergeable operands.
	// - `iterPosPrev`: this is invalid as compactionIter is forward-only.
	pos iterPos
	// `snapshotPinned` indicates whether the last point key returned by the
	// compaction iterator was only returned because an open snapshot prevents
	// its elision. This field only applies to point keys, and not to range
	// deletions or range keys.
	snapshotPinned bool
	// forceObsoleteDueToRangeDel is set to true in a subset of the cases that
	// snapshotPinned is true. This value is true when the point is obsolete due
	// to a RANGEDEL but could not be deleted due to a snapshot.
	//
	// NB: it may seem that the additional cases that snapshotPinned captures
	// are harmless in that they can also be used to mark a point as obsolete
	// (it is merely a duplication of some logic that happens in
	// Writer.AddWithForceObsolete), but that is not quite accurate as of this
	// writing -- snapshotPinned originated in stats collection and for a
	// sequence MERGE, SET, where the MERGE cannot merge with the (older) SET
	// due to a snapshot, the snapshotPinned value for the SET is true.
	//
	// TODO(sumeer,jackson): improve the logic of snapshotPinned and reconsider
	// whether we need forceObsoleteDueToRangeDel.
	forceObsoleteDueToRangeDel bool
	// The index of the snapshot for the current key within the snapshots slice.
	curSnapshotIdx int
	// curSnapshotSeqNum is the sequence number of the snapshot at
	// curSnapshotIdx, or InternalKeySeqNumMax when curSnapshotIdx ==
	// len(snapshots) (see snapshotIndex).
	curSnapshotSeqNum uint64
	// The snapshot sequence numbers that need to be maintained. These sequence
	// numbers define the snapshot stripes (see the Snapshots description
	// above). The sequence numbers are in ascending order.
	snapshots []uint64
	// frontiers holds a heap of user keys that affect compaction behavior when
	// they're exceeded. Before a new key is returned, the compaction iterator
	// advances the frontier, notifying any code that subscribed to be notified
	// when a key was reached. The primary use today is within the
	// implementation of compactionOutputSplitters in compaction.go. Many of
	// these splitters wait for the compaction iterator to call Advance(k) when
	// it's returning a new key. If the key that they're waiting for is
	// surpassed, these splitters update internal state recording that they
	// should request a compaction split next time they're asked in
	// [shouldSplitBefore].
	frontiers compact.Frontiers
	// Reference to the range deletion tombstone fragmenter (e.g.,
	// `compaction.rangeDelFrag`).
	// TODO(jackson): We can eliminate range{Del,Key}Frag now that fragmentation
	// is performed upfront by keyspanimpl.MergingIters.
	rangeDelFrag *keyspan.Fragmenter
	rangeKeyFrag *keyspan.Fragmenter
	// The fragmented tombstones.
	tombstones []keyspan.Span
	// The fragmented range keys.
	rangeKeys []keyspan.Span
	// Byte allocator for the tombstone keys.
	alloc bytealloc.A
	// allowZeroSeqNum, when true, permits sequence numbers to be zeroed where
	// safe (see maybeZeroSeqnum).
	allowZeroSeqNum bool
	// elideTombstone reports whether a point tombstone on the given user key
	// may be elided because no sstables outside this compaction could contain
	// the key (see the "Eliding Deletion Tombstones" discussion above).
	elideTombstone func(key []byte) bool
	// elideRangeTombstone is the range-deletion analogue of elideTombstone.
	elideRangeTombstone func(start, end []byte) bool
	// ineffectualSingleDeleteCallback is invoked with the user key of a
	// SINGLEDEL judged ineffectual; its call sites are not in this chunk
	// (presumably singleDeleteNext and related paths — confirm there).
	ineffectualSingleDeleteCallback func(userKey []byte)
	// singleDeleteInvariantViolationCallback is invoked with the user key of
	// a SINGLEDEL whose usage invariants were observed to be violated; call
	// sites are likewise outside this chunk.
	singleDeleteInvariantViolationCallback func(userKey []byte)
	// The on-disk format major version. This informs the types of keys that
	// may be written to disk during a compaction.
	formatVersion FormatMajorVersion
	stats struct {
		// count of DELSIZED keys that were missized.
		countMissizedDels uint64
	}
}
260 :
// newCompactionIter constructs a compactionIter over iter. The supplied
// fragmenters (e.g. compaction.rangeDelFrag and compaction.rangeKeyFrag) are
// configured in place: their comparison and key-formatting functions are
// set, and their emit hooks are pointed back at this iterator
// (emitRangeDelChunk / emitRangeKeyChunk).
func newCompactionIter(
	cmp Compare,
	equal Equal,
	formatKey base.FormatKey,
	merge Merge,
	iter internalIterator,
	snapshots []uint64,
	rangeDelFrag *keyspan.Fragmenter,
	rangeKeyFrag *keyspan.Fragmenter,
	allowZeroSeqNum bool,
	elideTombstone func(key []byte) bool,
	elideRangeTombstone func(start, end []byte) bool,
	ineffectualSingleDeleteCallback func(userKey []byte),
	singleDeleteInvariantViolationCallback func(userKey []byte),
	formatVersion FormatMajorVersion,
) *compactionIter {
	i := &compactionIter{
		equal:                                  equal,
		merge:                                  merge,
		iter:                                   iter,
		snapshots:                              snapshots,
		rangeDelFrag:                           rangeDelFrag,
		rangeKeyFrag:                           rangeKeyFrag,
		allowZeroSeqNum:                        allowZeroSeqNum,
		elideTombstone:                         elideTombstone,
		elideRangeTombstone:                    elideRangeTombstone,
		ineffectualSingleDeleteCallback:        ineffectualSingleDeleteCallback,
		singleDeleteInvariantViolationCallback: singleDeleteInvariantViolationCallback,
		formatVersion:                          formatVersion,
	}
	i.frontiers.Init(cmp)
	i.rangeDelFrag.Cmp = cmp
	i.rangeDelFrag.Format = formatKey
	i.rangeDelFrag.Emit = i.emitRangeDelChunk
	i.rangeKeyFrag.Cmp = cmp
	i.rangeKeyFrag.Format = formatKey
	i.rangeKeyFrag.Emit = i.emitRangeKeyChunk
	return i
}
300 :
// First positions the iterator at the first entry of the input and returns
// the first collapsed entry via Next. It returns (nil, nil) if an error has
// occurred (recorded in i.err) or the input is empty.
func (i *compactionIter) First() (*InternalKey, []byte) {
	if i.err != nil {
		return nil, nil
	}
	var iterValue LazyValue
	i.iterKey, iterValue = i.iter.First()
	i.iterValue, _, i.err = iterValue.Value(nil)
	if i.err != nil {
		return nil, nil
	}
	if i.iterKey != nil {
		// Establish the snapshot stripe for the first key.
		i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(i.iterKey.SeqNum(), i.snapshots)
	}
	// The underlying iterator is already positioned at the next candidate
	// key, and the (nonexistent) previous key counts as an entirely new key.
	i.pos = iterPosNext
	i.iterStripeChange = newStripeNewKey
	return i.Next()
}
318 :
// Next returns the next collapsed entry, applying the tombstone-elision,
// merge, snapshot-stripe and range-deletion rules described at the top of
// this file. It returns (nil, nil) once the input is exhausted or when an
// error occurs; the error is retained in i.err.
func (i *compactionIter) Next() (*InternalKey, []byte) {
	if i.err != nil {
		return nil, nil
	}

	// Close the closer for the current value if one was open.
	if i.closeValueCloser() != nil {
		return nil, nil
	}

	// Prior to this call to `Next()` we are in one of three situations with
	// respect to `iterKey` and related state:
	//
	// - `!skip && pos == iterPosNext`: `iterKey` is already at the next key.
	// - `!skip && pos == iterPosCurForward`: We are at the key that has been returned.
	//   To move forward we advance by one key, even if that lands us in the same
	//   snapshot stripe.
	// - `skip && pos == iterPosCurForward`: We are at the key that has been returned.
	//   To move forward we skip skippable entries in the stripe.
	if i.pos == iterPosCurForward {
		if i.skip {
			i.skipInStripe()
		} else {
			i.nextInStripe()
		}
	} else if i.skip {
		panic(errors.AssertionFailedf("compaction iterator has skip=true, but iterator is at iterPosNext"))
	}

	i.pos = iterPosCurForward
	i.valid = false

	// Each loop iteration examines one candidate key; we either return it
	// (possibly after scanning forward, e.g. for MERGE), or skip it and
	// continue with the next candidate.
	for i.iterKey != nil {
		// If we entered a new snapshot stripe with the same key, any key we
		// return on this iteration is only returned because the open snapshot
		// prevented it from being elided or merged with the key returned for
		// the previous stripe. Mark it as pinned so that the compaction loop
		// can correctly populate output tables' pinned statistics. We might
		// also set snapshotPinned=true down below if we observe that the key is
		// deleted by a range deletion in a higher stripe or that this key is a
		// tombstone that could be elided if only it were in the last snapshot
		// stripe.
		i.snapshotPinned = i.iterStripeChange == newStripeSameKey

		if i.iterKey.Kind() == InternalKeyKindRangeDelete || rangekey.IsRangeKey(i.iterKey.Kind()) {
			// Return the span so the compaction can use it for file truncation and add
			// it to the relevant fragmenter. In the case of range deletions, we do not
			// set `skip` to true before returning as there may be any number of point
			// keys with the same user key and sequence numbers ≥ the range deletion's
			// sequence number. Such point keys must be visible (i.e., not skipped
			// over) since we promise point keys are not deleted by range tombstones at
			// the same sequence number (or higher).
			//
			// Note that `skip` must already be false here, because range keys and range
			// deletions are interleaved at the maximal sequence numbers and neither will
			// set `skip`=true.
			if i.skip {
				panic(errors.AssertionFailedf("pebble: compaction iterator: skip unexpectedly true"))
			}

			// NOTE: there is a subtle invariant violation here in that calling
			// saveKey and returning a reference to the temporary slice violates
			// the stability guarantee for range deletion keys. A potential
			// mediation could return the original iterKey and iterValue
			// directly, as the backing memory is guaranteed to be stable until
			// the compaction completes. The violation here is only minor in
			// that the caller immediately clones the range deletion InternalKey
			// when passing the key to the deletion fragmenter (see the
			// call-site in compaction.go).
			// TODO(travers): address this violation by removing the call to
			// saveKey and instead return the original iterKey and iterValue.
			// This goes against the comment on i.key in the struct, and
			// therefore warrants some investigation.
			i.saveKey()
			// TODO(jackson): Handle tracking pinned statistics for range keys
			// and range deletions. This would require updating
			// emitRangeDelChunk and rangeKeyCompactionTransform to update
			// statistics when they apply their own snapshot striping logic.
			i.snapshotPinned = false
			i.value = i.iterValue
			i.valid = true
			return &i.key, i.value
		}

		// TODO(sumeer): we could avoid calling Covers if i.iterStripeChange ==
		// sameStripeSameKey since that check has already been done in
		// nextInStripeHelper. However, we also need to handle the case of
		// CoversInvisibly below.
		if cover := i.rangeDelFrag.Covers(*i.iterKey, i.curSnapshotSeqNum); cover == keyspan.CoversVisibly {
			// A pending range deletion deletes this key. Skip it.
			i.saveKey()
			i.skipInStripe()
			continue
		} else if cover == keyspan.CoversInvisibly {
			// i.iterKey would be deleted by a range deletion if there weren't
			// any open snapshots. Mark it as pinned.
			//
			// NB: there are multiple places in this file where we call
			// i.rangeDelFrag.Covers and this is the only one where we are writing
			// to i.snapshotPinned. Those other cases occur in mergeNext where the
			// caller is deciding whether the value should be merged or not, and the
			// key is in the same snapshot stripe. Hence, snapshotPinned is by
			// definition false in those cases.
			i.snapshotPinned = true
			i.forceObsoleteDueToRangeDel = true
		} else {
			i.forceObsoleteDueToRangeDel = false
		}

		switch i.iterKey.Kind() {
		case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
			if i.elideTombstone(i.iterKey.UserKey) {
				if i.curSnapshotIdx == 0 {
					// If we're at the last snapshot stripe and the tombstone
					// can be elided skip skippable keys in the same stripe.
					i.saveKey()
					if i.key.Kind() == InternalKeyKindSingleDelete {
						i.skipDueToSingleDeleteElision()
					} else {
						i.skipInStripe()
						if !i.skip && i.iterStripeChange != newStripeNewKey {
							panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe disabled skip without advancing to new key"))
						}
					}
					if i.iterStripeChange == newStripeSameKey {
						panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe found a new stripe within the same key"))
					}
					continue
				} else {
					// We're not at the last snapshot stripe, so the tombstone
					// can NOT yet be elided. Mark it as pinned, so that it's
					// included in table statistics appropriately.
					i.snapshotPinned = true
				}
			}

			switch i.iterKey.Kind() {
			case InternalKeyKindDelete:
				i.saveKey()
				i.value = i.iterValue
				i.valid = true
				i.skip = true
				return &i.key, i.value

			case InternalKeyKindDeleteSized:
				// We may skip subsequent keys because of this tombstone. Scan
				// ahead to see just how much data this tombstone drops and if
				// the tombstone's value should be updated accordingly.
				return i.deleteSizedNext()

			case InternalKeyKindSingleDelete:
				if i.singleDeleteNext() {
					return &i.key, i.value
				} else if i.err != nil {
					return nil, nil
				}
				continue

			default:
				panic(errors.AssertionFailedf(
					"unexpected kind %s", redact.SafeString(i.iterKey.Kind().String())))
			}

		case InternalKeyKindSet, InternalKeyKindSetWithDelete:
			// The key we emit for this entry is a function of the current key
			// kind, and whether this entry is followed by a DEL/SINGLEDEL
			// entry. setNext() does the work to move the iterator forward,
			// preserving the original value, and potentially mutating the key
			// kind.
			i.setNext()
			if i.err != nil {
				return nil, nil
			}
			return &i.key, i.value

		case InternalKeyKindMerge:
			// Record the snapshot index before mergeNext as merging
			// advances the iterator, adjusting curSnapshotIdx.
			origSnapshotIdx := i.curSnapshotIdx
			var valueMerger ValueMerger
			valueMerger, i.err = i.merge(i.iterKey.UserKey, i.iterValue)
			if i.err == nil {
				i.mergeNext(valueMerger)
			}
			var needDelete bool
			if i.err == nil {
				// includesBase is true whenever we've transformed the MERGE record
				// into a SET.
				var includesBase bool
				switch i.key.Kind() {
				case InternalKeyKindSet, InternalKeyKindSetWithDelete:
					includesBase = true
				case InternalKeyKindMerge:
				default:
					panic(errors.AssertionFailedf(
						"unexpected kind %s", redact.SafeString(i.key.Kind().String())))
				}
				i.value, needDelete, i.valueCloser, i.err = finishValueMerger(valueMerger, includesBase)
			}
			if i.err == nil {
				if needDelete {
					// The merger signalled that the merged result is a no-op;
					// drop the entry and continue with the next candidate.
					i.valid = false
					if i.closeValueCloser() != nil {
						return nil, nil
					}
					continue
				}

				i.maybeZeroSeqnum(origSnapshotIdx)
				return &i.key, i.value
			}
			if i.err != nil {
				i.valid = false
				// TODO(sumeer): why is MarkCorruptionError only being called for
				// MERGE?
				i.err = base.MarkCorruptionError(i.err)
			}
			return nil, nil

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
			i.valid = false
			return nil, nil
		}
	}

	// Input exhausted.
	return nil, nil
}
547 :
548 1 : func (i *compactionIter) closeValueCloser() error {
549 1 : if i.valueCloser == nil {
550 1 : return nil
551 1 : }
552 :
553 0 : i.err = i.valueCloser.Close()
554 0 : i.valueCloser = nil
555 0 : if i.err != nil {
556 0 : i.valid = false
557 0 : }
558 0 : return i.err
559 : }
560 :
561 : // snapshotIndex returns the index of the first sequence number in snapshots
562 : // which is greater than or equal to seq.
563 1 : func snapshotIndex(seq uint64, snapshots []uint64) (int, uint64) {
564 1 : index := sort.Search(len(snapshots), func(i int) bool {
565 1 : return snapshots[i] > seq
566 1 : })
567 1 : if index >= len(snapshots) {
568 1 : return index, InternalKeySeqNumMax
569 1 : }
570 1 : return index, snapshots[index]
571 : }
572 :
573 : // skipInStripe skips over skippable keys in the same stripe and user key. It
574 : // may set i.err, in which case i.iterKey will be nil.
575 1 : func (i *compactionIter) skipInStripe() {
576 1 : i.skip = true
577 1 : // TODO(sumeer): we can avoid the overhead of calling i.rangeDelFrag.Covers,
578 1 : // in this case of nextInStripe, since we are skipping all of them anyway.
579 1 : for i.nextInStripe() == sameStripe {
580 1 : if i.err != nil {
581 0 : panic(i.err)
582 : }
583 : }
584 : // We landed outside the original stripe, so reset skip.
585 1 : i.skip = false
586 : }
587 :
588 1 : func (i *compactionIter) iterNext() bool {
589 1 : var iterValue LazyValue
590 1 : i.iterKey, iterValue = i.iter.Next()
591 1 : i.iterValue, _, i.err = iterValue.Value(nil)
592 1 : if i.err != nil {
593 0 : i.iterKey = nil
594 0 : }
595 1 : return i.iterKey != nil
596 : }
597 :
// stripeChangeType indicates how the snapshot stripe changed relative to the
// previous key. If the snapshot stripe changed, it also indicates whether the
// new stripe was entered because the iterator progressed onto an entirely new
// key or entered a new stripe within the same key.
type stripeChangeType int

const (
	// newStripeNewKey indicates the iterator advanced to a different user
	// key (or was exhausted / hit an error).
	newStripeNewKey stripeChangeType = iota
	// newStripeSameKey indicates the iterator advanced to an older snapshot
	// stripe of the same user key.
	newStripeSameKey
	// sameStripe indicates the iterator advanced but remained within the
	// same user key and snapshot stripe.
	sameStripe
)
609 :
610 : // nextInStripe advances the iterator and returns one of the above const ints
611 : // indicating how its state changed.
612 : //
613 : // All sameStripe keys that are covered by a RANGEDEL will be skipped and not
614 : // returned.
615 : //
616 : // Calls to nextInStripe must be preceded by a call to saveKey to retain a
617 : // temporary reference to the original key, so that forward iteration can
618 : // proceed with a reference to the original key. Care should be taken to avoid
619 : // overwriting or mutating the saved key or value before they have been returned
620 : // to the caller of the exported function (i.e. the caller of Next, First, etc.)
621 : //
622 : // nextInStripe may set i.err, in which case the return value will be
623 : // newStripeNewKey, and i.iterKey will be nil.
624 1 : func (i *compactionIter) nextInStripe() stripeChangeType {
625 1 : i.iterStripeChange = i.nextInStripeHelper()
626 1 : return i.iterStripeChange
627 1 : }
628 :
// nextInStripeHelper is an internal helper for nextInStripe; callers should use
// nextInStripe and not call nextInStripeHelper. It advances the underlying
// iterator, classifies the new position relative to the saved i.key/i.keyTrailer,
// and skips sameStripe keys that are visibly deleted by a range tombstone.
func (i *compactionIter) nextInStripeHelper() stripeChangeType {
	origSnapshotIdx := i.curSnapshotIdx
	for {
		if !i.iterNext() {
			// Exhausted (or errored, with i.err set); treat as a new key.
			return newStripeNewKey
		}
		key := i.iterKey

		// Is this a new key? There are two cases:
		//
		// 1. The new key has a different user key.
		// 2. The previous key was an interleaved range deletion or range key
		//    boundary. These keys are interleaved in the same input iterator
		//    stream as point keys, but they do not obey the ordinary sequence
		//    number ordering within a user key. If the previous key was one
		//    of these keys, we consider the new key a `newStripeNewKey` to
		//    reflect that it's the beginning of a new stream of point keys.
		if i.key.IsExclusiveSentinel() || !i.equal(i.key.UserKey, key.UserKey) {
			i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
			return newStripeNewKey
		}

		// If i.key and key have the same user key, then
		//   1. i.key must not have had a zero sequence number (or it would've be the last
		//      key with its user key).
		//   2. i.key must have a strictly larger sequence number
		// There's an exception in that either key may be a range delete. Range
		// deletes may share a sequence number with a point key if the keys were
		// ingested together. Range keys may also share the sequence number if they
		// were ingested, but range keys are interleaved into the compaction
		// iterator's input iterator at the maximal sequence number so their
		// original sequence number will not be observed here.
		if prevSeqNum := base.SeqNumFromTrailer(i.keyTrailer); (prevSeqNum == 0 || prevSeqNum <= key.SeqNum()) &&
			i.key.Kind() != InternalKeyKindRangeDelete && key.Kind() != InternalKeyKindRangeDelete {
			prevKey := i.key
			prevKey.Trailer = i.keyTrailer
			panic(errors.AssertionFailedf("pebble: invariant violation: %s and %s out of order", prevKey, key))
		}

		i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
		// Validate the key kind before classifying the stripe change.
		switch key.Kind() {
		case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
			InternalKeyKindRangeDelete:
			// Range tombstones and range keys are interleaved at the max
			// sequence number for a given user key, and the first key after one
			// is always considered a newStripeNewKey, so we should never reach
			// this.
			panic("unreachable")
		case InternalKeyKindDelete, InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSingleDelete,
			InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
			// Fall through
		default:
			kind := i.iterKey.Kind()
			i.iterKey = nil
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(kind))
			i.valid = false
			return newStripeNewKey
		}
		if i.curSnapshotIdx == origSnapshotIdx {
			// Same snapshot.
			if i.rangeDelFrag.Covers(*i.iterKey, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
				// This key is visibly deleted by a range tombstone in the
				// same stripe; skip it and keep advancing.
				continue
			}
			return sameStripe
		}
		return newStripeSameKey
	}
}
699 :
// setNext processes a SET or SETWITHDEL key. It saves the current key and
// value as the entry to return, then scans forward within the snapshot
// stripe: if a DEL/SINGLEDEL/DELSIZED for the same user key is found in the
// same stripe, the returned key's kind is upgraded to SETWITHDEL and the
// remainder of the stripe is marked skippable.
func (i *compactionIter) setNext() {
	// Save the current key.
	i.saveKey()
	i.value = i.iterValue
	i.valid = true
	i.maybeZeroSeqnum(i.curSnapshotIdx)

	// If this key is already a SETWITHDEL we can early return and skip the remaining
	// records in the stripe:
	// NB: on this path i.value aliases i.iterValue; per the stability note at
	// the top of the file, the caller must copy before advancing.
	if i.iterKey.Kind() == InternalKeyKindSetWithDelete {
		i.skip = true
		return
	}

	// We are iterating forward. Save the current value.
	i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
	i.value = i.valueBuf

	// Else, we continue to loop through entries in the stripe looking for a
	// DEL. Note that we may stop *before* encountering a DEL, if one exists.
	//
	// NB: nextInStripe will skip sameStripe keys that are visibly covered by a
	// RANGEDEL. This can include DELs -- this is fine since such DELs don't
	// need to be combined with SET to make SETWITHDEL.
	for {
		switch i.nextInStripe() {
		case newStripeNewKey, newStripeSameKey:
			i.pos = iterPosNext
			return
		case sameStripe:
			// We're still in the same stripe. If this is a
			// DEL/SINGLEDEL/DELSIZED, we stop looking and emit a SETWITHDEL.
			// Subsequent keys are eligible for skipping.
			switch i.iterKey.Kind() {
			case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
				i.key.SetKind(InternalKeyKindSetWithDelete)
				i.skip = true
				return
			case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSetWithDelete:
				// Do nothing
			default:
				i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
				i.valid = false
			}
		default:
			panic("pebble: unexpected stripeChangeType: " + strconv.Itoa(int(i.iterStripeChange)))
		}
	}
}
749 :
// mergeNext is invoked when the current key is a MERGE. It folds older
// entries for the same user key within the current snapshot stripe into
// valueMerger. The loop terminates when the stripe ends, or when a
// SET/SETWITHDEL or DEL-family key is reached, in which case the emitted
// key's kind is rewritten (MERGE+SET* -> SET, MERGE+DEL-family ->
// SETWITHDEL) so that it shadows keys in lower levels. If a merge step
// fails, i.err is set and i.valid is cleared.
func (i *compactionIter) mergeNext(valueMerger ValueMerger) {
	// Save the current key.
	i.saveKey()
	i.valid = true

	// Loop looking for older values in the current snapshot stripe and merge
	// them.
	for {
		if i.nextInStripe() != sameStripe {
			i.pos = iterPosNext
			return
		}
		if i.err != nil {
			panic(i.err)
		}
		// NB: MERGE#10+RANGEDEL#9 stays a MERGE, since nextInStripe skips
		// sameStripe keys that are visibly covered by a RANGEDEL. There may be
		// MERGE#7 that is invisibly covered and will be preserved, but there is
		// no risk that MERGE#10 and MERGE#7 will get merged in the future as
		// the RANGEDEL still exists and will be used in user-facing reads that
		// see MERGE#10, and will also eventually cause MERGE#7 to be deleted in
		// a compaction.
		key := i.iterKey
		switch key.Kind() {
		case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
			// We've hit a deletion tombstone. Return everything up to this point and
			// then skip entries until the next snapshot stripe. We change the kind
			// of the result key to a Set so that it shadows keys in lower
			// levels. That is, MERGE+DEL -> SETWITHDEL.
			//
			// We do the same for SingleDelete since SingleDelete is only
			// permitted (with deterministic behavior) for keys that have been
			// set once since the last SingleDelete/Delete, so everything
			// older is acceptable to shadow. Note that this is slightly
			// different from singleDeleteNext() which implements stricter
			// semantics in terms of applying the SingleDelete to the single
			// next Set. But those stricter semantics are not observable to
			// the end-user since Iterator interprets SingleDelete as Delete.
			// We could do something more complicated here and consume only a
			// single Set, and then merge in any following Sets, but that is
			// complicated wrt code and unnecessary given the narrow permitted
			// use of SingleDelete.
			i.key.SetKind(InternalKeyKindSetWithDelete)
			i.skip = true
			return

		case InternalKeyKindSet, InternalKeyKindSetWithDelete:
			// We've hit a Set or SetWithDel value. Merge with the existing
			// value and return. We change the kind of the resulting key to a
			// Set so that it shadows keys in lower levels. That is:
			// MERGE + (SET*) -> SET.
			i.err = valueMerger.MergeOlder(i.iterValue)
			if i.err != nil {
				i.valid = false
				return
			}
			i.key.SetKind(InternalKeyKindSet)
			i.skip = true
			return

		case InternalKeyKindMerge:
			// We've hit another Merge value. Merge with the existing value and
			// continue looping.
			i.err = valueMerger.MergeOlder(i.iterValue)
			if i.err != nil {
				i.valid = false
				return
			}

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
			i.valid = false
			return
		}
	}
}
826 :
827 : // singleDeleteNext processes a SingleDelete point tombstone. A SingleDelete, or
828 : // SINGLEDEL, is unique in that it deletes exactly 1 internal key. It's a
829 : // performance optimization when the client knows a user key has not been
830 : // overwritten, allowing the elision of the tombstone earlier, avoiding write
831 : // amplification.
832 : //
833 : // singleDeleteNext returns a boolean indicating whether or not the caller
834 : // should yield the SingleDelete key to the consumer of the compactionIter. If
835 : // singleDeleteNext returns false, the caller may consume/elide the
836 : // SingleDelete.
func (i *compactionIter) singleDeleteNext() bool {
	// Save the current key.
	i.saveKey()
	i.value = i.iterValue
	i.valid = true

	// Loop until finds a key to be passed to the next level.
	for {
		// If we find a key that can't be skipped, return true so that the
		// caller yields the SingleDelete to the caller.
		if i.nextInStripe() != sameStripe {
			// This defers additional error checking regarding single delete
			// invariants to the compaction where the keys with the same user key as
			// the single delete are in the same stripe.
			i.pos = iterPosNext
			return i.err == nil
		}
		if i.err != nil {
			panic(i.err)
		}
		// INVARIANT: sameStripe.
		key := i.iterKey
		kind := key.Kind()
		switch kind {
		case InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
			// A SINGLEDEL that meets a DEL/DELSIZED is ineffectual: the
			// DEL-family tombstone already deletes everything the SINGLEDEL
			// could. Report it if a callback is configured.
			if (kind == InternalKeyKindDelete || kind == InternalKeyKindDeleteSized) &&
				i.ineffectualSingleDeleteCallback != nil {
				i.ineffectualSingleDeleteCallback(i.key.UserKey)
			}
			// We've hit a Delete, DeleteSized, SetWithDelete, transform
			// the SingleDelete into a full Delete.
			i.key.SetKind(InternalKeyKindDelete)
			i.skip = true
			return true

		case InternalKeyKindSet, InternalKeyKindMerge:
			// This SingleDelete deletes the Set/Merge, and we can now elide the
			// SingleDel as well. We advance past the Set and return false to
			// indicate to the main compaction loop that we should NOT yield the
			// current SingleDel key to the compaction loop.
			//
			// NB: singleDeleteNext was called with i.pos == iterPosCurForward, and
			// after the call to nextInStripe, we are still at iterPosCurForward,
			// since we are at the key after the Set/Merge that was single deleted.
			change := i.nextInStripe()
			switch change {
			case sameStripe, newStripeSameKey:
				// On the same user key.
				nextKind := i.iterKey.Kind()
				switch nextKind {
				case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge:
					if i.singleDeleteInvariantViolationCallback != nil {
						// sameStripe keys returned by nextInStripe() are already
						// known to not be covered by a RANGEDEL, so it is an invariant
						// violation. The rare case is newStripeSameKey, where it is a
						// violation if not covered by a RANGEDEL.
						if change == sameStripe ||
							i.rangeDelFrag.Covers(*i.iterKey, i.curSnapshotSeqNum) == keyspan.NoCover {
							i.singleDeleteInvariantViolationCallback(i.key.UserKey)
						}
					}
				case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
				default:
					panic(errors.AssertionFailedf(
						"unexpected internal key kind: %d", errors.Safe(i.iterKey.Kind())))
				}
			case newStripeNewKey:
			default:
				panic("unreachable")
			}
			// Both the SingleDelete and the Set/Merge it consumed are elided.
			i.valid = false
			return false

		case InternalKeyKindSingleDelete:
			// Two single deletes met in a compaction. The first single delete is
			// ineffectual.
			if i.ineffectualSingleDeleteCallback != nil {
				i.ineffectualSingleDeleteCallback(i.key.UserKey)
			}
			// Continue to apply the second single delete.
			continue

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
			i.valid = false
			return false
		}
	}
}
926 :
927 : // skipDueToSingleDeleteElision is called when the SingleDelete is being
928 : // elided because it is in the final snapshot stripe and there are no keys
929 : // with the same user key in lower levels in the LSM (below the files in this
930 : // compaction).
931 : //
932 : // TODO(sumeer): the only difference between singleDeleteNext and
933 : // skipDueToSingleDeleteElision is the fact that the caller knows it will be
934 : // eliding the single delete in the latter case. There are some similar things
935 : // happening in both implementations. My first attempt at combining them into
936 : // a single method was hard to comprehend. Try again.
func (i *compactionIter) skipDueToSingleDeleteElision() {
	for {
		// Advance within the stripe; an error from nextInStripe here is an
		// invariant violation, hence the panic below.
		stripeChange := i.nextInStripe()
		if i.err != nil {
			panic(i.err)
		}
		switch stripeChange {
		case newStripeNewKey:
			// The single delete is only now being elided, meaning it did not elide
			// any keys earlier in its descent down the LSM. We stepped onto a new
			// user key, meaning that even now at its moment of elision, it still
			// hasn't elided any other keys. The single delete was ineffectual (a
			// no-op).
			if i.ineffectualSingleDeleteCallback != nil {
				i.ineffectualSingleDeleteCallback(i.key.UserKey)
			}
			i.skip = false
			return
		case newStripeSameKey:
			// This should be impossible. If we're eliding a single delete, we
			// determined that the tombstone is in the final snapshot stripe, but we
			// stepped into a new stripe of the same key.
			panic(errors.AssertionFailedf("eliding single delete followed by same key in new stripe"))
		case sameStripe:
			kind := i.iterKey.Kind()
			switch kind {
			case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
				// A DEL-family key under the SINGLEDEL means the SINGLEDEL
				// deleted nothing that the lower tombstone wouldn't have:
				// report it as ineffectual.
				if i.ineffectualSingleDeleteCallback != nil {
					i.ineffectualSingleDeleteCallback(i.key.UserKey)
				}
				switch kind {
				case InternalKeyKindDelete, InternalKeyKindDeleteSized:
					i.skipInStripe()
					return
				case InternalKeyKindSingleDelete:
					// Repeat the same with this SingleDelete. We don't want to simply
					// call skipInStripe(), since it increases the strength of the
					// SingleDel, which hides bugs in the use of single delete.
					continue
				default:
					panic(errors.AssertionFailedf(
						"unexpected internal key kind: %d", errors.Safe(i.iterKey.Kind())))
				}
			case InternalKeyKindSetWithDelete:
				// The SingleDelete should behave like a Delete.
				i.skipInStripe()
				return
			case InternalKeyKindSet, InternalKeyKindMerge:
				// This SingleDelete deletes the Set/Merge, and we are eliding the
				// SingleDel as well. Step to the next key (this is not deleted by the
				// SingleDelete).
				//
				// NB: skipDueToSingleDeleteElision was called with i.pos ==
				// iterPosCurForward, and after the call to nextInStripe, we are still
				// at iterPosCurForward, since we are at the key after the Set/Merge
				// that was single deleted.
				change := i.nextInStripe()
				if i.err != nil {
					panic(i.err)
				}
				switch change {
				case newStripeSameKey:
					panic(errors.AssertionFailedf("eliding single delete followed by same key in new stripe"))
				case newStripeNewKey:
				case sameStripe:
					// On the same key.
					nextKind := i.iterKey.Kind()
					switch nextKind {
					case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge:
						if i.singleDeleteInvariantViolationCallback != nil {
							i.singleDeleteInvariantViolationCallback(i.key.UserKey)
						}
					case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
					default:
						panic(errors.AssertionFailedf(
							"unexpected internal key kind: %d", errors.Safe(i.iterKey.Kind())))
					}
				default:
					panic("unreachable")
				}
				// Whether in same stripe or new stripe, this key is not consumed by
				// the SingleDelete.
				i.skip = false
				return
			default:
				panic(errors.AssertionFailedf(
					"unexpected internal key kind: %d", errors.Safe(i.iterKey.Kind())))
			}
		default:
			panic("unreachable")
		}
	}
}
1030 :
1031 : // deleteSizedNext processes a DELSIZED point tombstone. Unlike ordinary DELs,
1032 : // these tombstones carry a value that's a varint indicating the size of the
1033 : // entry (len(key)+len(value)) that the tombstone is expected to delete.
1034 : //
1035 : // When a deleteSizedNext is encountered, we skip ahead to see which keys, if
1036 : // any, are elided as a result of the tombstone.
// deleteSizedNext emits the current DELSIZED tombstone, returning the
// (possibly kind-rewritten) key and value. It scans forward through the
// skippable keys of the stripe, validating the user-provided size estimate
// against the keys the tombstone elides; a missized tombstone is converted
// to an ordinary DEL (and counted in stats.countMissizedDels). Returns
// (nil, nil) on corruption errors.
func (i *compactionIter) deleteSizedNext() (*base.InternalKey, []byte) {
	i.saveKey()
	i.valid = true
	i.skip = true

	// The DELSIZED tombstone may have no value at all. This happens when the
	// tombstone has already deleted the key that the user originally predicted.
	// In this case, we still peek forward in case there's another DELSIZED key
	// with a lower sequence number, in which case we'll adopt its value.
	if len(i.iterValue) == 0 {
		i.value = i.valueBuf[:0]
	} else {
		i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
		i.value = i.valueBuf
	}

	// Loop through all the keys within this stripe that are skippable.
	i.pos = iterPosNext
	for i.nextInStripe() == sameStripe {
		if i.err != nil {
			panic(i.err)
		}
		switch i.iterKey.Kind() {
		case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
			// We encountered a tombstone (DEL, or DELSIZED) that's deleted by
			// the original DELSIZED tombstone. This can happen in two cases:
			//
			// (1) These tombstones were intended to delete two distinct values,
			//     and this DELSIZED has already dropped the relevant key. For
			//     example:
			//
			//     a.DELSIZED.9   a.SET.7   a.DELSIZED.5   a.SET.4
			//
			//     If a.DELSIZED.9 has already deleted a.SET.7, its size has
			//     already been zeroed out. In this case, we want to adopt the
			//     value of the DELSIZED with the lower sequence number, in
			//     case the a.SET.4 key has not yet been elided.
			//
			// (2) This DELSIZED was missized. The user thought they were
			//     deleting a key with this user key, but this user key had
			//     already been deleted.
			//
			// We can differentiate these two cases by examining the length of
			// the DELSIZED's value. A DELSIZED's value holds the size of both
			// the user key and value that it intends to delete. For any user
			// key with a length > 0, a DELSIZED that has not deleted a key must
			// have a value with a length > 0.
			//
			// We treat both cases the same functionally, adopting the identity
			// of the lower-sequence numbered tombstone. However in the second
			// case, we also increment the stat counting missized tombstones.
			if len(i.value) > 0 {
				// The original DELSIZED key was missized. The key that the user
				// thought they were deleting does not exist.
				i.stats.countMissizedDels++
			}
			i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
			i.value = i.valueBuf
			if i.iterKey.Kind() != InternalKeyKindDeleteSized {
				// Convert the DELSIZED to a DEL—The DEL/SINGLEDEL we're eliding
				// may not have deleted the key(s) it was intended to yet. The
				// ordinary DEL compaction heuristics are better suited at that,
				// plus we don't want to count it as a missized DEL. We early
				// exit in this case, after skipping the remainder of the
				// snapshot stripe.
				i.key.SetKind(InternalKeyKindDelete)
				// NB: We skipInStripe now, rather than returning leaving
				// i.skip=true and returning early, because Next() requires
				// that i.skip=true only if i.iterPos = iterPosCurForward.
				//
				// Ignore any error caused by skipInStripe since it does not affect
				// the key/value being returned here, and the next call to Next() will
				// expose it.
				i.skipInStripe()
				return &i.key, i.value
			}
			// Continue, in case we uncover another DELSIZED or a key this
			// DELSIZED deletes.

		case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSetWithDelete:
			// If the DELSIZED is value-less, it already deleted the key that it
			// was intended to delete. This is possible with a sequence like:
			//
			//      DELSIZED.8   SET.7   SET.3
			//
			// The DELSIZED only describes the size of the SET.7, which in this
			// case has already been elided. We don't count it as a missizing,
			// instead converting the DELSIZED to a DEL. Skip the remainder of
			// the snapshot stripe and return.
			if len(i.value) == 0 {
				i.key.SetKind(InternalKeyKindDelete)
				// NB: We skipInStripe now, rather than returning leaving
				// i.skip=true and returning early, because Next() requires
				// that i.skip=true only if i.iterPos = iterPosCurForward.
				//
				// Ignore any error caused by skipInStripe since it does not affect
				// the key/value being returned here, and the next call to Next() will
				// expose it.
				i.skipInStripe()
				return &i.key, i.value
			}
			// The deleted key is not a DEL, DELSIZED, and the DELSIZED in i.key
			// has a positive size.
			expectedSize, n := binary.Uvarint(i.value)
			if n != len(i.value) {
				i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(i.value))
				i.valid = false
				return nil, nil
			}
			elidedSize := uint64(len(i.iterKey.UserKey)) + uint64(len(i.iterValue))
			if elidedSize != expectedSize {
				// The original DELSIZED key was missized. It's unclear what to
				// do. The user-provided size was wrong, so it's unlikely to be
				// accurate or meaningful. We could:
				//
				//   1. return the DELSIZED with the original user-provided size unmodified
				//   2. return the DELZIZED with a zeroed size to reflect that a key was
				//      elided, even if it wasn't the anticipated size.
				//   3. subtract the elided size from the estimate and re-encode.
				//   4. convert the DELSIZED into a value-less DEL, so that
				//      ordinary DEL heuristics apply.
				//
				// We opt for (4) under the rationale that we can't rely on the
				// user-provided size for accuracy, so ordinary DEL heuristics
				// are safer.
				i.stats.countMissizedDels++
				i.key.SetKind(InternalKeyKindDelete)
				i.value = i.valueBuf[:0]
				// NB: We skipInStripe now, rather than returning leaving
				// i.skip=true and returning early, because Next() requires
				// that i.skip=true only if i.iterPos = iterPosCurForward.
				//
				// Ignore any error caused by skipInStripe since it does not affect
				// the key/value being returned here, and the next call to Next() will
				// expose it.
				i.skipInStripe()
				return &i.key, i.value
			}
			// NB: We remove the value regardless of whether the key was sized
			// appropriately. The size encoded is 'consumed' the first time it
			// meets a key that it deletes.
			i.value = i.valueBuf[:0]

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
			i.valid = false
			return nil, nil
		}
	}

	if i.iterStripeChange == sameStripe {
		panic(errors.AssertionFailedf("unexpectedly found iter stripe change = %d", i.iterStripeChange))
	}
	// We landed outside the original stripe. Reset skip.
	i.skip = false
	if i.err != nil {
		return nil, nil
	}
	return &i.key, i.value
}
1197 :
1198 1 : func (i *compactionIter) saveKey() {
1199 1 : i.keyBuf = append(i.keyBuf[:0], i.iterKey.UserKey...)
1200 1 : i.key.UserKey = i.keyBuf
1201 1 : i.key.Trailer = i.iterKey.Trailer
1202 1 : i.keyTrailer = i.iterKey.Trailer
1203 1 : i.frontiers.Advance(i.key.UserKey)
1204 1 : }
1205 :
1206 1 : func (i *compactionIter) cloneKey(key []byte) []byte {
1207 1 : i.alloc, key = i.alloc.Copy(key)
1208 1 : return key
1209 1 : }
1210 :
// Key returns the current internal key.
func (i *compactionIter) Key() InternalKey {
	return i.key
}
1214 :
// Value returns the value associated with the current key.
func (i *compactionIter) Value() []byte {
	return i.value
}
1218 :
// Valid reports whether the iterator is positioned at a valid key/value
// pair.
func (i *compactionIter) Valid() bool {
	return i.valid
}
1222 :
// Error returns any error encountered during iteration.
func (i *compactionIter) Error() error {
	return i.err
}
1226 :
1227 1 : func (i *compactionIter) Close() error {
1228 1 : err := i.iter.Close()
1229 1 : if i.err == nil {
1230 1 : i.err = err
1231 1 : }
1232 :
1233 : // Close the closer for the current value if one was open.
1234 1 : if i.valueCloser != nil {
1235 0 : i.err = firstError(i.err, i.valueCloser.Close())
1236 0 : i.valueCloser = nil
1237 0 : }
1238 :
1239 1 : return i.err
1240 : }
1241 :
1242 : // Tombstones returns a list of pending range tombstones in the fragmenter
1243 : // up to the specified key, or all pending range tombstones if key = nil.
1244 1 : func (i *compactionIter) Tombstones(key []byte) []keyspan.Span {
1245 1 : if key == nil {
1246 1 : i.rangeDelFrag.Finish()
1247 1 : } else {
1248 1 : // The specified end key is exclusive; no versions of the specified
1249 1 : // user key (including range tombstones covering that key) should
1250 1 : // be flushed yet.
1251 1 : i.rangeDelFrag.TruncateAndFlushTo(key)
1252 1 : }
1253 1 : tombstones := i.tombstones
1254 1 : i.tombstones = nil
1255 1 : return tombstones
1256 : }
1257 :
1258 : // RangeKeys returns a list of pending fragmented range keys up to the specified
1259 : // key, or all pending range keys if key = nil.
1260 1 : func (i *compactionIter) RangeKeys(key []byte) []keyspan.Span {
1261 1 : if key == nil {
1262 1 : i.rangeKeyFrag.Finish()
1263 1 : } else {
1264 1 : // The specified end key is exclusive; no versions of the specified
1265 1 : // user key (including range tombstones covering that key) should
1266 1 : // be flushed yet.
1267 1 : i.rangeKeyFrag.TruncateAndFlushTo(key)
1268 1 : }
1269 1 : rangeKeys := i.rangeKeys
1270 1 : i.rangeKeys = nil
1271 1 : return rangeKeys
1272 : }
1273 :
// emitRangeDelChunk filters a fragmented range-tombstone span down to at most
// one key per snapshot stripe and appends the result to i.tombstones. Keys in
// the last stripe are dropped entirely if the tombstone is elidable.
//
// NOTE(review): the loop assumes fragmented.Keys is ordered newest-to-oldest
// (descending seqnum), so the first key seen per stripe is the latest —
// TODO confirm against the fragmenter's emit ordering.
func (i *compactionIter) emitRangeDelChunk(fragmented keyspan.Span) {
	// Apply the snapshot stripe rules, keeping only the latest tombstone for
	// each snapshot stripe.
	currentIdx := -1
	// Filter in place, reusing fragmented.Keys' backing array.
	keys := fragmented.Keys[:0]
	for _, k := range fragmented.Keys {
		idx, _ := snapshotIndex(k.SeqNum(), i.snapshots)
		if currentIdx == idx {
			// Already kept a (newer) tombstone for this stripe.
			continue
		}
		if idx == 0 && i.elideRangeTombstone(fragmented.Start, fragmented.End) {
			// This is the last snapshot stripe and the range tombstone
			// can be elided.
			break
		}

		keys = append(keys, k)
		if idx == 0 {
			// This is the last snapshot stripe.
			break
		}
		currentIdx = idx
	}
	if len(keys) > 0 {
		i.tombstones = append(i.tombstones, keyspan.Span{
			Start: fragmented.Start,
			End:   fragmented.End,
			Keys:  keys,
		})
	}
}
1305 :
1306 1 : func (i *compactionIter) emitRangeKeyChunk(fragmented keyspan.Span) {
1307 1 : // Elision of snapshot stripes happens in rangeKeyCompactionTransform, so no need to
1308 1 : // do that here.
1309 1 : if len(fragmented.Keys) > 0 {
1310 1 : i.rangeKeys = append(i.rangeKeys, fragmented)
1311 1 : }
1312 : }
1313 :
1314 : // maybeZeroSeqnum attempts to set the seqnum for the current key to 0. Doing
1315 : // so improves compression and enables an optimization during forward iteration
1316 : // to skip some key comparisons. The seqnum for an entry can be zeroed if the
1317 : // entry is on the bottom snapshot stripe and on the bottom level of the LSM.
1318 1 : func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) {
1319 1 : if !i.allowZeroSeqNum {
1320 1 : // TODO(peter): allowZeroSeqNum applies to the entire compaction. We could
1321 1 : // make the determination on a key by key basis, similar to what is done
1322 1 : // for elideTombstone. Need to add a benchmark for compactionIter to verify
1323 1 : // that isn't too expensive.
1324 1 : return
1325 1 : }
1326 1 : if snapshotIdx > 0 {
1327 1 : // This is not the last snapshot
1328 1 : return
1329 1 : }
1330 1 : i.key.SetSeqNum(base.SeqNumZero)
1331 : }
|