Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "fmt"
11 : "io"
12 : "sort"
13 : "strconv"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/bytealloc"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/internal/keyspan"
20 : "github.com/cockroachdb/pebble/internal/rangekey"
21 : )
22 :
// compactionIter provides a forward-only iterator that encapsulates the logic
// for collapsing entries during compaction. It wraps an internal iterator and
// collapses entries that are no longer necessary because they are shadowed by
// newer entries. The simplest example of this is when the internal iterator
// contains two keys: a.PUT.2 and a.PUT.1. Instead of returning both entries,
// compactionIter collapses the second entry because it is no longer
// necessary. The high-level structure for compactionIter is to iterate over
// its internal iterator and output 1 entry for every user-key. There are four
// complications to this story.
//
// 1. Eliding Deletion Tombstones
//
// Consider the entries a.DEL.2 and a.PUT.1. These entries collapse to
// a.DEL.2. Do we have to output the entry a.DEL.2? Only if a.DEL.2 possibly
// shadows an entry at a lower level. If we're compacting to the base-level in
// the LSM tree then a.DEL.2 is definitely not shadowing an entry at a lower
// level and can be elided.
//
// We can do slightly better than only eliding deletion tombstones at the base
// level by observing that we can elide a deletion tombstone if there are no
// sstables that contain the entry's key. This check is performed by
// elideTombstone.
//
// 2. Merges
//
// The MERGE operation merges the value for an entry with the existing value
// for an entry. The logical value of an entry can be composed of a series of
// merge operations. When compactionIter sees a MERGE, it scans forward in its
// internal iterator collapsing MERGE operations for the same key until it
// encounters a SET or DELETE operation. For example, the keys a.MERGE.4,
// a.MERGE.3, a.MERGE.2 will be collapsed to a.MERGE.4 and the values will be
// merged using the specified Merger.
//
// An interesting case here occurs when MERGE is combined with SET. Consider
// the entries a.MERGE.3 and a.SET.2. The collapsed key will be a.SET.3. The
// reason that the kind is changed to SET is because the SET operation acts as
// a barrier preventing further merging. This can be seen better in the
// scenario a.MERGE.3, a.SET.2, a.MERGE.1. The entry a.MERGE.1 may be at a
// lower (older) level and not involved in the compaction. If the compaction of
// a.MERGE.3 and a.SET.2 produced a.MERGE.3, a subsequent compaction with
// a.MERGE.1 would merge the values together incorrectly.
//
// 3. Snapshots
//
// Snapshots are lightweight point-in-time views of the DB state. At its core,
// a snapshot is a sequence number along with a guarantee from Pebble that it
// will maintain the view of the database at that sequence number. Part of this
// guarantee is relatively straightforward to achieve. When reading from the
// database Pebble will ignore sequence numbers that are larger than the
// snapshot sequence number. The primary complexity with snapshots occurs
// during compaction: the collapsing of entries that are shadowed by newer
// entries is at odds with the guarantee that Pebble will maintain the view of
// the database at the snapshot sequence number. Rather than collapsing entries
// up to the next user key, compactionIter can only collapse entries up to the
// next snapshot boundary. That is, every snapshot boundary potentially causes
// another entry for the same user-key to be emitted. Another way to view this
// is that snapshots define stripes and entries are collapsed within stripes,
// but not across stripes. Consider the following scenario:
//
//	a.PUT.9
//	a.DEL.8
//	a.PUT.7
//	a.DEL.6
//	a.PUT.5
//
// In the absence of snapshots these entries would be collapsed to
// a.PUT.9. What if there is a snapshot at sequence number 7? The entries can
// be divided into two stripes and collapsed within the stripes:
//
//	a.PUT.9        a.PUT.9
//	a.DEL.8  --->
//	a.PUT.7
//	--             --
//	a.DEL.6  --->  a.DEL.6
//	a.PUT.5
//
// All of the rules described earlier still apply, but they are confined to
// operate within a snapshot stripe. Snapshots only affect compaction when the
// snapshot sequence number lies within the range of sequence numbers being
// compacted. In the above example, a snapshot at sequence number 10 or at
// sequence number 5 would not have any effect.
//
// 4. Range Deletions
//
// Range deletions provide the ability to delete all of the keys (and values)
// in a contiguous range. Range deletions are stored indexed by their start
// key. The end key of the range is stored in the value. In order to support
// lookup of the range deletions which overlap with a particular key, the range
// deletion tombstones need to be fragmented whenever they overlap. This
// fragmentation is performed by keyspan.Fragmenter. The fragments are then
// subject to the rules for snapshots. For example, consider the two range
// tombstones [a,e)#1 and [c,g)#2:
//
//	2:     c-------g
//	1: a-------e
//
// These tombstones will be fragmented into:
//
//	2:     c---e---g
//	1: a---c---e
//
// Do we output the fragment [c,e)#1? Since it is covered by [c,e)#2 the answer
// depends on whether it is in a new snapshot stripe.
//
// In addition to the fragmentation of range tombstones, compaction also needs
// to take the range tombstones into consideration when outputting normal
// keys. Just as with point deletions, a range deletion covering an entry can
// cause the entry to be elided.
//
// A note on the stability of keys and values.
//
// The stability guarantees of keys and values returned by the iterator tree
// that backs a compactionIter are nuanced and care must be taken when
// referencing any returned items.
//
// Keys and values returned by exported functions (i.e. First, Next, etc.) have
// lifetimes that fall into two categories:
//
// Lifetime valid for duration of compaction. Range deletion keys and values are
// stable for the duration of the compaction, due to the way in which a
// compactionIter is typically constructed (i.e. via (*compaction).newInputIter,
// which wraps the iterator over the range deletion block in a noCloseIter,
// preventing the release of the backing memory until the compaction is
// finished).
//
// Lifetime limited to duration of sstable block liveness. Point keys (SET, DEL,
// etc.) and values must be cloned / copied following the return from the
// exported function, and before a subsequent call to Next advances the iterator
// and mutates the contents of the returned key and value.
type compactionIter struct {
	// equal compares user keys; it is used to detect when the underlying
	// iterator has moved onto a different user key.
	equal Equal
	// merge creates the ValueMerger used to collapse MERGE operands.
	merge Merge
	// iter is the wrapped internal iterator whose entries are collapsed.
	iter internalIterator
	// err is sticky: once set, First and Next return (nil, nil).
	err error
	// `key.UserKey` is set to `keyBuf` as a result of saving
	// `i.iterKey.UserKey`, and `key.Trailer` is set to `i.iterKey.Trailer`.
	// This is the case on return from all public methods -- these methods
	// return `key`. Additionally, it is the internal state when the code is
	// moving to the next key so it can determine whether the user key has
	// changed from the previous key.
	key InternalKey
	// keyTrailer is updated when `i.key` is updated and holds the key's
	// original trailer (eg, before any sequence-number zeroing or changes to
	// key kind).
	keyTrailer uint64
	// value is the value returned alongside `key` by the public methods.
	value []byte
	// valueCloser, when non-nil, was produced by finishing a merge and is
	// closed at the start of the next call to Next.
	valueCloser io.Closer
	// Temporary buffer used for storing the previous user key in order to
	// determine when iteration has advanced to a new user key and thus a new
	// snapshot stripe.
	keyBuf []byte
	// Temporary buffer used for storing the previous value, which may be an
	// unsafe, i.iter-owned slice that could be altered when the iterator is
	// advanced.
	valueBuf []byte
	// Is the current entry valid?
	valid bool
	// iterKey and iterValue hold the entry at the current position of the
	// wrapped iterator; iterKey is nil once it is exhausted or errored.
	iterKey   *InternalKey
	iterValue []byte
	// iterStripeChange records how the most recent advance of the wrapped
	// iterator moved relative to the previously saved key.
	iterStripeChange stripeChangeType
	// `skip` indicates whether the remaining skippable entries in the current
	// snapshot stripe should be skipped or processed. An example of a non-
	// skippable entry is a range tombstone as we need to return it from the
	// `compactionIter`, even if a key covering its start key has already been
	// seen in the same stripe. `skip` has no effect when `pos == iterPosNext`.
	//
	// TODO(jackson): If we use keyspan.InterleavingIter for range deletions,
	// like we do for range keys, the only remaining 'non-skippable' key is
	// the invalid key. We should be able to simplify this logic and remove this
	// field.
	skip bool
	// `pos` indicates the iterator position at the top of `Next()`. Its type's
	// (`iterPos`) values take on the following meanings in the context of
	// `compactionIter`.
	//
	//   - `iterPosCurForward`: the iterator is at the last key returned.
	//   - `iterPosNext`: the iterator has already been advanced to the next
	//     candidate key. For example, this happens when processing merge operands,
	//     where we advance the iterator all the way into the next stripe or next
	//     user key to ensure we've seen all mergeable operands.
	//   - `iterPosPrev`: this is invalid as compactionIter is forward-only.
	pos iterPos
	// `snapshotPinned` indicates whether the last point key returned by the
	// compaction iterator was only returned because an open snapshot prevents
	// its elision. This field only applies to point keys, and not to range
	// deletions or range keys.
	//
	// For MERGE, it is possible that doing the merge is interrupted even when
	// the next point key is in the same stripe. This can happen if the loop in
	// mergeNext gets interrupted by sameStripeNonSkippable.
	// sameStripeNonSkippable occurs due to RANGEDELs that sort before
	// SET/MERGE/DEL with the same seqnum, so the RANGEDEL does not necessarily
	// delete the subsequent SET/MERGE/DEL keys.
	snapshotPinned bool
	// forceObsoleteDueToRangeDel is set to true in a subset of the cases that
	// snapshotPinned is true. This value is true when the point is obsolete due
	// to a RANGEDEL but could not be deleted due to a snapshot.
	//
	// NB: it may seem that the additional cases that snapshotPinned captures
	// are harmless in that they can also be used to mark a point as obsolete
	// (it is merely a duplication of some logic that happens in
	// Writer.AddWithForceObsolete), but that is not quite accurate as of this
	// writing -- snapshotPinned originated in stats collection and for a
	// sequence MERGE, SET, where the MERGE cannot merge with the (older) SET
	// due to a snapshot, the snapshotPinned value for the SET is true.
	//
	// TODO(sumeer,jackson): improve the logic of snapshotPinned and reconsider
	// whether we need forceObsoleteDueToRangeDel.
	forceObsoleteDueToRangeDel bool
	// The index of the snapshot for the current key within the snapshots slice.
	curSnapshotIdx int
	// curSnapshotSeqNum is snapshots[curSnapshotIdx], or InternalKeySeqNumMax
	// when the current key is newer than every snapshot. It is passed to the
	// range deletion fragmenter's Covers check.
	curSnapshotSeqNum uint64
	// The snapshot sequence numbers that need to be maintained. These sequence
	// numbers define the snapshot stripes (see the Snapshots description
	// above). The sequence numbers are in ascending order.
	snapshots []uint64
	// frontiers holds a heap of user keys that affect compaction behavior when
	// they're exceeded. Before a new key is returned, the compaction iterator
	// advances the frontier, notifying any code that subscribed to be notified
	// when a key was reached. The primary use today is within the
	// implementation of compactionOutputSplitters in compaction.go. Many of
	// these splitters wait for the compaction iterator to call Advance(k) when
	// it's returning a new key. If the key that they're waiting for is
	// surpassed, these splitters update internal state recording that they
	// should request a compaction split next time they're asked in
	// [shouldSplitBefore].
	frontiers frontiers
	// Reference to the range deletion tombstone fragmenter (e.g.,
	// `compaction.rangeDelFrag`).
	rangeDelFrag *keyspan.Fragmenter
	// Reference to the range key fragmenter; its fragments are emitted to
	// emitRangeKeyChunk.
	rangeKeyFrag *keyspan.Fragmenter
	// The fragmented tombstones.
	tombstones []keyspan.Span
	// The fragmented range keys.
	rangeKeys []keyspan.Span
	// Byte allocator for the tombstone keys.
	alloc bytealloc.A
	// allowZeroSeqNum presumably gates sequence-number zeroing
	// (maybeZeroSeqnum is not visible here) -- NOTE(review): confirm.
	allowZeroSeqNum bool
	// elideTombstone reports whether a point tombstone for the given user key
	// may be elided; it is only acted upon in the last snapshot stripe.
	elideTombstone func(key []byte) bool
	// elideRangeTombstone presumably plays the same role for range deletion
	// fragments spanning [start, end) -- NOTE(review): its caller is not
	// visible here.
	elideRangeTombstone func(start, end []byte) bool
	// The on-disk format major version. This informs the types of keys that
	// may be written to disk during a compaction.
	formatVersion FormatMajorVersion
	stats         struct {
		// count of DELSIZED keys that were missized.
		countMissizedDels uint64
	}
}
271 :
272 : func newCompactionIter(
273 : cmp Compare,
274 : equal Equal,
275 : formatKey base.FormatKey,
276 : merge Merge,
277 : iter internalIterator,
278 : snapshots []uint64,
279 : rangeDelFrag *keyspan.Fragmenter,
280 : rangeKeyFrag *keyspan.Fragmenter,
281 : allowZeroSeqNum bool,
282 : elideTombstone func(key []byte) bool,
283 : elideRangeTombstone func(start, end []byte) bool,
284 : formatVersion FormatMajorVersion,
285 2 : ) *compactionIter {
286 2 : i := &compactionIter{
287 2 : equal: equal,
288 2 : merge: merge,
289 2 : iter: iter,
290 2 : snapshots: snapshots,
291 2 : frontiers: frontiers{cmp: cmp},
292 2 : rangeDelFrag: rangeDelFrag,
293 2 : rangeKeyFrag: rangeKeyFrag,
294 2 : allowZeroSeqNum: allowZeroSeqNum,
295 2 : elideTombstone: elideTombstone,
296 2 : elideRangeTombstone: elideRangeTombstone,
297 2 : formatVersion: formatVersion,
298 2 : }
299 2 : i.rangeDelFrag.Cmp = cmp
300 2 : i.rangeDelFrag.Format = formatKey
301 2 : i.rangeDelFrag.Emit = i.emitRangeDelChunk
302 2 : i.rangeKeyFrag.Cmp = cmp
303 2 : i.rangeKeyFrag.Format = formatKey
304 2 : i.rangeKeyFrag.Emit = i.emitRangeKeyChunk
305 2 : return i
306 2 : }
307 :
308 2 : func (i *compactionIter) First() (*InternalKey, []byte) {
309 2 : if i.err != nil {
310 0 : return nil, nil
311 0 : }
312 2 : var iterValue LazyValue
313 2 : i.iterKey, iterValue = i.iter.First()
314 2 : i.iterValue, _, i.err = iterValue.Value(nil)
315 2 : if i.err != nil {
316 0 : return nil, nil
317 0 : }
318 2 : if i.iterKey != nil {
319 2 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(i.iterKey.SeqNum(), i.snapshots)
320 2 : }
321 2 : i.pos = iterPosNext
322 2 : i.iterStripeChange = newStripeNewKey
323 2 : return i.Next()
324 : }
325 :
// Next advances the compaction iterator and returns the next entry that the
// compaction should output. It returns (nil, nil) once the wrapped iterator is
// exhausted or when an error occurs; in the error case the error is recorded
// in i.err. The returned key is &i.key and the value is i.value; see the note
// on key and value stability on the compactionIter type for how long they
// remain valid.
func (i *compactionIter) Next() (*InternalKey, []byte) {
	if i.err != nil {
		return nil, nil
	}

	// Close the closer for the current value if one was open.
	if i.closeValueCloser() != nil {
		return nil, nil
	}

	// Prior to this call to `Next()` we are in one of three situations with
	// respect to `iterKey` and related state:
	//
	//   - `!skip && pos == iterPosNext`: `iterKey` is already at the next key.
	//   - `!skip && pos == iterPosCurForward`: We are at the key that has been returned.
	//     To move forward we advance by one key, even if that lands us in the same
	//     snapshot stripe.
	//   - `skip && pos == iterPosCurForward`: We are at the key that has been returned.
	//     To move forward we skip skippable entries in the stripe.
	if i.pos == iterPosCurForward {
		if i.skip {
			i.skipInStripe()
		} else {
			i.nextInStripe()
		}
	}

	i.pos = iterPosCurForward
	i.valid = false

	// Each iteration either returns an entry or advances past entries that
	// should not be output (via `continue`).
	for i.iterKey != nil {
		// If we entered a new snapshot stripe with the same key, any key we
		// return on this iteration is only returned because the open snapshot
		// prevented it from being elided or merged with the key returned for
		// the previous stripe. Mark it as pinned so that the compaction loop
		// can correctly populate output tables' pinned statistics. We might
		// also set snapshotPinned=true down below if we observe that the key is
		// deleted by a range deletion in a higher stripe or that this key is a
		// tombstone that could be elided if only it were in the last snapshot
		// stripe.
		i.snapshotPinned = i.iterStripeChange == newStripeSameKey

		if i.iterKey.Kind() == InternalKeyKindRangeDelete || rangekey.IsRangeKey(i.iterKey.Kind()) {
			// Return the span so the compaction can use it for file truncation and add
			// it to the relevant fragmenter. We do not set `skip` to true before
			// returning as there may be a forthcoming point key with the same user key
			// and sequence number. Such a point key must be visible (i.e., not skipped
			// over) since we promise point keys are not deleted by range tombstones at
			// the same sequence number.
			//
			// Although, note that `skip` may already be true before reaching here
			// due to an earlier key in the stripe. Then it is fine to leave it set
			// to true, as the earlier key must have had a higher sequence number.
			//
			// NOTE: there is a subtle invariant violation here in that calling
			// saveKey and returning a reference to the temporary slice violates
			// the stability guarantee for range deletion keys. A potential
			// mediation could return the original iterKey and iterValue
			// directly, as the backing memory is guaranteed to be stable until
			// the compaction completes. The violation here is only minor in
			// that the caller immediately clones the range deletion InternalKey
			// when passing the key to the deletion fragmenter (see the
			// call-site in compaction.go).
			// TODO(travers): address this violation by removing the call to
			// saveKey and instead return the original iterKey and iterValue.
			// This goes against the comment on i.key in the struct, and
			// therefore warrants some investigation.
			i.saveKey()
			// TODO(jackson): Handle tracking pinned statistics for range keys
			// and range deletions. This would require updating
			// emitRangeDelChunk and rangeKeyCompactionTransform to update
			// statistics when they apply their own snapshot striping logic.
			i.snapshotPinned = false
			i.value = i.iterValue
			i.valid = true
			return &i.key, i.value
		}

		if cover := i.rangeDelFrag.Covers(*i.iterKey, i.curSnapshotSeqNum); cover == keyspan.CoversVisibly {
			// A pending range deletion deletes this key. Skip it.
			i.saveKey()
			i.skipInStripe()
			continue
		} else if cover == keyspan.CoversInvisibly {
			// i.iterKey would be deleted by a range deletion if there weren't
			// any open snapshots. Mark it as pinned.
			//
			// NB: there are multiple places in this file where we call
			// i.rangeDelFrag.Covers and this is the only one where we are writing
			// to i.snapshotPinned. Those other cases occur in mergeNext where the
			// caller is deciding whether the value should be merged or not, and the
			// key is in the same snapshot stripe. Hence, snapshotPinned is by
			// definition false in those cases.
			i.snapshotPinned = true
			i.forceObsoleteDueToRangeDel = true
		} else {
			i.forceObsoleteDueToRangeDel = false
		}

		switch i.iterKey.Kind() {
		case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
			if i.elideTombstone(i.iterKey.UserKey) {
				if i.curSnapshotIdx == 0 {
					// If we're at the last snapshot stripe and the tombstone
					// can be elided skip skippable keys in the same stripe.
					i.saveKey()
					i.skipInStripe()
					continue
				} else {
					// We're not at the last snapshot stripe, so the tombstone
					// can NOT yet be elided. Mark it as pinned, so that it's
					// included in table statistics appropriately.
					i.snapshotPinned = true
				}
			}

			// Every branch of this inner switch returns or continues, so
			// control never falls out of the outer case.
			switch i.iterKey.Kind() {
			case InternalKeyKindDelete:
				i.saveKey()
				i.value = i.iterValue
				i.valid = true
				i.skip = true
				return &i.key, i.value

			case InternalKeyKindDeleteSized:
				// We may skip subsequent keys because of this tombstone. Scan
				// ahead to see just how much data this tombstone drops and if
				// the tombstone's value should be updated accordingly.
				return i.deleteSizedNext()

			case InternalKeyKindSingleDelete:
				if i.singleDeleteNext() {
					return &i.key, i.value
				}
				continue
			}

		case InternalKeyKindSet, InternalKeyKindSetWithDelete:
			// The key we emit for this entry is a function of the current key
			// kind, and whether this entry is followed by a DEL/SINGLEDEL
			// entry. setNext() does the work to move the iterator forward,
			// preserving the original value, and potentially mutating the key
			// kind.
			i.setNext()
			return &i.key, i.value

		case InternalKeyKindMerge:
			// Record the snapshot index before mergeNext as merging
			// advances the iterator, adjusting curSnapshotIdx.
			origSnapshotIdx := i.curSnapshotIdx
			var valueMerger ValueMerger
			valueMerger, i.err = i.merge(i.iterKey.UserKey, i.iterValue)
			var change stripeChangeType
			if i.err == nil {
				change = i.mergeNext(valueMerger)
			}
			var needDelete bool
			if i.err == nil {
				// includesBase is true whenever we've transformed the MERGE record
				// into a SET.
				includesBase := i.key.Kind() == InternalKeyKindSet
				i.value, needDelete, i.valueCloser, i.err = finishValueMerger(valueMerger, includesBase)
			}
			if i.err == nil {
				if needDelete {
					// The merged result should not be output; release any
					// closer and move on to the next entry.
					i.valid = false
					if i.closeValueCloser() != nil {
						return nil, nil
					}
					continue
				}
				// A non-skippable entry does not necessarily cover later merge
				// operands, so we must not zero the current merge result's seqnum.
				//
				// For example, suppose the forthcoming two keys are a range
				// tombstone, `[a, b)#3`, and a merge operand, `a#3`. Recall that
				// range tombstones do not cover point keys at the same seqnum, so
				// `a#3` is not deleted. The range tombstone will be seen first due
				// to its larger value type. Since it is a non-skippable key, the
				// current merge will not include `a#3`. If we zeroed the current
				// merge result's seqnum, then it would conflict with the upcoming
				// merge including `a#3`, whose seqnum will also be zeroed.
				if change != sameStripeNonSkippable {
					i.maybeZeroSeqnum(origSnapshotIdx)
				}
				return &i.key, i.value
			}
			// Reaching this point means i.err is non-nil (the condition below
			// is always true): an error from merge, mergeNext or
			// finishValueMerger is reported as corruption.
			if i.err != nil {
				i.valid = false
				i.err = base.MarkCorruptionError(i.err)
			}
			return nil, nil

		default:
			i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
			i.valid = false
			return nil, nil
		}
	}

	return nil, nil
}
528 :
529 2 : func (i *compactionIter) closeValueCloser() error {
530 2 : if i.valueCloser == nil {
531 2 : return nil
532 2 : }
533 :
534 0 : i.err = i.valueCloser.Close()
535 0 : i.valueCloser = nil
536 0 : if i.err != nil {
537 0 : i.valid = false
538 0 : }
539 0 : return i.err
540 : }
541 :
542 : // snapshotIndex returns the index of the first sequence number in snapshots
543 : // which is greater than or equal to seq.
544 2 : func snapshotIndex(seq uint64, snapshots []uint64) (int, uint64) {
545 2 : index := sort.Search(len(snapshots), func(i int) bool {
546 2 : return snapshots[i] > seq
547 2 : })
548 2 : if index >= len(snapshots) {
549 2 : return index, InternalKeySeqNumMax
550 2 : }
551 2 : return index, snapshots[index]
552 : }
553 :
554 : // skipInStripe skips over skippable keys in the same stripe and user key.
555 2 : func (i *compactionIter) skipInStripe() {
556 2 : i.skip = true
557 2 : for i.nextInStripe() == sameStripeSkippable {
558 2 : }
559 : // Reset skip if we landed outside the original stripe. Otherwise, we landed
560 : // in the same stripe on a non-skippable key. In that case we should preserve
561 : // `i.skip == true` such that later keys in the stripe will continue to be
562 : // skipped.
563 2 : if i.iterStripeChange == newStripeNewKey || i.iterStripeChange == newStripeSameKey {
564 2 : i.skip = false
565 2 : }
566 : }
567 :
568 2 : func (i *compactionIter) iterNext() bool {
569 2 : var iterValue LazyValue
570 2 : i.iterKey, iterValue = i.iter.Next()
571 2 : i.iterValue, _, i.err = iterValue.Value(nil)
572 2 : if i.err != nil {
573 0 : i.iterKey = nil
574 0 : }
575 2 : return i.iterKey != nil
576 : }
577 :
// stripeChangeType classifies a single forward step of the wrapped iterator
// relative to the previously saved key. The two newStripe* values say that a
// new snapshot stripe was entered, and distinguish a brand-new user key from
// a new stripe of the same user key. The two sameStripe* values say that the
// step stayed inside the stripe, and distinguish entries that may be skipped
// from those that must still be surfaced.
type stripeChangeType int

