Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "bytes"
9 : stdcmp "cmp"
10 : "fmt"
11 : "math"
12 : "slices"
13 : "sort"
14 : "strings"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : )
20 :
// errInvalidL0SublevelsOpt is returned by AddL0Files when the incremental
// sublevel generation optimization cannot be applied (e.g. an added file is
// older, by seqnum, than a file already present in an overlapping interval),
// and the caller must fall back to NewL0Sublevels.
var errInvalidL0SublevelsOpt = errors.New("pebble: L0 sublevel generation optimization cannot be used")
24 :
// Intervals are of the form [start, end) with no gap between intervals. Each
// file overlaps perfectly with a sequence of intervals. This perfect overlap
// occurs because the union of file boundary keys is used to pick intervals.
// However when the largest key in a file is inclusive and it is used as an end
// of an interval, the actual key is ImmediateSuccessor(key). We don't have the
// ImmediateSuccessor function to do this computation, so we instead keep an
// isInclusiveEndBound bool to remind the code about this fact. This is used for
// comparisons in the following manner:
//   - intervalKey{k, false} < intervalKey{k, true}
//   - k1 < k2 -> intervalKey{k1, _} < intervalKey{k2, _}.
//
// Note that the file's largest key is exclusive if the internal key
// has a trailer matching the rangedel sentinel key. In this case, we set
// isInclusiveEndBound to false for end interval computation.
//
// For example, consider three files with bounds [a,e], [b,g], and [e,j]. The
// interval keys produced would be intervalKey{a, false}, intervalKey{b, false},
// intervalKey{e, false}, intervalKey{e, true}, intervalKey{g, true} and
// intervalKey{j, true}, resulting in intervals
// [a, b), [b, (e, false)), [(e,false), (e, true)), [(e, true), (g, true)) and
// [(g, true), (j, true)). The first file overlaps with the first three
// perfectly, the second file overlaps with the second through to fourth
// intervals, and the third file overlaps with the last three.
//
// The intervals are indexed starting from 0, with the index of the interval
// being the index of the start key of the interval.
//
// In addition to helping with compaction picking, we use interval indices
// to assign each file an interval range once. Subsequent operations, say
// picking overlapping files for a compaction, only need to use the index
// numbers and so avoid expensive byte slice comparisons.
type intervalKey struct {
	// key is the user key at which the interval starts (or, when used as an
	// end bound, the user key at which it ends).
	key []byte
	// isInclusiveEndBound is true when key is a file's inclusive largest key
	// used as an interval end; conceptually the boundary is then
	// ImmediateSuccessor(key). See the ordering rules in the comment above.
	isInclusiveEndBound bool
}
60 :
61 2 : func (k *intervalKey) toEndBoundary() base.UserKeyBoundary {
62 2 : return base.UserKeyExclusiveIf(k.key, !k.isInclusiveEndBound)
63 2 : }
64 :
65 : // intervalKeyTemp is used in the sortAndSweep step. It contains additional metadata
66 : // which is used to generate the {min,max}IntervalIndex for files.
67 : type intervalKeyTemp struct {
68 : intervalKey intervalKey
69 : fileMeta *FileMetadata
70 : isEndKey bool
71 : }
72 :
73 2 : func (i *intervalKeyTemp) setFileIntervalIndex(idx int) {
74 2 : if i.isEndKey {
75 2 : // This is the right endpoint of some file interval, so the
76 2 : // file.maxIntervalIndex must be j - 1 as maxIntervalIndex is
77 2 : // inclusive.
78 2 : i.fileMeta.maxIntervalIndex = idx - 1
79 2 : return
80 2 : }
81 : // This is the left endpoint for some file interval, so the
82 : // file.minIntervalIndex must be j.
83 2 : i.fileMeta.minIntervalIndex = idx
84 : }
85 :
86 2 : func intervalKeyCompare(cmp Compare, a, b intervalKey) int {
87 2 : rv := cmp(a.key, b.key)
88 2 : if rv == 0 {
89 2 : if a.isInclusiveEndBound && !b.isInclusiveEndBound {
90 2 : return +1
91 2 : }
92 2 : if !a.isInclusiveEndBound && b.isInclusiveEndBound {
93 2 : return -1
94 2 : }
95 : }
96 2 : return rv
97 : }
98 :
99 : type intervalKeySorter struct {
100 : keys []intervalKeyTemp
101 : cmp Compare
102 : }
103 :
104 2 : func (s intervalKeySorter) Len() int { return len(s.keys) }
105 2 : func (s intervalKeySorter) Less(i, j int) bool {
106 2 : return intervalKeyCompare(s.cmp, s.keys[i].intervalKey, s.keys[j].intervalKey) < 0
107 2 : }
108 2 : func (s intervalKeySorter) Swap(i, j int) {
109 2 : s.keys[i], s.keys[j] = s.keys[j], s.keys[i]
110 2 : }
111 :
112 : // sortAndSweep will sort the intervalKeys using intervalKeySorter, remove the
113 : // duplicate fileIntervals, and set the {min, max}IntervalIndex for the files.
114 2 : func sortAndSweep(keys []intervalKeyTemp, cmp Compare) []intervalKeyTemp {
115 2 : if len(keys) == 0 {
116 2 : return nil
117 2 : }
118 2 : sorter := intervalKeySorter{keys: keys, cmp: cmp}
119 2 : sort.Sort(sorter)
120 2 :
121 2 : // intervalKeys are generated using the file bounds. Specifically, there are
122 2 : // 2 intervalKeys for each file, and len(keys) = 2 * number of files. Each
123 2 : // `intervalKeyTemp` stores information about which file it was generated
124 2 : // from, and whether the key represents the end key of the file. So, as
125 2 : // we're deduplicating the `keys` slice, we're guaranteed to iterate over
126 2 : // the interval keys belonging to each of the files. Since the
127 2 : // file.{min,max}IntervalIndex points to the position of the files bounds in
128 2 : // the deduplicated `keys` slice, we can determine
129 2 : // file.{min,max}IntervalIndex during the iteration.
130 2 : i := 0
131 2 : j := 0
132 2 : for i < len(keys) {
133 2 : // loop invariant: j <= i
134 2 : currKey := keys[i]
135 2 : keys[j] = keys[i]
136 2 :
137 2 : for {
138 2 : keys[i].setFileIntervalIndex(j)
139 2 : i++
140 2 : if i >= len(keys) || intervalKeyCompare(cmp, currKey.intervalKey, keys[i].intervalKey) != 0 {
141 2 : break
142 : }
143 : }
144 2 : j++
145 : }
146 2 : return keys[:j]
147 : }
148 :
// A key interval of the form [start, end). The end is not represented here
// since it is implicit in the start of the next interval. The last interval is
// an exception but we don't need to ever lookup the end of that interval; the
// last fileInterval will only act as an end key marker. The set of intervals
// is const after initialization.
type fileInterval struct {
	// index is this interval's position within L0Sublevels.orderedIntervals.
	index int
	// startKey is the inclusive lower bound of the interval; the exclusive
	// upper bound is the next interval's startKey.
	startKey intervalKey

	// True iff some file in this interval is compacting to base. Such intervals
	// cannot have any files participate in L0 -> Lbase compactions.
	isBaseCompacting bool

	// The min and max intervals index across all the files that overlap with
	// this interval. Inclusive on both sides.
	filesMinIntervalIndex int
	filesMaxIntervalIndex int

	// True if another interval that has a file extending into this interval is
	// undergoing a compaction into Lbase. In other words, this bool is true if
	// any interval in [filesMinIntervalIndex, filesMaxIntervalIndex] has
	// isBaseCompacting set to true. This lets the compaction picker
	// de-prioritize this interval for picking compactions, since there's a high
	// chance that a base compaction with a sufficient height of sublevels
	// rooted at this interval could not be chosen due to the ongoing base
	// compaction in the other interval. If the file straddling the two
	// intervals is at a sufficiently high sublevel (with enough compactible
	// files below it to satisfy minCompactionDepth), this is not an issue, but
	// to optimize for quickly picking base compactions far away from other base
	// compactions, this bool is used as a heuristic (but not as a complete
	// disqualifier).
	intervalRangeIsBaseCompacting bool

	// All files in this interval, in increasing sublevel order.
	files []*FileMetadata

	// len(files) - compactingFileCount is the stack depth that requires
	// starting new compactions. This metric is not precise since the
	// compactingFileCount can include files that are part of N (where N > 1)
	// intra-L0 compactions, so the stack depth after those complete will be
	// len(files) - compactingFileCount + N. We ignore this imprecision since we
	// don't want to track which files are part of which intra-L0 compaction.
	compactingFileCount int

	// Interpolated from files in this interval. For files spanning multiple
	// intervals, we assume an equal distribution of bytes across all those
	// intervals.
	estimatedBytes uint64
}
198 :
// bitSet is a simple bit map backed by a bool slice, for any cases requiring
// a set of marked indices (e.g. visited intervals).
type bitSet []bool

// newBitSet returns a bitSet able to track n bits, all initially unset.
func newBitSet(n int) bitSet {
	return make([]bool, n)
}

// markBit sets bit i.
func (b *bitSet) markBit(i int) {
	(*b)[i] = true
}

// markBits sets every bit in the half-open range [start, end).
func (b *bitSet) markBits(start, end int) {
	for i := start; i < end; i++ {
		(*b)[i] = true
	}
}

