Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "bytes"
9 : stdcmp "cmp"
10 : "fmt"
11 : "math"
12 : "slices"
13 : "sort"
14 : "strings"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : )
20 :
21 : // errInvalidL0SublevelsOpt is for use in AddL0Files when the incremental
22 : // sublevel generation optimization failed, and NewL0Sublevels must be called.
23 : var errInvalidL0SublevelsOpt = errors.New("pebble: L0 sublevel generation optimization cannot be used")
24 :
25 : // Intervals are of the form [start, end) with no gap between intervals. Each
26 : // file overlaps perfectly with a sequence of intervals. This perfect overlap
27 : // occurs because the union of file boundary keys is used to pick intervals.
28 : // However the largest key in a file is inclusive, so when it is used as
29 : // an interval, the actual key is ImmediateSuccessor(key). We don't have the
30 : // ImmediateSuccessor function to do this computation, so we instead keep an
31 : // isLargest bool to remind the code about this fact. This is used for
32 : // comparisons in the following manner:
33 : // - intervalKey{k, false} < intervalKey{k, true}
34 : // - k1 < k2 -> intervalKey{k1, _} < intervalKey{k2, _}.
35 : //
36 : // Note that the file's largest key is exclusive if the internal key
37 : // has a trailer matching the rangedel sentinel key. In this case, we set
38 : // isLargest to false for end interval computation.
39 : //
40 : // For example, consider three files with bounds [a,e], [b,g], and [e,j]. The
41 : // interval keys produced would be intervalKey{a, false}, intervalKey{b, false},
42 : // intervalKey{e, false}, intervalKey{e, true}, intervalKey{g, true} and
43 : // intervalKey{j, true}, resulting in intervals
44 : // [a, b), [b, (e, false)), [(e,false), (e, true)), [(e, true), (g, true)) and
45 : // [(g, true), (j, true)). The first file overlaps with the first three
46 : // perfectly, the second file overlaps with the second through to fourth
47 : // intervals, and the third file overlaps with the last three.
48 : //
49 : // The intervals are indexed starting from 0, with the index of the interval
50 : // being the index of the start key of the interval.
51 : //
52 : // In addition to helping with compaction picking, we use interval indices
53 : // to assign each file an interval range once. Subsequent operations, say
54 : // picking overlapping files for a compaction, only need to use the index
55 : // numbers and so avoid expensive byte slice comparisons.
56 : type intervalKey struct {
57 : key []byte
58 : isLargest bool
59 : }
60 :
61 : // intervalKeyTemp is used in the sortAndSweep step. It contains additional metadata
62 : // which is used to generate the {min,max}IntervalIndex for files.
63 : type intervalKeyTemp struct {
64 : intervalKey intervalKey
65 : fileMeta *FileMetadata
66 : isEndKey bool
67 : }
68 :
 : // setFileIntervalIndex records idx — the position of this endpoint key in the
 : // deduplicated keys slice — on the owning file: an end key sets the inclusive
 : // maxIntervalIndex (hence idx-1), a start key sets minIntervalIndex.
69 2 : func (i *intervalKeyTemp) setFileIntervalIndex(idx int) {
70 2 : if i.isEndKey {
71 2 : // This is the right endpoint of some file interval, so the
72 2 : // file.maxIntervalIndex must be j - 1 as maxIntervalIndex is
73 2 : // inclusive.
74 2 : i.fileMeta.maxIntervalIndex = idx - 1
75 2 : return
76 2 : }
77 : // This is the left endpoint for some file interval, so the
78 : // file.minIntervalIndex must be j.
79 2 : i.fileMeta.minIntervalIndex = idx
80 : }
81 :
82 2 : func intervalKeyCompare(cmp Compare, a, b intervalKey) int {
83 2 : rv := cmp(a.key, b.key)
84 2 : if rv == 0 {
85 2 : if a.isLargest && !b.isLargest {
86 2 : return +1
87 2 : }
88 2 : if !a.isLargest && b.isLargest {
89 2 : return -1
90 2 : }
91 : }
92 2 : return rv
93 : }
94 :
 : // intervalKeySorter implements sort.Interface over intervalKeyTemps,
 : // ordering them with intervalKeyCompare under the provided user-key cmp.
95 : type intervalKeySorter struct {
96 : keys []intervalKeyTemp
97 : cmp Compare
98 : }
99 :
 : // Len, Less and Swap implement sort.Interface.
100 2 : func (s intervalKeySorter) Len() int { return len(s.keys) }
101 2 : func (s intervalKeySorter) Less(i, j int) bool {
102 2 : return intervalKeyCompare(s.cmp, s.keys[i].intervalKey, s.keys[j].intervalKey) < 0
103 2 : }
104 2 : func (s intervalKeySorter) Swap(i, j int) {
105 2 : s.keys[i], s.keys[j] = s.keys[j], s.keys[i]
106 2 : }
107 :
108 : // sortAndSweep will sort the intervalKeys using intervalKeySorter, remove the
109 : // duplicate fileIntervals, and set the {min, max}IntervalIndex for the files.
 : // The dedup is done in place; the returned slice aliases keys[:j].
110 2 : func sortAndSweep(keys []intervalKeyTemp, cmp Compare) []intervalKeyTemp {
111 2 : if len(keys) == 0 {
112 2 : return nil
113 2 : }
114 2 : sorter := intervalKeySorter{keys: keys, cmp: cmp}
115 2 : sort.Sort(sorter)
116 2 :
117 2 : // intervalKeys are generated using the file bounds. Specifically, there are
118 2 : // 2 intervalKeys for each file, and len(keys) = 2 * number of files. Each
119 2 : // `intervalKeyTemp` stores information about which file it was generated
120 2 : // from, and whether the key represents the end key of the file. So, as
121 2 : // we're deduplicating the `keys` slice, we're guaranteed to iterate over
122 2 : // the interval keys belonging to each of the files. Since the
123 2 : // file.{min,max}IntervalIndex points to the position of the files bounds in
124 2 : // the deduplicated `keys` slice, we can determine
125 2 : // file.{min,max}IntervalIndex during the iteration.
126 2 : i := 0
127 2 : j := 0
128 2 : for i < len(keys) {
129 2 : // loop invariant: j <= i
130 2 : currKey := keys[i]
131 2 : keys[j] = keys[i]
132 2 :
 : // Assign the same deduplicated index j to every duplicate of currKey,
 : // updating each owning file's {min,max}IntervalIndex as we go.
133 2 : for {
134 2 : keys[i].setFileIntervalIndex(j)
135 2 : i++
136 2 : if i >= len(keys) || intervalKeyCompare(cmp, currKey.intervalKey, keys[i].intervalKey) != 0 {
137 2 : break
138 : }
139 : }
140 2 : j++
141 : }
142 2 : return keys[:j]
143 : }
144 :
145 : // A key interval of the form [start, end). The end is not represented here
146 : // since it is implicit in the start of the next interval. The last interval is
147 : // an exception but we don't need to ever lookup the end of that interval; the
148 : // last fileInterval will only act as an end key marker. The set of intervals
149 : // is const after initialization.
150 : type fileInterval struct {
 : // index is this interval's position in L0Sublevels.orderedIntervals.
151 : index int
 : // startKey is the inclusive lower bound of the interval; the next
 : // interval's startKey is the (exclusive) upper bound.
152 : startKey intervalKey
153 :
154 : // True iff some file in this interval is compacting to base. Such intervals
155 : // cannot have any files participate in L0 -> Lbase compactions.
156 : isBaseCompacting bool
157 :
158 : // The min and max intervals index across all the files that overlap with
159 : // this interval. Inclusive on both sides.
160 : filesMinIntervalIndex int
161 : filesMaxIntervalIndex int
162 :
163 : // True if another interval that has a file extending into this interval is
164 : // undergoing a compaction into Lbase. In other words, this bool is true if
165 : // any interval in [filesMinIntervalIndex, filesMaxIntervalIndex] has
166 : // isBaseCompacting set to true. This lets the compaction picker
167 : // de-prioritize this interval for picking compactions, since there's a high
168 : // chance that a base compaction with a sufficient height of sublevels
169 : // rooted at this interval could not be chosen due to the ongoing base
170 : // compaction in the other interval. If the file straddling the two
171 : // intervals is at a sufficiently high sublevel (with enough compactible
172 : // files below it to satisfy minCompactionDepth), this is not an issue, but
173 : // to optimize for quickly picking base compactions far away from other base
174 : // compactions, this bool is used as a heuristic (but not as a complete
175 : // disqualifier).
176 : intervalRangeIsBaseCompacting bool
177 :
178 : // All files in this interval, in increasing sublevel order.
179 : files []*FileMetadata
180 :
181 : // len(files) - compactingFileCount is the stack depth that requires
182 : // starting new compactions. This metric is not precise since the
183 : // compactingFileCount can include files that are part of N (where N > 1)
184 : // intra-L0 compactions, so the stack depth after those complete will be
185 : // len(files) - compactingFileCount + N. We ignore this imprecision since we
186 : // don't want to track which files are part of which intra-L0 compaction.
187 : compactingFileCount int
188 :
189 : // Interpolated from files in this interval. For files spanning multiple
190 : // intervals, we assume an equal distribution of bytes across all those
191 : // intervals.
192 : estimatedBytes uint64
193 : }
194 :
195 : // Helper type for any cases requiring a bool slice.
196 : type bitSet []bool
197 :
 : // newBitSet returns a bitSet of n bits, all unset.
198 2 : func newBitSet(n int) bitSet {
199 2 : return make([]bool, n)
200 2 : }
201 :
 : // markBit sets bit i.
202 2 : func (b *bitSet) markBit(i int) {
203 2 : (*b)[i] = true
204 2 : }
205 :
 : // markBits sets every bit in [start, end).
206 2 : func (b *bitSet) markBits(start, end int) {
207 2 : for i := start; i < end; i++ {
208 2 : (*b)[i] = true
209 2 : }
210 : }
211 :
 : // clearAllBits unsets every bit, retaining the backing storage.
212 2 : func (b *bitSet) clearAllBits() {
213 2 : for i := range *b {
214 2 : (*b)[i] = false
215 2 : }
216 : }
217 :
218 : // L0Compaction describes an active compaction with inputs from L0.
219 : type L0Compaction struct {
 : // Smallest and Largest are presumably the key bounds of the compaction's
 : // L0 inputs — TODO(review): confirm against callers (not visible here).
220 : Smallest InternalKey
221 : Largest InternalKey
 : // IsIntraL0 is true for intra-L0 compactions (outputs stay in L0).
222 : IsIntraL0 bool
223 : }
224 :
225 : // L0Sublevels represents a sublevel view of SSTables in L0. Tables in one
226 : // sublevel are non-overlapping in key ranges, and keys in higher-indexed
227 : // sublevels shadow older versions in lower-indexed sublevels. These invariants
228 : // are similar to the regular level invariants, except with higher indexed
229 : // sublevels having newer keys as opposed to lower indexed levels.
230 : //
231 : // There is no limit to the number of sublevels that can exist in L0 at any
232 : // time, however read and compaction performance is best when there are as few
233 : // sublevels as possible.
234 : type L0Sublevels struct {
235 : // Levels are ordered from oldest sublevel to youngest sublevel in the
236 : // outer slice, and the inner slice contains non-overlapping files for
237 : // that sublevel in increasing key order. Levels is constructed from
238 : // levelFiles and is used by callers that require a LevelSlice. The below two
239 : // fields are treated as immutable once created in NewL0Sublevels.
240 : Levels []LevelSlice
241 : levelFiles [][]*FileMetadata
242 :
 : // cmp compares user keys; formatKey renders them in panic/debug output.
243 : cmp Compare
244 : formatKey base.FormatKey
245 :
 : // fileBytes is the total Size of all files added via addFileToSublevels.
246 : fileBytes uint64
247 : // All the L0 files, ordered from oldest to youngest.
248 : levelMetadata *LevelMetadata
249 :
250 : // The file intervals in increasing key order.
251 : orderedIntervals []fileInterval
252 :
253 : // Keys to break flushes at.
254 : flushSplitUserKeys [][]byte
255 :
256 : // Only used to check invariants.
257 : addL0FilesCalled bool
258 : }
259 :
 : // sublevelSorter sorts the files of one sublevel by minIntervalIndex, i.e.
 : // into increasing key order (files within a sublevel are non-overlapping).
