Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "fmt"
11 : "io"
12 : "sort"
13 : "strconv"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/bytealloc"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/internal/keyspan"
20 : "github.com/cockroachdb/pebble/internal/rangekey"
21 : )
22 :
23 : // compactionIter provides a forward-only iterator that encapsulates the logic
24 : // for collapsing entries during compaction. It wraps an internal iterator and
25 : // collapses entries that are no longer necessary because they are shadowed by
26 : // newer entries. The simplest example of this is when the internal iterator
27 : // contains two keys: a.PUT.2 and a.PUT.1. Instead of returning both entries,
28 : // compactionIter collapses the second entry because it is no longer
29 : // necessary. The high-level structure for compactionIter is to iterate over
30 : // its internal iterator and output 1 entry for every user-key. There are four
31 : // complications to this story.
32 : //
33 : // 1. Eliding Deletion Tombstones
34 : //
35 : // Consider the entries a.DEL.2 and a.PUT.1. These entries collapse to
36 : // a.DEL.2. Do we have to output the entry a.DEL.2? Only if a.DEL.2 possibly
37 : // shadows an entry at a lower level. If we're compacting to the base-level in
38 : // the LSM tree then a.DEL.2 is definitely not shadowing an entry at a lower
39 : // level and can be elided.
40 : //
41 : // We can do slightly better than only eliding deletion tombstones at the base
42 : // level by observing that we can elide a deletion tombstone if there are no
43 : // sstables that contain the entry's key. This check is performed by
44 : // elideTombstone.
45 : //
46 : // 2. Merges
47 : //
48 : // The MERGE operation merges the value for an entry with the existing value
49 : // for an entry. The logical value of an entry can be composed of a series of
50 : // merge operations. When compactionIter sees a MERGE, it scans forward in its
51 : // internal iterator collapsing MERGE operations for the same key until it
52 : // encounters a SET or DELETE operation. For example, the keys a.MERGE.4,
53 : // a.MERGE.3, a.MERGE.2 will be collapsed to a.MERGE.4 and the values will be
54 : // merged using the specified Merger.
55 : //
56 : // An interesting case here occurs when MERGE is combined with SET. Consider
57 : // the entries a.MERGE.3 and a.SET.2. The collapsed key will be a.SET.3. The
58 : // reason that the kind is changed to SET is because the SET operation acts as
59 : // a barrier preventing further merging. This can be seen better in the
60 : // scenario a.MERGE.3, a.SET.2, a.MERGE.1. The entry a.MERGE.1 may be at a lower
61 : // (older) level and not involved in the compaction. If the compaction of
62 : // a.MERGE.3 and a.SET.2 produced a.MERGE.3, a subsequent compaction with
63 : // a.MERGE.1 would merge the values together incorrectly.
64 : //
65 : // 3. Snapshots
66 : //
67 : // Snapshots are lightweight point-in-time views of the DB state. At its core,
68 : // a snapshot is a sequence number along with a guarantee from Pebble that it
69 : // will maintain the view of the database at that sequence number. Part of this
70 : // guarantee is relatively straightforward to achieve. When reading from the
71 : // database Pebble will ignore sequence numbers that are larger than the
72 : // snapshot sequence number. The primary complexity with snapshots occurs
73 : // during compaction: the collapsing of entries that are shadowed by newer
74 : // entries is at odds with the guarantee that Pebble will maintain the view of
75 : // the database at the snapshot sequence number. Rather than collapsing entries
76 : // up to the next user key, compactionIter can only collapse entries up to the
77 : // next snapshot boundary. That is, every snapshot boundary potentially causes
78 : // another entry for the same user-key to be emitted. Another way to view this
79 : // is that snapshots define stripes and entries are collapsed within stripes,
80 : // but not across stripes. Consider the following scenario:
81 : //
82 : // a.PUT.9
83 : // a.DEL.8
84 : // a.PUT.7
85 : // a.DEL.6
86 : // a.PUT.5
87 : //
88 : // In the absence of snapshots these entries would be collapsed to
89 : // a.PUT.9. What if there is a snapshot at sequence number 7? The entries can
90 : // be divided into two stripes and collapsed within the stripes:
91 : //
92 : // a.PUT.9 a.PUT.9
93 : // a.DEL.8 --->
94 : // a.PUT.7
95 : // -- --
96 : // a.DEL.6 ---> a.DEL.6
97 : // a.PUT.5
98 : //
99 : // All of the rules described earlier still apply, but they are confined to
100 : // operate within a snapshot stripe. Snapshots only affect compaction when the
101 : // snapshot sequence number lies within the range of sequence numbers being
102 : // compacted. In the above example, a snapshot at sequence number 10 or at
103 : // sequence number 5 would not have any effect.
104 : //
105 : // 4. Range Deletions
106 : //
107 : // Range deletions provide the ability to delete all of the keys (and values)
108 : // in a contiguous range. Range deletions are stored indexed by their start
109 : // key. The end key of the range is stored in the value. In order to support
110 : // lookup of the range deletions which overlap with a particular key, the range
111 : // deletion tombstones need to be fragmented whenever they overlap. This
112 : // fragmentation is performed by keyspan.Fragmenter. The fragments are then
113 : // subject to the rules for snapshots. For example, consider the two range
114 : // tombstones [a,e)#1 and [c,g)#2:
115 : //
116 : // 2: c-------g
117 : // 1: a-------e
118 : //
119 : // These tombstones will be fragmented into:
120 : //
121 : // 2: c---e---g
122 : // 1: a---c---e
123 : //
124 : // Do we output the fragment [c,e)#1? Since it is covered by [c,e)#2 the answer
125 : // depends on whether it is in a new snapshot stripe.
126 : //
127 : // In addition to the fragmentation of range tombstones, compaction also needs
128 : // to take the range tombstones into consideration when outputting normal
129 : // keys. Just as with point deletions, a range deletion covering an entry can
130 : // cause the entry to be elided.
131 : //
132 : // A note on the stability of keys and values.
133 : //
134 : // The stability guarantees of keys and values returned by the iterator tree
135 : // that backs a compactionIter are nuanced and care must be taken when
136 : // referencing any returned items.
137 : //
138 : // Keys and values returned by exported functions (i.e. First, Next, etc.) have
139 : // lifetimes that fall into two categories:
140 : //
141 : // Lifetime valid for duration of compaction. Range deletion keys and values are
142 : // stable for the duration of the compaction, due to the way in which a
143 : // compactionIter is typically constructed (i.e. via (*compaction).newInputIter,
144 : // which wraps the iterator over the range deletion block in a noCloseIter,
145 : // preventing the release of the backing memory until the compaction is
146 : // finished).
147 : //
148 : // Lifetime limited to duration of sstable block liveness. Point keys (SET, DEL,
149 : // etc.) and values must be cloned / copied following the return from the
150 : // exported function, and before a subsequent call to Next advances the iterator
151 : // and mutates the contents of the returned key and value.
type compactionIter struct {
	equal Equal
	merge Merge
	iter  internalIterator
	err   error
	// `key` is the most recently saved key. `key.UserKey` is set to `keyBuf`
	// as a result of saving (copying) `i.iterKey.UserKey`, and `key.Trailer`
	// is set to `i.iterKey.Trailer`. This is the case on return from all
	// public methods -- these methods return `key`. Additionally, it is the
	// internal state when the code is moving to the next key so it can
	// determine whether the user key has changed from the previous key.
	key InternalKey
	// keyTrailer is updated when `i.key` is updated and holds the key's
	// original trailer (eg, before any sequence-number zeroing or changes to
	// key kind).
	keyTrailer  uint64
	value       []byte
	valueCloser io.Closer
	// Temporary buffer used for storing the previous user key in order to
	// determine when iteration has advanced to a new user key and thus a new
	// snapshot stripe.
	keyBuf []byte
	// Temporary buffer used for storing the previous value, which may be an
	// unsafe, i.iter-owned slice that could be altered when the iterator is
	// advanced.
	valueBuf []byte
	// Is the current entry valid?
	valid bool
	// iterKey and iterValue are the entry the underlying iterator is currently
	// positioned at (iterKey is nil once the iterator is exhausted or an error
	// occurred while retrieving the value).
	iterKey   *InternalKey
	iterValue []byte
	// iterStripeChange records how iterKey relates to the previously saved key
	// (see stripeChangeType).
	iterStripeChange stripeChangeType
	// `skip` indicates whether the remaining skippable entries in the current
	// snapshot stripe should be skipped or processed. An example of a non-
	// skippable entry is a range tombstone as we need to return it from the
	// `compactionIter`, even if a key covering its start key has already been
	// seen in the same stripe. `skip` does not cause any advancement at the
	// top of `Next()` when `pos == iterPosNext`; in that position it may only
	// be set when the iterator stopped on a sameStripeNonSkippable entry, and
	// it is preserved so skipping can resume afterwards.
	//
	// TODO(jackson): If we use keyspan.InterleavingIter for range deletions,
	// like we do for range keys, the only remaining 'non-skippable' key is
	// the invalid key. We should be able to simplify this logic and remove this
	// field.
	skip bool
	// `pos` indicates the iterator position at the top of `Next()`. Its type's
	// (`iterPos`) values take on the following meanings in the context of
	// `compactionIter`.
	//
	// - `iterPosCurForward`: the iterator is at the last key returned.
	// - `iterPosNext`: the iterator has already been advanced to the next
	//   candidate key. For example, this happens when processing merge operands,
	//   where we advance the iterator all the way into the next stripe or next
	//   user key to ensure we've seen all mergeable operands.
	// - `iterPosPrev`: this is invalid as compactionIter is forward-only.
	pos iterPos
	// `snapshotPinned` indicates whether the last point key returned by the
	// compaction iterator was only returned because an open snapshot prevents
	// its elision. This field only applies to point keys, and not to range
	// deletions or range keys.
	//
	// For MERGE, it is possible that doing the merge is interrupted even when
	// the next point key is in the same stripe. This can happen if the loop in
	// mergeNext gets interrupted by sameStripeNonSkippable.
	// sameStripeNonSkippable occurs due to RANGEDELs that sort before
	// SET/MERGE/DEL with the same seqnum, so the RANGEDEL does not necessarily
	// delete the subsequent SET/MERGE/DEL keys.
	snapshotPinned bool
	// forceObsoleteDueToRangeDel is set to true in a subset of the cases that
	// snapshotPinned is true. This value is true when the point is obsolete due
	// to a RANGEDEL but could not be deleted due to a snapshot.
	//
	// NB: it may seem that the additional cases that snapshotPinned captures
	// are harmless in that they can also be used to mark a point as obsolete
	// (it is merely a duplication of some logic that happens in
	// Writer.AddWithForceObsolete), but that is not quite accurate as of this
	// writing -- snapshotPinned originated in stats collection and for a
	// sequence MERGE, SET, where the MERGE cannot merge with the (older) SET
	// due to a snapshot, the snapshotPinned value for the SET is true.
	//
	// TODO(sumeer,jackson): improve the logic of snapshotPinned and reconsider
	// whether we need forceObsoleteDueToRangeDel.
	forceObsoleteDueToRangeDel bool
	// The index of the snapshot for the current key within the snapshots slice,
	// as computed by snapshotIndex. An index of 0 identifies the last (oldest)
	// snapshot stripe.
	curSnapshotIdx int
	// The sequence number of that snapshot, or InternalKeySeqNumMax when no
	// snapshot sequence number exceeds the current key's sequence number.
	curSnapshotSeqNum uint64
	// The snapshot sequence numbers that need to be maintained. These sequence
	// numbers define the snapshot stripes (see the Snapshots description
	// above). The sequence numbers are in ascending order.
	snapshots []uint64
	// frontiers holds a heap of user keys that affect compaction behavior when
	// they're exceeded. Before a new key is returned, the compaction iterator
	// advances the frontier, notifying any code that subscribed to be notified
	// when a key was reached. The primary use today is within the
	// implementation of compactionOutputSplitters in compaction.go. Many of
	// these splitters wait for the compaction iterator to call Advance(k) when
	// it's returning a new key. If the key that they're waiting for is
	// surpassed, these splitters update internal state recording that they
	// should request a compaction split next time they're asked in
	// [shouldSplitBefore].
	frontiers frontiers
	// Reference to the range deletion tombstone fragmenter (e.g.,
	// `compaction.rangeDelFrag`).
	rangeDelFrag *keyspan.Fragmenter
	// Reference to the range key fragmenter; configured in newCompactionIter
	// to emit through emitRangeKeyChunk.
	rangeKeyFrag *keyspan.Fragmenter
	// The fragmented tombstones.
	tombstones []keyspan.Span
	// The fragmented range keys.
	rangeKeys []keyspan.Span
	// Byte allocator for the tombstone keys.
	alloc               bytealloc.A
	allowZeroSeqNum     bool
	elideTombstone      func(key []byte) bool
	elideRangeTombstone func(start, end []byte) bool
	// The on-disk format major version. This informs the types of keys that
	// may be written to disk during a compaction.
	formatVersion FormatMajorVersion
	stats         struct {
		// count of DELSIZED keys that were missized.
		countMissizedDels uint64
	}
}
271 :
272 : func newCompactionIter(
273 : cmp Compare,
274 : equal Equal,
275 : formatKey base.FormatKey,
276 : merge Merge,
277 : iter internalIterator,
278 : snapshots []uint64,
279 : rangeDelFrag *keyspan.Fragmenter,
280 : rangeKeyFrag *keyspan.Fragmenter,
281 : allowZeroSeqNum bool,
282 : elideTombstone func(key []byte) bool,
283 : elideRangeTombstone func(start, end []byte) bool,
284 : formatVersion FormatMajorVersion,
285 2 : ) *compactionIter {
286 2 : i := &compactionIter{
287 2 : equal: equal,
288 2 : merge: merge,
289 2 : iter: iter,
290 2 : snapshots: snapshots,
291 2 : frontiers: frontiers{cmp: cmp},
292 2 : rangeDelFrag: rangeDelFrag,
293 2 : rangeKeyFrag: rangeKeyFrag,
294 2 : allowZeroSeqNum: allowZeroSeqNum,
295 2 : elideTombstone: elideTombstone,
296 2 : elideRangeTombstone: elideRangeTombstone,
297 2 : formatVersion: formatVersion,
298 2 : }
299 2 : i.rangeDelFrag.Cmp = cmp
300 2 : i.rangeDelFrag.Format = formatKey
301 2 : i.rangeDelFrag.Emit = i.emitRangeDelChunk
302 2 : i.rangeKeyFrag.Cmp = cmp
303 2 : i.rangeKeyFrag.Format = formatKey
304 2 : i.rangeKeyFrag.Emit = i.emitRangeKeyChunk
305 2 : return i
306 2 : }
307 :
308 2 : func (i *compactionIter) First() (*InternalKey, []byte) {
309 2 : if i.err != nil {
310 0 : return nil, nil
311 0 : }
312 2 : var iterValue LazyValue
313 2 : i.iterKey, iterValue = i.iter.First()
314 2 : i.iterValue, _, i.err = iterValue.Value(nil)
315 2 : if i.err != nil {
316 0 : return nil, nil
317 0 : }
318 2 : if i.iterKey != nil {
319 2 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(i.iterKey.SeqNum(), i.snapshots)
320 2 : }
321 2 : i.pos = iterPosNext
322 2 : i.iterStripeChange = newStripeNewKey
323 2 : return i.Next()
324 : }
325 :
// Next returns the next key/value pair the compaction should output, after
// collapsing, merging and eliding input entries as described on
// compactionIter. It returns (nil, nil) once the input is exhausted, or when an
// error occurs, in which case the error is recorded in i.err and subsequent
// calls keep returning (nil, nil).
//
// Range deletions and range keys are returned as-is so the caller can feed
// them to the relevant fragmenter. Point keys returned here are only valid
// until the following call (see "A note on the stability of keys and values").
func (i *compactionIter) Next() (*InternalKey, []byte) {
	if i.err != nil {
		return nil, nil
	}

	// Close the closer for the current value if one was open.
	if i.closeValueCloser() != nil {
		return nil, nil
	}

	// Prior to this call to `Next()` we are in one of four situations with
	// respect to `iterKey` and related state:
	//
	// - `!skip && pos == iterPosNext`: `iterKey` is already at the next key.
	// - `!skip && pos == iterPosCurForward`: We are at the key that has been returned.
	//   To move forward we advance by one key, even if that lands us in the same
	//   snapshot stripe.
	// - `skip && pos == iterPosCurForward`: We are at the key that has been returned.
	//   To move forward we skip skippable entries in the stripe.
	// - `skip && pos == iterPosNext && i.iterStripeChange == sameStripeNonSkippable`:
	//   This case may occur when skipping within a snapshot stripe and we
	//   encounter either:
	//   a) an invalid key kind; The previous call will have returned
	//      whatever key it was processing and deferred handling of the
	//      invalid key to this invocation of Next(). We're responsible for
	//      ignoring skip=true and falling into the invalid key kind case
	//      down below.
	//   b) an interleaved range delete; This is a wart of the current code
	//      structure. While skipping within a snapshot stripe, a range
	//      delete interleaved at its start key and sequence number
	//      interrupts the sequence of point keys. After we return the range
	//      delete to the caller, we need to pick up skipping at where we
	//      left off, so we preserve skip=true.
	// TODO(jackson): This last case is confusing and can be removed if we
	// interleave range deletions at the maximal sequence number using the
	// keyspan interleaving iterator. This is the treatment given to range
	// keys today.
	if i.pos == iterPosCurForward {
		if i.skip {
			i.skipInStripe()
		} else {
			i.nextInStripe()
		}
	} else if i.skip {
		if i.iterStripeChange != sameStripeNonSkippable {
			panic(errors.AssertionFailedf("compaction iterator has skip=true, but iterator is at iterPosNext"))
		}
	}

	i.pos = iterPosCurForward
	i.valid = false

	// Each iteration either returns an output entry or consumes (skips/elides)
	// input and loops to reconsider the new iterator position.
	for i.iterKey != nil {
		// If we entered a new snapshot stripe with the same key, any key we
		// return on this iteration is only returned because the open snapshot
		// prevented it from being elided or merged with the key returned for
		// the previous stripe. Mark it as pinned so that the compaction loop
		// can correctly populate output tables' pinned statistics. We might
		// also set snapshotPinned=true down below if we observe that the key is
		// deleted by a range deletion in a higher stripe or that this key is a
		// tombstone that could be elided if only it were in the last snapshot
		// stripe.
		i.snapshotPinned = i.iterStripeChange == newStripeSameKey

		if i.iterKey.Kind() == InternalKeyKindRangeDelete || rangekey.IsRangeKey(i.iterKey.Kind()) {
			// Return the span so the compaction can use it for file truncation and add
			// it to the relevant fragmenter. We do not set `skip` to true before
			// returning as there may be a forthcoming point key with the same user key
			// and sequence number. Such a point key must be visible (i.e., not skipped
			// over) since we promise point keys are not deleted by range tombstones at
			// the same sequence number.
			//
			// Although, note that `skip` may already be true before reaching here
			// due to an earlier key in the stripe. Then it is fine to leave it set
			// to true, as the earlier key must have had a higher sequence number.
			//
			// NOTE: there is a subtle invariant violation here in that calling
			// saveKey and returning a reference to the temporary slice violates
			// the stability guarantee for range deletion keys. A potential
			// mediation could return the original iterKey and iterValue
			// directly, as the backing memory is guaranteed to be stable until
			// the compaction completes. The violation here is only minor in
			// that the caller immediately clones the range deletion InternalKey
			// when passing the key to the deletion fragmenter (see the
			// call-site in compaction.go).
			// TODO(travers): address this violation by removing the call to
			// saveKey and instead return the original iterKey and iterValue.
			// This goes against the comment on i.key in the struct, and
			// therefore warrants some investigation.
			i.saveKey()
			// TODO(jackson): Handle tracking pinned statistics for range keys
			// and range deletions. This would require updating
			// emitRangeDelChunk and rangeKeyCompactionTransform to update
			// statistics when they apply their own snapshot striping logic.
			i.snapshotPinned = false
			i.value = i.iterValue
			i.valid = true
			return &i.key, i.value
		}

		if cover := i.rangeDelFrag.Covers(*i.iterKey, i.curSnapshotSeqNum); cover == keyspan.CoversVisibly {
			// A pending range deletion deletes this key. Skip it.
			i.saveKey()
			i.skipInStripe()
			continue
		} else if cover == keyspan.CoversInvisibly {
			// i.iterKey would be deleted by a range deletion if there weren't
			// any open snapshots. Mark it as pinned.
			//
			// NB: there are multiple places in this file where we call
			// i.rangeDelFrag.Covers and this is the only one where we are writing
			// to i.snapshotPinned. Those other cases occur in mergeNext where the
			// caller is deciding whether the value should be merged or not, and the
			// key is in the same snapshot stripe. Hence, snapshotPinned is by
			// definition false in those cases.
			i.snapshotPinned = true
			i.forceObsoleteDueToRangeDel = true
		} else {
			i.forceObsoleteDueToRangeDel = false
		}

		switch i.iterKey.Kind() {
		case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
			if i.elideTombstone(i.iterKey.UserKey) {
				if i.curSnapshotIdx == 0 {
					// If we're at the last snapshot stripe and the tombstone
					// can be elided skip skippable keys in the same stripe.
					i.saveKey()
					i.skipInStripe()
					if i.iterStripeChange == newStripeSameKey {
						panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe found a new stripe within the same key"))
					}
					if !i.skip && i.iterStripeChange != newStripeNewKey {
						panic(errors.AssertionFailedf("pebble: skipInStripe in last stripe disabled skip without advancing to new key"))
					}
					continue
				} else {
					// We're not at the last snapshot stripe, so the tombstone
					// can NOT yet be elided. Mark it as pinned, so that it's
					// included in table statistics appropriately.
					i.snapshotPinned = true
				}
			}

			// The tombstone must be emitted; dispatch on its exact kind. Every
			// case below returns or continues.
			switch i.iterKey.Kind() {
			case InternalKeyKindDelete:
				i.saveKey()
				i.value = i.iterValue
				i.valid = true
				i.skip = true
				return &i.key, i.value

			case InternalKeyKindDeleteSized:
				// We may skip subsequent keys because of this tombstone. Scan
				// ahead to see just how much data this tombstone drops and if
				// the tombstone's value should be updated accordingly.
				return i.deleteSizedNext()

			case InternalKeyKindSingleDelete:
				if i.singleDeleteNext() {
					return &i.key, i.value
				}
				continue
			}

		case InternalKeyKindSet, InternalKeyKindSetWithDelete:
			// The key we emit for this entry is a function of the current key
			// kind, and whether this entry is followed by a DEL/SINGLEDEL
			// entry. setNext() does the work to move the iterator forward,
			// preserving the original value, and potentially mutating the key
			// kind.
			i.setNext()
			return &i.key, i.value

		case InternalKeyKindMerge:
			// Record the snapshot index before mergeNext as merging
			// advances the iterator, adjusting curSnapshotIdx.
			origSnapshotIdx := i.curSnapshotIdx
			var valueMerger ValueMerger
			valueMerger, i.err = i.merge(i.iterKey.UserKey, i.iterValue)
			var change stripeChangeType
			if i.err == nil {
				change = i.mergeNext(valueMerger)
			}
			var needDelete bool
			if i.err == nil {
				// includesBase is true whenever we've transformed the MERGE record
				// into a SET.
				includesBase := i.key.Kind() == InternalKeyKindSet
				i.value, needDelete, i.valueCloser, i.err = finishValueMerger(valueMerger, includesBase)
			}
			if i.err == nil {
				if needDelete {
					// The merger asked for the result to be dropped: emit
					// nothing for it and move on to the next input entry.
					i.valid = false
					if i.closeValueCloser() != nil {
						return nil, nil
					}
					continue
				}
				// A non-skippable entry does not necessarily cover later merge
				// operands, so we must not zero the current merge result's seqnum.
				//
				// For example, suppose the forthcoming two keys are a range
				// tombstone, `[a, b)#3`, and a merge operand, `a#3`. Recall that
				// range tombstones do not cover point keys at the same seqnum, so
				// `a#3` is not deleted. The range tombstone will be seen first due
				// to its larger value type. Since it is a non-skippable key, the
				// current merge will not include `a#3`. If we zeroed the current
				// merge result's seqnum, then it would conflict with the upcoming
				// merge including `a#3`, whose seqnum will also be zeroed.
				if change != sameStripeNonSkippable {
					i.maybeZeroSeqnum(origSnapshotIdx)
				}
				return &i.key, i.value
			}
			// Any failure from the merge operator, mergeNext or the final
			// merge is reported as corruption.
			if i.err != nil {
				i.valid = false
				i.err = base.MarkCorruptionError(i.err)
			}
			return nil, nil

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
			i.valid = false
			return nil, nil
		}
	}

	return nil, nil
}
556 :
557 2 : func (i *compactionIter) closeValueCloser() error {
558 2 : if i.valueCloser == nil {
559 2 : return nil
560 2 : }
561 :
562 0 : i.err = i.valueCloser.Close()
563 0 : i.valueCloser = nil
564 0 : if i.err != nil {
565 0 : i.valid = false
566 0 : }
567 0 : return i.err
568 : }
569 :
570 : // snapshotIndex returns the index of the first sequence number in snapshots
571 : // which is greater than or equal to seq.
572 2 : func snapshotIndex(seq uint64, snapshots []uint64) (int, uint64) {
573 2 : index := sort.Search(len(snapshots), func(i int) bool {
574 2 : return snapshots[i] > seq
575 2 : })
576 2 : if index >= len(snapshots) {
577 2 : return index, InternalKeySeqNumMax
578 2 : }
579 2 : return index, snapshots[index]
580 : }
581 :
582 : // skipInStripe skips over skippable keys in the same stripe and user key.
583 2 : func (i *compactionIter) skipInStripe() {
584 2 : i.skip = true
585 2 : for i.nextInStripe() == sameStripeSkippable {
586 2 : }
587 : // Reset skip if we landed outside the original stripe. Otherwise, we landed
588 : // in the same stripe on a non-skippable key. In that case we should preserve
589 : // `i.skip == true` such that later keys in the stripe will continue to be
590 : // skipped.
591 2 : if i.iterStripeChange == newStripeNewKey || i.iterStripeChange == newStripeSameKey {
592 2 : i.skip = false
593 2 : }
594 : }
595 :
596 2 : func (i *compactionIter) iterNext() bool {
597 2 : var iterValue LazyValue
598 2 : i.iterKey, iterValue = i.iter.Next()
599 2 : i.iterValue, _, i.err = iterValue.Value(nil)
600 2 : if i.err != nil {
601 0 : i.iterKey = nil
602 0 : }
603 2 : return i.iterKey != nil
604 : }
605 :
// stripeChangeType classifies the entry the iterator just moved onto relative
// to the previously saved key: whether it begins a new snapshot stripe (and if
// so, whether it does so with an entirely new user key or within the same
// user key), or, when it stays in the same stripe, whether it may be skipped.
type stripeChangeType int