// clearAllBits unsets every bit, retaining the backing storage.
func (b *bitSet) clearAllBits() {
	// The clear builtin (Go 1.21+) zeroes the slice elements in one
	// memclr-style operation instead of an explicit loop.
	clear(*b)
}
221 :
// L0Compaction describes an active compaction with inputs from L0.
type L0Compaction struct {
	// Smallest and Largest are the key bounds of the compaction's L0 inputs.
	Smallest InternalKey
	Largest  InternalKey
	// IsIntraL0 is true if the compaction outputs back into L0 (an intra-L0
	// compaction) rather than into Lbase.
	IsIntraL0 bool
}
228 :
// L0Sublevels represents a sublevel view of SSTables in L0. Tables in one
// sublevel are non-overlapping in key ranges, and keys in higher-indexed
// sublevels shadow older versions in lower-indexed sublevels. These invariants
// are similar to the regular level invariants, except with higher indexed
// sublevels having newer keys as opposed to lower indexed levels.
//
// There is no limit to the number of sublevels that can exist in L0 at any
// time, however read and compaction performance is best when there are as few
// sublevels as possible.
type L0Sublevels struct {
	// Levels are ordered from oldest sublevel to youngest sublevel in the
	// outer slice, and the inner slice contains non-overlapping files for
	// that sublevel in increasing key order. Levels is constructed from
	// levelFiles and is used by callers that require a LevelSlice. The below two
	// fields are treated as immutable once created in NewL0Sublevels.
	Levels     []LevelSlice
	levelFiles [][]*FileMetadata

	// cmp and formatKey are the user-key comparator and formatter supplied at
	// construction time.
	cmp       Compare
	formatKey base.FormatKey

	// fileBytes is the sum of the sizes of all files tracked by the sublevels.
	fileBytes uint64
	// All the L0 files, ordered from oldest to youngest.
	levelMetadata *LevelMetadata

	// The file intervals in increasing key order.
	orderedIntervals []fileInterval

	// Keys to break flushes at.
	flushSplitUserKeys [][]byte

	// addL0FilesCalled records whether AddL0Files has already been invoked on
	// this receiver; it may be called at most once. Only used to check
	// invariants.
	addL0FilesCalled bool
}
263 :
264 : type sublevelSorter []*FileMetadata
265 :
266 : // Len implements sort.Interface.
267 2 : func (sl sublevelSorter) Len() int {
268 2 : return len(sl)
269 2 : }
270 :
271 : // Less implements sort.Interface.
272 2 : func (sl sublevelSorter) Less(i, j int) bool {
273 2 : return sl[i].minIntervalIndex < sl[j].minIntervalIndex
274 2 : }
275 :
276 : // Swap implements sort.Interface.
277 2 : func (sl sublevelSorter) Swap(i, j int) {
278 2 : sl[i], sl[j] = sl[j], sl[i]
279 2 : }
280 :
// NewL0Sublevels creates an L0Sublevels instance for a given set of L0 files.
// These files must all be in L0 and must be sorted by seqnum (see
// SortBySeqNum). During interval iteration, when flushSplitMaxBytes bytes are
// exceeded in the range of intervals since the last flush split key, a flush
// split key is added.
//
// This method can be called without DB.mu being held, so any DB.mu protected
// fields in FileMetadata cannot be accessed here, such as Compacting and
// IsIntraL0Compacting. Those fields are accessed in InitCompactingFileInfo
// instead.
func NewL0Sublevels(
	levelMetadata *LevelMetadata, cmp Compare, formatKey base.FormatKey, flushSplitMaxBytes int64,
) (*L0Sublevels, error) {
	s := &L0Sublevels{cmp: cmp, formatKey: formatKey}
	s.levelMetadata = levelMetadata
	// Each file contributes exactly two interval keys: its smallest user key
	// (start) and largest user key (end).
	keys := make([]intervalKeyTemp, 0, 2*s.levelMetadata.Len())
	iter := levelMetadata.Iter()
	for i, f := 0, iter.First(); f != nil; i, f = i+1, iter.Next() {
		f.L0Index = i
		keys = append(keys, intervalKeyTemp{
			intervalKey: intervalKey{key: f.Smallest.UserKey},
			fileMeta:    f,
			isEndKey:    false,
		})
		keys = append(keys, intervalKeyTemp{
			intervalKey: intervalKey{
				key:                 f.Largest.UserKey,
				isInclusiveEndBound: !f.Largest.IsExclusiveSentinel(),
			},
			fileMeta: f,
			isEndKey: true,
		})
	}
	// sortAndSweep dedups the keys and assigns each file's
	// {min,max}IntervalIndex as a side effect.
	keys = sortAndSweep(keys, cmp)
	// All interval indices reference s.orderedIntervals.
	s.orderedIntervals = make([]fileInterval, len(keys))
	for i := range keys {
		s.orderedIntervals[i] = fileInterval{
			index:                 i,
			startKey:              keys[i].intervalKey,
			filesMinIntervalIndex: i,
			filesMaxIntervalIndex: i,
		}
	}
	// Initialize minIntervalIndex and maxIntervalIndex for each file, and use that
	// to update intervals.
	for f := iter.First(); f != nil; f = iter.Next() {
		if err := s.addFileToSublevels(f, false /* checkInvariant */); err != nil {
			return nil, err
		}
	}
	// Sort each sublevel in increasing key order.
	for i := range s.levelFiles {
		sort.Sort(sublevelSorter(s.levelFiles[i]))
	}

	// Construct a parallel slice of sublevel B-Trees.
	// TODO(jackson): Consolidate and only use the B-Trees.
	for _, sublevelFiles := range s.levelFiles {
		tr, ls := makeBTree(btreeCmpSmallestKey(cmp), sublevelFiles)
		s.Levels = append(s.Levels, ls)
		tr.Release()
	}

	s.calculateFlushSplitKeys(flushSplitMaxBytes)
	return s, nil
}
348 :
// Helper function to merge new intervalKeys into an existing slice of old
// fileIntervals, into result. Returns the new result and a slice of ints
// mapping old interval indices to new ones. The added intervalKeys do not need
// to be sorted; they get sorted and deduped in this function.
//
// As each added key is placed, its file's {min,max}IntervalIndex is set (via
// setFileIntervalIndex) to the key's position in result. Brand-new intervals
// inherit files/compaction state from the interval immediately preceding
// them, since files spanning the split point extend into the new interval.
func mergeIntervals(
	old, result []fileInterval, added []intervalKeyTemp, compare Compare,
) ([]fileInterval, []int) {
	sorter := intervalKeySorter{keys: added, cmp: compare}
	sort.Sort(sorter)

	oldToNewMap := make([]int, len(old))
	// i walks old; j walks added. The two sorted runs are merged into result.
	i := 0
	j := 0

	for i < len(old) || j < len(added) {
		// Collapse duplicates among the added keys: each duplicate maps to
		// the interval most recently appended to result.
		for j > 0 && j < len(added) && intervalKeyCompare(compare, added[j-1].intervalKey, added[j].intervalKey) == 0 {
			added[j].setFileIntervalIndex(len(result) - 1)
			j++
		}
		if i >= len(old) && j >= len(added) {
			break
		}
		// Force the merge direction when one run is exhausted; otherwise
		// compare the heads of the two runs.
		var cmp int
		if i >= len(old) {
			cmp = +1
		}
		if j >= len(added) {
			cmp = -1
		}
		if cmp == 0 {
			cmp = intervalKeyCompare(compare, old[i].startKey, added[j].intervalKey)
		}
		switch {
		case cmp <= 0:
			// Shallow-copy the existing interval.
			newInterval := old[i]
			result = append(result, newInterval)
			oldToNewMap[i] = len(result) - 1
			i++
			if cmp == 0 {
				// The added key coincides with an existing interval boundary;
				// consume it without creating a new interval.
				added[j].setFileIntervalIndex(len(result) - 1)
				j++
			}
		case cmp > 0:
			var prevInterval fileInterval
			// Insert a new interval for a newly-added file. prevInterval, if
			// non-zero, will be "inherited"; we copy its files as those extend
			// into this interval.
			if len(result) > 0 {
				prevInterval = result[len(result)-1]
			}
			newInterval := fileInterval{
				index:                 len(result),
				startKey:              added[j].intervalKey,
				filesMinIntervalIndex: len(result),
				filesMaxIntervalIndex: len(result),

				// estimatedBytes gets recalculated later on, as the number of intervals
				// the file bytes are interpolated over has changed.
				estimatedBytes: 0,
				// Copy the below attributes from prevInterval.
				files:                         append([]*FileMetadata(nil), prevInterval.files...),
				isBaseCompacting:              prevInterval.isBaseCompacting,
				intervalRangeIsBaseCompacting: prevInterval.intervalRangeIsBaseCompacting,
				compactingFileCount:           prevInterval.compactingFileCount,
			}
			result = append(result, newInterval)
			added[j].setFileIntervalIndex(len(result) - 1)
			j++
		}
	}
	return result, oldToNewMap
}
422 :
// AddL0Files incrementally builds a new L0Sublevels for when the only change
// since the receiver L0Sublevels was an addition of the specified files, with
// no L0 deletions. The common case of this is an ingestion or a flush. These
// files can "sit on top" of existing sublevels, creating at most one new
// sublevel for a flush (and possibly multiple for an ingestion), and at most
// 2*len(files) additions to s.orderedIntervals. No files must have been deleted
// from L0, and the added files must all be newer in sequence numbers than
// existing files in L0Sublevels. The files parameter must be sorted in seqnum
// order. The levelMetadata parameter corresponds to the new L0 post addition of
// files. This method is meant to be significantly more performant than
// NewL0Sublevels.
//
// Note that this function can only be called once on a given receiver; it
// appends to some slices in s which is only safe when done once. This is okay,
// as the common case (generating a new L0Sublevels after a flush/ingestion) is
// only going to necessitate one call of this method on a given receiver. The
// returned value, if non-nil, can then have [*L0Sublevels.AddL0Files] called on
// it again, and so on. If [errInvalidL0SublevelsOpt] is returned as an error,
// it likely means the optimization could not be applied (i.e. files added were
// older than files already in the sublevels, which is possible around
// ingestions and in tests). Eg. it can happen when an ingested file was
// ingested without queueing a flush since it did not actually overlap with any
// keys in the memtable. Later on the memtable was flushed, and the memtable had
// keys spanning around the ingested file, producing a flushed file that
// overlapped with the ingested file in file bounds but not in keys. It's
// possible for that flushed file to have a lower LargestSeqNum than the
// ingested file if all the additions after the ingestion were to another
// flushed file that was split into a separate sstable during flush. Any other
// non-nil error means [L0Sublevels] generation failed in the same way as
// [NewL0Sublevels] would likely fail.
//
// NOTE(review): the sort-and-dedup step below slices updatedSublevels[:j+1],
// which panics when len(files) == 0 — presumably callers always pass at least
// one file; confirm against call sites.
func (s *L0Sublevels) AddL0Files(
	files []*FileMetadata, flushSplitMaxBytes int64, levelMetadata *LevelMetadata,
) (*L0Sublevels, error) {
	if invariants.Enabled && s.addL0FilesCalled {
		panic("AddL0Files called twice on the same receiver")
	}
	s.addL0FilesCalled = true

	// Start with a shallow copy of s.
	newVal := &L0Sublevels{}
	*newVal = *s

	newVal.addL0FilesCalled = false
	newVal.levelMetadata = levelMetadata
	// Deep copy levelFiles and Levels, as they are mutated and sorted below.
	// Shallow copies of slices that we just append to, are okay.
	newVal.levelFiles = make([][]*FileMetadata, len(s.levelFiles))
	for i := range s.levelFiles {
		newVal.levelFiles[i] = make([]*FileMetadata, len(s.levelFiles[i]))
		copy(newVal.levelFiles[i], s.levelFiles[i])
	}
	newVal.Levels = make([]LevelSlice, len(s.Levels))
	copy(newVal.Levels, s.Levels)

	// Generate the two interval keys (start and end) for each added file.
	fileKeys := make([]intervalKeyTemp, 0, 2*len(files))
	for _, f := range files {
		left := intervalKeyTemp{
			intervalKey: intervalKey{key: f.Smallest.UserKey},
			fileMeta:    f,
		}
		right := intervalKeyTemp{
			intervalKey: intervalKey{
				key:                 f.Largest.UserKey,
				isInclusiveEndBound: !f.Largest.IsExclusiveSentinel(),
			},
			fileMeta: f,
			isEndKey: true,
		}
		fileKeys = append(fileKeys, left, right)
	}
	keys := make([]fileInterval, 0, 2*levelMetadata.Len())
	var oldToNewMap []int
	// We can avoid the sortAndSweep step on the combined length of
	// s.orderedIntervals and fileKeys by treating this as a merge of two sorted
	// runs, fileKeys and s.orderedIntervals, into `keys` which will form
	// newVal.orderedIntervals.
	keys, oldToNewMap = mergeIntervals(s.orderedIntervals, keys, fileKeys, s.cmp)
	if invariants.Enabled {
		for i := 1; i < len(keys); i++ {
			if intervalKeyCompare(newVal.cmp, keys[i-1].startKey, keys[i].startKey) >= 0 {
				panic("keys not sorted correctly")
			}
		}
	}
	newVal.orderedIntervals = keys
	// Update indices in s.orderedIntervals for fileIntervals we retained.
	for _, newIdx := range oldToNewMap {
		newInterval := &keys[newIdx]
		newInterval.index = newIdx
		// This code, and related code in the for loop below, adjusts
		// files{Min,Max}IntervalIndex just for interval indices shifting due to
		// new intervals, and not for any of the new files being added to the
		// same intervals. The goal is to produce a state of the system that's
		// accurate for all existing files, and has all the new intervals to
		// support new files. Once that's done, we can just call
		// addFileToSublevel to adjust all relevant intervals for new files.
		newInterval.filesMinIntervalIndex = oldToNewMap[newInterval.filesMinIntervalIndex]
		// maxIntervalIndexes are special. Since it's an inclusive end bound, we
		// actually have to map it to the _next_ old interval's new previous
		// interval. This logic is easier to understand if you see
		// [f.minIntervalIndex, f.maxIntervalIndex] as [f.minIntervalIndex,
		// f.maxIntervalIndex+1). The other case to remember is when the
		// interval is completely empty (i.e. len(newInterval.files) == 0); in
		// that case we want to refer back to ourselves regardless of additions
		// to the right of us.
		if newInterval.filesMaxIntervalIndex < len(oldToNewMap)-1 && len(newInterval.files) > 0 {
			newInterval.filesMaxIntervalIndex = oldToNewMap[newInterval.filesMaxIntervalIndex+1] - 1
		} else {
			// newInterval.filesMaxIntervalIndex == len(oldToNewMap)-1.
			newInterval.filesMaxIntervalIndex = oldToNewMap[newInterval.filesMaxIntervalIndex]
		}
	}
	// Loop through all instances of new intervals added between two old
	// intervals and expand [filesMinIntervalIndex, filesMaxIntervalIndex] of
	// new intervals to reflect that of adjacent old intervals.
	{
		// We can skip cases where new intervals were added to the left of all
		// existing intervals (eg. if the first entry in oldToNewMap is
		// oldToNewMap[0] >= 1). Those intervals will only contain newly added
		// files and will have their parameters adjusted down in
		// addFileToSublevels. The same can also be said about new intervals
		// that are to the right of all existing intervals.
		lastIdx := 0
		for _, newIdx := range oldToNewMap {
			for i := lastIdx + 1; i < newIdx; i++ {
				minIntervalIndex := i
				maxIntervalIndex := i
				if keys[lastIdx].filesMaxIntervalIndex != lastIdx {
					// Last old interval has files extending into keys[i].
					minIntervalIndex = keys[lastIdx].filesMinIntervalIndex
					maxIntervalIndex = keys[lastIdx].filesMaxIntervalIndex
				}

				keys[i].filesMinIntervalIndex = minIntervalIndex
				keys[i].filesMaxIntervalIndex = maxIntervalIndex
			}
			lastIdx = newIdx
		}
	}
	// Go through old files and update interval indices.
	//
	// TODO(bilal): This is the only place in this method where we loop through
	// all existing files, which could be much more in number than newly added
	// files. See if we can avoid the need for this, either by getting rid of
	// f.minIntervalIndex and f.maxIntervalIndex and calculating them on the fly
	// with a binary search, or by only looping through files to the right of
	// the first interval touched by this method.
	for sublevel := range s.Levels {
		s.Levels[sublevel].Each(func(f *FileMetadata) {
			oldIntervalDelta := f.maxIntervalIndex - f.minIntervalIndex + 1
			oldMinIntervalIndex := f.minIntervalIndex
			f.minIntervalIndex = oldToNewMap[f.minIntervalIndex]
			// maxIntervalIndex is special. Since it's an inclusive end bound,
			// we actually have to map it to the _next_ old interval's new
			// previous interval. This logic is easier to understand if you see
			// [f.minIntervalIndex, f.maxIntervalIndex] as [f.minIntervalIndex,
			// f.maxIntervalIndex+1).
			f.maxIntervalIndex = oldToNewMap[f.maxIntervalIndex+1] - 1
			newIntervalDelta := f.maxIntervalIndex - f.minIntervalIndex + 1
			// Recalculate estimatedBytes for all old files across new
			// intervals, but only if new intervals were added in between.
			if oldIntervalDelta != newIntervalDelta {
				// j is incremented so that oldToNewMap[j] points to the next
				// old interval. This is used to distinguish between old
				// intervals (i.e. ones where we need to subtract
				// f.Size/oldIntervalDelta) from new ones (where we don't need
				// to subtract). In both cases we need to add
				// f.Size/newIntervalDelta.
				j := oldMinIntervalIndex
				for i := f.minIntervalIndex; i <= f.maxIntervalIndex; i++ {
					if oldToNewMap[j] == i {
						newVal.orderedIntervals[i].estimatedBytes -= f.Size / uint64(oldIntervalDelta)
						j++
					}
					newVal.orderedIntervals[i].estimatedBytes += f.Size / uint64(newIntervalDelta)
				}
			}
		})
	}
	updatedSublevels := make([]int, 0)
	// Update interval indices for new files.
	for i, f := range files {
		f.L0Index = s.levelMetadata.Len() + i
		if err := newVal.addFileToSublevels(f, true /* checkInvariant */); err != nil {
			return nil, err
		}
		updatedSublevels = append(updatedSublevels, f.SubLevel)
	}

	// Sort and deduplicate updatedSublevels.
	sort.Ints(updatedSublevels)
	{
		j := 0
		for i := 1; i < len(updatedSublevels); i++ {
			if updatedSublevels[i] != updatedSublevels[j] {
				j++
				updatedSublevels[j] = updatedSublevels[i]
			}
		}
		updatedSublevels = updatedSublevels[:j+1]
	}

	// Sort each updated sublevel in increasing key order.
	for _, sublevel := range updatedSublevels {
		sort.Sort(sublevelSorter(newVal.levelFiles[sublevel]))
	}

	// Construct a parallel slice of sublevel B-Trees.
	// TODO(jackson): Consolidate and only use the B-Trees.
	for _, sublevel := range updatedSublevels {
		tr, ls := makeBTree(btreeCmpSmallestKey(newVal.cmp), newVal.levelFiles[sublevel])
		if sublevel == len(newVal.Levels) {
			newVal.Levels = append(newVal.Levels, ls)
		} else {
			// sublevel < len(s.Levels). If this panics, updatedSublevels was not
			// populated correctly.
			newVal.Levels[sublevel] = ls
		}
		tr.Release()
	}

	// Flush split keys are recomputed from scratch over the new intervals.
	newVal.flushSplitUserKeys = nil
	newVal.calculateFlushSplitKeys(flushSplitMaxBytes)
	return newVal, nil
}
648 :
// addFileToSublevels is called during L0Sublevels generation, and adds f to the
// correct sublevel's levelFiles, the relevant intervals' files slices, and sets
// interval indices on f. This method, if called successively on multiple files,
// _must_ be called on successively newer files (by seqnum). If checkInvariant
// is true, it could check for this in some cases and return
// [errInvalidL0SublevelsOpt] if that invariant isn't held.
//
// Requires f.{min,max}IntervalIndex to already be set (by sortAndSweep or
// mergeIntervals). The chosen sublevel is one above the highest sublevel of
// any existing file in f's overlapping intervals.
func (s *L0Sublevels) addFileToSublevels(f *FileMetadata, checkInvariant bool) error {
	// This is a simple and not very accurate estimate of the number of
	// bytes this SSTable contributes to the intervals it is a part of.
	//
	// TODO(bilal): Call EstimateDiskUsage in sstable.Reader with interval
	// bounds to get a better estimate for each interval.
	interpolatedBytes := f.Size / uint64(f.maxIntervalIndex-f.minIntervalIndex+1)
	s.fileBytes += f.Size
	subLevel := 0
	// Update state in every fileInterval for this file.
	for i := f.minIntervalIndex; i <= f.maxIntervalIndex; i++ {
		interval := &s.orderedIntervals[i]
		if len(interval.files) > 0 {
			if checkInvariant && interval.files[len(interval.files)-1].LargestSeqNum > f.LargestSeqNum {
				// We are sliding this file "underneath" an existing file. Throw away
				// and start over in NewL0Sublevels.
				return errInvalidL0SublevelsOpt
			}
			// interval.files is sorted by sublevels, from lowest to highest.
			// AddL0Files can only add files at sublevels higher than existing files
			// in the same key intervals.
			if maxSublevel := interval.files[len(interval.files)-1].SubLevel; subLevel <= maxSublevel {
				subLevel = maxSublevel + 1
			}
		}
		interval.estimatedBytes += interpolatedBytes
		// Widen the interval's file-span to cover f's full interval range.
		if f.minIntervalIndex < interval.filesMinIntervalIndex {
			interval.filesMinIntervalIndex = f.minIntervalIndex
		}
		if f.maxIntervalIndex > interval.filesMaxIntervalIndex {
			interval.filesMaxIntervalIndex = f.maxIntervalIndex
		}
		interval.files = append(interval.files, f)
	}
	f.SubLevel = subLevel
	if subLevel > len(s.levelFiles) {
		return errors.Errorf("chose a sublevel beyond allowed range of sublevels: %d vs 0-%d", subLevel, len(s.levelFiles))
	}
	// Either start a new highest sublevel for f, or append to an existing one.
	if subLevel == len(s.levelFiles) {
		s.levelFiles = append(s.levelFiles, []*FileMetadata{f})
	} else {
		s.levelFiles[subLevel] = append(s.levelFiles[subLevel], f)
	}
	return nil
}
700 :
701 2 : func (s *L0Sublevels) calculateFlushSplitKeys(flushSplitMaxBytes int64) {
702 2 : var cumulativeBytes uint64
703 2 : // Multiply flushSplitMaxBytes by the number of sublevels. This prevents
704 2 : // excessive flush splitting when the number of sublevels increases.
705 2 : flushSplitMaxBytes *= int64(len(s.levelFiles))
706 2 : for i := 0; i < len(s.orderedIntervals); i++ {
707 2 : interval := &s.orderedIntervals[i]
708 2 : if flushSplitMaxBytes > 0 && cumulativeBytes > uint64(flushSplitMaxBytes) &&
709 2 : (len(s.flushSplitUserKeys) == 0 ||
710 2 : !bytes.Equal(interval.startKey.key, s.flushSplitUserKeys[len(s.flushSplitUserKeys)-1])) {
711 2 : s.flushSplitUserKeys = append(s.flushSplitUserKeys, interval.startKey.key)
712 2 : cumulativeBytes = 0
713 2 : }
714 2 : cumulativeBytes += s.orderedIntervals[i].estimatedBytes
715 : }
716 : }
717 :
718 : // InitCompactingFileInfo initializes internal flags relating to compacting
719 : // files. Must be called after sublevel initialization.
720 : //
721 : // Requires DB.mu *and* the manifest lock to be held.
722 2 : func (s *L0Sublevels) InitCompactingFileInfo(inProgress []L0Compaction) {
723 2 : for i := range s.orderedIntervals {
724 2 : s.orderedIntervals[i].compactingFileCount = 0
725 2 : s.orderedIntervals[i].isBaseCompacting = false
726 2 : s.orderedIntervals[i].intervalRangeIsBaseCompacting = false
727 2 : }
728 :
729 2 : iter := s.levelMetadata.Iter()
730 2 : for f := iter.First(); f != nil; f = iter.Next() {
731 2 : if invariants.Enabled {
732 2 : if !bytes.Equal(s.orderedIntervals[f.minIntervalIndex].startKey.key, f.Smallest.UserKey) {
733 0 : panic(fmt.Sprintf("f.minIntervalIndex in FileMetadata out of sync with intervals in L0Sublevels: %s != %s",
734 0 : s.formatKey(s.orderedIntervals[f.minIntervalIndex].startKey.key), s.formatKey(f.Smallest.UserKey)))
735 : }
736 2 : if !bytes.Equal(s.orderedIntervals[f.maxIntervalIndex+1].startKey.key, f.Largest.UserKey) {
737 0 : panic(fmt.Sprintf("f.maxIntervalIndex in FileMetadata out of sync with intervals in L0Sublevels: %s != %s",
738 0 : s.formatKey(s.orderedIntervals[f.maxIntervalIndex+1].startKey.key), s.formatKey(f.Smallest.UserKey)))
739 : }
740 : }
741 2 : if !f.IsCompacting() {
742 2 : continue
743 : }
744 2 : if invariants.Enabled {
745 2 : if s.cmp(s.orderedIntervals[f.minIntervalIndex].startKey.key, f.Smallest.UserKey) != 0 || s.cmp(s.orderedIntervals[f.maxIntervalIndex+1].startKey.key, f.Largest.UserKey) != 0 {
746 0 : panic(fmt.Sprintf("file %s has inconsistent L0 Sublevel interval bounds: %s-%s, %s-%s", f.FileNum,
747 0 : s.orderedIntervals[f.minIntervalIndex].startKey.key, s.orderedIntervals[f.maxIntervalIndex+1].startKey.key,
748 0 : f.Smallest.UserKey, f.Largest.UserKey))
749 : }
750 : }
751 2 : for i := f.minIntervalIndex; i <= f.maxIntervalIndex; i++ {
752 2 : interval := &s.orderedIntervals[i]
753 2 : interval.compactingFileCount++
754 2 : if !f.IsIntraL0Compacting {
755 2 : // If f.Compacting && !f.IsIntraL0Compacting, this file is
756 2 : // being compacted to Lbase.
757 2 : interval.isBaseCompacting = true
758 2 : }
759 : }
760 : }
761 :
762 : // Some intervals may be base compacting without the files contained within
763 : // those intervals being marked as compacting. This is possible if the files
764 : // were added after the compaction initiated, and the active compaction
765 : // files straddle the input file. Mark these intervals as base compacting.
766 2 : for _, c := range inProgress {
767 2 : startIK := intervalKey{key: c.Smallest.UserKey, isInclusiveEndBound: false}
768 2 : endIK := intervalKey{key: c.Largest.UserKey, isInclusiveEndBound: !c.Largest.IsExclusiveSentinel()}
769 2 : start, _ := slices.BinarySearchFunc(s.orderedIntervals, startIK, func(a fileInterval, b intervalKey) int {
770 2 : return intervalKeyCompare(s.cmp, a.startKey, b)
771 2 : })
772 2 : end, _ := slices.BinarySearchFunc(s.orderedIntervals, endIK, func(a fileInterval, b intervalKey) int {
773 2 : return intervalKeyCompare(s.cmp, a.startKey, b)
774 2 : })
775 2 : for i := start; i < end && i < len(s.orderedIntervals); i++ {
776 2 : interval := &s.orderedIntervals[i]
777 2 : if !c.IsIntraL0 {
778 2 : interval.isBaseCompacting = true
779 2 : }
780 : }
781 : }
782 :
783 2 : min := 0
784 2 : for i := range s.orderedIntervals {
785 2 : interval := &s.orderedIntervals[i]
786 2 : if interval.isBaseCompacting {
787 2 : minIndex := interval.filesMinIntervalIndex
788 2 : if minIndex < min {
789 2 : minIndex = min
790 2 : }
791 2 : for j := minIndex; j <= interval.filesMaxIntervalIndex; j++ {
792 2 : min = j
793 2 : s.orderedIntervals[j].intervalRangeIsBaseCompacting = true
794 2 : }
795 : }
796 : }
797 : }
798 :
799 : // String produces a string containing useful debug information. Useful in test
800 : // code and debugging.
801 0 : func (s *L0Sublevels) String() string {
802 0 : return s.describe(false)
803 0 : }
804 :
// describe renders a human-readable summary of the sublevel structure: file
// counts, flush split keys, per-sublevel statistics, and base-compacting
// interval ranges. When verbose is true, every file's metadata is also
// printed. Only used for debugging/tests.
func (s *L0Sublevels) describe(verbose bool) string {
	var buf strings.Builder
	fmt.Fprintf(&buf, "file count: %d, sublevels: %d, intervals: %d\nflush split keys(%d): [",
		s.levelMetadata.Len(), len(s.levelFiles), len(s.orderedIntervals), len(s.flushSplitUserKeys))
	// Comma-separated list of flush split keys.
	for i := range s.flushSplitUserKeys {
		fmt.Fprintf(&buf, "%s", s.formatKey(s.flushSplitUserKeys[i]))
		if i < len(s.flushSplitUserKeys)-1 {
			fmt.Fprintf(&buf, ", ")
		}
	}
	fmt.Fprintln(&buf, "]")
	numCompactingFiles := 0
	// Walk sublevels from youngest (highest index) to oldest, printing
	// per-sublevel file counts, byte totals, and interval-width stats.
	for i := len(s.levelFiles) - 1; i >= 0; i-- {
		maxIntervals := 0
		sumIntervals := 0
		var totalBytes uint64
		for _, f := range s.levelFiles[i] {
			// Width of f measured in intervals it overlaps.
			intervals := f.maxIntervalIndex - f.minIntervalIndex + 1
			if intervals > maxIntervals {
				maxIntervals = intervals
			}
			sumIntervals += intervals
			totalBytes += f.Size
			if f.IsCompacting() {
				numCompactingFiles++
			}
		}
		fmt.Fprintf(&buf, "0.%d: file count: %d, bytes: %d, width (mean, max): %0.1f, %d, interval range: [%d, %d]\n",
			i, len(s.levelFiles[i]), totalBytes, float64(sumIntervals)/float64(len(s.levelFiles[i])), maxIntervals, s.levelFiles[i][0].minIntervalIndex,
			s.levelFiles[i][len(s.levelFiles[i])-1].maxIntervalIndex)
		for _, f := range s.levelFiles[i] {
			intervals := f.maxIntervalIndex - f.minIntervalIndex + 1
			if verbose {
				fmt.Fprintf(&buf, "\t%s\n", f)
			}
			// Flag unusually wide files (overlapping more than a third of
			// all intervals) when L0 is large; these hinder compaction
			// concurrency.
			if s.levelMetadata.Len() > 50 && intervals*3 > len(s.orderedIntervals) {
				var intervalsBytes uint64
				for k := f.minIntervalIndex; k <= f.maxIntervalIndex; k++ {
					intervalsBytes += s.orderedIntervals[k].estimatedBytes
				}
				fmt.Fprintf(&buf, "wide file: %d, [%d, %d], byte fraction: %f\n",
					f.FileNum, f.minIntervalIndex, f.maxIntervalIndex,
					float64(intervalsBytes)/float64(s.fileBytes))
			}
		}
	}

	// Print the runs of contiguous non-empty intervals that are base
	// compacting, as [start, end] index pairs.
	lastCompactingIntervalStart := -1
	fmt.Fprintf(&buf, "compacting file count: %d, base compacting intervals: ", numCompactingFiles)
	i := 0
	foundBaseCompactingIntervals := false
	for ; i < len(s.orderedIntervals); i++ {
		interval := &s.orderedIntervals[i]
		if len(interval.files) == 0 {
			// Empty intervals neither start nor terminate a run.
			continue
		}
		if !interval.isBaseCompacting {
			// End of a run: emit it if one was in progress.
			if lastCompactingIntervalStart != -1 {
				if foundBaseCompactingIntervals {
					buf.WriteString(", ")
				}
				fmt.Fprintf(&buf, "[%d, %d]", lastCompactingIntervalStart, i-1)
				foundBaseCompactingIntervals = true
			}
			lastCompactingIntervalStart = -1
		} else {
			if lastCompactingIntervalStart == -1 {
				lastCompactingIntervalStart = i
			}
		}
	}
	// Emit a trailing run that extends to the last interval, if any.
	if lastCompactingIntervalStart != -1 {
		if foundBaseCompactingIntervals {
			buf.WriteString(", ")
		}
		fmt.Fprintf(&buf, "[%d, %d]", lastCompactingIntervalStart, i-1)
	} else if !foundBaseCompactingIntervals {
		fmt.Fprintf(&buf, "none")
	}
	fmt.Fprintln(&buf, "")
	return buf.String()
}
887 :
888 : // ReadAmplification returns the contribution of L0Sublevels to the read
889 : // amplification for any particular point key. It is the maximum height of any
890 : // tracked fileInterval. This is always less than or equal to the number of
891 : // sublevels.
892 2 : func (s *L0Sublevels) ReadAmplification() int {
893 2 : amp := 0
894 2 : for i := range s.orderedIntervals {
895 2 : interval := &s.orderedIntervals[i]
896 2 : fileCount := len(interval.files)
897 2 : if amp < fileCount {
898 2 : amp = fileCount
899 2 : }
900 : }
901 2 : return amp
902 : }
903 :
// InUseKeyRanges returns the merged table bounds of L0 files overlapping the
// provided user key range. The returned key ranges are sorted and
// nonoverlapping.
func (s *L0Sublevels) InUseKeyRanges(smallest, largest []byte) []base.UserKeyBounds {
	// Binary search to find the provided keys within the intervals.
	startIK := intervalKey{key: smallest, isInclusiveEndBound: false}
	endIK := intervalKey{key: largest, isInclusiveEndBound: true}
	// start is the first interval whose start key is > startIK ...
	start := sort.Search(len(s.orderedIntervals), func(i int) bool {
		return intervalKeyCompare(s.cmp, s.orderedIntervals[i].startKey, startIK) > 0
	})
	if start > 0 {
		// Back up to the first interval with a start key <= startIK.
		start--
	}
	// end is exclusive: the first interval whose start key is > endIK.
	end := sort.Search(len(s.orderedIntervals), func(i int) bool {
		return intervalKeyCompare(s.cmp, s.orderedIntervals[i].startKey, endIK) > 0
	})

	var keyRanges []base.UserKeyBounds
	// curr points at the last element of keyRanges while that range is still
	// being extended; nil means the next non-empty interval starts a new one.
	var curr *base.UserKeyBounds
	for i := start; i < end; {
		// Intervals with no files are not in use and can be skipped, once we
		// end the current UserKeyRange.
		if len(s.orderedIntervals[i].files) == 0 {
			curr = nil
			i++
			continue
		}

		// If curr is nil, start a new in-use key range.
		if curr == nil {
			keyRanges = append(keyRanges, base.UserKeyBounds{
				Start: s.orderedIntervals[i].startKey.key,
			})
			// NOTE: aliasing into keyRanges is safe only because append
			// above is the sole growth point before curr is re-derived.
			curr = &keyRanges[len(keyRanges)-1]
		}

		// If the filesMaxIntervalIndex is not the current index, we can jump to
		// the max index, knowing that all intermediary intervals are overlapped
		// by some file.
		if maxIdx := s.orderedIntervals[i].filesMaxIntervalIndex; maxIdx != i {
			// Note that end may be less than or equal to maxIdx if we're
			// concerned with a key range that ends before the interval at
			// maxIdx starts. We must set curr.End now, before making that leap,
			// because this iteration may be the last.
			i = maxIdx
			curr.End = s.orderedIntervals[i+1].startKey.toEndBoundary()
			continue
		}

		// No files overlapping with this interval overlap with the next
		// interval. Update the current end to be the next interval's start key.
		// Note that curr is not necessarily finished, because there may be an
		// abutting non-empty interval.
		curr.End = s.orderedIntervals[i+1].startKey.toEndBoundary()
		i++
	}
	return keyRanges
}
963 :
964 : // FlushSplitKeys returns a slice of user keys to split flushes at. Used by
965 : // flushes to avoid writing sstables that straddle these split keys. These
966 : // should be interpreted as the keys to start the next sstable (not the last key
967 : // to include in the prev sstable). These are user keys so that range tombstones
968 : // can be properly truncated (untruncated range tombstones are not permitted for
969 : // L0 files).
970 2 : func (s *L0Sublevels) FlushSplitKeys() [][]byte {
971 2 : return s.flushSplitUserKeys
972 2 : }
973 :
974 : // MaxDepthAfterOngoingCompactions returns an estimate of maximum depth of
975 : // sublevels after all ongoing compactions run to completion. Used by compaction
976 : // picker to decide compaction score for L0. There is no scoring for intra-L0
977 : // compactions -- they only run if L0 score is high but we're unable to pick an
978 : // L0 -> Lbase compaction.
979 2 : func (s *L0Sublevels) MaxDepthAfterOngoingCompactions() int {
980 2 : depth := 0
981 2 : for i := range s.orderedIntervals {
982 2 : interval := &s.orderedIntervals[i]
983 2 : intervalDepth := len(interval.files) - interval.compactingFileCount
984 2 : if depth < intervalDepth {
985 2 : depth = intervalDepth
986 2 : }
987 : }
988 2 : return depth
989 : }
990 :
// Only for temporary debugging in the absence of proper tests.
//
// checkCompaction sanity-checks a candidate compaction: every file in the
// rectangle spanned by the compaction's sublevels and intervals must be
// included in c, and (for intra-L0 compactions) no included file may have
// sequence numbers at or above earliestUnflushedSeqNum. Returns a descriptive
// error when a violation is found.
//
// TODO(bilal): Simplify away the debugging statements in this method, and make
// this a pure sanity checker.
//
//lint:ignore U1000 - useful for debugging
func (s *L0Sublevels) checkCompaction(c *L0CompactionFiles) error {
	includedFiles := newBitSet(s.levelMetadata.Len())
	// Per-sublevel [min, max] interval bounds of the compaction's files.
	fileIntervalsByLevel := make([]struct {
		min int
		max int
	}, len(s.levelFiles))
	for i := range fileIntervalsByLevel {
		fileIntervalsByLevel[i].min = math.MaxInt32
		fileIntervalsByLevel[i].max = 0
	}
	// Intra-L0 compactions grow downward from the youngest sublevel; base
	// compactions grow upward from sublevel 0. topLevel/increment/limitReached
	// parameterize the traversal direction accordingly.
	var topLevel int
	var increment int
	var limitReached func(int) bool
	if c.isIntraL0 {
		topLevel = len(s.levelFiles) - 1
		increment = +1
		limitReached = func(level int) bool {
			return level == len(s.levelFiles)
		}
	} else {
		topLevel = 0
		increment = -1
		limitReached = func(level int) bool {
			return level < 0
		}
	}
	for _, f := range c.Files {
		if fileIntervalsByLevel[f.SubLevel].min > f.minIntervalIndex {
			fileIntervalsByLevel[f.SubLevel].min = f.minIntervalIndex
		}
		if fileIntervalsByLevel[f.SubLevel].max < f.maxIntervalIndex {
			fileIntervalsByLevel[f.SubLevel].max = f.maxIntervalIndex
		}
		includedFiles.markBit(f.L0Index)
		// topLevel becomes the sublevel at which the compaction starts:
		// the lowest sublevel for intra-L0, the highest for base.
		if c.isIntraL0 {
			if topLevel > f.SubLevel {
				topLevel = f.SubLevel
			}
		} else {
			if topLevel < f.SubLevel {
				topLevel = f.SubLevel
			}
		}
	}
	// Walk sublevels in compaction-growth order, widening [min, max] as we go
	// and verifying every overlapping file was included in the compaction.
	min := fileIntervalsByLevel[topLevel].min
	max := fileIntervalsByLevel[topLevel].max
	for level := topLevel; !limitReached(level); level += increment {
		if fileIntervalsByLevel[level].min < min {
			min = fileIntervalsByLevel[level].min
		}
		if fileIntervalsByLevel[level].max > max {
			max = fileIntervalsByLevel[level].max
		}
		// Find the first file in this sublevel whose maxIntervalIndex reaches
		// min; files before it cannot overlap [min, max].
		index, _ := slices.BinarySearchFunc(s.levelFiles[level], min, func(a *FileMetadata, b int) int {
			return stdcmp.Compare(a.maxIntervalIndex, b)
		})
		// start := index
		for ; index < len(s.levelFiles[level]); index++ {
			f := s.levelFiles[level][index]
			if f.minIntervalIndex > max {
				break
			}
			if c.isIntraL0 && f.LargestSeqNum >= c.earliestUnflushedSeqNum {
				return errors.Errorf(
					"sstable %s in compaction has sequence numbers higher than the earliest unflushed seqnum %d: %d-%d",
					f.FileNum, c.earliestUnflushedSeqNum, f.SmallestSeqNum,
					f.LargestSeqNum)
			}
			if !includedFiles[f.L0Index] {
				// A file overlapping the compaction rectangle was not
				// included: dump diagnostic state and fail.
				var buf strings.Builder
				fmt.Fprintf(&buf, "bug %t, seed interval: %d: level %d, sl index %d, f.index %d, min %d, max %d, pre-min %d, pre-max %d, f.min %d, f.max %d, filenum: %d, isCompacting: %t\n%s\n",
					c.isIntraL0, c.seedInterval, level, index, f.L0Index, min, max, c.preExtensionMinInterval, c.preExtensionMaxInterval,
					f.minIntervalIndex, f.maxIntervalIndex,
					f.FileNum, f.IsCompacting(), s)
				fmt.Fprintf(&buf, "files included:\n")
				for _, f := range c.Files {
					fmt.Fprintf(&buf, "filenum: %d, sl: %d, index: %d, [%d, %d]\n",
						f.FileNum, f.SubLevel, f.L0Index, f.minIntervalIndex, f.maxIntervalIndex)
				}
				fmt.Fprintf(&buf, "files added:\n")
				for _, f := range c.filesAdded {
					fmt.Fprintf(&buf, "filenum: %d, sl: %d, index: %d, [%d, %d]\n",
						f.FileNum, f.SubLevel, f.L0Index, f.minIntervalIndex, f.maxIntervalIndex)
				}
				return errors.New(buf.String())
			}
		}
	}
	return nil
}
1087 :
1088 : // UpdateStateForStartedCompaction updates internal L0Sublevels state for a
1089 : // recently started compaction. isBase specifies if this is a base compaction;
1090 : // if false, this is assumed to be an intra-L0 compaction. The specified
1091 : // compaction must be involving L0 SSTables. It's assumed that the Compacting
1092 : // and IsIntraL0Compacting fields are already set on all [FileMetadata]s passed
1093 : // in.
1094 2 : func (s *L0Sublevels) UpdateStateForStartedCompaction(inputs []LevelSlice, isBase bool) error {
1095 2 : minIntervalIndex := -1
1096 2 : maxIntervalIndex := 0
1097 2 : for i := range inputs {
1098 2 : iter := inputs[i].Iter()
1099 2 : for f := iter.First(); f != nil; f = iter.Next() {
1100 2 : for i := f.minIntervalIndex; i <= f.maxIntervalIndex; i++ {
1101 2 : interval := &s.orderedIntervals[i]
1102 2 : interval.compactingFileCount++
1103 2 : }
1104 2 : if f.minIntervalIndex < minIntervalIndex || minIntervalIndex == -1 {
1105 2 : minIntervalIndex = f.minIntervalIndex
1106 2 : }
1107 2 : if f.maxIntervalIndex > maxIntervalIndex {
1108 2 : maxIntervalIndex = f.maxIntervalIndex
1109 2 : }
1110 : }
1111 : }
1112 2 : if isBase {
1113 2 : for i := minIntervalIndex; i <= maxIntervalIndex; i++ {
1114 2 : interval := &s.orderedIntervals[i]
1115 2 : interval.isBaseCompacting = isBase
1116 2 : for j := interval.filesMinIntervalIndex; j <= interval.filesMaxIntervalIndex; j++ {
1117 2 : s.orderedIntervals[j].intervalRangeIsBaseCompacting = true
1118 2 : }
1119 : }
1120 : }
1121 2 : return nil
1122 : }
1123 :
// L0CompactionFiles represents a candidate set of L0 files for compaction. Also
// referred to as "lcf". Contains state information useful for generating the
// compaction (such as Files), as well as for picking between candidate
// compactions (e.g. fileBytes and seedIntervalStackDepthReduction).
type L0CompactionFiles struct {
	// Files that are part of this candidate compaction.
	Files []*FileMetadata

	// FilesIncluded is a bitset over L0Index, used to cheaply test membership
	// of a file in Files.
	FilesIncluded bitSet
	// A "seed interval" is an interval with a high stack depth that was chosen
	// to bootstrap this compaction candidate. seedIntervalStackDepthReduction
	// is the number of sublevels that have a file in the seed interval that is
	// a part of this compaction.
	seedIntervalStackDepthReduction int
	// For base compactions, seedIntervalMinLevel is 0, and for intra-L0
	// compactions, seedIntervalMaxLevel is len(s.Files)-1 i.e. the highest
	// sublevel.
	seedIntervalMinLevel int
	seedIntervalMaxLevel int
	// Index of the seed interval.
	seedInterval int
	// Sum of file sizes for all files in this compaction.
	fileBytes uint64
	// Intervals with index [minIntervalIndex, maxIntervalIndex] are
	// participating in this compaction; it's the union set of all intervals
	// overlapped by participating files.
	minIntervalIndex int
	maxIntervalIndex int

	// Set for intra-L0 compactions. SSTables with sequence numbers greater
	// than earliestUnflushedSeqNum cannot be a part of intra-L0 compactions.
	isIntraL0               bool
	earliestUnflushedSeqNum base.SeqNum

	// For debugging purposes only. Used in checkCompaction().
	preExtensionMinInterval int
	preExtensionMaxInterval int
	filesAdded              []*FileMetadata
}
1162 :
1163 : // Clone allocates a new L0CompactionFiles, with the same underlying data. Note
1164 : // that the two fileMetadata slices contain values that point to the same
1165 : // underlying fileMetadata object. This is safe because these objects are read
1166 : // only.
1167 2 : func (l *L0CompactionFiles) Clone() *L0CompactionFiles {
1168 2 : oldLcf := *l
1169 2 : return &oldLcf
1170 2 : }
1171 :
1172 : // String merely prints the starting address of the first file, if it exists.
1173 1 : func (l *L0CompactionFiles) String() string {
1174 1 : if len(l.Files) > 0 {
1175 1 : return fmt.Sprintf("First File Address: %p", &l.Files[0])
1176 1 : }
1177 0 : return ""
1178 : }
1179 :
1180 : // addFile adds the specified file to the LCF.
1181 2 : func (l *L0CompactionFiles) addFile(f *FileMetadata) {
1182 2 : if l.FilesIncluded[f.L0Index] {
1183 2 : return
1184 2 : }
1185 2 : l.FilesIncluded.markBit(f.L0Index)
1186 2 : l.Files = append(l.Files, f)
1187 2 : l.filesAdded = append(l.filesAdded, f)
1188 2 : l.fileBytes += f.Size
1189 2 : if f.minIntervalIndex < l.minIntervalIndex {
1190 2 : l.minIntervalIndex = f.minIntervalIndex
1191 2 : }
1192 2 : if f.maxIntervalIndex > l.maxIntervalIndex {
1193 2 : l.maxIntervalIndex = f.maxIntervalIndex
1194 2 : }
1195 : }
1196 :
// intervalAndScore pairs an interval index with its compaction-priority score.
// Used to order intervals being considered for compaction.
type intervalAndScore struct {
	interval int
	score    int
}

