Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "encoding/binary"
9 : "fmt"
10 : "io"
11 : "slices"
12 : "sort"
13 : "strconv"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/bytealloc"
18 : "github.com/cockroachdb/pebble/internal/compact"
19 : "github.com/cockroachdb/pebble/internal/invariants"
20 : "github.com/cockroachdb/pebble/internal/keyspan"
21 : "github.com/cockroachdb/pebble/internal/rangekey"
22 : "github.com/cockroachdb/redact"
23 : )
24 :
25 : // compactionIter provides a forward-only iterator that encapsulates the logic
26 : // for collapsing entries during compaction. It wraps an internal iterator and
27 : // collapses entries that are no longer necessary because they are shadowed by
28 : // newer entries. The simplest example of this is when the internal iterator
29 : // contains two keys: a.PUT.2 and a.PUT.1. Instead of returning both entries,
30 : // compactionIter collapses the second entry because it is no longer
31 : // necessary. The high-level structure for compactionIter is to iterate over
32 : // its internal iterator and output 1 entry for every user-key. There are four
33 : // complications to this story.
34 : //
35 : // 1. Eliding Deletion Tombstones
36 : //
37 : // Consider the entries a.DEL.2 and a.PUT.1. These entries collapse to
38 : // a.DEL.2. Do we have to output the entry a.DEL.2? Only if a.DEL.2 possibly
39 : // shadows an entry at a lower level. If we're compacting to the base-level in
40 : // the LSM tree then a.DEL.2 is definitely not shadowing an entry at a lower
41 : // level and can be elided.
42 : //
43 : // We can do slightly better than only eliding deletion tombstones at the base
44 : // level by observing that we can elide a deletion tombstone if there are no
45 : // sstables that contain the entry's key. This check is performed by
46 : // elideTombstone.
47 : //
48 : // 2. Merges
49 : //
50 : // The MERGE operation merges the value for an entry with the existing value
51 : // for an entry. The logical value of an entry can be composed of a series of
52 : // merge operations. When compactionIter sees a MERGE, it scans forward in its
53 : // internal iterator collapsing MERGE operations for the same key until it
54 : // encounters a SET or DELETE operation. For example, the keys a.MERGE.4,
55 : // a.MERGE.3, a.MERGE.2 will be collapsed to a.MERGE.4 and the values will be
56 : // merged using the specified Merger.
57 : //
58 : // An interesting case here occurs when MERGE is combined with SET. Consider
59 : // the entries a.MERGE.3 and a.SET.2. The collapsed key will be a.SET.3. The
60 : // reason that the kind is changed to SET is because the SET operation acts as
61 : // a barrier preventing further merging. This can be seen better in the
62 : // scenario a.MERGE.3, a.SET.2, a.MERGE.1. The entry a.MERGE.1 may be at lower
63 : // (older) level and not involved in the compaction. If the compaction of
64 : // a.MERGE.3 and a.SET.2 produced a.MERGE.3, a subsequent compaction with
65 : // a.MERGE.1 would merge the values together incorrectly.
66 : //
67 : // 3. Snapshots
68 : //
69 : // Snapshots are lightweight point-in-time views of the DB state. At its core,
70 : // a snapshot is a sequence number along with a guarantee from Pebble that it
71 : // will maintain the view of the database at that sequence number. Part of this
72 : // guarantee is relatively straightforward to achieve. When reading from the
73 : // database Pebble will ignore sequence numbers that are larger than the
74 : // snapshot sequence number. The primary complexity with snapshots occurs
75 : // during compaction: the collapsing of entries that are shadowed by newer
76 : // entries is at odds with the guarantee that Pebble will maintain the view of
77 : // the database at the snapshot sequence number. Rather than collapsing entries
78 : // up to the next user key, compactionIter can only collapse entries up to the
79 : // next snapshot boundary. That is, every snapshot boundary potentially causes
80 : // another entry for the same user-key to be emitted. Another way to view this
81 : // is that snapshots define stripes and entries are collapsed within stripes,
82 : // but not across stripes. Consider the following scenario:
83 : //
84 : // a.PUT.9
85 : // a.DEL.8
86 : // a.PUT.7
87 : // a.DEL.6
88 : // a.PUT.5
89 : //
90 : // In the absence of snapshots these entries would be collapsed to
91 : // a.PUT.9. What if there is a snapshot at sequence number 7? The entries can
92 : // be divided into two stripes and collapsed within the stripes:
93 : //
94 : // a.PUT.9 a.PUT.9
95 : // a.DEL.8 --->
96 : // a.PUT.7
97 : // -- --
98 : // a.DEL.6 ---> a.DEL.6
99 : // a.PUT.5
100 : //
101 : // All of the rules described earlier still apply, but they are confined to
102 : // operate within a snapshot stripe. Snapshots only affect compaction when the
103 : // snapshot sequence number lies within the range of sequence numbers being
104 : // compacted. In the above example, a snapshot at sequence number 10 or at
105 : // sequence number 5 would not have any effect.
106 : //
107 : // 4. Range Deletions
108 : //
109 : // Range deletions provide the ability to delete all of the keys (and values)
110 : // in a contiguous range. Range deletions are stored indexed by their start
111 : // key. The end key of the range is stored in the value. In order to support
112 : // lookup of the range deletions which overlap with a particular key, the range
113 : // deletion tombstones need to be fragmented whenever they overlap. This
114 : // fragmentation is performed by keyspan.Fragmenter. The fragments are then
115 : // subject to the rules for snapshots. For example, consider the two range
116 : // tombstones [a,e)#1 and [c,g)#2:
117 : //
118 : // 2: c-------g
119 : // 1: a-------e
120 : //
121 : // These tombstones will be fragmented into:
122 : //
123 : // 2: c---e---g
124 : // 1: a---c---e
125 : //
126 : // Do we output the fragment [c,e)#1? Since it is covered by [c,e)#2 the answer
127 : // depends on whether it is in a new snapshot stripe.
128 : //
129 : // In addition to the fragmentation of range tombstones, compaction also needs
130 : // to take the range tombstones into consideration when outputting normal
131 : // keys. Just as with point deletions, a range deletion covering an entry can
132 : // cause the entry to be elided.
133 : //
134 : // A note on the stability of keys and values.
135 : //
136 : // The stability guarantees of keys and values returned by the iterator tree
137 : // that backs a compactionIter is nuanced and care must be taken when
138 : // referencing any returned items.
139 : //
140 : // Keys and values returned by exported functions (i.e. First, Next, etc.) have
141 : // lifetimes that fall into two categories:
142 : //
143 : // Lifetime valid for duration of compaction. Range deletion keys and values are
144 : // stable for the duration of the compaction, due to the way in which a
145 : // compactionIter is typically constructed (i.e. via (*compaction).newInputIter,
146 : // which wraps the iterator over the range deletion block in a noCloseIter,
147 : // preventing the release of the backing memory until the compaction is
148 : // finished).
149 : //
150 : // Lifetime limited to duration of sstable block liveness. Point keys (SET, DEL,
151 : // etc.) and values must be cloned / copied following the return from the
152 : // exported function, and before a subsequent call to Next advances the iterator
153 : // and mutates the contents of the returned key and value.
type compactionIter struct {
	// Comparer functions, the merge operator, and the input iterator the
	// compaction iterator reads from.
	cmp   Compare
	equal Equal
	merge Merge
	iter  internalIterator
	// err records the first error encountered; once set, all public methods
	// return nil, nil.
	err error
	// `key.UserKey` is set to `keyBuf` caused by saving `i.iterKV.UserKey`
	// and `key.Trailer` is set to `i.iterKV.Trailer`. This is the
	// case on return from all public methods -- these methods return `key`.
	// Additionally, it is the internal state when the code is moving to the
	// next key so it can determine whether the user key has changed from
	// the previous key.
	key InternalKey
	// keyTrailer is updated when `i.key` is updated and holds the key's
	// original trailer (e.g., before any sequence-number zeroing or changes
	// to key kind).
	keyTrailer uint64
	// value holds the value to return alongside key; valueCloser, if
	// non-nil, must be closed before advancing past the current value.
	value       []byte
	valueCloser io.Closer
	// Temporary buffer used for storing the previous user key in order to
	// determine when iteration has advanced to a new user key and thus a new
	// snapshot stripe.
	keyBuf []byte
	// Temporary buffer used for storing the previous value, which may be an
	// unsafe, i.iter-owned slice that could be altered when the iterator is
	// advanced.
	valueBuf []byte
	// Is the current entry valid?
	valid            bool
	iterKV           *base.InternalKV
	iterValue        []byte
	iterStripeChange stripeChangeType
	// `skip` indicates whether the remaining entries in the current snapshot
	// stripe should be skipped or processed. `skip` has no effect when `pos ==
	// iterPosNext`.
	skip bool
	// `pos` indicates the iterator position at the top of `Next()`. Its type's
	// (`iterPos`) values take on the following meanings in the context of
	// `compactionIter`.
	//
	// - `iterPosCurForward`: the iterator is at the last key returned.
	// - `iterPosNext`: the iterator has already been advanced to the next
	//   candidate key. For example, this happens when processing merge operands,
	//   where we advance the iterator all the way into the next stripe or next
	//   user key to ensure we've seen all mergeable operands.
	// - `iterPosPrev`: this is invalid as compactionIter is forward-only.
	pos iterPos
	// `snapshotPinned` indicates whether the last point key returned by the
	// compaction iterator was only returned because an open snapshot prevents
	// its elision. This field only applies to point keys, and not to range
	// deletions or range keys.
	snapshotPinned bool
	// forceObsoleteDueToRangeDel is set to true in a subset of the cases that
	// snapshotPinned is true. This value is true when the point is obsolete due
	// to a RANGEDEL but could not be deleted due to a snapshot.
	//
	// NB: it may seem that the additional cases that snapshotPinned captures
	// are harmless in that they can also be used to mark a point as obsolete
	// (it is merely a duplication of some logic that happens in
	// Writer.AddWithForceObsolete), but that is not quite accurate as of this
	// writing -- snapshotPinned originated in stats collection and for a
	// sequence MERGE, SET, where the MERGE cannot merge with the (older) SET
	// due to a snapshot, the snapshotPinned value for the SET is true.
	//
	// TODO(sumeer,jackson): improve the logic of snapshotPinned and reconsider
	// whether we need forceObsoleteDueToRangeDel.
	forceObsoleteDueToRangeDel bool
	// The index of the snapshot for the current key within the snapshots slice.
	curSnapshotIdx    int
	curSnapshotSeqNum uint64
	// The snapshot sequence numbers that need to be maintained. These sequence
	// numbers define the snapshot stripes (see the Snapshots description
	// above). The sequence numbers are in ascending order.
	snapshots []uint64
	// frontiers holds a heap of user keys that affect compaction behavior when
	// they're exceeded. Before a new key is returned, the compaction iterator
	// advances the frontier, notifying any code that subscribed to be notified
	// when a key was reached. The primary use today is within the
	// implementation of compactionOutputSplitters in compaction.go. Many of
	// these splitters wait for the compaction iterator to call Advance(k) when
	// it's returning a new key. If the key that they're waiting for is
	// surpassed, these splitters update internal state recording that they
	// should request a compaction split next time they're asked in
	// [shouldSplitBefore].
	frontiers compact.Frontiers
	// The fragmented tombstones.
	tombstones []keyspan.Span
	// The fragmented range keys.
	rangeKeys []keyspan.Span
	// Byte allocator for the tombstone keys.
	alloc               bytealloc.A
	allowZeroSeqNum     bool
	elideTombstone      func(key []byte) bool
	elideRangeTombstone func(start, end []byte) bool
	// Callbacks invoked when SINGLEDEL anomalies are observed; see
	// singleDeleteNext and skipDueToSingleDeleteElision.
	ineffectualSingleDeleteCallback        func(userKey []byte)
	singleDeleteInvariantViolationCallback func(userKey []byte)
	// The on-disk format major version. This informs the types of keys that
	// may be written to disk during a compaction.
	formatVersion FormatMajorVersion
	stats struct {
		// count of DELSIZED keys that were missized.
		countMissizedDels uint64
	}
}
258 :
259 : func newCompactionIter(
260 : cmp Compare,
261 : equal Equal,
262 : merge Merge,
263 : iter internalIterator,
264 : snapshots []uint64,
265 : allowZeroSeqNum bool,
266 : elideTombstone func(key []byte) bool,
267 : elideRangeTombstone func(start, end []byte) bool,
268 : ineffectualSingleDeleteCallback func(userKey []byte),
269 : singleDeleteInvariantViolationCallback func(userKey []byte),
270 : formatVersion FormatMajorVersion,
271 1 : ) *compactionIter {
272 1 : i := &compactionIter{
273 1 : cmp: cmp,
274 1 : equal: equal,
275 1 : merge: merge,
276 1 : iter: iter,
277 1 : snapshots: snapshots,
278 1 : allowZeroSeqNum: allowZeroSeqNum,
279 1 : elideTombstone: elideTombstone,
280 1 : elideRangeTombstone: elideRangeTombstone,
281 1 : ineffectualSingleDeleteCallback: ineffectualSingleDeleteCallback,
282 1 : singleDeleteInvariantViolationCallback: singleDeleteInvariantViolationCallback,
283 1 : formatVersion: formatVersion,
284 1 : }
285 1 : i.frontiers.Init(cmp)
286 1 : return i
287 1 : }
288 :
// First seeds the iterator with the input's first entry and returns the first
// collapsed key/value pair via Next. It returns nil, nil if the input is
// empty or an error has been recorded in i.err.
func (i *compactionIter) First() (*InternalKey, []byte) {
	if i.err != nil {
		return nil, nil
	}
	i.iterKV = i.iter.First()
	if i.iterKV != nil {
		// Materialize the value eagerly; Value may fail, in which case we
		// surface the error through i.err.
		i.iterValue, _, i.err = i.iterKV.Value(nil)
		if i.err != nil {
			return nil, nil
		}
		// Establish the snapshot stripe of the first key.
		i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(i.iterKV.SeqNum(), i.snapshots)
	}
	// The iterator is already positioned at the first candidate; Next must not
	// advance it again.
	i.pos = iterPosNext
	i.iterStripeChange = newStripeNewKey
	return i.Next()
}
305 :
// Next returns the next collapsed key/value pair, applying the tombstone
// elision, MERGE collapsing, snapshot striping and range-deletion shadowing
// rules described at the top of this file. It returns nil, nil when the input
// is exhausted or an error (recorded in i.err) occurs.
func (i *compactionIter) Next() (*InternalKey, []byte) {
	if i.err != nil {
		return nil, nil
	}

	// Close the closer for the current value if one was open.
	if i.closeValueCloser() != nil {
		return nil, nil
	}

	// Prior to this call to `Next()` we are in one of three situations with
	// respect to `iterKey` and related state:
	//
	// - `!skip && pos == iterPosNext`: `iterKey` is already at the next key.
	// - `!skip && pos == iterPosCurForward`: We are at the key that has been returned.
	//   To move forward we advance by one key, even if that lands us in the same
	//   snapshot stripe.
	// - `skip && pos == iterPosCurForward`: We are at the key that has been returned.
	//   To move forward we skip skippable entries in the stripe.
	if i.pos == iterPosCurForward {
		if i.skip {
			i.skipInStripe()
		} else {
			i.nextInStripe()
		}
	} else if i.skip {
		panic(errors.AssertionFailedf("compaction iterator has skip=true, but iterator is at iterPosNext"))
	}

	i.pos = iterPosCurForward
	i.valid = false

	for i.iterKV != nil {
		// If we entered a new snapshot stripe with the same key, any key we
		// return on this iteration is only returned because the open snapshot
		// prevented it from being elided or merged with the key returned for
		// the previous stripe. Mark it as pinned so that the compaction loop
		// can correctly populate output tables' pinned statistics. We might
		// also set snapshotPinned=true down below if we observe that the key is
		// deleted by a range deletion in a higher stripe or that this key is a
		// tombstone that could be elided if only it were in the last snapshot
		// stripe.
		i.snapshotPinned = i.iterStripeChange == newStripeSameKey

		if i.iterKV.Kind() == InternalKeyKindRangeDelete || rangekey.IsRangeKey(i.iterKV.Kind()) {
			// Return the span so the compaction can use it for file truncation and add
			// it to the relevant fragmenter. In the case of range deletions, we do not
			// set `skip` to true before returning as there may be any number of point
			// keys with the same user key and sequence numbers ≥ the range deletion's
			// sequence number. Such point keys must be visible (i.e., not skipped
			// over) since we promise point keys are not deleted by range tombstones at
			// the same sequence number (or higher).
			//
			// Note that `skip` must already be false here, because range keys and range
			// deletions are interleaved at the maximal sequence numbers and neither will
			// set `skip`=true.
			if i.skip {
				panic(errors.AssertionFailedf("pebble: compaction iterator: skip unexpectedly true"))
			}

			// NOTE: there is a subtle invariant violation here in that calling
			// saveKey and returning a reference to the temporary slice violates
			// the stability guarantee for range deletion keys. A potential
			// mediation could return the original iterKey and iterValue
			// directly, as the backing memory is guaranteed to be stable until
			// the compaction completes. The violation here is only minor in
			// that the caller immediately clones the range deletion InternalKey
			// when passing the key to the deletion fragmenter (see the
			// call-site in compaction.go).
			// TODO(travers): address this violation by removing the call to
			// saveKey and instead return the original iterKey and iterValue.
			// This goes against the comment on i.key in the struct, and
			// therefore warrants some investigation.
			i.saveKey()
			// TODO(jackson): Handle tracking pinned statistics for range keys
			// and range deletions. This would require updating
			// emitRangeDelChunk and rangeKeyCompactionTransform to update
			// statistics when they apply their own snapshot striping logic.
			i.snapshotPinned = false
			i.value = i.iterValue
			i.valid = true
			return &i.key, i.value
		}

		// Check if the last tombstone covers the key.
		// TODO(sumeer): we could avoid calling tombstoneCovers if
		// i.iterStripeChange == sameStripeSameKey since that check has already been
		// done in nextInStripeHelper. However, we also need to handle the case of
		// CoversInvisibly below.
		switch i.tombstoneCovers(i.iterKV.K, i.curSnapshotSeqNum) {
		case coversVisibly:
			// A pending range deletion deletes this key. Skip it.
			i.saveKey()
			i.skipInStripe()
			continue

		case coversInvisibly:
			// i.iterKV would be deleted by a range deletion if there weren't any open
			// snapshots. Mark it as pinned.
			//
			// NB: there are multiple places in this file where we check for a
			// covering tombstone and this is the only one where we are writing to
			// i.snapshotPinned. Those other cases occur in mergeNext where the caller
			// is deciding whether the value should be merged or not, and the key is
			// in the same snapshot stripe. Hence, snapshotPinned is by definition
			// false in those cases.
			i.snapshotPinned = true
			i.forceObsoleteDueToRangeDel = true

		default:
			i.forceObsoleteDueToRangeDel = false
		}

		switch i.iterKV.Kind() {
		case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
			if i.elideTombstone(i.iterKV.K.UserKey) {
				if i.curSnapshotIdx == 0 {
					// If we're at the last snapshot stripe and the tombstone
					// can be elided skip skippable keys in the same stripe.
					i.saveKey()
					if i.key.Kind() == InternalKeyKindSingleDelete {
						i.skipDueToSingleDeleteElision()
					} else {
						i.skipInStripe()
						if !i.skip && i.iterStripeChange != newStripeNewKey {
							panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe disabled skip without advancing to new key"))
						}
					}
					if i.iterStripeChange == newStripeSameKey {
						panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe found a new stripe within the same key"))
					}
					continue
				} else {
					// We're not at the last snapshot stripe, so the tombstone
					// can NOT yet be elided. Mark it as pinned, so that it's
					// included in table statistics appropriately.
					i.snapshotPinned = true
				}
			}

			switch i.iterKV.Kind() {
			case InternalKeyKindDelete:
				i.saveKey()
				i.value = i.iterValue
				i.valid = true
				// Subsequent same-stripe entries for this user key are shadowed
				// by the DEL and may be skipped.
				i.skip = true
				return &i.key, i.value

			case InternalKeyKindDeleteSized:
				// We may skip subsequent keys because of this tombstone. Scan
				// ahead to see just how much data this tombstone drops and if
				// the tombstone's value should be updated accordingly.
				return i.deleteSizedNext()

			case InternalKeyKindSingleDelete:
				if i.singleDeleteNext() {
					return &i.key, i.value
				} else if i.err != nil {
					return nil, nil
				}
				continue

			default:
				panic(errors.AssertionFailedf(
					"unexpected kind %s", redact.SafeString(i.iterKV.Kind().String())))
			}

		case InternalKeyKindSet, InternalKeyKindSetWithDelete:
			// The key we emit for this entry is a function of the current key
			// kind, and whether this entry is followed by a DEL/SINGLEDEL
			// entry. setNext() does the work to move the iterator forward,
			// preserving the original value, and potentially mutating the key
			// kind.
			i.setNext()
			if i.err != nil {
				return nil, nil
			}
			return &i.key, i.value

		case InternalKeyKindMerge:
			// Record the snapshot index before mergeNext as merging
			// advances the iterator, adjusting curSnapshotIdx.
			origSnapshotIdx := i.curSnapshotIdx
			var valueMerger ValueMerger
			valueMerger, i.err = i.merge(i.iterKV.K.UserKey, i.iterValue)
			if i.err == nil {
				i.mergeNext(valueMerger)
			}
			var needDelete bool
			if i.err == nil {
				// includesBase is true whenever we've transformed the MERGE record
				// into a SET.
				var includesBase bool
				switch i.key.Kind() {
				case InternalKeyKindSet, InternalKeyKindSetWithDelete:
					includesBase = true
				case InternalKeyKindMerge:
				default:
					panic(errors.AssertionFailedf(
						"unexpected kind %s", redact.SafeString(i.key.Kind().String())))
				}
				i.value, needDelete, i.valueCloser, i.err = finishValueMerger(valueMerger, includesBase)
			}
			if i.err == nil {
				if needDelete {
					i.valid = false
					if i.closeValueCloser() != nil {
						return nil, nil
					}
					continue
				}

				i.maybeZeroSeqnum(origSnapshotIdx)
				return &i.key, i.value
			}
			if i.err != nil {
				i.valid = false
				// TODO(sumeer): why is MarkCorruptionError only being called for
				// MERGE?
				i.err = base.MarkCorruptionError(i.err)
			}
			return nil, nil

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
			i.valid = false
			return nil, nil
		}
	}

	return nil, nil
}
538 :
539 1 : func (i *compactionIter) closeValueCloser() error {
540 1 : if i.valueCloser == nil {
541 1 : return nil
542 1 : }
543 :
544 0 : i.err = i.valueCloser.Close()
545 0 : i.valueCloser = nil
546 0 : if i.err != nil {
547 0 : i.valid = false
548 0 : }
549 0 : return i.err
550 : }
551 :
552 : // snapshotIndex returns the index of the first sequence number in snapshots
553 : // which is greater than or equal to seq.
554 1 : func snapshotIndex(seq uint64, snapshots []uint64) (int, uint64) {
555 1 : index := sort.Search(len(snapshots), func(i int) bool {
556 1 : return snapshots[i] > seq
557 1 : })
558 1 : if index >= len(snapshots) {
559 1 : return index, InternalKeySeqNumMax
560 1 : }
561 1 : return index, snapshots[index]
562 : }
563 :
564 : // skipInStripe skips over skippable keys in the same stripe and user key. It
565 : // may set i.err, in which case i.iterKV will be nil.
566 1 : func (i *compactionIter) skipInStripe() {
567 1 : i.skip = true
568 1 : // TODO(sumeer): we can avoid the overhead of calling i.rangeDelFrag.Covers,
569 1 : // in this case of nextInStripe, since we are skipping all of them anyway.
570 1 : for i.nextInStripe() == sameStripe {
571 1 : if i.err != nil {
572 0 : panic(i.err)
573 : }
574 : }
575 : // We landed outside the original stripe, so reset skip.
576 1 : i.skip = false
577 : }
578 :
579 1 : func (i *compactionIter) iterNext() bool {
580 1 : i.iterKV = i.iter.Next()
581 1 : if i.iterKV != nil {
582 1 : i.iterValue, _, i.err = i.iterKV.Value(nil)
583 1 : if i.err != nil {
584 0 : i.iterKV = nil
585 0 : }
586 : }
587 1 : return i.iterKV != nil
588 : }
589 :
// stripeChangeType indicates how the snapshot stripe changed relative to the
// previous key. If the snapshot stripe changed, it also indicates whether the
// new stripe was entered because the iterator progressed onto an entirely new
// key or entered a new stripe within the same key.
type stripeChangeType int