260 : type sublevelSorter []*FileMetadata
261 :
262 : // Len implements sort.Interface.
263 2 : func (sl sublevelSorter) Len() int {
264 2 : return len(sl)
265 2 : }
266 :
267 : // Less implements sort.Interface.
268 2 : func (sl sublevelSorter) Less(i, j int) bool {
269 2 : return sl[i].minIntervalIndex < sl[j].minIntervalIndex
270 2 : }
271 :
272 : // Swap implements sort.Interface.
273 2 : func (sl sublevelSorter) Swap(i, j int) {
274 2 : sl[i], sl[j] = sl[j], sl[i]
275 2 : }
276 :
277 : // NewL0Sublevels creates an L0Sublevels instance for a given set of L0 files.
278 : // These files must all be in L0 and must be sorted by seqnum (see
279 : // SortBySeqNum). During interval iteration, when flushSplitMaxBytes bytes are
280 : // exceeded in the range of intervals since the last flush split key, a flush
281 : // split key is added.
282 : //
283 : // This method can be called without DB.mu being held, so any DB.mu protected
284 : // fields in FileMetadata cannot be accessed here, such as Compacting and
285 : // IsIntraL0Compacting. Those fields are accessed in InitCompactingFileInfo
286 : // instead.
287 : func NewL0Sublevels(
288 : levelMetadata *LevelMetadata, cmp Compare, formatKey base.FormatKey, flushSplitMaxBytes int64,
289 2 : ) (*L0Sublevels, error) {
290 2 : s := &L0Sublevels{cmp: cmp, formatKey: formatKey}
291 2 : s.levelMetadata = levelMetadata
 : // Collect two endpoint keys per file (smallest and largest bounds).
292 2 : keys := make([]intervalKeyTemp, 0, 2*s.levelMetadata.Len())
293 2 : iter := levelMetadata.Iter()
294 2 : for i, f := 0, iter.First(); f != nil; i, f = i+1, iter.Next() {
295 2 : f.L0Index = i
296 2 : keys = append(keys, intervalKeyTemp{
297 2 : intervalKey: intervalKey{key: f.Smallest.UserKey},
298 2 : fileMeta: f,
299 2 : isEndKey: false,
300 2 : })
301 2 : keys = append(keys, intervalKeyTemp{
302 2 : intervalKey: intervalKey{
303 2 : key: f.Largest.UserKey,
 : // An exclusive sentinel largest key is treated as a non-inclusive
 : // bound; see the intervalKey doc comment.
304 2 : isLargest: !f.Largest.IsExclusiveSentinel(),
305 2 : },
306 2 : fileMeta: f,
307 2 : isEndKey: true,
308 2 : })
309 2 : }
310 2 : keys = sortAndSweep(keys, cmp)
311 2 : // All interval indices reference s.orderedIntervals.
312 2 : s.orderedIntervals = make([]fileInterval, len(keys))
313 2 : for i := range keys {
314 2 : s.orderedIntervals[i] = fileInterval{
315 2 : index: i,
316 2 : startKey: keys[i].intervalKey,
317 2 : filesMinIntervalIndex: i,
318 2 : filesMaxIntervalIndex: i,
319 2 : }
320 2 : }
321 : // Initialize minIntervalIndex and maxIntervalIndex for each file, and use that
322 : // to update intervals.
323 2 : for f := iter.First(); f != nil; f = iter.Next() {
324 2 : if err := s.addFileToSublevels(f, false /* checkInvariant */); err != nil {
325 0 : return nil, err
326 0 : }
327 : }
328 : // Sort each sublevel in increasing key order.
329 2 : for i := range s.levelFiles {
330 2 : sort.Sort(sublevelSorter(s.levelFiles[i]))
331 2 : }
332 :
333 : // Construct a parallel slice of sublevel B-Trees.
334 : // TODO(jackson): Consolidate and only use the B-Trees.
335 2 : for _, sublevelFiles := range s.levelFiles {
336 2 : tr, ls := makeBTree(btreeCmpSmallestKey(cmp), sublevelFiles)
337 2 : s.Levels = append(s.Levels, ls)
338 2 : tr.Release()
339 2 : }
340 :
341 2 : s.calculateFlushSplitKeys(flushSplitMaxBytes)
342 2 : return s, nil
343 : }
344 :
345 : // Helper function to merge new intervalKeys into an existing slice of old
346 : // fileIntervals, into result. Returns the new result and a slice of ints
347 : // mapping old interval indices to new ones. The added intervalKeys do not need
348 : // to be sorted; they get sorted and deduped in this function.
349 : func mergeIntervals(
350 : old, result []fileInterval, added []intervalKeyTemp, compare Compare,
351 2 : ) ([]fileInterval, []int) {
352 2 : sorter := intervalKeySorter{keys: added, cmp: compare}
353 2 : sort.Sort(sorter)
354 2 :
355 2 : oldToNewMap := make([]int, len(old))
 : // i walks old intervals, j walks (sorted) added keys; a classic two-run
 : // merge emitting into result.
356 2 : i := 0
357 2 : j := 0
358 2 :
359 2 : for i < len(old) || j < len(added) {
 : // Skip duplicates among the added keys, pointing each duplicate at the
 : // most recently emitted interval.
360 2 : for j > 0 && j < len(added) && intervalKeyCompare(compare, added[j-1].intervalKey, added[j].intervalKey) == 0 {
361 2 : added[j].setFileIntervalIndex(len(result) - 1)
362 2 : j++
363 2 : }
364 2 : if i >= len(old) && j >= len(added) {
365 2 : break
366 : }
 : // cmp is forced to +1/-1 when one run is exhausted; only when both runs
 : // still have elements is the real key comparison performed.
367 2 : var cmp int
368 2 : if i >= len(old) {
369 2 : cmp = +1
370 2 : }
371 2 : if j >= len(added) {
372 2 : cmp = -1
373 2 : }
374 2 : if cmp == 0 {
375 2 : cmp = intervalKeyCompare(compare, old[i].startKey, added[j].intervalKey)
376 2 : }
377 2 : switch {
378 2 : case cmp <= 0:
379 2 : // Shallow-copy the existing interval.
380 2 : newInterval := old[i]
381 2 : result = append(result, newInterval)
382 2 : oldToNewMap[i] = len(result) - 1
383 2 : i++
384 2 : if cmp == 0 {
385 2 : added[j].setFileIntervalIndex(len(result) - 1)
386 2 : j++
387 2 : }
388 2 : case cmp > 0:
389 2 : var prevInterval fileInterval
390 2 : // Insert a new interval for a newly-added file. prevInterval, if
391 2 : // non-zero, will be "inherited"; we copy its files as those extend
392 2 : // into this interval.
393 2 : if len(result) > 0 {
394 2 : prevInterval = result[len(result)-1]
395 2 : }
396 2 : newInterval := fileInterval{
397 2 : index: len(result),
398 2 : startKey: added[j].intervalKey,
399 2 : filesMinIntervalIndex: len(result),
400 2 : filesMaxIntervalIndex: len(result),
401 2 :
402 2 : // estimatedBytes gets recalculated later on, as the number of intervals
403 2 : // the file bytes are interpolated over has changed.
404 2 : estimatedBytes: 0,
405 2 : // Copy the below attributes from prevInterval.
406 2 : files: append([]*FileMetadata(nil), prevInterval.files...),
407 2 : isBaseCompacting: prevInterval.isBaseCompacting,
408 2 : intervalRangeIsBaseCompacting: prevInterval.intervalRangeIsBaseCompacting,
409 2 : compactingFileCount: prevInterval.compactingFileCount,
410 2 : }
411 2 : result = append(result, newInterval)
412 2 : added[j].setFileIntervalIndex(len(result) - 1)
413 2 : j++
414 : }
415 : }
416 2 : return result, oldToNewMap
417 : }
418 :
419 : // AddL0Files incrementally builds a new L0Sublevels for when the only change
420 : // since the receiver L0Sublevels was an addition of the specified files, with
421 : // no L0 deletions. The common case of this is an ingestion or a flush. These
422 : // files can "sit on top" of existing sublevels, creating at most one new
423 : // sublevel for a flush (and possibly multiple for an ingestion), and at most
424 : // 2*len(files) additions to s.orderedIntervals. No files must have been deleted
425 : // from L0, and the added files must all be newer in sequence numbers than
426 : // existing files in L0Sublevels. The files parameter must be sorted in seqnum
427 : // order. The levelMetadata parameter corresponds to the new L0 post addition of
428 : // files. This method is meant to be significantly more performant than
429 : // NewL0Sublevels.
430 : //
431 : // Note that this function can only be called once on a given receiver; it
432 : // appends to some slices in s which is only safe when done once. This is okay,
433 : // as the common case (generating a new L0Sublevels after a flush/ingestion) is
434 : // only going to necessitate one call of this method on a given receiver. The
435 : // returned value, if non-nil, can then have [*L0Sublevels.AddL0Files] called on
436 : // it again, and so on. If [errInvalidL0SublevelsOpt] is returned as an error,
437 : // it likely means the optimization could not be applied (i.e. files added were
438 : // older than files already in the sublevels, which is possible around
439 : // ingestions and in tests). Eg. it can happen when an ingested file was
440 : // ingested without queueing a flush since it did not actually overlap with any
441 : // keys in the memtable. Later on the memtable was flushed, and the memtable had
442 : // keys spanning around the ingested file, producing a flushed file that
443 : // overlapped with the ingested file in file bounds but not in keys. It's
444 : // possible for that flushed file to have a lower LargestSeqNum than the
445 : // ingested file if all the additions after the ingestion were to another
446 : // flushed file that was split into a separate sstable during flush. Any other
447 : // non-nil error means [L0Sublevels] generation failed in the same way as
448 : // [NewL0Sublevels] would likely fail.
449 : func (s *L0Sublevels) AddL0Files(
450 : files []*FileMetadata, flushSplitMaxBytes int64, levelMetadata *LevelMetadata,
451 2 : ) (*L0Sublevels, error) {
452 2 : if invariants.Enabled && s.addL0FilesCalled {
453 0 : panic("AddL0Files called twice on the same receiver")
454 : }
455 2 : s.addL0FilesCalled = true
456 2 :
457 2 : // Start with a shallow copy of s.
458 2 : newVal := &L0Sublevels{}
459 2 : *newVal = *s
460 2 :
461 2 : newVal.addL0FilesCalled = false
462 2 : newVal.levelMetadata = levelMetadata
463 2 : // Deep copy levelFiles and Levels, as they are mutated and sorted below.
464 2 : // Shallow copies of slices that we just append to, are okay.
465 2 : newVal.levelFiles = make([][]*FileMetadata, len(s.levelFiles))
466 2 : for i := range s.levelFiles {
467 2 : newVal.levelFiles[i] = make([]*FileMetadata, len(s.levelFiles[i]))
468 2 : copy(newVal.levelFiles[i], s.levelFiles[i])
469 2 : }
470 2 : newVal.Levels = make([]LevelSlice, len(s.Levels))
471 2 : copy(newVal.Levels, s.Levels)
472 2 :
 : // Build the two endpoint keys for each newly-added file, mirroring the
 : // key construction in NewL0Sublevels.
473 2 : fileKeys := make([]intervalKeyTemp, 0, 2*len(files))
474 2 : for _, f := range files {
475 2 : left := intervalKeyTemp{
476 2 : intervalKey: intervalKey{key: f.Smallest.UserKey},
477 2 : fileMeta: f,
478 2 : }
479 2 : right := intervalKeyTemp{
480 2 : intervalKey: intervalKey{
481 2 : key: f.Largest.UserKey,
482 2 : isLargest: !f.Largest.IsExclusiveSentinel(),
483 2 : },
484 2 : fileMeta: f,
485 2 : isEndKey: true,
486 2 : }
487 2 : fileKeys = append(fileKeys, left, right)
488 2 : }
489 2 : keys := make([]fileInterval, 0, 2*levelMetadata.Len())
490 2 : var oldToNewMap []int
491 2 : // We can avoid the sortAndSweep step on the combined length of
492 2 : // s.orderedIntervals and fileKeys by treating this as a merge of two sorted
493 2 : // runs, fileKeys and s.orderedIntervals, into `keys` which will form
494 2 : // newVal.orderedIntervals.
495 2 : keys, oldToNewMap = mergeIntervals(s.orderedIntervals, keys, fileKeys, s.cmp)
496 2 : if invariants.Enabled {
497 2 : for i := 1; i < len(keys); i++ {
498 2 : if intervalKeyCompare(newVal.cmp, keys[i-1].startKey, keys[i].startKey) >= 0 {
499 0 : panic("keys not sorted correctly")
500 : }
501 : }
502 : }
503 2 : newVal.orderedIntervals = keys
504 2 : // Update indices in s.orderedIntervals for fileIntervals we retained.
505 2 : for _, newIdx := range oldToNewMap {
506 2 : newInterval := &keys[newIdx]
507 2 : newInterval.index = newIdx
508 2 : // This code, and related code in the for loop below, adjusts
509 2 : // files{Min,Max}IntervalIndex just for interval indices shifting due to
510 2 : // new intervals, and not for any of the new files being added to the
511 2 : // same intervals. The goal is to produce a state of the system that's
512 2 : // accurate for all existing files, and has all the new intervals to
513 2 : // support new files. Once that's done, we can just call
514 2 : // addFileToSublevel to adjust all relevant intervals for new files.
515 2 : newInterval.filesMinIntervalIndex = oldToNewMap[newInterval.filesMinIntervalIndex]
516 2 : // maxIntervalIndexes are special. Since it's an inclusive end bound, we
517 2 : // actually have to map it to the _next_ old interval's new previous
518 2 : // interval. This logic is easier to understand if you see
519 2 : // [f.minIntervalIndex, f.maxIntervalIndex] as [f.minIntervalIndex,
520 2 : // f.maxIntervalIndex+1). The other case to remember is when the
521 2 : // interval is completely empty (i.e. len(newInterval.files) == 0); in
522 2 : // that case we want to refer back to ourselves regardless of additions
523 2 : // to the right of us.
524 2 : if newInterval.filesMaxIntervalIndex < len(oldToNewMap)-1 && len(newInterval.files) > 0 {
525 2 : newInterval.filesMaxIntervalIndex = oldToNewMap[newInterval.filesMaxIntervalIndex+1] - 1
526 2 : } else {
527 2 : // newInterval.filesMaxIntervalIndex == len(oldToNewMap)-1.
528 2 : newInterval.filesMaxIntervalIndex = oldToNewMap[newInterval.filesMaxIntervalIndex]
529 2 : }
530 : }
531 : // Loop through all instances of new intervals added between two old
532 : // intervals and expand [filesMinIntervalIndex, filesMaxIntervalIndex] of
533 : // new intervals to reflect that of adjacent old intervals.
534 2 : {
535 2 : // We can skip cases where new intervals were added to the left of all
536 2 : // existing intervals (eg. if the first entry in oldToNewMap is
537 2 : // oldToNewMap[0] >= 1). Those intervals will only contain newly added
538 2 : // files and will have their parameters adjusted down in
539 2 : // addFileToSublevels. The same can also be said about new intervals
540 2 : // that are to the right of all existing intervals.
541 2 : lastIdx := 0
542 2 : for _, newIdx := range oldToNewMap {
543 2 : for i := lastIdx + 1; i < newIdx; i++ {
544 2 : minIntervalIndex := i
545 2 : maxIntervalIndex := i
546 2 : if keys[lastIdx].filesMaxIntervalIndex != lastIdx {
547 2 : // Last old interval has files extending into keys[i].
548 2 : minIntervalIndex = keys[lastIdx].filesMinIntervalIndex
549 2 : maxIntervalIndex = keys[lastIdx].filesMaxIntervalIndex
550 2 : }
551 :
552 2 : keys[i].filesMinIntervalIndex = minIntervalIndex
553 2 : keys[i].filesMaxIntervalIndex = maxIntervalIndex
554 : }
555 2 : lastIdx = newIdx
556 : }
557 : }
558 : // Go through old files and update interval indices.
559 : //
560 : // TODO(bilal): This is the only place in this method where we loop through
561 : // all existing files, which could be much more in number than newly added
562 : // files. See if we can avoid the need for this, either by getting rid of
563 : // f.minIntervalIndex and f.maxIntervalIndex and calculating them on the fly
564 : // with a binary search, or by only looping through files to the right of
565 : // the first interval touched by this method.
566 2 : for sublevel := range s.Levels {
567 2 : s.Levels[sublevel].Each(func(f *FileMetadata) {
568 2 : oldIntervalDelta := f.maxIntervalIndex - f.minIntervalIndex + 1
569 2 : oldMinIntervalIndex := f.minIntervalIndex
570 2 : f.minIntervalIndex = oldToNewMap[f.minIntervalIndex]
571 2 : // maxIntervalIndex is special. Since it's an inclusive end bound,
572 2 : // we actually have to map it to the _next_ old interval's new
573 2 : // previous interval. This logic is easier to understand if you see
574 2 : // [f.minIntervalIndex, f.maxIntervalIndex] as [f.minIntervalIndex,
575 2 : // f.maxIntervalIndex+1).
576 2 : f.maxIntervalIndex = oldToNewMap[f.maxIntervalIndex+1] - 1
577 2 : newIntervalDelta := f.maxIntervalIndex - f.minIntervalIndex + 1
578 2 : // Recalculate estimatedBytes for all old files across new
579 2 : // intervals, but only if new intervals were added in between.
580 2 : if oldIntervalDelta != newIntervalDelta {
581 2 : // j is incremented so that oldToNewMap[j] points to the next
582 2 : // old interval. This is used to distinguish between old
583 2 : // intervals (i.e. ones where we need to subtract
584 2 : // f.Size/oldIntervalDelta) from new ones (where we don't need
585 2 : // to subtract). In both cases we need to add
586 2 : // f.Size/newIntervalDelta.
587 2 : j := oldMinIntervalIndex
588 2 : for i := f.minIntervalIndex; i <= f.maxIntervalIndex; i++ {
589 2 : if oldToNewMap[j] == i {
590 2 : newVal.orderedIntervals[i].estimatedBytes -= f.Size / uint64(oldIntervalDelta)
591 2 : j++
592 2 : }
593 2 : newVal.orderedIntervals[i].estimatedBytes += f.Size / uint64(newIntervalDelta)
594 : }
595 : }
596 : })
597 : }
598 2 : updatedSublevels := make([]int, 0)
599 2 : // Update interval indices for new files.
600 2 : for i, f := range files {
601 2 : f.L0Index = s.levelMetadata.Len() + i
 : // checkInvariant=true: a new file sliding underneath an existing file
 : // invalidates the incremental optimization (errInvalidL0SublevelsOpt).
602 2 : if err := newVal.addFileToSublevels(f, true /* checkInvariant */); err != nil {
603 2 : return nil, err
604 2 : }
605 2 : updatedSublevels = append(updatedSublevels, f.SubLevel)
606 : }
607 :
608 : // Sort and deduplicate updatedSublevels.
609 2 : sort.Ints(updatedSublevels)
610 2 : {
 : // In-place dedup of the sorted slice; j is the last unique position.
611 2 : j := 0
612 2 : for i := 1; i < len(updatedSublevels); i++ {
613 2 : if updatedSublevels[i] != updatedSublevels[j] {
614 2 : j++
615 2 : updatedSublevels[j] = updatedSublevels[i]
616 2 : }
617 : }
618 2 : updatedSublevels = updatedSublevels[:j+1]
619 : }
620 :
621 : // Sort each updated sublevel in increasing key order.
622 2 : for _, sublevel := range updatedSublevels {
623 2 : sort.Sort(sublevelSorter(newVal.levelFiles[sublevel]))
624 2 : }
625 :
626 : // Construct a parallel slice of sublevel B-Trees.
627 : // TODO(jackson): Consolidate and only use the B-Trees.
628 2 : for _, sublevel := range updatedSublevels {
629 2 : tr, ls := makeBTree(btreeCmpSmallestKey(newVal.cmp), newVal.levelFiles[sublevel])
630 2 : if sublevel == len(newVal.Levels) {
631 2 : newVal.Levels = append(newVal.Levels, ls)
632 2 : } else {
633 2 : // sublevel < len(s.Levels). If this panics, updatedSublevels was not
634 2 : // populated correctly.
635 2 : newVal.Levels[sublevel] = ls
636 2 : }
637 2 : tr.Release()
638 : }
639 :
640 2 : newVal.flushSplitUserKeys = nil
641 2 : newVal.calculateFlushSplitKeys(flushSplitMaxBytes)
642 2 : return newVal, nil
643 : }
644 :
645 : // addFileToSublevels is called during L0Sublevels generation, and adds f to the
646 : // correct sublevel's levelFiles, the relevant intervals' files slices, and sets
647 : // interval indices on f. This method, if called successively on multiple files,
648 : // _must_ be called on successively newer files (by seqnum). If checkInvariant
649 : // is true, it could check for this in some cases and return
650 : // [errInvalidL0SublevelsOpt] if that invariant isn't held.
651 2 : func (s *L0Sublevels) addFileToSublevels(f *FileMetadata, checkInvariant bool) error {
652 2 : // This is a simple and not very accurate estimate of the number of
653 2 : // bytes this SSTable contributes to the intervals it is a part of.
654 2 : //
655 2 : // TODO(bilal): Call EstimateDiskUsage in sstable.Reader with interval
656 2 : // bounds to get a better estimate for each interval.
657 2 : interpolatedBytes := f.Size / uint64(f.maxIntervalIndex-f.minIntervalIndex+1)
658 2 : s.fileBytes += f.Size
 : // subLevel is the lowest sublevel f can occupy: one above the highest
 : // existing sublevel across all intervals f overlaps.
659 2 : subLevel := 0
660 2 : // Update state in every fileInterval for this file.
661 2 : for i := f.minIntervalIndex; i <= f.maxIntervalIndex; i++ {
662 2 : interval := &s.orderedIntervals[i]
663 2 : if len(interval.files) > 0 {
664 2 : if checkInvariant && interval.files[len(interval.files)-1].LargestSeqNum > f.LargestSeqNum {
665 2 : // We are sliding this file "underneath" an existing file. Throw away
666 2 : // and start over in NewL0Sublevels.
667 2 : return errInvalidL0SublevelsOpt
668 2 : }
669 : // interval.files is sorted by sublevels, from lowest to highest.
670 : // AddL0Files can only add files at sublevels higher than existing files
671 : // in the same key intervals.
672 2 : if maxSublevel := interval.files[len(interval.files)-1].SubLevel; subLevel <= maxSublevel {
673 2 : subLevel = maxSublevel + 1
674 2 : }
675 : }
676 2 : interval.estimatedBytes += interpolatedBytes
 : // Widen the interval's span of file-touching intervals to cover f.
677 2 : if f.minIntervalIndex < interval.filesMinIntervalIndex {
678 2 : interval.filesMinIntervalIndex = f.minIntervalIndex
679 2 : }
680 2 : if f.maxIntervalIndex > interval.filesMaxIntervalIndex {
681 2 : interval.filesMaxIntervalIndex = f.maxIntervalIndex
682 2 : }
683 2 : interval.files = append(interval.files, f)
684 : }
685 2 : f.SubLevel = subLevel
686 2 : if subLevel > len(s.levelFiles) {
687 0 : return errors.Errorf("chose a sublevel beyond allowed range of sublevels: %d vs 0-%d", subLevel, len(s.levelFiles))
688 0 : }
 : // Either append f to an existing sublevel or start a new topmost one.
689 2 : if subLevel == len(s.levelFiles) {
690 2 : s.levelFiles = append(s.levelFiles, []*FileMetadata{f})
691 2 : } else {
692 2 : s.levelFiles[subLevel] = append(s.levelFiles[subLevel], f)
693 2 : }
694 2 : return nil
695 : }
696 :
 : // calculateFlushSplitKeys populates s.flushSplitUserKeys by walking the
 : // ordered intervals and emitting an interval start key each time the
 : // accumulated estimatedBytes since the previous split key exceeds
 : // flushSplitMaxBytes (scaled by the current sublevel count). Duplicate
 : // consecutive keys are suppressed.