// intervalSorterByDecreasingScore implements sort.Interface, ordering
// intervals from highest score to lowest.
type intervalSorterByDecreasingScore []intervalAndScore

func (is intervalSorterByDecreasingScore) Len() int { return len(is) }
func (is intervalSorterByDecreasingScore) Less(i, j int) bool {
	// Descending order: i sorts before j when its score is strictly larger.
	return is[j].score < is[i].score
}
func (is intervalSorterByDecreasingScore) Swap(i, j int) {
	is[j], is[i] = is[i], is[j]
}
1211 :
1212 : // Compactions:
1213 : //
1214 : // The sub-levels and intervals can be visualized in 2 dimensions as the X axis
1215 : // containing intervals in increasing order and the Y axis containing sub-levels
1216 : // (older to younger). The intervals can be sparse wrt sub-levels. We observe
1217 : // that the system is typically under severe pressure in L0 during large numbers
1218 : // of ingestions where most files added to L0 are narrow and non-overlapping.
1219 : //
1220 : // L0.1 d---g
1221 : // L0.0 c--e g--j o--s u--x
1222 : //
1223 : // As opposed to a case with a lot of wide, overlapping L0 files:
1224 : //
1225 : // L0.3 d-----------r
1226 : // L0.2 c--------o
1227 : // L0.1 b-----------q
1228 : // L0.0 a----------------x
1229 : //
1230 : // In that case we expect the rectangle represented in the good visualization
1231 : // above (i.e. the first one) to be wide and short, and not too sparse (most
1232 : // intervals will have fileCount close to the sub-level count), which would make
1233 : // it amenable to concurrent L0 -> Lbase compactions.
1234 : //
1235 : // L0 -> Lbase: The high-level goal of a L0 -> Lbase compaction is to reduce
1236 : // stack depth, by compacting files in the intervals with the highest (fileCount
1237 : // - compactingCount). Additionally, we would like compactions to not involve a
1238 : // huge number of files, so that they finish quickly, and to allow for
1239 : // concurrent L0 -> Lbase compactions when needed. In order to achieve these
1240 : // goals we would like compactions to visualize as capturing thin and tall
1241 : // rectangles. The approach below is to consider intervals in some order and
1242 : // then try to construct a compaction using the interval. The first interval we
1243 : // can construct a compaction for is the compaction that is started. There can
1244 : // be multiple heuristics in choosing the ordering of the intervals -- the code
1245 : // uses one heuristic that worked well for a large ingestion stemming from a
1246 : // cockroachdb import, but additional experimentation is necessary to pick a
1247 : // general heuristic. Additionally, the compaction that gets picked may be not
1248 : // as desirable as one that could be constructed later in terms of reducing
1249 : // stack depth (since adding more files to the compaction can get blocked by
1250 : // needing to encompass files that are already being compacted). So an
1251 : // alternative would be to try to construct more than one compaction and pick
1252 : // the best one.
1253 : //
1254 : // Here's a visualization of an ideal L0->LBase compaction selection:
1255 : //
1256 : // L0.3 a--d g-j
1257 : // L0.2 f--j r-t
1258 : // L0.1 b-d e---j
1259 : // L0.0 a--d f--j l--o p-----x
1260 : //
1261 : // Lbase a--------i m---------w
1262 : //
1263 : // The [g,j] interval has the highest stack depth, so it would have the highest
1264 : // priority for selecting a base compaction candidate. Assuming none of the
1265 : // files are already compacting, this is the compaction that will be chosen:
1266 : //
1267 : // _______
1268 : // L0.3 a--d | g-j|
1269 : // L0.2 | f--j| r-t
1270 : // L0.1 b-d |e---j|
1271 : // L0.0 a--d | f--j| l--o p-----x
1272 : //
1273 : // Lbase a--------i m---------w
1274 : //
1275 : // Note that running this compaction will mark the a--i file in Lbase as
1276 : // compacting, and when ExtendL0ForBaseCompactionTo is called with the bounds of
1277 : // that base file, it'll expand the compaction to also include all L0 files in
1278 : // the a-d interval. The resultant compaction would then be:
1279 : //
1280 : // _____________
1281 : // L0.3 |a--d g-j|
1282 : // L0.2 | f--j| r-t
1283 : // L0.1 | b-d e---j|
1284 : // L0.0 |a--d f--j| l--o p-----x
1285 : //
1286 : // Lbase a--------i m---------w
1287 : //
1288 : // The next best interval for base compaction would therefore be the one
1289 : // including r--t in L0.2 and p--x in L0.0, and both this compaction and the one
1290 : // picked earlier can run in parallel. This is assuming minCompactionDepth >= 2,
1291 : // otherwise the second compaction has too little depth to pick.
1292 : //
1293 : // _____________
1294 : // L0.3 |a--d g-j| _________
1295 : // L0.2 | f--j| | r-t |
1296 : // L0.1 | b-d e---j| | |
1297 : // L0.0 |a--d f--j| l--o |p-----x|
1298 : //
1299 : // Lbase a--------i m---------w
1300 : //
1301 : // Note that when ExtendL0ForBaseCompactionTo is called, the compaction expands
1302 : // to the following, given that the [l,o] file can be added without including
1303 : // additional files in Lbase:
1304 : //
1305 : // _____________
1306 : // L0.3 |a--d g-j| _________
1307 : // L0.2 | f--j| | r-t |
1308 : // L0.1 | b-d e---j|______| |
1309 : // L0.0 |a--d f--j||l--o p-----x|
1310 : //
1311 : // Lbase a--------i m---------w
1312 : //
1313 : // If an additional file existed in LBase that overlapped with [l,o], it would
1314 : // be excluded from the compaction. Concretely:
1315 : //
1316 : // _____________
1317 : // L0.3 |a--d g-j| _________
1318 : // L0.2 | f--j| | r-t |
1319 : // L0.1 | b-d e---j| | |
1320 : // L0.0 |a--d f--j| l--o |p-----x|
1321 : //
1322 : // Lbase a--------ij--lm---------w
1323 : //
1324 : // Intra-L0: If the L0 score is high, but PickBaseCompaction() is unable to pick
1325 : // a compaction, PickIntraL0Compaction will be used to pick an intra-L0
1326 : // compaction. Similar to L0 -> Lbase compactions, we want to allow for multiple
1327 : // intra-L0 compactions and not generate wide output files that hinder later
1328 : // concurrency of L0 -> Lbase compactions. Also compactions that produce wide
1329 : // files don't reduce stack depth -- they represent wide rectangles in our
1330 : // visualization, which means many intervals have their depth reduced by a small
1331 : // amount. Typically, L0 files have non-overlapping sequence numbers, and
1332 : // sticking to that invariant would require us to consider intra-L0 compactions
1333 : // that proceed from youngest to oldest files, which could result in the
1334 : // aforementioned undesirable wide rectangle shape. But this non-overlapping
1335 : // sequence number is already relaxed in RocksDB -- sstables are primarily
1336 : // ordered by their largest sequence number. So we can arrange for intra-L0
1337 : // compactions to capture thin and tall rectangles starting with the top of the
1338 : // stack (youngest files). Like the L0 -> Lbase case we order the intervals
1339 : // using a heuristic and consider each in turn. The same comment about better L0
1340 : // -> Lbase heuristics and not being greedy applies here.
1341 : //
1342 : // Going back to a modified version of our example from earlier, let's say these
1343 : // are the base compactions in progress:
1344 : // _______
1345 : // L0.3 a--d | g-j| _________
1346 : // L0.2 | f--j| | r-t |
1347 : // L0.1 b-d |e---j| | |
1348 : // L0.0 a--d | f--j| l--o |p-----x|
1349 : //
1350 : // Lbase a---------i m---------w
1351 : //
1352 : // Since both LBase files are compacting, the only L0 compaction that can be
1353 : // picked is an intra-L0 compaction. For this, the b--d interval has the highest
1354 : // stack depth (3), and starting with a--d in L0.3 as the seed file, we can
1355 : // iterate downward and build this compaction, assuming all files in that
1356 : // interval are not compacting and have a highest sequence number less than
1357 : // earliestUnflushedSeqNum:
1358 : //
1359 : // _______
1360 : // L0.3 |a--d| | g-j| _________
1361 : // L0.2 | | | f--j| | r-t |
1362 : // L0.1 | b-d| |e---j| | |
1363 : // L0.0 |a--d| | f--j| l--o |p-----x|
1364 : // ------
1365 : // Lbase a---------i m---------w
1366 : //
1367 :
// PickBaseCompaction picks a base compaction based on the above specified
// heuristics, for the specified Lbase files and a minimum depth of overlapping
// files that can be selected for compaction. Returns nil if no compaction is
// possible.
func (s *L0Sublevels) PickBaseCompaction(
	minCompactionDepth int, baseFiles LevelSlice,
) (*L0CompactionFiles, error) {
	// For LBase compactions, we consider intervals in a greedy manner in the
	// following order:
	// - Intervals that are unlikely to be blocked due
	//   to ongoing L0 -> Lbase compactions. These are the ones with
	//   !isBaseCompacting && !intervalRangeIsBaseCompacting.
	// - Intervals that are !isBaseCompacting && intervalRangeIsBaseCompacting.
	//
	// The ordering heuristic exists just to avoid wasted work. Ideally,
	// we would consider all intervals with isBaseCompacting = false and
	// construct a compaction for it and compare the constructed compactions
	// and pick the best one. If microbenchmarks show that we can afford
	// this cost we can eliminate this heuristic.
	scoredIntervals := make([]intervalAndScore, 0, len(s.orderedIntervals))
	sublevelCount := len(s.levelFiles)
	for i := range s.orderedIntervals {
		interval := &s.orderedIntervals[i]
		// Depth is the number of files in the interval not already compacting.
		depth := len(interval.files) - interval.compactingFileCount
		if interval.isBaseCompacting || minCompactionDepth > depth {
			continue
		}
		if interval.intervalRangeIsBaseCompacting {
			scoredIntervals = append(scoredIntervals, intervalAndScore{interval: i, score: depth})
		} else {
			// Prioritize this interval by incrementing the score by the number
			// of sublevels.
			scoredIntervals = append(scoredIntervals, intervalAndScore{interval: i, score: depth + sublevelCount})
		}
	}
	sort.Sort(intervalSorterByDecreasingScore(scoredIntervals))

	// Optimization to avoid considering different intervals that
	// are likely to choose the same seed file. Again this is just
	// to reduce wasted work.
	consideredIntervals := newBitSet(len(s.orderedIntervals))
	for _, scoredInterval := range scoredIntervals {
		interval := &s.orderedIntervals[scoredInterval.interval]
		if consideredIntervals[interval.index] {
			continue
		}

		// Pick the seed file for the interval as the file
		// in the lowest sub-level.
		f := interval.files[0]
		// Don't bother considering the intervals that are covered by the seed
		// file since they are likely nearby. Note that it is possible that
		// those intervals have seed files at lower sub-levels so could be
		// viable for compaction.
		if f == nil {
			return nil, errors.New("no seed file found in sublevel intervals")
		}
		consideredIntervals.markBits(f.minIntervalIndex, f.maxIntervalIndex+1)
		if f.IsCompacting() {
			if f.IsIntraL0Compacting {
				// If we're picking a base compaction and we came across a seed
				// file candidate that's being intra-L0 compacted, skip the
				// interval instead of erroring out.
				continue
			}
			// We chose a compaction seed file that should not be compacting.
			// Usually means the score is not accurately accounting for files
			// already compacting, or internal state is inconsistent.
			return nil, errors.Errorf("file %s chosen as seed file for compaction should not be compacting", f.FileNum)
		}

		c := s.baseCompactionUsingSeed(f, interval.index, minCompactionDepth)
		if c != nil {
			// Check if the chosen compaction overlaps with any files in Lbase
			// that have Compacting = true. If that's the case, this compaction
			// cannot be chosen.
			baseIter := baseFiles.Iter()
			// An interval starting at ImmediateSuccessor(key) can never be the
			// first interval of a compaction since no file can start at that
			// interval.
			m := baseIter.SeekGE(s.cmp, s.orderedIntervals[c.minIntervalIndex].startKey.key)

			var baseCompacting bool
			for ; m != nil && !baseCompacting; m = baseIter.Next() {
				cmp := s.cmp(m.Smallest.UserKey, s.orderedIntervals[c.maxIntervalIndex+1].startKey.key)
				// Compaction is ending at exclusive bound of c.maxIntervalIndex+1
				if cmp > 0 || (cmp == 0 && !s.orderedIntervals[c.maxIntervalIndex+1].startKey.isInclusiveEndBound) {
					break
				}
				baseCompacting = baseCompacting || m.IsCompacting()
			}
			if baseCompacting {
				// Overlaps a compacting Lbase file; try the next interval.
				continue
			}
			return c, nil
		}
	}
	// No viable candidate; not an error.
	return nil, nil
}
1467 :
1468 : // Helper function for building an L0 -> Lbase compaction using a seed interval
1469 : // and seed file in that seed interval.
  : //
  : // f is the seed file: the file in the lowest sub-level of the seed interval
  : // identified by intervalIndex. Returns nil unless some candidate achieves a
  : // stack depth reduction of at least minCompactionDepth.
