Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package keyspan
6 :
7 : import (
8 : "fmt"
9 :
10 : "github.com/cockroachdb/pebble/internal/base"
11 : "github.com/cockroachdb/pebble/internal/invariants"
12 : )
13 :
14 : // Fragmenter fragments a set of spans such that overlapping spans are
15 : // split at their overlap points. The fragmented spans are output to the
16 : // supplied Emit function.
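//
// A minimal usage sketch (illustrative only; base.DefaultComparer and the
// RANGEDEL key below are assumptions, not requirements of this type):
//
//	var frags []Span
//	f := Fragmenter{
//		Cmp:    base.DefaultComparer.Compare,
//		Format: base.DefaultComparer.FormatKey,
//		Emit:   func(s Span) { frags = append(frags, s) },
//	}
//	k := Key{Trailer: base.MakeTrailer(10, base.InternalKeyKindRangeDelete)}
//	f.Add(Span{Start: []byte("a"), End: []byte("e"), Keys: []Key{k}})
//	f.Add(Span{Start: []byte("c"), End: []byte("g"), Keys: []Key{k}})
//	f.Finish()
//	// frags now holds [a,c), [c,e) and [e,g); [c,e) carries both
//	// spans' keys.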
17 : type Fragmenter struct {
18 : Cmp base.Compare
19 : Format base.FormatKey
20 : // Emit is called to emit a fragmented span and its keys. Every key defined
21 : // within the emitted Span applies to the entirety of the Span's key span.
22 : // Keys are sorted by sequence number, descending, and among keys with
23 : // equal sequence numbers, by key kind, descending.
24 : Emit func(Span)
25 : // pending contains the list of pending fragments that have not been
26 : // flushed to the Emit function. Note that the spans have not been
27 : // fragmented on the end keys yet. That happens as the spans are
28 : // flushed. All pending spans have the same Start.
29 : pending []Span
30 : // doneBuf is used to buffer completed span fragments when flushing to a
31 : // specific key (e.g. Truncate). It is cached in the Fragmenter to
32 : // allow reuse.
33 : doneBuf []Span
34 : // flushBuf is used to sort keys by (seqnum,kind) before emitting.
35 : flushBuf []Key
36 : // flushedKey is the key that fragments have been flushed up to. Any
37 : // additional spans added to the fragmenter must have a start key >=
38 : // flushedKey. A nil value indicates flushedKey has not been set.
39 : flushedKey []byte
40 : finished bool
41 : }
42 :
43 0 : func (f *Fragmenter) checkInvariants(buf []Span) {
44 0 : for i := 1; i < len(buf); i++ {
45 0 : if f.Cmp(buf[i].Start, buf[i].End) >= 0 {
46 0 : panic(fmt.Sprintf("pebble: empty pending span invariant violated: %s", buf[i]))
47 : }
48 0 : if f.Cmp(buf[i-1].Start, buf[i].Start) != 0 {
49 0 : panic(fmt.Sprintf("pebble: pending span invariant violated: %s %s",
50 0 : f.Format(buf[i-1].Start), f.Format(buf[i].Start)))
51 : }
52 : }
53 : }
54 :
55 : // Add adds a span to the fragmenter. Spans may overlap and the
56 : // fragmenter will internally split them. The spans must be presented in
57 : // increasing start key order. That is, Add must be called with a series
58 : // of spans like:
59 : //
60 : // a---e
61 : // c---g
62 : // c-----i
63 : // j---n
64 : // j-l
65 : //
66 : // We need to fragment the spans at overlap points. In the above
67 : // example, we'd create:
68 : //
69 : // a-c-e
70 : // c-e-g
71 : // c-e-g-i
72 : // j-l-n
73 : // j-l
74 : //
75 : // The fragments need to be output sorted by start key, and for equal start
76 : // keys, sorted by descending sequence number. This last part requires a mild
77 : // bit of care as the fragments are not created in descending sequence number
78 : // order.
79 : //
80 : // Once a start key has been seen, we know that we'll never see a smaller
81 : // start key and can thus flush all of the fragments that lie before that
82 : // start key.
83 : //
84 : // Walking through the example above, we start with:
85 : //
86 : // a---e
87 : //
88 : // Next we add [c,g) resulting in:
89 : //
90 : // a-c-e
91 : // c---g
92 : //
93 : // The fragment [a,c) is flushed leaving the pending spans as:
94 : //
95 : // c-e
96 : // c---g
97 : //
98 : // The next span is [c,i):
99 : //
100 : // c-e
101 : // c---g
102 : // c-----i
103 : //
104 : // No fragments are flushed. The next span is [j,n):
105 : //
106 : // c-e
107 : // c---g
108 : // c-----i
109 : // j---n
110 : //
111 : // The fragments [c,e), [c,g) and [c,i) are flushed. We sort these fragments
112 : // by their end key, then split the fragments on the end keys:
113 : //
114 : // c-e
115 : // c-e-g
116 : // c-e---i
117 : //
118 : // The [c,e) fragments all get flushed leaving:
119 : //
120 : // e-g
121 : // e---i
122 : //
123 : // This process continues until there are no more fragments to flush.
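//
// Expressed as calls, the walkthrough above corresponds to (a sketch; the
// spans' Keys are elided for brevity):
//
//	f.Add(Span{Start: []byte("a"), End: []byte("e")})
//	f.Add(Span{Start: []byte("c"), End: []byte("g")})
//	f.Add(Span{Start: []byte("c"), End: []byte("i")})
//	f.Add(Span{Start: []byte("j"), End: []byte("n")})
//	f.Add(Span{Start: []byte("j"), End: []byte("l")})
//	f.Finish()
//	// Emit receives [a,c), [c,e), [e,g), [g,i), [j,l), [l,n).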
124 : //
125 : // WARNING: the slices backing Start, End, Keys, Key.Suffix and Key.Value are
126 : // all retained after this method returns and should not be modified. This is
127 : // safe for spans that are added from a memtable or batch. It is partially
128 : // unsafe for a span read from an sstable. Specifically, the Keys slice of a
129 : // Span returned during sstable iteration is only valid until the next iterator
130 : // operation. The stability of the user keys depends on whether the block is
131 : // prefix compressed, and in practice Pebble never prefix compresses range
132 : // deletion and range key blocks, so these keys are stable. Because of this key
133 : // stability, typically callers only need to perform a shallow clone of the Span
134 : // before Add-ing it to the fragmenter.
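//
// For example, a span just read from an sstable iterator might be added as
// (a sketch; span is a hypothetical Span from that iterator):
//
//	f.Add(span.ShallowClone())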
135 : //
136 : // Add requires that the provided span's keys be sorted in descending
// InternalKeyTrailer order.
137 2 : func (f *Fragmenter) Add(s Span) {
138 2 : if f.finished {
139 0 : panic("pebble: span fragmenter already finished")
140 2 : } else if s.KeysOrder != ByTrailerDesc {
141 0 : panic("pebble: span keys unexpectedly not in trailer descending order")
142 : }
143 2 : if f.flushedKey != nil {
144 2 : switch c := f.Cmp(s.Start, f.flushedKey); {
145 0 : case c < 0:
146 0 : panic(fmt.Sprintf("pebble: start key (%s) < flushed key (%s)",
147 0 : f.Format(s.Start), f.Format(f.flushedKey)))
148 : }
149 : }
150 2 : if f.Cmp(s.Start, s.End) >= 0 {
151 1 : // An empty span; we can ignore it.
152 1 : return
153 1 : }
154 2 : if invariants.RaceEnabled {
155 0 : f.checkInvariants(f.pending)
156 0 : defer func() { f.checkInvariants(f.pending) }()
157 : }
158 :
159 2 : if len(f.pending) > 0 {
160 2 : // Since all of the pending spans have the same start key, we only need
161 2 : // to compare against the first one.
162 2 : switch c := f.Cmp(f.pending[0].Start, s.Start); {
163 1 : case c > 0:
164 1 : panic(fmt.Sprintf("pebble: keys must be added in order: %s > %s",
165 1 : f.Format(f.pending[0].Start), f.Format(s.Start)))
166 2 : case c == 0:
167 2 : // The new span has the same start key as the existing pending
168 2 : // spans. Add it to the pending buffer.
169 2 : f.pending = append(f.pending, s)
170 2 : return
171 : }
172 :
173 : // At this point we know that the new start key is greater than the pending
174 : // spans' start keys.
175 2 : f.truncateAndFlush(s.Start)
176 : }
177 :
178 2 : f.pending = append(f.pending, s)
179 : }
180 :
181 : // Empty returns true if all fragments added so far have finished flushing.
182 0 : func (f *Fragmenter) Empty() bool {
183 0 : return f.finished || len(f.pending) == 0
184 0 : }
185 :
186 : // Start returns the start key of the first span in the pending buffer, or nil
187 : // if there are no pending spans. The start key of all pending spans is the same
188 : // as that of the first one.
189 1 : func (f *Fragmenter) Start() []byte {
190 1 : if len(f.pending) > 0 {
191 1 : return f.pending[0].Start
192 1 : }
193 1 : return nil
194 : }
195 :
196 : // Truncate truncates all pending spans up to key (exclusive), flushes them, and
197 : // retains any spans that continue onward for future flushes.
198 0 : func (f *Fragmenter) Truncate(key []byte) {
199 0 : if len(f.pending) > 0 {
200 0 : f.truncateAndFlush(key)
201 0 : }
202 : }
203 :
204 : // truncateAndFlush flushes all pending spans up to key (exclusive).
205 : //
206 : // WARNING: The specified key is retained as a fragment boundary without
207 : // being copied, so callers must ensure the slice remains stable.
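//
// For example: with pending spans [c,e) and [c,i), truncateAndFlush(e)
// flushes a single [c,e) fragment carrying both spans' keys, and retains
// [e,i) as the new pending span.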
208 2 : func (f *Fragmenter) truncateAndFlush(key []byte) {
209 2 : f.flushedKey = append(f.flushedKey[:0], key...)
210 2 : done := f.doneBuf[:0]
211 2 : pending := f.pending
212 2 : f.pending = f.pending[:0]
213 2 :
214 2 : // pending and f.pending share the same underlying storage. As we iterate
215 2 : // over pending we append to f.pending, but only one entry is appended in
216 2 : // each iteration, after we have read the entry being overwritten.
217 2 : for _, s := range pending {
218 2 : if f.Cmp(key, s.End) < 0 {
219 2 : // s: a--+--e
220 2 : // new: c------
221 2 : if f.Cmp(s.Start, key) < 0 {
222 2 : done = append(done, Span{
223 2 : Start: s.Start,
224 2 : End: key,
225 2 : Keys: s.Keys,
226 2 : })
227 2 : }
228 2 : f.pending = append(f.pending, Span{
229 2 : Start: key,
230 2 : End: s.End,
231 2 : Keys: s.Keys,
232 2 : })
233 2 : } else {
234 2 : // s: a-----e
235 2 : // new: e----
236 2 : done = append(done, s)
237 2 : }
238 : }
239 :
240 2 : f.doneBuf = done[:0]
241 2 : f.flush(done, nil)
242 : }
243 :
244 : // flush emits a group of span fragments. The spans are required to all have
245 : // the same start key. We flush all span fragments until startKey > lastKey. If
246 : // lastKey is nil, all span fragments are flushed. The specification of a
247 : // non-nil lastKey occurs for range deletion tombstones during compaction where
248 : // we want to flush (but not truncate) all range tombstones that start at or
249 : // before the first key in the next sstable. Consider:
250 : //
251 : // a---e#10
252 : // a------h#9
253 : //
254 : // If a compaction splits the sstables at key c we want the first sstable to
255 : // contain the tombstones [a,e)#10 and [a,e)#9. Fragmentation would naturally
256 : // produce a tombstone [e,h)#9, but we don't need to output that tombstone to
257 : // the first sstable.
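//
// Concretely, with lastKey=c the loop below emits [a,e) carrying both #10
// and #9, then stops because the split point e lies beyond lastKey, so
// [e,h)#9 is never emitted.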
258 2 : func (f *Fragmenter) flush(buf []Span, lastKey []byte) {
259 2 : if invariants.RaceEnabled {
260 0 : f.checkInvariants(buf)
261 0 : }
262 :
263 : // Sort the spans by end key. This will allow us to walk over the spans and
264 : // easily determine the next split point (the smallest end-key).
265 2 : SortSpansByEndKey(f.Cmp, buf)
266 2 :
267 2 : // Loop over the spans, splitting by end key.
268 2 : for len(buf) > 0 {
269 2 : // A prefix of spans will end at split. remove represents the count of
270 2 : // that prefix.
271 2 : remove := 1
272 2 : split := buf[0].End
273 2 : f.flushBuf = append(f.flushBuf[:0], buf[0].Keys...)
274 2 :
275 2 : for i := 1; i < len(buf); i++ {
276 2 : if f.Cmp(split, buf[i].End) == 0 {
277 2 : remove++
278 2 : }
279 2 : f.flushBuf = append(f.flushBuf, buf[i].Keys...)
280 : }
281 :
282 2 : SortKeysByTrailer(f.flushBuf)
283 2 :
284 2 : f.Emit(Span{
285 2 : Start: buf[0].Start,
286 2 : End: split,
287 2 : // Copy the sorted keys to a new slice.
288 2 : //
289 2 : // This allocation is an unfortunate side effect of the Fragmenter and
290 2 : // the expectation that the spans it produces are available in-memory
291 2 : // indefinitely.
292 2 : //
293 2 : // Eventually, we should be able to replace the fragmenter with the
294 2 : // keyspanimpl.MergingIter, which will perform just-in-time
295 2 : // fragmentation and guarantee memory lifetime only for the
296 2 : // current span. The MergingIter fragments while needing to
297 2 : // access only the Span at the current position in each level.
298 2 : // During compactions, we can write these spans to sstables
299 2 : // without retaining previous Spans.
300 2 : Keys: append([]Key(nil), f.flushBuf...),
301 2 : })
302 2 :
303 2 : if lastKey != nil && f.Cmp(split, lastKey) > 0 {
304 0 : break
305 : }
306 :
307 : // Adjust the start key for every remaining span.
308 2 : buf = buf[remove:]
309 2 : for i := range buf {
310 2 : buf[i].Start = split
311 2 : }
312 : }
313 : }
314 :
315 : // Finish flushes any remaining fragments to the Emit function. It is an
316 : // error to Add any further spans after calling Finish.
317 2 : func (f *Fragmenter) Finish() {
318 2 : if f.finished {
319 0 : panic("pebble: span fragmenter already finished")
320 : }
321 2 : f.flush(f.pending, nil)
322 2 : f.finished = true
323 : }