// Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package keyspan

import (
	"fmt"
	"sort"

	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/invariants"
)

// spansByStartKey sorts spans by their start key using the configured
// comparator. It implements sort.Interface.
type spansByStartKey struct {
	cmp base.Compare
	buf []Span
}

func (v *spansByStartKey) Len() int { return len(v.buf) }
func (v *spansByStartKey) Less(i, j int) bool {
	return v.cmp(v.buf[i].Start, v.buf[j].Start) < 0
}
func (v *spansByStartKey) Swap(i, j int) {
	v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
}

// spansByEndKey sorts spans by their end key using the configured comparator.
// It implements sort.Interface.
type spansByEndKey struct {
	cmp base.Compare
	buf []Span
}

func (v *spansByEndKey) Len() int { return len(v.buf) }
func (v *spansByEndKey) Less(i, j int) bool {
	return v.cmp(v.buf[i].End, v.buf[j].End) < 0
}
func (v *spansByEndKey) Swap(i, j int) {
	v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
}

// keysBySeqNumKind sorts keys by sequence number in descending order. If two
// keys have equal sequence numbers, they're compared by key kind in
// descending order. This ordering matches the ordering of base.InternalCompare
// among keys with matching user keys.
type keysBySeqNumKind []Key

func (v *keysBySeqNumKind) Len() int           { return len(*v) }
func (v *keysBySeqNumKind) Less(i, j int) bool { return (*v)[i].Trailer > (*v)[j].Trailer }
func (v *keysBySeqNumKind) Swap(i, j int)      { (*v)[i], (*v)[j] = (*v)[j], (*v)[i] }
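
// An illustrative sketch, not part of the original file: sorting a key slice
// with keysBySeqNumKind yields sequence-number-descending order, with ties
// broken by kind, descending. It assumes this version's base.MakeTrailer,
// which packs (seqnum, kind) into a uint64 trailer.
func exampleKeyOrdering() {
	keys := keysBySeqNumKind{
		{Trailer: base.MakeTrailer(7, base.InternalKeyKindRangeKeySet)},
		{Trailer: base.MakeTrailer(9, base.InternalKeyKindRangeDelete)},
	}
	sort.Sort(&keys)
	// keys[0] now holds seqnum 9. Keys with equal seqnums would order the
	// larger kind first, matching base.InternalCompare for equal user keys.
}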

// Sort the spans by start key. This is the ordering required by the
// Fragmenter. Usually spans are naturally sorted by their start key, but that
// isn't true for range deletion tombstones in the legacy range-del-v1 block
// format.
func Sort(cmp base.Compare, spans []Span) {
	sorter := spansByStartKey{
		cmp: cmp,
		buf: spans,
	}
	sort.Sort(&sorter)
}
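
// A usage sketch, not part of the original file: spans decoded from the
// legacy range-del-v1 block format may arrive out of start-key order and are
// sorted before fragmentation. base.DefaultComparer is assumed here purely
// for illustration.
func exampleSort() {
	spans := []Span{
		{Start: []byte("c"), End: []byte("g")},
		{Start: []byte("a"), End: []byte("e")},
	}
	Sort(base.DefaultComparer.Compare, spans)
	// spans is now ordered [a,e), [c,g): the order Fragmenter.Add requires.
}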

// Fragmenter fragments a set of spans such that overlapping spans are split
// at their overlap points. The fragmented spans are output to the supplied
// Emit function.
type Fragmenter struct {
	Cmp    base.Compare
	Format base.FormatKey
	// Emit is called to emit a fragmented span and its keys. Every key defined
	// within the emitted Span applies to the entirety of the Span's key span.
	// Keys are ordered in decreasing order of their sequence numbers, and if
	// equal, in decreasing order of key kind.
	Emit func(Span)
	// pending contains the list of pending fragments that have not been
	// flushed to the block writer. Note that the spans have not been
	// fragmented on the end keys yet. That happens as the spans are flushed.
	// All pending spans have the same Start.
	pending []Span
	// doneBuf is used to buffer completed span fragments when flushing to a
	// specific key (e.g. TruncateAndFlushTo). It is cached in the Fragmenter
	// to allow reuse.
	doneBuf []Span
	// sortBuf is used to sort fragments by end key when flushing.
	sortBuf spansByEndKey
	// flushBuf is used to sort keys by (seqnum, kind) before emitting.
	flushBuf keysBySeqNumKind
	// flushedKey is the key that fragments have been flushed up to. Any
	// additional spans added to the fragmenter must have a start key >=
	// flushedKey. A nil value indicates flushedKey has not been set.
	flushedKey []byte
	finished   bool
}
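
// A construction sketch, not part of the original file; the helper name and
// slice-collecting Emit closure are hypothetical. In Pebble the Emit closure
// typically hands fragments to a block writer instead.
func newCollectingFragmenter(cmp base.Compare, fmtKey base.FormatKey, out *[]Span) *Fragmenter {
	return &Fragmenter{
		Cmp:    cmp,
		Format: fmtKey,
		// Every emitted fragment is fully fragmented: each of its keys
		// applies to the fragment's entire [Start, End) key span.
		Emit: func(s Span) { *out = append(*out, s) },
	}
}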

// checkInvariants asserts that the pending spans are non-empty and all share
// the same start key.
func (f *Fragmenter) checkInvariants(buf []Span) {
	for i := 1; i < len(buf); i++ {
		if f.Cmp(buf[i].Start, buf[i].End) >= 0 {
			panic(fmt.Sprintf("pebble: empty pending span invariant violated: %s", buf[i]))
		}
		if f.Cmp(buf[i-1].Start, buf[i].Start) != 0 {
			panic(fmt.Sprintf("pebble: pending span invariant violated: %s %s",
				f.Format(buf[i-1].Start), f.Format(buf[i].Start)))
		}
	}
}

// Add adds a span to the fragmenter. Spans may overlap and the fragmenter
// will internally split them. The spans must be presented in non-decreasing
// start key order. That is, Add must be called with a series of spans like:
//
//	a---e
//	  c---g
//	  c-----i
//	         j---n
//	         j-l
//
// We need to fragment the spans at overlap points. In the above example, we'd
// create:
//
//	a-c-e
//	  c-e-g
//	  c-e-g-i
//	         j-l-n
//	         j-l
//
// The fragments need to be output sorted by start key, and for equal start
// keys, sorted by descending sequence number. This last part requires a mild
// bit of care as the fragments are not created in descending sequence number
// order.
//
// Once a start key has been seen, we know that we'll never see a smaller
// start key and can thus flush all of the fragments that lie before that
// start key.
//
// Walking through the example above, we start with:
//
//	a---e
//
// Next we add [c,g) resulting in:
//
//	a-c-e
//	  c---g
//
// The fragment [a,c) is flushed leaving the pending spans as:
//
//	c-e
//	c---g
//
// The next span is [c,i):
//
//	c-e
//	c---g
//	c-----i
//
// No fragments are flushed. The next span is [j,n):
//
//	c-e
//	c---g
//	c-----i
//	       j---n
//
// The fragments [c,e), [c,g) and [c,i) are flushed. We sort these fragments
// by their end key, then split the fragments on the end keys:
//
//	c-e
//	c-e-g
//	c-e---i
//
// The [c,e) fragments all get flushed leaving:
//
//	e-g
//	e---i
//
// This process continues until there are no more fragments to flush.
//
// WARNING: the slices backing Start, End, Keys, Key.Suffix and Key.Value are
// all retained after this method returns and should not be modified. This is
// safe for spans that are added from a memtable or batch. It is partially
// unsafe for a span read from an sstable. Specifically, the Keys slice of a
// Span returned during sstable iteration is only valid until the next
// iterator operation. The stability of the user keys depends on whether the
// block is prefix compressed; in practice Pebble never prefix compresses
// range deletion and range key blocks, so these keys are stable. Because of
// this key stability, callers typically only need to perform a shallow clone
// of the Span before Add-ing it to the fragmenter.
//
// Add requires the provided span's keys to be sorted in Trailer descending
// order.
func (f *Fragmenter) Add(s Span) {
	if f.finished {
		panic("pebble: span fragmenter already finished")
	} else if s.KeysOrder != ByTrailerDesc {
		panic("pebble: span keys unexpectedly not in trailer descending order")
	}
	if f.flushedKey != nil {
		switch c := f.Cmp(s.Start, f.flushedKey); {
		case c < 0:
			panic(fmt.Sprintf("pebble: start key (%s) < flushed key (%s)",
				f.Format(s.Start), f.Format(f.flushedKey)))
		}
	}
	if f.Cmp(s.Start, s.End) >= 0 {
		// This is an empty span; ignore it.
		return
	}
	if invariants.RaceEnabled {
		f.checkInvariants(f.pending)
		defer func() { f.checkInvariants(f.pending) }()
	}

	if len(f.pending) > 0 {
		// Since all of the pending spans have the same start key, we only need
		// to compare against the first one.
		switch c := f.Cmp(f.pending[0].Start, s.Start); {
		case c > 0:
			panic(fmt.Sprintf("pebble: keys must be added in order: %s > %s",
				f.Format(f.pending[0].Start), f.Format(s.Start)))
		case c == 0:
			// The new span has the same start key as the existing pending
			// spans. Add it to the pending buffer.
			f.pending = append(f.pending, s)
			return
		}

		// At this point we know that the new start key is greater than the
		// pending spans' start keys.
		f.truncateAndFlush(s.Start)
	}

	f.pending = append(f.pending, s)
}
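
// A hedged walkthrough, not part of the original file, feeding the doc
// comment's example spans through a Fragmenter. The sequence numbers, the
// rangeDel helper, and the use of base.DefaultComparer are illustrative
// assumptions.
func exampleAddAndFinish() {
	var got []Span
	f := &Fragmenter{
		Cmp:    base.DefaultComparer.Compare,
		Format: base.DefaultComparer.FormatKey,
		Emit:   func(s Span) { got = append(got, s) },
	}
	rangeDel := func(start, end string, seqNum uint64) Span {
		return Span{
			Start: []byte(start),
			End:   []byte(end),
			Keys:  []Key{{Trailer: base.MakeTrailer(seqNum, base.InternalKeyKindRangeDelete)}},
		}
	}
	f.Add(rangeDel("a", "e", 5))
	f.Add(rangeDel("c", "g", 4))
	f.Add(rangeDel("c", "i", 3))
	f.Add(rangeDel("j", "n", 2))
	f.Add(rangeDel("j", "l", 1))
	f.Finish()
	// got now holds [a,c), [c,e), [e,g), [g,i), [j,l), [l,n); each fragment
	// carries the keys of every input span that overlapped it, sorted by
	// descending trailer.
}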

// Cover is returned by Fragmenter.Covers and describes a span's relationship
// to a key at a particular snapshot.
type Cover int8

const (
	// NoCover indicates the tested key does not fall within the span's bounds,
	// or the span contains no keys with sequence numbers higher than the
	// key's.
	NoCover Cover = iota
	// CoversInvisibly indicates the tested key does fall within the span's
	// bounds and the span contains at least one key with a higher sequence
	// number, but none visible at the provided snapshot.
	CoversInvisibly
	// CoversVisibly indicates the tested key does fall within the span's
	// bounds, and the span contains at least one key with a sequence number
	// higher than the key's sequence number that is visible at the provided
	// snapshot.
	CoversVisibly
)

// Covers returns an enum indicating whether the specified key is covered by
// one of the pending keys. The provided key must be consistent with the
// ordering of the spans. That is, it is invalid to specify a key here that is
// out of order with the span start keys passed to Add.
func (f *Fragmenter) Covers(key base.InternalKey, snapshot uint64) Cover {
	if f.finished {
		panic("pebble: span fragmenter already finished")
	}
	if len(f.pending) == 0 {
		return NoCover
	}

	if f.Cmp(f.pending[0].Start, key.UserKey) > 0 {
		panic(fmt.Sprintf("pebble: keys must be in order: %s > %s",
			f.Format(f.pending[0].Start), key.Pretty(f.Format)))
	}

	cover := NoCover
	seqNum := key.SeqNum()
	for _, s := range f.pending {
		if f.Cmp(key.UserKey, s.End) < 0 {
			// NB: A range deletion tombstone does not delete a point operation
			// at the same sequence number, and broadly a span is not
			// considered to cover a point operation at the same sequence
			// number.

			for i := range s.Keys {
				if kseq := s.Keys[i].SeqNum(); kseq > seqNum {
					// This key from the span has a higher sequence number
					// than `key`. It covers `key`, although the span's key
					// might not be visible at the provided snapshot.
					//
					// Batch keys are always visible.
					if kseq < snapshot || kseq&base.InternalKeySeqNumBatch != 0 {
						return CoversVisibly
					}
					// s.Keys[i] is not visible.
					cover = CoversInvisibly
				}
			}
		}
	}
	return cover
}
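
// A hedged sketch, not part of the original file: a pending range deletion at
// seqnum 10 covers a point SET at seqnum 5 visibly when read without a
// snapshot limit, and invisibly when read at snapshot 8 (10 is not < 8).
func exampleCovers() {
	f := &Fragmenter{
		Cmp:    base.DefaultComparer.Compare,
		Format: base.DefaultComparer.FormatKey,
		Emit:   func(Span) {},
	}
	f.Add(Span{
		Start: []byte("a"),
		End:   []byte("m"),
		Keys:  []Key{{Trailer: base.MakeTrailer(10, base.InternalKeyKindRangeDelete)}},
	})
	point := base.MakeInternalKey([]byte("d"), 5, base.InternalKeyKindSet)
	_ = f.Covers(point, base.InternalKeySeqNumMax) // CoversVisibly
	_ = f.Covers(point, 8)                         // CoversInvisibly
}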

// Empty returns true if all fragments added so far have finished flushing.
func (f *Fragmenter) Empty() bool {
	return f.finished || len(f.pending) == 0
}

// TruncateAndFlushTo flushes all of the fragments with a start key <= key,
// truncating spans to the specified end key. Used during compaction to force
// emitting of spans which straddle an sstable boundary. Consider the
// scenario:
//
//	a---------k#10
//	     f#8
//	     f#7
//
// Let's say the next user key after f is g. Calling TruncateAndFlushTo(g)
// will flush this span:
//
//	a-------g#10
//	     f#8
//	     f#7
//
// And leave this one in f.pending:
//
//	        g----k#10
//
// WARNING: The fragmenter could hold on to the specified end key. Ensure it's
// a safe byte slice that could outlast the current sstable output, and one
// that will never be modified.
func (f *Fragmenter) TruncateAndFlushTo(key []byte) {
	if f.finished {
		panic("pebble: span fragmenter already finished")
	}
	if f.flushedKey != nil {
		switch c := f.Cmp(key, f.flushedKey); {
		case c < 0:
			panic(fmt.Sprintf("pebble: start key (%s) < flushed key (%s)",
				f.Format(key), f.Format(f.flushedKey)))
		}
	}
	if invariants.RaceEnabled {
		f.checkInvariants(f.pending)
		defer func() { f.checkInvariants(f.pending) }()
	}
	if len(f.pending) > 0 {
		// Since all of the pending spans have the same start key, we only need
		// to compare against the first one.
		switch c := f.Cmp(f.pending[0].Start, key); {
		case c > 0:
			panic(fmt.Sprintf("pebble: keys must be added in order: %s > %s",
				f.Format(f.pending[0].Start), f.Format(key)))
		case c == 0:
			return
		}
	}
	f.truncateAndFlush(key)
}
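
// A hedged sketch, not part of the original file, mirroring the doc comment's
// scenario: a span straddling an sstable boundary at g is split there, with
// the remainder left pending for the next output.
func exampleTruncateAndFlushTo() {
	var got []Span
	f := &Fragmenter{
		Cmp:    base.DefaultComparer.Compare,
		Format: base.DefaultComparer.FormatKey,
		Emit:   func(s Span) { got = append(got, s) },
	}
	f.Add(Span{
		Start: []byte("a"),
		End:   []byte("k"),
		Keys:  []Key{{Trailer: base.MakeTrailer(10, base.InternalKeyKindRangeDelete)}},
	})
	f.TruncateAndFlushTo([]byte("g")) // emits [a,g)#10
	f.Finish()                        // emits the remainder, [g,k)#10
	_ = got
}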

// Start returns the start key of the first span in the pending buffer, or nil
// if there are no pending spans. The start key of all pending spans is the
// same as that of the first one.
func (f *Fragmenter) Start() []byte {
	if len(f.pending) > 0 {
		return f.pending[0].Start
	}
	return nil
}

// truncateAndFlush flushes all pending spans up to key (exclusive).
//
// WARNING: The specified key is stored within the truncated spans without
// making a copy, so all callers must ensure it is safe.
func (f *Fragmenter) truncateAndFlush(key []byte) {
	f.flushedKey = append(f.flushedKey[:0], key...)
	done := f.doneBuf[:0]
	pending := f.pending
	f.pending = f.pending[:0]

	// pending and f.pending share the same underlying storage. As we iterate
	// over pending we append to f.pending, but only one entry is appended in
	// each iteration, after we have read the entry being overwritten.
	for _, s := range pending {
		if f.Cmp(key, s.End) < 0 {
			//   s: a--+--e
			// new:    c------
			if f.Cmp(s.Start, key) < 0 {
				done = append(done, Span{
					Start: s.Start,
					End:   key,
					Keys:  s.Keys,
				})
			}
			f.pending = append(f.pending, Span{
				Start: key,
				End:   s.End,
				Keys:  s.Keys,
			})
		} else {
			//   s: a-----e
			// new:       e----
			done = append(done, s)
		}
	}

	f.doneBuf = done[:0]
	f.flush(done, nil)
}

// flush a group of range spans to the block. The spans are required to all
// have the same start key. We flush all span fragments until startKey >
// lastKey. If lastKey is nil, all span fragments are flushed. The
// specification of a non-nil lastKey occurs for range deletion tombstones
// during compaction where we want to flush (but not truncate) all range
// tombstones that start at or before the first key in the next sstable.
// Consider:
//
//	a---e#10
//	a------h#9
//
// If a compaction splits the sstables at key c we want the first sstable to
// contain the tombstones [a,e)#10 and [a,e)#9. Fragmentation would naturally
// produce a tombstone [e,h)#9, but we don't need to output that tombstone to
// the first sstable.
func (f *Fragmenter) flush(buf []Span, lastKey []byte) {
	if invariants.RaceEnabled {
		f.checkInvariants(buf)
	}

	// Sort the spans by end key. This will allow us to walk over the spans
	// and easily determine the next split point (the smallest end-key).
	f.sortBuf.cmp = f.Cmp
	f.sortBuf.buf = buf
	sort.Sort(&f.sortBuf)

	// Loop over the spans, splitting by end key.
	for len(buf) > 0 {
		// A prefix of spans will end at split. remove represents the count of
		// that prefix.
		remove := 1
		split := buf[0].End
		f.flushBuf = append(f.flushBuf[:0], buf[0].Keys...)

		for i := 1; i < len(buf); i++ {
			if f.Cmp(split, buf[i].End) == 0 {
				remove++
			}
			f.flushBuf = append(f.flushBuf, buf[i].Keys...)
		}

		sort.Sort(&f.flushBuf)

		f.Emit(Span{
			Start: buf[0].Start,
			End:   split,
			// Copy the sorted keys to a new slice.
			//
			// This allocation is an unfortunate side effect of the Fragmenter
			// and the expectation that the spans it produces are available
			// in-memory indefinitely.
			//
			// Eventually, we should be able to replace the fragmenter with
			// the keyspan.MergingIter, which will perform just-in-time
			// fragmentation and guarantee memory lifetime only for the
			// current span. The MergingIter fragments while accessing only
			// one Span per level: the Span at each level's current position.
			// During compactions, we can write these spans to sstables
			// without retaining previous Spans.
			Keys: append([]Key(nil), f.flushBuf...),
		})

		if lastKey != nil && f.Cmp(split, lastKey) > 0 {
			break
		}

		// Adjust the start key for every remaining span.
		buf = buf[remove:]
		for i := range buf {
			buf[i].Start = split
		}
	}
}

// Finish flushes any remaining fragments to the output. It is an error to add
// more spans after calling Finish.
func (f *Fragmenter) Finish() {
	if f.finished {
		panic("pebble: span fragmenter already finished")
	}
	f.flush(f.pending, nil)
	f.finished = true
}