1470 : func (s *L0Sublevels) baseCompactionUsingSeed(
1471 : f *FileMetadata, intervalIndex int, minCompactionDepth int,
1472 2 : ) *L0CompactionFiles {
1473 2 : c := &L0CompactionFiles{
1474 2 : FilesIncluded: newBitSet(s.levelMetadata.Len()),
1475 2 : seedInterval: intervalIndex,
1476 2 : seedIntervalMinLevel: 0,
1477 2 : minIntervalIndex: f.minIntervalIndex,
1478 2 : maxIntervalIndex: f.maxIntervalIndex,
1479 2 : }
1480 2 : c.addFile(f)
1481 2 :
1482 2 : // The first iteration of this loop builds the compaction at the seed file's
1483 2 : // sublevel. Future iterations expand on this compaction by stacking more
1484 2 : // files from intervalIndex and repeating. This is an optional activity so
1485 2 : // when it fails we can fallback to the last successful candidate.
1486 2 : var lastCandidate *L0CompactionFiles
1487 2 : interval := &s.orderedIntervals[intervalIndex]
1488 2 :
1489 2 : for i := 0; i < len(interval.files); i++ {
1490 2 : f2 := interval.files[i]
1491 2 : sl := f2.SubLevel
1492 2 : c.seedIntervalStackDepthReduction++
1493 2 : c.seedIntervalMaxLevel = sl
1494 2 : c.addFile(f2)
1495 2 : // The seed file is in the lowest sublevel in the seed interval, but it
1496 2 : // may overlap with other files in even lower sublevels. For correctness
1497 2 : // we need to grow our interval to include those files, and capture all
1498 2 : // files in the next level that fall in this extended interval and so
1499 2 : // on. This can result in a triangular shape like the following where
1500 2 : // again the X axis is the key intervals and the Y axis is oldest to
1501 2 : // youngest. Note that it is not necessary for correctness to fill out
1502 2 : // the shape at the higher sub-levels to make it more rectangular since
1503 2 : // the invariant only requires that younger versions of a key not be
1504 2 : // moved to Lbase while leaving behind older versions.
1505 2 : // -
1506 2 : // ---
1507 2 : // -----
1508 2 : // It may be better for performance to have a more rectangular shape
1509 2 : // since the files being left behind will overlap with the same Lbase
1510 2 : // key range as that of this compaction. But there is also the danger
1511 2 : // that in trying to construct a more rectangular shape we will be
1512 2 : // forced to pull in a file that is already compacting. We expect
1513 2 : // extendCandidateToRectangle to eventually be called on this compaction
1514 2 : // if it's chosen, at which point we would iterate backward and choose
1515 2 : // those files. This logic is similar to compaction.grow for non-L0
1516 2 : // compactions.
1517 2 : done := false
1518 2 : for currLevel := sl - 1; currLevel >= 0; currLevel-- {
1519 2 : if !s.extendFiles(currLevel, math.MaxUint64, c) {
1520 2 : // Failed to extend due to ongoing compaction.
1521 2 : done = true
1522 2 : break
1523 : }
1524 : }
1525 2 : if done {
1526 2 : break
1527 : }
1528 : // Observed some compactions using > 1GB from L0 in an import
1529 : // experiment. Very long running compactions are not great as they
1530 : // reduce concurrency while they run, and take a while to produce
1531 : // results, though they're sometimes unavoidable. There is a tradeoff
1532 : // here in that adding more depth is more efficient in reducing stack
1533 : // depth, but long running compactions reduce flexibility in what can
1534 : // run concurrently in L0 and even Lbase -> Lbase+1. An increase more
1535 : // than 150% in bytes since the last candidate compaction (along with a
1536 : // total compaction size in excess of 100mb), or a total compaction size
1537 : // beyond a hard limit of 500mb, is criteria for rejecting this
1538 : // candidate. This lets us prefer slow growths as we add files, while
1539 : // still having a hard limit. Note that if this is the first compaction
1540 : // candidate to reach a stack depth reduction of minCompactionDepth or
1541 : // higher, this candidate will be chosen regardless.
1542 2 : if lastCandidate == nil {
1543 2 : lastCandidate = &L0CompactionFiles{}
1544 2 : } else if lastCandidate.seedIntervalStackDepthReduction >= minCompactionDepth &&
1545 2 : c.fileBytes > 100<<20 &&
1546 2 : (float64(c.fileBytes)/float64(lastCandidate.fileBytes) > 1.5 || c.fileBytes > 500<<20) {
1547 1 : break
1548 : }
  : // Snapshot the acceptable candidate so far; a later, bigger candidate
  : // may be rejected by the size heuristic above, in which case this
  : // snapshot is what gets returned.
1549 2 : *lastCandidate = *c
1550 : }
1551 2 : if lastCandidate != nil && lastCandidate.seedIntervalStackDepthReduction >= minCompactionDepth {
  : // The shallow copy above shares the FilesIncluded bitset with c, which
  : // may have accumulated bits for files added after the snapshot. Clear
  : // and re-mark so the bitset matches exactly lastCandidate.Files.
1552 2 : lastCandidate.FilesIncluded.clearAllBits()
1553 2 : for _, f := range lastCandidate.Files {
1554 2 : lastCandidate.FilesIncluded.markBit(f.L0Index)
1555 2 : }
1556 2 : return lastCandidate
1557 : }
1558 2 : return nil
1559 : }
1560 :
1561 : // Expands fields in the provided L0CompactionFiles instance (cFiles) to
1562 : // include overlapping files in the specified sublevel. Returns true if the
1563 : // compaction is possible (i.e. does not conflict with any base/intra-L0
1564 : // compacting files).
  : //
  : // sl is the sub-level to pull files from. Files with LargestSeqNum at or
  : // above earliestUnflushedSeqNum are skipped rather than added (see comment
  : // below); callers pass math.MaxUint64 to disable that filtering.