697 2 : func (s *L0Sublevels) calculateFlushSplitKeys(flushSplitMaxBytes int64) {
698 2 : var cumulativeBytes uint64
699 2 : // Multiply flushSplitMaxBytes by the number of sublevels. This prevents
700 2 : // excessive flush splitting when the number of sublevels increases.
701 2 : flushSplitMaxBytes *= int64(len(s.levelFiles))
702 2 : for i := 0; i < len(s.orderedIntervals); i++ {
703 2 : interval := &s.orderedIntervals[i]
704 2 : if flushSplitMaxBytes > 0 && cumulativeBytes > uint64(flushSplitMaxBytes) &&
705 2 : (len(s.flushSplitUserKeys) == 0 ||
706 2 : !bytes.Equal(interval.startKey.key, s.flushSplitUserKeys[len(s.flushSplitUserKeys)-1])) {
707 2 : s.flushSplitUserKeys = append(s.flushSplitUserKeys, interval.startKey.key)
708 2 : cumulativeBytes = 0
709 2 : }
710 2 : cumulativeBytes += s.orderedIntervals[i].estimatedBytes
711 : }
712 : }
713 :
714 : // InitCompactingFileInfo initializes internal flags relating to compacting
715 : // files. Must be called after sublevel initialization.
716 : //
717 : // Requires DB.mu *and* the manifest lock to be held.
718 2 : func (s *L0Sublevels) InitCompactingFileInfo(inProgress []L0Compaction) {
719 2 : for i := range s.orderedIntervals {
720 2 : s.orderedIntervals[i].compactingFileCount = 0
721 2 : s.orderedIntervals[i].isBaseCompacting = false
722 2 : s.orderedIntervals[i].intervalRangeIsBaseCompacting = false
723 2 : }
724 :
725 2 : iter := s.levelMetadata.Iter()
726 2 : for f := iter.First(); f != nil; f = iter.Next() {
727 2 : if invariants.Enabled {
728 2 : if !bytes.Equal(s.orderedIntervals[f.minIntervalIndex].startKey.key, f.Smallest.UserKey) {
729 0 : panic(fmt.Sprintf("f.minIntervalIndex in FileMetadata out of sync with intervals in L0Sublevels: %s != %s",
730 0 : s.formatKey(s.orderedIntervals[f.minIntervalIndex].startKey.key), s.formatKey(f.Smallest.UserKey)))
731 : }
732 2 : if !bytes.Equal(s.orderedIntervals[f.maxIntervalIndex+1].startKey.key, f.Largest.UserKey) {
733 0 : panic(fmt.Sprintf("f.maxIntervalIndex in FileMetadata out of sync with intervals in L0Sublevels: %s != %s",
734 0 : s.formatKey(s.orderedIntervals[f.maxIntervalIndex+1].startKey.key), s.formatKey(f.Smallest.UserKey)))
735 : }
736 : }
737 2 : if !f.IsCompacting() {
738 2 : continue
739 : }
740 2 : if invariants.Enabled {
741 2 : if s.cmp(s.orderedIntervals[f.minIntervalIndex].startKey.key, f.Smallest.UserKey) != 0 || s.cmp(s.orderedIntervals[f.maxIntervalIndex+1].startKey.key, f.Largest.UserKey) != 0 {
742 0 : panic(fmt.Sprintf("file %s has inconsistent L0 Sublevel interval bounds: %s-%s, %s-%s", f.FileNum,
743 0 : s.orderedIntervals[f.minIntervalIndex].startKey.key, s.orderedIntervals[f.maxIntervalIndex+1].startKey.key,
744 0 : f.Smallest.UserKey, f.Largest.UserKey))
745 : }
746 : }
747 2 : for i := f.minIntervalIndex; i <= f.maxIntervalIndex; i++ {
748 2 : interval := &s.orderedIntervals[i]
749 2 : interval.compactingFileCount++
750 2 : if !f.IsIntraL0Compacting {
751 2 : // If f.Compacting && !f.IsIntraL0Compacting, this file is
752 2 : // being compacted to Lbase.
753 2 : interval.isBaseCompacting = true
754 2 : }
755 : }
756 : }
757 :
758 : // Some intervals may be base compacting without the files contained within
759 : // those intervals being marked as compacting. This is possible if the files
760 : // were added after the compaction initiated, and the active compaction
761 : // files straddle the input file. Mark these intervals as base compacting.
762 2 : for _, c := range inProgress {
763 2 : startIK := intervalKey{key: c.Smallest.UserKey, isLargest: false}
764 2 : endIK := intervalKey{key: c.Largest.UserKey, isLargest: !c.Largest.IsExclusiveSentinel()}
765 2 : start, _ := slices.BinarySearchFunc(s.orderedIntervals, startIK, func(a fileInterval, b intervalKey) int {
766 2 : return intervalKeyCompare(s.cmp, a.startKey, b)
767 2 : })
768 2 : end, _ := slices.BinarySearchFunc(s.orderedIntervals, endIK, func(a fileInterval, b intervalKey) int {
769 2 : return intervalKeyCompare(s.cmp, a.startKey, b)
770 2 : })
771 2 : for i := start; i < end && i < len(s.orderedIntervals); i++ {
772 2 : interval := &s.orderedIntervals[i]
773 2 : if !c.IsIntraL0 {
774 2 : interval.isBaseCompacting = true
775 2 : }
776 : }
777 : }
778 :
779 2 : min := 0
780 2 : for i := range s.orderedIntervals {
781 2 : interval := &s.orderedIntervals[i]
782 2 : if interval.isBaseCompacting {
783 2 : minIndex := interval.filesMinIntervalIndex
784 2 : if minIndex < min {
785 2 : minIndex = min
786 2 : }
787 2 : for j := minIndex; j <= interval.filesMaxIntervalIndex; j++ {
788 2 : min = j
789 2 : s.orderedIntervals[j].intervalRangeIsBaseCompacting = true
790 2 : }
791 : }
792 : }
793 : }
794 :
// String produces a string containing useful debug information. Useful in test
// code and debugging.
func (s *L0Sublevels) String() string {
	// Non-verbose form; describe(true) additionally prints per-file details.
	return s.describe(false)
}
800 :
// describe renders a human-readable summary of the sublevel structure: file
// and interval counts, flush split keys, per-sublevel statistics, and the
// runs of base-compacting intervals. When verbose is true, each file's
// metadata is also printed. Intended for tests and debugging only.
func (s *L0Sublevels) describe(verbose bool) string {
	var buf strings.Builder
	fmt.Fprintf(&buf, "file count: %d, sublevels: %d, intervals: %d\nflush split keys(%d): [",
		s.levelMetadata.Len(), len(s.levelFiles), len(s.orderedIntervals), len(s.flushSplitUserKeys))
	for i := range s.flushSplitUserKeys {
		fmt.Fprintf(&buf, "%s", s.formatKey(s.flushSplitUserKeys[i]))
		if i < len(s.flushSplitUserKeys)-1 {
			fmt.Fprintf(&buf, ", ")
		}
	}
	fmt.Fprintln(&buf, "]")
	numCompactingFiles := 0
	// Iterate sublevels from youngest (highest index) to oldest, printing
	// per-sublevel file counts, byte totals, and interval-width statistics.
	for i := len(s.levelFiles) - 1; i >= 0; i-- {
		maxIntervals := 0
		sumIntervals := 0
		var totalBytes uint64
		for _, f := range s.levelFiles[i] {
			// Width of the file measured in intervals it overlaps.
			intervals := f.maxIntervalIndex - f.minIntervalIndex + 1
			if intervals > maxIntervals {
				maxIntervals = intervals
			}
			sumIntervals += intervals
			totalBytes += f.Size
			if f.IsCompacting() {
				numCompactingFiles++
			}
		}
		fmt.Fprintf(&buf, "0.%d: file count: %d, bytes: %d, width (mean, max): %0.1f, %d, interval range: [%d, %d]\n",
			i, len(s.levelFiles[i]), totalBytes, float64(sumIntervals)/float64(len(s.levelFiles[i])), maxIntervals, s.levelFiles[i][0].minIntervalIndex,
			s.levelFiles[i][len(s.levelFiles[i])-1].maxIntervalIndex)
		for _, f := range s.levelFiles[i] {
			intervals := f.maxIntervalIndex - f.minIntervalIndex + 1
			if verbose {
				fmt.Fprintf(&buf, "\t%s\n", f)
			}
			// Flag unusually wide files (covering more than a third of all
			// intervals) once the level is non-trivially populated.
			if s.levelMetadata.Len() > 50 && intervals*3 > len(s.orderedIntervals) {
				var intervalsBytes uint64
				for k := f.minIntervalIndex; k <= f.maxIntervalIndex; k++ {
					intervalsBytes += s.orderedIntervals[k].estimatedBytes
				}
				fmt.Fprintf(&buf, "wide file: %d, [%d, %d], byte fraction: %f\n",
					f.FileNum, f.minIntervalIndex, f.maxIntervalIndex,
					float64(intervalsBytes)/float64(s.fileBytes))
			}
		}
	}

	// Print contiguous runs of non-empty, base-compacting intervals as
	// [start, end] ranges. lastCompactingIntervalStart == -1 means no run is
	// currently open.
	lastCompactingIntervalStart := -1
	fmt.Fprintf(&buf, "compacting file count: %d, base compacting intervals: ", numCompactingFiles)
	i := 0
	foundBaseCompactingIntervals := false
	for ; i < len(s.orderedIntervals); i++ {
		interval := &s.orderedIntervals[i]
		if len(interval.files) == 0 {
			continue
		}
		if !interval.isBaseCompacting {
			if lastCompactingIntervalStart != -1 {
				if foundBaseCompactingIntervals {
					buf.WriteString(", ")
				}
				fmt.Fprintf(&buf, "[%d, %d]", lastCompactingIntervalStart, i-1)
				foundBaseCompactingIntervals = true
			}
			lastCompactingIntervalStart = -1
		} else {
			if lastCompactingIntervalStart == -1 {
				lastCompactingIntervalStart = i
			}
		}
	}
	// Flush a run that extends to the final interval.
	if lastCompactingIntervalStart != -1 {
		if foundBaseCompactingIntervals {
			buf.WriteString(", ")
		}
		fmt.Fprintf(&buf, "[%d, %d]", lastCompactingIntervalStart, i-1)
	} else if !foundBaseCompactingIntervals {
		fmt.Fprintf(&buf, "none")
	}
	fmt.Fprintln(&buf, "")
	return buf.String()
}
883 :
884 : // ReadAmplification returns the contribution of L0Sublevels to the read
885 : // amplification for any particular point key. It is the maximum height of any
886 : // tracked fileInterval. This is always less than or equal to the number of
887 : // sublevels.
888 2 : func (s *L0Sublevels) ReadAmplification() int {
889 2 : amp := 0
890 2 : for i := range s.orderedIntervals {
891 2 : interval := &s.orderedIntervals[i]
892 2 : fileCount := len(interval.files)
893 2 : if amp < fileCount {
894 2 : amp = fileCount
895 2 : }
896 : }
897 2 : return amp
898 : }
899 :
// UserKeyRange encodes a key range in user key space. A UserKeyRange's Start
// and End boundaries are both inclusive.
type UserKeyRange struct {
	// Start and End are the inclusive user-key bounds of the range.
	Start, End []byte
}
905 :
// InUseKeyRanges returns the merged table bounds of L0 files overlapping the
// provided user key range [smallest, largest]. The returned key ranges are
// sorted and nonoverlapping.
func (s *L0Sublevels) InUseKeyRanges(smallest, largest []byte) []UserKeyRange {
	// Binary search to find the provided keys within the intervals.
	startIK := intervalKey{key: smallest, isLargest: false}
	endIK := intervalKey{key: largest, isLargest: true}
	start := sort.Search(len(s.orderedIntervals), func(i int) bool {
		return intervalKeyCompare(s.cmp, s.orderedIntervals[i].startKey, startIK) > 0
	})
	if start > 0 {
		// Back up to the first interval with a start key <= startIK.
		start--
	}
	// end is the first interval whose start key is strictly beyond endIK;
	// intervals in [start, end) are the candidates.
	end := sort.Search(len(s.orderedIntervals), func(i int) bool {
		return intervalKeyCompare(s.cmp, s.orderedIntervals[i].startKey, endIK) > 0
	})

	var keyRanges []UserKeyRange
	// curr points at the UserKeyRange currently being extended, or nil when
	// no in-use range is open.
	var curr *UserKeyRange
	for i := start; i < end; {
		// Intervals with no files are not in use and can be skipped, once we
		// end the current UserKeyRange.
		if len(s.orderedIntervals[i].files) == 0 {
			curr = nil
			i++
			continue
		}

		// If curr is nil, start a new in-use key range.
		if curr == nil {
			keyRanges = append(keyRanges, UserKeyRange{
				Start: s.orderedIntervals[i].startKey.key,
			})
			curr = &keyRanges[len(keyRanges)-1]
		}

		// If the filesMaxIntervalIndex is not the current index, we can jump to
		// the max index, knowing that all intermediary intervals are overlapped
		// by some file.
		if maxIdx := s.orderedIntervals[i].filesMaxIntervalIndex; maxIdx != i {
			// Note that end may be less than or equal to maxIdx if we're
			// concerned with a key range that ends before the interval at
			// maxIdx starts. We must set curr.End now, before making that leap,
			// because this iteration may be the last.
			i = maxIdx
			curr.End = s.orderedIntervals[i+1].startKey.key
			continue
		}

		// No files overlapping with this interval overlap with the next
		// interval. Update the current end to be the next interval's start key.
		// Note that curr is not necessarily finished, because there may be an
		// abutting non-empty interval.
		curr.End = s.orderedIntervals[i+1].startKey.key
		i++
	}
	return keyRanges
}
965 :
// FlushSplitKeys returns a slice of user keys to split flushes at. Used by
// flushes to avoid writing sstables that straddle these split keys. These
// should be interpreted as the keys to start the next sstable (not the last key
// to include in the prev sstable). These are user keys so that range tombstones
// can be properly truncated (untruncated range tombstones are not permitted for
// L0 files).
//
// The returned slice is the internal one; callers must not mutate it.
func (s *L0Sublevels) FlushSplitKeys() [][]byte {
	return s.flushSplitUserKeys
}
975 :
976 : // MaxDepthAfterOngoingCompactions returns an estimate of maximum depth of
977 : // sublevels after all ongoing compactions run to completion. Used by compaction
978 : // picker to decide compaction score for L0. There is no scoring for intra-L0
979 : // compactions -- they only run if L0 score is high but we're unable to pick an
980 : // L0 -> Lbase compaction.
981 2 : func (s *L0Sublevels) MaxDepthAfterOngoingCompactions() int {
982 2 : depth := 0
983 2 : for i := range s.orderedIntervals {
984 2 : interval := &s.orderedIntervals[i]
985 2 : intervalDepth := len(interval.files) - interval.compactingFileCount
986 2 : if depth < intervalDepth {
987 2 : depth = intervalDepth
988 2 : }
989 : }
990 2 : return depth
991 : }
992 :
// checkCompaction sanity-checks a candidate compaction: every file in the
// rectangle of sublevels and intervals spanned by c must actually be included
// in c, and for intra-L0 compactions no included file may have sequence
// numbers at or above earliestUnflushedSeqNum. Returns a descriptive error
// when a violation is found.
//
// Only for temporary debugging in the absence of proper tests.
//
// TODO(bilal): Simplify away the debugging statements in this method, and make
// this a pure sanity checker.
//
//lint:ignore U1000 - useful for debugging
func (s *L0Sublevels) checkCompaction(c *L0CompactionFiles) error {
	includedFiles := newBitSet(s.levelMetadata.Len())
	// Per-sublevel [min, max] interval bounds of the compaction's files.
	fileIntervalsByLevel := make([]struct {
		min int
		max int
	}, len(s.levelFiles))
	for i := range fileIntervalsByLevel {
		fileIntervalsByLevel[i].min = math.MaxInt32
		fileIntervalsByLevel[i].max = 0
	}
	// Base compactions grow from the seed downward (toward sublevel 0);
	// intra-L0 compactions grow upward. topLevel/increment/limitReached
	// encode that traversal direction.
	var topLevel int
	var increment int
	var limitReached func(int) bool
	if c.isIntraL0 {
		topLevel = len(s.levelFiles) - 1
		increment = +1
		limitReached = func(level int) bool {
			return level == len(s.levelFiles)
		}
	} else {
		topLevel = 0
		increment = -1
		limitReached = func(level int) bool {
			return level < 0
		}
	}
	for _, f := range c.Files {
		if fileIntervalsByLevel[f.SubLevel].min > f.minIntervalIndex {
			fileIntervalsByLevel[f.SubLevel].min = f.minIntervalIndex
		}
		if fileIntervalsByLevel[f.SubLevel].max < f.maxIntervalIndex {
			fileIntervalsByLevel[f.SubLevel].max = f.maxIntervalIndex
		}
		includedFiles.markBit(f.L0Index)
		if c.isIntraL0 {
			if topLevel > f.SubLevel {
				topLevel = f.SubLevel
			}
		} else {
			if topLevel < f.SubLevel {
				topLevel = f.SubLevel
			}
		}
	}
	// Walk sublevels in traversal order, widening [min, max] as we go, and
	// verify every file overlapping that interval range is included in c.
	min := fileIntervalsByLevel[topLevel].min
	max := fileIntervalsByLevel[topLevel].max
	for level := topLevel; !limitReached(level); level += increment {
		if fileIntervalsByLevel[level].min < min {
			min = fileIntervalsByLevel[level].min
		}
		if fileIntervalsByLevel[level].max > max {
			max = fileIntervalsByLevel[level].max
		}
		// Find the first file in this sublevel whose maxIntervalIndex reaches
		// min; files are sorted, so earlier files cannot overlap [min, max].
		index, _ := slices.BinarySearchFunc(s.levelFiles[level], min, func(a *FileMetadata, b int) int {
			return stdcmp.Compare(a.maxIntervalIndex, b)
		})
		// start := index
		for ; index < len(s.levelFiles[level]); index++ {
			f := s.levelFiles[level][index]
			if f.minIntervalIndex > max {
				break
			}
			if c.isIntraL0 && f.LargestSeqNum >= c.earliestUnflushedSeqNum {
				return errors.Errorf(
					"sstable %s in compaction has sequence numbers higher than the earliest unflushed seqnum %d: %d-%d",
					f.FileNum, c.earliestUnflushedSeqNum, f.SmallestSeqNum,
					f.LargestSeqNum)
			}
			if !includedFiles[f.L0Index] {
				// Found an overlapping file that the compaction missed; build
				// a detailed diagnostic listing all included/added files.
				var buf strings.Builder
				fmt.Fprintf(&buf, "bug %t, seed interval: %d: level %d, sl index %d, f.index %d, min %d, max %d, pre-min %d, pre-max %d, f.min %d, f.max %d, filenum: %d, isCompacting: %t\n%s\n",
					c.isIntraL0, c.seedInterval, level, index, f.L0Index, min, max, c.preExtensionMinInterval, c.preExtensionMaxInterval,
					f.minIntervalIndex, f.maxIntervalIndex,
					f.FileNum, f.IsCompacting(), s)
				fmt.Fprintf(&buf, "files included:\n")
				for _, f := range c.Files {
					fmt.Fprintf(&buf, "filenum: %d, sl: %d, index: %d, [%d, %d]\n",
						f.FileNum, f.SubLevel, f.L0Index, f.minIntervalIndex, f.maxIntervalIndex)
				}
				fmt.Fprintf(&buf, "files added:\n")
				for _, f := range c.filesAdded {
					fmt.Fprintf(&buf, "filenum: %d, sl: %d, index: %d, [%d, %d]\n",
						f.FileNum, f.SubLevel, f.L0Index, f.minIntervalIndex, f.maxIntervalIndex)
				}
				return errors.New(buf.String())
			}
		}
	}
	return nil
}
1089 :
1090 : // UpdateStateForStartedCompaction updates internal L0Sublevels state for a
1091 : // recently started compaction. isBase specifies if this is a base compaction;
1092 : // if false, this is assumed to be an intra-L0 compaction. The specified
1093 : // compaction must be involving L0 SSTables. It's assumed that the Compacting
1094 : // and IsIntraL0Compacting fields are already set on all [FileMetadata]s passed
1095 : // in.
1096 2 : func (s *L0Sublevels) UpdateStateForStartedCompaction(inputs []LevelSlice, isBase bool) error {
1097 2 : minIntervalIndex := -1
1098 2 : maxIntervalIndex := 0
1099 2 : for i := range inputs {
1100 2 : iter := inputs[i].Iter()
1101 2 : for f := iter.First(); f != nil; f = iter.Next() {
1102 2 : for i := f.minIntervalIndex; i <= f.maxIntervalIndex; i++ {
1103 2 : interval := &s.orderedIntervals[i]
1104 2 : interval.compactingFileCount++
1105 2 : }
1106 2 : if f.minIntervalIndex < minIntervalIndex || minIntervalIndex == -1 {
1107 2 : minIntervalIndex = f.minIntervalIndex
1108 2 : }
1109 2 : if f.maxIntervalIndex > maxIntervalIndex {
1110 2 : maxIntervalIndex = f.maxIntervalIndex
1111 2 : }
1112 : }
1113 : }
1114 2 : if isBase {
1115 2 : for i := minIntervalIndex; i <= maxIntervalIndex; i++ {
1116 2 : interval := &s.orderedIntervals[i]
1117 2 : interval.isBaseCompacting = isBase
1118 2 : for j := interval.filesMinIntervalIndex; j <= interval.filesMaxIntervalIndex; j++ {
1119 2 : s.orderedIntervals[j].intervalRangeIsBaseCompacting = true
1120 2 : }
1121 : }
1122 : }
1123 2 : return nil
1124 : }
1125 :
// L0CompactionFiles represents a candidate set of L0 files for compaction. Also
// referred to as "lcf". Contains state information useful for generating the
// compaction (such as Files), as well as for picking between candidate
// compactions (eg. fileBytes and seedIntervalStackDepthReduction).
type L0CompactionFiles struct {
	// Files is the set of files participating in this candidate compaction,
	// in the order they were added via addFile.
	Files []*FileMetadata

	// FilesIncluded has a bit set for each file's L0Index present in Files,
	// allowing O(1) duplicate checks in addFile.
	FilesIncluded bitSet
	// A "seed interval" is an interval with a high stack depth that was chosen
	// to bootstrap this compaction candidate. seedIntervalStackDepthReduction
	// is the number of sublevels that have a file in the seed interval that is
	// a part of this compaction.
	seedIntervalStackDepthReduction int
	// For base compactions, seedIntervalMinLevel is 0, and for intra-L0
	// compactions, seedIntervalMaxLevel is len(s.Files)-1 i.e. the highest
	// sublevel.
	seedIntervalMinLevel int
	seedIntervalMaxLevel int
	// Index of the seed interval.
	seedInterval int
	// Sum of file sizes for all files in this compaction.
	fileBytes uint64
	// Intervals with index [minIntervalIndex, maxIntervalIndex] are
	// participating in this compaction; it's the union set of all intervals
	// overlapped by participating files.
	minIntervalIndex int
	maxIntervalIndex int

	// Set for intra-L0 compactions. SSTables with sequence numbers greater
	// than earliestUnflushedSeqNum cannot be a part of intra-L0 compactions.
	isIntraL0 bool
	earliestUnflushedSeqNum uint64

	// For debugging purposes only. Used in checkCompaction().
	preExtensionMinInterval int
	preExtensionMaxInterval int
	// filesAdded records files appended by addFile, for diagnostics.
	filesAdded []*FileMetadata
}
1164 :
1165 : // Clone allocates a new L0CompactionFiles, with the same underlying data. Note
1166 : // that the two fileMetadata slices contain values that point to the same
1167 : // underlying fileMetadata object. This is safe because these objects are read
1168 : // only.
1169 2 : func (l *L0CompactionFiles) Clone() *L0CompactionFiles {
1170 2 : oldLcf := *l
1171 2 : return &oldLcf
1172 2 : }
1173 :
1174 : // String merely prints the starting address of the first file, if it exists.
1175 1 : func (l *L0CompactionFiles) String() string {
1176 1 : if len(l.Files) > 0 {
1177 1 : return fmt.Sprintf("First File Address: %p", &l.Files[0])
1178 1 : }
1179 0 : return ""
1180 : }
1181 :
1182 : // addFile adds the specified file to the LCF.
1183 2 : func (l *L0CompactionFiles) addFile(f *FileMetadata) {
1184 2 : if l.FilesIncluded[f.L0Index] {
1185 2 : return
1186 2 : }
1187 2 : l.FilesIncluded.markBit(f.L0Index)
1188 2 : l.Files = append(l.Files, f)
1189 2 : l.filesAdded = append(l.filesAdded, f)
1190 2 : l.fileBytes += f.Size
1191 2 : if f.minIntervalIndex < l.minIntervalIndex {
1192 2 : l.minIntervalIndex = f.minIntervalIndex
1193 2 : }
1194 2 : if f.maxIntervalIndex > l.maxIntervalIndex {
1195 2 : l.maxIntervalIndex = f.maxIntervalIndex
1196 2 : }
1197 : }
1198 :
// Helper to order intervals being considered for compaction.
type intervalAndScore struct {
	// interval is an index into L0Sublevels.orderedIntervals.
	interval int
	// score orders candidates; higher scores are considered first.
	score int
}