const (
	// newStripeNewKey indicates the iterator advanced to a different user key.
	newStripeNewKey stripeChangeType = iota
	// newStripeSameKey indicates the iterator advanced within the same user
	// key but crossed into an older snapshot stripe.
	newStripeSameKey
	// sameStripe indicates the iterator advanced within the same user key and
	// the same snapshot stripe.
	sameStripe
)
601 :
// nextInStripe advances the iterator and returns one of the above const ints
// indicating how its state changed.
//
// All sameStripe keys that are covered by a RANGEDEL will be skipped and not
// returned.
//
// Calls to nextInStripe must be preceded by a call to saveKey to retain a
// temporary reference to the original key, so that forward iteration can
// proceed with a reference to the original key. Care should be taken to avoid
// overwriting or mutating the saved key or value before they have been returned
// to the caller of the exported function (i.e. the caller of Next, First, etc.)
//
// nextInStripe may set i.err, in which case the return value will be
// newStripeNewKey, and i.iterKV will be nil.
func (i *compactionIter) nextInStripe() stripeChangeType {
	// The result is also recorded in i.iterStripeChange for callers that
	// inspect the last stripe transition after the fact.
	i.iterStripeChange = i.nextInStripeHelper()
	return i.iterStripeChange
}
620 :
// nextInStripeHelper is an internal helper for nextInStripe; callers should use
// nextInStripe and not call nextInStripeHelper.
func (i *compactionIter) nextInStripeHelper() stripeChangeType {
	origSnapshotIdx := i.curSnapshotIdx
	for {
		if !i.iterNext() {
			return newStripeNewKey
		}
		kv := i.iterKV

		// Is this a new key? There are two cases:
		//
		// 1. The new key has a different user key.
		// 2. The previous key was an interleaved range deletion or range key
		//    boundary. These keys are interleaved in the same input iterator
		//    stream as point keys, but they do not obey the ordinary sequence
		//    number ordering within a user key. If the previous key was one
		//    of these keys, we consider the new key a `newStripeNewKey` to
		//    reflect that it's the beginning of a new stream of point keys.
		if i.key.IsExclusiveSentinel() || !i.equal(i.key.UserKey, kv.K.UserKey) {
			i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(kv.SeqNum(), i.snapshots)
			return newStripeNewKey
		}

		// If i.key and key have the same user key, then
		//   1. i.key must not have had a zero sequence number (or it would've been
		//      the last key with its user key).
		//   2. i.key must have a strictly larger sequence number
		// There's an exception in that either key may be a range delete. Range
		// deletes may share a sequence number with a point key if the keys were
		// ingested together. Range keys may also share the sequence number if they
		// were ingested, but range keys are interleaved into the compaction
		// iterator's input iterator at the maximal sequence number so their
		// original sequence number will not be observed here.
		if prevSeqNum := base.SeqNumFromTrailer(i.keyTrailer); (prevSeqNum == 0 || prevSeqNum <= kv.SeqNum()) &&
			i.key.Kind() != InternalKeyKindRangeDelete && kv.Kind() != InternalKeyKindRangeDelete {
			prevKey := i.key
			prevKey.Trailer = i.keyTrailer
			panic(errors.AssertionFailedf("pebble: invariant violation: %s and %s out of order", prevKey, kv.K))
		}

		i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(kv.SeqNum(), i.snapshots)
		switch kv.Kind() {
		case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete,
			InternalKeyKindRangeDelete:
			// Range tombstones and range keys are interleaved at the max
			// sequence number for a given user key, and the first key after one
			// is always considered a newStripeNewKey, so we should never reach
			// this.
			panic("unreachable")
		case InternalKeyKindDelete, InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSingleDelete,
			InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
			// Fall through
		default:
			kind := i.iterKV.Kind()
			i.iterKV = nil
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(kind))
			i.valid = false
			return newStripeNewKey
		}
		if i.curSnapshotIdx == origSnapshotIdx {
			// Same snapshot.
			if i.tombstoneCovers(i.iterKV.K, i.curSnapshotSeqNum) == coversVisibly {
				// This key is deleted by a range tombstone visible within this
				// stripe; skip it, per the nextInStripe contract.
				continue
			}
			return sameStripe
		}
		return newStripeSameKey
	}
}
691 :
// setNext processes a SET or SETWITHDELETE entry: it saves the current
// key/value as the entry to emit, then scans forward within the stripe looking
// for a shadowed DEL/SINGLEDEL/DELSIZED. If one is found, the emitted key's
// kind is upgraded to SETWITHDELETE and subsequent same-stripe entries become
// skippable.
func (i *compactionIter) setNext() {
	// Save the current key.
	i.saveKey()
	i.value = i.iterValue
	i.valid = true
	i.maybeZeroSeqnum(i.curSnapshotIdx)

	// If this key is already a SETWITHDEL we can early return and skip the remaining
	// records in the stripe:
	if i.iterKV.Kind() == InternalKeyKindSetWithDelete {
		i.skip = true
		return
	}

	// We are iterating forward. Save the current value into the i.iter-stable
	// valueBuf so advancing the iterator cannot clobber it.
	i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
	i.value = i.valueBuf

	// Else, we continue to loop through entries in the stripe looking for a
	// DEL. Note that we may stop *before* encountering a DEL, if one exists.
	//
	// NB: nextInStripe will skip sameStripe keys that are visibly covered by a
	// RANGEDEL. This can include DELs -- this is fine since such DELs don't
	// need to be combined with SET to make SETWITHDEL.
	for {
		switch i.nextInStripe() {
		case newStripeNewKey, newStripeSameKey:
			i.pos = iterPosNext
			return
		case sameStripe:
			// We're still in the same stripe. If this is a
			// DEL/SINGLEDEL/DELSIZED, we stop looking and emit a SETWITHDEL.
			// Subsequent keys are eligible for skipping.
			switch i.iterKV.Kind() {
			case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
				i.key.SetKind(InternalKeyKindSetWithDelete)
				i.skip = true
				return
			case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSetWithDelete:
				// Do nothing
			default:
				i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
				i.valid = false
			}
		default:
			panic("pebble: unexpected stripeChangeType: " + strconv.Itoa(int(i.iterStripeChange)))
		}
	}
}
741 :
// mergeNext is called when the current key is a MERGE. It folds older values
// within the same snapshot stripe into valueMerger, stopping when it hits a
// SET/SETWITHDEL (result becomes SET), a DEL-family key (result becomes
// SETWITHDEL), or the end of the stripe (result stays MERGE). On a merge
// error, i.err is set and i.valid is cleared.
func (i *compactionIter) mergeNext(valueMerger ValueMerger) {
	// Save the current key.
	i.saveKey()
	i.valid = true

	// Loop looking for older values in the current snapshot stripe and merge
	// them.
	for {
		if i.nextInStripe() != sameStripe {
			i.pos = iterPosNext
			return
		}
		if i.err != nil {
			panic(i.err)
		}
		// NB: MERGE#10+RANGEDEL#9 stays a MERGE, since nextInStripe skips
		// sameStripe keys that are visibly covered by a RANGEDEL. There may be
		// MERGE#7 that is invisibly covered and will be preserved, but there is
		// no risk that MERGE#10 and MERGE#7 will get merged in the future as
		// the RANGEDEL still exists and will be used in user-facing reads that
		// see MERGE#10, and will also eventually cause MERGE#7 to be deleted in
		// a compaction.
		key := i.iterKV
		switch key.Kind() {
		case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
			// We've hit a deletion tombstone. Return everything up to this point and
			// then skip entries until the next snapshot stripe. We change the kind
			// of the result key to a Set so that it shadows keys in lower
			// levels. That is, MERGE+DEL -> SETWITHDEL.
			//
			// We do the same for SingleDelete since SingleDelete is only
			// permitted (with deterministic behavior) for keys that have been
			// set once since the last SingleDelete/Delete, so everything
			// older is acceptable to shadow. Note that this is slightly
			// different from singleDeleteNext() which implements stricter
			// semantics in terms of applying the SingleDelete to the single
			// next Set. But those stricter semantics are not observable to
			// the end-user since Iterator interprets SingleDelete as Delete.
			// We could do something more complicated here and consume only a
			// single Set, and then merge in any following Sets, but that is
			// complicated wrt code and unnecessary given the narrow permitted
			// use of SingleDelete.
			i.key.SetKind(InternalKeyKindSetWithDelete)
			i.skip = true
			return

		case InternalKeyKindSet, InternalKeyKindSetWithDelete:
			// We've hit a Set or SetWithDel value. Merge with the existing
			// value and return. We change the kind of the resulting key to a
			// Set so that it shadows keys in lower levels. That is:
			// MERGE + (SET*) -> SET.
			i.err = valueMerger.MergeOlder(i.iterValue)
			if i.err != nil {
				i.valid = false
				return
			}
			i.key.SetKind(InternalKeyKindSet)
			i.skip = true
			return

		case InternalKeyKindMerge:
			// We've hit another Merge value. Merge with the existing value and
			// continue looping.
			i.err = valueMerger.MergeOlder(i.iterValue)
			if i.err != nil {
				i.valid = false
				return
			}

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
			i.valid = false
			return
		}
	}
}
818 :
// singleDeleteNext processes a SingleDelete point tombstone. A SingleDelete, or
// SINGLEDEL, is unique in that it deletes exactly 1 internal key. It's a
// performance optimization when the client knows a user key has not been
// overwritten, allowing the elision of the tombstone earlier, avoiding write
// amplification.
//
// singleDeleteNext returns a boolean indicating whether or not the caller
// should yield the SingleDelete key to the consumer of the compactionIter. If
// singleDeleteNext returns false, the caller may consume/elide the
// SingleDelete.
func (i *compactionIter) singleDeleteNext() bool {
	// Save the current key.
	i.saveKey()
	i.value = i.iterValue
	i.valid = true

	// Loop until finds a key to be passed to the next level.
	for {
		// If we find a key that can't be skipped, return true so that the
		// caller yields the SingleDelete to the caller.
		if i.nextInStripe() != sameStripe {
			// This defers additional error checking regarding single delete
			// invariants to the compaction where the keys with the same user key as
			// the single delete are in the same stripe.
			i.pos = iterPosNext
			return i.err == nil
		}
		if i.err != nil {
			panic(i.err)
		}
		// INVARIANT: sameStripe.
		key := i.iterKV
		kind := key.Kind()
		switch kind {
		case InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
			// A SINGLEDEL that lands on a DEL/DELSIZED deleted nothing by
			// itself, so report it as ineffectual when a callback is set.
			if (kind == InternalKeyKindDelete || kind == InternalKeyKindDeleteSized) &&
				i.ineffectualSingleDeleteCallback != nil {
				i.ineffectualSingleDeleteCallback(i.key.UserKey)
			}
			// We've hit a Delete, DeleteSized, SetWithDelete, transform
			// the SingleDelete into a full Delete.
			i.key.SetKind(InternalKeyKindDelete)
			i.skip = true
			return true

		case InternalKeyKindSet, InternalKeyKindMerge:
			// This SingleDelete deletes the Set/Merge, and we can now elide the
			// SingleDel as well. We advance past the Set and return false to
			// indicate to the main compaction loop that we should NOT yield the
			// current SingleDel key to the compaction loop.
			//
			// NB: singleDeleteNext was called with i.pos == iterPosCurForward, and
			// after the call to nextInStripe, we are still at iterPosCurForward,
			// since we are at the key after the Set/Merge that was single deleted.
			change := i.nextInStripe()
			switch change {
			case sameStripe, newStripeSameKey:
				// On the same user key.
				nextKind := i.iterKV.Kind()
				switch nextKind {
				case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge:
					if i.singleDeleteInvariantViolationCallback != nil {
						// sameStripe keys returned by nextInStripe() are already
						// known to not be covered by a RANGEDEL, so it is an invariant
						// violation. The rare case is newStripeSameKey, where it is a
						// violation if not covered by a RANGEDEL.
						if change == sameStripe ||
							i.tombstoneCovers(i.iterKV.K, i.curSnapshotSeqNum) == noCover {
							i.singleDeleteInvariantViolationCallback(i.key.UserKey)
						}
					}
				case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
				default:
					panic(errors.AssertionFailedf(
						"unexpected internal key kind: %d", errors.Safe(i.iterKV.Kind())))
				}
			case newStripeNewKey:
			default:
				panic("unreachable")
			}
			i.valid = false
			return false

		case InternalKeyKindSingleDelete:
			// Two single deletes met in a compaction. The first single delete is
			// ineffectual.
			if i.ineffectualSingleDeleteCallback != nil {
				i.ineffectualSingleDeleteCallback(i.key.UserKey)
			}
			// Continue to apply the second single delete.
			continue

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
			i.valid = false
			return false
		}
	}
}
918 :
// skipDueToSingleDeleteElision is called when the SingleDelete is being
// elided because it is in the final snapshot stripe and there are no keys
// with the same user key in lower levels in the LSM (below the files in this
// compaction).
//
// TODO(sumeer): the only difference between singleDeleteNext and
// skipDueToSingleDeleteElision is the fact that the caller knows it will be
// eliding the single delete in the latter case. There are some similar things
// happening in both implementations. My first attempt at combining them into
// a single method was hard to comprehend. Try again.
func (i *compactionIter) skipDueToSingleDeleteElision() {
	for {
		stripeChange := i.nextInStripe()
		if i.err != nil {
			panic(i.err)
		}
		switch stripeChange {
		case newStripeNewKey:
			// The single delete is only now being elided, meaning it did not elide
			// any keys earlier in its descent down the LSM. We stepped onto a new
			// user key, meaning that even now at its moment of elision, it still
			// hasn't elided any other keys. The single delete was ineffectual (a
			// no-op).
			if i.ineffectualSingleDeleteCallback != nil {
				i.ineffectualSingleDeleteCallback(i.key.UserKey)
			}
			i.skip = false
			return
		case newStripeSameKey:
			// This should be impossible. If we're eliding a single delete, we
			// determined that the tombstone is in the final snapshot stripe, but we
			// stepped into a new stripe of the same key.
			panic(errors.AssertionFailedf("eliding single delete followed by same key in new stripe"))
		case sameStripe:
			kind := i.iterKV.Kind()
			switch kind {
			case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
				// The SINGLEDEL landed on another tombstone, so it deleted
				// nothing itself: report it as ineffectual.
				if i.ineffectualSingleDeleteCallback != nil {
					i.ineffectualSingleDeleteCallback(i.key.UserKey)
				}
				switch kind {
				case InternalKeyKindDelete, InternalKeyKindDeleteSized:
					i.skipInStripe()
					return
				case InternalKeyKindSingleDelete:
					// Repeat the same with this SingleDelete. We don't want to simply
					// call skipInStripe(), since it increases the strength of the
					// SingleDel, which hides bugs in the use of single delete.
					continue
				default:
					panic(errors.AssertionFailedf(
						"unexpected internal key kind: %d", errors.Safe(i.iterKV.Kind())))
				}
			case InternalKeyKindSetWithDelete:
				// The SingleDelete should behave like a Delete.
				i.skipInStripe()
				return
			case InternalKeyKindSet, InternalKeyKindMerge:
				// This SingleDelete deletes the Set/Merge, and we are eliding the
				// SingleDel as well. Step to the next key (this is not deleted by the
				// SingleDelete).
				//
				// NB: skipDueToSingleDeleteElision was called with i.pos ==
				// iterPosCurForward, and after the call to nextInStripe, we are still
				// at iterPosCurForward, since we are at the key after the Set/Merge
				// that was single deleted.
				change := i.nextInStripe()
				if i.err != nil {
					panic(i.err)
				}
				switch change {
				case newStripeSameKey:
					panic(errors.AssertionFailedf("eliding single delete followed by same key in new stripe"))
				case newStripeNewKey:
				case sameStripe:
					// On the same key.
					nextKind := i.iterKV.Kind()
					switch nextKind {
					case InternalKeyKindSet, InternalKeyKindSetWithDelete, InternalKeyKindMerge:
						if i.singleDeleteInvariantViolationCallback != nil {
							i.singleDeleteInvariantViolationCallback(i.key.UserKey)
						}
					case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
					default:
						panic(errors.AssertionFailedf(
							"unexpected internal key kind: %d", errors.Safe(i.iterKV.Kind())))
					}
				default:
					panic("unreachable")
				}
				// Whether in same stripe or new stripe, this key is not consumed by
				// the SingleDelete.
				i.skip = false
				return
			default:
				panic(errors.AssertionFailedf(
					"unexpected internal key kind: %d", errors.Safe(i.iterKV.Kind())))
			}
		default:
			panic("unreachable")
		}
	}
}
1022 :
// deleteSizedNext processes a DELSIZED point tombstone. Unlike ordinary DELs,
// these tombstones carry a value that's a varint indicating the size of the
// entry (len(key)+len(value)) that the tombstone is expected to delete.
//
// When a deleteSizedNext is encountered, we skip ahead to see which keys, if
// any, are elided as a result of the tombstone. Returns the key/value to
// emit, or (nil, nil) if an error was encountered (recorded in i.err).
func (i *compactionIter) deleteSizedNext() (*base.InternalKey, []byte) {
	i.saveKey()
	i.valid = true
	i.skip = true

	// The DELSIZED tombstone may have no value at all. This happens when the
	// tombstone has already deleted the key that the user originally predicted.
	// In this case, we still peek forward in case there's another DELSIZED key
	// with a lower sequence number, in which case we'll adopt its value.
	if len(i.iterValue) == 0 {
		i.value = i.valueBuf[:0]
	} else {
		i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
		i.value = i.valueBuf
	}

	// Loop through all the keys within this stripe that are skippable.
	i.pos = iterPosNext
	for i.nextInStripe() == sameStripe {
		if i.err != nil {
			panic(i.err)
		}
		switch i.iterKV.Kind() {
		case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
			// We encountered a tombstone (DEL, or DELSIZED) that's deleted by
			// the original DELSIZED tombstone. This can happen in two cases:
			//
			// (1) These tombstones were intended to delete two distinct values,
			//     and this DELSIZED has already dropped the relevant key. For
			//     example:
			//
			//     a.DELSIZED.9   a.SET.7   a.DELSIZED.5   a.SET.4
			//
			//     If a.DELSIZED.9 has already deleted a.SET.7, its size has
			//     already been zeroed out. In this case, we want to adopt the
			//     value of the DELSIZED with the lower sequence number, in
			//     case the a.SET.4 key has not yet been elided.
			//
			// (2) This DELSIZED was missized. The user thought they were
			//     deleting a key with this user key, but this user key had
			//     already been deleted.
			//
			// We can differentiate these two cases by examining the length of
			// the DELSIZED's value. A DELSIZED's value holds the size of both
			// the user key and value that it intends to delete. For any user
			// key with a length > 0, a DELSIZED that has not deleted a key must
			// have a value with a length > 0.
			//
			// We treat both cases the same functionally, adopting the identity
			// of the lower-sequence numbered tombstone. However in the second
			// case, we also increment the stat counting missized tombstones.
			if len(i.value) > 0 {
				// The original DELSIZED key was missized. The key that the user
				// thought they were deleting does not exist.
				i.stats.countMissizedDels++
			}
			i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
			i.value = i.valueBuf
			if i.iterKV.Kind() != InternalKeyKindDeleteSized {
				// Convert the DELSIZED to a DEL—The DEL/SINGLEDEL we're eliding
				// may not have deleted the key(s) it was intended to yet. The
				// ordinary DEL compaction heuristics are better suited at that,
				// plus we don't want to count it as a missized DEL. We early
				// exit in this case, after skipping the remainder of the
				// snapshot stripe.
				i.key.SetKind(InternalKeyKindDelete)
				// NB: We skipInStripe now, rather than returning leaving
				// i.skip=true and returning early, because Next() requires
				// that i.skip=true only if i.iterPos = iterPosCurForward.
				//
				// Ignore any error caused by skipInStripe since it does not affect
				// the key/value being returned here, and the next call to Next() will
				// expose it.
				i.skipInStripe()
				return &i.key, i.value
			}
			// Continue, in case we uncover another DELSIZED or a key this
			// DELSIZED deletes.

		case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSetWithDelete:
			// If the DELSIZED is value-less, it already deleted the key that it
			// was intended to delete. This is possible with a sequence like:
			//
			//      DELSIZED.8     SET.7     SET.3
			//
			// The DELSIZED only describes the size of the SET.7, which in this
			// case has already been elided. We don't count it as a missizing,
			// instead converting the DELSIZED to a DEL. Skip the remainder of
			// the snapshot stripe and return.
			if len(i.value) == 0 {
				i.key.SetKind(InternalKeyKindDelete)
				// NB: We skipInStripe now, rather than returning leaving
				// i.skip=true and returning early, because Next() requires
				// that i.skip=true only if i.iterPos = iterPosCurForward.
				//
				// Ignore any error caused by skipInStripe since it does not affect
				// the key/value being returned here, and the next call to Next() will
				// expose it.
				i.skipInStripe()
				return &i.key, i.value
			}
			// The deleted key is not a DEL, DELSIZED, and the DELSIZED in i.key
			// has a positive size.
			expectedSize, n := binary.Uvarint(i.value)
			if n != len(i.value) {
				i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(i.value))
				i.valid = false
				return nil, nil
			}
			elidedSize := uint64(len(i.iterKV.K.UserKey)) + uint64(len(i.iterValue))
			if elidedSize != expectedSize {
				// The original DELSIZED key was missized. It's unclear what to
				// do. The user-provided size was wrong, so it's unlikely to be
				// accurate or meaningful. We could:
				//
				//   1. return the DELSIZED with the original user-provided size unmodified
				//   2. return the DELSIZED with a zeroed size to reflect that a key was
				//      elided, even if it wasn't the anticipated size.
				//   3. subtract the elided size from the estimate and re-encode.
				//   4. convert the DELSIZED into a value-less DEL, so that
				//      ordinary DEL heuristics apply.
				//
				// We opt for (4) under the rationale that we can't rely on the
				// user-provided size for accuracy, so ordinary DEL heuristics
				// are safer.
				i.stats.countMissizedDels++
				i.key.SetKind(InternalKeyKindDelete)
				i.value = i.valueBuf[:0]
				// NB: We skipInStripe now, rather than returning leaving
				// i.skip=true and returning early, because Next() requires
				// that i.skip=true only if i.iterPos = iterPosCurForward.
				//
				// Ignore any error caused by skipInStripe since it does not affect
				// the key/value being returned here, and the next call to Next() will
				// expose it.
				i.skipInStripe()
				return &i.key, i.value
			}
			// NB: We remove the value regardless of whether the key was sized
			// appropriately. The size encoded is 'consumed' the first time it
			// meets a key that it deletes.
			i.value = i.valueBuf[:0]

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
			i.valid = false
			return nil, nil
		}
	}

	if i.iterStripeChange == sameStripe {
		panic(errors.AssertionFailedf("unexpectedly found iter stripe change = %d", i.iterStripeChange))
	}
	// We landed outside the original stripe. Reset skip.
	i.skip = false
	if i.err != nil {
		return nil, nil
	}
	return &i.key, i.value
}
1189 :
1190 1 : func (i *compactionIter) saveKey() {
1191 1 : i.keyBuf = append(i.keyBuf[:0], i.iterKV.K.UserKey...)
1192 1 : i.key = base.InternalKey{
1193 1 : UserKey: i.keyBuf,
1194 1 : Trailer: i.iterKV.K.Trailer,
1195 1 : }
1196 1 : i.keyTrailer = i.key.Trailer
1197 1 : i.frontiers.Advance(i.key.UserKey)
1198 1 : }
1199 :
1200 1 : func (i *compactionIter) cloneKey(key []byte) []byte {
1201 1 : i.alloc, key = i.alloc.Copy(key)
1202 1 : return key
1203 1 : }
1204 :
// Key returns the current internal key. It is only meaningful when Valid()
// returns true.
func (i *compactionIter) Key() InternalKey {
	return i.key
}
1208 :
// Value returns the value corresponding to the current key. It is only
// meaningful when Valid() returns true.
func (i *compactionIter) Value() []byte {
	return i.value
}
1212 :
// Valid reports whether the iterator is positioned at a valid key/value pair.
func (i *compactionIter) Valid() bool {
	return i.valid
}
1216 :
// Error returns any error accumulated during iteration.
func (i *compactionIter) Error() error {
	return i.err
}
1220 :
1221 1 : func (i *compactionIter) Close() error {
1222 1 : err := i.iter.Close()
1223 1 : if i.err == nil {
1224 1 : i.err = err
1225 1 : }
1226 :
1227 : // Close the closer for the current value if one was open.
1228 1 : if i.valueCloser != nil {
1229 0 : i.err = firstError(i.err, i.valueCloser.Close())
1230 0 : i.valueCloser = nil
1231 0 : }
1232 :
1233 1 : return i.err
1234 : }
1235 :
// AddTombstoneSpan adds a copy of a span of range dels, to be later returned by
// TombstonesUpTo.
//
// The spans must be non-overlapping and ordered. Empty spans are ignored.
func (i *compactionIter) AddTombstoneSpan(span *keyspan.Span) {
	// appendSpan clones the span bounds via cloneKey and shallow-clones Keys.
	i.tombstones = i.appendSpan(i.tombstones, span)
}
1243 :
// cover is returned by tombstoneCovers and describes a span's relationship to
// a key at a particular snapshot.
type cover int8