1565 : func (s *L0Sublevels) extendFiles(
1566 : sl int, earliestUnflushedSeqNum base.SeqNum, cFiles *L0CompactionFiles,
1567 2 : ) bool {
  : // Binary search for the first file in this sub-level that can overlap
  : // cFiles' interval range, i.e. with maxIntervalIndex >=
  : // cFiles.minIntervalIndex.
1568 2 : index, _ := slices.BinarySearchFunc(s.levelFiles[sl], cFiles.minIntervalIndex, func(a *FileMetadata, b int) int {
1569 2 : return stdcmp.Compare(a.maxIntervalIndex, b)
1570 2 : })
1571 2 : for ; index < len(s.levelFiles[sl]); index++ {
1572 2 : f := s.levelFiles[sl][index]
1573 2 : if f.minIntervalIndex > cFiles.maxIntervalIndex {
1574 2 : break
1575 : }
1576 2 : if f.IsCompacting() {
1577 2 : return false
1578 2 : }
1579 : // Skip over files that are newer than earliestUnflushedSeqNum. This is
1580 : // okay because this compaction can just pretend these files are not in
1581 : // L0 yet. These files must be in higher sublevels than any overlapping
1582 : // files with f.LargestSeqNum < earliestUnflushedSeqNum, and the output
1583 : // of the compaction will also go in a lower (older) sublevel than this
1584 : // file by definition.
1585 2 : if f.LargestSeqNum >= earliestUnflushedSeqNum {
1586 2 : continue
1587 : }
1588 2 : cFiles.addFile(f)
1589 : }
1590 2 : return true
1591 : }
1592 :
1593 : // PickIntraL0Compaction picks an intra-L0 compaction for files in this
1594 : // sublevel. This method is only called when a base compaction cannot be chosen.
1595 : // See comment above [PickBaseCompaction] for heuristics involved in this
1596 : // selection.
  : //
  : // Returns nil (with a nil error) when no interval yields a viable intra-L0
  : // compaction.
