Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "fmt"
11 : "io"
12 : "sort"
13 : "strconv"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/bytealloc"
18 : "github.com/cockroachdb/pebble/internal/keyspan"
19 : "github.com/cockroachdb/pebble/internal/rangekey"
20 : "github.com/cockroachdb/redact"
21 : )
22 :
23 : // compactionIter provides a forward-only iterator that encapsulates the logic
24 : // for collapsing entries during compaction. It wraps an internal iterator and
25 : // collapses entries that are no longer necessary because they are shadowed by
26 : // newer entries. The simplest example of this is when the internal iterator
27 : // contains two keys: a.PUT.2 and a.PUT.1. Instead of returning both entries,
28 : // compactionIter collapses the second entry because it is no longer
29 : // necessary. The high-level structure for compactionIter is to iterate over
30 : // its internal iterator and output 1 entry for every user-key. There are four
31 : // complications to this story.
32 : //
33 : // 1. Eliding Deletion Tombstones
34 : //
35 : // Consider the entries a.DEL.2 and a.PUT.1. These entries collapse to
36 : // a.DEL.2. Do we have to output the entry a.DEL.2? Only if a.DEL.2 possibly
37 : // shadows an entry at a lower level. If we're compacting to the base-level in
38 : // the LSM tree then a.DEL.2 is definitely not shadowing an entry at a lower
39 : // level and can be elided.
40 : //
41 : // We can do slightly better than only eliding deletion tombstones at the base
42 : // level by observing that we can elide a deletion tombstone if there are no
43 : // sstables that contain the entry's key. This check is performed by
44 : // elideTombstone.
45 : //
46 : // 2. Merges
47 : //
48 : // The MERGE operation merges the value for an entry with the existing value
49 : // for an entry. The logical value of an entry can be composed of a series of
50 : // merge operations. When compactionIter sees a MERGE, it scans forward in its
51 : // internal iterator collapsing MERGE operations for the same key until it
52 : // encounters a SET or DELETE operation. For example, the keys a.MERGE.4,
53 : // a.MERGE.3, a.MERGE.2 will be collapsed to a.MERGE.4 and the values will be
54 : // merged using the specified Merger.
55 : //
56 : // An interesting case here occurs when MERGE is combined with SET. Consider
57 : // the entries a.MERGE.3 and a.SET.2. The collapsed key will be a.SET.3. The
58 : // reason that the kind is changed to SET is because the SET operation acts as
59 : // a barrier preventing further merging. This can be seen better in the
60 : // scenario a.MERGE.3, a.SET.2, a.MERGE.1. The entry a.MERGE.1 may be at lower
61 : // (older) level and not involved in the compaction. If the compaction of
62 : // a.MERGE.3 and a.SET.2 produced a.MERGE.3, a subsequent compaction with
63 : // a.MERGE.1 would merge the values together incorrectly.
64 : //
65 : // 3. Snapshots
66 : //
67 : // Snapshots are lightweight point-in-time views of the DB state. At its core,
68 : // a snapshot is a sequence number along with a guarantee from Pebble that it
69 : // will maintain the view of the database at that sequence number. Part of this
70 : // guarantee is relatively straightforward to achieve. When reading from the
71 : // database Pebble will ignore sequence numbers that are larger than the
72 : // snapshot sequence number. The primary complexity with snapshots occurs
73 : // during compaction: the collapsing of entries that are shadowed by newer
74 : // entries is at odds with the guarantee that Pebble will maintain the view of
75 : // the database at the snapshot sequence number. Rather than collapsing entries
76 : // up to the next user key, compactionIter can only collapse entries up to the
77 : // next snapshot boundary. That is, every snapshot boundary potentially causes
78 : // another entry for the same user-key to be emitted. Another way to view this
79 : // is that snapshots define stripes and entries are collapsed within stripes,
80 : // but not across stripes. Consider the following scenario:
81 : //
82 : // a.PUT.9
83 : // a.DEL.8
84 : // a.PUT.7
85 : // a.DEL.6
86 : // a.PUT.5
87 : //
88 : // In the absence of snapshots these entries would be collapsed to
89 : // a.PUT.9. What if there is a snapshot at sequence number 7? The entries can
90 : // be divided into two stripes and collapsed within the stripes:
91 : //
92 : // a.PUT.9 a.PUT.9
93 : // a.DEL.8 --->
94 : // a.PUT.7
95 : // -- --
96 : // a.DEL.6 ---> a.DEL.6
97 : // a.PUT.5
98 : //
99 : // All of the rules described earlier still apply, but they are confined to
100 : // operate within a snapshot stripe. Snapshots only affect compaction when the
101 : // snapshot sequence number lies within the range of sequence numbers being
102 : // compacted. In the above example, a snapshot at sequence number 10 or at
103 : // sequence number 5 would not have any effect.
104 : //
105 : // 4. Range Deletions
106 : //
107 : // Range deletions provide the ability to delete all of the keys (and values)
108 : // in a contiguous range. Range deletions are stored indexed by their start
109 : // key. The end key of the range is stored in the value. In order to support
110 : // lookup of the range deletions which overlap with a particular key, the range
111 : // deletion tombstones need to be fragmented whenever they overlap. This
112 : // fragmentation is performed by keyspan.Fragmenter. The fragments are then
113 : // subject to the rules for snapshots. For example, consider the two range
114 : // tombstones [a,e)#1 and [c,g)#2:
115 : //
116 : // 2: c-------g
117 : // 1: a-------e
118 : //
119 : // These tombstones will be fragmented into:
120 : //
121 : // 2: c---e---g
122 : // 1: a---c---e
123 : //
124 : // Do we output the fragment [c,e)#1? Since it is covered by [c,e)#2 the answer
125 : // depends on whether it is in a new snapshot stripe.
126 : //
127 : // In addition to the fragmentation of range tombstones, compaction also needs
128 : // to take the range tombstones into consideration when outputting normal
129 : // keys. Just as with point deletions, a range deletion covering an entry can
130 : // cause the entry to be elided.
131 : //
132 : // A note on the stability of keys and values.
133 : //
134 : // The stability guarantees of keys and values returned by the iterator tree
135 : // that backs a compactionIter is nuanced and care must be taken when
136 : // referencing any returned items.
137 : //
138 : // Keys and values returned by exported functions (i.e. First, Next, etc.) have
139 : // lifetimes that fall into two categories:
140 : //
141 : // Lifetime valid for duration of compaction. Range deletion keys and values are
142 : // stable for the duration of the compaction, due to the way in which a
143 : // compactionIter is typically constructed (i.e. via (*compaction).newInputIter,
144 : // which wraps the iterator over the range deletion block in a noCloseIter,
145 : // preventing the release of the backing memory until the compaction is
146 : // finished).
147 : //
148 : // Lifetime limited to duration of sstable block liveness. Point keys (SET, DEL,
149 : // etc.) and values must be cloned / copied following the return from the
150 : // exported function, and before a subsequent call to Next advances the iterator
151 : // and mutates the contents of the returned key and value.
type compactionIter struct {
	// equal is the user-key equality function used to detect when iteration
	// has moved onto a new user key.
	equal Equal
	// merge constructs a ValueMerger when a MERGE key is encountered.
	merge Merge
	// iter is the wrapped input iterator supplying the compaction's keys.
	iter internalIterator
	// err is sticky: once set, the exported methods return (nil, nil).
	err error
	// `key.UserKey` is set to `keyBuf` (populated by saving
	// `i.iterKey.UserKey`) and `key.Trailer` is set to `i.iterKey.Trailer`.
	// This is the case on return from all public methods -- these methods
	// return `key`. Additionally, it is the internal state when the code is
	// moving to the next key so it can determine whether the user key has
	// changed from the previous key.
	key InternalKey
	// keyTrailer is updated when `i.key` is updated and holds the key's
	// original trailer (e.g., before any sequence-number zeroing or changes
	// to key kind).
	keyTrailer uint64
	// value is the value returned alongside `key` from the exported methods.
	// It may alias i.iterValue, i.valueBuf, or a merged result.
	value []byte
	// valueCloser, when non-nil, is closed and cleared before the iterator
	// advances (see closeValueCloser); it is produced by finishValueMerger.
	valueCloser io.Closer
	// Temporary buffer used for storing the previous user key in order to
	// determine when iteration has advanced to a new user key and thus a new
	// snapshot stripe.
	keyBuf []byte
	// Temporary buffer used for storing the previous value, which may be an
	// unsafe, i.iter-owned slice that could be altered when the iterator is
	// advanced.
	valueBuf []byte
	// Is the current entry valid?
	valid bool
	// iterKey and iterValue hold the current position of the wrapped
	// iterator i.iter.
	iterKey   *InternalKey
	iterValue []byte
	// iterStripeChange records the result of the most recent call to
	// nextInStripe.
	iterStripeChange stripeChangeType
	// `skip` indicates whether the remaining skippable entries in the current
	// snapshot stripe should be skipped or processed. An example of a non-
	// skippable entry is a range tombstone as we need to return it from the
	// `compactionIter`, even if a key covering its start key has already been
	// seen in the same stripe. `skip` has no effect when `pos == iterPosNext`.
	//
	// TODO(jackson): If we use keyspan.InterleavingIter for range deletions,
	// like we do for range keys, the only remaining 'non-skippable' key is
	// the invalid key. We should be able to simplify this logic and remove this
	// field.
	skip bool
	// `pos` indicates the iterator position at the top of `Next()`. Its type's
	// (`iterPos`) values take on the following meanings in the context of
	// `compactionIter`.
	//
	// - `iterPosCur`: the iterator is at the last key returned.
	// - `iterPosNext`: the iterator has already been advanced to the next
	//   candidate key. For example, this happens when processing merge operands,
	//   where we advance the iterator all the way into the next stripe or next
	//   user key to ensure we've seen all mergeable operands.
	// - `iterPosPrev`: this is invalid as compactionIter is forward-only.
	pos iterPos
	// `snapshotPinned` indicates whether the last point key returned by the
	// compaction iterator was only returned because an open snapshot prevents
	// its elision. This field only applies to point keys, and not to range
	// deletions or range keys.
	//
	// For MERGE, it is possible that doing the merge is interrupted even when
	// the next point key is in the same stripe. This can happen if the loop in
	// mergeNext gets interrupted by sameStripeNonSkippable.
	// sameStripeNonSkippable occurs due to RANGEDELs that sort before
	// SET/MERGE/DEL with the same seqnum, so the RANGEDEL does not necessarily
	// delete the subsequent SET/MERGE/DEL keys.
	snapshotPinned bool
	// forceObsoleteDueToRangeDel is set to true in a subset of the cases that
	// snapshotPinned is true. This value is true when the point is obsolete due
	// to a RANGEDEL but could not be deleted due to a snapshot.
	//
	// NB: it may seem that the additional cases that snapshotPinned captures
	// are harmless in that they can also be used to mark a point as obsolete
	// (it is merely a duplication of some logic that happens in
	// Writer.AddWithForceObsolete), but that is not quite accurate as of this
	// writing -- snapshotPinned originated in stats collection and for a
	// sequence MERGE, SET, where the MERGE cannot merge with the (older) SET
	// due to a snapshot, the snapshotPinned value for the SET is true.
	//
	// TODO(sumeer,jackson): improve the logic of snapshotPinned and reconsider
	// whether we need forceObsoleteDueToRangeDel.
	forceObsoleteDueToRangeDel bool
	// The index of the snapshot for the current key within the snapshots slice.
	curSnapshotIdx    int
	curSnapshotSeqNum uint64
	// The snapshot sequence numbers that need to be maintained. These sequence
	// numbers define the snapshot stripes (see the Snapshots description
	// above). The sequence numbers are in ascending order.
	snapshots []uint64
	// frontiers holds a heap of user keys that affect compaction behavior when
	// they're exceeded. Before a new key is returned, the compaction iterator
	// advances the frontier, notifying any code that subscribed to be notified
	// when a key was reached. The primary use today is within the
	// implementation of compactionOutputSplitters in compaction.go. Many of
	// these splitters wait for the compaction iterator to call Advance(k) when
	// it's returning a new key. If the key that they're waiting for is
	// surpassed, these splitters update internal state recording that they
	// should request a compaction split next time they're asked in
	// [shouldSplitBefore].
	frontiers frontiers
	// Reference to the range deletion tombstone fragmenter (e.g.,
	// `compaction.rangeDelFrag`).
	rangeDelFrag *keyspan.Fragmenter
	// Reference to the range key fragmenter (e.g., `compaction.rangeKeyFrag`).
	rangeKeyFrag *keyspan.Fragmenter
	// The fragmented tombstones.
	tombstones []keyspan.Span
	// The fragmented range keys.
	rangeKeys []keyspan.Span
	// Byte allocator for the tombstone keys.
	alloc bytealloc.A
	// allowZeroSeqNum indicates whether the compaction may zero the sequence
	// numbers of keys (see maybeZeroSeqnum).
	allowZeroSeqNum bool
	// elideTombstone reports whether a point tombstone on the given key may be
	// elided; consulted only by Next for DEL/SINGLEDEL/DELSIZED keys.
	elideTombstone func(key []byte) bool
	// elideRangeTombstone reports whether a range tombstone over [start, end)
	// may be elided.
	elideRangeTombstone func(start, end []byte) bool
	// The on-disk format major version. This informs the types of keys that
	// may be written to disk during a compaction.
	formatVersion FormatMajorVersion
	stats         struct {
		// count of DELSIZED keys that were missized.
		countMissizedDels uint64
	}
}
271 :
272 : func newCompactionIter(
273 : cmp Compare,
274 : equal Equal,
275 : formatKey base.FormatKey,
276 : merge Merge,
277 : iter internalIterator,
278 : snapshots []uint64,
279 : rangeDelFrag *keyspan.Fragmenter,
280 : rangeKeyFrag *keyspan.Fragmenter,
281 : allowZeroSeqNum bool,
282 : elideTombstone func(key []byte) bool,
283 : elideRangeTombstone func(start, end []byte) bool,
284 : formatVersion FormatMajorVersion,
285 1 : ) *compactionIter {
286 1 : i := &compactionIter{
287 1 : equal: equal,
288 1 : merge: merge,
289 1 : iter: iter,
290 1 : snapshots: snapshots,
291 1 : frontiers: frontiers{cmp: cmp},
292 1 : rangeDelFrag: rangeDelFrag,
293 1 : rangeKeyFrag: rangeKeyFrag,
294 1 : allowZeroSeqNum: allowZeroSeqNum,
295 1 : elideTombstone: elideTombstone,
296 1 : elideRangeTombstone: elideRangeTombstone,
297 1 : formatVersion: formatVersion,
298 1 : }
299 1 : i.rangeDelFrag.Cmp = cmp
300 1 : i.rangeDelFrag.Format = formatKey
301 1 : i.rangeDelFrag.Emit = i.emitRangeDelChunk
302 1 : i.rangeKeyFrag.Cmp = cmp
303 1 : i.rangeKeyFrag.Format = formatKey
304 1 : i.rangeKeyFrag.Emit = i.emitRangeKeyChunk
305 1 : return i
306 1 : }
307 :
308 1 : func (i *compactionIter) First() (*InternalKey, []byte) {
309 1 : if i.err != nil {
310 0 : return nil, nil
311 0 : }
312 1 : var iterValue LazyValue
313 1 : i.iterKey, iterValue = i.iter.First()
314 1 : i.iterValue, _, i.err = iterValue.Value(nil)
315 1 : if i.err != nil {
316 0 : return nil, nil
317 0 : }
318 1 : if i.iterKey != nil {
319 1 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(i.iterKey.SeqNum(), i.snapshots)
320 1 : }
321 1 : i.pos = iterPosNext
322 1 : i.iterStripeChange = newStripeNewKey
323 1 : return i.Next()
324 : }
325 :
// Next advances to and returns the next entry that survives the compaction:
// entries shadowed within a snapshot stripe, deleted by range tombstones, or
// folded into a merge are consumed here. It returns (nil, nil) when the input
// is exhausted or when an error occurs (recorded in i.err).
func (i *compactionIter) Next() (*InternalKey, []byte) {
	if i.err != nil {
		return nil, nil
	}

	// Close the closer for the current value if one was open.
	if i.closeValueCloser() != nil {
		return nil, nil
	}

	// Prior to this call to `Next()` we are in one of four situations with
	// respect to `iterKey` and related state:
	//
	// - `!skip && pos == iterPosNext`: `iterKey` is already at the next key.
	// - `!skip && pos == iterPosCurForward`: We are at the key that has been returned.
	//   To move forward we advance by one key, even if that lands us in the same
	//   snapshot stripe.
	// - `skip && pos == iterPosCurForward`: We are at the key that has been returned.
	//   To move forward we skip skippable entries in the stripe.
	// - `skip && pos == iterPosNext && i.iterStripeChange == sameStripeNonSkippable`:
	//    This case may occur when skipping within a snapshot stripe and we
	//    encounter either:
	//      a) an invalid key kind; The previous call will have returned
	//         whatever key it was processing and deferred handling of the
	//         invalid key to this invocation of Next(). We're responsible for
	//         ignoring skip=true and falling into the invalid key kind case
	//         down below.
	//      b) an interleaved range delete; This is a wart of the current code
	//         structure. While skipping within a snapshot stripe, a range
	//         delete interleaved at its start key and sequence number
	//         interrupts the sequence of point keys. After we return the range
	//         delete to the caller, we need to pick up skipping at where we
	//         left off, so we preserve skip=true.
	//    TODO(jackson): This last case is confusing and can be removed if we
	//    interleave range deletions at the maximal sequence number using the
	//    keyspan interleaving iterator. This is the treatment given to range
	//    keys today.
	if i.pos == iterPosCurForward {
		if i.skip {
			i.skipInStripe()
		} else {
			i.nextInStripe()
		}
	} else if i.skip {
		if i.iterStripeChange != sameStripeNonSkippable {
			panic(errors.AssertionFailedf("compaction iterator has skip=true, but iterator is at iterPosNext"))
		}
	}

	i.pos = iterPosCurForward
	i.valid = false

	for i.iterKey != nil {
		// If we entered a new snapshot stripe with the same key, any key we
		// return on this iteration is only returned because the open snapshot
		// prevented it from being elided or merged with the key returned for
		// the previous stripe. Mark it as pinned so that the compaction loop
		// can correctly populate output tables' pinned statistics. We might
		// also set snapshotPinned=true down below if we observe that the key is
		// deleted by a range deletion in a higher stripe or that this key is a
		// tombstone that could be elided if only it were in the last snapshot
		// stripe.
		i.snapshotPinned = i.iterStripeChange == newStripeSameKey

		if i.iterKey.Kind() == InternalKeyKindRangeDelete || rangekey.IsRangeKey(i.iterKey.Kind()) {
			// Return the span so the compaction can use it for file truncation and add
			// it to the relevant fragmenter. We do not set `skip` to true before
			// returning as there may be a forthcoming point key with the same user key
			// and sequence number. Such a point key must be visible (i.e., not skipped
			// over) since we promise point keys are not deleted by range tombstones at
			// the same sequence number.
			//
			// Although, note that `skip` may already be true before reaching here
			// due to an earlier key in the stripe. Then it is fine to leave it set
			// to true, as the earlier key must have had a higher sequence number.
			//
			// NOTE: there is a subtle invariant violation here in that calling
			// saveKey and returning a reference to the temporary slice violates
			// the stability guarantee for range deletion keys. A potential
			// mediation could return the original iterKey and iterValue
			// directly, as the backing memory is guaranteed to be stable until
			// the compaction completes. The violation here is only minor in
			// that the caller immediately clones the range deletion InternalKey
			// when passing the key to the deletion fragmenter (see the
			// call-site in compaction.go).
			// TODO(travers): address this violation by removing the call to
			// saveKey and instead return the original iterKey and iterValue.
			// This goes against the comment on i.key in the struct, and
			// therefore warrants some investigation.
			i.saveKey()
			// TODO(jackson): Handle tracking pinned statistics for range keys
			// and range deletions. This would require updating
			// emitRangeDelChunk and rangeKeyCompactionTransform to update
			// statistics when they apply their own snapshot striping logic.
			i.snapshotPinned = false
			i.value = i.iterValue
			i.valid = true
			return &i.key, i.value
		}

		if cover := i.rangeDelFrag.Covers(*i.iterKey, i.curSnapshotSeqNum); cover == keyspan.CoversVisibly {
			// A pending range deletion deletes this key. Skip it.
			i.saveKey()
			i.skipInStripe()
			continue
		} else if cover == keyspan.CoversInvisibly {
			// i.iterKey would be deleted by a range deletion if there weren't
			// any open snapshots. Mark it as pinned.
			//
			// NB: there are multiple places in this file where we call
			// i.rangeDelFrag.Covers and this is the only one where we are writing
			// to i.snapshotPinned. Those other cases occur in mergeNext where the
			// caller is deciding whether the value should be merged or not, and the
			// key is in the same snapshot stripe. Hence, snapshotPinned is by
			// definition false in those cases.
			i.snapshotPinned = true
			i.forceObsoleteDueToRangeDel = true
		} else {
			i.forceObsoleteDueToRangeDel = false
		}

		switch i.iterKey.Kind() {
		case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
			if i.elideTombstone(i.iterKey.UserKey) {
				if i.curSnapshotIdx == 0 {
					// If we're at the last snapshot stripe and the tombstone
					// can be elided skip skippable keys in the same stripe.
					i.saveKey()
					i.skipInStripe()
					if i.iterStripeChange == newStripeSameKey {
						panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe found a new stripe within the same key"))
					}
					if !i.skip && i.iterStripeChange != newStripeNewKey {
						panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe disabled skip without advancing to new key"))
					}
					continue
				} else {
					// We're not at the last snapshot stripe, so the tombstone
					// can NOT yet be elided. Mark it as pinned, so that it's
					// included in table statistics appropriately.
					i.snapshotPinned = true
				}
			}

			switch i.iterKey.Kind() {
			case InternalKeyKindDelete:
				i.saveKey()
				i.value = i.iterValue
				i.valid = true
				i.skip = true
				return &i.key, i.value

			case InternalKeyKindDeleteSized:
				// We may skip subsequent keys because of this tombstone. Scan
				// ahead to see just how much data this tombstone drops and if
				// the tombstone's value should be updated accordingly.
				return i.deleteSizedNext()

			case InternalKeyKindSingleDelete:
				if i.singleDeleteNext() {
					return &i.key, i.value
				} else if i.err != nil {
					return nil, nil
				}
				continue

			default:
				panic(errors.AssertionFailedf(
					"unexpected kind %s", redact.SafeString(i.iterKey.Kind().String())))
			}

		case InternalKeyKindSet, InternalKeyKindSetWithDelete:
			// The key we emit for this entry is a function of the current key
			// kind, and whether this entry is followed by a DEL/SINGLEDEL
			// entry. setNext() does the work to move the iterator forward,
			// preserving the original value, and potentially mutating the key
			// kind.
			i.setNext()
			if i.err != nil {
				return nil, nil
			}
			return &i.key, i.value

		case InternalKeyKindMerge:
			// Record the snapshot index before mergeNext as merging
			// advances the iterator, adjusting curSnapshotIdx.
			origSnapshotIdx := i.curSnapshotIdx
			var valueMerger ValueMerger
			valueMerger, i.err = i.merge(i.iterKey.UserKey, i.iterValue)
			var change stripeChangeType
			if i.err == nil {
				change = i.mergeNext(valueMerger)
			}
			var needDelete bool
			if i.err == nil {
				// includesBase is true whenever we've transformed the MERGE record
				// into a SET.
				var includesBase bool
				switch i.key.Kind() {
				case InternalKeyKindSet:
					includesBase = true
				case InternalKeyKindMerge:
				default:
					panic(errors.AssertionFailedf(
						"unexpected kind %s", redact.SafeString(i.key.Kind().String())))
				}
				i.value, needDelete, i.valueCloser, i.err = finishValueMerger(valueMerger, includesBase)
			}
			if i.err == nil {
				if needDelete {
					i.valid = false
					if i.closeValueCloser() != nil {
						return nil, nil
					}
					continue
				}
				// A non-skippable entry does not necessarily cover later merge
				// operands, so we must not zero the current merge result's seqnum.
				//
				// For example, suppose the forthcoming two keys are a range
				// tombstone, `[a, b)#3`, and a merge operand, `a#3`. Recall that
				// range tombstones do not cover point keys at the same seqnum, so
				// `a#3` is not deleted. The range tombstone will be seen first due
				// to its larger value type. Since it is a non-skippable key, the
				// current merge will not include `a#3`. If we zeroed the current
				// merge result's seqnum, then it would conflict with the upcoming
				// merge including `a#3`, whose seqnum will also be zeroed.
				if change != sameStripeNonSkippable {
					i.maybeZeroSeqnum(origSnapshotIdx)
				}
				return &i.key, i.value
			}
			if i.err != nil {
				i.valid = false
				// TODO(sumeer): why is MarkCorruptionError only being called for
				// MERGE?
				i.err = base.MarkCorruptionError(i.err)
			}
			return nil, nil

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
			i.valid = false
			return nil, nil
		}
	}

	return nil, nil
}
575 :
576 1 : func (i *compactionIter) closeValueCloser() error {
577 1 : if i.valueCloser == nil {
578 1 : return nil
579 1 : }
580 :
581 0 : i.err = i.valueCloser.Close()
582 0 : i.valueCloser = nil
583 0 : if i.err != nil {
584 0 : i.valid = false
585 0 : }
586 0 : return i.err
587 : }
588 :
589 : // snapshotIndex returns the index of the first sequence number in snapshots
590 : // which is greater than or equal to seq.
591 1 : func snapshotIndex(seq uint64, snapshots []uint64) (int, uint64) {
592 1 : index := sort.Search(len(snapshots), func(i int) bool {
593 1 : return snapshots[i] > seq
594 1 : })
595 1 : if index >= len(snapshots) {
596 1 : return index, InternalKeySeqNumMax
597 1 : }
598 1 : return index, snapshots[index]
599 : }
600 :
601 : // skipInStripe skips over skippable keys in the same stripe and user key. It
602 : // may set i.err, in which case i.iterKey will be nil.
603 1 : func (i *compactionIter) skipInStripe() {
604 1 : i.skip = true
605 1 : for i.nextInStripe() == sameStripeSkippable {
606 1 : if i.err != nil {
607 0 : panic(i.err)
608 : }
609 : }
610 : // Reset skip if we landed outside the original stripe. Otherwise, we landed
611 : // in the same stripe on a non-skippable key. In that case we should preserve
612 : // `i.skip == true` such that later keys in the stripe will continue to be
613 : // skipped.
614 1 : if i.iterStripeChange == newStripeNewKey || i.iterStripeChange == newStripeSameKey {
615 1 : i.skip = false
616 1 : }
617 : }
618 :
619 1 : func (i *compactionIter) iterNext() bool {
620 1 : var iterValue LazyValue
621 1 : i.iterKey, iterValue = i.iter.Next()
622 1 : i.iterValue, _, i.err = iterValue.Value(nil)
623 1 : if i.err != nil {
624 0 : i.iterKey = nil
625 0 : }
626 1 : return i.iterKey != nil
627 : }
628 :
// stripeChangeType indicates how the snapshot stripe changed relative to the
// previous key. If no change, it also indicates whether the current entry is
// skippable. If the snapshot stripe changed, it also indicates whether the new
// stripe was entered because the iterator progressed onto an entirely new key
// or entered a new stripe within the same key.
type stripeChangeType int