// intervalSorterByDecreasingScore implements sort.Interface, ordering
// intervals by descending score.
type intervalSorterByDecreasingScore []intervalAndScore

func (is intervalSorterByDecreasingScore) Len() int { return len(is) }
func (is intervalSorterByDecreasingScore) Less(i, j int) bool {
	// Higher score sorts earlier (descending order).
	return is[i].score > is[j].score
}
func (is intervalSorterByDecreasingScore) Swap(i, j int) {
	is[i], is[j] = is[j], is[i]
}
1213 :
1214 : // Compactions:
1215 : //
1216 : // The sub-levels and intervals can be visualized in 2 dimensions as the X axis
1217 : // containing intervals in increasing order and the Y axis containing sub-levels
1218 : // (older to younger). The intervals can be sparse wrt sub-levels. We observe
1219 : // that the system is typically under severe pressure in L0 during large numbers
1220 : // of ingestions where most files added to L0 are narrow and non-overlapping.
1221 : //
1222 : // L0.1 d---g
1223 : // L0.0 c--e g--j o--s u--x
1224 : //
1225 : // As opposed to a case with a lot of wide, overlapping L0 files:
1226 : //
1227 : // L0.3 d-----------r
1228 : // L0.2 c--------o
1229 : // L0.1 b-----------q
1230 : // L0.0 a----------------x
1231 : //
1232 : // In that case we expect the rectangle represented in the good visualization
1233 : // above (i.e. the first one) to be wide and short, and not too sparse (most
1234 : // intervals will have fileCount close to the sub-level count), which would make
1235 : // it amenable to concurrent L0 -> Lbase compactions.
1236 : //
1237 : // L0 -> Lbase: The high-level goal of a L0 -> Lbase compaction is to reduce
1238 : // stack depth, by compacting files in the intervals with the highest (fileCount
1239 : // - compactingCount). Additionally, we would like compactions to not involve a
1240 : // huge number of files, so that they finish quickly, and to allow for
1241 : // concurrent L0 -> Lbase compactions when needed. In order to achieve these
1242 : // goals we would like compactions to visualize as capturing thin and tall
1243 : // rectangles. The approach below is to consider intervals in some order and
1244 : // then try to construct a compaction using the interval. The first interval we
1245 : // can construct a compaction for is the compaction that is started. There can
1246 : // be multiple heuristics in choosing the ordering of the intervals -- the code
1247 : // uses one heuristic that worked well for a large ingestion stemming from a
1248 : // cockroachdb import, but additional experimentation is necessary to pick a
1249 : // general heuristic. Additionally, the compaction that gets picked may be not
1250 : // as desirable as one that could be constructed later in terms of reducing
1251 : // stack depth (since adding more files to the compaction can get blocked by
1252 : // needing to encompass files that are already being compacted). So an
1253 : // alternative would be to try to construct more than one compaction and pick
1254 : // the best one.
1255 : //
1256 : // Here's a visualization of an ideal L0->LBase compaction selection:
1257 : //
1258 : // L0.3 a--d g-j
1259 : // L0.2 f--j r-t
1260 : // L0.1 b-d e---j
1261 : // L0.0 a--d f--j l--o p-----x
1262 : //
1263 : // Lbase a--------i m---------w
1264 : //
1265 : // The [g,j] interval has the highest stack depth, so it would have the highest
1266 : // priority for selecting a base compaction candidate. Assuming none of the
1267 : // files are already compacting, this is the compaction that will be chosen:
1268 : //
1269 : // _______
1270 : // L0.3 a--d | g-j|
1271 : // L0.2 | f--j| r-t
1272 : // L0.1 b-d |e---j|
1273 : // L0.0 a--d | f--j| l--o p-----x
1274 : //
1275 : // Lbase a--------i m---------w
1276 : //
1277 : // Note that running this compaction will mark the a--i file in Lbase as
1278 : // compacting, and when ExtendL0ForBaseCompactionTo is called with the bounds of
1279 : // that base file, it'll expand the compaction to also include all L0 files in
1280 : // the a-d interval. The resultant compaction would then be:
1281 : //
1282 : // _____________
1283 : // L0.3 |a--d g-j|
1284 : // L0.2 | f--j| r-t
1285 : // L0.1 | b-d e---j|
1286 : // L0.0 |a--d f--j| l--o p-----x
1287 : //
1288 : // Lbase a--------i m---------w
1289 : //
1290 : // The next best interval for base compaction would therefore be the one
1291 : // including r--t in L0.2 and p--x in L0.0, and both this compaction and the one
1292 : // picked earlier can run in parallel. This is assuming minCompactionDepth >= 2,
1293 : // otherwise the second compaction has too little depth to pick.
1294 : //
1295 : // _____________
1296 : // L0.3 |a--d g-j| _________
1297 : // L0.2 | f--j| | r-t |
1298 : // L0.1 | b-d e---j| | |
1299 : // L0.0 |a--d f--j| l--o |p-----x|
1300 : //
1301 : // Lbase a--------i m---------w
1302 : //
1303 : // Note that when ExtendL0ForBaseCompactionTo is called, the compaction expands
1304 : // to the following, given that the [l,o] file can be added without including
1305 : // additional files in Lbase:
1306 : //
1307 : // _____________
1308 : // L0.3 |a--d g-j| _________
1309 : // L0.2 | f--j| | r-t |
1310 : // L0.1 | b-d e---j|______| |
1311 : // L0.0 |a--d f--j||l--o p-----x|
1312 : //
1313 : // Lbase a--------i m---------w
1314 : //
1315 : // If an additional file existed in LBase that overlapped with [l,o], it would
1316 : // be excluded from the compaction. Concretely:
1317 : //
1318 : // _____________
1319 : // L0.3 |a--d g-j| _________
1320 : // L0.2 | f--j| | r-t |
1321 : // L0.1 | b-d e---j| | |
1322 : // L0.0 |a--d f--j| l--o |p-----x|
1323 : //
1324 : // Lbase a--------ij--lm---------w
1325 : //
1326 : // Intra-L0: If the L0 score is high, but PickBaseCompaction() is unable to pick
1327 : // a compaction, PickIntraL0Compaction will be used to pick an intra-L0
1328 : // compaction. Similar to L0 -> Lbase compactions, we want to allow for multiple
1329 : // intra-L0 compactions and not generate wide output files that hinder later
1330 : // concurrency of L0 -> Lbase compactions. Also compactions that produce wide
1331 : // files don't reduce stack depth -- they represent wide rectangles in our
1332 : // visualization, which means many intervals have their depth reduced by a small
1333 : // amount. Typically, L0 files have non-overlapping sequence numbers, and
1334 : // sticking to that invariant would require us to consider intra-L0 compactions
1335 : // that proceed from youngest to oldest files, which could result in the
1336 : // aforementioned undesirable wide rectangle shape. But this non-overlapping
1337 : // sequence number is already relaxed in RocksDB -- sstables are primarily
1338 : // ordered by their largest sequence number. So we can arrange for intra-L0
1339 : // compactions to capture thin and tall rectangles starting with the top of the
1340 : // stack (youngest files). Like the L0 -> Lbase case we order the intervals
1341 : // using a heuristic and consider each in turn. The same comment about better L0
1342 : // -> Lbase heuristics and not being greedy applies here.
1343 : //
1344 : // Going back to a modified version of our example from earlier, let's say these
1345 : // are the base compactions in progress:
1346 : // _______
1347 : // L0.3 a--d | g-j| _________
1348 : // L0.2 | f--j| | r-t |
1349 : // L0.1 b-d |e---j| | |
1350 : // L0.0 a--d | f--j| l--o |p-----x|
1351 : //
1352 : // Lbase a---------i m---------w
1353 : //
1354 : // Since both LBase files are compacting, the only L0 compaction that can be
1355 : // picked is an intra-L0 compaction. For this, the b--d interval has the highest
1356 : // stack depth (3), and starting with a--d in L0.3 as the seed file, we can
1357 : // iterate downward and build this compaction, assuming all files in that
1358 : // interval are not compacting and have a highest sequence number less than
1359 : // earliestUnflushedSeqNum:
1360 : //
1361 : // _______
1362 : // L0.3 |a--d| | g-j| _________
1363 : // L0.2 | | | f--j| | r-t |
1364 : // L0.1 | b-d| |e---j| | |
1365 : // L0.0 |a--d| | f--j| l--o |p-----x|
1366 : // ------
1367 : // Lbase a---------i m---------w
1368 : //
1369 :
// PickBaseCompaction picks a base compaction based on the above specified
// heuristics, for the specified Lbase files and a minimum depth of overlapping
// files that can be selected for compaction. Returns nil (with a nil error) if
// no compaction is possible.
func (s *L0Sublevels) PickBaseCompaction(
	minCompactionDepth int, baseFiles LevelSlice,
) (*L0CompactionFiles, error) {
	// For LBase compactions, we consider intervals in a greedy manner in the
	// following order:
	// - Intervals that are unlikely to be blocked due
	//   to ongoing L0 -> Lbase compactions. These are the ones with
	//   !isBaseCompacting && !intervalRangeIsBaseCompacting.
	// - Intervals that are !isBaseCompacting && intervalRangeIsBaseCompacting.
	//
	// The ordering heuristic exists just to avoid wasted work. Ideally,
	// we would consider all intervals with isBaseCompacting = false and
	// construct a compaction for it and compare the constructed compactions
	// and pick the best one. If microbenchmarks show that we can afford
	// this cost we can eliminate this heuristic.
	scoredIntervals := make([]intervalAndScore, 0, len(s.orderedIntervals))
	sublevelCount := len(s.levelFiles)
	for i := range s.orderedIntervals {
		interval := &s.orderedIntervals[i]
		// depth is the interval's stack height excluding files already being
		// compacted away.
		depth := len(interval.files) - interval.compactingFileCount
		if interval.isBaseCompacting || minCompactionDepth > depth {
			continue
		}
		if interval.intervalRangeIsBaseCompacting {
			scoredIntervals = append(scoredIntervals, intervalAndScore{interval: i, score: depth})
		} else {
			// Prioritize this interval by incrementing the score by the number
			// of sublevels.
			scoredIntervals = append(scoredIntervals, intervalAndScore{interval: i, score: depth + sublevelCount})
		}
	}
	sort.Sort(intervalSorterByDecreasingScore(scoredIntervals))

	// Optimization to avoid considering different intervals that
	// are likely to choose the same seed file. Again this is just
	// to reduce wasted work.
	consideredIntervals := newBitSet(len(s.orderedIntervals))
	for _, scoredInterval := range scoredIntervals {
		interval := &s.orderedIntervals[scoredInterval.interval]
		if consideredIntervals[interval.index] {
			continue
		}

		// Pick the seed file for the interval as the file
		// in the lowest sub-level.
		f := interval.files[0]
		// Don't bother considering the intervals that are covered by the seed
		// file since they are likely nearby. Note that it is possible that
		// those intervals have seed files at lower sub-levels so could be
		// viable for compaction.
		if f == nil {
			return nil, errors.New("no seed file found in sublevel intervals")
		}
		consideredIntervals.markBits(f.minIntervalIndex, f.maxIntervalIndex+1)
		if f.IsCompacting() {
			if f.IsIntraL0Compacting {
				// If we're picking a base compaction and we came across a seed
				// file candidate that's being intra-L0 compacted, skip the
				// interval instead of erroring out.
				continue
			}
			// We chose a compaction seed file that should not be compacting.
			// Usually means the score is not accurately accounting for files
			// already compacting, or internal state is inconsistent.
			return nil, errors.Errorf("file %s chosen as seed file for compaction should not be compacting", f.FileNum)
		}

		c := s.baseCompactionUsingSeed(f, interval.index, minCompactionDepth)
		if c != nil {
			// Check if the chosen compaction overlaps with any files in Lbase
			// that have Compacting = true. If that's the case, this compaction
			// cannot be chosen.
			baseIter := baseFiles.Iter()
			// An interval starting at ImmediateSuccessor(key) can never be the
			// first interval of a compaction since no file can start at that
			// interval.
			m := baseIter.SeekGE(s.cmp, s.orderedIntervals[c.minIntervalIndex].startKey.key)

			var baseCompacting bool
			for ; m != nil && !baseCompacting; m = baseIter.Next() {
				cmp := s.cmp(m.Smallest.UserKey, s.orderedIntervals[c.maxIntervalIndex+1].startKey.key)
				// Compaction is ending at exclusive bound of c.maxIntervalIndex+1
				if cmp > 0 || (cmp == 0 && !s.orderedIntervals[c.maxIntervalIndex+1].startKey.isLargest) {
					break
				}
				baseCompacting = baseCompacting || m.IsCompacting()
			}
			if baseCompacting {
				continue
			}
			return c, nil
		}
	}
	// No viable seed interval produced a compaction.
	return nil, nil
}
1469 :
// baseCompactionUsingSeed builds an L0 -> Lbase compaction candidate around
// the seed file f, the bottommost file in the seed interval intervalIndex.
// It returns nil if no candidate achieving a stack depth reduction of at
// least minCompactionDepth could be constructed.
func (s *L0Sublevels) baseCompactionUsingSeed(
	f *FileMetadata, intervalIndex int, minCompactionDepth int,
) *L0CompactionFiles {
	c := &L0CompactionFiles{
		FilesIncluded:        newBitSet(s.levelMetadata.Len()),
		seedInterval:         intervalIndex,
		seedIntervalMinLevel: 0,
		minIntervalIndex:     f.minIntervalIndex,
		maxIntervalIndex:     f.maxIntervalIndex,
	}
	c.addFile(f)

	// The first iteration of this loop builds the compaction at the seed file's
	// sublevel. Future iterations expand on this compaction by stacking more
	// files from intervalIndex and repeating. This is an optional activity so
	// when it fails we can fallback to the last successful candidate. Note that
	// lastCandidate holds a snapshot copy of c (via *lastCandidate = *c below),
	// not an alias.
	var lastCandidate *L0CompactionFiles
	interval := &s.orderedIntervals[intervalIndex]

	for i := 0; i < len(interval.files); i++ {
		f2 := interval.files[i]
		sl := f2.SubLevel
		c.seedIntervalStackDepthReduction++
		c.seedIntervalMaxLevel = sl
		c.addFile(f2)
		// The seed file is in the lowest sublevel in the seed interval, but it
		// may overlap with other files in even lower sublevels. For correctness
		// we need to grow our interval to include those files, and capture all
		// files in the next level that fall in this extended interval and so
		// on. This can result in a triangular shape like the following where
		// again the X axis is the key intervals and the Y axis is oldest to
		// youngest. Note that it is not necessary for correctness to fill out
		// the shape at the higher sub-levels to make it more rectangular since
		// the invariant only requires that younger versions of a key not be
		// moved to Lbase while leaving behind older versions.
		// -
		// ---
		// -----
		// It may be better for performance to have a more rectangular shape
		// since the files being left behind will overlap with the same Lbase
		// key range as that of this compaction. But there is also the danger
		// that in trying to construct a more rectangular shape we will be
		// forced to pull in a file that is already compacting. We expect
		// extendCandidateToRectangle to eventually be called on this compaction
		// if it's chosen, at which point we would iterate backward and choose
		// those files. This logic is similar to compaction.grow for non-L0
		// compactions.
		done := false
		for currLevel := sl - 1; currLevel >= 0; currLevel-- {
			// math.MaxUint64 disables the unflushed-seqnum filter: every
			// overlapping file in lower sublevels must be pulled in.
			if !s.extendFiles(currLevel, math.MaxUint64, c) {
				// Failed to extend due to ongoing compaction.
				done = true
				break
			}
		}
		if done {
			break
		}
		// Observed some compactions using > 1GB from L0 in an import
		// experiment. Very long running compactions are not great as they
		// reduce concurrency while they run, and take a while to produce
		// results, though they're sometimes unavoidable. There is a tradeoff
		// here in that adding more depth is more efficient in reducing stack
		// depth, but long running compactions reduce flexibility in what can
		// run concurrently in L0 and even Lbase -> Lbase+1. An increase more
		// than 150% in bytes since the last candidate compaction (along with a
		// total compaction size in excess of 100mb), or a total compaction size
		// beyond a hard limit of 500mb, is criteria for rejecting this
		// candidate. This lets us prefer slow growths as we add files, while
		// still having a hard limit. Note that if this is the first compaction
		// candidate to reach a stack depth reduction of minCompactionDepth or
		// higher, this candidate will be chosen regardless.
		if lastCandidate == nil {
			lastCandidate = &L0CompactionFiles{}
		} else if lastCandidate.seedIntervalStackDepthReduction >= minCompactionDepth &&
			c.fileBytes > 100<<20 &&
			(float64(c.fileBytes)/float64(lastCandidate.fileBytes) > 1.5 || c.fileBytes > 500<<20) {
			break
		}
		*lastCandidate = *c
	}
	if lastCandidate != nil && lastCandidate.seedIntervalStackDepthReduction >= minCompactionDepth {
		// Rebuild the included-files bitset from the snapshot's file list,
		// since c may have grown past the accepted candidate.
		lastCandidate.FilesIncluded.clearAllBits()
		for _, f := range lastCandidate.Files {
			lastCandidate.FilesIncluded.markBit(f.L0Index)
		}
		return lastCandidate
	}
	return nil
}
1562 :
1563 : // Expands fields in the provided L0CompactionFiles instance (cFiles) to
1564 : // include overlapping files in the specified sublevel. Returns true if the
1565 : // compaction is possible (i.e. does not conflict with any base/intra-L0
1566 : // compacting files).
1567 : func (s *L0Sublevels) extendFiles(
1568 : sl int, earliestUnflushedSeqNum uint64, cFiles *L0CompactionFiles,
1569 2 : ) bool {
1570 2 : index, _ := slices.BinarySearchFunc(s.levelFiles[sl], cFiles.minIntervalIndex, func(a *FileMetadata, b int) int {
1571 2 : return stdcmp.Compare(a.maxIntervalIndex, b)
1572 2 : })
1573 2 : for ; index < len(s.levelFiles[sl]); index++ {
1574 2 : f := s.levelFiles[sl][index]
1575 2 : if f.minIntervalIndex > cFiles.maxIntervalIndex {
1576 2 : break
1577 : }
1578 2 : if f.IsCompacting() {
1579 2 : return false
1580 2 : }
1581 : // Skip over files that are newer than earliestUnflushedSeqNum. This is
1582 : // okay because this compaction can just pretend these files are not in
1583 : // L0 yet. These files must be in higher sublevels than any overlapping
1584 : // files with f.LargestSeqNum < earliestUnflushedSeqNum, and the output
1585 : // of the compaction will also go in a lower (older) sublevel than this
1586 : // file by definition.
1587 2 : if f.LargestSeqNum >= earliestUnflushedSeqNum {
1588 2 : continue
1589 : }
1590 2 : cFiles.addFile(f)
1591 : }
1592 2 : return true
1593 : }
1594 :
// PickIntraL0Compaction picks an intra-L0 compaction for files in this
// sublevel. This method is only called when a base compaction cannot be chosen.
// See comment above [PickBaseCompaction] for heuristics involved in this
// selection.
func (s *L0Sublevels) PickIntraL0Compaction(
	earliestUnflushedSeqNum uint64, minCompactionDepth int,
) (*L0CompactionFiles, error) {
	// Score each interval by its stack depth, not counting files that are
	// already compacting. Intervals shallower than minCompactionDepth are left
	// as zero-valued entries; after sorting they sink to the end and are
	// rejected by the stack-depth check below (assuming minCompactionDepth is
	// positive).
	scoredIntervals := make([]intervalAndScore, len(s.orderedIntervals))
	for i := range s.orderedIntervals {
		interval := &s.orderedIntervals[i]
		depth := len(interval.files) - interval.compactingFileCount
		if minCompactionDepth > depth {
			continue
		}
		scoredIntervals[i] = intervalAndScore{interval: i, score: depth}
	}
	sort.Sort(intervalSorterByDecreasingScore(scoredIntervals))

	// Optimization to avoid considering different intervals that are likely to
	// choose the same seed file. Again this is just to reduce wasted work.
	consideredIntervals := newBitSet(len(s.orderedIntervals))
	for _, scoredInterval := range scoredIntervals {
		interval := &s.orderedIntervals[scoredInterval.interval]
		if consideredIntervals[interval.index] {
			continue
		}

		var f *FileMetadata
		// Pick the seed file for the interval as the file in the highest
		// sub-level.
		stackDepthReduction := scoredInterval.score
		for i := len(interval.files) - 1; i >= 0; i-- {
			f = interval.files[i]
			if f.IsCompacting() {
				break
			}
			consideredIntervals.markBits(f.minIntervalIndex, f.maxIntervalIndex+1)
			// Can this be the seed file? Files with newer sequence numbers than
			// earliestUnflushedSeqNum cannot be in the compaction.
			if f.LargestSeqNum >= earliestUnflushedSeqNum {
				// Too new to include; each skipped file shrinks the achievable
				// stack depth reduction by one.
				stackDepthReduction--
				if stackDepthReduction == 0 {
					break
				}
			} else {
				break
			}
		}
		if stackDepthReduction < minCompactionDepth {
			// Can't use this interval.
			continue
		}

		if f == nil {
			return nil, errors.New("no seed file found in sublevel intervals")
		}
		if f.IsCompacting() {
			// This file could be in a concurrent intra-L0 or base compaction.
			// Try another interval.
			continue
		}

		// We have a seed file. Build a compaction off of that seed.
		c := s.intraL0CompactionUsingSeed(
			f, interval.index, earliestUnflushedSeqNum, minCompactionDepth)
		if c != nil {
			return c, nil
		}
	}
	// No interval yielded a viable compaction; a nil, nil return signals
	// "nothing to do" rather than an error.
	return nil, nil
}
1666 :
// intraL0CompactionUsingSeed builds an intra-L0 compaction candidate around
// the seed file f in interval intervalIndex, stacking downward from the seed
// toward older sublevels. It mirrors baseCompactionUsingSeed, except that
// files with LargestSeqNum >= earliestUnflushedSeqNum are excluded and the
// chosen candidate is extended toward a rectangle before being returned.
// Returns nil if no candidate reaches minCompactionDepth.
func (s *L0Sublevels) intraL0CompactionUsingSeed(
	f *FileMetadata, intervalIndex int, earliestUnflushedSeqNum uint64, minCompactionDepth int,
) *L0CompactionFiles {
	// We know that all the files that overlap with intervalIndex have
	// LargestSeqNum < earliestUnflushedSeqNum, but for other intervals
	// we need to exclude files >= earliestUnflushedSeqNum

	c := &L0CompactionFiles{
		FilesIncluded:           newBitSet(s.levelMetadata.Len()),
		seedInterval:            intervalIndex,
		seedIntervalMaxLevel:    len(s.levelFiles) - 1,
		minIntervalIndex:        f.minIntervalIndex,
		maxIntervalIndex:        f.maxIntervalIndex,
		isIntraL0:               true,
		earliestUnflushedSeqNum: earliestUnflushedSeqNum,
	}
	c.addFile(f)

	var lastCandidate *L0CompactionFiles
	interval := &s.orderedIntervals[intervalIndex]
	// Locate the seed file's position within the interval's file stack; the
	// loop below walks downward (toward older sublevels) from there.
	slIndex := len(interval.files) - 1
	for {
		if interval.files[slIndex] == f {
			break
		}
		slIndex--
	}
	// The first iteration of this loop produces an intra-L0 compaction at the
	// seed level. Iterations after that optionally add to the compaction by
	// stacking more files from intervalIndex and repeating. This is an optional
	// activity so when it fails we can fallback to the last successful
	// candidate. The code stops adding when it can't add more, or when
	// fileBytes grows too large.
	for ; slIndex >= 0; slIndex-- {
		f2 := interval.files[slIndex]
		sl := f2.SubLevel
		if f2.IsCompacting() {
			break
		}
		c.seedIntervalStackDepthReduction++
		c.seedIntervalMinLevel = sl
		c.addFile(f2)
		// The seed file captures all files in the higher level that fall in the
		// range of intervals. That may extend the range of intervals so for
		// correctness we need to capture all files in the next higher level
		// that fall in this extended interval and so on. This can result in an
		// inverted triangular shape like the following where again the X axis
		// is the key intervals and the Y axis is oldest to youngest. Note that
		// it is not necessary for correctness to fill out the shape at lower
		// sub-levels to make it more rectangular since the invariant only
		// requires that if we move an older seqnum for key k into a file that
		// has a higher seqnum, we also move all younger seqnums for that key k
		// into that file.
		// -----
		// ---
		// -
		// It may be better for performance to have a more rectangular shape
		// since it will reduce the stack depth for more intervals. But there is
		// also the danger that in explicitly trying to construct a more
		// rectangular shape we will be forced to pull in a file that is already
		// compacting. We assume that the performance concern is not a practical
		// issue.
		done := false
		for currLevel := sl + 1; currLevel < len(s.levelFiles); currLevel++ {
			if !s.extendFiles(currLevel, earliestUnflushedSeqNum, c) {
				// Failed to extend due to ongoing compaction.
				done = true
				break
			}
		}
		if done {
			break
		}
		// Same growth-limiting heuristic as baseCompactionUsingSeed: reject
		// further growth past 100MB once it exceeds 150% of the last accepted
		// candidate, or past a 500MB hard cap, provided minCompactionDepth has
		// already been met.
		if lastCandidate == nil {
			lastCandidate = &L0CompactionFiles{}
		} else if lastCandidate.seedIntervalStackDepthReduction >= minCompactionDepth &&
			c.fileBytes > 100<<20 &&
			(float64(c.fileBytes)/float64(lastCandidate.fileBytes) > 1.5 || c.fileBytes > 500<<20) {
			break
		}
		*lastCandidate = *c
	}
	if lastCandidate != nil && lastCandidate.seedIntervalStackDepthReduction >= minCompactionDepth {
		// Rebuild the included-files bitset from the accepted snapshot, since
		// c may have grown past it.
		lastCandidate.FilesIncluded.clearAllBits()
		for _, f := range lastCandidate.Files {
			lastCandidate.FilesIncluded.markBit(f.L0Index)
		}
		s.extendCandidateToRectangle(
			lastCandidate.minIntervalIndex, lastCandidate.maxIntervalIndex, lastCandidate, false)
		return lastCandidate
	}
	return nil
}
1760 :
1761 : // ExtendL0ForBaseCompactionTo extends the specified base compaction candidate
1762 : // L0CompactionFiles to optionally cover more files in L0 without "touching" any
1763 : // of the passed-in keys (i.e. the smallest/largest bounds are exclusive), as
1764 : // including any user keys for those internal keys could require choosing more
1765 : // files in LBase which is undesirable. Unbounded start/end keys are indicated
1766 : // by passing in the InvalidInternalKey.
1767 : func (s *L0Sublevels) ExtendL0ForBaseCompactionTo(
1768 : smallest, largest InternalKey, candidate *L0CompactionFiles,
1769 2 : ) bool {
1770 2 : firstIntervalIndex := 0
1771 2 : lastIntervalIndex := len(s.orderedIntervals) - 1
1772 2 : if smallest.Kind() != base.InternalKeyKindInvalid {
1773 2 : if smallest.Trailer == base.InternalKeyRangeDeleteSentinel {
1774 1 : // Starting at smallest.UserKey == interval.startKey is okay.
1775 1 : firstIntervalIndex = sort.Search(len(s.orderedIntervals), func(i int) bool {
1776 1 : return s.cmp(smallest.UserKey, s.orderedIntervals[i].startKey.key) <= 0
1777 1 : })
1778 2 : } else {
1779 2 : firstIntervalIndex = sort.Search(len(s.orderedIntervals), func(i int) bool {
1780 2 : // Need to start at >= smallest since if we widen too much we may miss
1781 2 : // an Lbase file that overlaps with an L0 file that will get picked in
1782 2 : // this widening, which would be bad. This interval will not start with
1783 2 : // an immediate successor key.
1784 2 : return s.cmp(smallest.UserKey, s.orderedIntervals[i].startKey.key) < 0
1785 2 : })
1786 : }
1787 : }
1788 2 : if largest.Kind() != base.InternalKeyKindInvalid {
1789 2 : // First interval that starts at or beyond the largest. This interval will not
1790 2 : // start with an immediate successor key.
1791 2 : lastIntervalIndex = sort.Search(len(s.orderedIntervals), func(i int) bool {
1792 2 : return s.cmp(largest.UserKey, s.orderedIntervals[i].startKey.key) <= 0
1793 2 : })
1794 : // Right now, lastIntervalIndex has a startKey that extends beyond largest.
1795 : // The previous interval, by definition, has an end key higher than largest.
1796 : // Iterate back twice to get the last interval that's completely within
1797 : // (smallest, largest). Except in the case where we went past the end of the
1798 : // list; in that case, the last interval to include is the very last
1799 : // interval in the list.
1800 2 : if lastIntervalIndex < len(s.orderedIntervals) {
1801 2 : lastIntervalIndex--
1802 2 : }
1803 2 : lastIntervalIndex--
1804 : }
1805 2 : if lastIntervalIndex < firstIntervalIndex {
1806 1 : return false
1807 1 : }
1808 2 : return s.extendCandidateToRectangle(firstIntervalIndex, lastIntervalIndex, candidate, true)
1809 : }
1810 :
1811 : // Best-effort attempt to make the compaction include more files in the
1812 : // rectangle defined by [minIntervalIndex, maxIntervalIndex] on the X axis and
1813 : // bounded on the Y axis by seedIntervalMinLevel and seedIntervalMaxLevel.
1814 : //
1815 : // This is strictly an optional extension; at any point where we can't feasibly
1816 : // add more files, the sublevel iteration can be halted early and candidate will
1817 : // still be a correct compaction candidate.
1818 : //
1819 : // Consider this scenario (original candidate is inside the rectangle), with
1820 : // isBase = true and interval bounds a-j (from the union of base file bounds and
1821 : // that of compaction candidate):
1822 : //
1823 : // _______
1824 : // L0.3 a--d | g-j|
1825 : // L0.2 | f--j| r-t
1826 : // L0.1 b-d |e---j|
1827 : // L0.0 a--d | f--j| l--o p-----x
1828 : //
1829 : // Lbase a--------i m---------w
1830 : //
1831 : // This method will iterate from the bottom up. At L0.0, it will add a--d since
1832 : // it's in the bounds, then add b-d, then a--d, and so on, to produce this:
1833 : //
1834 : // _____________
1835 : // L0.3 |a--d g-j|
1836 : // L0.2 | f--j| r-t
1837 : // L0.1 | b-d e---j|
1838 : // L0.0 |a--d f--j| l--o p-----x
1839 : //
1840 : // Lbase a-------i m---------w
1841 : //
1842 : // Let's assume that, instead of a--d in the top sublevel, we had 3 files, a-b,
1843 : // bb-c, and cc-d, of which bb-c is compacting. Let's also add another sublevel
1844 : // L0.4 with some files, all of which aren't compacting:
1845 : //
1846 : // L0.4 a------c ca--d _______
1847 : // L0.3 a-b bb-c cc-d | g-j|
1848 : // L0.2 | f--j| r-t
1849 : // L0.1 b----------d |e---j|
1850 : // L0.0 a------------d | f--j| l--o p-----x
1851 : //
1852 : // Lbase a------------------i m---------w
1853 : //
1854 : // This method then needs to choose between the left side of L0.3 bb-c (i.e.
1855 : // a-b), or the right side (i.e. cc-d and g-j) for inclusion in this compaction.
1856 : // Since the right side has more files as well as one file that has already been
1857 : // picked, it gets chosen at that sublevel, resulting in this intermediate
1858 : // compaction:
1859 : //
1860 : // L0.4 a------c ca--d
1861 : // ______________
1862 : // L0.3 a-b bb-c| cc-d g-j|
1863 : // L0.2 _________| f--j| r-t
1864 : // L0.1 | b----------d e---j|
1865 : // L0.0 |a------------d f--j| l--o p-----x
1866 : //
1867 : // Lbase a------------------i m---------w
1868 : //
1869 : // Since bb-c had to be excluded at L0.3, the interval bounds for L0.4 are
1870 : // actually ca-j, since ca is the next interval start key after the end interval
1871 : // of bb-c. This would result in only ca-d being chosen at that sublevel, even
1872 : // though a--c is also not compacting. This is the final result:
1873 : //
1874 : // ______________
1875 : // L0.4 a------c|ca--d |
1876 : // L0.3 a-b bb-c| cc-d g-j|
1877 : // L0.2 _________| f--j| r-t
1878 : // L0.1 | b----------d e---j|
1879 : // L0.0 |a------------d f--j| l--o p-----x
1880 : //
1881 : // Lbase a------------------i m---------w
1882 : //
1883 : // TODO(bilal): Add more targeted tests for this method, through
1884 : // ExtendL0ForBaseCompactionTo and intraL0CompactionUsingSeed.
// Returns true if at least one new file was added to the candidate.
func (s *L0Sublevels) extendCandidateToRectangle(
	minIntervalIndex int, maxIntervalIndex int, candidate *L0CompactionFiles, isBase bool,
) bool {
	// Remember the pre-extension bounds so the extension can be undone
	// elsewhere if needed.
	candidate.preExtensionMinInterval = candidate.minIntervalIndex
	candidate.preExtensionMaxInterval = candidate.maxIntervalIndex
	// Extend {min,max}IntervalIndex to include all of the candidate's current
	// bounds.
	if minIntervalIndex > candidate.minIntervalIndex {
		minIntervalIndex = candidate.minIntervalIndex
	}
	if maxIntervalIndex < candidate.maxIntervalIndex {
		maxIntervalIndex = candidate.maxIntervalIndex
	}
	// Base compactions sweep upward from sublevel 0; intra-L0 compactions
	// sweep downward from the youngest sublevel.
	var startLevel, increment, endLevel int
	if isBase {
		startLevel = 0
		increment = +1
		// seedIntervalMaxLevel is inclusive, while endLevel is exclusive.
		endLevel = candidate.seedIntervalMaxLevel + 1
	} else {
		startLevel = len(s.levelFiles) - 1
		increment = -1
		// seedIntervalMinLevel is inclusive, while endLevel is exclusive.
		endLevel = candidate.seedIntervalMinLevel - 1
	}
	// Stats for files.
	addedCount := 0
	// Iterate from the oldest sub-level for L0 -> Lbase and youngest sub-level
	// for intra-L0. The idea here is that anything that can't be included from
	// that level constrains what can be included from the next level. This
	// change in constraint is directly incorporated into minIntervalIndex,
	// maxIntervalIndex.
	for sl := startLevel; sl != endLevel; sl += increment {
		files := s.levelFiles[sl]
		// Find the first file that overlaps with minIntervalIndex.
		index := sort.Search(len(files), func(i int) bool {
			return minIntervalIndex <= files[i].maxIntervalIndex
		})
		// Track the files that are fully within the current constraint of
		// [minIntervalIndex, maxIntervalIndex].
		firstIndex := -1
		lastIndex := -1
		for ; index < len(files); index++ {
			f := files[index]
			if f.minIntervalIndex > maxIntervalIndex {
				break
			}
			include := true
			// Extends out on the left so can't be included. This narrows what
			// we can included in the next level.
			if f.minIntervalIndex < minIntervalIndex {
				include = false
				minIntervalIndex = f.maxIntervalIndex + 1
			}
			// Extends out on the right so can't be included.
			if f.maxIntervalIndex > maxIntervalIndex {
				include = false
				maxIntervalIndex = f.minIntervalIndex - 1
			}
			if !include {
				continue
			}
			if firstIndex == -1 {
				firstIndex = index
			}
			lastIndex = index
		}
		if minIntervalIndex > maxIntervalIndex {
			// We excluded files that prevent continuation.
			break
		}
		if firstIndex < 0 {
			// No files to add in this sub-level.
			continue
		}
		// We have the files in [firstIndex, lastIndex] as potential for
		// inclusion. Some of these may already have been picked. Some of them
		// may be already compacting. The latter is tricky since we have to
		// decide whether to contract minIntervalIndex or maxIntervalIndex when
		// we encounter an already compacting file. We pick the longest sequence
		// between firstIndex and lastIndex of non-compacting files -- this is
		// represented by [candidateNonCompactingFirst,
		// candidateNonCompactingLast].
		nonCompactingFirst := -1
		currentRunHasAlreadyPickedFiles := false
		candidateNonCompactingFirst := -1
		candidateNonCompactingLast := -1
		candidateHasAlreadyPickedFiles := false
		for index = firstIndex; index <= lastIndex; index++ {
			f := files[index]
			if f.IsCompacting() {
				if nonCompactingFirst != -1 {
					last := index - 1
					// Prioritize runs of consecutive non-compacting files that
					// have files that have already been picked. That is to say,
					// if candidateHasAlreadyPickedFiles == true, we stick with
					// it, and if currentRunHasAlreadyPickedfiles == true, we
					// pick that run even if it contains fewer files than the
					// previous candidate.
					if !candidateHasAlreadyPickedFiles && (candidateNonCompactingFirst == -1 ||
						currentRunHasAlreadyPickedFiles ||
						(last-nonCompactingFirst) > (candidateNonCompactingLast-candidateNonCompactingFirst)) {
						candidateNonCompactingFirst = nonCompactingFirst
						candidateNonCompactingLast = last
						candidateHasAlreadyPickedFiles = currentRunHasAlreadyPickedFiles
					}
				}
				nonCompactingFirst = -1
				currentRunHasAlreadyPickedFiles = false
				continue
			}
			if nonCompactingFirst == -1 {
				nonCompactingFirst = index
			}
			if candidate.FilesIncluded[f.L0Index] {
				currentRunHasAlreadyPickedFiles = true
			}
		}
		// Logic duplicated from inside the for loop above.
		// NOTE(review): unlike the in-loop copy, this tail omits the
		// candidateHasAlreadyPickedFiles assignment; harmless here since the
		// flag is not read again after this point.
		if nonCompactingFirst != -1 {
			last := index - 1
			if !candidateHasAlreadyPickedFiles && (candidateNonCompactingFirst == -1 ||
				currentRunHasAlreadyPickedFiles ||
				(last-nonCompactingFirst) > (candidateNonCompactingLast-candidateNonCompactingFirst)) {
				candidateNonCompactingFirst = nonCompactingFirst
				candidateNonCompactingLast = last
			}
		}
		if candidateNonCompactingFirst == -1 {
			// All files are compacting. There will be gaps that we could
			// exploit to continue, but don't bother.
			break
		}
		// May need to shrink [minIntervalIndex, maxIntervalIndex] for the next level.
		if candidateNonCompactingFirst > firstIndex {
			minIntervalIndex = files[candidateNonCompactingFirst-1].maxIntervalIndex + 1
		}
		if candidateNonCompactingLast < lastIndex {
			maxIntervalIndex = files[candidateNonCompactingLast+1].minIntervalIndex - 1
		}
		for index := candidateNonCompactingFirst; index <= candidateNonCompactingLast; index++ {
			f := files[index]
			if f.IsCompacting() {
				// TODO(bilal): Do a logger.Fatalf instead of a panic, for
				// cleaner unwinding and error messages.
				panic(fmt.Sprintf("expected %s to not be compacting", f.FileNum))
			}
			// Files too new for an intra-L0 compaction must stay out even when
			// they fall inside the chosen run.
			if candidate.isIntraL0 && f.LargestSeqNum >= candidate.earliestUnflushedSeqNum {
				continue
			}
			if !candidate.FilesIncluded[f.L0Index] {
				addedCount++
				candidate.addFile(f)
			}
		}
	}
	return addedCount > 0
}
|