const (
	// noCover indicates the tested key does not fall within the span's bounds,
	// or the span contains no keys with sequence numbers higher than the key's.
	noCover cover = iota

	// coversInvisibly indicates the tested key does fall within the span's
	// bounds and the span contains at least one key with a higher sequence
	// number, but none visible at the provided snapshot.
	coversInvisibly

	// coversVisibly indicates the tested key does fall within the span's
	// bounds, and the span contains at least one key with a sequence number
	// higher than the key's sequence number that is visible at the provided
	// snapshot.
	coversVisibly
)
1264 :
1265 : // tombstoneCovers returns whether the key is covered by a tombstone and whether
1266 : // it is covered by a tombstone visible in the given snapshot.
1267 : //
1268 : // The key's UserKey must be greater or equal to the last span Start key passed
1269 : // to AddTombstoneSpan.
1270 1 : func (i *compactionIter) tombstoneCovers(key InternalKey, snapshot uint64) cover {
1271 1 : if len(i.tombstones) == 0 {
1272 1 : return noCover
1273 1 : }
1274 1 : last := &i.tombstones[len(i.tombstones)-1]
1275 1 : if invariants.Enabled && i.cmp(key.UserKey, last.Start) < 0 {
1276 0 : panic(errors.AssertionFailedf("invalid key %q, last span %s", key, last))
1277 : }
1278 1 : if i.cmp(key.UserKey, last.End) < 0 && last.Covers(key.SeqNum()) {
1279 1 : if last.CoversAt(snapshot, key.SeqNum()) {
1280 1 : return coversVisibly
1281 1 : }
1282 1 : return coversInvisibly
1283 : }
1284 1 : return noCover
1285 : }
1286 :
// TombstonesUpTo returns a list of pending range tombstones up to the
// specified key, or all pending range tombstones if key = nil.
//
// If a key is specified, it must be greater than the last span Start key passed
// to AddTombstoneSpan.
func (i *compactionIter) TombstonesUpTo(key []byte) []keyspan.Span {
	var toReturn []keyspan.Span
	toReturn, i.tombstones = i.splitSpans(i.tombstones, key)

	// NB: result aliases toReturn's backing array; each retained span is
	// written at or before the position it was read from, so this in-place
	// filter is safe.
	result := toReturn[:0]
	for _, span := range toReturn {
		// Apply the snapshot stripe rules, keeping only the latest tombstone for
		// each snapshot stripe.
		currentIdx := -1
		keys := make([]keyspan.Key, 0, min(len(span.Keys), len(i.snapshots)+1))
		for _, k := range span.Keys {
			idx, _ := snapshotIndex(k.SeqNum(), i.snapshots)
			if currentIdx == idx {
				// A newer key in the same stripe has already been kept; this
				// one is shadowed.
				continue
			}
			if idx == 0 && i.elideRangeTombstone(span.Start, span.End) {
				// This is the last snapshot stripe and the range tombstone
				// can be elided.
				break
			}

			keys = append(keys, k)
			if idx == 0 {
				// This is the last snapshot stripe.
				break
			}
			currentIdx = idx
		}
		if len(keys) > 0 {
			result = append(result, keyspan.Span{
				Start: i.cloneKey(span.Start),
				End:   i.cloneKey(span.End),
				Keys:  keys,
			})
		}
	}

	return result
}
1331 :
1332 : // FirstTombstoneStart returns the start key of the first pending tombstone (the
1333 : // first span that would be returned by TombstonesUpTo). Returns nil if
1334 : // there are no pending range keys.
1335 1 : func (i *compactionIter) FirstTombstoneStart() []byte {
1336 1 : if len(i.tombstones) == 0 {
1337 1 : return nil
1338 1 : }
1339 1 : return i.tombstones[0].Start
1340 : }
1341 :
// AddRangeKeySpan adds a copy of a span of range keys, to be later returned by
// RangeKeysUpTo. The span is shallow cloned.
//
// The spans must be non-overlapping and ordered. Empty spans are ignored.
func (i *compactionIter) AddRangeKeySpan(span *keyspan.Span) {
	// appendSpan clones the span bounds via cloneKey and shallow-clones Keys.
	i.rangeKeys = i.appendSpan(i.rangeKeys, span)
}
1349 :
1350 : // RangeKeysUpTo returns a list of pending range keys up to the specified key,
1351 : // or all pending range keys if key = nil.
1352 : //
1353 : // If a key is specified, it must be greater than the last span Start key passed
1354 : // to AddRangeKeySpan.
1355 1 : func (i *compactionIter) RangeKeysUpTo(key []byte) []keyspan.Span {
1356 1 : var result []keyspan.Span
1357 1 : result, i.rangeKeys = i.splitSpans(i.rangeKeys, key)
1358 1 : // Elision of snapshot stripes happens in rangeKeyCompactionTransform, so no need to
1359 1 : // do that here.
1360 1 : return result
1361 1 : }
1362 :
1363 : // FirstRangeKeyStart returns the start key of the first pending range key span
1364 : // (the first span that would be returned by RangeKeysUpTo). Returns nil
1365 : // if there are no pending range keys.
1366 1 : func (i *compactionIter) FirstRangeKeyStart() []byte {
1367 1 : if len(i.rangeKeys) == 0 {
1368 1 : return nil
1369 1 : }
1370 1 : return i.rangeKeys[0].Start
1371 : }
1372 :
1373 : // maybeZeroSeqnum attempts to set the seqnum for the current key to 0. Doing
1374 : // so improves compression and enables an optimization during forward iteration
1375 : // to skip some key comparisons. The seqnum for an entry can be zeroed if the
1376 : // entry is on the bottom snapshot stripe and on the bottom level of the LSM.
1377 1 : func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) {
1378 1 : if !i.allowZeroSeqNum {
1379 1 : // TODO(peter): allowZeroSeqNum applies to the entire compaction. We could
1380 1 : // make the determination on a key by key basis, similar to what is done
1381 1 : // for elideTombstone. Need to add a benchmark for compactionIter to verify
1382 1 : // that isn't too expensive.
1383 1 : return
1384 1 : }
1385 1 : if snapshotIdx > 0 {
1386 1 : // This is not the last snapshot
1387 1 : return
1388 1 : }
1389 1 : i.key.SetSeqNum(base.SeqNumZero)
1390 : }
1391 :
1392 : // appendSpan appends a span to a list of ordered, non-overlapping spans.
1393 : // The span is shallow cloned.
1394 1 : func (i *compactionIter) appendSpan(spans []keyspan.Span, span *keyspan.Span) []keyspan.Span {
1395 1 : if span.Empty() {
1396 0 : return spans
1397 0 : }
1398 1 : if invariants.Enabled && len(spans) > 0 {
1399 1 : if last := spans[len(spans)-1]; i.cmp(last.End, span.Start) > 0 {
1400 0 : panic(errors.AssertionFailedf("overlapping range key spans: %s and %s", last, span))
1401 : }
1402 : }
1403 1 : return append(spans, keyspan.Span{
1404 1 : Start: i.cloneKey(span.Start),
1405 1 : End: i.cloneKey(span.End),
1406 1 : Keys: slices.Clone(span.Keys),
1407 1 : })
1408 : }
1409 :
// splitSpans takes a list of ordered, non-overlapping spans and a key and
// returns two lists of spans: one that ends at or before the key, and one which
// starts at or after key.
//
// If the key is nil, returns all spans. If not nil, the key must not be smaller
// than the start of the last span in the list.
func (i *compactionIter) splitSpans(
	spans []keyspan.Span, key []byte,
) (before, after []keyspan.Span) {
	n := len(spans)
	if n == 0 || key == nil || i.cmp(spans[n-1].End, key) <= 0 {
		// Common case: return all spans (retaining any extra allocated space).
		// NB: the three-index slice spans[:n:n] caps `before` so a later
		// append to it cannot stomp on `after`'s backing array.
		return spans[:n:n], spans[n:]
	}
	if c := i.cmp(key, spans[n-1].Start); c <= 0 {
		if c < 0 {
			panic(fmt.Sprintf("invalid key %q, last span %s", key, spans[n-1]))
		}
		// The last span starts exactly at key.
		return spans[: n-1 : n-1], spans[n-1:]
	}
	// We have to split the last span.
	// Clone key so both halves can reference it without aliasing the caller's
	// buffer.
	key = i.cloneKey(key)
	before = spans[:n:n]
	after = append(spans[n:], keyspan.Span{
		Start: key,
		End:   spans[n-1].End,
		Keys:  slices.Clone(spans[n-1].Keys),
	})
	before[n-1].End = key
	return before, after
}
|