Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "math"
11 : "sort"
12 : "strings"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/humanize"
17 : "github.com/cockroachdb/pebble/internal/manifest"
18 : )
19 :
20 : // The minimum count for an intra-L0 compaction. This matches the RocksDB
21 : // heuristic.
22 : const minIntraL0Count = 4
23 :
24 : type compactionEnv struct {
25 : // diskAvailBytes holds a statistic on the number of bytes available on
26 : // disk, as reported by the filesystem. It's used to be more restrictive in
27 : // expanding compactions if available disk space is limited.
28 : //
29 : // The cached value (d.diskAvailBytes) is updated whenever a file is deleted
30 : // and whenever a compaction or flush completes. Since file removal is the
31 : // primary means of reclaiming space, there is a rough bound on the
32 : // statistic's staleness when available bytes is growing. Compactions and
33 : // flushes are longer, slower operations and provide a much looser bound
34 : // when available bytes is decreasing.
35 : diskAvailBytes uint64
36 : earliestUnflushedSeqNum uint64
37 : earliestSnapshotSeqNum uint64
38 : inProgressCompactions []compactionInfo
39 : readCompactionEnv readCompactionEnv
40 : }
41 :
42 : type compactionPicker interface {
43 : getScores([]compactionInfo) [numLevels]float64
44 : getBaseLevel() int
45 : estimatedCompactionDebt(l0ExtraSize uint64) uint64
46 : pickAuto(env compactionEnv) (pc *pickedCompaction)
47 : pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)
48 : pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction)
49 : pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
50 : forceBaseLevel1()
51 : }
52 :
53 : // readCompactionEnv is used to hold data required to perform read compactions.
54 : type readCompactionEnv struct {
55 : rescheduleReadCompaction *bool
56 : readCompactions *readCompactionQueue
57 : flushing bool
58 : }
59 :
60 : // Information about in-progress compactions provided to the compaction picker.
61 : // These are used to constrain the new compactions that will be picked.
62 : type compactionInfo struct {
63 : // versionEditApplied is true if this compaction's version edit has already
64 : // been committed. The compaction may still be in-progress deleting newly
65 : // obsolete files.
66 : versionEditApplied bool
67 : inputs []compactionLevel
68 : outputLevel int
69 : smallest InternalKey
70 : largest InternalKey
71 : }
72 :
73 1 : func (info compactionInfo) String() string {
74 1 : var buf bytes.Buffer
75 1 : var largest int
76 1 : for i, in := range info.inputs {
77 1 : if i > 0 {
78 1 : fmt.Fprintf(&buf, " -> ")
79 1 : }
80 1 : fmt.Fprintf(&buf, "L%d", in.level)
81 1 : in.files.Each(func(m *fileMetadata) {
82 1 : fmt.Fprintf(&buf, " %s", m.FileNum)
83 1 : })
84 1 : if largest < in.level {
85 1 : largest = in.level
86 1 : }
87 : }
88 1 : if largest != info.outputLevel || len(info.inputs) == 1 {
89 1 : fmt.Fprintf(&buf, " -> L%d", info.outputLevel)
90 1 : }
91 1 : return buf.String()
92 : }
93 :
94 : type sortCompactionLevelsByPriority []candidateLevelInfo
95 :
96 2 : func (s sortCompactionLevelsByPriority) Len() int {
97 2 : return len(s)
98 2 : }
99 :
100 : // A level should be picked for compaction if the compensatedScoreRatio is >= the
101 : // compactionScoreThreshold.
102 : const compactionScoreThreshold = 1
103 :
104 : // Less should return true if s[i] must be placed earlier than s[j] in the final
105 : // sorted list. The candidateLevelInfo for the level placed earlier is more likely
106 : // to be picked for a compaction.
107 2 : func (s sortCompactionLevelsByPriority) Less(i, j int) bool {
108 2 : iShouldCompact := s[i].compensatedScoreRatio >= compactionScoreThreshold
109 2 : jShouldCompact := s[j].compensatedScoreRatio >= compactionScoreThreshold
110 2 : // Ordering is defined as decreasing on (shouldCompact, uncompensatedScoreRatio)
111 2 : // where shouldCompact is 1 for true and 0 for false.
112 2 : if iShouldCompact && !jShouldCompact {
113 2 : return true
114 2 : }
115 2 : if !iShouldCompact && jShouldCompact {
116 2 : return false
117 2 : }
118 :
119 2 : if s[i].uncompensatedScoreRatio != s[j].uncompensatedScoreRatio {
120 2 : return s[i].uncompensatedScoreRatio > s[j].uncompensatedScoreRatio
121 2 : }
122 2 : return s[i].level < s[j].level
123 : }
124 :
125 2 : func (s sortCompactionLevelsByPriority) Swap(i, j int) {
126 2 : s[i], s[j] = s[j], s[i]
127 2 : }
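
// A minimal illustrative sketch (not part of the original file; the field
// values are hypothetical) of how the ordering above behaves. L3 and L5 both
// meet compactionScoreThreshold, so they sort ahead of L4; between them, the
// higher uncompensatedScoreRatio wins.
//
//	levels := []candidateLevelInfo{
//		{level: 3, compensatedScoreRatio: 2.1, uncompensatedScoreRatio: 1.5},
//		{level: 4, compensatedScoreRatio: 0.8, uncompensatedScoreRatio: 0.8},
//		{level: 5, compensatedScoreRatio: 1.2, uncompensatedScoreRatio: 3.4},
//	}
//	sort.Sort(sortCompactionLevelsByPriority(levels))
//	// The order is now L5, L3, L4.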
128 :
129 : // sublevelInfo is used to tag a LevelSlice for an L0 sublevel with the
130 : // sublevel.
131 : type sublevelInfo struct {
132 : manifest.LevelSlice
133 : sublevel manifest.Level
134 : }
135 :
136 2 : func (cl sublevelInfo) Clone() sublevelInfo {
137 2 : return sublevelInfo{
138 2 : sublevel: cl.sublevel,
139 2 : LevelSlice: cl.LevelSlice.Reslice(func(start, end *manifest.LevelIterator) {}),
140 : }
141 : }
142 1 : func (cl sublevelInfo) String() string {
143 1 : return fmt.Sprintf(`Sublevel %s; Levels %s`, cl.sublevel, cl.LevelSlice)
144 1 : }
145 :
146 : // generateSublevelInfo will generate the level slices for each of the sublevels
147 : // from the level slice for all of L0.
148 2 : func generateSublevelInfo(cmp base.Compare, levelFiles manifest.LevelSlice) []sublevelInfo {
149 2 : sublevelMap := make(map[uint64][]*fileMetadata)
150 2 : it := levelFiles.Iter()
151 2 : for f := it.First(); f != nil; f = it.Next() {
152 2 : sublevelMap[uint64(f.SubLevel)] = append(sublevelMap[uint64(f.SubLevel)], f)
153 2 : }
154 :
155 2 : var sublevels []int
156 2 : for level := range sublevelMap {
157 2 : sublevels = append(sublevels, int(level))
158 2 : }
159 2 : sort.Ints(sublevels)
160 2 :
161 2 : var levelSlices []sublevelInfo
162 2 : for _, sublevel := range sublevels {
163 2 : metas := sublevelMap[uint64(sublevel)]
164 2 : levelSlices = append(
165 2 : levelSlices,
166 2 : sublevelInfo{
167 2 : manifest.NewLevelSliceKeySorted(cmp, metas),
168 2 : manifest.L0Sublevel(sublevel),
169 2 : },
170 2 : )
171 2 : }
172 2 : return levelSlices
173 : }
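
// For example (with hypothetical file metadata): if L0 holds files F1 and F2
// in sublevel 0 and file F3 in sublevel 2, generateSublevelInfo returns two
// entries in ascending sublevel order: one sublevelInfo for sublevel 0
// holding {F1, F2}, and one for sublevel 2 holding {F3}.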
174 :
175 : // compactionPickerMetrics holds metrics related to the compaction picking process.
176 : type compactionPickerMetrics struct {
177 : // scores contains the compensatedScoreRatio from the candidateLevelInfo.
178 : scores []float64
179 : singleLevelOverlappingRatio float64
180 : multiLevelOverlappingRatio float64
181 : }
182 :
183 : // pickedCompaction contains information about a compaction that has already
184 : // been chosen, and is being constructed. Compaction construction info lives in
185 : // this struct, and is copied over into the compaction struct when that's
186 : // created.
187 : type pickedCompaction struct {
188 : cmp Compare
189 : // score of the chosen compaction. This is the same as the
190 : // compensatedScoreRatio in the candidateLevelInfo.
191 : score float64
192 : // kind indicates the kind of compaction.
193 : kind compactionKind
194 : // startLevel is the level that is being compacted. Inputs from startLevel
195 : // and outputLevel will be merged to produce a set of outputLevel files.
196 : startLevel *compactionLevel
197 : // outputLevel is the level that files are being produced in. outputLevel is
198 : // equal to startLevel+1, except when:
199 : // - startLevel is 0: the output level equals compactionPicker.getBaseLevel().
200 : // - the compaction is multilevel: the output level is the lowest level
201 : // involved in the compaction.
202 : outputLevel *compactionLevel
203 : // extraLevels contain additional levels in between the input and output
204 : // levels that get compacted in multi level compactions
205 : extraLevels []*compactionLevel
206 : inputs []compactionLevel
207 : // LBase at the time of compaction picking.
208 : baseLevel int
209 : // L0-specific compaction info. Set to a non-nil value for all compactions
210 : // where startLevel == 0 that were generated by L0Sublevels.
211 : lcf *manifest.L0CompactionFiles
212 : // maxOutputFileSize is the maximum size of an individual table created
213 : // during compaction.
214 : maxOutputFileSize uint64
215 : // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
216 : // single output table with the tables in the grandparent level.
217 : maxOverlapBytes uint64
218 : // maxReadCompactionBytes is the maximum number of bytes a read compaction is
219 : // allowed to overlap with in its output level. If the overlap is greater than
220 : // maxReadCompactionBytes, then we don't proceed with the compaction.
221 : maxReadCompactionBytes uint64
222 : // The boundaries of the input data.
223 : smallest InternalKey
224 : largest InternalKey
225 : version *version
226 : pickerMetrics compactionPickerMetrics
227 : }
228 :
229 2 : func (pc *pickedCompaction) userKeyBounds() base.UserKeyBounds {
230 2 : return base.UserKeyBoundsFromInternal(pc.smallest, pc.largest)
231 2 : }
232 :
233 2 : func defaultOutputLevel(startLevel, baseLevel int) int {
234 2 : outputLevel := startLevel + 1
235 2 : if startLevel == 0 {
236 2 : outputLevel = baseLevel
237 2 : }
238 2 : if outputLevel >= numLevels-1 {
239 2 : outputLevel = numLevels - 1
240 2 : }
241 2 : return outputLevel
242 : }
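
// For example, with baseLevel=4 and numLevels=7: defaultOutputLevel(0, 4)
// returns 4 (L0 compacts into Lbase), defaultOutputLevel(4, 4) returns 5,
// and defaultOutputLevel(6, 4) is clamped to 6, the last level.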
243 :
244 : func newPickedCompaction(
245 : opts *Options, cur *version, startLevel, outputLevel, baseLevel int,
246 2 : ) *pickedCompaction {
247 2 : if startLevel > 0 && startLevel < baseLevel {
248 1 : panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
249 1 : startLevel, baseLevel))
250 : }
251 :
252 2 : adjustedLevel := adjustedOutputLevel(outputLevel, baseLevel)
253 2 : pc := &pickedCompaction{
254 2 : cmp: opts.Comparer.Compare,
255 2 : version: cur,
256 2 : baseLevel: baseLevel,
257 2 : inputs: []compactionLevel{{level: startLevel}, {level: outputLevel}},
258 2 : maxOutputFileSize: uint64(opts.Level(adjustedLevel).TargetFileSize),
259 2 : maxOverlapBytes: maxGrandparentOverlapBytes(opts, adjustedLevel),
260 2 : maxReadCompactionBytes: maxReadCompactionBytes(opts, adjustedLevel),
261 2 : }
262 2 : pc.startLevel = &pc.inputs[0]
263 2 : pc.outputLevel = &pc.inputs[1]
264 2 : return pc
265 : }
266 :
267 : // adjustedOutputLevel is the output level used for the purpose of
268 : // determining the target output file size, overlap bytes, and expanded
269 : // bytes, taking into account the base level.
270 2 : func adjustedOutputLevel(outputLevel int, baseLevel int) int {
271 2 : adjustedOutputLevel := outputLevel
272 2 : if adjustedOutputLevel > 0 {
273 2 : // Output level is in the range [baseLevel, numLevels]. For the purpose of
274 2 : // determining the target output file size, overlap bytes, and expanded
275 2 : // bytes, we want to adjust the range to [1,numLevels].
276 2 : adjustedOutputLevel = 1 + outputLevel - baseLevel
277 2 : }
278 2 : return adjustedOutputLevel
279 : }
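
// For example, with baseLevel=4: adjustedOutputLevel(4, 4) == 1 and
// adjustedOutputLevel(6, 4) == 3, so an L0->L4 compaction sizes its output
// tables as if it were writing to L1. Intra-L0 compactions are unaffected:
// adjustedOutputLevel(0, 4) == 0.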
280 :
281 : func newPickedCompactionFromL0(
282 : lcf *manifest.L0CompactionFiles, opts *Options, vers *version, baseLevel int, isBase bool,
283 2 : ) *pickedCompaction {
284 2 : outputLevel := baseLevel
285 2 : if !isBase {
286 2 : outputLevel = 0 // Intra L0
287 2 : }
288 :
289 2 : pc := newPickedCompaction(opts, vers, 0, outputLevel, baseLevel)
290 2 : pc.lcf = lcf
291 2 : pc.outputLevel.level = outputLevel
292 2 :
293 2 : // Manually build the compaction as opposed to calling
294 2 : // pickAutoHelper. This is because L0Sublevels has already added
295 2 : // any overlapping L0 SSTables that need to be added, and
296 2 : // because compactions built by L0Sublevels do not necessarily
297 2 : // pick contiguous sequences of files in pc.version.Levels[0].
298 2 : files := make([]*manifest.FileMetadata, 0, len(lcf.Files))
299 2 : iter := vers.Levels[0].Iter()
300 2 : for f := iter.First(); f != nil; f = iter.Next() {
301 2 : if lcf.FilesIncluded[f.L0Index] {
302 2 : files = append(files, f)
303 2 : }
304 : }
305 2 : pc.startLevel.files = manifest.NewLevelSliceSeqSorted(files)
306 2 : return pc
307 : }
308 :
309 1 : func (pc *pickedCompaction) String() string {
310 1 : var builder strings.Builder
311 1 : builder.WriteString(fmt.Sprintf(`Score=%f, `, pc.score))
312 1 : builder.WriteString(fmt.Sprintf(`Kind=%s, `, pc.kind))
313 1 : builder.WriteString(fmt.Sprintf(`AdjustedOutputLevel=%d, `, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel)))
314 1 : builder.WriteString(fmt.Sprintf(`maxOutputFileSize=%d, `, pc.maxOutputFileSize))
315 1 : builder.WriteString(fmt.Sprintf(`maxReadCompactionBytes=%d, `, pc.maxReadCompactionBytes))
316 1 : builder.WriteString(fmt.Sprintf(`smallest=%s, `, pc.smallest))
317 1 : builder.WriteString(fmt.Sprintf(`largest=%s, `, pc.largest))
318 1 : builder.WriteString(fmt.Sprintf(`version=%s, `, pc.version))
319 1 : builder.WriteString(fmt.Sprintf(`inputs=%s, `, pc.inputs))
320 1 : builder.WriteString(fmt.Sprintf(`startlevel=%s, `, pc.startLevel))
321 1 : builder.WriteString(fmt.Sprintf(`outputLevel=%s, `, pc.outputLevel))
322 1 : builder.WriteString(fmt.Sprintf(`extraLevels=%s, `, pc.extraLevels))
323 1 : builder.WriteString(fmt.Sprintf(`l0SublevelInfo=%s, `, pc.startLevel.l0SublevelInfo))
324 1 : builder.WriteString(fmt.Sprintf(`lcf=%s`, pc.lcf))
325 1 : return builder.String()
326 1 : }
327 :
328 : // clone creates a deep copy of the pickedCompaction.
329 2 : func (pc *pickedCompaction) clone() *pickedCompaction {
330 2 :
331 2 : // Quickly copy over fields that do not require special deep copy care, and
332 2 : // set all fields that will require a deep copy to nil.
333 2 : newPC := &pickedCompaction{
334 2 : cmp: pc.cmp,
335 2 : score: pc.score,
336 2 : kind: pc.kind,
337 2 : baseLevel: pc.baseLevel,
338 2 : maxOutputFileSize: pc.maxOutputFileSize,
339 2 : maxOverlapBytes: pc.maxOverlapBytes,
340 2 : maxReadCompactionBytes: pc.maxReadCompactionBytes,
341 2 : smallest: pc.smallest.Clone(),
342 2 : largest: pc.largest.Clone(),
343 2 :
344 2 : // TODO(msbutler): properly clone picker metrics
345 2 : pickerMetrics: pc.pickerMetrics,
346 2 :
347 2 : // Both copies see the same manifest; therefore, it's ok for them to
348 2 : // share the same pc.version.
349 2 : version: pc.version,
350 2 : }
351 2 :
352 2 : newPC.inputs = make([]compactionLevel, len(pc.inputs))
353 2 : newPC.extraLevels = make([]*compactionLevel, 0, len(pc.extraLevels))
354 2 : for i := range pc.inputs {
355 2 : newPC.inputs[i] = pc.inputs[i].Clone()
356 2 : if i == 0 {
357 2 : newPC.startLevel = &newPC.inputs[i]
358 2 : } else if i == len(pc.inputs)-1 {
359 2 : newPC.outputLevel = &newPC.inputs[i]
360 2 : } else {
361 1 : newPC.extraLevels = append(newPC.extraLevels, &newPC.inputs[i])
362 1 : }
363 : }
364 :
365 2 : if len(pc.startLevel.l0SublevelInfo) > 0 {
366 2 : newPC.startLevel.l0SublevelInfo = make([]sublevelInfo, len(pc.startLevel.l0SublevelInfo))
367 2 : for i := range pc.startLevel.l0SublevelInfo {
368 2 : newPC.startLevel.l0SublevelInfo[i] = pc.startLevel.l0SublevelInfo[i].Clone()
369 2 : }
370 : }
371 2 : if pc.lcf != nil {
372 2 : newPC.lcf = pc.lcf.Clone()
373 2 : }
374 2 : return newPC
375 : }
376 :
377 : // maybeExpandBounds is a helper function for setupInputs which ensures the
378 : // pickedCompaction's smallest and largest internal keys are updated iff
379 : // the candidate keys expand the key span. This avoids a bug for multi-level
380 : // compactions: during the second call to setupInputs, the picked compaction's
381 : // smallest and largest keys should not decrease the key span.
382 2 : func (pc *pickedCompaction) maybeExpandBounds(smallest InternalKey, largest InternalKey) {
383 2 : emptyKey := InternalKey{}
384 2 : if base.InternalCompare(pc.cmp, smallest, emptyKey) == 0 {
385 2 : if base.InternalCompare(pc.cmp, largest, emptyKey) != 0 {
386 0 : panic("either both candidate keys are empty or neither are empty")
387 : }
388 2 : return
389 : }
390 2 : if base.InternalCompare(pc.cmp, pc.smallest, emptyKey) == 0 {
391 2 : if base.InternalCompare(pc.cmp, pc.largest, emptyKey) != 0 {
392 0 : panic("either both pc keys are empty or neither are empty")
393 : }
394 2 : pc.smallest = smallest
395 2 : pc.largest = largest
396 2 : return
397 : }
398 2 : if base.InternalCompare(pc.cmp, pc.smallest, smallest) >= 0 {
399 2 : pc.smallest = smallest
400 2 : }
401 2 : if base.InternalCompare(pc.cmp, pc.largest, largest) <= 0 {
402 2 : pc.largest = largest
403 2 : }
404 : }
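
// For example, if pc currently spans [c, f] and the candidate keys span
// [a, d], the bounds widen to [a, f]; a candidate span of [d, e] leaves the
// bounds unchanged. Empty candidate keys are ignored entirely.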
405 :
406 : // setupInputs returns true if a compaction has been set up. It returns false if
407 : // a concurrent compaction is occurring on the start or output level files.
408 : func (pc *pickedCompaction) setupInputs(
409 : opts *Options, diskAvailBytes uint64, startLevel *compactionLevel,
410 2 : ) bool {
411 2 : // maxExpandedBytes is the maximum size of an expanded compaction. If
412 2 : // growing a compaction results in a larger size, the original compaction
413 2 : // is used instead.
414 2 : maxExpandedBytes := expandedCompactionByteSizeLimit(
415 2 : opts, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel), diskAvailBytes,
416 2 : )
417 2 :
418 2 : if anyTablesCompacting(startLevel.files) {
419 2 : return false
420 2 : }
421 :
422 2 : pc.maybeExpandBounds(manifest.KeyRange(pc.cmp, startLevel.files.Iter()))
423 2 :
424 2 : // Determine the sstables in the output level which overlap with the input
425 2 : // sstables. No need to do this for intra-L0 compactions; outputLevel.files is
426 2 : // left empty for those.
427 2 : if startLevel.level != pc.outputLevel.level {
428 2 : pc.outputLevel.files = pc.version.Overlaps(pc.outputLevel.level, pc.userKeyBounds())
429 2 : if anyTablesCompacting(pc.outputLevel.files) {
430 2 : return false
431 2 : }
432 :
433 2 : pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
434 2 : startLevel.files.Iter(), pc.outputLevel.files.Iter()))
435 : }
436 :
437 : // Grow the sstables in startLevel.level as long as it doesn't affect the number
438 : // of sstables included from pc.outputLevel.level.
439 2 : if pc.lcf != nil && startLevel.level == 0 && pc.outputLevel.level != 0 {
440 2 : // Call the L0-specific compaction extension method. Similar logic as
441 2 : // pc.grow. Additional L0 files are optionally added to the compaction at
442 2 : // this step. Note that the bounds passed in are not the bounds of the
443 2 : // compaction, but rather the smallest and largest internal keys that
444 2 : // the compaction cannot include from L0 without pulling in more Lbase
445 2 : // files. Consider this example:
446 2 : //
447 2 : // L0: c-d e+f g-h
448 2 : // Lbase: a-b e+f i-j
449 2 : // a b c d e f g h i j
450 2 : //
451 2 : // The e-f files have already been chosen in the compaction. As pulling
452 2 : // in more LBase files is undesirable, the logic below will pass in
453 2 : // smallest = b and largest = i to ExtendL0ForBaseCompactionTo, which
454 2 : // will expand the compaction to include c-d and g-h from L0. The
455 2 : // bounds passed in are exclusive; the compaction cannot be expanded
456 2 : // to include files that "touch" it.
457 2 : smallestBaseKey := base.InvalidInternalKey
458 2 : largestBaseKey := base.InvalidInternalKey
459 2 : if pc.outputLevel.files.Empty() {
460 2 : baseIter := pc.version.Levels[pc.outputLevel.level].Iter()
461 2 : if sm := baseIter.SeekLT(pc.cmp, pc.smallest.UserKey); sm != nil {
462 2 : smallestBaseKey = sm.Largest
463 2 : }
464 2 : if la := baseIter.SeekGE(pc.cmp, pc.largest.UserKey); la != nil {
465 2 : largestBaseKey = la.Smallest
466 2 : }
467 2 : } else {
468 2 : // NB: We use Reslice to access the underlying level's files, but
469 2 : // we discard the returned slice. The pc.outputLevel.files slice
470 2 : // is not modified.
471 2 : _ = pc.outputLevel.files.Reslice(func(start, end *manifest.LevelIterator) {
472 2 : if sm := start.Prev(); sm != nil {
473 2 : smallestBaseKey = sm.Largest
474 2 : }
475 2 : if la := end.Next(); la != nil {
476 2 : largestBaseKey = la.Smallest
477 2 : }
478 : })
479 : }
480 2 : oldLcf := pc.lcf.Clone()
481 2 : if pc.version.L0Sublevels.ExtendL0ForBaseCompactionTo(smallestBaseKey, largestBaseKey, pc.lcf) {
482 2 : var newStartLevelFiles []*fileMetadata
483 2 : iter := pc.version.Levels[0].Iter()
484 2 : var sizeSum uint64
485 2 : for j, f := 0, iter.First(); f != nil; j, f = j+1, iter.Next() {
486 2 : if pc.lcf.FilesIncluded[f.L0Index] {
487 2 : newStartLevelFiles = append(newStartLevelFiles, f)
488 2 : sizeSum += f.Size
489 2 : }
490 : }
491 2 : if sizeSum+pc.outputLevel.files.SizeSum() < maxExpandedBytes {
492 2 : startLevel.files = manifest.NewLevelSliceSeqSorted(newStartLevelFiles)
493 2 : pc.smallest, pc.largest = manifest.KeyRange(pc.cmp,
494 2 : startLevel.files.Iter(), pc.outputLevel.files.Iter())
495 2 : } else {
496 1 : *pc.lcf = *oldLcf
497 1 : }
498 : }
499 2 : } else if pc.grow(pc.smallest, pc.largest, maxExpandedBytes, startLevel) {
500 2 : pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
501 2 : startLevel.files.Iter(), pc.outputLevel.files.Iter()))
502 2 : }
503 :
504 2 : if pc.startLevel.level == 0 {
505 2 : // We don't change the input files for the compaction beyond this point.
506 2 : pc.startLevel.l0SublevelInfo = generateSublevelInfo(pc.cmp, pc.startLevel.files)
507 2 : }
508 :
509 2 : return true
510 : }
511 :
512 : // grow grows the number of inputs at startLevel without changing the number
513 : // of files in pc.outputLevel, and returns whether the inputs grew. sm and la
514 : // are the smallest and largest InternalKeys in all of the inputs.
515 : func (pc *pickedCompaction) grow(
516 : sm, la InternalKey, maxExpandedBytes uint64, startLevel *compactionLevel,
517 2 : ) bool {
518 2 : if pc.outputLevel.files.Empty() {
519 2 : return false
520 2 : }
521 2 : grow0 := pc.version.Overlaps(startLevel.level, base.UserKeyBoundsFromInternal(sm, la))
522 2 : if anyTablesCompacting(grow0) {
523 1 : return false
524 1 : }
525 2 : if grow0.Len() <= startLevel.files.Len() {
526 2 : return false
527 2 : }
528 2 : if grow0.SizeSum()+pc.outputLevel.files.SizeSum() >= maxExpandedBytes {
529 2 : return false
530 2 : }
531 : // We need to include the outputLevel iter because without it, in a multiLevel scenario,
532 : // sm1 and la1 could shift the output level keyspace when pc.outputLevel.files is set to grow1.
533 2 : sm1, la1 := manifest.KeyRange(pc.cmp, grow0.Iter(), pc.outputLevel.files.Iter())
534 2 : grow1 := pc.version.Overlaps(pc.outputLevel.level, base.UserKeyBoundsFromInternal(sm1, la1))
535 2 : if anyTablesCompacting(grow1) {
536 1 : return false
537 1 : }
538 2 : if grow1.Len() != pc.outputLevel.files.Len() {
539 2 : return false
540 2 : }
541 2 : startLevel.files = grow0
542 2 : pc.outputLevel.files = grow1
543 2 : return true
544 : }
545 :
546 2 : func (pc *pickedCompaction) compactionSize() uint64 {
547 2 : var bytesToCompact uint64
548 2 : for i := range pc.inputs {
549 2 : bytesToCompact += pc.inputs[i].files.SizeSum()
550 2 : }
551 2 : return bytesToCompact
552 : }
553 :
554 : // setupMultiLevelCandidate returns true if it successfully added another level
555 : // to the compaction.
556 2 : func (pc *pickedCompaction) setupMultiLevelCandidate(opts *Options, diskAvailBytes uint64) bool {
557 2 : pc.inputs = append(pc.inputs, compactionLevel{level: pc.outputLevel.level + 1})
558 2 :
559 2 : // Recalibrate startLevel and outputLevel:
560 2 : // - startLevel and outputLevel pointers may be obsolete after appending to pc.inputs.
561 2 : // - push outputLevel to extraLevels and move the new level to outputLevel
562 2 : pc.startLevel = &pc.inputs[0]
563 2 : pc.extraLevels = []*compactionLevel{&pc.inputs[1]}
564 2 : pc.outputLevel = &pc.inputs[2]
565 2 : return pc.setupInputs(opts, diskAvailBytes, pc.extraLevels[len(pc.extraLevels)-1])
566 2 : }
567 :
568 : // anyTablesCompacting returns true if any tables in the level slice are
569 : // compacting.
570 2 : func anyTablesCompacting(inputs manifest.LevelSlice) bool {
571 2 : it := inputs.Iter()
572 2 : for f := it.First(); f != nil; f = it.Next() {
573 2 : if f.IsCompacting() {
574 2 : return true
575 2 : }
576 : }
577 2 : return false
578 : }
579 :
580 : // newCompactionPickerByScore creates a compactionPickerByScore associated with
581 : // the newest version. The picker is used under logLock (until a new version is
582 : // installed).
583 : func newCompactionPickerByScore(
584 : v *version,
585 : virtualBackings *manifest.VirtualBackings,
586 : opts *Options,
587 : inProgressCompactions []compactionInfo,
588 2 : ) *compactionPickerByScore {
589 2 : p := &compactionPickerByScore{
590 2 : opts: opts,
591 2 : vers: v,
592 2 : virtualBackings: virtualBackings,
593 2 : }
594 2 : p.initLevelMaxBytes(inProgressCompactions)
595 2 : return p
596 2 : }
597 :
598 : // Information about a candidate compaction level that has been identified by
599 : // the compaction picker.
600 : type candidateLevelInfo struct {
601 : // The compensatedScore of the level after adjusting according to the other
602 : // levels' sizes. For L0, the compensatedScoreRatio is equivalent to the
603 : // uncompensatedScoreRatio as we don't account for level size compensation in
604 : // L0.
605 : compensatedScoreRatio float64
606 : // The score of the level after accounting for level size compensation before
607 : // adjusting according to other levels' sizes. For L0, the compensatedScore
608 : // is equivalent to the uncompensatedScore as we don't account for level
609 : // size compensation in L0.
610 : compensatedScore float64
611 : // The score of the level to be compacted, calculated using uncompensated file
612 : // sizes and without any adjustments.
613 : uncompensatedScore float64
614 : // uncompensatedScoreRatio is the uncompensatedScore adjusted according to
615 : // the other levels' sizes.
616 : uncompensatedScoreRatio float64
617 : level int
618 : // The level to compact to.
619 : outputLevel int
620 : // The file in the level that will be compacted. Additional files may be
621 : // picked by the compaction, and a pickedCompaction created for the
622 : // compaction.
623 : file manifest.LevelFile
624 : }
625 :
626 2 : func (c *candidateLevelInfo) shouldCompact() bool {
627 2 : return c.compensatedScoreRatio >= compactionScoreThreshold
628 2 : }
629 :
630 2 : func fileCompensation(f *fileMetadata) uint64 {
631 2 : return uint64(f.Stats.PointDeletionsBytesEstimate) + f.Stats.RangeDeletionsBytesEstimate
632 2 : }
633 :
634 : // compensatedSize returns f's file size, inflated according to compaction
635 : // priorities.
636 2 : func compensatedSize(f *fileMetadata) uint64 {
637 2 : // Add in the estimate of disk space that may be reclaimed by compacting the
638 2 : // file's tombstones.
639 2 : return f.Size + fileCompensation(f)
640 2 : }
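
// For example (with hypothetical stats), a 64 MB file whose tombstones are
// estimated to reclaim 16 MB of point-deletion bytes and 8 MB of
// range-deletion bytes has a compensated size of 88 MB, making it a more
// attractive compaction input than its raw size alone suggests.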
641 :
642 : // compensatedSizeAnnotator implements manifest.Annotator, annotating B-Tree
643 : // nodes with the sum of the files' compensated sizes. Its annotation type is
644 : // a *uint64. Compensated sizes may change once a table's stats are loaded
645 : // asynchronously, so its values are marked as cacheable only if a file's
646 : // stats have been loaded.
647 : type compensatedSizeAnnotator struct {
648 : }
649 :
650 : var _ manifest.Annotator = compensatedSizeAnnotator{}
651 :
652 2 : func (a compensatedSizeAnnotator) Zero(dst interface{}) interface{} {
653 2 : if dst == nil {
654 2 : return new(uint64)
655 2 : }
656 2 : v := dst.(*uint64)
657 2 : *v = 0
658 2 : return v
659 : }
660 :
661 : func (a compensatedSizeAnnotator) Accumulate(
662 : f *fileMetadata, dst interface{},
663 2 : ) (v interface{}, cacheOK bool) {
664 2 : vptr := dst.(*uint64)
665 2 : *vptr = *vptr + compensatedSize(f)
666 2 : return vptr, f.StatsValid()
667 2 : }
668 :
669 2 : func (a compensatedSizeAnnotator) Merge(src interface{}, dst interface{}) interface{} {
670 2 : srcV := src.(*uint64)
671 2 : dstV := dst.(*uint64)
672 2 : *dstV = *dstV + *srcV
673 2 : return dstV
674 2 : }
675 :
676 : // totalCompensatedSize computes the compensated size over a file metadata
677 : // iterator. Note that this function is linear in the files available to the
678 : // iterator. Use the compensatedSizeAnnotator if querying the total
679 : // compensated size of a level.
680 2 : func totalCompensatedSize(iter manifest.LevelIterator) uint64 {
681 2 : var sz uint64
682 2 : for f := iter.First(); f != nil; f = iter.Next() {
683 2 : sz += compensatedSize(f)
684 2 : }
685 2 : return sz
686 : }
687 :
688 : // compactionPickerByScore holds the state and logic for picking a compaction. A
689 : // compaction picker is associated with a single version. A new compaction
690 : // picker is created and initialized every time a new version is installed.
691 : type compactionPickerByScore struct {
692 : opts *Options
693 : vers *version
694 : virtualBackings *manifest.VirtualBackings
695 : // The level to target for L0 compactions. Levels L1 to baseLevel must be
696 : // empty.
697 : baseLevel int
698 : // levelMaxBytes holds the dynamically adjusted max bytes setting for each
699 : // level.
700 : levelMaxBytes [numLevels]int64
701 : }
702 :
703 : var _ compactionPicker = &compactionPickerByScore{}
704 :
705 2 : func (p *compactionPickerByScore) getScores(inProgress []compactionInfo) [numLevels]float64 {
706 2 : var scores [numLevels]float64
707 2 : for _, info := range p.calculateLevelScores(inProgress) {
708 2 : scores[info.level] = info.compensatedScoreRatio
709 2 : }
710 2 : return scores
711 : }
712 :
713 2 : func (p *compactionPickerByScore) getBaseLevel() int {
714 2 : if p == nil {
715 0 : return 1
716 0 : }
717 2 : return p.baseLevel
718 : }
719 :
720 : // estimatedCompactionDebt estimates the number of bytes which need to be
721 : // compacted before the LSM tree becomes stable.
722 2 : func (p *compactionPickerByScore) estimatedCompactionDebt(l0ExtraSize uint64) uint64 {
723 2 : if p == nil {
724 0 : return 0
725 0 : }
726 :
727 : // We assume that all the bytes in L0 need to be compacted to Lbase. This is
728 : // unlike the RocksDB logic that figures out whether L0 needs compaction.
729 2 : bytesAddedToNextLevel := l0ExtraSize + p.vers.Levels[0].Size()
730 2 : lbaseSize := p.vers.Levels[p.baseLevel].Size()
731 2 :
732 2 : var compactionDebt uint64
733 2 : if bytesAddedToNextLevel > 0 && lbaseSize > 0 {
734 2 : // We only incur compaction debt if both L0 and Lbase contain data. If L0
735 2 : // is empty, no compaction is necessary. If Lbase is empty, a move-based
736 2 : // compaction from L0 would occur.
737 2 : compactionDebt += bytesAddedToNextLevel + lbaseSize
738 2 : }
739 :
740 : // loop invariant: At the beginning of the loop, bytesAddedToNextLevel is the
741 : // bytes added to `level` in the loop.
742 2 : for level := p.baseLevel; level < numLevels-1; level++ {
743 2 : levelSize := p.vers.Levels[level].Size() + bytesAddedToNextLevel
744 2 : nextLevelSize := p.vers.Levels[level+1].Size()
745 2 : if levelSize > uint64(p.levelMaxBytes[level]) {
746 2 : bytesAddedToNextLevel = levelSize - uint64(p.levelMaxBytes[level])
747 2 : if nextLevelSize > 0 {
748 2 : // We only incur compaction debt if the next level contains data. If the
749 2 : // next level is empty, a move-based compaction would be used.
750 2 : levelRatio := float64(nextLevelSize) / float64(levelSize)
751 2 : // The current level contributes bytesAddedToNextLevel to compactions.
752 2 : // The next level contributes levelRatio * bytesAddedToNextLevel.
753 2 : compactionDebt += uint64(float64(bytesAddedToNextLevel) * (levelRatio + 1))
754 2 : }
755 2 : } else {
756 2 : // We're not moving any bytes to the next level.
757 2 : bytesAddedToNextLevel = 0
758 2 : }
759 : }
760 2 : return compactionDebt
761 : }
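
// A worked example with hypothetical sizes: suppose baseLevel=5,
// levelMaxBytes[5] = 64 MB, L0 holds 16 MB, L5 holds 80 MB, and L6 holds
// 192 MB. The initial debt is 16+80 = 96 MB (L0 compacting into Lbase). L5
// then notionally holds 96 MB, 32 MB over target; with levelRatio = 192/96
// = 2, the L5->L6 term adds 32*(2+1) = 96 MB, for a total debt of 192 MB.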
762 :
763 2 : func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []compactionInfo) {
764 2 : // The levelMaxBytes calculations here differ from RocksDB in two ways:
765 2 : //
766 2 : // 1. The use of dbSize vs maxLevelSize. RocksDB uses the size of the maximum
767 2 : // level in L1-L6, rather than determining the size of the bottom level
768 2 : // based on the total amount of data in the DB. The RocksDB calculation is
769 2 : // problematic if L0 contains a significant fraction of data, or if the
770 2 : // level sizes are roughly equal and thus there is a significant fraction
771 2 : // of data outside of the largest level.
772 2 : //
773 2 : // 2. Not adjusting the size of Lbase based on L0. RocksDB computes
774 2 : // baseBytesMax as the maximum of the configured LBaseMaxBytes and the
775 2 : // size of L0. This is problematic because baseBytesMax is used to compute
776 2 : // the max size of lower levels. A very large baseBytesMax will result in
777 2 : // an overly large value for the size of lower levels, which will cause
778 2 : // those levels not to be compacted even when they should be
779 2 : // compacted. This often results in "inverted" LSM shapes where Ln is
780 2 : // larger than Ln+1.
781 2 :
782 2 : // Determine the first non-empty level and the total DB size.
783 2 : firstNonEmptyLevel := -1
784 2 : var dbSize uint64
785 2 : for level := 1; level < numLevels; level++ {
786 2 : if p.vers.Levels[level].Size() > 0 {
787 2 : if firstNonEmptyLevel == -1 {
788 2 : firstNonEmptyLevel = level
789 2 : }
790 2 : dbSize += p.vers.Levels[level].Size()
791 : }
792 : }
793 2 : for _, c := range inProgressCompactions {
794 2 : if c.outputLevel == 0 || c.outputLevel == -1 {
795 2 : continue
796 : }
797 2 : if c.inputs[0].level == 0 && (firstNonEmptyLevel == -1 || c.outputLevel < firstNonEmptyLevel) {
798 2 : firstNonEmptyLevel = c.outputLevel
799 2 : }
800 : }
801 :
802 : // Initialize the max-bytes setting for each level to "infinity" which will
803 : // disallow compaction for that level. We'll fill in the actual value below
804 : // for levels we want to allow compactions from.
805 2 : for level := 0; level < numLevels; level++ {
806 2 : p.levelMaxBytes[level] = math.MaxInt64
807 2 : }
808 :
809 2 : if dbSize == 0 {
810 2 : // No levels from L1 and up contain any data. Target L0 compactions at the
811 2 : // last level, or at the output level of an ongoing L0 compaction.
812 2 : p.baseLevel = numLevels - 1
813 2 : if firstNonEmptyLevel >= 0 {
814 2 : p.baseLevel = firstNonEmptyLevel
815 2 : }
816 2 : return
817 : }
818 :
819 2 : dbSize += p.vers.Levels[0].Size()
820 2 : bottomLevelSize := dbSize - dbSize/uint64(p.opts.Experimental.LevelMultiplier)
821 2 :
822 2 : curLevelSize := bottomLevelSize
823 2 : for level := numLevels - 2; level >= firstNonEmptyLevel; level-- {
824 2 : curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
825 2 : }
826 :
827 : // Compute base level (where L0 data is compacted to).
828 2 : baseBytesMax := uint64(p.opts.LBaseMaxBytes)
829 2 : p.baseLevel = firstNonEmptyLevel
830 2 : for p.baseLevel > 1 && curLevelSize > baseBytesMax {
831 2 : p.baseLevel--
832 2 : curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
833 2 : }
834 :
835 2 : smoothedLevelMultiplier := 1.0
836 2 : if p.baseLevel < numLevels-1 {
837 2 : smoothedLevelMultiplier = math.Pow(
838 2 : float64(bottomLevelSize)/float64(baseBytesMax),
839 2 : 1.0/float64(numLevels-p.baseLevel-1))
840 2 : }
841 :
842 2 : levelSize := float64(baseBytesMax)
843 2 : for level := p.baseLevel; level < numLevels; level++ {
844 2 : if level > p.baseLevel && levelSize > 0 {
845 2 : levelSize *= smoothedLevelMultiplier
846 2 : }
847 : // Round the result since test cases use small target level sizes, which
848 : // can be impacted by floating-point imprecision + integer truncation.
849 2 : roundedLevelSize := math.Round(levelSize)
850 2 : if roundedLevelSize > float64(math.MaxInt64) {
851 0 : p.levelMaxBytes[level] = math.MaxInt64
852 2 : } else {
853 2 : p.levelMaxBytes[level] = int64(roundedLevelSize)
854 2 : }
855 : }
856 : }
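
// A worked example with hypothetical sizes: say LBaseMaxBytes is 64 MB,
// numLevels is 7, the computed baseLevel is 4, and bottomLevelSize works out
// to 6.4 GB. Then smoothedLevelMultiplier = (6400/64)^(1/2) = 10, yielding
// levelMaxBytes of 64 MB for L4, 640 MB for L5, and 6.4 GB for L6, while the
// levels above Lbase remain at MaxInt64.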
857 :
858 : type levelSizeAdjust struct {
859 : incomingActualBytes uint64
860 : outgoingActualBytes uint64
861 : outgoingCompensatedBytes uint64
862 : }
863 :
864 2 : func (a levelSizeAdjust) compensated() uint64 {
865 2 : return a.incomingActualBytes - a.outgoingCompensatedBytes
866 2 : }
867 :
868 2 : func (a levelSizeAdjust) actual() uint64 {
869 2 : return a.incomingActualBytes - a.outgoingActualBytes
870 2 : }
871 :
872 2 : func calculateSizeAdjust(inProgressCompactions []compactionInfo) [numLevels]levelSizeAdjust {
873 2 : // Compute size adjustments for each level based on the in-progress
874 2 : // compactions. We sum the file sizes of all files leaving and entering each
875 2 : // level in in-progress compactions. For outgoing files, we also sum a
876 2 : // separate sum of 'compensated file sizes', which are inflated according
877 2 : // to deletion estimates.
878 2 : //
879 2 : // When we adjust a level's size according to these values during score
880 2 : // calculation, we subtract the compensated size of start level inputs to
881 2 : // account for the fact that score calculation uses compensated sizes.
882 2 : //
883 2 : // Since compensated file sizes may be compensated because they reclaim
884 2 : // space from the output level's files, we only add the real file size to
885 2 : // the output level.
886 2 : //
887 2 : // This is slightly different from RocksDB's behavior, which simply elides
888 2 : // compacting files from the level size calculation.
889 2 : var sizeAdjust [numLevels]levelSizeAdjust
890 2 : for i := range inProgressCompactions {
891 2 : c := &inProgressCompactions[i]
892 2 : // If this compaction's version edit has already been applied, there's
893 2 : // no need to adjust: The LSM we'll examine will already reflect the
894 2 : // new LSM state.
895 2 : if c.versionEditApplied {
896 2 : continue
897 : }
898 :
899 2 : for _, input := range c.inputs {
900 2 : actualSize := input.files.SizeSum()
901 2 : compensatedSize := totalCompensatedSize(input.files.Iter())
902 2 :
903 2 : if input.level != c.outputLevel {
904 2 : sizeAdjust[input.level].outgoingCompensatedBytes += compensatedSize
905 2 : sizeAdjust[input.level].outgoingActualBytes += actualSize
906 2 : if c.outputLevel != -1 {
907 2 : sizeAdjust[c.outputLevel].incomingActualBytes += actualSize
908 2 : }
909 : }
910 : }
911 : }
912 2 : return sizeAdjust
913 : }
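
// For example (a hypothetical in-progress compaction): an L3->L4 compaction
// whose L3 inputs total 10 MB on disk but 14 MB compensated, and whose
// version edit has not yet been applied, yields
// sizeAdjust[3].outgoingActualBytes = 10 MB,
// sizeAdjust[3].outgoingCompensatedBytes = 14 MB, and
// sizeAdjust[4].incomingActualBytes = 10 MB: L3's score is reduced by the
// compensated bytes leaving it, while L4 is only charged the real bytes
// arriving.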
914 :
915 2 : func levelCompensatedSize(lm manifest.LevelMetadata) uint64 {
916 2 : return *lm.Annotation(compensatedSizeAnnotator{}).(*uint64)
917 2 : }
918 :
919 : func (p *compactionPickerByScore) calculateLevelScores(
920 : inProgressCompactions []compactionInfo,
921 2 : ) [numLevels]candidateLevelInfo {
922 2 : var scores [numLevels]candidateLevelInfo
923 2 : for i := range scores {
924 2 : scores[i].level = i
925 2 : scores[i].outputLevel = i + 1
926 2 : }
927 2 : l0UncompensatedScore := calculateL0UncompensatedScore(p.vers, p.opts, inProgressCompactions)
928 2 : scores[0] = candidateLevelInfo{
929 2 : outputLevel: p.baseLevel,
930 2 : uncompensatedScore: l0UncompensatedScore,
931 2 : compensatedScore: l0UncompensatedScore, /* No level size compensation for L0 */
932 2 : }
933 2 : sizeAdjust := calculateSizeAdjust(inProgressCompactions)
934 2 : for level := 1; level < numLevels; level++ {
935 2 : compensatedLevelSize := levelCompensatedSize(p.vers.Levels[level]) + sizeAdjust[level].compensated()
936 2 : scores[level].compensatedScore = float64(compensatedLevelSize) / float64(p.levelMaxBytes[level])
937 2 : scores[level].uncompensatedScore = float64(p.vers.Levels[level].Size()+sizeAdjust[level].actual()) / float64(p.levelMaxBytes[level])
938 2 : }
939 :
940 : // Adjust each level's {compensated, uncompensated}Score by the uncompensatedScore
941 : // of the next level to get a {compensated, uncompensated}ScoreRatio. If the
942 : // next level has a high uncompensatedScore, and is thus a priority for compaction,
943 : // this reduces the priority for compacting the current level. If the next level
944 : // has a low uncompensatedScore (i.e. it is below its target size), this increases
945 : // the priority for compacting the current level.
946 : //
947 : // The effect of this adjustment is to help prioritize compactions in lower
948 : // levels. The following example shows the compensatedScoreRatio and the
949 : // compensatedScore. In this scenario, L0 has 68 sublevels. L3 (a.k.a. Lbase)
950 : // is significantly above its target size. The original score prioritizes
951 : // compactions from those two levels, but doing so ends up causing a future
952 : // problem: data piles up in the higher levels, starving L5->L6 compactions,
953 : // and to a lesser degree starving L4->L5 compactions.
954 : //
955 : // Note that in the example shown there is no level size compensation so the
956 : // compensatedScore and the uncompensatedScore is the same for each level.
957 : //
958 : // compensatedScoreRatio compensatedScore uncompensatedScore size max-size
959 : // L0 3.2 68.0 68.0 2.2 G -
960 : // L3 3.2 21.1 21.1 1.3 G 64 M
961 : // L4 3.4 6.7 6.7 3.1 G 467 M
962 : // L5 3.4 2.0 2.0 6.6 G 3.3 G
963 : // L6 0.6 0.6 0.6 14 G 24 G
964 2 : var prevLevel int
965 2 : for level := p.baseLevel; level < numLevels; level++ {
966 2 : // The compensated scores, and uncompensated scores will be turned into
967 2 : // ratios as they're adjusted according to other levels' sizes.
968 2 : scores[prevLevel].compensatedScoreRatio = scores[prevLevel].compensatedScore
969 2 : scores[prevLevel].uncompensatedScoreRatio = scores[prevLevel].uncompensatedScore
970 2 :
971 2 : // Avoid absurdly large scores by placing a floor on the score that we'll
972 2 : // adjust a level by. The value of 0.01 was chosen somewhat arbitrarily.
973 2 : const minScore = 0.01
974 2 : if scores[prevLevel].compensatedScoreRatio >= compactionScoreThreshold {
975 2 : if scores[level].uncompensatedScore >= minScore {
976 2 : scores[prevLevel].compensatedScoreRatio /= scores[level].uncompensatedScore
977 2 : } else {
978 2 : scores[prevLevel].compensatedScoreRatio /= minScore
979 2 : }
980 : }
981 2 : if scores[prevLevel].uncompensatedScoreRatio >= compactionScoreThreshold {
982 2 : if scores[level].uncompensatedScore >= minScore {
983 2 : scores[prevLevel].uncompensatedScoreRatio /= scores[level].uncompensatedScore
984 2 : } else {
985 2 : scores[prevLevel].uncompensatedScoreRatio /= minScore
986 2 : }
987 : }
988 2 : prevLevel = level
989 : }
990 : // Set the score ratios for the lowest level.
991 : // INVARIANT: prevLevel == numLevels-1
992 2 : scores[prevLevel].compensatedScoreRatio = scores[prevLevel].compensatedScore
993 2 : scores[prevLevel].uncompensatedScoreRatio = scores[prevLevel].uncompensatedScore
994 2 :
995 2 : sort.Sort(sortCompactionLevelsByPriority(scores[:]))
996 2 : return scores
997 : }
998 :
999 : // calculateL0UncompensatedScore calculates a float score representing the
1000 : // relative priority of compacting L0. Level L0 is special in that files within
1001 : // L0 may overlap one another, so a different set of heuristics that take into
1002 : // account read amplification apply.
1003 : func calculateL0UncompensatedScore(
1004 : vers *version, opts *Options, inProgressCompactions []compactionInfo,
1005 2 : ) float64 {
1006 2 : // Use the sublevel count to calculate the score. The base vs intra-L0
1007 2 : // compaction determination happens in pickAuto, not here.
1008 2 : score := float64(2*vers.L0Sublevels.MaxDepthAfterOngoingCompactions()) /
1009 2 : float64(opts.L0CompactionThreshold)
1010 2 :
1011 2 : // Also calculate a score based on the file count but use it only if it
1012 2 : // produces a higher score than the sublevel-based one. This heuristic is
1013 2 : // designed to accommodate cases where L0 is accumulating non-overlapping
1014 2 : // files. Letting too many non-overlapping files accumulate in few
1015 2 : // sublevels is undesirable, because:
1016 2 : // 1) we can produce a massive backlog to compact once files do overlap.
1017 2 : // 2) constructing L0 sublevels has a runtime that grows superlinearly with
1018 2 : // the number of files in L0 and must be done while holding D.mu.
1019 2 : noncompactingFiles := vers.Levels[0].Len()
1020 2 : for _, c := range inProgressCompactions {
1021 2 : for _, cl := range c.inputs {
1022 2 : if cl.level == 0 {
1023 2 : noncompactingFiles -= cl.files.Len()
1024 2 : }
1025 : }
1026 : }
1027 2 : fileScore := float64(noncompactingFiles) / float64(opts.L0CompactionFileThreshold)
1028 2 : if score < fileScore {
1029 2 : score = fileScore
1030 2 : }
1031 2 : return score
1032 : }
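
// For example, with L0CompactionThreshold=4 and 10 sublevels remaining after
// accounting for ongoing compactions, the sublevel-based score is 2*10/4 =
// 5.0. If L0 also held 400 noncompacting files with
// L0CompactionFileThreshold=500, the file-count score would be 0.8, so the
// sublevel-based score of 5.0 wins.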
1033 :
1034 : // pickCompactionSeedFile picks a file from `level` in the `vers` to build a
1035 : // compaction around. Currently, this function implements a heuristic similar to
1036 : // RocksDB's kMinOverlappingRatio, seeking to minimize write amplification. This
1037 : // function is linear with respect to the number of files in `level` and
1038 : // `outputLevel`.
1039 : func pickCompactionSeedFile(
1040 : vers *version,
1041 : virtualBackings *manifest.VirtualBackings,
1042 : opts *Options,
1043 : level, outputLevel int,
1044 : earliestSnapshotSeqNum uint64,
1045 2 : ) (manifest.LevelFile, bool) {
1046 2 : // Select the file within the level to compact. We want to minimize write
1047 2 : // amplification, but also ensure that (a) deletes are propagated to the
1048 2 : // bottom level in a timely fashion, and (b) virtual sstables that are
1049 2 : // pinning backing sstables where most of the data is garbage are compacted
1050 2 : // away. Doing (a) and (b) reclaims disk space. A table's smallest sequence
1051 2 : // number provides a measure of its age. The ratio of overlapping-bytes /
1052 2 : // table-size gives an indication of write amplification (a smaller ratio is
1053 2 : // preferable).
1054 2 : //
1055 2 : // The current heuristic is based off the RocksDB kMinOverlappingRatio
1056 2 : // heuristic. It chooses the file with the minimum overlapping ratio with
1057 2 : // the target level, which minimizes write amplification.
1058 2 : //
1059 2 : // The heuristic uses a "compensated size" for the denominator, which is the
1060 2 : // file size inflated by (a) an estimate of the space that may be reclaimed
1061 2 : // through compaction, and (b) a fraction of the amount of garbage in the
1062 2 : // backing sstable pinned by this (virtual) sstable.
1063 2 : //
1064 2 : // TODO(peter): For concurrent compactions, we may want to try harder to
1065 2 : // pick a seed file whose resulting compaction bounds do not overlap with
1066 2 : // an in-progress compaction.
1067 2 :
1068 2 : cmp := opts.Comparer.Compare
1069 2 : startIter := vers.Levels[level].Iter()
1070 2 : outputIter := vers.Levels[outputLevel].Iter()
1071 2 :
1072 2 : var file manifest.LevelFile
1073 2 : smallestRatio := uint64(math.MaxUint64)
1074 2 :
1075 2 : outputFile := outputIter.First()
1076 2 :
1077 2 : for f := startIter.First(); f != nil; f = startIter.Next() {
1078 2 : var overlappingBytes uint64
1079 2 : compacting := f.IsCompacting()
1080 2 : if compacting {
1081 2 : // Move on if this file is already being compacted. We'll likely
1082 2 : // still need to move past the overlapping output files regardless,
1083 2 : // but in cases where all start-level files are compacting we won't.
1084 2 : continue
1085 : }
1086 :
1087 : // Trim any output-level files smaller than f.
1088 2 : for outputFile != nil && sstableKeyCompare(cmp, outputFile.Largest, f.Smallest) < 0 {
1089 2 : outputFile = outputIter.Next()
1090 2 : }
1091 :
1092 2 : for outputFile != nil && sstableKeyCompare(cmp, outputFile.Smallest, f.Largest) <= 0 && !compacting {
1093 2 : overlappingBytes += outputFile.Size
1094 2 : compacting = compacting || outputFile.IsCompacting()
1095 2 :
1096 2 : // For files in the bottommost level of the LSM, the
1097 2 : // Stats.RangeDeletionsBytesEstimate field is set to the estimate
1098 2 : // of bytes /within/ the file itself that may be dropped by
1099 2 : // recompacting the file. These bytes from obsolete keys would not
1100 2 : // need to be rewritten if we compacted `f` into `outputFile`, so
1101 2 : // they don't contribute to write amplification. Subtracting them
1102 2 : // out of the overlapping bytes helps prioritize these compactions
1103 2 : // that are cheaper than their file sizes suggest.
1104 2 : if outputLevel == numLevels-1 && outputFile.LargestSeqNum < earliestSnapshotSeqNum {
1105 2 : overlappingBytes -= outputFile.Stats.RangeDeletionsBytesEstimate
1106 2 : }
1107 :
1108 : // If the file in the next level extends beyond f's largest key,
1109 : // break out and don't advance outputIter because f's successor
1110 : // might also overlap.
1111 : //
1112 : // Note, we stop as soon as we encounter an output-level file with a
1113 : // largest key beyond the input-level file's largest bound. We
1114 : // perform a simple user key comparison here using sstableKeyCompare
1115 : // which handles the potential for exclusive largest key bounds.
1116 : // There's some subtlety when the bounds are equal (eg, equal and
1117 : // inclusive, or equal and exclusive). Current Pebble doesn't split
1118 : // user keys across sstables within a level (and in format versions
1119 : // FormatSplitUserKeysMarkedCompacted and later we guarantee no
1120 : // split user keys exist within the entire LSM). In that case, we're
1121 : // assured that neither the input level nor the output level's next
1122 : // file shares the same user key, so compaction expansion will not
1123 : // include them in any compaction compacting `f`.
1124 : //
1125 : // NB: If we /did/ allow split user keys, or we're running on an
1126 : // old database with an earlier format major version where there are
1127 : // existing split user keys, this logic would be incorrect. Consider
1128 : // L1: [a#120,a#100] [a#80,a#60]
1129 : // L2: [a#55,a#45] [a#35,a#25] [a#15,a#5]
1130 : // While considering the first file in L1, [a#120,a#100], we'd skip
1131 : // past all of the files in L2. When considering the second file in
1132 : // L1, we'd improperly conclude that the second file overlaps
1133 : // nothing in the second level and is cheap to compact, when in
1134 : // reality we'd need to expand the compaction to include all 5
1135 : // files.
1136 2 : if sstableKeyCompare(cmp, outputFile.Largest, f.Largest) > 0 {
1137 2 : break
1138 : }
1139 2 : outputFile = outputIter.Next()
1140 : }
1141 :
1142 : // If the input level file or one of the overlapping files is
1143 : // compacting, we're not going to be able to compact this file
1144 : // anyways, so skip it.
1145 2 : if compacting {
1146 2 : continue
1147 : }
1148 :
1149 2 : compSz := compensatedSize(f) + responsibleForGarbageBytes(virtualBackings, f)
1150 2 : scaledRatio := overlappingBytes * 1024 / compSz
1151 2 : if scaledRatio < smallestRatio {
1152 2 : smallestRatio = scaledRatio
1153 2 : file = startIter.Take()
1154 2 : }
1155 : }
1156 2 : return file, file.FileMetadata != nil
1157 : }
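
// For example (hypothetical files), a start-level file with a compensated
// size of 32 MB that overlaps 64 MB in the output level gets a scaled ratio
// of 64 MB * 1024 / 32 MB = 2048, while a sibling overlapping only 8 MB
// scores 256; all else being equal, the latter is the cheaper seed and wins.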
1158 :
1159 : // responsibleForGarbageBytes returns the amount of garbage in the backing
1160 : // sstable that we consider the responsibility of this virtual sstable. For
1161 : // non-virtual sstables, this is of course 0. For virtual sstables, we equally
1162 : // distribute the responsibility of the garbage across all the virtual
1163 : // sstables that are referencing the same backing sstable. One could
1164 : // alternatively distribute this in proportion to the virtual sst sizes, but
1165 : // it isn't clear that more sophisticated heuristics are worth it, given that
1166 : // the garbage cannot be reclaimed until all the referencing virtual sstables
1167 : // are compacted.
1168 2 : func responsibleForGarbageBytes(virtualBackings *manifest.VirtualBackings, m *fileMetadata) uint64 {
1169 2 : if !m.Virtual {
1170 2 : return 0
1171 2 : }
1172 2 : useCount, virtualizedSize := virtualBackings.Usage(m.FileBacking.DiskFileNum)
1173 2 : // Since virtualizedSize is the sum of the estimated size of all virtual
1174 2 : // ssts, we allow for the possibility that virtualizedSize could exceed
1175 2 : // m.FileBacking.Size.
1176 2 : totalGarbage := int64(m.FileBacking.Size) - int64(virtualizedSize)
1177 2 : if totalGarbage <= 0 {
1178 1 : return 0
1179 1 : }
1180 2 : if useCount == 0 {
1181 0 : // This cannot happen if m exists in the latest version. The call to
1182 0 : // ResponsibleForGarbageBytes during compaction picking ensures that m
1183 0 : // exists in the latest version by holding versionSet.logLock.
1184 0 : panic(errors.AssertionFailedf("%s has zero useCount", m.String()))
1185 : }
1186 2 : return uint64(totalGarbage) / uint64(useCount)
1187 : }
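
// For example (a hypothetical backing), if a 100 MB backing sstable is
// referenced by two virtual sstables whose estimated sizes sum to 60 MB,
// totalGarbage is 40 MB and each virtual sstable is held responsible for
// 20 MB of it.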
1188 :
1189 : // pickAuto picks the best compaction, if any.
1190 : //
1191 : // On each call, pickAuto computes per-level size adjustments based on
1192 : // in-progress compactions, and computes a per-level score. The levels are
1193 : // iterated over in decreasing score order trying to find a valid compaction
1194 : // anchored at that level.
1195 : //
1196 : // If a score-based compaction cannot be found, pickAuto falls back to looking
1197 : // for an elision-only compaction to remove obsolete keys.
1198 2 : func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
1199 2 : // Compaction concurrency is controlled by L0 read-amp. We allow one
1200 2 : // additional compaction per L0CompactionConcurrency sublevels, as well as
1201 2 : // one additional compaction per CompactionDebtConcurrency bytes of
1202 2 : // compaction debt. Compaction concurrency is tied to L0 sublevels as that
1203 2 : // signal is independent of the database size. We tack on the compaction
1204 2 : // debt as a second signal to prevent compaction concurrency from dropping
1205 2 : // significantly right after a base compaction finishes, and before those
1206 2 : // bytes have been compacted further down the LSM.
1207 2 : if n := len(env.inProgressCompactions); n > 0 {
1208 2 : l0ReadAmp := p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()
1209 2 : compactionDebt := p.estimatedCompactionDebt(0)
1210 2 : ccSignal1 := n * p.opts.Experimental.L0CompactionConcurrency
1211 2 : ccSignal2 := uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
1212 2 : if l0ReadAmp < ccSignal1 && compactionDebt < ccSignal2 {
1213 2 : return nil
1214 2 : }
1215 : }
1216 :
1217 2 : scores := p.calculateLevelScores(env.inProgressCompactions)
1218 2 :
1219 2 : // TODO(bananabrick): Either remove, or change this into an event sent to the
1220 2 : // EventListener.
1221 2 : logCompaction := func(pc *pickedCompaction) {
1222 0 : var buf bytes.Buffer
1223 0 : for i := 0; i < numLevels; i++ {
1224 0 : if i != 0 && i < p.baseLevel {
1225 0 : continue
1226 : }
1227 :
1228 0 : var info *candidateLevelInfo
1229 0 : for j := range scores {
1230 0 : if scores[j].level == i {
1231 0 : info = &scores[j]
1232 0 : break
1233 : }
1234 : }
1235 :
1236 0 : marker := " "
1237 0 : if pc.startLevel.level == info.level {
1238 0 : marker = "*"
1239 0 : }
1240 0 : fmt.Fprintf(&buf, " %sL%d: %5.1f %5.1f %5.1f %5.1f %8s %8s",
1241 0 : marker, info.level, info.compensatedScoreRatio, info.compensatedScore,
1242 0 : info.uncompensatedScoreRatio, info.uncompensatedScore,
1243 0 : humanize.Bytes.Int64(int64(totalCompensatedSize(
1244 0 : p.vers.Levels[info.level].Iter(),
1245 0 : ))),
1246 0 : humanize.Bytes.Int64(p.levelMaxBytes[info.level]),
1247 0 : )
1248 0 :
1249 0 : count := 0
1250 0 : for i := range env.inProgressCompactions {
1251 0 : c := &env.inProgressCompactions[i]
1252 0 : if c.inputs[0].level != info.level {
1253 0 : continue
1254 : }
1255 0 : count++
1256 0 : if count == 1 {
1257 0 : fmt.Fprintf(&buf, " [")
1258 0 : } else {
1259 0 : fmt.Fprintf(&buf, " ")
1260 0 : }
1261 0 : fmt.Fprintf(&buf, "L%d->L%d", c.inputs[0].level, c.outputLevel)
1262 : }
1263 0 : if count > 0 {
1264 0 : fmt.Fprintf(&buf, "]")
1265 0 : }
1266 0 : fmt.Fprintf(&buf, "\n")
1267 : }
1268 0 : p.opts.Logger.Infof("pickAuto: L%d->L%d\n%s",
1269 0 : pc.startLevel.level, pc.outputLevel.level, buf.String())
1270 : }
1271 :
1272 : // Check for a score-based compaction. candidateLevelInfos are first sorted
1273 : // by whether they should be compacted, so if we find a level which shouldn't
1274 : // be compacted, we can break early.
1275 2 : for i := range scores {
1276 2 : info := &scores[i]
1277 2 : if !info.shouldCompact() {
1278 2 : break
1279 : }
1280 2 : if info.level == numLevels-1 {
1281 2 : continue
1282 : }
1283 :
1284 2 : if info.level == 0 {
1285 2 : pc = pickL0(env, p.opts, p.vers, p.baseLevel)
1286 2 : // Fail-safe to protect against compacting the same sstable
1287 2 : // concurrently.
1288 2 : if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
1289 2 : p.addScoresToPickedCompactionMetrics(pc, scores)
1290 2 : pc.score = info.compensatedScoreRatio
1291 2 : // TODO(bananabrick): Create an EventListener for logCompaction.
1292 2 : if false {
1293 0 : logCompaction(pc)
1294 0 : }
1295 2 : return pc
1296 : }
1297 2 : continue
1298 : }
1299 :
1300 : // info.level > 0
1301 2 : var ok bool
1302 2 : info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum)
1303 2 : if !ok {
1304 2 : continue
1305 : }
1306 :
1307 2 : pc := pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel, p.levelMaxBytes)
1308 2 : // Fail-safe to protect against compacting the same sstable concurrently.
1309 2 : if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
1310 2 : p.addScoresToPickedCompactionMetrics(pc, scores)
1311 2 : pc.score = info.compensatedScoreRatio
1312 2 : // TODO(bananabrick): Create an EventListener for logCompaction.
1313 2 : if false {
1314 0 : logCompaction(pc)
1315 0 : }
1316 2 : return pc
1317 : }
1318 : }
1319 :
1320 : // Check for L6 files with tombstones that may be elided. These files may
1321 : // exist if a snapshot prevented the elision of a tombstone or because of
1322 : // a move compaction. These are low-priority compactions because they
1323 : // don't help us keep up with writes, just reclaim disk space.
1324 2 : if pc := p.pickElisionOnlyCompaction(env); pc != nil {
1325 2 : return pc
1326 2 : }
1327 :
1328 2 : if pc := p.pickReadTriggeredCompaction(env); pc != nil {
1329 1 : return pc
1330 1 : }
1331 :
1332 : // NB: This should only be run if a read compaction wasn't
1333 : // scheduled.
1334 : //
1335 : // We won't be scheduling a read compaction right now, and in
1336 : // read heavy workloads, compactions won't be scheduled frequently
1337 : // because flushes aren't frequent. So we need to signal to the
1338 : // iterator to schedule a compaction when it adds compactions to
1339 : // the read compaction queue.
1340 : //
1341 : // We need the nil check because some tests do not set
1342 : // rescheduleReadCompaction. Since some of those tests may not
1343 : // want extra compactions to be scheduled, we guard against nil
1344 : // here instead of setting rescheduleReadCompaction in those
1345 : // tests.
1346 2 : if env.readCompactionEnv.rescheduleReadCompaction != nil {
1347 2 : *env.readCompactionEnv.rescheduleReadCompaction = true
1348 2 : }
1349 :
1350 : // At the lowest possible compaction-picking priority, look for files marked
1351 : // for compaction. Pebble will mark files for compaction if they have atomic
1352 : // compaction units that span multiple files. While current Pebble code does
1353 : // not construct such sstables, RocksDB and earlier versions of Pebble may
1354 : // have created them. These split user keys form sets of files that must be
1355 : // compacted together for correctness (referred to as "atomic compaction
1356 : // units" within the code). Rewrite them in-place.
1357 : //
1358 : // It's also possible that a file may have been marked for compaction by
1359 : // even earlier versions of Pebble code, since FileMetadata's
1360 : // MarkedForCompaction field is persisted in the manifest. That's okay. We
1361 : // previously would've ignored the designation, whereas now we'll re-compact
1362 : // the file in place.
1363 2 : if p.vers.Stats.MarkedForCompaction > 0 {
1364 1 : if pc := p.pickRewriteCompaction(env); pc != nil {
1365 1 : return pc
1366 1 : }
1367 : }
1368 :
1369 2 : return nil
1370 : }
1371 :
1372 : func (p *compactionPickerByScore) addScoresToPickedCompactionMetrics(
1373 : pc *pickedCompaction, candInfo [numLevels]candidateLevelInfo,
1374 2 : ) {
1375 2 :
1376 2 : // candInfo is sorted by score, not by compaction level.
1377 2 : infoByLevel := [numLevels]candidateLevelInfo{}
1378 2 : for i := range candInfo {
1379 2 : level := candInfo[i].level
1380 2 : infoByLevel[level] = candInfo[i]
1381 2 : }
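// For example (illustrative): if candInfo is score-sorted as L3, L0, L5,
// ..., the loop above re-indexes those entries to positions 3, 0 and 5 of
// infoByLevel, so the loop below can walk pc.inputs in level order.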
1382 : // Gather the compaction scores for the levels participating in the compaction.
1383 2 : pc.pickerMetrics.scores = make([]float64, len(pc.inputs))
1384 2 : inputIdx := 0
1385 2 : for i := range infoByLevel {
1386 2 : if pc.inputs[inputIdx].level == infoByLevel[i].level {
1387 2 : pc.pickerMetrics.scores[inputIdx] = infoByLevel[i].compensatedScoreRatio
1388 2 : inputIdx++
1389 2 : }
1390 2 : if inputIdx == len(pc.inputs) {
1391 2 : break
1392 : }
1393 : }
1394 : }
1395 :
1396 : // elisionOnlyAnnotator implements the manifest.Annotator interface,
1397 : // annotating B-Tree nodes with the *fileMetadata of a file meeting the
1398 : // obsolete keys criteria for an elision-only compaction within the subtree.
1399 : // If multiple files meet the criteria, it chooses whichever file has the
1400 : // lowest LargestSeqNum. The lowest LargestSeqNum file will be the first
1401 : // eligible for an elision-only compaction once snapshots less than or equal
1402 : // to its LargestSeqNum are closed.
1403 : type elisionOnlyAnnotator struct{}
1404 :
1405 : var _ manifest.Annotator = elisionOnlyAnnotator{}
1406 :
1407 2 : func (a elisionOnlyAnnotator) Zero(interface{}) interface{} {
1408 2 : return nil
1409 2 : }
1410 :
1411 2 : func (a elisionOnlyAnnotator) Accumulate(f *fileMetadata, dst interface{}) (interface{}, bool) {
1412 2 : if f.IsCompacting() {
1413 2 : return dst, true
1414 2 : }
1415 2 : if !f.StatsValid() {
1416 2 : return dst, false
1417 2 : }
1418 : // Bottommost files are large and not worthwhile to compact just
1419 : // to remove a few tombstones. Consider a file ineligible if its
1420 : // own range deletions delete less than 10% of its data and its
1421 : // deletion tombstones make up less than 10% of its entries.
1422 : //
1423 : // TODO(jackson): This does not account for duplicate user keys
1424 : // which may be collapsed. Ideally, we would have 'obsolete keys'
1425 : // statistics that would include tombstones, the keys that are
1426 : // dropped by tombstones and duplicated user keys. See #847.
1427 : //
1428 : // Note that tables containing exclusively range keys (i.e. no point keys,
1429 : // so `NumEntries` and `RangeDeletionsBytesEstimate` are both zero) are
1430 : // excluded from elision-only compactions.
1431 : // TODO(travers): Consider an alternative heuristic for elision of range-keys.
1432 2 : if f.Stats.RangeDeletionsBytesEstimate*10 < f.Size &&
1433 2 : f.Stats.NumDeletions*10 <= f.Stats.NumEntries {
1434 2 : return dst, true
1435 2 : }
1436 2 : if dst == nil {
1437 2 : return f, true
1438 2 : } else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum {
1439 2 : return f, true
1440 2 : }
1441 1 : return dst, true
1442 : }
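// eligibleForElision is a minimal, self-contained restatement of the
// heuristic in Accumulate above, shown over plain integers for clarity. It
// is an illustrative sketch, not part of Pebble's API: a bottommost file
// remains a candidate only if its range deletions are estimated to cover at
// least 10% of its bytes, or its point tombstones make up more than 10% of
// its entries.
func eligibleForElision(rangeDelBytes, size, numDeletions, numEntries uint64) bool {
// Negation of the skip condition above:
// rangeDelBytes*10 < size && numDeletions*10 <= numEntries.
return rangeDelBytes*10 >= size || numDeletions*10 > numEntries
}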
1443 :
1444 2 : func (a elisionOnlyAnnotator) Merge(v interface{}, accum interface{}) interface{} {
1445 2 : if v == nil {
1446 2 : return accum
1447 2 : }
1448 : // If we haven't accumulated an eligible file yet, or v's LargestSeqNum is
1449 : // less than the accumulated file's, use v.
1450 1 : if accum == nil {
1451 1 : return v
1452 1 : }
1453 1 : f := v.(*fileMetadata)
1454 1 : accumV := accum.(*fileMetadata)
1455 1 : if accumV == nil || accumV.LargestSeqNum > f.LargestSeqNum {
1456 1 : return f
1457 1 : }
1458 1 : return accumV
1459 : }
1460 :
1461 : // markedForCompactionAnnotator implements the manifest.Annotator interface,
1462 : // annotating B-Tree nodes with the *fileMetadata of a file that is marked for
1463 : // compaction within the subtree. If multiple files meet the criteria, it
1464 : // chooses whichever file has the lowest LargestSeqNum.
1465 : type markedForCompactionAnnotator struct{}
1466 :
1467 : var _ manifest.Annotator = markedForCompactionAnnotator{}
1468 :
1469 1 : func (a markedForCompactionAnnotator) Zero(interface{}) interface{} {
1470 1 : return nil
1471 1 : }
1472 :
1473 : func (a markedForCompactionAnnotator) Accumulate(
1474 : f *fileMetadata, dst interface{},
1475 1 : ) (interface{}, bool) {
1476 1 : if !f.MarkedForCompaction {
1477 1 : // Not marked for compaction; return dst.
1478 1 : return dst, true
1479 1 : }
1480 1 : return markedMergeHelper(f, dst)
1481 : }
1482 :
1483 0 : func (a markedForCompactionAnnotator) Merge(v interface{}, accum interface{}) interface{} {
1484 0 : if v == nil {
1485 0 : return accum
1486 0 : }
1487 0 : accum, _ = markedMergeHelper(v.(*fileMetadata), accum)
1488 0 : return accum
1489 : }
1490 :
1491 : // REQUIRES: f is non-nil, and f.MarkedForCompaction=true.
1492 1 : func markedMergeHelper(f *fileMetadata, dst interface{}) (interface{}, bool) {
1493 1 : if dst == nil {
1494 1 : return f, true
1495 1 : } else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum {
1496 0 : return f, true
1497 0 : }
1498 0 : return dst, true
1499 : }
1500 :
1501 : // pickElisionOnlyCompaction looks for compactions of sstables in the
1502 : // bottommost level containing obsolete records that may now be dropped.
1503 : func (p *compactionPickerByScore) pickElisionOnlyCompaction(
1504 : env compactionEnv,
1505 2 : ) (pc *pickedCompaction) {
1506 2 : if p.opts.private.disableElisionOnlyCompactions {
1507 2 : return nil
1508 2 : }
1509 2 : v := p.vers.Levels[numLevels-1].Annotation(elisionOnlyAnnotator{})
1510 2 : if v == nil {
1511 2 : return nil
1512 2 : }
1513 2 : candidate := v.(*fileMetadata)
1514 2 : if candidate.IsCompacting() || candidate.LargestSeqNum >= env.earliestSnapshotSeqNum {
1515 2 : return nil
1516 2 : }
1517 2 : lf := p.vers.Levels[numLevels-1].Find(p.opts.Comparer.Compare, candidate)
1518 2 : if lf == nil {
1519 0 : panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, numLevels-1))
1520 : }
1521 :
1522 : // Construct a picked compaction of the elision candidate's atomic
1523 : // compaction unit.
1524 2 : pc = newPickedCompaction(p.opts, p.vers, numLevels-1, numLevels-1, p.baseLevel)
1525 2 : pc.kind = compactionKindElisionOnly
1526 2 : pc.startLevel.files = lf.Slice()
1527 2 : if anyTablesCompacting(lf.Slice()) {
1528 0 : return nil
1529 0 : }
1530 2 : pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
1531 2 : // Fail-safe to protect against compacting the same sstable concurrently.
1532 2 : if !inputRangeAlreadyCompacting(env, pc) {
1533 2 : return pc
1534 2 : }
1535 1 : return nil
1536 : }
1537 :
1538 : // pickRewriteCompaction attempts to construct a compaction that
1539 : // rewrites a file marked for compaction. pickRewriteCompaction will
1540 : // pull in adjacent files in the file's atomic compaction unit if
1541 : // necessary. A rewrite compaction outputs files to the same level as
1542 : // the input level.
1543 1 : func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) {
1544 1 : for l := numLevels - 1; l >= 0; l-- {
1545 1 : v := p.vers.Levels[l].Annotation(markedForCompactionAnnotator{})
1546 1 : if v == nil {
1547 1 : // Try the next level.
1548 1 : continue
1549 : }
1550 1 : candidate := v.(*fileMetadata)
1551 1 : if candidate.IsCompacting() {
1552 0 : // Try the next level.
1553 0 : continue
1554 : }
1555 1 : lf := p.vers.Levels[l].Find(p.opts.Comparer.Compare, candidate)
1556 1 : if lf == nil {
1557 0 : panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, l))
1558 : }
1559 :
1560 1 : inputs := lf.Slice()
1561 1 : if anyTablesCompacting(inputs) {
1562 0 : // Try the next level.
1563 0 : continue
1564 : }
1565 :
1566 1 : pc = newPickedCompaction(p.opts, p.vers, l, l, p.baseLevel)
1567 1 : pc.outputLevel.level = l
1568 1 : pc.kind = compactionKindRewrite
1569 1 : pc.startLevel.files = inputs
1570 1 : pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
1571 1 :
1572 1 : // Fail-safe to protect against compacting the same sstable concurrently.
1573 1 : if !inputRangeAlreadyCompacting(env, pc) {
1574 1 : if pc.startLevel.level == 0 {
1575 1 : pc.startLevel.l0SublevelInfo = generateSublevelInfo(pc.cmp, pc.startLevel.files)
1576 1 : }
1577 1 : return pc
1578 : }
1579 : }
1580 0 : return nil
1581 : }
1582 :
1583 : // pickAutoLPositive picks an automatic compaction for the candidate
1584 : // file in a positive-numbered level. This function must not be used for
1585 : // L0.
1586 : func pickAutoLPositive(
1587 : env compactionEnv,
1588 : opts *Options,
1589 : vers *version,
1590 : cInfo candidateLevelInfo,
1591 : baseLevel int,
1592 : levelMaxBytes [numLevels]int64,
1593 2 : ) (pc *pickedCompaction) {
1594 2 : if cInfo.level == 0 {
1595 0 : panic("pebble: pickAutoLPositive called for L0")
1596 : }
1597 :
1598 2 : pc = newPickedCompaction(opts, vers, cInfo.level, defaultOutputLevel(cInfo.level, baseLevel), baseLevel)
1599 2 : if pc.outputLevel.level != cInfo.outputLevel {
1600 0 : panic("pebble: compaction picked unexpected output level")
1601 : }
1602 2 : pc.startLevel.files = cInfo.file.Slice()
1603 2 : // Files in level 0 may overlap each other, so pick up all overlapping ones.
1604 2 : if pc.startLevel.level == 0 {
1605 0 : cmp := opts.Comparer.Compare
1606 0 : smallest, largest := manifest.KeyRange(cmp, pc.startLevel.files.Iter())
1607 0 : pc.startLevel.files = vers.Overlaps(0, base.UserKeyBoundsFromInternal(smallest, largest))
1608 0 : if pc.startLevel.files.Empty() {
1609 0 : panic("pebble: empty compaction")
1610 : }
1611 : }
1612 :
1613 2 : if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
1614 0 : return nil
1615 0 : }
1616 2 : return pc.maybeAddLevel(opts, env.diskAvailBytes)
1617 : }
1618 :
1619 : // maybeAddLevel maybe adds a level to the picked compaction.
1620 2 : func (pc *pickedCompaction) maybeAddLevel(opts *Options, diskAvailBytes uint64) *pickedCompaction {
1621 2 : pc.pickerMetrics.singleLevelOverlappingRatio = pc.overlappingRatio()
1622 2 : if pc.outputLevel.level == numLevels-1 {
1623 2 : // Don't add a level if the current output level is L6.
1624 2 : return pc
1625 2 : }
1626 2 : if !opts.Experimental.MultiLevelCompactionHeuristic.allowL0() && pc.startLevel.level == 0 {
1627 2 : return pc
1628 2 : }
1629 2 : if pc.compactionSize() > expandedCompactionByteSizeLimit(
1630 2 : opts, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel), diskAvailBytes) {
1631 2 : // Don't add a level if the current compaction exceeds the compaction size limit
1632 2 : return pc
1633 2 : }
1634 2 : return opts.Experimental.MultiLevelCompactionHeuristic.pick(pc, opts, diskAvailBytes)
1635 : }
1636 :
1637 : // MultiLevelHeuristic evaluates whether to add files from the next level into the compaction.
1638 : type MultiLevelHeuristic interface {
1639 : // Evaluate returns the preferred compaction.
1640 : pick(pc *pickedCompaction, opts *Options, diskAvailBytes uint64) *pickedCompaction
1641 :
1642 : // Returns if the heuristic allows L0 to be involved in ML compaction
1643 : allowL0() bool
1644 :
1645 : // String implements fmt.Stringer.
1646 : String() string
1647 : }
1648 :
1649 : // NoMultiLevel will never add an additional level to the compaction.
1650 : type NoMultiLevel struct{}
1651 :
1652 : var _ MultiLevelHeuristic = (*NoMultiLevel)(nil)
1653 :
1654 : func (nml NoMultiLevel) pick(
1655 : pc *pickedCompaction, opts *Options, diskAvailBytes uint64,
1656 2 : ) *pickedCompaction {
1657 2 : return pc
1658 2 : }
1659 :
1660 2 : func (nml NoMultiLevel) allowL0() bool { return false }
1661 2 : func (nml NoMultiLevel) String() string { return "none" }
1662 :
1663 2 : func (pc *pickedCompaction) predictedWriteAmp() float64 {
1664 2 : var bytesToCompact uint64
1665 2 : var higherLevelBytes uint64
1666 2 : for i := range pc.inputs {
1667 2 : levelSize := pc.inputs[i].files.SizeSum()
1668 2 : bytesToCompact += levelSize
1669 2 : if i != len(pc.inputs)-1 {
1670 2 : higherLevelBytes += levelSize
1671 2 : }
1672 : }
1673 2 : return float64(bytesToCompact) / float64(higherLevelBytes)
1674 : }
1675 :
1676 2 : func (pc *pickedCompaction) overlappingRatio() float64 {
1677 2 : var higherLevelBytes uint64
1678 2 : var lowestLevelBytes uint64
1679 2 : for i := range pc.inputs {
1680 2 : levelSize := pc.inputs[i].files.SizeSum()
1681 2 : if i == len(pc.inputs)-1 {
1682 2 : lowestLevelBytes += levelSize
1683 2 : continue
1684 : }
1685 2 : higherLevelBytes += levelSize
1686 : }
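// Continuing the illustrative example above: for the 10MB L4 -> 90MB L5
// compaction, the overlapping ratio is 90/10 = 9, the bytes in the lowest
// input level per byte in the levels above it.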
1687 2 : return float64(lowestLevelBytes) / float64(higherLevelBytes)
1688 : }
1689 :
1690 : // WriteAmpHeuristic defines a multi level compaction heuristic which will add
1691 : // an additional level to the picked compaction if it reduces predicted write
1692 : // amp of the compaction + the addPropensity constant.
1693 : type WriteAmpHeuristic struct {
1694 : // AddPropensity is a constant that affects the propensity to conduct
1695 : // multilevel compactions. If positive, a multilevel compaction may be
1696 : // picked even if the single-level compaction has lower write amp, and vice versa.
1697 : AddPropensity float64
1698 :
1699 : // AllowL0, if true, allows L0 to be involved in a multilevel compaction.
1700 : AllowL0 bool
1701 : }
1702 :
1703 : var _ MultiLevelHeuristic = (*WriteAmpHeuristic)(nil)
1704 :
1705 : // TODO(msbutler): microbenchmark the extent to which multilevel compaction
1706 : // picking slows down the compaction picking process. This should be as fast as
1707 : // possible since compaction picking holds d.mu, which prevents WAL rotations,
1708 : // in-progress flushes and compactions from completing, etc. Consider ways to
1709 : // deduplicate work, given that setupInputs has already been called.
1710 : func (wa WriteAmpHeuristic) pick(
1711 : pcOrig *pickedCompaction, opts *Options, diskAvailBytes uint64,
1712 2 : ) *pickedCompaction {
1713 2 : pcMulti := pcOrig.clone()
1714 2 : if !pcMulti.setupMultiLevelCandidate(opts, diskAvailBytes) {
1715 2 : return pcOrig
1716 2 : }
1717 2 : picked := pcOrig
1718 2 : if pcMulti.predictedWriteAmp() <= pcOrig.predictedWriteAmp()+wa.AddPropensity {
1719 2 : picked = pcMulti
1720 2 : }
1721 : // Regardless of what compaction was picked, log the multilevelOverlapping ratio.
1722 2 : picked.pickerMetrics.multiLevelOverlappingRatio = pcMulti.overlappingRatio()
1723 2 : return picked
1724 : }
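// preferMultiLevel is an illustrative, hypothetical restatement (not part
// of Pebble's API) of the decision rule in pick above: the multilevel
// candidate wins iff its predicted write amp is within AddPropensity of the
// single-level compaction's.
func preferMultiLevel(singleWamp, multiWamp, addPropensity float64) bool {
// Mirrors: pcMulti.predictedWriteAmp() <= pcOrig.predictedWriteAmp()+wa.AddPropensity.
return multiWamp <= singleWamp+addPropensity
}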
1725 :
1726 2 : func (wa WriteAmpHeuristic) allowL0() bool {
1727 2 : return wa.AllowL0
1728 2 : }
1729 :
1730 : // String implements fmt.Stringer.
1731 2 : func (wa WriteAmpHeuristic) String() string {
1732 2 : return fmt.Sprintf("wamp(%.2f, %t)", wa.AddPropensity, wa.AllowL0)
1733 2 : }
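// A minimal configuration sketch (assuming an Options value named opts;
// illustrative only): a zero AddPropensity requires a multilevel compaction
// to be at least write-amp-neutral before it is preferred.
//
// opts.Experimental.MultiLevelCompactionHeuristic = WriteAmpHeuristic{
// AddPropensity: 0.0, // multilevel must not increase predicted write amp
// AllowL0: false, // keep L0 out of multilevel compactions
// }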
1734 :
1735 : // Helper method to pick compactions originating from L0. Uses information about
1736 : // sublevels to generate a compaction.
1737 2 : func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc *pickedCompaction) {
1738 2 : // It is important to pass information about Lbase files to L0Sublevels
1739 2 : // so it can pick a compaction that does not conflict with an Lbase => Lbase+1
1740 2 : // compaction. Without this, we observed reduced concurrency of L0=>Lbase
1741 2 : // compactions, and increased read amplification in L0.
1742 2 : //
1743 2 : // TODO(bilal) Remove the minCompactionDepth parameter once fixing it at 1
1744 2 : // has been shown to not cause a performance regression.
1745 2 : lcf, err := vers.L0Sublevels.PickBaseCompaction(1, vers.Levels[baseLevel].Slice())
1746 2 : if err != nil {
1747 1 : opts.Logger.Errorf("error when picking base compaction: %s", err)
1748 1 : return
1749 1 : }
1750 2 : if lcf != nil {
1751 2 : pc = newPickedCompactionFromL0(lcf, opts, vers, baseLevel, true)
1752 2 : pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel)
1753 2 : if pc.startLevel.files.Empty() {
1754 0 : opts.Logger.Fatalf("empty compaction chosen")
1755 0 : }
1756 2 : return pc.maybeAddLevel(opts, env.diskAvailBytes)
1757 : }
1758 :
1759 : // Couldn't choose a base compaction. Try choosing an intra-L0
1760 : // compaction. Note that we pass in minIntraL0Count here as opposed to
1761 : // 1, since choosing a single sublevel intra-L0 compaction is
1762 : // counterproductive.
1763 2 : lcf, err = vers.L0Sublevels.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count)
1764 2 : if err != nil {
1765 0 : opts.Logger.Errorf("error when picking intra-L0 compaction: %s", err)
1766 0 : return
1767 0 : }
1768 2 : if lcf != nil {
1769 2 : pc = newPickedCompactionFromL0(lcf, opts, vers, 0, false)
1770 2 : if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
1771 0 : return nil
1772 0 : }
1773 2 : if pc.startLevel.files.Empty() {
1774 0 : opts.Logger.Fatalf("empty compaction chosen")
1775 0 : }
1776 2 : {
1777 2 : iter := pc.startLevel.files.Iter()
1778 2 : if iter.First() == nil || iter.Next() == nil {
1779 0 : // A single-file intra-L0 compaction is unproductive.
1780 0 : return nil
1781 0 : }
1782 : }
1783 :
1784 2 : pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
1785 : }
1786 2 : return pc
1787 : }
1788 :
1789 : func pickManualCompaction(
1790 : vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction,
1791 2 : ) (pc *pickedCompaction, retryLater bool) {
1792 2 : outputLevel := manual.level + 1
1793 2 : if manual.level == 0 {
1794 2 : outputLevel = baseLevel
1795 2 : } else if manual.level < baseLevel {
1796 2 : // The start level for a compaction must be >= Lbase. A manual
1797 2 : // compaction could have been created adhering to that condition, and
1798 2 : // then an automatic compaction came in and compacted all of the
1799 2 : // sstables in Lbase to Lbase+1 which caused Lbase to change. Simply
1800 2 : // ignore this manual compaction as there is nothing to do (manual.level
1801 2 : // points to an empty level).
1802 2 : return nil, false
1803 2 : }
1804 : // This conflictsWithInProgress call is necessary for the manual compaction to
1805 : // be retried when it conflicts with an ongoing automatic compaction. Without
1806 : // it, the compaction is dropped due to pc.setupInputs returning false since
1807 : // the input/output range is already being compacted, and the manual
1808 : // compaction ends with a non-compacted LSM.
1809 2 : if conflictsWithInProgress(manual, outputLevel, env.inProgressCompactions, opts.Comparer.Compare) {
1810 2 : return nil, true
1811 2 : }
1812 2 : pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
1813 2 : manual.outputLevel = pc.outputLevel.level
1814 2 : pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end))
1815 2 : if pc.startLevel.files.Empty() {
1816 2 : // Nothing to do
1817 2 : return nil, false
1818 2 : }
1819 2 : if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
1820 1 : // setupInputs returned false indicating there's a conflicting
1821 1 : // concurrent compaction.
1822 1 : return nil, true
1823 1 : }
1824 2 : if pc = pc.maybeAddLevel(opts, env.diskAvailBytes); pc == nil {
1825 0 : return nil, false
1826 0 : }
1827 2 : if pc.outputLevel.level != outputLevel {
1828 2 : if len(pc.extraLevels) > 0 {
1829 2 : // multilevel compactions relax this invariant
1830 2 : } else {
1831 0 : panic("pebble: compaction picked unexpected output level")
1832 : }
1833 : }
1834 : // Fail-safe to protect against compacting the same sstable concurrently.
1835 2 : if inputRangeAlreadyCompacting(env, pc) {
1836 1 : return nil, true
1837 1 : }
1838 2 : return pc, false
1839 : }
1840 :
1841 : // pickDownloadCompaction picks a download compaction for the downloadSpan,
1842 : // which could be specified as being performed either by a copy compaction of
1843 : // the backing file or a rewrite compaction.
1844 : func pickDownloadCompaction(
1845 : vers *version,
1846 : opts *Options,
1847 : env compactionEnv,
1848 : baseLevel int,
1849 : kind compactionKind,
1850 : level int,
1851 : file *fileMetadata,
1852 2 : ) (pc *pickedCompaction) {
1853 2 : // Check if the file is compacting already.
1854 2 : if file.CompactionState == manifest.CompactionStateCompacting {
1855 0 : return nil
1856 0 : }
1857 2 : if kind != compactionKindCopy && kind != compactionKindRewrite {
1858 0 : panic("invalid download/rewrite compaction kind")
1859 : }
1860 2 : pc = newPickedCompaction(opts, vers, level, level, baseLevel)
1861 2 : pc.kind = kind
1862 2 : pc.startLevel.files = manifest.NewLevelSliceKeySorted(opts.Comparer.Compare, []*fileMetadata{file})
1863 2 : if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
1864 0 : // setupInputs returned false indicating there's a conflicting
1865 0 : // concurrent compaction.
1866 0 : return nil
1867 0 : }
1868 2 : if pc.outputLevel.level != level {
1869 0 : panic("pebble: download compaction picked unexpected output level")
1870 : }
1871 : // Fail-safe to protect against compacting the same sstable concurrently.
1872 2 : if inputRangeAlreadyCompacting(env, pc) {
1873 0 : return nil
1874 0 : }
1875 2 : return pc
1876 : }
1877 :
1878 : func (p *compactionPickerByScore) pickReadTriggeredCompaction(
1879 : env compactionEnv,
1880 2 : ) (pc *pickedCompaction) {
1881 2 : // If a flush is in progress or expected to happen soon, more writes are
1882 2 : // taking place, and we would soon be scheduling more write-focused
1883 2 : // compactions. In this case, skip read compactions; they are lower priority.
1884 2 : if env.readCompactionEnv.flushing || env.readCompactionEnv.readCompactions == nil {
1885 2 : return nil
1886 2 : }
1887 2 : for env.readCompactionEnv.readCompactions.size > 0 {
1888 1 : rc := env.readCompactionEnv.readCompactions.remove()
1889 1 : if pc = pickReadTriggeredCompactionHelper(p, rc, env); pc != nil {
1890 1 : break
1891 : }
1892 : }
1893 2 : return pc
1894 : }
1895 :
1896 : func pickReadTriggeredCompactionHelper(
1897 : p *compactionPickerByScore, rc *readCompaction, env compactionEnv,
1898 1 : ) (pc *pickedCompaction) {
1899 1 : overlapSlice := p.vers.Overlaps(rc.level, base.UserKeyBoundsInclusive(rc.start, rc.end))
1900 1 : if overlapSlice.Empty() {
1901 1 : // If there is no overlap, then the file with the key range
1902 1 : // must have been compacted away. So, we don't proceed to
1903 1 : // compact the same key range again.
1904 1 : return nil
1905 1 : }
1906 :
1907 1 : iter := overlapSlice.Iter()
1908 1 : var fileMatches bool
1909 1 : for f := iter.First(); f != nil; f = iter.Next() {
1910 1 : if f.FileNum == rc.fileNum {
1911 1 : fileMatches = true
1912 1 : break
1913 : }
1914 : }
1915 1 : if !fileMatches {
1916 1 : return nil
1917 1 : }
1918 :
1919 1 : pc = newPickedCompaction(p.opts, p.vers, rc.level, defaultOutputLevel(rc.level, p.baseLevel), p.baseLevel)
1920 1 :
1921 1 : pc.startLevel.files = overlapSlice
1922 1 : if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel) {
1923 0 : return nil
1924 0 : }
1925 1 : if inputRangeAlreadyCompacting(env, pc) {
1926 0 : return nil
1927 0 : }
1928 1 : pc.kind = compactionKindRead
1929 1 :
1930 1 : // Prevent read compactions which are too wide.
1931 1 : outputOverlaps := pc.version.Overlaps(pc.outputLevel.level, pc.userKeyBounds())
1932 1 : if outputOverlaps.SizeSum() > pc.maxReadCompactionBytes {
1933 1 : return nil
1934 1 : }
1935 :
1936 : // Prevent compactions that start with a small seed file X but overlap
1937 : // more than allowedCompactionWidth times X's size in the output level.
1938 1 : const allowedCompactionWidth = 35
1939 1 : if outputOverlaps.SizeSum() > overlapSlice.SizeSum()*allowedCompactionWidth {
1940 0 : return nil
1941 0 : }
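// For example (illustrative): a 4MB seed slice whose output-level overlap
// exceeds 4MB*35 = 140MB is rejected above as too wide to be worth a
// read-triggered compaction.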
1942 :
1943 1 : return pc
1944 : }
1945 :
1946 1 : func (p *compactionPickerByScore) forceBaseLevel1() {
1947 1 : p.baseLevel = 1
1948 1 : }
1949 :
1950 2 : func inputRangeAlreadyCompacting(env compactionEnv, pc *pickedCompaction) bool {
1951 2 : for _, cl := range pc.inputs {
1952 2 : iter := cl.files.Iter()
1953 2 : for f := iter.First(); f != nil; f = iter.Next() {
1954 2 : if f.IsCompacting() {
1955 1 : return true
1956 1 : }
1957 : }
1958 : }
1959 :
1960 : // Look for active compactions outputting to the same region of the key
1961 : // space in the same output level. Two potential compactions may conflict
1962 : // without sharing input files if there are no files in the output level
1963 : // that overlap with the intersection of the compactions' key spaces.
1964 : //
1965 : // Consider an active L0->Lbase compaction compacting two L0 files one
1966 : // [a-f] and the other [t-z] into Lbase.
1967 : //
1968 : // L0
1969 : // ↦ 000100 ↤ ↦ 000101 ↤
1970 : // L1
1971 : // ↦ 000004 ↤
1972 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
1973 : //
1974 : // If a new file 000102 [j-p] is flushed while the existing compaction is
1975 : // still ongoing, the new file would not be in any compacting sublevel
1976 : // intervals and would not overlap with any Lbase files that are also
1977 : // compacting. However, this compaction cannot be picked because the
1978 : // compaction's output key space [j-p] would overlap the existing
1979 : // compaction's output key space [a-z].
1980 : //
1981 : // L0
1982 : // ↦ 000100* ↤ ↦ 000102 ↤ ↦ 000101* ↤
1983 : // L1
1984 : // ↦ 000004* ↤
1985 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
1986 : //
1987 : // * - currently compacting
1988 2 : if pc.outputLevel != nil && pc.outputLevel.level != 0 {
1989 2 : for _, c := range env.inProgressCompactions {
1990 2 : if pc.outputLevel.level != c.outputLevel {
1991 2 : continue
1992 : }
1993 2 : if base.InternalCompare(pc.cmp, c.largest, pc.smallest) < 0 ||
1994 2 : base.InternalCompare(pc.cmp, c.smallest, pc.largest) > 0 {
1995 2 : continue
1996 : }
1997 :
1998 : // The picked compaction and the in-progress compaction c are
1999 : // outputting to the same region of the key space of the same
2000 : // level.
2001 2 : return true
2002 : }
2003 : }
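// Concretely, in the diagram above: a picked compaction outputting [j-p]
// conflicts with the in-progress compaction outputting [a-z], because
// neither c.largest < pc.smallest nor pc.largest < c.smallest holds, so the
// loop returns true.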
2004 2 : return false
2005 : }
2006 :
2007 : // conflictsWithInProgress reports whether any in-progress compaction overlaps the manual compaction's key range at a relevant level.
2008 : func conflictsWithInProgress(
2009 : manual *manualCompaction, outputLevel int, inProgressCompactions []compactionInfo, cmp Compare,
2010 2 : ) bool {
2011 2 : for _, c := range inProgressCompactions {
2012 2 : if (c.outputLevel == manual.level || c.outputLevel == outputLevel) &&
2013 2 : isUserKeysOverlapping(manual.start, manual.end, c.smallest.UserKey, c.largest.UserKey, cmp) {
2014 2 : return true
2015 2 : }
2016 2 : for _, in := range c.inputs {
2017 2 : if in.files.Empty() {
2018 2 : continue
2019 : }
2020 2 : iter := in.files.Iter()
2021 2 : smallest := iter.First().Smallest.UserKey
2022 2 : largest := iter.Last().Largest.UserKey
2023 2 : if (in.level == manual.level || in.level == outputLevel) &&
2024 2 : isUserKeysOverlapping(manual.start, manual.end, smallest, largest, cmp) {
2025 2 : return true
2026 2 : }
2027 : }
2028 : }
2029 2 : return false
2030 : }
2031 :
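// isUserKeysOverlapping reports whether the closed user-key ranges [x1, x2]
// and [y1, y2] intersect. For example, under a bytewise comparator, [b, d]
// and [d, f] overlap (they share d), while [a, c] and [d, f] do not.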
2032 2 : func isUserKeysOverlapping(x1, x2, y1, y2 []byte, cmp Compare) bool {
2033 2 : return cmp(x1, y2) <= 0 && cmp(y1, x2) <= 0
2034 2 : }