Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "encoding/binary"
10 : "fmt"
11 : "io"
12 : "sort"
13 : "strconv"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/bytealloc"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/internal/keyspan"
20 : "github.com/cockroachdb/pebble/internal/rangekey"
21 : )
22 :
23 : // compactionIter provides a forward-only iterator that encapsulates the logic
24 : // for collapsing entries during compaction. It wraps an internal iterator and
25 : // collapses entries that are no longer necessary because they are shadowed by
26 : // newer entries. The simplest example of this is when the internal iterator
27 : // contains two keys: a.PUT.2 and a.PUT.1. Instead of returning both entries,
28 : // compactionIter collapses the second entry because it is no longer
29 : // necessary. The high-level structure for compactionIter is to iterate over
30 : // its internal iterator and output 1 entry for every user-key. There are four
31 : // complications to this story.
32 : //
33 : // 1. Eliding Deletion Tombstones
34 : //
35 : // Consider the entries a.DEL.2 and a.PUT.1. These entries collapse to
36 : // a.DEL.2. Do we have to output the entry a.DEL.2? Only if a.DEL.2 possibly
37 : // shadows an entry at a lower level. If we're compacting to the base-level in
38 : // the LSM tree then a.DEL.2 is definitely not shadowing an entry at a lower
39 : // level and can be elided.
40 : //
41 : // We can do slightly better than only eliding deletion tombstones at the base
42 : // level by observing that we can elide a deletion tombstone if there are no
43 : // sstables that contain the entry's key. This check is performed by
44 : // elideTombstone.
45 : //
46 : // 2. Merges
47 : //
48 : // The MERGE operation merges the value for an entry with the existing value
49 : // for an entry. The logical value of an entry can be composed of a series of
50 : // merge operations. When compactionIter sees a MERGE, it scans forward in its
51 : // internal iterator collapsing MERGE operations for the same key until it
52 : // encounters a SET or DELETE operation. For example, the keys a.MERGE.4,
53 : // a.MERGE.3, a.MERGE.2 will be collapsed to a.MERGE.4 and the values will be
54 : // merged using the specified Merger.
55 : //
56 : // An interesting case here occurs when MERGE is combined with SET. Consider
57 : // the entries a.MERGE.3 and a.SET.2. The collapsed key will be a.SET.3. The
58 : // reason that the kind is changed to SET is because the SET operation acts as
59 : // a barrier preventing further merging. This can be seen better in the
60 : // scenario a.MERGE.3, a.SET.2, a.MERGE.1. The entry a.MERGE.1 may be at lower
61 : // (older) level and not involved in the compaction. If the compaction of
62 : // a.MERGE.3 and a.SET.2 produced a.MERGE.3, a subsequent compaction with
63 : // a.MERGE.1 would merge the values together incorrectly.
64 : //
65 : // 3. Snapshots
66 : //
67 : // Snapshots are lightweight point-in-time views of the DB state. At its core,
68 : // a snapshot is a sequence number along with a guarantee from Pebble that it
69 : // will maintain the view of the database at that sequence number. Part of this
70 : // guarantee is relatively straightforward to achieve. When reading from the
71 : // database Pebble will ignore sequence numbers that are larger than the
72 : // snapshot sequence number. The primary complexity with snapshots occurs
73 : // during compaction: the collapsing of entries that are shadowed by newer
74 : // entries is at odds with the guarantee that Pebble will maintain the view of
75 : // the database at the snapshot sequence number. Rather than collapsing entries
76 : // up to the next user key, compactionIter can only collapse entries up to the
77 : // next snapshot boundary. That is, every snapshot boundary potentially causes
78 : // another entry for the same user-key to be emitted. Another way to view this
79 : // is that snapshots define stripes and entries are collapsed within stripes,
80 : // but not across stripes. Consider the following scenario:
81 : //
82 : // a.PUT.9
83 : // a.DEL.8
84 : // a.PUT.7
85 : // a.DEL.6
86 : // a.PUT.5
87 : //
88 : // In the absence of snapshots these entries would be collapsed to
89 : // a.PUT.9. What if there is a snapshot at sequence number 7? The entries can
90 : // be divided into two stripes and collapsed within the stripes:
91 : //
92 : // a.PUT.9 a.PUT.9
93 : // a.DEL.8 --->
94 : // a.PUT.7
95 : // -- --
96 : // a.DEL.6 ---> a.DEL.6
97 : // a.PUT.5
98 : //
99 : // All of the rules described earlier still apply, but they are confined to
100 : // operate within a snapshot stripe. Snapshots only affect compaction when the
101 : // snapshot sequence number lies within the range of sequence numbers being
102 : // compacted. In the above example, a snapshot at sequence number 10 or at
103 : // sequence number 5 would not have any effect.
104 : //
105 : // 4. Range Deletions
106 : //
107 : // Range deletions provide the ability to delete all of the keys (and values)
108 : // in a contiguous range. Range deletions are stored indexed by their start
109 : // key. The end key of the range is stored in the value. In order to support
110 : // lookup of the range deletions which overlap with a particular key, the range
111 : // deletion tombstones need to be fragmented whenever they overlap. This
112 : // fragmentation is performed by keyspan.Fragmenter. The fragments are then
113 : // subject to the rules for snapshots. For example, consider the two range
114 : // tombstones [a,e)#1 and [c,g)#2:
115 : //
116 : // 2: c-------g
117 : // 1: a-------e
118 : //
119 : // These tombstones will be fragmented into:
120 : //
121 : // 2: c---e---g
122 : // 1: a---c---e
123 : //
124 : // Do we output the fragment [c,e)#1? Since it is covered by [c,e)#2 the answer
125 : // depends on whether it is in a new snapshot stripe.
126 : //
127 : // In addition to the fragmentation of range tombstones, compaction also needs
128 : // to take the range tombstones into consideration when outputting normal
129 : // keys. Just as with point deletions, a range deletion covering an entry can
130 : // cause the entry to be elided.
131 : //
132 : // A note on the stability of keys and values.
133 : //
134 : // The stability guarantees of keys and values returned by the iterator tree
135 : // that backs a compactionIter are nuanced and care must be taken when
136 : // referencing any returned items.
137 : //
138 : // Keys and values returned by exported functions (i.e. First, Next, etc.) have
139 : // lifetimes that fall into two categories:
140 : //
141 : // Lifetime valid for duration of compaction. Range deletion keys and values are
142 : // stable for the duration of the compaction, due to the way in which a
143 : // compactionIter is typically constructed (i.e. via (*compaction).newInputIter,
144 : // which wraps the iterator over the range deletion block in a noCloseIter,
145 : // preventing the release of the backing memory until the compaction is
146 : // finished).
147 : //
148 : // Lifetime limited to duration of sstable block liveness. Point keys (SET, DEL,
149 : // etc.) and values must be cloned / copied following the return from the
150 : // exported function, and before a subsequent call to Next advances the iterator
151 : // and mutates the contents of the returned key and value.
152 : type compactionIter struct {
153 : equal Equal
154 : merge Merge
155 : iter internalIterator
156 : err error
157 : // `key.UserKey` is set to `keyBuf` caused by saving `i.iterKey.UserKey`
158 : // and `key.Trailer` is set to `i.iterKey.Trailer`. This is the
159 : // case on return from all public methods -- these methods return `key`.
160 : // Additionally, it is the internal state when the code is moving to the
161 : // next key so it can determine whether the user key has changed from
162 : // the previous key.
163 : key InternalKey
164 : // keyTrailer is updated when `i.key` is updated and holds the key's
165 : // original trailer (eg, before any sequence-number zeroing or changes to
166 : // key kind).
167 : keyTrailer uint64
168 : value []byte
169 : valueCloser io.Closer
170 : // Temporary buffer used for storing the previous user key in order to
171 : // determine when iteration has advanced to a new user key and thus a new
172 : // snapshot stripe.
173 : keyBuf []byte
174 : // Temporary buffer used for storing the previous value, which may be an
175 : // unsafe, i.iter-owned slice that could be altered when the iterator is
176 : // advanced.
177 : valueBuf []byte
178 : // Is the current entry valid?
179 : valid bool
180 : iterKey *InternalKey
181 : iterValue []byte
182 : iterStripeChange stripeChangeType
183 : // `skip` indicates whether the remaining skippable entries in the current
184 : // snapshot stripe should be skipped or processed. An example of a non-
185 : // skippable entry is a range tombstone as we need to return it from the
186 : // `compactionIter`, even if a key covering its start key has already been
187 : // seen in the same stripe. `skip` has no effect when `pos == iterPosNext`.
188 : //
189 : // TODO(jackson): If we use keyspan.InterleavingIter for range deletions,
190 : // like we do for range keys, the only remaining 'non-skippable' key is
191 : // the invalid key. We should be able to simplify this logic and remove this
192 : // field.
193 : skip bool
194 : // `pos` indicates the iterator position at the top of `Next()`. Its type's
195 : // (`iterPos`) values take on the following meanings in the context of
196 : // `compactionIter`.
197 : //
198 : // - `iterPosCur`: the iterator is at the last key returned.
199 : // - `iterPosNext`: the iterator has already been advanced to the next
200 : // candidate key. For example, this happens when processing merge operands,
201 : // where we advance the iterator all the way into the next stripe or next
202 : // user key to ensure we've seen all mergeable operands.
203 : // - `iterPosPrev`: this is invalid as compactionIter is forward-only.
204 : pos iterPos
205 : // `snapshotPinned` indicates whether the last point key returned by the
206 : // compaction iterator was only returned because an open snapshot prevents
207 : // its elision. This field only applies to point keys, and not to range
208 : // deletions or range keys.
209 : //
210 : // For MERGE, it is possible that doing the merge is interrupted even when
211 : // the next point key is in the same stripe. This can happen if the loop in
212 : // mergeNext gets interrupted by sameStripeNonSkippable.
213 : // sameStripeNonSkippable occurs due to RANGEDELs that sort before
214 : // SET/MERGE/DEL with the same seqnum, so the RANGEDEL does not necessarily
215 : // delete the subsequent SET/MERGE/DEL keys.
216 : snapshotPinned bool
217 : // forceObsoleteDueToRangeDel is set to true in a subset of the cases that
218 : // snapshotPinned is true. This value is true when the point is obsolete due
219 : // to a RANGEDEL but could not be deleted due to a snapshot.
220 : //
221 : // NB: it may seem that the additional cases that snapshotPinned captures
222 : // are harmless in that they can also be used to mark a point as obsolete
223 : // (it is merely a duplication of some logic that happens in
224 : // Writer.AddWithForceObsolete), but that is not quite accurate as of this
225 : // writing -- snapshotPinned originated in stats collection and for a
226 : // sequence MERGE, SET, where the MERGE cannot merge with the (older) SET
227 : // due to a snapshot, the snapshotPinned value for the SET is true.
228 : //
229 : // TODO(sumeer,jackson): improve the logic of snapshotPinned and reconsider
230 : // whether we need forceObsoleteDueToRangeDel.
231 : forceObsoleteDueToRangeDel bool
232 : // The index of the snapshot for the current key within the snapshots slice.
233 : curSnapshotIdx int
234 : curSnapshotSeqNum uint64
235 : // The snapshot sequence numbers that need to be maintained. These sequence
236 : // numbers define the snapshot stripes (see the Snapshots description
237 : // above). The sequence numbers are in ascending order.
238 : snapshots []uint64
239 : // frontiers holds a heap of user keys that affect compaction behavior when
240 : // they're exceeded. Before a new key is returned, the compaction iterator
241 : // advances the frontier, notifying any code that subscribed to be notified
242 : // when a key was reached. The primary use today is within the
243 : // implementation of compactionOutputSplitters in compaction.go. Many of
244 : // these splitters wait for the compaction iterator to call Advance(k) when
245 : // it's returning a new key. If the key that they're waiting for is
246 : // surpassed, these splitters update internal state recording that they
247 : // should request a compaction split next time they're asked in
248 : // [shouldSplitBefore].
249 : frontiers frontiers
250 : // Reference to the range deletion tombstone fragmenter (e.g.,
251 : // `compaction.rangeDelFrag`).
252 : rangeDelFrag *keyspan.Fragmenter
253 : rangeKeyFrag *keyspan.Fragmenter
254 : // The fragmented tombstones.
255 : tombstones []keyspan.Span
256 : // The fragmented range keys.
257 : rangeKeys []keyspan.Span
258 : // Byte allocator for the tombstone keys.
259 : alloc bytealloc.A
260 : allowZeroSeqNum bool
261 : elideTombstone func(key []byte) bool
262 : elideRangeTombstone func(start, end []byte) bool
263 : // The on-disk format major version. This informs the types of keys that
264 : // may be written to disk during a compaction.
265 : formatVersion FormatMajorVersion
266 : stats struct {
267 : // count of DELSIZED keys that were missized.
268 : countMissizedDels uint64
269 : }
270 : }
271 :
272 : func newCompactionIter(
273 : cmp Compare,
274 : equal Equal,
275 : formatKey base.FormatKey,
276 : merge Merge,
277 : iter internalIterator,
278 : snapshots []uint64,
279 : rangeDelFrag *keyspan.Fragmenter,
280 : rangeKeyFrag *keyspan.Fragmenter,
281 : allowZeroSeqNum bool,
282 : elideTombstone func(key []byte) bool,
283 : elideRangeTombstone func(start, end []byte) bool,
284 : formatVersion FormatMajorVersion,
285 1 : ) *compactionIter {
286 1 : i := &compactionIter{
287 1 : equal: equal,
288 1 : merge: merge,
289 1 : iter: iter,
290 1 : snapshots: snapshots,
291 1 : frontiers: frontiers{cmp: cmp},
292 1 : rangeDelFrag: rangeDelFrag,
293 1 : rangeKeyFrag: rangeKeyFrag,
294 1 : allowZeroSeqNum: allowZeroSeqNum,
295 1 : elideTombstone: elideTombstone,
296 1 : elideRangeTombstone: elideRangeTombstone,
297 1 : formatVersion: formatVersion,
298 1 : }
299 1 : i.rangeDelFrag.Cmp = cmp
300 1 : i.rangeDelFrag.Format = formatKey
301 1 : i.rangeDelFrag.Emit = i.emitRangeDelChunk
302 1 : i.rangeKeyFrag.Cmp = cmp
303 1 : i.rangeKeyFrag.Format = formatKey
304 1 : i.rangeKeyFrag.Emit = i.emitRangeKeyChunk
305 1 : return i
306 1 : }
307 :
308 1 : func (i *compactionIter) First() (*InternalKey, []byte) {
309 1 : if i.err != nil {
310 0 : return nil, nil
311 0 : }
312 1 : var iterValue LazyValue
313 1 : i.iterKey, iterValue = i.iter.First()
314 1 : i.iterValue, _, i.err = iterValue.Value(nil)
315 1 : if i.err != nil {
316 0 : return nil, nil
317 0 : }
318 1 : if i.iterKey != nil {
319 1 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(i.iterKey.SeqNum(), i.snapshots)
320 1 : }
321 1 : i.pos = iterPosNext
322 1 : i.iterStripeChange = newStripeNewKey
323 1 : return i.Next()
324 : }
325 :
326 1 : func (i *compactionIter) Next() (*InternalKey, []byte) {
327 1 : if i.err != nil {
328 0 : return nil, nil
329 0 : }
330 :
331 : // Close the closer for the current value if one was open.
332 1 : if i.closeValueCloser() != nil {
333 0 : return nil, nil
334 0 : }
335 :
336 : // Prior to this call to `Next()` we are in one of three situations with
337 : // respect to `iterKey` and related state:
338 : //
339 : // - `!skip && pos == iterPosNext`: `iterKey` is already at the next key.
340 : // - `!skip && pos == iterPosCurForward`: We are at the key that has been returned.
341 : // To move forward we advance by one key, even if that lands us in the same
342 : // snapshot stripe.
343 : // - `skip && pos == iterPosCurForward`: We are at the key that has been returned.
344 : // To move forward we skip skippable entries in the stripe.
345 1 : if i.pos == iterPosCurForward {
346 1 : if i.skip {
347 1 : i.skipInStripe()
348 1 : } else {
349 1 : i.nextInStripe()
350 1 : }
351 : }
352 :
353 1 : i.pos = iterPosCurForward
354 1 : i.valid = false
355 1 :
356 1 : for i.iterKey != nil {
357 1 : // If we entered a new snapshot stripe with the same key, any key we
358 1 : // return on this iteration is only returned because the open snapshot
359 1 : // prevented it from being elided or merged with the key returned for
360 1 : // the previous stripe. Mark it as pinned so that the compaction loop
361 1 : // can correctly populate output tables' pinned statistics. We might
362 1 : // also set snapshotPinned=true down below if we observe that the key is
363 1 : // deleted by a range deletion in a higher stripe or that this key is a
364 1 : // tombstone that could be elided if only it were in the last snapshot
365 1 : // stripe.
366 1 : i.snapshotPinned = i.iterStripeChange == newStripeSameKey
367 1 :
368 1 : if i.iterKey.Kind() == InternalKeyKindRangeDelete || rangekey.IsRangeKey(i.iterKey.Kind()) {
369 1 : // Return the span so the compaction can use it for file truncation and add
370 1 : // it to the relevant fragmenter. We do not set `skip` to true before
371 1 : // returning as there may be a forthcoming point key with the same user key
372 1 : // and sequence number. Such a point key must be visible (i.e., not skipped
373 1 : // over) since we promise point keys are not deleted by range tombstones at
374 1 : // the same sequence number.
375 1 : //
376 1 : // Although, note that `skip` may already be true before reaching here
377 1 : // due to an earlier key in the stripe. Then it is fine to leave it set
378 1 : // to true, as the earlier key must have had a higher sequence number.
379 1 : //
380 1 : // NOTE: there is a subtle invariant violation here in that calling
381 1 : // saveKey and returning a reference to the temporary slice violates
382 1 : // the stability guarantee for range deletion keys. A potential
383 1 : // mediation could return the original iterKey and iterValue
384 1 : // directly, as the backing memory is guaranteed to be stable until
385 1 : // the compaction completes. The violation here is only minor in
386 1 : // that the caller immediately clones the range deletion InternalKey
387 1 : // when passing the key to the deletion fragmenter (see the
388 1 : // call-site in compaction.go).
389 1 : // TODO(travers): address this violation by removing the call to
390 1 : // saveKey and instead return the original iterKey and iterValue.
391 1 : // This goes against the comment on i.key in the struct, and
392 1 : // therefore warrants some investigation.
393 1 : i.saveKey()
394 1 : // TODO(jackson): Handle tracking pinned statistics for range keys
395 1 : // and range deletions. This would require updating
396 1 : // emitRangeDelChunk and rangeKeyCompactionTransform to update
397 1 : // statistics when they apply their own snapshot striping logic.
398 1 : i.snapshotPinned = false
399 1 : i.value = i.iterValue
400 1 : i.valid = true
401 1 : return &i.key, i.value
402 1 : }
403 :
404 1 : if cover := i.rangeDelFrag.Covers(*i.iterKey, i.curSnapshotSeqNum); cover == keyspan.CoversVisibly {
405 1 : // A pending range deletion deletes this key. Skip it.
406 1 : i.saveKey()
407 1 : i.skipInStripe()
408 1 : continue
409 1 : } else if cover == keyspan.CoversInvisibly {
410 1 : // i.iterKey would be deleted by a range deletion if there weren't
411 1 : // any open snapshots. Mark it as pinned.
412 1 : //
413 1 : // NB: there are multiple places in this file where we call
414 1 : // i.rangeDelFrag.Covers and this is the only one where we are writing
415 1 : // to i.snapshotPinned. Those other cases occur in mergeNext where the
416 1 : // caller is deciding whether the value should be merged or not, and the
417 1 : // key is in the same snapshot stripe. Hence, snapshotPinned is by
418 1 : // definition false in those cases.
419 1 : i.snapshotPinned = true
420 1 : i.forceObsoleteDueToRangeDel = true
421 1 : } else {
422 1 : i.forceObsoleteDueToRangeDel = false
423 1 : }
424 :
425 1 : switch i.iterKey.Kind() {
426 1 : case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
427 1 : if i.elideTombstone(i.iterKey.UserKey) {
428 1 : if i.curSnapshotIdx == 0 {
429 1 : // If we're at the last snapshot stripe and the tombstone
430 1 : // can be elided skip skippable keys in the same stripe.
431 1 : i.saveKey()
432 1 : i.skipInStripe()
433 1 : continue
434 1 : } else {
435 1 : // We're not at the last snapshot stripe, so the tombstone
436 1 : // can NOT yet be elided. Mark it as pinned, so that it's
437 1 : // included in table statistics appropriately.
438 1 : i.snapshotPinned = true
439 1 : }
440 : }
441 :
442 1 : switch i.iterKey.Kind() {
443 1 : case InternalKeyKindDelete:
444 1 : i.saveKey()
445 1 : i.value = i.iterValue
446 1 : i.valid = true
447 1 : i.skip = true
448 1 : return &i.key, i.value
449 :
450 1 : case InternalKeyKindDeleteSized:
451 1 : // We may skip subsequent keys because of this tombstone. Scan
452 1 : // ahead to see just how much data this tombstone drops and if
453 1 : // the tombstone's value should be updated accordingly.
454 1 : return i.deleteSizedNext()
455 :
456 1 : case InternalKeyKindSingleDelete:
457 1 : if i.singleDeleteNext() {
458 1 : return &i.key, i.value
459 1 : }
460 1 : continue
461 : }
462 :
463 1 : case InternalKeyKindSet, InternalKeyKindSetWithDelete:
464 1 : // The key we emit for this entry is a function of the current key
465 1 : // kind, and whether this entry is followed by a DEL/SINGLEDEL
466 1 : // entry. setNext() does the work to move the iterator forward,
467 1 : // preserving the original value, and potentially mutating the key
468 1 : // kind.
469 1 : i.setNext()
470 1 : return &i.key, i.value
471 :
472 1 : case InternalKeyKindMerge:
473 1 : // Record the snapshot index before mergeNext as merging
474 1 : // advances the iterator, adjusting curSnapshotIdx.
475 1 : origSnapshotIdx := i.curSnapshotIdx
476 1 : var valueMerger ValueMerger
477 1 : valueMerger, i.err = i.merge(i.iterKey.UserKey, i.iterValue)
478 1 : var change stripeChangeType
479 1 : if i.err == nil {
480 1 : change = i.mergeNext(valueMerger)
481 1 : }
482 1 : var needDelete bool
483 1 : if i.err == nil {
484 1 : // includesBase is true whenever we've transformed the MERGE record
485 1 : // into a SET.
486 1 : includesBase := i.key.Kind() == InternalKeyKindSet
487 1 : i.value, needDelete, i.valueCloser, i.err = finishValueMerger(valueMerger, includesBase)
488 1 : }
489 1 : if i.err == nil {
490 1 : if needDelete {
491 0 : i.valid = false
492 0 : if i.closeValueCloser() != nil {
493 0 : return nil, nil
494 0 : }
495 0 : continue
496 : }
497 : // A non-skippable entry does not necessarily cover later merge
498 : // operands, so we must not zero the current merge result's seqnum.
499 : //
500 : // For example, suppose the forthcoming two keys are a range
501 : // tombstone, `[a, b)#3`, and a merge operand, `a#3`. Recall that
502 : // range tombstones do not cover point keys at the same seqnum, so
503 : // `a#3` is not deleted. The range tombstone will be seen first due
504 : // to its larger value type. Since it is a non-skippable key, the
505 : // current merge will not include `a#3`. If we zeroed the current
506 : // merge result's seqnum, then it would conflict with the upcoming
507 : // merge including `a#3`, whose seqnum will also be zeroed.
508 1 : if change != sameStripeNonSkippable {
509 1 : i.maybeZeroSeqnum(origSnapshotIdx)
510 1 : }
511 1 : return &i.key, i.value
512 : }
513 0 : if i.err != nil {
514 0 : i.valid = false
515 0 : i.err = base.MarkCorruptionError(i.err)
516 0 : }
517 0 : return nil, nil
518 :
519 0 : default:
520 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
521 0 : i.valid = false
522 0 : return nil, nil
523 : }
524 : }
525 :
526 1 : return nil, nil
527 : }
528 :
529 1 : func (i *compactionIter) closeValueCloser() error {
530 1 : if i.valueCloser == nil {
531 1 : return nil
532 1 : }
533 :
534 0 : i.err = i.valueCloser.Close()
535 0 : i.valueCloser = nil
536 0 : if i.err != nil {
537 0 : i.valid = false
538 0 : }
539 0 : return i.err
540 : }
541 :
542 : // snapshotIndex returns the index of the first sequence number in snapshots
543 : // which is greater than or equal to seq.
544 1 : func snapshotIndex(seq uint64, snapshots []uint64) (int, uint64) {
545 1 : index := sort.Search(len(snapshots), func(i int) bool {
546 1 : return snapshots[i] > seq
547 1 : })
548 1 : if index >= len(snapshots) {
549 1 : return index, InternalKeySeqNumMax
550 1 : }
551 1 : return index, snapshots[index]
552 : }
553 :
554 : // skipInStripe skips over skippable keys in the same stripe and user key.
555 1 : func (i *compactionIter) skipInStripe() {
556 1 : i.skip = true
557 1 : for i.nextInStripe() == sameStripeSkippable {
558 1 : }
559 : // Reset skip if we landed outside the original stripe. Otherwise, we landed
560 : // in the same stripe on a non-skippable key. In that case we should preserve
561 : // `i.skip == true` such that later keys in the stripe will continue to be
562 : // skipped.
563 1 : if i.iterStripeChange == newStripeNewKey || i.iterStripeChange == newStripeSameKey {
564 1 : i.skip = false
565 1 : }
566 : }
567 :
568 1 : func (i *compactionIter) iterNext() bool {
569 1 : var iterValue LazyValue
570 1 : i.iterKey, iterValue = i.iter.Next()
571 1 : i.iterValue, _, i.err = iterValue.Value(nil)
572 1 : if i.err != nil {
573 0 : i.iterKey = nil
574 0 : }
575 1 : return i.iterKey != nil
576 : }
577 :
// stripeChangeType describes where the iterator landed relative to the
// previous key after advancing by one entry. When the snapshot stripe is
// unchanged it says whether the new entry may be skipped; when the stripe
// changed it says whether that happened by moving to an entirely new user key
// or by crossing a snapshot boundary within the same user key.
type stripeChangeType int