1597 : func (s *L0Sublevels) PickIntraL0Compaction(
1598 : earliestUnflushedSeqNum base.SeqNum, minCompactionDepth int,
1599 2 : ) (*L0CompactionFiles, error) {
1600 2 : scoredIntervals := make([]intervalAndScore, len(s.orderedIntervals))
1601 2 : for i := range s.orderedIntervals {
1602 2 : interval := &s.orderedIntervals[i]
1603 2 : depth := len(interval.files) - interval.compactingFileCount
1604 2 : if minCompactionDepth > depth {
1605 2 : continue
1606 : }
1607 2 : scoredIntervals[i] = intervalAndScore{interval: i, score: depth}
1608 : }
  : // NOTE(review): intervals skipped above leave zero-valued entries
  : // (interval 0, score 0) in scoredIntervals. Those sort to the end and
  : // are rejected below by the stackDepthReduction < minCompactionDepth
  : // check — assuming minCompactionDepth > 0; confirm callers never pass 0.
1609 2 : sort.Sort(intervalSorterByDecreasingScore(scoredIntervals))
1610 2 :
1611 2 : // Optimization to avoid considering different intervals that are likely to
1612 2 : // choose the same seed file. Again this is just to reduce wasted work.
1613 2 : consideredIntervals := newBitSet(len(s.orderedIntervals))
1614 2 : for _, scoredInterval := range scoredIntervals {
1615 2 : interval := &s.orderedIntervals[scoredInterval.interval]
1616 2 : if consideredIntervals[interval.index] {
1617 2 : continue
1618 : }
1619 :
1620 2 : var f *FileMetadata
1621 2 : // Pick the seed file for the interval as the file in the highest
1622 2 : // sub-level.
1623 2 : stackDepthReduction := scoredInterval.score
1624 2 : for i := len(interval.files) - 1; i >= 0; i-- {
1625 2 : f = interval.files[i]
1626 2 : if f.IsCompacting() {
1627 2 : break
1628 : }
1629 2 : consideredIntervals.markBits(f.minIntervalIndex, f.maxIntervalIndex+1)
1630 2 : // Can this be the seed file? Files with newer sequence numbers than
1631 2 : // earliestUnflushedSeqNum cannot be in the compaction.
1632 2 : if f.LargestSeqNum >= earliestUnflushedSeqNum {
1633 2 : stackDepthReduction--
1634 2 : if stackDepthReduction == 0 {
1635 2 : break
1636 : }
1637 2 : } else {
1638 2 : break
1639 : }
1640 : }
1641 2 : if stackDepthReduction < minCompactionDepth {
1642 2 : // Can't use this interval.
1643 2 : continue
1644 : }
1645 :
1646 2 : if f == nil {
1647 0 : return nil, errors.New("no seed file found in sublevel intervals")
1648 0 : }
1649 2 : if f.IsCompacting() {
1650 1 : // This file could be in a concurrent intra-L0 or base compaction.
1651 1 : // Try another interval.
1652 1 : continue
1653 : }
1654 :
1655 : // We have a seed file. Build a compaction off of that seed.
1656 2 : c := s.intraL0CompactionUsingSeed(
1657 2 : f, interval.index, earliestUnflushedSeqNum, minCompactionDepth)
1658 2 : if c != nil {
1659 2 : return c, nil
1660 2 : }
1661 : }
1662 2 : return nil, nil
1663 : }
1664 :
  : // intraL0CompactionUsingSeed builds an intra-L0 compaction candidate from
  : // the seed file f in the seed interval intervalIndex. Files with
  : // LargestSeqNum >= earliestUnflushedSeqNum are excluded. Returns nil unless
  : // the candidate achieves a stack depth reduction of at least
  : // minCompactionDepth.