const (
	// newStripeNewKey indicates the iterator advanced to a different user
	// key. It is also returned when the input is exhausted or an error
	// occurred (in which case i.iterKey is nil).
	newStripeNewKey stripeChangeType = iota
	// newStripeSameKey indicates the iterator remains on the same user key
	// but crossed into a different (older) snapshot stripe.
	newStripeSameKey
	// sameStripeSkippable indicates the iterator remains within the same
	// user key and snapshot stripe, positioned on an entry that may be
	// skipped.
	sameStripeSkippable
	// sameStripeNonSkippable indicates the iterator remains within the same
	// user key and snapshot stripe, but is positioned on an entry that must
	// be processed rather than skipped (a range deletion or an invalid key).
	sameStripeNonSkippable
)
642 :
643 : // nextInStripe advances the iterator and returns one of the above const ints
644 : // indicating how its state changed.
645 : //
646 : // Calls to nextInStripe must be preceded by a call to saveKey to retain a
647 : // temporary reference to the original key, so that forward iteration can
648 : // proceed with a reference to the original key. Care should be taken to avoid
649 : // overwriting or mutating the saved key or value before they have been returned
650 : // to the caller of the exported function (i.e. the caller of Next, First, etc.)
651 : //
652 : // nextInStripe may set i.err, in which case the return value will be
653 : // newStripeNewKey, and i.iterKey will be nil.
654 1 : func (i *compactionIter) nextInStripe() stripeChangeType {
655 1 : i.iterStripeChange = i.nextInStripeHelper()
656 1 : return i.iterStripeChange
657 1 : }
658 :
659 : // nextInStripeHelper is an internal helper for nextInStripe; callers should use
660 : // nextInStripe and not call nextInStripeHelper.
661 1 : func (i *compactionIter) nextInStripeHelper() stripeChangeType {
662 1 : if !i.iterNext() {
663 1 : return newStripeNewKey
664 1 : }
665 1 : key := i.iterKey
666 1 :
667 1 : if !i.equal(i.key.UserKey, key.UserKey) {
668 1 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
669 1 : return newStripeNewKey
670 1 : }
671 :
672 : // If i.key and key have the same user key, then
673 : // 1. i.key must not have had a zero sequence number (or it would've be the last
674 : // key with its user key).
675 : // 2. i.key must have a strictly larger sequence number
676 : // There's an exception in that either key may be a range delete. Range
677 : // deletes may share a sequence number with a point key if the keys were
678 : // ingested together. Range keys may also share the sequence number if they
679 : // were ingested, but range keys are interleaved into the compaction
680 : // iterator's input iterator at the maximal sequence number so their
681 : // original sequence number will not be observed here.
682 1 : if prevSeqNum := base.SeqNumFromTrailer(i.keyTrailer); (prevSeqNum == 0 || prevSeqNum <= key.SeqNum()) &&
683 1 : i.key.Kind() != InternalKeyKindRangeDelete && key.Kind() != InternalKeyKindRangeDelete {
684 0 : prevKey := i.key
685 0 : prevKey.Trailer = i.keyTrailer
686 0 : panic(errors.AssertionFailedf("pebble: invariant violation: %s and %s out of order", prevKey, key))
687 : }
688 :
689 1 : origSnapshotIdx := i.curSnapshotIdx
690 1 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
691 1 : switch key.Kind() {
692 1 : case InternalKeyKindRangeDelete:
693 1 : // Range tombstones need to be exposed by the compactionIter to the upper level
694 1 : // `compaction` object, so return them regardless of whether they are in the same
695 1 : // snapshot stripe.
696 1 : if i.curSnapshotIdx == origSnapshotIdx {
697 1 : return sameStripeNonSkippable
698 1 : }
699 1 : return newStripeSameKey
700 0 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
701 0 : // Range keys are interleaved at the max sequence number for a given user
702 0 : // key, so we should not see any more range keys in this stripe.
703 0 : panic("unreachable")
704 1 : case InternalKeyKindInvalid:
705 1 : if i.curSnapshotIdx == origSnapshotIdx {
706 1 : return sameStripeNonSkippable
707 1 : }
708 0 : return newStripeSameKey
709 : case InternalKeyKindDelete, InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSingleDelete,
710 1 : InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
711 : // Fall through
712 0 : default:
713 0 : i.iterKey = nil
714 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
715 0 : i.valid = false
716 0 : return newStripeNewKey
717 : }
718 1 : if i.curSnapshotIdx == origSnapshotIdx {
719 1 : return sameStripeSkippable
720 1 : }
721 1 : return newStripeSameKey
722 : }
723 :
// setNext saves the current entry as the iterator's output and decides
// whether the rest of the entries in the current snapshot stripe can be
// skipped. If the format version supports SETWITHDEL, it scans forward within
// the stripe and may transform the saved key's kind into SETWITHDEL to record
// that a DEL/SINGLEDEL/DELSIZED is (or may be) shadowed beneath it.
func (i *compactionIter) setNext() {
	// Save the current key.
	i.saveKey()
	i.value = i.iterValue
	i.valid = true
	i.maybeZeroSeqnum(i.curSnapshotIdx)

	// There are two cases where we can early return and skip the remaining
	// records in the stripe:
	// - If the DB format version does not support SETWITHDEL.
	// - If this key is already a SETWITHDEL.
	if i.formatVersion < FormatSetWithDelete ||
		i.iterKey.Kind() == InternalKeyKindSetWithDelete {
		i.skip = true
		return
	}

	// We are iterating forward. Save the current value, since i.iterValue
	// will be invalidated as we advance past subsequent entries.
	i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
	i.value = i.valueBuf

	// Else, we continue to loop through entries in the stripe looking for a
	// DEL. Note that we may stop *before* encountering a DEL, if one exists.
	for {
		switch i.nextInStripe() {
		case newStripeNewKey, newStripeSameKey:
			i.pos = iterPosNext
			return
		case sameStripeNonSkippable:
			i.pos = iterPosNext
			// We iterated onto a key that we cannot skip. We can
			// conservatively transform the original SET into a SETWITHDEL
			// as an indication that there *may* still be a DEL/SINGLEDEL
			// under this SET, even if we did not actually encounter one.
			//
			// This is safe to do, as:
			//
			// - in the case that there *is not* actually a DEL/SINGLEDEL
			// under this entry, any SINGLEDEL above this now-transformed
			// SETWITHDEL will become a DEL when the two encounter in a
			// compaction. The DEL will eventually be elided in a
			// subsequent compaction. The cost for ensuring correctness is
			// that this entry is kept around for an additional compaction
			// cycle(s).
			//
			// - in the case there *is* indeed a DEL/SINGLEDEL under us
			// (but in a different stripe or sstable), then we will have
			// already done the work to transform the SET into a
			// SETWITHDEL, and we will skip any additional iteration when
			// this entry is encountered again in a subsequent compaction.
			//
			// Ideally, this codepath would be smart enough to handle the
			// case of SET <- RANGEDEL <- ... <- DEL/SINGLEDEL <- ....
			// This requires preserving any RANGEDEL entries we encounter
			// along the way, then emitting the original (possibly
			// transformed) key, followed by the RANGEDELs. This requires
			// a sizable refactoring of the existing code, as nextInStripe
			// currently returns a sameStripeNonSkippable when it
			// encounters a RANGEDEL.
			// TODO(travers): optimize to handle the RANGEDEL case if it
			// turns out to be a performance problem.
			i.key.SetKind(InternalKeyKindSetWithDelete)

			// By setting i.skip=true, we are saying that after the
			// non-skippable key is emitted (which is likely a RANGEDEL),
			// the remaining point keys that share the same user key as this
			// saved key should be skipped.
			i.skip = true
			return
		case sameStripeSkippable:
			// We're still in the same stripe. If this is a
			// DEL/SINGLEDEL/DELSIZED, we stop looking and emit a SETWITHDEL.
			// Subsequent keys are eligible for skipping.
			switch i.iterKey.Kind() {
			case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
				i.key.SetKind(InternalKeyKindSetWithDelete)
				i.skip = true
				return
			case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSetWithDelete:
				// Do nothing
			default:
				i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
				i.valid = false
			}
		default:
			panic("pebble: unexpected stripeChangeType: " + strconv.Itoa(int(i.iterStripeChange)))
		}
	}
}
813 :
// mergeNext saves the current key (a MERGE) and folds older entries within
// the same snapshot stripe into valueMerger. It stops at a stripe boundary,
// a DEL/SINGLEDEL/DELSIZED, a SET/SETWITHDELETE, or a visible RANGEDEL cover,
// possibly re-kinding the saved key to SET so it shadows lower levels. The
// returned stripeChangeType is the transition that terminated the merge.
func (i *compactionIter) mergeNext(valueMerger ValueMerger) stripeChangeType {
	// Save the current key.
	i.saveKey()
	i.valid = true

	// Loop looking for older values in the current snapshot stripe and merge
	// them.
	for {
		if i.nextInStripe() != sameStripeSkippable {
			i.pos = iterPosNext
			return i.iterStripeChange
		}
		if i.err != nil {
			panic(i.err)
		}
		key := i.iterKey
		switch key.Kind() {
		case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
			// We've hit a deletion tombstone. Return everything up to this point and
			// then skip entries until the next snapshot stripe. We change the kind
			// of the result key to a Set so that it shadows keys in lower
			// levels. That is, MERGE+DEL -> SET.
			// We do the same for SingleDelete since SingleDelete is only
			// permitted (with deterministic behavior) for keys that have been
			// set once since the last SingleDelete/Delete, so everything
			// older is acceptable to shadow. Note that this is slightly
			// different from singleDeleteNext() which implements stricter
			// semantics in terms of applying the SingleDelete to the single
			// next Set. But those stricter semantics are not observable to
			// the end-user since Iterator interprets SingleDelete as Delete.
			// We could do something more complicated here and consume only a
			// single Set, and then merge in any following Sets, but that is
			// complicated wrt code and unnecessary given the narrow permitted
			// use of SingleDelete.
			i.key.SetKind(InternalKeyKindSet)
			i.skip = true
			return sameStripeSkippable

		case InternalKeyKindSet, InternalKeyKindSetWithDelete:
			if i.rangeDelFrag.Covers(*key, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
				// We change the kind of the result key to a Set so that it shadows
				// keys in lower levels. That is, MERGE+RANGEDEL -> SET. This isn't
				// strictly necessary, but provides consistency with the behavior of
				// MERGE+DEL.
				i.key.SetKind(InternalKeyKindSet)
				i.skip = true
				return sameStripeSkippable
			}

			// We've hit a Set or SetWithDel value. Merge with the existing
			// value and return. We change the kind of the resulting key to a
			// Set so that it shadows keys in lower levels. That is:
			// MERGE + (SET*) -> SET.
			i.err = valueMerger.MergeOlder(i.iterValue)
			if i.err != nil {
				i.valid = false
				return sameStripeSkippable
			}
			i.key.SetKind(InternalKeyKindSet)
			i.skip = true
			return sameStripeSkippable

		case InternalKeyKindMerge:
			if i.rangeDelFrag.Covers(*key, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
				// We change the kind of the result key to a Set so that it shadows
				// keys in lower levels. That is, MERGE+RANGEDEL -> SET. This isn't
				// strictly necessary, but provides consistency with the behavior of
				// MERGE+DEL.
				i.key.SetKind(InternalKeyKindSet)
				i.skip = true
				return sameStripeSkippable
			}

			// We've hit another Merge value. Merge with the existing value and
			// continue looping.
			i.err = valueMerger.MergeOlder(i.iterValue)
			if i.err != nil {
				i.valid = false
				return sameStripeSkippable
			}

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
			i.valid = false
			return sameStripeSkippable
		}
	}
}
902 :
// singleDeleteNext processes a SingleDelete point tombstone. A SingleDelete, or
// SINGLEDEL, is unique in that it deletes exactly 1 internal key. It's a
// performance optimization when the client knows a user key has not been
// overwritten, allowing the elision of the tombstone earlier, avoiding write
// amplification.
//
// singleDeleteNext returns a boolean indicating whether or not the caller
// should yield the SingleDelete key to the consumer of the compactionIter. If
// singleDeleteNext returns false, the caller may consume/elide the
// SingleDelete.
func (i *compactionIter) singleDeleteNext() bool {
	// Save the current key.
	i.saveKey()
	i.value = i.iterValue
	i.valid = true

	// Loop until finds a key to be passed to the next level.
	for {
		// If we find a key that can't be skipped, return true so that the
		// caller yields the SingleDelete to the consumer.
		if i.nextInStripe() != sameStripeSkippable {
			i.pos = iterPosNext
			return i.err == nil
		}
		if i.err != nil {
			panic(i.err)
		}
		key := i.iterKey
		switch key.Kind() {
		case InternalKeyKindDelete, InternalKeyKindMerge, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
			// We've hit a Delete, DeleteSized, Merge or SetWithDelete;
			// transform the SingleDelete into a full Delete so it continues
			// to shadow everything beneath it.
			i.key.SetKind(InternalKeyKindDelete)
			i.skip = true
			return true

		case InternalKeyKindSet:
			// This SingleDelete deletes the Set, and we can now elide the
			// SingleDel as well. We advance past the Set and return false to
			// indicate to the main compaction loop that we should NOT yield the
			// current SingleDel key to the compaction loop.
			i.nextInStripe()
			// TODO(jackson): We could assert that nextInStripe either a)
			// stepped onto a new key, or b) stepped on to a Delete, DeleteSized
			// or SingleDel key. This would detect improper uses of SingleDel,
			// but only when all three internal keys meet in the same compaction
			// which is not likely.
			i.valid = false
			return false

		case InternalKeyKindSingleDelete:
			// Two single deletes met in a compaction. With proper deterministic
			// use of SingleDelete, this should never happen. The expectation is
			// that there's exactly 1 set beneath a single delete. Currently, we
			// opt to skip it.
			// TODO(jackson): Should we make this an error? This would also
			// allow us to simplify the code a bit by removing the for loop.
			continue

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
			i.valid = false
			return false
		}
	}
}
969 :
// deleteSizedNext processes a DELSIZED point tombstone. Unlike ordinary DELs,
// these tombstones carry a value that's a varint indicating the size of the
// entry (len(key)+len(value)) that the tombstone is expected to delete.
//
// When a deleteSizedNext is encountered, we skip ahead to see which keys, if
// any, are elided as a result of the tombstone.
//
// Returns the (possibly re-kinded) saved key and its value, or (nil, nil) on
// error.
func (i *compactionIter) deleteSizedNext() (*base.InternalKey, []byte) {
	i.saveKey()
	i.valid = true
	i.skip = true

	// The DELSIZED tombstone may have no value at all. This happens when the
	// tombstone has already deleted the key that the user originally predicted.
	// In this case, we still peek forward in case there's another DELSIZED key
	// with a lower sequence number, in which case we'll adopt its value.
	if len(i.iterValue) == 0 {
		i.value = i.valueBuf[:0]
	} else {
		i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
		i.value = i.valueBuf
	}

	// Loop through all the keys within this stripe that are skippable.
	i.pos = iterPosNext
	for i.nextInStripe() == sameStripeSkippable {
		if i.err != nil {
			panic(i.err)
		}
		switch i.iterKey.Kind() {
		case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
			// We encountered a tombstone (DEL, or DELSIZED) that's deleted by
			// the original DELSIZED tombstone. This can happen in two cases:
			//
			// (1) These tombstones were intended to delete two distinct values,
			//     and this DELSIZED has already dropped the relevant key. For
			//     example:
			//
			//     a.DELSIZED.9   a.SET.7   a.DELSIZED.5   a.SET.4
			//
			//     If a.DELSIZED.9 has already deleted a.SET.7, its size has
			//     already been zeroed out. In this case, we want to adopt the
			//     value of the DELSIZED with the lower sequence number, in
			//     case the a.SET.4 key has not yet been elided.
			//
			// (2) This DELSIZED was missized. The user thought they were
			//     deleting a key with this user key, but this user key had
			//     already been deleted.
			//
			// We can differentiate these two cases by examining the length of
			// the DELSIZED's value. A DELSIZED's value holds the size of both
			// the user key and value that it intends to delete. For any user
			// key with a length > 1, a DELSIZED that has not deleted a key must
			// have a value with a length > 1.
			//
			// We treat both cases the same functionally, adopting the identity
			// of the lower-sequence numbered tombstone. However in the second
			// case, we also increment the stat counting missized tombstones.
			if len(i.value) > 0 {
				// The original DELSIZED key was missized. The key that the user
				// thought they were deleting does not exist.
				i.stats.countMissizedDels++
			}
			// Adopt the value (i.e. the encoded size) of the lower-seqnum
			// tombstone.
			i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
			i.value = i.valueBuf
			if i.iterKey.Kind() != InternalKeyKindDeleteSized {
				// Convert the DELSIZED to a DEL—The DEL/SINGLEDEL we're eliding
				// may not have deleted the key(s) it was intended to yet. The
				// ordinary DEL compaction heuristics are better suited at that,
				// plus we don't want to count it as a missized DEL. We early
				// exit in this case, after skipping the remainder of the
				// snapshot stripe.
				i.key.SetKind(InternalKeyKindDelete)
				// NB: We skipInStripe now, rather than returning leaving
				// i.skip=true and returning early, because Next() requires
				// that i.skip=true only if i.iterPos = iterPosCurForward.
				//
				// Ignore any error caused by skipInStripe since it does not affect
				// the key/value being returned here, and the next call to Next() will
				// expose it.
				i.skipInStripe()
				return &i.key, i.value
			}
			// Continue, in case we uncover another DELSIZED or a key this
			// DELSIZED deletes.

		case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindSetWithDelete:
			// If the DELSIZED is value-less, it already deleted the key that it
			// was intended to delete. This is possible with a sequence like:
			//
			//      DELSIZED.8     SET.7     SET.3
			//
			// The DELSIZED only describes the size of the SET.7, which in this
			// case has already been elided. We don't count it as a missizing,
			// instead converting the DELSIZED to a DEL. Skip the remainder of
			// the snapshot stripe and return.
			if len(i.value) == 0 {
				i.key.SetKind(InternalKeyKindDelete)
				// NB: We skipInStripe now, rather than returning leaving
				// i.skip=true and returning early, because Next() requires
				// that i.skip=true only if i.iterPos = iterPosCurForward.
				//
				// Ignore any error caused by skipInStripe since it does not affect
				// the key/value being returned here, and the next call to Next() will
				// expose it.
				i.skipInStripe()
				return &i.key, i.value
			}
			// The deleted key is not a DEL, DELSIZED, and the DELSIZED in i.key
			// has a positive size.
			expectedSize, n := binary.Uvarint(i.value)
			if n != len(i.value) {
				// The value must be a single, complete varint.
				i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(i.value))
				i.valid = false
				return nil, nil
			}
			elidedSize := uint64(len(i.iterKey.UserKey)) + uint64(len(i.iterValue))
			if elidedSize != expectedSize {
				// The original DELSIZED key was missized. It's unclear what to
				// do. The user-provided size was wrong, so it's unlikely to be
				// accurate or meaningful. We could:
				//
				//   1. return the DELSIZED with the original user-provided size unmodified
				//   2. return the DELZIZED with a zeroed size to reflect that a key was
				//   elided, even if it wasn't the anticipated size.
				//   3. subtract the elided size from the estimate and re-encode.
				//   4. convert the DELSIZED into a value-less DEL, so that
				//      ordinary DEL heuristics apply.
				//
				// We opt for (4) under the rationale that we can't rely on the
				// user-provided size for accuracy, so ordinary DEL heuristics
				// are safer.
				i.stats.countMissizedDels++
				i.key.SetKind(InternalKeyKindDelete)
				i.value = i.valueBuf[:0]
				// NB: We skipInStripe now, rather than returning leaving
				// i.skip=true and returning early, because Next() requires
				// that i.skip=true only if i.iterPos = iterPosCurForward.
				//
				// Ignore any error caused by skipInStripe since it does not affect
				// the key/value being returned here, and the next call to Next() will
				// expose it.
				i.skipInStripe()
				return &i.key, i.value
			}
			// NB: We remove the value regardless of whether the key was sized
			// appropriately. The size encoded is 'consumed' the first time it
			// meets a key that it deletes.
			i.value = i.valueBuf[:0]

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
			i.valid = false
			return nil, nil
		}
	}
	// Reset skip if we landed outside the original stripe. Otherwise, we landed
	// in the same stripe on a non-skippable key. In that case we should preserve
	// `i.skip == true` such that later keys in the stripe will continue to be
	// skipped.
	if i.iterStripeChange == newStripeNewKey || i.iterStripeChange == newStripeSameKey {
		i.skip = false
	}
	if i.err != nil {
		return nil, nil
	}
	return &i.key, i.value
}
1137 :
1138 1 : func (i *compactionIter) saveKey() {
1139 1 : i.keyBuf = append(i.keyBuf[:0], i.iterKey.UserKey...)
1140 1 : i.key.UserKey = i.keyBuf
1141 1 : i.key.Trailer = i.iterKey.Trailer
1142 1 : i.keyTrailer = i.iterKey.Trailer
1143 1 : i.frontiers.Advance(i.key.UserKey)
1144 1 : }
1145 :
1146 1 : func (i *compactionIter) cloneKey(key []byte) []byte {
1147 1 : i.alloc, key = i.alloc.Copy(key)
1148 1 : return key
1149 1 : }
1150 :
// Key returns the current internal key saved by the iterator.
func (i *compactionIter) Key() InternalKey {
	return i.key
}
1154 :
// Value returns the value associated with the current key.
func (i *compactionIter) Value() []byte {
	return i.value
}
1158 :
// Valid reports whether the iterator is positioned over a valid entry.
func (i *compactionIter) Valid() bool {
	return i.valid
}
1162 :
// Error returns any accumulated error.
func (i *compactionIter) Error() error {
	return i.err
}
1166 :
1167 1 : func (i *compactionIter) Close() error {
1168 1 : err := i.iter.Close()
1169 1 : if i.err == nil {
1170 1 : i.err = err
1171 1 : }
1172 :
1173 : // Close the closer for the current value if one was open.
1174 1 : if i.valueCloser != nil {
1175 0 : i.err = firstError(i.err, i.valueCloser.Close())
1176 0 : i.valueCloser = nil
1177 0 : }
1178 :
1179 1 : return i.err
1180 : }
1181 :
1182 : // Tombstones returns a list of pending range tombstones in the fragmenter
1183 : // up to the specified key, or all pending range tombstones if key = nil.
1184 1 : func (i *compactionIter) Tombstones(key []byte) []keyspan.Span {
1185 1 : if key == nil {
1186 1 : i.rangeDelFrag.Finish()
1187 1 : } else {
1188 1 : // The specified end key is exclusive; no versions of the specified
1189 1 : // user key (including range tombstones covering that key) should
1190 1 : // be flushed yet.
1191 1 : i.rangeDelFrag.TruncateAndFlushTo(key)
1192 1 : }
1193 1 : tombstones := i.tombstones
1194 1 : i.tombstones = nil
1195 1 : return tombstones
1196 : }
1197 :
1198 : // RangeKeys returns a list of pending fragmented range keys up to the specified
1199 : // key, or all pending range keys if key = nil.
1200 1 : func (i *compactionIter) RangeKeys(key []byte) []keyspan.Span {
1201 1 : if key == nil {
1202 1 : i.rangeKeyFrag.Finish()
1203 1 : } else {
1204 1 : // The specified end key is exclusive; no versions of the specified
1205 1 : // user key (including range tombstones covering that key) should
1206 1 : // be flushed yet.
1207 1 : i.rangeKeyFrag.TruncateAndFlushTo(key)
1208 1 : }
1209 1 : rangeKeys := i.rangeKeys
1210 1 : i.rangeKeys = nil
1211 1 : return rangeKeys
1212 : }
1213 :
// emitRangeDelChunk accumulates a fragmented range-tombstone span into
// i.tombstones, applying the snapshot stripe rules: within each snapshot
// stripe only the newest tombstone is kept, and tombstones in the final
// stripe are dropped entirely if elideRangeTombstone says they are not
// needed.
func (i *compactionIter) emitRangeDelChunk(fragmented keyspan.Span) {
	// Apply the snapshot stripe rules, keeping only the latest tombstone for
	// each snapshot stripe.
	currentIdx := -1
	// Filter in place, reusing fragmented.Keys' backing array.
	keys := fragmented.Keys[:0]
	for _, k := range fragmented.Keys {
		idx, _ := snapshotIndex(k.SeqNum(), i.snapshots)
		if currentIdx == idx {
			// A newer tombstone in this stripe was already kept.
			continue
		}
		if idx == 0 && i.elideRangeTombstone(fragmented.Start, fragmented.End) {
			// This is the last snapshot stripe and the range tombstone
			// can be elided.
			break
		}

		keys = append(keys, k)
		if idx == 0 {
			// This is the last snapshot stripe.
			break
		}
		currentIdx = idx
	}
	if len(keys) > 0 {
		i.tombstones = append(i.tombstones, keyspan.Span{
			Start: fragmented.Start,
			End:   fragmented.End,
			Keys:  keys,
		})
	}
}
1245 :
1246 1 : func (i *compactionIter) emitRangeKeyChunk(fragmented keyspan.Span) {
1247 1 : // Elision of snapshot stripes happens in rangeKeyCompactionTransform, so no need to
1248 1 : // do that here.
1249 1 : if len(fragmented.Keys) > 0 {
1250 1 : i.rangeKeys = append(i.rangeKeys, fragmented)
1251 1 : }
1252 : }
1253 :
1254 : // maybeZeroSeqnum attempts to set the seqnum for the current key to 0. Doing
1255 : // so improves compression and enables an optimization during forward iteration
1256 : // to skip some key comparisons. The seqnum for an entry can be zeroed if the
1257 : // entry is on the bottom snapshot stripe and on the bottom level of the LSM.
1258 1 : func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) {
1259 1 : if !i.allowZeroSeqNum {
1260 1 : // TODO(peter): allowZeroSeqNum applies to the entire compaction. We could
1261 1 : // make the determination on a key by key basis, similar to what is done
1262 1 : // for elideTombstone. Need to add a benchmark for compactionIter to verify
1263 1 : // that isn't too expensive.
1264 1 : return
1265 1 : }
1266 1 : if snapshotIdx > 0 {
1267 1 : // This is not the last snapshot
1268 1 : return
1269 1 : }
1270 1 : i.key.SetSeqNum(base.SeqNumZero)
1271 : }
1272 :
// A frontier is used to monitor a compaction's progression across the user
// keyspace.
//
// A frontier hold a user key boundary that it's concerned with in its `key`
// field. If/when the compaction iterator returns an InternalKey with a user key
// _k_ such that k ≥ frontier.key, the compaction iterator invokes the
// frontier's `reached` function, passing _k_ as its argument.
//
// The `reached` function returns a new value to use as the key. If `reached`
// returns nil, the frontier is forgotten and its `reached` method will not be
// invoked again, unless the user calls [Update] to set a new key.
//
// A frontier's key may be updated outside the context of a `reached`
// invocation at any time, through its Update method.
type frontier struct {
	// container points to the containing *frontiers that was passed to Init
	// when the frontier was initialized. It owns the heap in which this
	// frontier participates.
	container *frontiers

	// key holds the frontier's current key. If nil, this frontier is inactive
	// and its reached func will not be invoked. The value of this key may only
	// be updated by the `frontiers` type, or the Update method.
	key []byte

	// reached is invoked to inform a frontier that its key has been reached.
	// It's invoked with the user key that reached the limit. The `key` argument
	// is guaranteed to be ≥ the frontier's key.
	//
	// After reached is invoked, the frontier's key is updated to the return
	// value of `reached`. Note bene, the frontier is permitted to update its
	// key to a user key ≤ the argument `key`.
	//
	// If a frontier is set to key k1, and reached(k2) is invoked (k2 ≥ k1), the
	// frontier will receive reached(k2) calls until it returns nil or a key
	// `k3` such that k2 < k3. This property is useful for frontiers that use
	// `reached` invocations to drive iteration through collections of keys that
	// may contain multiple keys that are both < k2 and ≥ k1.
	reached func(key []byte) (next []byte)
}
1312 :
1313 : // Init initializes the frontier with the provided key and reached callback.
1314 : // The frontier is attached to the provided *frontiers and the provided reached
1315 : // func will be invoked when the *frontiers is advanced to a key ≥ this
1316 : // frontier's key.
1317 : func (f *frontier) Init(
1318 : frontiers *frontiers, initialKey []byte, reached func(key []byte) (next []byte),
1319 1 : ) {
1320 1 : *f = frontier{
1321 1 : container: frontiers,
1322 1 : key: initialKey,
1323 1 : reached: reached,
1324 1 : }
1325 1 : if initialKey != nil {
1326 1 : f.container.push(f)
1327 1 : }
1328 : }
1329 :
// String implements fmt.Stringer, returning the frontier's current key.
func (f *frontier) String() string {
	return string(f.key)
}
1334 :
1335 : // Update replaces the existing frontier's key with the provided key. The
1336 : // frontier's reached func will be invoked when the new key is reached.
1337 1 : func (f *frontier) Update(key []byte) {
1338 1 : c := f.container
1339 1 : prevKeyIsNil := f.key == nil
1340 1 : f.key = key
1341 1 : if prevKeyIsNil {
1342 1 : if key != nil {
1343 1 : c.push(f)
1344 1 : }
1345 1 : return
1346 : }
1347 :
1348 : // Find the frontier within the heap (it must exist within the heap because
1349 : // f.key was != nil). If the frontier key is now nil, remove it from the
1350 : // heap. Otherwise, fix up its position.
1351 1 : for i := 0; i < len(c.items); i++ {
1352 1 : if c.items[i] == f {
1353 1 : if key != nil {
1354 1 : c.fix(i)
1355 1 : } else {
1356 1 : n := c.len() - 1
1357 1 : c.swap(i, n)
1358 1 : c.down(i, n)
1359 1 : c.items = c.items[:n]
1360 1 : }
1361 1 : return
1362 : }
1363 : }
1364 0 : panic("unreachable")
1365 : }
1366 :
// frontiers is used to track progression of a task (eg, compaction) across the
// keyspace. Clients that want to be informed when the task advances to a key ≥
// some frontier may register a frontier, providing a callback. The task calls
// `Advance(k)` with each user key encountered, which invokes the `reached` func
// on all tracked frontiers with `key`s ≤ k.
//
// Internally, frontiers is implemented as a simple heap.
type frontiers struct {
	// cmp is the user-key comparison function used to order the heap.
	cmp Compare
	// items holds the active frontiers as a min-heap ordered by key.
	items []*frontier
}
1378 :
1379 : // String implements fmt.Stringer.
1380 1 : func (f *frontiers) String() string {
1381 1 : var buf bytes.Buffer
1382 1 : for i := 0; i < len(f.items); i++ {
1383 1 : if i > 0 {
1384 1 : fmt.Fprint(&buf, ", ")
1385 1 : }
1386 1 : fmt.Fprintf(&buf, "%s: %q", f.items[i], f.items[i].key)
1387 : }
1388 1 : return buf.String()
1389 : }
1390 :
1391 : // Advance notifies all member frontiers with keys ≤ k.
1392 1 : func (f *frontiers) Advance(k []byte) {
1393 1 : for len(f.items) > 0 && f.cmp(k, f.items[0].key) >= 0 {
1394 1 : // This frontier has been reached. Invoke the closure and update with
1395 1 : // the next frontier.
1396 1 : f.items[0].key = f.items[0].reached(k)
1397 1 : if f.items[0].key == nil {
1398 1 : // This was the final frontier that this user was concerned with.
1399 1 : // Remove it from the heap.
1400 1 : f.pop()
1401 1 : } else {
1402 1 : // Fix up the heap root.
1403 1 : f.fix(0)
1404 1 : }
1405 : }
1406 : }
1407 :
// len returns the number of active frontiers in the heap.
func (f *frontiers) len() int {
	return len(f.items)
}
1411 :
// less orders the heap by frontier key, using the configured comparison.
func (f *frontiers) less(i, j int) bool {
	return f.cmp(f.items[i].key, f.items[j].key) < 0
}
1415 :
// swap exchanges the heap elements at indices i and j.
func (f *frontiers) swap(i, j int) {
	f.items[i], f.items[j] = f.items[j], f.items[i]
}
1419 :
// fix, up and down are copied from the go stdlib.