const (
	// newStripeNewKey: the iterator moved onto a different user key (also
	// reported when the iterator is exhausted).
	newStripeNewKey stripeChangeType = 0
	// newStripeSameKey: same user key, but a different snapshot stripe.
	newStripeSameKey stripeChangeType = 1
	// sameStripeSkippable: same user key and stripe; the entry may be skipped.
	sameStripeSkippable stripeChangeType = 2
	// sameStripeNonSkippable: same user key and stripe, but the entry (a range
	// deletion or an invalid key kind) must still be surfaced.
	sameStripeNonSkippable stripeChangeType = 3
)
619 :
620 : // nextInStripe advances the iterator and returns one of the above const ints
621 : // indicating how its state changed.
622 : //
623 : // Calls to nextInStripe must be preceded by a call to saveKey to retain a
624 : // temporary reference to the original key, so that forward iteration can
625 : // proceed with a reference to the original key. Care should be taken to avoid
626 : // overwriting or mutating the saved key or value before they have been returned
627 : // to the caller of the exported function (i.e. the caller of Next, First, etc.)
628 2 : func (i *compactionIter) nextInStripe() stripeChangeType {
629 2 : i.iterStripeChange = i.nextInStripeHelper()
630 2 : return i.iterStripeChange
631 2 : }
632 :
633 : // nextInStripeHelper is an internal helper for nextInStripe; callers should use
634 : // nextInStripe and not call nextInStripeHelper.
635 2 : func (i *compactionIter) nextInStripeHelper() stripeChangeType {
636 2 : if !i.iterNext() {
637 2 : return newStripeNewKey
638 2 : }
639 2 : key := i.iterKey
640 2 :
641 2 : // NB: The below conditional is an optimization to avoid a user key
642 2 : // comparison in many cases. Internal keys with the same user key are
643 2 : // ordered in (strictly) descending order by trailer. If the new key has a
644 2 : // greater or equal trailer, or the previous key had a zero sequence number,
645 2 : // the new key must have a new user key.
646 2 : //
647 2 : // A couple things make these cases common:
648 2 : // - Sequence-number zeroing ensures ~all of the keys in L6 have a zero
649 2 : // sequence number.
650 2 : // - Ingested sstables' keys all adopt the same sequence number.
651 2 : if i.keyTrailer <= base.InternalKeyZeroSeqnumMaxTrailer || key.Trailer >= i.keyTrailer {
652 2 : if invariants.Enabled && i.equal(i.key.UserKey, key.UserKey) {
653 0 : prevKey := i.key
654 0 : prevKey.Trailer = i.keyTrailer
655 0 : panic(fmt.Sprintf("pebble: invariant violation: %s and %s out of order", key, prevKey))
656 : }
657 2 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
658 2 : return newStripeNewKey
659 2 : } else if !i.equal(i.key.UserKey, key.UserKey) {
660 2 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
661 2 : return newStripeNewKey
662 2 : }
663 2 : origSnapshotIdx := i.curSnapshotIdx
664 2 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
665 2 : switch key.Kind() {
666 2 : case InternalKeyKindRangeDelete:
667 2 : // Range tombstones need to be exposed by the compactionIter to the upper level
668 2 : // `compaction` object, so return them regardless of whether they are in the same
669 2 : // snapshot stripe.
670 2 : if i.curSnapshotIdx == origSnapshotIdx {
671 2 : return sameStripeNonSkippable
672 2 : }
673 2 : return newStripeSameKey
674 0 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
675 0 : // Range keys are interleaved at the max sequence number for a given user
676 0 : // key, so we should not see any more range keys in this stripe.
677 0 : panic("unreachable")
678 1 : case InternalKeyKindInvalid:
679 1 : if i.curSnapshotIdx == origSnapshotIdx {
680 1 : return sameStripeNonSkippable
681 1 : }
682 0 : return newStripeSameKey
683 : }
684 2 : if i.curSnapshotIdx == origSnapshotIdx {
685 2 : return sameStripeSkippable
686 2 : }
687 2 : return newStripeSameKey
688 : }
689 :
690 2 : func (i *compactionIter) setNext() {
691 2 : // Save the current key.
692 2 : i.saveKey()
693 2 : i.value = i.iterValue
694 2 : i.valid = true
695 2 : i.maybeZeroSeqnum(i.curSnapshotIdx)
696 2 :
697 2 : // There are two cases where we can early return and skip the remaining
698 2 : // records in the stripe:
699 2 : // - If the DB does not SETWITHDEL.
700 2 : // - If this key is already a SETWITHDEL.
701 2 : if i.formatVersion < FormatSetWithDelete ||
702 2 : i.iterKey.Kind() == InternalKeyKindSetWithDelete {
703 2 : i.skip = true
704 2 : return
705 2 : }
706 :
707 : // We are iterating forward. Save the current value.
708 2 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
709 2 : i.value = i.valueBuf
710 2 :
711 2 : // Else, we continue to loop through entries in the stripe looking for a
712 2 : // DEL. Note that we may stop *before* encountering a DEL, if one exists.
713 2 : for {
714 2 : switch i.nextInStripe() {
715 2 : case newStripeNewKey, newStripeSameKey:
716 2 : i.pos = iterPosNext
717 2 : return
718 2 : case sameStripeNonSkippable:
719 2 : i.pos = iterPosNext
720 2 : // We iterated onto a key that we cannot skip. We can
721 2 : // conservatively transform the original SET into a SETWITHDEL
722 2 : // as an indication that there *may* still be a DEL/SINGLEDEL
723 2 : // under this SET, even if we did not actually encounter one.
724 2 : //
725 2 : // This is safe to do, as:
726 2 : //
727 2 : // - in the case that there *is not* actually a DEL/SINGLEDEL
728 2 : // under this entry, any SINGLEDEL above this now-transformed
729 2 : // SETWITHDEL will become a DEL when the two encounter in a
730 2 : // compaction. The DEL will eventually be elided in a
731 2 : // subsequent compaction. The cost for ensuring correctness is
732 2 : // that this entry is kept around for an additional compaction
733 2 : // cycle(s).
734 2 : //
735 2 : // - in the case there *is* indeed a DEL/SINGLEDEL under us
736 2 : // (but in a different stripe or sstable), then we will have
737 2 : // already done the work to transform the SET into a
738 2 : // SETWITHDEL, and we will skip any additional iteration when
739 2 : // this entry is encountered again in a subsequent compaction.
740 2 : //
741 2 : // Ideally, this codepath would be smart enough to handle the
742 2 : // case of SET <- RANGEDEL <- ... <- DEL/SINGLEDEL <- ....
743 2 : // This requires preserving any RANGEDEL entries we encounter
744 2 : // along the way, then emitting the original (possibly
745 2 : // transformed) key, followed by the RANGEDELs. This requires
746 2 : // a sizable refactoring of the existing code, as nextInStripe
747 2 : // currently returns a sameStripeNonSkippable when it
748 2 : // encounters a RANGEDEL.
749 2 : // TODO(travers): optimize to handle the RANGEDEL case if it
750 2 : // turns out to be a performance problem.
751 2 : i.key.SetKind(InternalKeyKindSetWithDelete)
752 2 :
753 2 : // By setting i.skip=true, we are saying that after the
754 2 : // non-skippable key is emitted (which is likely a RANGEDEL),
755 2 : // the remaining point keys that share the same user key as this
756 2 : // saved key should be skipped.
757 2 : i.skip = true
758 2 : return
759 2 : case sameStripeSkippable:
760 2 : // We're still in the same stripe. If this is a
761 2 : // DEL/SINGLEDEL/DELSIZED, we stop looking and emit a SETWITHDEL.
762 2 : // Subsequent keys are eligible for skipping.
763 2 : if i.iterKey.Kind() == InternalKeyKindDelete ||
764 2 : i.iterKey.Kind() == InternalKeyKindSingleDelete ||
765 2 : i.iterKey.Kind() == InternalKeyKindDeleteSized {
766 2 : i.key.SetKind(InternalKeyKindSetWithDelete)
767 2 : i.skip = true
768 2 : return
769 2 : }
770 0 : default:
771 0 : panic("pebble: unexpected stripeChangeType: " + strconv.Itoa(int(i.iterStripeChange)))
772 : }
773 : }
774 : }
775 :
776 2 : func (i *compactionIter) mergeNext(valueMerger ValueMerger) stripeChangeType {
777 2 : // Save the current key.
778 2 : i.saveKey()
779 2 : i.valid = true
780 2 :
781 2 : // Loop looking for older values in the current snapshot stripe and merge
782 2 : // them.
783 2 : for {
784 2 : if i.nextInStripe() != sameStripeSkippable {
785 2 : i.pos = iterPosNext
786 2 : return i.iterStripeChange
787 2 : }
788 2 : key := i.iterKey
789 2 : switch key.Kind() {
790 2 : case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
791 2 : // We've hit a deletion tombstone. Return everything up to this point and
792 2 : // then skip entries until the next snapshot stripe. We change the kind
793 2 : // of the result key to a Set so that it shadows keys in lower
794 2 : // levels. That is, MERGE+DEL -> SET.
795 2 : // We do the same for SingleDelete since SingleDelete is only
796 2 : // permitted (with deterministic behavior) for keys that have been
797 2 : // set once since the last SingleDelete/Delete, so everything
798 2 : // older is acceptable to shadow. Note that this is slightly
799 2 : // different from singleDeleteNext() which implements stricter
800 2 : // semantics in terms of applying the SingleDelete to the single
801 2 : // next Set. But those stricter semantics are not observable to
802 2 : // the end-user since Iterator interprets SingleDelete as Delete.
803 2 : // We could do something more complicated here and consume only a
804 2 : // single Set, and then merge in any following Sets, but that is
805 2 : // complicated wrt code and unnecessary given the narrow permitted
806 2 : // use of SingleDelete.
807 2 : i.key.SetKind(InternalKeyKindSet)
808 2 : i.skip = true
809 2 : return sameStripeSkippable
810 :
811 2 : case InternalKeyKindSet, InternalKeyKindSetWithDelete:
812 2 : if i.rangeDelFrag.Covers(*key, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
813 2 : // We change the kind of the result key to a Set so that it shadows
814 2 : // keys in lower levels. That is, MERGE+RANGEDEL -> SET. This isn't
815 2 : // strictly necessary, but provides consistency with the behavior of
816 2 : // MERGE+DEL.
817 2 : i.key.SetKind(InternalKeyKindSet)
818 2 : i.skip = true
819 2 : return sameStripeSkippable
820 2 : }
821 :
822 : // We've hit a Set or SetWithDel value. Merge with the existing
823 : // value and return. We change the kind of the resulting key to a
824 : // Set so that it shadows keys in lower levels. That is:
825 : // MERGE + (SET*) -> SET.
826 2 : i.err = valueMerger.MergeOlder(i.iterValue)
827 2 : if i.err != nil {
828 0 : i.valid = false
829 0 : return sameStripeSkippable
830 0 : }
831 2 : i.key.SetKind(InternalKeyKindSet)
832 2 : i.skip = true
833 2 : return sameStripeSkippable
834 :
835 2 : case InternalKeyKindMerge:
836 2 : if i.rangeDelFrag.Covers(*key, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
837 2 : // We change the kind of the result key to a Set so that it shadows
838 2 : // keys in lower levels. That is, MERGE+RANGEDEL -> SET. This isn't
839 2 : // strictly necessary, but provides consistency with the behavior of
840 2 : // MERGE+DEL.
841 2 : i.key.SetKind(InternalKeyKindSet)
842 2 : i.skip = true
843 2 : return sameStripeSkippable
844 2 : }
845 :
846 : // We've hit another Merge value. Merge with the existing value and
847 : // continue looping.
848 2 : i.err = valueMerger.MergeOlder(i.iterValue)
849 2 : if i.err != nil {
850 0 : i.valid = false
851 0 : return sameStripeSkippable
852 0 : }
853 :
854 0 : default:
855 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
856 0 : i.valid = false
857 0 : return sameStripeSkippable
858 : }
859 : }
860 : }
861 :
862 : // singleDeleteNext processes a SingleDelete point tombstone. A SingleDelete, or
863 : // SINGLEDEL, is unique in that it deletes exactly 1 internal key. It's a
864 : // performance optimization when the client knows a user key has not been
865 : // overwritten, allowing the elision of the tombstone earlier, avoiding write
866 : // amplification.
867 : //
868 : // singleDeleteNext returns a boolean indicating whether or not the caller
869 : // should yield the SingleDelete key to the consumer of the compactionIter. If
870 : // singleDeleteNext returns false, the caller may consume/elide the
871 : // SingleDelete.
872 2 : func (i *compactionIter) singleDeleteNext() bool {
873 2 : // Save the current key.
874 2 : i.saveKey()
875 2 : i.value = i.iterValue
876 2 : i.valid = true
877 2 :
878 2 : // Loop until finds a key to be passed to the next level.
879 2 : for {
880 2 : // If we find a key that can't be skipped, return true so that the
881 2 : // caller yields the SingleDelete to the caller.
882 2 : if i.nextInStripe() != sameStripeSkippable {
883 2 : i.pos = iterPosNext
884 2 : return true
885 2 : }
886 :
887 2 : key := i.iterKey
888 2 : switch key.Kind() {
889 2 : case InternalKeyKindDelete, InternalKeyKindMerge, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
890 2 : // We've hit a Delete, DeleteSized, Merge, SetWithDelete, transform
891 2 : // the SingleDelete into a full Delete.
892 2 : i.key.SetKind(InternalKeyKindDelete)
893 2 : i.skip = true
894 2 : return true
895 :
896 2 : case InternalKeyKindSet:
897 2 : // This SingleDelete deletes the Set, and we can now elide the
898 2 : // SingleDel as well. We advance past the Set and return false to
899 2 : // indicate to the main compaction loop that we should NOT yield the
900 2 : // current SingleDel key to the compaction loop.
901 2 : i.nextInStripe()
902 2 : // TODO(jackson): We could assert that nextInStripe either a)
903 2 : // stepped onto a new key, or b) stepped on to a Delete, DeleteSized
904 2 : // or SingleDel key. This would detect improper uses of SingleDel,
905 2 : // but only when all three internal keys meet in the same compaction
906 2 : // which is not likely.
907 2 : i.valid = false
908 2 : return false
909 :
910 2 : case InternalKeyKindSingleDelete:
911 2 : // Two single deletes met in a compaction. With proper deterministic
912 2 : // use of SingleDelete, this should never happen. The expectation is
913 2 : // that there's exactly 1 set beneath a single delete. Currently, we
914 2 : // opt to skip it.
915 2 : // TODO(jackson): Should we make this an error? This would also
916 2 : // allow us to simplify the code a bit by removing the for loop.
917 2 : continue
918 :
919 0 : default:
920 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
921 0 : i.valid = false
922 0 : return false
923 : }
924 : }
925 : }
926 :
927 : // deleteSizedNext processes a DELSIZED point tombstone. Unlike ordinary DELs,
928 : // these tombstones carry a value that's a varint indicating the size of the
929 : // entry (len(key)+len(value)) that the tombstone is expected to delete.
930 : //
931 : // When a deleteSizedNext is encountered, we skip ahead to see which keys, if
932 : // any, are elided as a result of the tombstone.
933 2 : func (i *compactionIter) deleteSizedNext() (*base.InternalKey, []byte) {
934 2 : i.saveKey()
935 2 : i.valid = true
936 2 : i.skip = true
937 2 :
938 2 : // The DELSIZED tombstone may have no value at all. This happens when the
939 2 : // tombstone has already deleted the key that the user originally predicted.
940 2 : // In this case, we still peek forward in case there's another DELSIZED key
941 2 : // with a lower sequence number, in which case we'll adopt its value.
942 2 : if len(i.iterValue) == 0 {
943 2 : i.value = i.valueBuf[:0]
944 2 : } else {
945 2 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
946 2 : i.value = i.valueBuf
947 2 : }
948 :
949 : // Loop through all the keys within this stripe that are skippable.
950 2 : i.pos = iterPosNext
951 2 : for i.nextInStripe() == sameStripeSkippable {
952 2 : switch i.iterKey.Kind() {
953 2 : case InternalKeyKindDelete, InternalKeyKindDeleteSized, InternalKeyKindSingleDelete:
954 2 : // We encountered a tombstone (DEL, or DELSIZED) that's deleted by
955 2 : // the original DELSIZED tombstone. This can happen in two cases:
956 2 : //
957 2 : // (1) These tombstones were intended to delete two distinct values,
958 2 : // and this DELSIZED has already dropped the relevant key. For
959 2 : // example:
960 2 : //
961 2 : // a.DELSIZED.9 a.SET.7 a.DELSIZED.5 a.SET.4
962 2 : //
963 2 : // If a.DELSIZED.9 has already deleted a.SET.7, its size has
964 2 : // already been zeroed out. In this case, we want to adopt the
965 2 : // value of the DELSIZED with the lower sequence number, in
966 2 : // case the a.SET.4 key has not yet been elided.
967 2 : //
968 2 : // (2) This DELSIZED was missized. The user thought they were
969 2 : // deleting a key with this user key, but this user key had
970 2 : // already been deleted.
971 2 : //
972 2 : // We can differentiate these two cases by examining the length of
973 2 : // the DELSIZED's value. A DELSIZED's value holds the size of both
974 2 : // the user key and value that it intends to delete. For any user
975 2 : // key with a length > 1, a DELSIZED that has not deleted a key must
976 2 : // have a value with a length > 1.
977 2 : //
978 2 : // We treat both cases the same functionally, adopting the identity
979 2 : // of the lower-sequence numbered tombstone. However in the second
980 2 : // case, we also increment the stat counting missized tombstones.
981 2 : if len(i.value) > 0 {
982 2 : // The original DELSIZED key was missized. The key that the user
983 2 : // thought they were deleting does not exist.
984 2 : i.stats.countMissizedDels++
985 2 : }
986 2 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
987 2 : i.value = i.valueBuf
988 2 : if i.iterKey.Kind() != InternalKeyKindDeleteSized {
989 2 : // Convert the DELSIZED to a DEL—The DEL/SINGLEDEL we're eliding
990 2 : // may not have deleted the key(s) it was intended to yet. The
991 2 : // ordinary DEL compaction heuristics are better suited at that,
992 2 : // plus we don't want to count it as a missized DEL. We early
993 2 : // exit in this case, after skipping the remainder of the
994 2 : // snapshot stripe.
995 2 : i.key.SetKind(InternalKeyKindDelete)
996 2 : // NB: We skipInStripe now, rather than returning leaving
997 2 : // i.skip=true and returning early, because Next() requires
998 2 : // that i.skip=true only if i.iterPos = iterPosCurForward.
999 2 : i.skipInStripe()
1000 2 : return &i.key, i.value
1001 2 : }
1002 : // Continue, in case we uncover another DELSIZED or a key this
1003 : // DELSIZED deletes.
1004 2 : default:
1005 2 : // If the DELSIZED is value-less, it already deleted the key that it
1006 2 : // was intended to delete. This is possible with a sequence like:
1007 2 : //
1008 2 : // DELSIZED.8 SET.7 SET.3
1009 2 : //
1010 2 : // The DELSIZED only describes the size of the SET.7, which in this
1011 2 : // case has already been elided. We don't count it as a missizing,
1012 2 : // instead converting the DELSIZED to a DEL. Skip the remainder of
1013 2 : // the snapshot stripe and return.
1014 2 : if len(i.value) == 0 {
1015 2 : i.key.SetKind(InternalKeyKindDelete)
1016 2 : // NB: We skipInStripe now, rather than returning leaving
1017 2 : // i.skip=true and returning early, because Next() requires
1018 2 : // that i.skip=true only if i.iterPos = iterPosCurForward.
1019 2 : i.skipInStripe()
1020 2 : return &i.key, i.value
1021 2 : }
1022 : // The deleted key is not a DEL, DELSIZED, and the DELSIZED in i.key
1023 : // has a positive size.
1024 2 : expectedSize, n := binary.Uvarint(i.value)
1025 2 : if n != len(i.value) {
1026 1 : i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(i.value))
1027 1 : i.valid = false
1028 1 : return nil, nil
1029 1 : }
1030 2 : elidedSize := uint64(len(i.iterKey.UserKey)) + uint64(len(i.iterValue))
1031 2 : if elidedSize != expectedSize {
1032 2 : // The original DELSIZED key was missized. It's unclear what to
1033 2 : // do. The user-provided size was wrong, so it's unlikely to be
1034 2 : // accurate or meaningful. We could:
1035 2 : //
1036 2 : // 1. return the DELSIZED with the original user-provided size unmodified
1037 2 : // 2. return the DELZIZED with a zeroed size to reflect that a key was
1038 2 : // elided, even if it wasn't the anticipated size.
1039 2 : // 3. subtract the elided size from the estimate and re-encode.
1040 2 : // 4. convert the DELSIZED into a value-less DEL, so that
1041 2 : // ordinary DEL heuristics apply.
1042 2 : //
1043 2 : // We opt for (4) under the rationale that we can't rely on the
1044 2 : // user-provided size for accuracy, so ordinary DEL heuristics
1045 2 : // are safer.
1046 2 : i.stats.countMissizedDels++
1047 2 : i.key.SetKind(InternalKeyKindDelete)
1048 2 : i.value = i.valueBuf[:0]
1049 2 : i.skipInStripe()
1050 2 : return &i.key, i.value
1051 2 : }
1052 : // NB: We remove the value regardless of whether the key was sized
1053 : // appropriately. The size encoded is 'consumed' the first time it
1054 : // meets a key that it deletes.
1055 2 : i.value = i.valueBuf[:0]
1056 : }
1057 : }
1058 : // Reset skip if we landed outside the original stripe. Otherwise, we landed
1059 : // in the same stripe on a non-skippable key. In that case we should preserve
1060 : // `i.skip == true` such that later keys in the stripe will continue to be
1061 : // skipped.
1062 2 : if i.iterStripeChange == newStripeNewKey || i.iterStripeChange == newStripeSameKey {
1063 2 : i.skip = false
1064 2 : }
1065 2 : return &i.key, i.value
1066 : }
1067 :
1068 2 : func (i *compactionIter) saveKey() {
1069 2 : i.keyBuf = append(i.keyBuf[:0], i.iterKey.UserKey...)
1070 2 : i.key.UserKey = i.keyBuf
1071 2 : i.key.Trailer = i.iterKey.Trailer
1072 2 : i.keyTrailer = i.iterKey.Trailer
1073 2 : i.frontiers.Advance(i.key.UserKey)
1074 2 : }
1075 :
1076 2 : func (i *compactionIter) cloneKey(key []byte) []byte {
1077 2 : i.alloc, key = i.alloc.Copy(key)
1078 2 : return key
1079 2 : }
1080 :
1081 1 : func (i *compactionIter) Key() InternalKey {
1082 1 : return i.key
1083 1 : }
1084 :
1085 1 : func (i *compactionIter) Value() []byte {
1086 1 : return i.value
1087 1 : }
1088 :
1089 1 : func (i *compactionIter) Valid() bool {
1090 1 : return i.valid
1091 1 : }
1092 :
1093 1 : func (i *compactionIter) Error() error {
1094 1 : return i.err
1095 1 : }
1096 :
1097 2 : func (i *compactionIter) Close() error {
1098 2 : err := i.iter.Close()
1099 2 : if i.err == nil {
1100 2 : i.err = err
1101 2 : }
1102 :
1103 : // Close the closer for the current value if one was open.
1104 2 : if i.valueCloser != nil {
1105 0 : i.err = firstError(i.err, i.valueCloser.Close())
1106 0 : i.valueCloser = nil
1107 0 : }
1108 :
1109 2 : return i.err
1110 : }
1111 :
1112 : // Tombstones returns a list of pending range tombstones in the fragmenter
1113 : // up to the specified key, or all pending range tombstones if key = nil.
1114 2 : func (i *compactionIter) Tombstones(key []byte) []keyspan.Span {
1115 2 : if key == nil {
1116 2 : i.rangeDelFrag.Finish()
1117 2 : } else {
1118 2 : // The specified end key is exclusive; no versions of the specified
1119 2 : // user key (including range tombstones covering that key) should
1120 2 : // be flushed yet.
1121 2 : i.rangeDelFrag.TruncateAndFlushTo(key)
1122 2 : }
1123 2 : tombstones := i.tombstones
1124 2 : i.tombstones = nil
1125 2 : return tombstones
1126 : }
1127 :
1128 : // RangeKeys returns a list of pending fragmented range keys up to the specified
1129 : // key, or all pending range keys if key = nil.
1130 2 : func (i *compactionIter) RangeKeys(key []byte) []keyspan.Span {
1131 2 : if key == nil {
1132 2 : i.rangeKeyFrag.Finish()
1133 2 : } else {
1134 2 : // The specified end key is exclusive; no versions of the specified
1135 2 : // user key (including range tombstones covering that key) should
1136 2 : // be flushed yet.
1137 2 : i.rangeKeyFrag.TruncateAndFlushTo(key)
1138 2 : }
1139 2 : rangeKeys := i.rangeKeys
1140 2 : i.rangeKeys = nil
1141 2 : return rangeKeys
1142 : }
1143 :
1144 2 : func (i *compactionIter) emitRangeDelChunk(fragmented keyspan.Span) {
1145 2 : // Apply the snapshot stripe rules, keeping only the latest tombstone for
1146 2 : // each snapshot stripe.
1147 2 : currentIdx := -1
1148 2 : keys := fragmented.Keys[:0]
1149 2 : for _, k := range fragmented.Keys {
1150 2 : idx, _ := snapshotIndex(k.SeqNum(), i.snapshots)
1151 2 : if currentIdx == idx {
1152 2 : continue
1153 : }
1154 2 : if idx == 0 && i.elideRangeTombstone(fragmented.Start, fragmented.End) {
1155 2 : // This is the last snapshot stripe and the range tombstone
1156 2 : // can be elided.
1157 2 : break
1158 : }
1159 :
1160 2 : keys = append(keys, k)
1161 2 : if idx == 0 {
1162 2 : // This is the last snapshot stripe.
1163 2 : break
1164 : }
1165 2 : currentIdx = idx
1166 : }
1167 2 : if len(keys) > 0 {
1168 2 : i.tombstones = append(i.tombstones, keyspan.Span{
1169 2 : Start: fragmented.Start,
1170 2 : End: fragmented.End,
1171 2 : Keys: keys,
1172 2 : })
1173 2 : }
1174 : }
1175 :
1176 2 : func (i *compactionIter) emitRangeKeyChunk(fragmented keyspan.Span) {
1177 2 : // Elision of snapshot stripes happens in rangeKeyCompactionTransform, so no need to
1178 2 : // do that here.
1179 2 : if len(fragmented.Keys) > 0 {
1180 2 : i.rangeKeys = append(i.rangeKeys, fragmented)
1181 2 : }
1182 : }
1183 :
1184 : // maybeZeroSeqnum attempts to set the seqnum for the current key to 0. Doing
1185 : // so improves compression and enables an optimization during forward iteration
1186 : // to skip some key comparisons. The seqnum for an entry can be zeroed if the
1187 : // entry is on the bottom snapshot stripe and on the bottom level of the LSM.
1188 2 : func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) {
1189 2 : if !i.allowZeroSeqNum {
1190 2 : // TODO(peter): allowZeroSeqNum applies to the entire compaction. We could
1191 2 : // make the determination on a key by key basis, similar to what is done
1192 2 : // for elideTombstone. Need to add a benchmark for compactionIter to verify
1193 2 : // that isn't too expensive.
1194 2 : return
1195 2 : }
1196 2 : if snapshotIdx > 0 {
1197 2 : // This is not the last snapshot
1198 2 : return
1199 2 : }
1200 2 : i.key.SetSeqNum(base.SeqNumZero)
1201 : }
1202 :
1203 : // A frontier is used to monitor a compaction's progression across the user
1204 : // keyspace.
1205 : //
1206 : // A frontier hold a user key boundary that it's concerned with in its `key`
1207 : // field. If/when the compaction iterator returns an InternalKey with a user key
1208 : // _k_ such that k ≥ frontier.key, the compaction iterator invokes the
1209 : // frontier's `reached` function, passing _k_ as its argument.
1210 : //
1211 : // The `reached` function returns a new value to use as the key. If `reached`
1212 : // returns nil, the frontier is forgotten and its `reached` method will not be
1213 : // invoked again, unless the user calls [Update] to set a new key.
1214 : //
1215 : // A frontier's key may be updated outside the context of a `reached`
1216 : // invocation at any time, through its Update method.
1217 : type frontier struct {
1218 : // container points to the containing *frontiers that was passed to Init
1219 : // when the frontier was initialized.
1220 : container *frontiers
1221 :
1222 : // key holds the frontier's current key. If nil, this frontier is inactive
1223 : // and its reached func will not be invoked. The value of this key may only
1224 : // be updated by the `frontiers` type, or the Update method.
1225 : key []byte
1226 :
1227 : // reached is invoked to inform a frontier that its key has been reached.
1228 : // It's invoked with the user key that reached the limit. The `key` argument
1229 : // is guaranteed to be ≥ the frontier's key.
1230 : //
1231 : // After reached is invoked, the frontier's key is updated to the return
1232 : // value of `reached`. Note bene, the frontier is permitted to update its
1233 : // key to a user key ≤ the argument `key`.
1234 : //
1235 : // If a frontier is set to key k1, and reached(k2) is invoked (k2 ≥ k1), the
1236 : // frontier will receive reached(k2) calls until it returns nil or a key
1237 : // `k3` such that k2 < k3. This property is useful for frontiers that use
1238 : // `reached` invocations to drive iteration through collections of keys that
1239 : // may contain multiple keys that are both < k2 and ≥ k1.
1240 : reached func(key []byte) (next []byte)
1241 : }
1242 :
1243 : // Init initializes the frontier with the provided key and reached callback.
1244 : // The frontier is attached to the provided *frontiers and the provided reached
1245 : // func will be invoked when the *frontiers is advanced to a key ≥ this
1246 : // frontier's key.
1247 : func (f *frontier) Init(
1248 : frontiers *frontiers, initialKey []byte, reached func(key []byte) (next []byte),
1249 2 : ) {
1250 2 : *f = frontier{
1251 2 : container: frontiers,
1252 2 : key: initialKey,
1253 2 : reached: reached,
1254 2 : }
1255 2 : if initialKey != nil {
1256 2 : f.container.push(f)
1257 2 : }
1258 : }
1259 :
1260 : // String implements fmt.Stringer.
1261 1 : func (f *frontier) String() string {
1262 1 : return string(f.key)
1263 1 : }
1264 :
1265 : // Update replaces the existing frontier's key with the provided key. The
1266 : // frontier's reached func will be invoked when the new key is reached.
1267 2 : func (f *frontier) Update(key []byte) {
1268 2 : c := f.container
1269 2 : prevKeyIsNil := f.key == nil
1270 2 : f.key = key
1271 2 : if prevKeyIsNil {
1272 2 : if key != nil {
1273 2 : c.push(f)
1274 2 : }
1275 2 : return
1276 : }
1277 :
1278 : // Find the frontier within the heap (it must exist within the heap because
1279 : // f.key was != nil). If the frontier key is now nil, remove it from the
1280 : // heap. Otherwise, fix up its position.
1281 2 : for i := 0; i < len(c.items); i++ {
1282 2 : if c.items[i] == f {
1283 2 : if key != nil {
1284 2 : c.fix(i)
1285 2 : } else {
1286 2 : n := c.len() - 1
1287 2 : c.swap(i, n)
1288 2 : c.down(i, n)
1289 2 : c.items = c.items[:n]
1290 2 : }
1291 2 : return
1292 : }
1293 : }
1294 0 : panic("unreachable")
1295 : }
1296 :
1297 : // frontiers is used to track progression of a task (eg, compaction) across the
1298 : // keyspace. Clients that want to be informed when the task advances to a key ≥
1299 : // some frontier may register a frontier, providing a callback. The task calls
1300 : // `Advance(k)` with each user key encountered, which invokes the `reached` func
1301 : // on all tracked frontiers with `key`s ≤ k.
1302 : //
1303 : // Internally, frontiers is implemented as a simple heap.
1304 : type frontiers struct {
1305 : cmp Compare
1306 : items []*frontier
1307 : }
1308 :
1309 : // String implements fmt.Stringer.
1310 1 : func (f *frontiers) String() string {
1311 1 : var buf bytes.Buffer
1312 1 : for i := 0; i < len(f.items); i++ {
1313 1 : if i > 0 {
1314 1 : fmt.Fprint(&buf, ", ")
1315 1 : }
1316 1 : fmt.Fprintf(&buf, "%s: %q", f.items[i], f.items[i].key)
1317 : }
1318 1 : return buf.String()
1319 : }
1320 :
1321 : // Advance notifies all member frontiers with keys ≤ k.
1322 2 : func (f *frontiers) Advance(k []byte) {
1323 2 : for len(f.items) > 0 && f.cmp(k, f.items[0].key) >= 0 {
1324 2 : // This frontier has been reached. Invoke the closure and update with
1325 2 : // the next frontier.
1326 2 : f.items[0].key = f.items[0].reached(k)
1327 2 : if f.items[0].key == nil {
1328 2 : // This was the final frontier that this user was concerned with.
1329 2 : // Remove it from the heap.
1330 2 : f.pop()
1331 2 : } else {
1332 2 : // Fix up the heap root.
1333 2 : f.fix(0)
1334 2 : }
1335 : }
1336 : }
1337 :
1338 2 : func (f *frontiers) len() int {
1339 2 : return len(f.items)
1340 2 : }
1341 :
1342 2 : func (f *frontiers) less(i, j int) bool {
1343 2 : return f.cmp(f.items[i].key, f.items[j].key) < 0
1344 2 : }
1345 :
1346 2 : func (f *frontiers) swap(i, j int) {
1347 2 : f.items[i], f.items[j] = f.items[j], f.items[i]
1348 2 : }
1349 :
1350 : // fix, up and down are copied from the go stdlib.
1351 :
1352 2 : func (f *frontiers) fix(i int) {
1353 2 : if !f.down(i, f.len()) {
1354 2 : f.up(i)
1355 2 : }
1356 : }
1357 :
1358 2 : func (f *frontiers) push(ff *frontier) {
1359 2 : n := len(f.items)
1360 2 : f.items = append(f.items, ff)
1361 2 : f.up(n)
1362 2 : }
1363 :
1364 2 : func (f *frontiers) pop() *frontier {
1365 2 : n := f.len() - 1
1366 2 : f.swap(0, n)
1367 2 : f.down(0, n)
1368 2 : item := f.items[n]
1369 2 : f.items = f.items[:n]
1370 2 : return item
1371 2 : }
1372 :
1373 2 : func (f *frontiers) up(j int) {
1374 2 : for {
1375 2 : i := (j - 1) / 2 // parent
1376 2 : if i == j || !f.less(j, i) {
1377 2 : break
1378 : }
1379 2 : f.swap(i, j)
1380 2 : j = i
1381 : }
1382 : }
1383 :
1384 2 : func (f *frontiers) down(i0, n int) bool {
1385 2 : i := i0
1386 2 : for {
1387 2 : j1 := 2*i + 1
1388 2 : if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
1389 2 : break
1390 : }
1391 2 : j := j1 // left child
1392 2 : if j2 := j1 + 1; j2 < n && f.less(j2, j1) {
1393 2 : j = j2 // = 2*i + 2 // right child
1394 2 : }
1395 2 : if !f.less(j, i) {
1396 2 : break
1397 : }
1398 2 : f.swap(i, j)
1399 2 : i = j
1400 : }
1401 2 : return i > i0
1402 : }
|