1665 : func (s *L0Sublevels) intraL0CompactionUsingSeed(
1666 : f *FileMetadata, intervalIndex int, earliestUnflushedSeqNum base.SeqNum, minCompactionDepth int,
1667 2 : ) *L0CompactionFiles {
1668 2 : // We know that all the files that overlap with intervalIndex have
1669 2 : // LargestSeqNum < earliestUnflushedSeqNum, but for other intervals
1670 2 : // we need to exclude files >= earliestUnflushedSeqNum
1671 2 :
1672 2 : c := &L0CompactionFiles{
1673 2 : FilesIncluded: newBitSet(s.levelMetadata.Len()),
1674 2 : seedInterval: intervalIndex,
1675 2 : seedIntervalMaxLevel: len(s.levelFiles) - 1,
1676 2 : minIntervalIndex: f.minIntervalIndex,
1677 2 : maxIntervalIndex: f.maxIntervalIndex,
1678 2 : isIntraL0: true,
1679 2 : earliestUnflushedSeqNum: earliestUnflushedSeqNum,
1680 2 : }
1681 2 : c.addFile(f)
1682 2 :
1683 2 : var lastCandidate *L0CompactionFiles
1684 2 : interval := &s.orderedIntervals[intervalIndex]
  : // Locate the seed file's index within interval.files by scanning down
  : // from the top.
1685 2 : slIndex := len(interval.files) - 1
1686 2 : for {
1687 2 : if interval.files[slIndex] == f {
1688 2 : break
1689 : }
1690 2 : slIndex--
1691 : }
1692 : // The first iteration of this loop produces an intra-L0 compaction at the
1693 : // seed level. Iterations after that optionally add to the compaction by
1694 : // stacking more files from intervalIndex and repeating. This is an optional
1695 : // activity so when it fails we can fallback to the last successful
1696 : // candidate. The code stops adding when it can't add more, or when
1697 : // fileBytes grows too large.
1698 2 : for ; slIndex >= 0; slIndex-- {
1699 2 : f2 := interval.files[slIndex]
1700 2 : sl := f2.SubLevel
1701 2 : if f2.IsCompacting() {
1702 2 : break
1703 : }
1704 2 : c.seedIntervalStackDepthReduction++
1705 2 : c.seedIntervalMinLevel = sl
1706 2 : c.addFile(f2)
1707 2 : // The seed file captures all files in the higher level that fall in the
1708 2 : // range of intervals. That may extend the range of intervals so for
1709 2 : // correctness we need to capture all files in the next higher level
1710 2 : // that fall in this extended interval and so on. This can result in an
1711 2 : // inverted triangular shape like the following where again the X axis
1712 2 : // is the key intervals and the Y axis is oldest to youngest. Note that
1713 2 : // it is not necessary for correctness to fill out the shape at lower
1714 2 : // sub-levels to make it more rectangular since the invariant only
1715 2 : // requires that if we move an older seqnum for key k into a file that
1716 2 : // has a higher seqnum, we also move all younger seqnums for that key k
1717 2 : // into that file.
1718 2 : // -----
1719 2 : // ---
1720 2 : // -
1721 2 : // It may be better for performance to have a more rectangular shape
1722 2 : // since it will reduce the stack depth for more intervals. But there is
1723 2 : // also the danger that in explicitly trying to construct a more
1724 2 : // rectangular shape we will be forced to pull in a file that is already
1725 2 : // compacting. We assume that the performance concern is not a practical
1726 2 : // issue.
1727 2 : done := false
1728 2 : for currLevel := sl + 1; currLevel < len(s.levelFiles); currLevel++ {
1729 2 : if !s.extendFiles(currLevel, earliestUnflushedSeqNum, c) {
1730 1 : // Failed to extend due to ongoing compaction.
1731 1 : done = true
1732 1 : break
1733 : }
1734 : }
1735 2 : if done {
1736 1 : break
1737 : }
  : // Size heuristic: reject excessive growth once a viable candidate
  : // exists (same 100MB/150%/500MB criteria as baseCompactionUsingSeed).
1738 2 : if lastCandidate == nil {
1739 2 : lastCandidate = &L0CompactionFiles{}
1740 2 : } else if lastCandidate.seedIntervalStackDepthReduction >= minCompactionDepth &&
1741 2 : c.fileBytes > 100<<20 &&
1742 2 : (float64(c.fileBytes)/float64(lastCandidate.fileBytes) > 1.5 || c.fileBytes > 500<<20) {
1743 0 : break
1744 : }
1745 2 : *lastCandidate = *c
1746 : }
1747 2 : if lastCandidate != nil && lastCandidate.seedIntervalStackDepthReduction >= minCompactionDepth {
  : // The shallow copy above shares the FilesIncluded bitset with c; clear
  : // and re-mark so the bitset matches exactly lastCandidate.Files.
1748 2 : lastCandidate.FilesIncluded.clearAllBits()
1749 2 : for _, f := range lastCandidate.Files {
1750 2 : lastCandidate.FilesIncluded.markBit(f.L0Index)
1751 2 : }
1752 2 : s.extendCandidateToRectangle(
1753 2 : lastCandidate.minIntervalIndex, lastCandidate.maxIntervalIndex, lastCandidate, false)
1754 2 : return lastCandidate
1755 : }
1756 1 : return nil
1757 : }
1758 :
1759 : // ExtendL0ForBaseCompactionTo extends the specified base compaction candidate
1760 : // L0CompactionFiles to optionally cover more files in L0 without "touching" any
1761 : // of the passed-in keys (i.e. the smallest/largest bounds are exclusive), as
1762 : // including any user keys for those internal keys could require choosing more
1763 : // files in LBase which is undesirable. Unbounded start/end keys are indicated
1764 : // by passing in the InvalidInternalKey.
  : //
  : // Returns true iff at least one file was added to the candidate.