// fix re-establishes the heap ordering after the element at index i has
// changed its key (equivalent to container/heap.Fix).
func (f *frontiers) fix(i int) {
	if !f.down(i, f.len()) {
		f.up(i)
	}
}
1427 :
// push adds ff to the heap and sifts it up into position.
func (f *frontiers) push(ff *frontier) {
	n := len(f.items)
	f.items = append(f.items, ff)
	f.up(n)
}
1433 :
// pop removes and returns the root of the heap (the frontier with the
// smallest key). Root removal only requires sifting down.
func (f *frontiers) pop() *frontier {
	n := f.len() - 1
	f.swap(0, n)
	f.down(0, n)
	item := f.items[n]
	f.items = f.items[:n]
	return item
}
1442 :
// up sifts the element at index j toward the root until the heap invariant
// holds (copied from the go stdlib).
func (f *frontiers) up(j int) {
	for {
		i := (j - 1) / 2 // parent
		if i == j || !f.less(j, i) {
			break
		}
		f.swap(i, j)
		j = i
	}
}
1453 :
// down sifts the element at index i0 toward the leaves within the first n
// elements, reporting whether the element moved (copied from the go stdlib).
func (f *frontiers) down(i0, n int) bool {
	i := i0
	for {
		j1 := 2*i + 1
		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
			break
		}
		j := j1 // left child
		if j2 := j1 + 1; j2 < n && f.less(j2, j1) {
			j = j2 // = 2*i + 2  // right child
		}
		if !f.less(j, i) {
			break
		}
		f.swap(i, j)
		i = j
	}
	return i > i0
}
|