const (
	// newStripeNewKey: the iterator moved to a different user key (this is
	// also reported when the iterator is exhausted).
	newStripeNewKey stripeChangeType = 0
	// newStripeSameKey: same user key, but a different snapshot stripe.
	newStripeSameKey stripeChangeType = 1
	// sameStripeSkippable: same user key and stripe; the entry may be skipped.
	sameStripeSkippable stripeChangeType = 2
	// sameStripeNonSkippable: same user key and stripe, but the entry (e.g. a
	// range deletion) must not be skipped.
	sameStripeNonSkippable stripeChangeType = 3
)
591 :
592 : // nextInStripe advances the iterator and returns one of the above const ints
593 : // indicating how its state changed.
594 : //
595 : // Calls to nextInStripe must be preceded by a call to saveKey to retain a
596 : // temporary reference to the original key, so that forward iteration can
597 : // proceed with a reference to the original key. Care should be taken to avoid
598 : // overwriting or mutating the saved key or value before they have been returned
599 : // to the caller of the exported function (i.e. the caller of Next, First, etc.)
600 2 : func (i *compactionIter) nextInStripe() stripeChangeType {
601 2 : i.iterStripeChange = i.nextInStripeHelper()
602 2 : return i.iterStripeChange
603 2 : }
604 :
605 : // nextInStripeHelper is an internal helper for nextInStripe; callers should use
606 : // nextInStripe and not call nextInStripeHelper.
607 2 : func (i *compactionIter) nextInStripeHelper() stripeChangeType {
608 2 : if !i.iterNext() {
609 2 : return newStripeNewKey
610 2 : }
611 2 : key := i.iterKey
612 2 :
613 2 : // NB: The below conditional is an optimization to avoid a user key
614 2 : // comparison in many cases. Internal keys with the same user key are
615 2 : // ordered in (strictly) descending order by trailer. If the new key has a
616 2 : // greater or equal trailer, or the previous key had a zero sequence number,
617 2 : // the new key must have a new user key.
618 2 : //
619 2 : // A couple things make these cases common:
620 2 : // - Sequence-number zeroing ensures ~all of the keys in L6 have a zero
621 2 : // sequence number.
622 2 : // - Ingested sstables' keys all adopt the same sequence number.
623 2 : if i.keyTrailer <= base.InternalKeyZeroSeqnumMaxTrailer || key.Trailer >= i.keyTrailer {
624 2 : if invariants.Enabled && i.equal(i.key.UserKey, key.UserKey) {
625 0 : prevKey := i.key
626 0 : prevKey.Trailer = i.keyTrailer
627 0 : panic(fmt.Sprintf("pebble: invariant violation: %s and %s out of order", key, prevKey))
628 : }
629 2 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
630 2 : return newStripeNewKey
631 2 : } else if !i.equal(i.key.UserKey, key.UserKey) {
632 2 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
633 2 : return newStripeNewKey
634 2 : }
635 2 : origSnapshotIdx := i.curSnapshotIdx
636 2 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
637 2 : switch key.Kind() {
638 2 : case InternalKeyKindRangeDelete:
639 2 : // Range tombstones need to be exposed by the compactionIter to the upper level
640 2 : // `compaction` object, so return them regardless of whether they are in the same
641 2 : // snapshot stripe.
642 2 : if i.curSnapshotIdx == origSnapshotIdx {
643 2 : return sameStripeNonSkippable
644 2 : }
645 2 : return newStripeSameKey
646 0 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
647 0 : // Range keys are interleaved at the max sequence number for a given user
648 0 : // key, so we should not see any more range keys in this stripe.
649 0 : panic("unreachable")
650 1 : case InternalKeyKindInvalid:
651 1 : if i.curSnapshotIdx == origSnapshotIdx {
652 1 : return sameStripeNonSkippable
653 1 : }
654 0 : return newStripeSameKey
655 : }
656 2 : if i.curSnapshotIdx == origSnapshotIdx {
657 2 : return sameStripeSkippable
658 2 : }
659 2 : return newStripeSameKey
660 : }
661 :
662 2 : func (i *compactionIter) setNext() {
663 2 : // Save the current key.
664 2 : i.saveKey()
665 2 : i.value = i.iterValue
666 2 : i.valid = true
667 2 : i.maybeZeroSeqnum(i.curSnapshotIdx)
668 2 :
669 2 : // There are two cases where we can early return and skip the remaining
670 2 : // records in the stripe:
671 2 : // - If the DB does not SETWITHDEL.
672 2 : // - If this key is already a SETWITHDEL.
673 2 : if i.formatVersion < FormatSetWithDelete ||
674 2 : i.iterKey.Kind() == InternalKeyKindSetWithDelete {
675 2 : i.skip = true
676 2 : return
677 2 : }
678 :
679 : // We are iterating forward. Save the current value.
680 2 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
681 2 : i.value = i.valueBuf
682 2 :
683 2 : // Else, we continue to loop through entries in the stripe looking for a
684 2 : // DEL. Note that we may stop *before* encountering a DEL, if one exists.
685 2 : for {
686 2 : switch i.nextInStripe() {
687 2 : case newStripeNewKey, newStripeSameKey:
688 2 : i.pos = iterPosNext
689 2 : return
690 2 : case sameStripeNonSkippable:
691 2 : i.pos = iterPosNext
692 2 : // We iterated onto a key that we cannot skip. We can
693 2 : // conservatively transform the original SET into a SETWITHDEL
694 2 : // as an indication that there *may* still be a DEL/SINGLEDEL
695 2 : // under this SET, even if we did not actually encounter one.
696 2 : //
697 2 : // This is safe to do, as:
698 2 : //
699 2 : // - in the case that there *is not* actually a DEL/SINGLEDEL
700 2 : // under this entry, any SINGLEDEL above this now-transformed
701 2 : // SETWITHDEL will become a DEL when the two encounter in a
702 2 : // compaction. The DEL will eventually be elided in a
703 2 : // subsequent compaction. The cost for ensuring correctness is
704 2 : // that this entry is kept around for an additional compaction
705 2 : // cycle(s).
706 2 : //
707 2 : // - in the case there *is* indeed a DEL/SINGLEDEL under us
708 2 : // (but in a different stripe or sstable), then we will have
709 2 : // already done the work to transform the SET into a
710 2 : // SETWITHDEL, and we will skip any additional iteration when
711 2 : // this entry is encountered again in a subsequent compaction.
712 2 : //
713 2 : // Ideally, this codepath would be smart enough to handle the
714 2 : // case of SET <- RANGEDEL <- ... <- DEL/SINGLEDEL <- ....
715 2 : // This requires preserving any RANGEDEL entries we encounter
716 2 : // along the way, then emitting the original (possibly
717 2 : // transformed) key, followed by the RANGEDELs. This requires
718 2 : // a sizable refactoring of the existing code, as nextInStripe
719 2 : // currently returns a sameStripeNonSkippable when it
720 2 : // encounters a RANGEDEL.
721 2 : // TODO(travers): optimize to handle the RANGEDEL case if it
722 2 : // turns out to be a performance problem.
723 2 : i.key.SetKind(InternalKeyKindSetWithDelete)
724 2 :
725 2 : // By setting i.skip=true, we are saying that after the
726 2 : // non-skippable key is emitted (which is likely a RANGEDEL),
727 2 : // the remaining point keys that share the same user key as this
728 2 : // saved key should be skipped.
729 2 : i.skip = true
730 2 : return
731 2 : case sameStripeSkippable:
732 2 : // We're still in the same stripe. If this is a
733 2 : // DEL/SINGLEDEL/DELSIZED, we stop looking and emit a SETWITHDEL.
734 2 : // Subsequent keys are eligible for skipping.
735 2 : if i.iterKey.Kind() == InternalKeyKindDelete ||
736 2 : i.iterKey.Kind() == InternalKeyKindSingleDelete ||
737 2 : i.iterKey.Kind() == InternalKeyKindDeleteSized {
738 2 : i.key.SetKind(InternalKeyKindSetWithDelete)
739 2 : i.skip = true
740 2 : return
741 2 : }
742 0 : default:
743 0 : panic("pebble: unexpected stripeChangeType: " + strconv.Itoa(int(i.iterStripeChange)))
744 : }
745 : }
746 : }
747 :
748 2 : func (i *compactionIter) mergeNext(valueMerger ValueMerger) stripeChangeType {
749 2 : // Save the current key.
750 2 : i.saveKey()
751 2 : i.valid = true
752 2 :
753 2 : // Loop looking for older values in the current snapshot stripe and merge
754 2 : // them.
755 2 : for {
756 2 : if i.nextInStripe() != sameStripeSkippable {
757 2 : i.pos = iterPosNext
758 2 : return i.iterStripeChange
759 2 : }
760 2 : key := i.iterKey
761 2 : switch key.Kind() {
762 2 : case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
763 2 : // We've hit a deletion tombstone. Return everything up to this point and
764 2 : // then skip entries until the next snapshot stripe. We change the kind
765 2 : // of the result key to a Set so that it shadows keys in lower
766 2 : // levels. That is, MERGE+DEL -> SET.
767 2 : // We do the same for SingleDelete since SingleDelete is only
768 2 : // permitted (with deterministic behavior) for keys that have been
769 2 : // set once since the last SingleDelete/Delete, so everything
770 2 : // older is acceptable to shadow. Note that this is slightly
771 2 : // different from singleDeleteNext() which implements stricter
772 2 : // semantics in terms of applying the SingleDelete to the single
773 2 : // next Set. But those stricter semantics are not observable to
774 2 : // the end-user since Iterator interprets SingleDelete as Delete.
775 2 : // We could do something more complicated here and consume only a
776 2 : // single Set, and then merge in any following Sets, but that is
777 2 : // complicated wrt code and unnecessary given the narrow permitted
778 2 : // use of SingleDelete.
779 2 : i.key.SetKind(InternalKeyKindSet)
780 2 : i.skip = true
781 2 : return sameStripeSkippable
782 :
783 2 : case InternalKeyKindSet, InternalKeyKindSetWithDelete:
784 2 : if i.rangeDelFrag.Covers(*key, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
785 2 : // We change the kind of the result key to a Set so that it shadows
786 2 : // keys in lower levels. That is, MERGE+RANGEDEL -> SET. This isn't
787 2 : // strictly necessary, but provides consistency with the behavior of
788 2 : // MERGE+DEL.
789 2 : i.key.SetKind(InternalKeyKindSet)
790 2 : i.skip = true
791 2 : return sameStripeSkippable
792 2 : }
793 :
794 : // We've hit a Set or SetWithDel value. Merge with the existing
795 : // value and return. We change the kind of the resulting key to a
796 : // Set so that it shadows keys in lower levels. That is:
797 : // MERGE + (SET*) -> SET.
798 2 : i.err = valueMerger.MergeOlder(i.iterValue)
799 2 : if i.err != nil {
800 0 : i.valid = false
801 0 : return sameStripeSkippable
802 0 : }
803 2 : i.key.SetKind(InternalKeyKindSet)
804 2 : i.skip = true
805 2 : return sameStripeSkippable
806 :
807 2 : case InternalKeyKindMerge:
808 2 : if i.rangeDelFrag.Covers(*key, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
809 2 : // We change the kind of the result key to a Set so that it shadows
810 2 : // keys in lower levels. That is, MERGE+RANGEDEL -> SET. This isn't
811 2 : // strictly necessary, but provides consistency with the behavior of
812 2 : // MERGE+DEL.
813 2 : i.key.SetKind(InternalKeyKindSet)
814 2 : i.skip = true
815 2 : return sameStripeSkippable
816 2 : }
817 :
818 : // We've hit another Merge value. Merge with the existing value and
819 : // continue looping.
820 2 : i.err = valueMerger.MergeOlder(i.iterValue)
821 2 : if i.err != nil {
822 0 : i.valid = false
823 0 : return sameStripeSkippable
824 0 : }
825 :
826 0 : default:
827 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
828 0 : i.valid = false
829 0 : return sameStripeSkippable
830 : }
831 : }
832 : }
833 :
834 2 : func (i *compactionIter) singleDeleteNext() bool {
835 2 : // Save the current key.
836 2 : i.saveKey()
837 2 : i.value = i.iterValue
838 2 : i.valid = true
839 2 :
840 2 : // Loop until finds a key to be passed to the next level.
841 2 : for {
842 2 : if i.nextInStripe() != sameStripeSkippable {
843 2 : i.pos = iterPosNext
844 2 : return true
845 2 : }
846 :
847 2 : key := i.iterKey
848 2 : switch key.Kind() {
849 2 : case InternalKeyKindDelete, InternalKeyKindMerge, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
850 2 : // We've hit a Delete, DeleteSized, Merge, SetWithDelete, transform
851 2 : // the SingleDelete into a full Delete.
852 2 : i.key.SetKind(InternalKeyKindDelete)
853 2 : i.skip = true
854 2 : return true
855 :
856 2 : case InternalKeyKindSet:
857 2 : i.nextInStripe()
858 2 : i.valid = false
859 2 : return false
860 :
861 2 : case InternalKeyKindSingleDelete:
862 2 : continue
863 :
864 0 : default:
865 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
866 0 : i.valid = false
867 0 : return false
868 : }
869 : }
870 : }
871 :
872 : // deleteSizedNext processes a DELSIZED point tombstone. Unlike ordinary DELs,
873 : // these tombstones carry a value that's a varint indicating the size of the
874 : // entry (len(key)+len(value)) that the tombstone is expected to delete.
875 : //
876 : // When a deleteSizedNext is encountered, we skip ahead to see which keys, if
877 : // any, are elided as a result of the tombstone.
878 2 : func (i *compactionIter) deleteSizedNext() (*base.InternalKey, []byte) {
879 2 : i.saveKey()
880 2 : i.valid = true
881 2 : i.skip = true
882 2 :
883 2 : // The DELSIZED tombstone may have no value at all. This happens when the
884 2 : // tombstone has already deleted the key that the user originally predicted.
885 2 : // In this case, we still peek forward in case there's another DELSIZED key
886 2 : // with a lower sequence number, in which case we'll adopt its value.
887 2 : if len(i.iterValue) == 0 {
888 1 : i.value = i.valueBuf[:0]
889 2 : } else {
890 2 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
891 2 : i.value = i.valueBuf
892 2 : }
893 :
894 : // Loop through all the keys within this stripe that are skippable.
895 2 : i.pos = iterPosNext
896 2 : for i.nextInStripe() == sameStripeSkippable {
897 2 : switch i.iterKey.Kind() {
898 2 : case InternalKeyKindDelete, InternalKeyKindDeleteSized:
899 2 : // We encountered a tombstone (DEL, or DELSIZED) that's deleted by
900 2 : // the original DELSIZED tombstone. This can happen in two cases:
901 2 : //
902 2 : // (1) These tombstones were intended to delete two distinct values,
903 2 : // and this DELSIZED has already dropped the relevant key. For
904 2 : // example:
905 2 : //
906 2 : // a.DELSIZED.9 a.SET.7 a.DELSIZED.5 a.SET.4
907 2 : //
908 2 : // If a.DELSIZED.9 has already deleted a.SET.7, its size has
909 2 : // already been zeroed out. In this case, we want to adopt the
910 2 : // value of the DELSIZED with the lower sequence number, in
911 2 : // case the a.SET.4 key has not yet been elided.
912 2 : //
913 2 : // (2) This DELSIZED was missized. The user thought they were
914 2 : // deleting a key with this user key, but this user key had
915 2 : // already been deleted.
916 2 : //
917 2 : // We can differentiate these two cases by examining the length of
918 2 : // the DELSIZED's value. A DELSIZED's value holds the size of both
919 2 : // the user key and value that it intends to delete. For any user
920 2 : // key with a length > 1, a DELSIZED that has not deleted a key must
921 2 : // have a value with a length > 1.
922 2 : //
923 2 : // We treat both cases the same functionally, adopting the identity
924 2 : // of the lower-sequence numbered tombstone. However in the second
925 2 : // case, we also increment the stat counting missized tombstones.
926 2 : if len(i.value) > 0 {
927 2 : // The original DELSIZED key was missized. The key that the user
928 2 : // thought they were deleting does not exist.
929 2 : i.stats.countMissizedDels++
930 2 : }
931 2 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
932 2 : i.value = i.valueBuf
933 2 : if i.iterKey.Kind() == InternalKeyKindDelete {
934 2 : // Convert the DELSIZED to a DEL—The DEL we're eliding may not
935 2 : // have deleted the key(s) it was intended to yet. The ordinary
936 2 : // DEL compaction heuristics are better suited at that, plus we
937 2 : // don't want to count it as a missized DEL. We early exit in
938 2 : // this case, after skipping the remainder of the snapshot
939 2 : // stripe.
940 2 : i.key.SetKind(i.iterKey.Kind())
941 2 : i.skipInStripe()
942 2 : return &i.key, i.value
943 2 : }
944 : // Continue, in case we uncover another DELSIZED or a key this
945 : // DELSIZED deletes.
946 2 : default:
947 2 : // If the DELSIZED is value-less, it already deleted the key that it
948 2 : // was intended to delete. This is possible with a sequence like:
949 2 : //
950 2 : // DELSIZED.8 SET.7 SET.3
951 2 : //
952 2 : // The DELSIZED only describes the size of the SET.7, which in this
953 2 : // case has already been elided. We don't count it as a missizing,
954 2 : // instead converting the DELSIZED to a DEL. Skip the remainder of
955 2 : // the snapshot stripe and return.
956 2 : if len(i.value) == 0 {
957 2 : i.key.SetKind(InternalKeyKindDelete)
958 2 : i.skipInStripe()
959 2 : return &i.key, i.value
960 2 : }
961 : // The deleted key is not a DEL, DELSIZED, and the DELSIZED in i.key
962 : // has a positive size.
963 2 : expectedSize, n := binary.Uvarint(i.value)
964 2 : if n != len(i.value) {
965 0 : i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(i.value))
966 0 : i.valid = false
967 0 : return nil, nil
968 0 : }
969 2 : elidedSize := uint64(len(i.iterKey.UserKey)) + uint64(len(i.iterValue))
970 2 : if elidedSize != expectedSize {
971 2 : // The original DELSIZED key was missized. It's unclear what to
972 2 : // do. The user-provided size was wrong, so it's unlikely to be
973 2 : // accurate or meaningful. We could:
974 2 : //
975 2 : // 1. return the DELSIZED with the original user-provided size unmodified
976 2 : // 2. return the DELZIZED with a zeroed size to reflect that a key was
977 2 : // elided, even if it wasn't the anticipated size.
978 2 : // 3. subtract the elided size from the estimate and re-encode.
979 2 : // 4. convert the DELSIZED into a value-less DEL, so that
980 2 : // ordinary DEL heuristics apply.
981 2 : //
982 2 : // We opt for (4) under the rationale that we can't rely on the
983 2 : // user-provided size for accuracy, so ordinary DEL heuristics
984 2 : // are safer.
985 2 : i.key.SetKind(InternalKeyKindDelete)
986 2 : i.stats.countMissizedDels++
987 2 : }
988 : // NB: We remove the value regardless of whether the key was sized
989 : // appropriately. The size encoded is 'consumed' the first time it
990 : // meets a key that it deletes.
991 2 : i.value = i.valueBuf[:0]
992 : }
993 : }
994 : // Reset skip if we landed outside the original stripe. Otherwise, we landed
995 : // in the same stripe on a non-skippable key. In that case we should preserve
996 : // `i.skip == true` such that later keys in the stripe will continue to be
997 : // skipped.
998 2 : if i.iterStripeChange == newStripeNewKey || i.iterStripeChange == newStripeSameKey {
999 2 : i.skip = false
1000 2 : }
1001 2 : return &i.key, i.value
1002 : }
1003 :
1004 2 : func (i *compactionIter) saveKey() {
1005 2 : i.keyBuf = append(i.keyBuf[:0], i.iterKey.UserKey...)
1006 2 : i.key.UserKey = i.keyBuf
1007 2 : i.key.Trailer = i.iterKey.Trailer
1008 2 : i.keyTrailer = i.iterKey.Trailer
1009 2 : i.frontiers.Advance(i.key.UserKey)
1010 2 : }
1011 :
1012 2 : func (i *compactionIter) cloneKey(key []byte) []byte {
1013 2 : i.alloc, key = i.alloc.Copy(key)
1014 2 : return key
1015 2 : }
1016 :
1017 1 : func (i *compactionIter) Key() InternalKey {
1018 1 : return i.key
1019 1 : }
1020 :
1021 1 : func (i *compactionIter) Value() []byte {
1022 1 : return i.value
1023 1 : }
1024 :
1025 1 : func (i *compactionIter) Valid() bool {
1026 1 : return i.valid
1027 1 : }
1028 :
1029 1 : func (i *compactionIter) Error() error {
1030 1 : return i.err
1031 1 : }
1032 :
1033 2 : func (i *compactionIter) Close() error {
1034 2 : err := i.iter.Close()
1035 2 : if i.err == nil {
1036 2 : i.err = err
1037 2 : }
1038 :
1039 : // Close the closer for the current value if one was open.
1040 2 : if i.valueCloser != nil {
1041 0 : i.err = firstError(i.err, i.valueCloser.Close())
1042 0 : i.valueCloser = nil
1043 0 : }
1044 :
1045 2 : return i.err
1046 : }
1047 :
1048 : // Tombstones returns a list of pending range tombstones in the fragmenter
1049 : // up to the specified key, or all pending range tombstones if key = nil.
1050 2 : func (i *compactionIter) Tombstones(key []byte) []keyspan.Span {
1051 2 : if key == nil {
1052 2 : i.rangeDelFrag.Finish()
1053 2 : } else {
1054 2 : // The specified end key is exclusive; no versions of the specified
1055 2 : // user key (including range tombstones covering that key) should
1056 2 : // be flushed yet.
1057 2 : i.rangeDelFrag.TruncateAndFlushTo(key)
1058 2 : }
1059 2 : tombstones := i.tombstones
1060 2 : i.tombstones = nil
1061 2 : return tombstones
1062 : }
1063 :
1064 : // RangeKeys returns a list of pending fragmented range keys up to the specified
1065 : // key, or all pending range keys if key = nil.
1066 2 : func (i *compactionIter) RangeKeys(key []byte) []keyspan.Span {
1067 2 : if key == nil {
1068 2 : i.rangeKeyFrag.Finish()
1069 2 : } else {
1070 2 : // The specified end key is exclusive; no versions of the specified
1071 2 : // user key (including range tombstones covering that key) should
1072 2 : // be flushed yet.
1073 2 : i.rangeKeyFrag.TruncateAndFlushTo(key)
1074 2 : }
1075 2 : rangeKeys := i.rangeKeys
1076 2 : i.rangeKeys = nil
1077 2 : return rangeKeys
1078 : }
1079 :
1080 2 : func (i *compactionIter) emitRangeDelChunk(fragmented keyspan.Span) {
1081 2 : // Apply the snapshot stripe rules, keeping only the latest tombstone for
1082 2 : // each snapshot stripe.
1083 2 : currentIdx := -1
1084 2 : keys := fragmented.Keys[:0]
1085 2 : for _, k := range fragmented.Keys {
1086 2 : idx, _ := snapshotIndex(k.SeqNum(), i.snapshots)
1087 2 : if currentIdx == idx {
1088 2 : continue
1089 : }
1090 2 : if idx == 0 && i.elideRangeTombstone(fragmented.Start, fragmented.End) {
1091 2 : // This is the last snapshot stripe and the range tombstone
1092 2 : // can be elided.
1093 2 : break
1094 : }
1095 :
1096 2 : keys = append(keys, k)
1097 2 : if idx == 0 {
1098 2 : // This is the last snapshot stripe.
1099 2 : break
1100 : }
1101 2 : currentIdx = idx
1102 : }
1103 2 : if len(keys) > 0 {
1104 2 : i.tombstones = append(i.tombstones, keyspan.Span{
1105 2 : Start: fragmented.Start,
1106 2 : End: fragmented.End,
1107 2 : Keys: keys,
1108 2 : })
1109 2 : }
1110 : }
1111 :
1112 2 : func (i *compactionIter) emitRangeKeyChunk(fragmented keyspan.Span) {
1113 2 : // Elision of snapshot stripes happens in rangeKeyCompactionTransform, so no need to
1114 2 : // do that here.
1115 2 : if len(fragmented.Keys) > 0 {
1116 2 : i.rangeKeys = append(i.rangeKeys, fragmented)
1117 2 : }
1118 : }
1119 :
1120 : // maybeZeroSeqnum attempts to set the seqnum for the current key to 0. Doing
1121 : // so improves compression and enables an optimization during forward iteration
1122 : // to skip some key comparisons. The seqnum for an entry can be zeroed if the
1123 : // entry is on the bottom snapshot stripe and on the bottom level of the LSM.
1124 2 : func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) {
1125 2 : if !i.allowZeroSeqNum {
1126 2 : // TODO(peter): allowZeroSeqNum applies to the entire compaction. We could
1127 2 : // make the determination on a key by key basis, similar to what is done
1128 2 : // for elideTombstone. Need to add a benchmark for compactionIter to verify
1129 2 : // that isn't too expensive.
1130 2 : return
1131 2 : }
1132 2 : if snapshotIdx > 0 {
1133 2 : // This is not the last snapshot
1134 2 : return
1135 2 : }
1136 2 : i.key.SetSeqNum(base.SeqNumZero)
1137 : }
1138 :
1139 : // A frontier is used to monitor a compaction's progression across the user
1140 : // keyspace.
1141 : //
1142 : // A frontier hold a user key boundary that it's concerned with in its `key`
1143 : // field. If/when the compaction iterator returns an InternalKey with a user key
1144 : // _k_ such that k ≥ frontier.key, the compaction iterator invokes the
1145 : // frontier's `reached` function, passing _k_ as its argument.
1146 : //
1147 : // The `reached` function returns a new value to use as the key. If `reached`
1148 : // returns nil, the frontier is forgotten and its `reached` method will not be
1149 : // invoked again, unless the user calls [Update] to set a new key.
1150 : //
1151 : // A frontier's key may be updated outside the context of a `reached`
1152 : // invocation at any time, through its Update method.
1153 : type frontier struct {
1154 : // container points to the containing *frontiers that was passed to Init
1155 : // when the frontier was initialized.
1156 : container *frontiers
1157 :
1158 : // key holds the frontier's current key. If nil, this frontier is inactive
1159 : // and its reached func will not be invoked. The value of this key may only
1160 : // be updated by the `frontiers` type, or the Update method.
1161 : key []byte
1162 :
1163 : // reached is invoked to inform a frontier that its key has been reached.
1164 : // It's invoked with the user key that reached the limit. The `key` argument
1165 : // is guaranteed to be ≥ the frontier's key.
1166 : //
1167 : // After reached is invoked, the frontier's key is updated to the return
1168 : // value of `reached`. Note bene, the frontier is permitted to update its
1169 : // key to a user key ≤ the argument `key`.
1170 : //
1171 : // If a frontier is set to key k1, and reached(k2) is invoked (k2 ≥ k1), the
1172 : // frontier will receive reached(k2) calls until it returns nil or a key
1173 : // `k3` such that k2 < k3. This property is useful for frontiers that use
1174 : // `reached` invocations to drive iteration through collections of keys that
1175 : // may contain multiple keys that are both < k2 and ≥ k1.
1176 : reached func(key []byte) (next []byte)
1177 : }
1178 :
1179 : // Init initializes the frontier with the provided key and reached callback.
1180 : // The frontier is attached to the provided *frontiers and the provided reached
1181 : // func will be invoked when the *frontiers is advanced to a key ≥ this
1182 : // frontier's key.
1183 : func (f *frontier) Init(
1184 : frontiers *frontiers, initialKey []byte, reached func(key []byte) (next []byte),
1185 2 : ) {
1186 2 : *f = frontier{
1187 2 : container: frontiers,
1188 2 : key: initialKey,
1189 2 : reached: reached,
1190 2 : }
1191 2 : if initialKey != nil {
1192 2 : f.container.push(f)
1193 2 : }
1194 : }
1195 :
1196 : // String implements fmt.Stringer.
1197 1 : func (f *frontier) String() string {
1198 1 : return string(f.key)
1199 1 : }
1200 :
1201 : // Update replaces the existing frontier's key with the provided key. The
1202 : // frontier's reached func will be invoked when the new key is reached.
1203 2 : func (f *frontier) Update(key []byte) {
1204 2 : c := f.container
1205 2 : prevKeyIsNil := f.key == nil
1206 2 : f.key = key
1207 2 : if prevKeyIsNil {
1208 2 : if key != nil {
1209 2 : c.push(f)
1210 2 : }
1211 2 : return
1212 : }
1213 :
1214 : // Find the frontier within the heap (it must exist within the heap because
1215 : // f.key was != nil). If the frontier key is now nil, remove it from the
1216 : // heap. Otherwise, fix up its position.
1217 2 : for i := 0; i < len(c.items); i++ {
1218 2 : if c.items[i] == f {
1219 2 : if key != nil {
1220 2 : c.fix(i)
1221 2 : } else {
1222 2 : n := c.len() - 1
1223 2 : c.swap(i, n)
1224 2 : c.down(i, n)
1225 2 : c.items = c.items[:n]
1226 2 : }
1227 2 : return
1228 : }
1229 : }
1230 0 : panic("unreachable")
1231 : }
1232 :
1233 : // frontiers is used to track progression of a task (eg, compaction) across the
1234 : // keyspace. Clients that want to be informed when the task advances to a key ≥
1235 : // some frontier may register a frontier, providing a callback. The task calls
1236 : // `Advance(k)` with each user key encountered, which invokes the `reached` func
1237 : // on all tracked frontiers with `key`s ≤ k.
1238 : //
1239 : // Internally, frontiers is implemented as a simple heap.
1240 : type frontiers struct {
1241 : cmp Compare
1242 : items []*frontier
1243 : }
1244 :
1245 : // String implements fmt.Stringer.
1246 1 : func (f *frontiers) String() string {
1247 1 : var buf bytes.Buffer
1248 1 : for i := 0; i < len(f.items); i++ {
1249 1 : if i > 0 {
1250 1 : fmt.Fprint(&buf, ", ")
1251 1 : }
1252 1 : fmt.Fprintf(&buf, "%s: %q", f.items[i], f.items[i].key)
1253 : }
1254 1 : return buf.String()
1255 : }
1256 :
1257 : // Advance notifies all member frontiers with keys ≤ k.
1258 2 : func (f *frontiers) Advance(k []byte) {
1259 2 : for len(f.items) > 0 && f.cmp(k, f.items[0].key) >= 0 {
1260 2 : // This frontier has been reached. Invoke the closure and update with
1261 2 : // the next frontier.
1262 2 : f.items[0].key = f.items[0].reached(k)
1263 2 : if f.items[0].key == nil {
1264 2 : // This was the final frontier that this user was concerned with.
1265 2 : // Remove it from the heap.
1266 2 : f.pop()
1267 2 : } else {
1268 2 : // Fix up the heap root.
1269 2 : f.fix(0)
1270 2 : }
1271 : }
1272 : }
1273 :
1274 2 : func (f *frontiers) len() int {
1275 2 : return len(f.items)
1276 2 : }
1277 :
1278 2 : func (f *frontiers) less(i, j int) bool {
1279 2 : return f.cmp(f.items[i].key, f.items[j].key) < 0
1280 2 : }
1281 :
1282 2 : func (f *frontiers) swap(i, j int) {
1283 2 : f.items[i], f.items[j] = f.items[j], f.items[i]
1284 2 : }
1285 :
1286 : // fix, up and down are copied from the go stdlib.
1287 :
1288 2 : func (f *frontiers) fix(i int) {
1289 2 : if !f.down(i, f.len()) {
1290 2 : f.up(i)
1291 2 : }
1292 : }
1293 :
1294 2 : func (f *frontiers) push(ff *frontier) {
1295 2 : n := len(f.items)
1296 2 : f.items = append(f.items, ff)
1297 2 : f.up(n)
1298 2 : }
1299 :
1300 2 : func (f *frontiers) pop() *frontier {
1301 2 : n := f.len() - 1
1302 2 : f.swap(0, n)
1303 2 : f.down(0, n)
1304 2 : item := f.items[n]
1305 2 : f.items = f.items[:n]
1306 2 : return item
1307 2 : }
1308 :
1309 2 : func (f *frontiers) up(j int) {
1310 2 : for {
1311 2 : i := (j - 1) / 2 // parent
1312 2 : if i == j || !f.less(j, i) {
1313 2 : break
1314 : }
1315 2 : f.swap(i, j)
1316 2 : j = i
1317 : }
1318 : }
1319 :
1320 2 : func (f *frontiers) down(i0, n int) bool {
1321 2 : i := i0
1322 2 : for {
1323 2 : j1 := 2*i + 1
1324 2 : if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
1325 2 : break
1326 : }
1327 2 : j := j1 // left child
1328 2 : if j2 := j1 + 1; j2 < n && f.less(j2, j1) {
1329 2 : j = j2 // = 2*i + 2 // right child
1330 2 : }
1331 2 : if !f.less(j, i) {
1332 2 : break
1333 : }
1334 2 : f.swap(i, j)
1335 2 : i = j
1336 : }
1337 2 : return i > i0
1338 : }
|