1765 : func (s *L0Sublevels) ExtendL0ForBaseCompactionTo(
1766 : smallest, largest InternalKey, candidate *L0CompactionFiles,
1767 2 : ) bool {
1768 2 : firstIntervalIndex := 0
1769 2 : lastIntervalIndex := len(s.orderedIntervals) - 1
1770 2 : if smallest.Kind() != base.InternalKeyKindInvalid {
1771 2 : if smallest.Trailer == base.InternalKeyRangeDeleteSentinel {
1772 2 : // Starting at smallest.UserKey == interval.startKey is okay.
1773 2 : firstIntervalIndex = sort.Search(len(s.orderedIntervals), func(i int) bool {
1774 2 : return s.cmp(smallest.UserKey, s.orderedIntervals[i].startKey.key) <= 0
1775 2 : })
1776 2 : } else {
1777 2 : firstIntervalIndex = sort.Search(len(s.orderedIntervals), func(i int) bool {
1778 2 : // Need to start at >= smallest since if we widen too much we may miss
1779 2 : // an Lbase file that overlaps with an L0 file that will get picked in
1780 2 : // this widening, which would be bad. This interval will not start with
1781 2 : // an immediate successor key.
1782 2 : return s.cmp(smallest.UserKey, s.orderedIntervals[i].startKey.key) < 0
1783 2 : })
1784 : }
1785 : }
1786 2 : if largest.Kind() != base.InternalKeyKindInvalid {
1787 2 : // First interval that starts at or beyond the largest. This interval will not
1788 2 : // start with an immediate successor key.
1789 2 : lastIntervalIndex = sort.Search(len(s.orderedIntervals), func(i int) bool {
1790 2 : return s.cmp(largest.UserKey, s.orderedIntervals[i].startKey.key) <= 0
1791 2 : })
1792 : // Right now, lastIntervalIndex has a startKey that extends beyond largest.
1793 : // The previous interval, by definition, has an end key higher than largest.
1794 : // Iterate back twice to get the last interval that's completely within
1795 : // (smallest, largest). Except in the case where we went past the end of the
1796 : // list; in that case, the last interval to include is the very last
1797 : // interval in the list.
1798 2 : if lastIntervalIndex < len(s.orderedIntervals) {
1799 2 : lastIntervalIndex--
1800 2 : }
1801 2 : lastIntervalIndex--
1802 : }
1803 2 : if lastIntervalIndex < firstIntervalIndex {
  : // No interval lies strictly within (smallest, largest); nothing to add.
1804 1 : return false
1805 1 : }
1806 2 : return s.extendCandidateToRectangle(firstIntervalIndex, lastIntervalIndex, candidate, true)
1807 : }
1808 :
1809 : // Best-effort attempt to make the compaction include more files in the
1810 : // rectangle defined by [minIntervalIndex, maxIntervalIndex] on the X axis and
1811 : // bounded on the Y axis by seedIntervalMinLevel and seedIntervalMaxLevel.
1812 : //
1813 : // This is strictly an optional extension; at any point where we can't feasibly
1814 : // add more files, the sublevel iteration can be halted early and candidate will
1815 : // still be a correct compaction candidate.
1816 : //
1817 : // Consider this scenario (original candidate is inside the rectangle), with
1818 : // isBase = true and interval bounds a-j (from the union of base file bounds and
1819 : // that of compaction candidate):
1820 : //
1821 : // _______
1822 : // L0.3 a--d | g-j|
1823 : // L0.2 | f--j| r-t
1824 : // L0.1 b-d |e---j|
1825 : // L0.0 a--d | f--j| l--o p-----x
1826 : //
1827 : // Lbase a--------i m---------w
1828 : //
1829 : // This method will iterate from the bottom up. At L0.0, it will add a--d since
1830 : // it's in the bounds, then add b-d, then a--d, and so on, to produce this:
1831 : //
1832 : // _____________
1833 : // L0.3 |a--d g-j|
1834 : // L0.2 | f--j| r-t
1835 : // L0.1 | b-d e---j|
1836 : // L0.0 |a--d f--j| l--o p-----x
1837 : //
1838 : // Lbase a-------i m---------w
1839 : //
1840 : // Let's assume that, instead of a--d in the top sublevel, we had 3 files, a-b,
1841 : // bb-c, and cc-d, of which bb-c is compacting. Let's also add another sublevel
1842 : // L0.4 with some files, all of which aren't compacting:
1843 : //
1844 : // L0.4 a------c ca--d _______
1845 : // L0.3 a-b bb-c cc-d | g-j|
1846 : // L0.2 | f--j| r-t
1847 : // L0.1 b----------d |e---j|
1848 : // L0.0 a------------d | f--j| l--o p-----x
1849 : //
1850 : // Lbase a------------------i m---------w
1851 : //
1852 : // This method then needs to choose between the left side of L0.3 bb-c (i.e.
1853 : // a-b), or the right side (i.e. cc-d and g-j) for inclusion in this compaction.
1854 : // Since the right side has more files as well as one file that has already been
1855 : // picked, it gets chosen at that sublevel, resulting in this intermediate
1856 : // compaction:
1857 : //
1858 : // L0.4 a------c ca--d
1859 : // ______________
1860 : // L0.3 a-b bb-c| cc-d g-j|
1861 : // L0.2 _________| f--j| r-t
1862 : // L0.1 | b----------d e---j|
1863 : // L0.0 |a------------d f--j| l--o p-----x
1864 : //
1865 : // Lbase a------------------i m---------w
1866 : //
1867 : // Since bb-c had to be excluded at L0.3, the interval bounds for L0.4 are
1868 : // actually ca-j, since ca is the next interval start key after the end interval
1869 : // of bb-c. This would result in only ca-d being chosen at that sublevel, even
1870 : // though a--c is also not compacting. This is the final result:
1871 : //
1872 : // ______________
1873 : // L0.4 a------c|ca--d |
1874 : // L0.3 a-b bb-c| cc-d g-j|
1875 : // L0.2 _________| f--j| r-t
1876 : // L0.1 | b----------d e---j|
1877 : // L0.0 |a------------d f--j| l--o p-----x
1878 : //
1879 : // Lbase a------------------i m---------w
1880 : //
1881 : // TODO(bilal): Add more targeted tests for this method, through
1882 : // ExtendL0ForBaseCompactionTo and intraL0CompactionUsingSeed.
1883 : func (s *L0Sublevels) extendCandidateToRectangle(
1884 : minIntervalIndex int, maxIntervalIndex int, candidate *L0CompactionFiles, isBase bool,
1885 2 : ) bool {
1886 2 : candidate.preExtensionMinInterval = candidate.minIntervalIndex
1887 2 : candidate.preExtensionMaxInterval = candidate.maxIntervalIndex
1888 2 : // Extend {min,max}IntervalIndex to include all of the candidate's current
1889 2 : // bounds.
1890 2 : if minIntervalIndex > candidate.minIntervalIndex {
1891 2 : minIntervalIndex = candidate.minIntervalIndex
1892 2 : }
1893 2 : if maxIntervalIndex < candidate.maxIntervalIndex {
1894 2 : maxIntervalIndex = candidate.maxIntervalIndex
1895 2 : }
1896 2 : var startLevel, increment, endLevel int
1897 2 : if isBase {
1898 2 : startLevel = 0
1899 2 : increment = +1
1900 2 : // seedIntervalMaxLevel is inclusive, while endLevel is exclusive.
1901 2 : endLevel = candidate.seedIntervalMaxLevel + 1
1902 2 : } else {
1903 2 : startLevel = len(s.levelFiles) - 1
1904 2 : increment = -1
1905 2 : // seedIntervalMinLevel is inclusive, while endLevel is exclusive.
1906 2 : endLevel = candidate.seedIntervalMinLevel - 1
1907 2 : }
1908 : // Stats for files.
1909 2 : addedCount := 0
1910 2 : // Iterate from the oldest sub-level for L0 -> Lbase and youngest sub-level
1911 2 : // for intra-L0. The idea here is that anything that can't be included from
1912 2 : // that level constrains what can be included from the next level. This
1913 2 : // change in constraint is directly incorporated into minIntervalIndex,
1914 2 : // maxIntervalIndex.
1915 2 : for sl := startLevel; sl != endLevel; sl += increment {
1916 2 : files := s.levelFiles[sl]
1917 2 : // Find the first file that overlaps with minIntervalIndex.
1918 2 : index := sort.Search(len(files), func(i int) bool {
1919 2 : return minIntervalIndex <= files[i].maxIntervalIndex
1920 2 : })
1921 : // Track the files that are fully within the current constraint of
1922 : // [minIntervalIndex, maxIntervalIndex].
1923 2 : firstIndex := -1
1924 2 : lastIndex := -1
1925 2 : for ; index < len(files); index++ {
1926 2 : f := files[index]
1927 2 : if f.minIntervalIndex > maxIntervalIndex {
1928 2 : break
1929 : }
1930 2 : include := true
1931 2 : // Extends out on the left so can't be included. This narrows what
1932 2 : // we can include in the next level.
1933 2 : if f.minIntervalIndex < minIntervalIndex {
1934 2 : include = false
1935 2 : minIntervalIndex = f.maxIntervalIndex + 1
1936 2 : }
1937 : // Extends out on the right so can't be included.
1938 2 : if f.maxIntervalIndex > maxIntervalIndex {
1939 2 : include = false
1940 2 : maxIntervalIndex = f.minIntervalIndex - 1
1941 2 : }
1942 2 : if !include {
1943 2 : continue
1944 : }
1945 2 : if firstIndex == -1 {
1946 2 : firstIndex = index
1947 2 : }
1948 2 : lastIndex = index
1949 : }
1950 2 : if minIntervalIndex > maxIntervalIndex {
1951 1 : // We excluded files that prevent continuation.
1952 1 : break
1953 : }
1954 2 : if firstIndex < 0 {
1955 2 : // No files to add in this sub-level.
1956 2 : continue
1957 : }
1958 : // We have the files in [firstIndex, lastIndex] as potential for
1959 : // inclusion. Some of these may already have been picked. Some of them
1960 : // may be already compacting. The latter is tricky since we have to
1961 : // decide whether to contract minIntervalIndex or maxIntervalIndex when
1962 : // we encounter an already compacting file. We pick the longest sequence
1963 : // between firstIndex and lastIndex of non-compacting files -- this is
1964 : // represented by [candidateNonCompactingFirst,
1965 : // candidateNonCompactingLast].
1966 2 : nonCompactingFirst := -1
1967 2 : currentRunHasAlreadyPickedFiles := false
1968 2 : candidateNonCompactingFirst := -1
1969 2 : candidateNonCompactingLast := -1
1970 2 : candidateHasAlreadyPickedFiles := false
1971 2 : for index = firstIndex; index <= lastIndex; index++ {
1972 2 : f := files[index]
1973 2 : if f.IsCompacting() {
1974 2 : if nonCompactingFirst != -1 {
1975 2 : last := index - 1
1976 2 : // Prioritize runs of consecutive non-compacting files that
1977 2 : // have files that have already been picked. That is to say,
1978 2 : // if candidateHasAlreadyPickedFiles == true, we stick with
1979 2 : // it, and if currentRunHasAlreadyPickedFiles == true, we
1980 2 : // pick that run even if it contains fewer files than the
1981 2 : // previous candidate.
1982 2 : if !candidateHasAlreadyPickedFiles && (candidateNonCompactingFirst == -1 ||
1983 2 : currentRunHasAlreadyPickedFiles ||
1984 2 : (last-nonCompactingFirst) > (candidateNonCompactingLast-candidateNonCompactingFirst)) {
1985 2 : candidateNonCompactingFirst = nonCompactingFirst
1986 2 : candidateNonCompactingLast = last
1987 2 : candidateHasAlreadyPickedFiles = currentRunHasAlreadyPickedFiles
1988 2 : }
1989 : }
1990 2 : nonCompactingFirst = -1
1991 2 : currentRunHasAlreadyPickedFiles = false
1992 2 : continue
1993 : }
1994 2 : if nonCompactingFirst == -1 {
1995 2 : nonCompactingFirst = index
1996 2 : }
1997 2 : if candidate.FilesIncluded[f.L0Index] {
1998 2 : currentRunHasAlreadyPickedFiles = true
1999 2 : }
2000 : }
2001 : // Logic duplicated from inside the for loop above.
2002 2 : if nonCompactingFirst != -1 {
2003 2 : last := index - 1
2004 2 : if !candidateHasAlreadyPickedFiles && (candidateNonCompactingFirst == -1 ||
2005 2 : currentRunHasAlreadyPickedFiles ||
2006 2 : (last-nonCompactingFirst) > (candidateNonCompactingLast-candidateNonCompactingFirst)) {
2007 2 : candidateNonCompactingFirst = nonCompactingFirst
2008 2 : candidateNonCompactingLast = last
2009 2 : }
2010 : }
  : // Unlike the in-loop version above, candidateHasAlreadyPickedFiles is
  : // not updated here; it is not read again before being re-initialized on
  : // the next sub-level iteration, so the omission is harmless.
2011 2 : if candidateNonCompactingFirst == -1 {
2012 0 : // All files are compacting. There will be gaps that we could
2013 0 : // exploit to continue, but don't bother.
2014 0 : break
2015 : }
2016 : // May need to shrink [minIntervalIndex, maxIntervalIndex] for the next level.
2017 2 : if candidateNonCompactingFirst > firstIndex {
2018 2 : minIntervalIndex = files[candidateNonCompactingFirst-1].maxIntervalIndex + 1
2019 2 : }
2020 2 : if candidateNonCompactingLast < lastIndex {
2021 2 : maxIntervalIndex = files[candidateNonCompactingLast+1].minIntervalIndex - 1
2022 2 : }
2023 2 : for index := candidateNonCompactingFirst; index <= candidateNonCompactingLast; index++ {
2024 2 : f := files[index]
2025 2 : if f.IsCompacting() {
2026 0 : // TODO(bilal): Do a logger.Fatalf instead of a panic, for
2027 0 : // cleaner unwinding and error messages.
2028 0 : panic(fmt.Sprintf("expected %s to not be compacting", f.FileNum))
2029 : }
2030 2 : if candidate.isIntraL0 && f.LargestSeqNum >= candidate.earliestUnflushedSeqNum {
2031 2 : continue
2032 : }
2033 2 : if !candidate.FilesIncluded[f.L0Index] {
2034 2 : addedCount++
2035 2 : candidate.addFile(f)
2036 2 : }
2037 : }
2038 : }
  : // True iff this call added at least one new file to the candidate.
2039 2 : return addedCount > 0
2040 : }
|