const (
	// newStripeNewKey: the iterator moved to a different user key (also
	// reported when the input is exhausted).
	newStripeNewKey stripeChangeType = iota
	// newStripeSameKey: same user key, but a different snapshot stripe.
	newStripeSameKey
	// sameStripeSkippable: same user key and stripe; the entry may be skipped.
	sameStripeSkippable
	// sameStripeNonSkippable: same user key and stripe, but the entry (a
	// range deletion or an invalid key) must not be skipped.
	sameStripeNonSkippable
)
591 :
592 : // nextInStripe advances the iterator and returns one of the above const ints
593 : // indicating how its state changed.
594 : //
595 : // Calls to nextInStripe must be preceded by a call to saveKey to retain a
596 : // temporary reference to the original key, so that forward iteration can
597 : // proceed with a reference to the original key. Care should be taken to avoid
598 : // overwriting or mutating the saved key or value before they have been returned
599 : // to the caller of the exported function (i.e. the caller of Next, First, etc.)
600 1 : func (i *compactionIter) nextInStripe() stripeChangeType {
601 1 : i.iterStripeChange = i.nextInStripeHelper()
602 1 : return i.iterStripeChange
603 1 : }
604 :
605 : // nextInStripeHelper is an internal helper for nextInStripe; callers should use
606 : // nextInStripe and not call nextInStripeHelper.
607 1 : func (i *compactionIter) nextInStripeHelper() stripeChangeType {
608 1 : if !i.iterNext() {
609 1 : return newStripeNewKey
610 1 : }
611 1 : key := i.iterKey
612 1 :
613 1 : // NB: The below conditional is an optimization to avoid a user key
614 1 : // comparison in many cases. Internal keys with the same user key are
615 1 : // ordered in (strictly) descending order by trailer. If the new key has a
616 1 : // greater or equal trailer, or the previous key had a zero sequence number,
617 1 : // the new key must have a new user key.
618 1 : //
619 1 : // A couple things make these cases common:
620 1 : // - Sequence-number zeroing ensures ~all of the keys in L6 have a zero
621 1 : // sequence number.
622 1 : // - Ingested sstables' keys all adopt the same sequence number.
623 1 : if i.keyTrailer <= base.InternalKeyZeroSeqnumMaxTrailer || key.Trailer >= i.keyTrailer {
624 1 : if invariants.Enabled && i.equal(i.key.UserKey, key.UserKey) {
625 0 : prevKey := i.key
626 0 : prevKey.Trailer = i.keyTrailer
627 0 : panic(fmt.Sprintf("pebble: invariant violation: %s and %s out of order", key, prevKey))
628 : }
629 1 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
630 1 : return newStripeNewKey
631 1 : } else if !i.equal(i.key.UserKey, key.UserKey) {
632 1 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
633 1 : return newStripeNewKey
634 1 : }
635 1 : origSnapshotIdx := i.curSnapshotIdx
636 1 : i.curSnapshotIdx, i.curSnapshotSeqNum = snapshotIndex(key.SeqNum(), i.snapshots)
637 1 : switch key.Kind() {
638 1 : case InternalKeyKindRangeDelete:
639 1 : // Range tombstones need to be exposed by the compactionIter to the upper level
640 1 : // `compaction` object, so return them regardless of whether they are in the same
641 1 : // snapshot stripe.
642 1 : if i.curSnapshotIdx == origSnapshotIdx {
643 1 : return sameStripeNonSkippable
644 1 : }
645 1 : return newStripeSameKey
646 0 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
647 0 : // Range keys are interleaved at the max sequence number for a given user
648 0 : // key, so we should not see any more range keys in this stripe.
649 0 : panic("unreachable")
650 0 : case InternalKeyKindInvalid:
651 0 : if i.curSnapshotIdx == origSnapshotIdx {
652 0 : return sameStripeNonSkippable
653 0 : }
654 0 : return newStripeSameKey
655 : }
656 1 : if i.curSnapshotIdx == origSnapshotIdx {
657 1 : return sameStripeSkippable
658 1 : }
659 1 : return newStripeSameKey
660 : }
661 :
662 1 : func (i *compactionIter) setNext() {
663 1 : // Save the current key.
664 1 : i.saveKey()
665 1 : i.value = i.iterValue
666 1 : i.valid = true
667 1 : i.maybeZeroSeqnum(i.curSnapshotIdx)
668 1 :
669 1 : // There are two cases where we can early return and skip the remaining
670 1 : // records in the stripe:
671 1 : // - If the DB does not support SETWITHDEL.
672 1 : // - If this key is already a SETWITHDEL.
673 1 : if i.formatVersion < FormatSetWithDelete ||
674 1 : i.iterKey.Kind() == InternalKeyKindSetWithDelete {
675 1 : i.skip = true
676 1 : return
677 1 : }
678 :
679 : // We are iterating forward. Save the current value.
680 1 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
681 1 : i.value = i.valueBuf
682 1 :
683 1 : // Else, we continue to loop through entries in the stripe looking for a
684 1 : // DEL. Note that we may stop *before* encountering a DEL, if one exists.
685 1 : for {
686 1 : switch i.nextInStripe() {
687 1 : case newStripeNewKey, newStripeSameKey:
688 1 : i.pos = iterPosNext
689 1 : return
690 1 : case sameStripeNonSkippable:
691 1 : i.pos = iterPosNext
692 1 : // We iterated onto a key that we cannot skip. We can
693 1 : // conservatively transform the original SET into a SETWITHDEL
694 1 : // as an indication that there *may* still be a DEL/SINGLEDEL
695 1 : // under this SET, even if we did not actually encounter one.
696 1 : //
697 1 : // This is safe to do, as:
698 1 : //
699 1 : // - in the case that there *is not* actually a DEL/SINGLEDEL
700 1 : // under this entry, any SINGLEDEL above this now-transformed
701 1 : // SETWITHDEL will become a DEL when the two encounter in a
702 1 : // compaction. The DEL will eventually be elided in a
703 1 : // subsequent compaction. The cost for ensuring correctness is
704 1 : // that this entry is kept around for an additional compaction
705 1 : // cycle(s).
706 1 : //
707 1 : // - in the case there *is* indeed a DEL/SINGLEDEL under us
708 1 : // (but in a different stripe or sstable), then we will have
709 1 : // already done the work to transform the SET into a
710 1 : // SETWITHDEL, and we will skip any additional iteration when
711 1 : // this entry is encountered again in a subsequent compaction.
712 1 : //
713 1 : // Ideally, this codepath would be smart enough to handle the
714 1 : // case of SET <- RANGEDEL <- ... <- DEL/SINGLEDEL <- ....
715 1 : // This requires preserving any RANGEDEL entries we encounter
716 1 : // along the way, then emitting the original (possibly
717 1 : // transformed) key, followed by the RANGEDELs. This requires
718 1 : // a sizable refactoring of the existing code, as nextInStripe
719 1 : // currently returns a sameStripeNonSkippable when it
720 1 : // encounters a RANGEDEL.
721 1 : // TODO(travers): optimize to handle the RANGEDEL case if it
722 1 : // turns out to be a performance problem.
723 1 : i.key.SetKind(InternalKeyKindSetWithDelete)
724 1 :
725 1 : // By setting i.skip=true, we are saying that after the
726 1 : // non-skippable key is emitted (which is likely a RANGEDEL),
727 1 : // the remaining point keys that share the same user key as this
728 1 : // saved key should be skipped.
729 1 : i.skip = true
730 1 : return
731 1 : case sameStripeSkippable:
732 1 : // We're still in the same stripe. If this is a
733 1 : // DEL/SINGLEDEL/DELSIZED, we stop looking and emit a SETWITHDEL.
734 1 : // Subsequent keys are eligible for skipping.
735 1 : if i.iterKey.Kind() == InternalKeyKindDelete ||
736 1 : i.iterKey.Kind() == InternalKeyKindSingleDelete ||
737 1 : i.iterKey.Kind() == InternalKeyKindDeleteSized {
738 1 : i.key.SetKind(InternalKeyKindSetWithDelete)
739 1 : i.skip = true
740 1 : return
741 1 : }
742 0 : default:
743 0 : panic("pebble: unexpected stripeChangeType: " + strconv.Itoa(int(i.iterStripeChange)))
744 : }
745 : }
746 : }
747 :
748 1 : func (i *compactionIter) mergeNext(valueMerger ValueMerger) stripeChangeType {
749 1 : // Save the current key.
750 1 : i.saveKey()
751 1 : i.valid = true
752 1 :
753 1 : // Loop looking for older values in the current snapshot stripe and merge
754 1 : // them.
755 1 : for {
756 1 : if i.nextInStripe() != sameStripeSkippable {
757 1 : i.pos = iterPosNext
758 1 : return i.iterStripeChange
759 1 : }
760 1 : key := i.iterKey
761 1 : switch key.Kind() {
762 1 : case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
763 1 : // We've hit a deletion tombstone. Return everything up to this point and
764 1 : // then skip entries until the next snapshot stripe. We change the kind
765 1 : // of the result key to a Set so that it shadows keys in lower
766 1 : // levels. That is, MERGE+DEL -> SET.
767 1 : // We do the same for SingleDelete since SingleDelete is only
768 1 : // permitted (with deterministic behavior) for keys that have been
769 1 : // set once since the last SingleDelete/Delete, so everything
770 1 : // older is acceptable to shadow. Note that this is slightly
771 1 : // different from singleDeleteNext() which implements stricter
772 1 : // semantics in terms of applying the SingleDelete to the single
773 1 : // next Set. But those stricter semantics are not observable to
774 1 : // the end-user since Iterator interprets SingleDelete as Delete.
775 1 : // We could do something more complicated here and consume only a
776 1 : // single Set, and then merge in any following Sets, but that is
777 1 : // complicated wrt code and unnecessary given the narrow permitted
778 1 : // use of SingleDelete.
779 1 : i.key.SetKind(InternalKeyKindSet)
780 1 : i.skip = true
781 1 : return sameStripeSkippable
782 :
783 1 : case InternalKeyKindSet, InternalKeyKindSetWithDelete:
784 1 : if i.rangeDelFrag.Covers(*key, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
785 1 : // We change the kind of the result key to a Set so that it shadows
786 1 : // keys in lower levels. That is, MERGE+RANGEDEL -> SET. This isn't
787 1 : // strictly necessary, but provides consistency with the behavior of
788 1 : // MERGE+DEL.
789 1 : i.key.SetKind(InternalKeyKindSet)
790 1 : i.skip = true
791 1 : return sameStripeSkippable
792 1 : }
793 :
794 : // We've hit a Set or SetWithDel value. Merge with the existing
795 : // value and return. We change the kind of the resulting key to a
796 : // Set so that it shadows keys in lower levels. That is:
797 : // MERGE + (SET*) -> SET.
798 1 : i.err = valueMerger.MergeOlder(i.iterValue)
799 1 : if i.err != nil {
800 0 : i.valid = false
801 0 : return sameStripeSkippable
802 0 : }
803 1 : i.key.SetKind(InternalKeyKindSet)
804 1 : i.skip = true
805 1 : return sameStripeSkippable
806 :
807 1 : case InternalKeyKindMerge:
808 1 : if i.rangeDelFrag.Covers(*key, i.curSnapshotSeqNum) == keyspan.CoversVisibly {
809 1 : // We change the kind of the result key to a Set so that it shadows
810 1 : // keys in lower levels. That is, MERGE+RANGEDEL -> SET. This isn't
811 1 : // strictly necessary, but provides consistency with the behavior of
812 1 : // MERGE+DEL.
813 1 : i.key.SetKind(InternalKeyKindSet)
814 1 : i.skip = true
815 1 : return sameStripeSkippable
816 1 : }
817 :
818 : // We've hit another Merge value. Merge with the existing value and
819 : // continue looping.
820 1 : i.err = valueMerger.MergeOlder(i.iterValue)
821 1 : if i.err != nil {
822 0 : i.valid = false
823 0 : return sameStripeSkippable
824 0 : }
825 :
826 0 : default:
827 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
828 0 : i.valid = false
829 0 : return sameStripeSkippable
830 : }
831 : }
832 : }
833 :
834 1 : func (i *compactionIter) singleDeleteNext() bool {
835 1 : // Save the current key.
836 1 : i.saveKey()
837 1 : i.value = i.iterValue
838 1 : i.valid = true
839 1 :
840 1 : // Loop until finds a key to be passed to the next level.
841 1 : for {
842 1 : if i.nextInStripe() != sameStripeSkippable {
843 1 : i.pos = iterPosNext
844 1 : return true
845 1 : }
846 :
847 1 : key := i.iterKey
848 1 : switch key.Kind() {
849 1 : case InternalKeyKindDelete, InternalKeyKindMerge, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
850 1 : // We've hit a Delete, DeleteSized, Merge, SetWithDelete, transform
851 1 : // the SingleDelete into a full Delete.
852 1 : i.key.SetKind(InternalKeyKindDelete)
853 1 : i.skip = true
854 1 : return true
855 :
856 1 : case InternalKeyKindSet:
857 1 : i.nextInStripe()
858 1 : i.valid = false
859 1 : return false
860 :
861 1 : case InternalKeyKindSingleDelete:
862 1 : continue
863 :
864 0 : default:
865 0 : i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKey.Kind()))
866 0 : i.valid = false
867 0 : return false
868 : }
869 : }
870 : }
871 :
872 : // deleteSizedNext processes a DELSIZED point tombstone. Unlike ordinary DELs,
873 : // these tombstones carry a value that's a varint indicating the size of the
874 : // entry (len(key)+len(value)) that the tombstone is expected to delete.
875 : //
876 : // When a deleteSizedNext is encountered, we skip ahead to see which keys, if
877 : // any, are elided as a result of the tombstone.
878 1 : func (i *compactionIter) deleteSizedNext() (*base.InternalKey, []byte) {
879 1 : i.saveKey()
880 1 : i.valid = true
881 1 : i.skip = true
882 1 :
883 1 : // The DELSIZED tombstone may have no value at all. This happens when the
884 1 : // tombstone has already deleted the key that the user originally predicted.
885 1 : // In this case, we still peek forward in case there's another DELSIZED key
886 1 : // with a lower sequence number, in which case we'll adopt its value.
887 1 : if len(i.iterValue) == 0 {
888 1 : i.value = i.valueBuf[:0]
889 1 : } else {
890 1 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
891 1 : i.value = i.valueBuf
892 1 : }
893 :
894 : // Loop through all the keys within this stripe that are skippable.
895 1 : i.pos = iterPosNext
896 1 : for i.nextInStripe() == sameStripeSkippable {
897 1 : switch i.iterKey.Kind() {
898 1 : case InternalKeyKindDelete, InternalKeyKindDeleteSized:
899 1 : // We encountered a tombstone (DEL, or DELSIZED) that's deleted by
900 1 : // the original DELSIZED tombstone. This can happen in two cases:
901 1 : //
902 1 : // (1) These tombstones were intended to delete two distinct values,
903 1 : // and this DELSIZED has already dropped the relevant key. For
904 1 : // example:
905 1 : //
906 1 : // a.DELSIZED.9 a.SET.7 a.DELSIZED.5 a.SET.4
907 1 : //
908 1 : // If a.DELSIZED.9 has already deleted a.SET.7, its size has
909 1 : // already been zeroed out. In this case, we want to adopt the
910 1 : // value of the DELSIZED with the lower sequence number, in
911 1 : // case the a.SET.4 key has not yet been elided.
912 1 : //
913 1 : // (2) This DELSIZED was missized. The user thought they were
914 1 : // deleting a key with this user key, but this user key had
915 1 : // already been deleted.
916 1 : //
917 1 : // We can differentiate these two cases by examining the length of
918 1 : // the DELSIZED's value. A DELSIZED's value holds the size of both
919 1 : // the user key and value that it intends to delete. For any user
920 1 : // key with a length > 1, a DELSIZED that has not deleted a key must
921 1 : // have a value with a length > 1.
922 1 : //
923 1 : // We treat both cases the same functionally, adopting the identity
924 1 : // of the lower-sequence numbered tombstone. However in the second
925 1 : // case, we also increment the stat counting missized tombstones.
926 1 : if len(i.value) > 0 {
927 1 : // The original DELSIZED key was missized. The key that the user
928 1 : // thought they were deleting does not exist.
929 1 : i.stats.countMissizedDels++
930 1 : }
931 1 : i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
932 1 : i.value = i.valueBuf
933 1 : if i.iterKey.Kind() == InternalKeyKindDelete {
934 1 : // Convert the DELSIZED to a DEL—The DEL we're eliding may not
935 1 : // have deleted the key(s) it was intended to yet. The ordinary
936 1 : // DEL compaction heuristics are better suited at that, plus we
937 1 : // don't want to count it as a missized DEL. We early exit in
938 1 : // this case, after skipping the remainder of the snapshot
939 1 : // stripe.
940 1 : i.key.SetKind(i.iterKey.Kind())
941 1 : i.skipInStripe()
942 1 : return &i.key, i.value
943 1 : }
944 : // Continue, in case we uncover another DELSIZED or a key this
945 : // DELSIZED deletes.
946 1 : default:
947 1 : // If the DELSIZED is value-less, it already deleted the key that it
948 1 : // was intended to delete. This is possible with a sequence like:
949 1 : //
950 1 : // DELSIZED.8 SET.7 SET.3
951 1 : //
952 1 : // The DELSIZED only describes the size of the SET.7, which in this
953 1 : // case has already been elided. We don't count it as a missizing,
954 1 : // instead converting the DELSIZED to a DEL. Skip the remainder of
955 1 : // the snapshot stripe and return.
956 1 : if len(i.value) == 0 {
957 1 : i.key.SetKind(InternalKeyKindDelete)
958 1 : i.skipInStripe()
959 1 : return &i.key, i.value
960 1 : }
961 : // The deleted key is not a DEL, DELSIZED, and the DELSIZED in i.key
962 : // has a positive size.
963 1 : expectedSize, n := binary.Uvarint(i.value)
964 1 : if n != len(i.value) {
965 0 : i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(i.value))
966 0 : i.valid = false
967 0 : return nil, nil
968 0 : }
969 1 : elidedSize := uint64(len(i.iterKey.UserKey)) + uint64(len(i.iterValue))
970 1 : if elidedSize != expectedSize {
971 1 : // The original DELSIZED key was missized. It's unclear what to
972 1 : // do. The user-provided size was wrong, so it's unlikely to be
973 1 : // accurate or meaningful. We could:
974 1 : //
975 1 : // 1. return the DELSIZED with the original user-provided size unmodified
976 1 : // 2. return the DELZIZED with a zeroed size to reflect that a key was
977 1 : // elided, even if it wasn't the anticipated size.
978 1 : // 3. subtract the elided size from the estimate and re-encode.
979 1 : // 4. convert the DELSIZED into a value-less DEL, so that
980 1 : // ordinary DEL heuristics apply.
981 1 : //
982 1 : // We opt for (4) under the rationale that we can't rely on the
983 1 : // user-provided size for accuracy, so ordinary DEL heuristics
984 1 : // are safer.
985 1 : i.key.SetKind(InternalKeyKindDelete)
986 1 : i.stats.countMissizedDels++
987 1 : }
988 : // NB: We remove the value regardless of whether the key was sized
989 : // appropriately. The size encoded is 'consumed' the first time it
990 : // meets a key that it deletes.
991 1 : i.value = i.valueBuf[:0]
992 : }
993 : }
994 : // Reset skip if we landed outside the original stripe. Otherwise, we landed
995 : // in the same stripe on a non-skippable key. In that case we should preserve
996 : // `i.skip == true` such that later keys in the stripe will continue to be
997 : // skipped.
998 1 : if i.iterStripeChange == newStripeNewKey || i.iterStripeChange == newStripeSameKey {
999 1 : i.skip = false
1000 1 : }
1001 1 : return &i.key, i.value
1002 : }
1003 :
1004 1 : func (i *compactionIter) saveKey() {
1005 1 : i.keyBuf = append(i.keyBuf[:0], i.iterKey.UserKey...)
1006 1 : i.key.UserKey = i.keyBuf
1007 1 : i.key.Trailer = i.iterKey.Trailer
1008 1 : i.keyTrailer = i.iterKey.Trailer
1009 1 : i.frontiers.Advance(i.key.UserKey)
1010 1 : }
1011 :
1012 1 : func (i *compactionIter) cloneKey(key []byte) []byte {
1013 1 : i.alloc, key = i.alloc.Copy(key)
1014 1 : return key
1015 1 : }
1016 :
1017 0 : func (i *compactionIter) Key() InternalKey {
1018 0 : return i.key
1019 0 : }
1020 :
1021 0 : func (i *compactionIter) Value() []byte {
1022 0 : return i.value
1023 0 : }
1024 :
1025 0 : func (i *compactionIter) Valid() bool {
1026 0 : return i.valid
1027 0 : }
1028 :
1029 0 : func (i *compactionIter) Error() error {
1030 0 : return i.err
1031 0 : }
1032 :
1033 1 : func (i *compactionIter) Close() error {
1034 1 : err := i.iter.Close()
1035 1 : if i.err == nil {
1036 1 : i.err = err
1037 1 : }
1038 :
1039 : // Close the closer for the current value if one was open.
1040 1 : if i.valueCloser != nil {
1041 0 : i.err = firstError(i.err, i.valueCloser.Close())
1042 0 : i.valueCloser = nil
1043 0 : }
1044 :
1045 1 : return i.err
1046 : }
1047 :
1048 : // Tombstones returns a list of pending range tombstones in the fragmenter
1049 : // up to the specified key, or all pending range tombstones if key = nil.
1050 1 : func (i *compactionIter) Tombstones(key []byte) []keyspan.Span {
1051 1 : if key == nil {
1052 1 : i.rangeDelFrag.Finish()
1053 1 : } else {
1054 1 : // The specified end key is exclusive; no versions of the specified
1055 1 : // user key (including range tombstones covering that key) should
1056 1 : // be flushed yet.
1057 1 : i.rangeDelFrag.TruncateAndFlushTo(key)
1058 1 : }
1059 1 : tombstones := i.tombstones
1060 1 : i.tombstones = nil
1061 1 : return tombstones
1062 : }
1063 :
1064 : // RangeKeys returns a list of pending fragmented range keys up to the specified
1065 : // key, or all pending range keys if key = nil.
1066 1 : func (i *compactionIter) RangeKeys(key []byte) []keyspan.Span {
1067 1 : if key == nil {
1068 1 : i.rangeKeyFrag.Finish()
1069 1 : } else {
1070 1 : // The specified end key is exclusive; no versions of the specified
1071 1 : // user key (including range tombstones covering that key) should
1072 1 : // be flushed yet.
1073 1 : i.rangeKeyFrag.TruncateAndFlushTo(key)
1074 1 : }
1075 1 : rangeKeys := i.rangeKeys
1076 1 : i.rangeKeys = nil
1077 1 : return rangeKeys
1078 : }
1079 :
1080 1 : func (i *compactionIter) emitRangeDelChunk(fragmented keyspan.Span) {
1081 1 : // Apply the snapshot stripe rules, keeping only the latest tombstone for
1082 1 : // each snapshot stripe.
1083 1 : currentIdx := -1
1084 1 : keys := fragmented.Keys[:0]
1085 1 : for _, k := range fragmented.Keys {
1086 1 : idx, _ := snapshotIndex(k.SeqNum(), i.snapshots)
1087 1 : if currentIdx == idx {
1088 1 : continue
1089 : }
1090 1 : if idx == 0 && i.elideRangeTombstone(fragmented.Start, fragmented.End) {
1091 1 : // This is the last snapshot stripe and the range tombstone
1092 1 : // can be elided.
1093 1 : break
1094 : }
1095 :
1096 1 : keys = append(keys, k)
1097 1 : if idx == 0 {
1098 1 : // This is the last snapshot stripe.
1099 1 : break
1100 : }
1101 1 : currentIdx = idx
1102 : }
1103 1 : if len(keys) > 0 {
1104 1 : i.tombstones = append(i.tombstones, keyspan.Span{
1105 1 : Start: fragmented.Start,
1106 1 : End: fragmented.End,
1107 1 : Keys: keys,
1108 1 : })
1109 1 : }
1110 : }
1111 :
1112 1 : func (i *compactionIter) emitRangeKeyChunk(fragmented keyspan.Span) {
1113 1 : // Elision of snapshot stripes happens in rangeKeyCompactionTransform, so no need to
1114 1 : // do that here.
1115 1 : if len(fragmented.Keys) > 0 {
1116 1 : i.rangeKeys = append(i.rangeKeys, fragmented)
1117 1 : }
1118 : }
1119 :
1120 : // maybeZeroSeqnum attempts to set the seqnum for the current key to 0. Doing
1121 : // so improves compression and enables an optimization during forward iteration
1122 : // to skip some key comparisons. The seqnum for an entry can be zeroed if the
1123 : // entry is on the bottom snapshot stripe and on the bottom level of the LSM.
1124 1 : func (i *compactionIter) maybeZeroSeqnum(snapshotIdx int) {
1125 1 : if !i.allowZeroSeqNum {
1126 1 : // TODO(peter): allowZeroSeqNum applies to the entire compaction. We could
1127 1 : // make the determination on a key by key basis, similar to what is done
1128 1 : // for elideTombstone. Need to add a benchmark for compactionIter to verify
1129 1 : // that isn't too expensive.
1130 1 : return
1131 1 : }
1132 1 : if snapshotIdx > 0 {
1133 1 : // This is not the last snapshot
1134 1 : return
1135 1 : }
1136 1 : i.key.SetSeqNum(base.SeqNumZero)
1137 : }
1138 :
1139 : // A frontier is used to monitor a compaction's progression across the user
1140 : // keyspace.
1141 : //
1142 : // A frontier hold a user key boundary that it's concerned with in its `key`
1143 : // field. If/when the compaction iterator returns an InternalKey with a user key
1144 : // _k_ such that k ≥ frontier.key, the compaction iterator invokes the
1145 : // frontier's `reached` function, passing _k_ as its argument.
1146 : //
1147 : // The `reached` function returns a new value to use as the key. If `reached`
1148 : // returns nil, the frontier is forgotten and its `reached` method will not be
1149 : // invoked again, unless the user calls [Update] to set a new key.
1150 : //
1151 : // A frontier's key may be updated outside the context of a `reached`
1152 : // invocation at any time, through its Update method.
1153 : type frontier struct {
1154 : // container points to the containing *frontiers that was passed to Init
1155 : // when the frontier was initialized.
1156 : container *frontiers
1157 :
1158 : // key holds the frontier's current key. If nil, this frontier is inactive
1159 : // and its reached func will not be invoked. The value of this key may only
1160 : // be updated by the `frontiers` type, or the Update method.
1161 : key []byte
1162 :
1163 : // reached is invoked to inform a frontier that its key has been reached.
1164 : // It's invoked with the user key that reached the limit. The `key` argument
1165 : // is guaranteed to be ≥ the frontier's key.
1166 : //
1167 : // After reached is invoked, the frontier's key is updated to the return
1168 : // value of `reached`. Note bene, the frontier is permitted to update its
1169 : // key to a user key ≤ the argument `key`.
1170 : //
1171 : // If a frontier is set to key k1, and reached(k2) is invoked (k2 ≥ k1), the
1172 : // frontier will receive reached(k2) calls until it returns nil or a key
1173 : // `k3` such that k2 < k3. This property is useful for frontiers that use
1174 : // `reached` invocations to drive iteration through collections of keys that
1175 : // may contain multiple keys that are both < k2 and ≥ k1.
1176 : reached func(key []byte) (next []byte)
1177 : }
1178 :
1179 : // Init initializes the frontier with the provided key and reached callback.
1180 : // The frontier is attached to the provided *frontiers and the provided reached
1181 : // func will be invoked when the *frontiers is advanced to a key ≥ this
1182 : // frontier's key.
1183 : func (f *frontier) Init(
1184 : frontiers *frontiers, initialKey []byte, reached func(key []byte) (next []byte),
1185 1 : ) {
1186 1 : *f = frontier{
1187 1 : container: frontiers,
1188 1 : key: initialKey,
1189 1 : reached: reached,
1190 1 : }
1191 1 : if initialKey != nil {
1192 1 : f.container.push(f)
1193 1 : }
1194 : }
1195 :
1196 : // String implements fmt.Stringer.
1197 0 : func (f *frontier) String() string {
1198 0 : return string(f.key)
1199 0 : }
1200 :
1201 : // Update replaces the existing frontier's key with the provided key. The
1202 : // frontier's reached func will be invoked when the new key is reached.
1203 1 : func (f *frontier) Update(key []byte) {
1204 1 : c := f.container
1205 1 : prevKeyIsNil := f.key == nil
1206 1 : f.key = key
1207 1 : if prevKeyIsNil {
1208 1 : if key != nil {
1209 1 : c.push(f)
1210 1 : }
1211 1 : return
1212 : }
1213 :
1214 : // Find the frontier within the heap (it must exist within the heap because
1215 : // f.key was != nil). If the frontier key is now nil, remove it from the
1216 : // heap. Otherwise, fix up its position.
1217 1 : for i := 0; i < len(c.items); i++ {
1218 1 : if c.items[i] == f {
1219 1 : if key != nil {
1220 1 : c.fix(i)
1221 1 : } else {
1222 1 : n := c.len() - 1
1223 1 : c.swap(i, n)
1224 1 : c.down(i, n)
1225 1 : c.items = c.items[:n]
1226 1 : }
1227 1 : return
1228 : }
1229 : }
1230 0 : panic("unreachable")
1231 : }
1232 :
1233 : // frontiers is used to track progression of a task (eg, compaction) across the
1234 : // keyspace. Clients that want to be informed when the task advances to a key ≥
1235 : // some frontier may register a frontier, providing a callback. The task calls
1236 : // `Advance(k)` with each user key encountered, which invokes the `reached` func
1237 : // on all tracked frontiers with `key`s ≤ k.
1238 : //
1239 : // Internally, frontiers is implemented as a simple heap.
1240 : type frontiers struct {
1241 : cmp Compare
1242 : items []*frontier
1243 : }
1244 :
1245 : // String implements fmt.Stringer.
1246 0 : func (f *frontiers) String() string {
1247 0 : var buf bytes.Buffer
1248 0 : for i := 0; i < len(f.items); i++ {
1249 0 : if i > 0 {
1250 0 : fmt.Fprint(&buf, ", ")
1251 0 : }
1252 0 : fmt.Fprintf(&buf, "%s: %q", f.items[i], f.items[i].key)
1253 : }
1254 0 : return buf.String()
1255 : }
1256 :
1257 : // Advance notifies all member frontiers with keys ≤ k.
1258 1 : func (f *frontiers) Advance(k []byte) {
1259 1 : for len(f.items) > 0 && f.cmp(k, f.items[0].key) >= 0 {
1260 1 : // This frontier has been reached. Invoke the closure and update with
1261 1 : // the next frontier.
1262 1 : f.items[0].key = f.items[0].reached(k)
1263 1 : if f.items[0].key == nil {
1264 1 : // This was the final frontier that this user was concerned with.
1265 1 : // Remove it from the heap.
1266 1 : f.pop()
1267 1 : } else {
1268 1 : // Fix up the heap root.
1269 1 : f.fix(0)
1270 1 : }
1271 : }
1272 : }
1273 :
1274 1 : func (f *frontiers) len() int {
1275 1 : return len(f.items)
1276 1 : }
1277 :
1278 1 : func (f *frontiers) less(i, j int) bool {
1279 1 : return f.cmp(f.items[i].key, f.items[j].key) < 0
1280 1 : }
1281 :
1282 1 : func (f *frontiers) swap(i, j int) {
1283 1 : f.items[i], f.items[j] = f.items[j], f.items[i]
1284 1 : }
1285 :
1286 : // fix, up and down are copied from the go stdlib.
1287 :
1288 1 : func (f *frontiers) fix(i int) {
1289 1 : if !f.down(i, f.len()) {
1290 1 : f.up(i)
1291 1 : }
1292 : }
1293 :
1294 1 : func (f *frontiers) push(ff *frontier) {
1295 1 : n := len(f.items)
1296 1 : f.items = append(f.items, ff)
1297 1 : f.up(n)
1298 1 : }
1299 :
1300 1 : func (f *frontiers) pop() *frontier {
1301 1 : n := f.len() - 1
1302 1 : f.swap(0, n)
1303 1 : f.down(0, n)
1304 1 : item := f.items[n]
1305 1 : f.items = f.items[:n]
1306 1 : return item
1307 1 : }
1308 :
1309 1 : func (f *frontiers) up(j int) {
1310 1 : for {
1311 1 : i := (j - 1) / 2 // parent
1312 1 : if i == j || !f.less(j, i) {
1313 1 : break
1314 : }
1315 1 : f.swap(i, j)
1316 1 : j = i
1317 : }
1318 : }
1319 :
1320 1 : func (f *frontiers) down(i0, n int) bool {
1321 1 : i := i0
1322 1 : for {
1323 1 : j1 := 2*i + 1
1324 1 : if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
1325 1 : break
1326 : }
1327 1 : j := j1 // left child
1328 1 : if j2 := j1 + 1; j2 < n && f.less(j2, j1) {
1329 1 : j = j2 // = 2*i + 2 // right child
1330 1 : }
1331 1 : if !f.less(j, i) {
1332 1 : break
1333 : }
1334 1 : f.swap(i, j)
1335 1 : i = j
1336 : }
1337 1 : return i > i0
1338 : }
|