LCOV - code coverage report
Current view: top level - pebble - compaction_picker.go (source / functions) Coverage Total Hit
Test: 2025-10-09 08:18Z 056b059d - tests + meta.lcov Lines: 92.3 % 1279 1180
Test Date: 2025-10-09 08:22:32 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "cmp"
      10              :         "fmt"
      11              :         "iter"
      12              :         "math"
      13              :         "slices"
      14              :         "sort"
      15              :         "strings"
      16              : 
      17              :         "github.com/cockroachdb/errors"
      18              :         "github.com/cockroachdb/pebble/internal/base"
      19              :         "github.com/cockroachdb/pebble/internal/humanize"
      20              :         "github.com/cockroachdb/pebble/internal/invariants"
      21              :         "github.com/cockroachdb/pebble/internal/manifest"
      22              :         "github.com/cockroachdb/pebble/internal/problemspans"
      23              : )
      24              : 
// minIntraL0Count is the minimum count for an intra-L0 compaction. This
// matches the RocksDB heuristic.
const minIntraL0Count = 4
      28              : 
// compactionEnv bundles the state that is handed to the compactionPicker's
// pick* methods when a compaction is being chosen.
type compactionEnv struct {
	// diskAvailBytes holds a statistic on the number of bytes available on
	// disk, as reported by the filesystem. It's used to be more restrictive in
	// expanding compactions if available disk space is limited.
	//
	// The cached value (d.diskAvailBytes) is updated whenever a file is deleted
	// and whenever a compaction or flush completes. Since file removal is the
	// primary means of reclaiming space, there is a rough bound on the
	// statistic's staleness when available bytes is growing. Compactions and
	// flushes are longer, slower operations and provide a much looser bound
	// when available bytes is decreasing.
	diskAvailBytes          uint64
	earliestUnflushedSeqNum base.SeqNum
	earliestSnapshotSeqNum  base.SeqNum
	// inProgressCompactions describes the compactions that are currently
	// running; see compactionInfo.
	inProgressCompactions []compactionInfo
	readCompactionEnv     readCompactionEnv
	// problemSpans is checked by the compaction picker to avoid compactions that
	// overlap an active "problem span". It can be nil when there are no problem
	// spans.
	problemSpans *problemspans.ByLevel
}
      50              : 
// compactionPickerMetrics is the result type of compactionPicker.getMetrics.
// It holds one entry per LSM level.
type compactionPickerMetrics struct {
	// levels is indexed by level number.
	levels [numLevels]struct {
		score                 float64
		fillFactor            float64
		compensatedFillFactor float64
	}
}
      58              : 
// compactionPicker is the interface through which compactions are selected.
// Each pick* method receives the current compactionEnv and returns a
// pickedCompaction.
type compactionPicker interface {
	// getMetrics computes per-level picker metrics given the in-progress
	// compactions.
	getMetrics([]compactionInfo) compactionPickerMetrics
	// getBaseLevel returns the current base level (LBase).
	getBaseLevel() int
	estimatedCompactionDebt() uint64
	pickHighPrioritySpaceCompaction(env compactionEnv) pickedCompaction
	pickAutoScore(env compactionEnv) (pc pickedCompaction)
	pickAutoNonScore(env compactionEnv) (pc pickedCompaction)
	// NOTE(review): presumably forces the base level to L1; confirm against
	// the implementation.
	forceBaseLevel1()
}
      68              : 
// A pickedCompaction describes a potential compaction that the compaction
// picker has selected, based on its heuristics. When a compaction begins to
// execute, it is converted into a compaction struct by ConstructCompaction.
//
// *pickedTableCompaction is an implementation of this interface.
type pickedCompaction interface {
	// ManualID returns the ID of the manual compaction, or 0 if the picked
	// compaction is not a result of a manual compaction.
	ManualID() uint64
	// ConstructCompaction creates a compaction from the picked compaction.
	ConstructCompaction(*DB, CompactionGrantHandle) compaction
	// WaitingCompaction returns a WaitingCompaction description of this
	// compaction for consumption by the compaction scheduler.
	WaitingCompaction() WaitingCompaction
}
      82              : 
// readCompactionEnv is used to hold data required to perform read
// compactions. It is embedded by value in compactionEnv; the pointer fields
// refer to state owned elsewhere.
type readCompactionEnv struct {
	rescheduleReadCompaction *bool
	readCompactions          *readCompactionQueue
	flushing                 bool
}
      89              : 
// Information about in-progress compactions provided to the compaction picker.
// These are used to constrain the new compactions that will be picked.
type compactionInfo struct {
	// versionEditApplied is true if this compaction's version edit has already
	// been committed. The compaction may still be in-progress deleting newly
	// obsolete files.
	versionEditApplied bool
	// kind indicates the kind of compaction.
	kind compactionKind
	// inputs lists the input levels of the compaction along with their files.
	inputs []compactionLevel
	// outputLevel is the level the compaction writes to.
	outputLevel int
	// bounds may be nil if the compaction does not involve sstables
	// (specifically, a blob file rewrite).
	bounds *base.UserKeyBounds
}
     105              : 
     106            1 : func (info compactionInfo) String() string {
     107            1 :         var buf bytes.Buffer
     108            1 :         var largest int
     109            1 :         for i, in := range info.inputs {
     110            1 :                 if i > 0 {
     111            1 :                         fmt.Fprintf(&buf, " -> ")
     112            1 :                 }
     113            1 :                 fmt.Fprintf(&buf, "L%d", in.level)
     114            1 :                 for f := range in.files.All() {
     115            1 :                         fmt.Fprintf(&buf, " %s", f.TableNum)
     116            1 :                 }
     117            1 :                 if largest < in.level {
     118            1 :                         largest = in.level
     119            1 :                 }
     120              :         }
     121            1 :         if largest != info.outputLevel || len(info.inputs) == 1 {
     122            1 :                 fmt.Fprintf(&buf, " -> L%d", info.outputLevel)
     123            1 :         }
     124            1 :         return buf.String()
     125              : }
     126              : 
// sublevelInfo is used to tag a LevelSlice for an L0 sublevel with the
// sublevel.
//
// The field order is relied upon by unkeyed composite literals in this file.
type sublevelInfo struct {
	manifest.LevelSlice
	// sublevel identifies which L0 sublevel the embedded LevelSlice covers.
	sublevel manifest.Layer
}
     133              : 
     134            2 : func (cl sublevelInfo) Clone() sublevelInfo {
     135            2 :         return sublevelInfo{
     136            2 :                 sublevel:   cl.sublevel,
     137            2 :                 LevelSlice: cl.LevelSlice,
     138            2 :         }
     139            2 : }
     140            1 : func (cl sublevelInfo) String() string {
     141            1 :         return fmt.Sprintf(`Sublevel %s; Levels %s`, cl.sublevel, cl.LevelSlice)
     142            1 : }
     143              : 
     144              : // generateSublevelInfo will generate the level slices for each of the sublevels
     145              : // from the level slice for all of L0.
     146            2 : func generateSublevelInfo(cmp base.Compare, levelFiles manifest.LevelSlice) []sublevelInfo {
     147            2 :         sublevelMap := make(map[uint64][]*manifest.TableMetadata)
     148            2 :         for f := range levelFiles.All() {
     149            2 :                 sublevelMap[uint64(f.SubLevel)] = append(sublevelMap[uint64(f.SubLevel)], f)
     150            2 :         }
     151              : 
     152            2 :         var sublevels []int
     153            2 :         for level := range sublevelMap {
     154            2 :                 sublevels = append(sublevels, int(level))
     155            2 :         }
     156            2 :         sort.Ints(sublevels)
     157            2 : 
     158            2 :         var levelSlices []sublevelInfo
     159            2 :         for _, sublevel := range sublevels {
     160            2 :                 metas := sublevelMap[uint64(sublevel)]
     161            2 :                 levelSlices = append(
     162            2 :                         levelSlices,
     163            2 :                         sublevelInfo{
     164            2 :                                 manifest.NewLevelSliceKeySorted(cmp, metas),
     165            2 :                                 manifest.L0Sublevel(sublevel),
     166            2 :                         },
     167            2 :                 )
     168            2 :         }
     169            2 :         return levelSlices
     170              : }
     171              : 
// pickedCompactionMetrics holds metrics related to the compaction picking
// process. It is stored on pickedTableCompaction as pickerMetrics.
type pickedCompactionMetrics struct {
	// scores contains candidateLevelInfo.scores.
	scores                      []float64
	singleLevelOverlappingRatio float64
	multiLevelOverlappingRatio  float64
}
     179              : 
// pickedTableCompaction contains information about a compaction of sstables
// that has already been chosen, and is being constructed. Compaction
// construction info lives in this struct, and is copied over into the
// compaction struct in ConstructCompaction.
type pickedTableCompaction struct {
	// score of the chosen compaction (candidateLevelInfo.score).
	score float64
	// kind indicates the kind of compaction.
	kind compactionKind
	// manualID > 0 iff this is a manual compaction. It exists solely for
	// internal bookkeeping.
	manualID uint64
	// startLevel is the level that is being compacted. Inputs from startLevel
	// and outputLevel will be merged to produce a set of outputLevel files.
	// It points into the inputs slice.
	startLevel *compactionLevel
	// outputLevel is the level that files are being produced in. outputLevel is
	// equal to startLevel+1 except when:
	//    - if startLevel is 0, the output level equals
	//      compactionPicker.getBaseLevel().
	//    - in multilevel compaction, the output level is the lowest level involved in
	//      the compaction
	// It points into the inputs slice.
	outputLevel *compactionLevel
	// inputs contain levels involved in the compaction in ascending order
	inputs []compactionLevel
	// LBase at the time of compaction picking. Might be uninitialized for
	// intra-L0 compactions.
	baseLevel int
	// L0-specific compaction info. Set to a non-nil value for all compactions
	// where startLevel == 0 that were generated by L0Sublevels.
	lcf *manifest.L0CompactionFiles
	// maxOutputFileSize is the maximum size of an individual table created
	// during compaction.
	maxOutputFileSize uint64
	// maxOverlapBytes is the maximum number of bytes of overlap allowed for a
	// single output table with the tables in the grandparent level.
	maxOverlapBytes uint64
	// maxReadCompactionBytes is the maximum bytes a read compaction is allowed to
	// overlap in its output level with. If the overlap is greater than
	// maxReadCompactionBytes, then we don't proceed with the compaction.
	maxReadCompactionBytes uint64

	// The boundaries of the input data.
	bounds        base.UserKeyBounds
	version       *manifest.Version
	l0Organizer   *manifest.L0Organizer
	pickerMetrics pickedCompactionMetrics
}
     226              : 
// Compile-time assertion that *pickedTableCompaction implements
// pickedCompaction.
var _ pickedCompaction = (*pickedTableCompaction)(nil)
     229              : 
     230              : // ManualID returns the ID of the manual compaction, or 0 if the picked
     231              : // compaction is not a result of a manual compaction.
     232            2 : func (pc *pickedTableCompaction) ManualID() uint64 { return pc.manualID }
     233              : 
     234              : // Kind returns the kind of compaction.
     235            0 : func (pc *pickedTableCompaction) Kind() compactionKind { return pc.kind }
     236              : 
     237              : // Score returns the score of the level at the time the compaction was picked.
     238            0 : func (pc *pickedTableCompaction) Score() float64 { return pc.score }
     239              : 
     240              : // ConstructCompaction creates a compaction struct from the
     241              : // pickedTableCompaction.
     242              : func (pc *pickedTableCompaction) ConstructCompaction(
     243              :         d *DB, grantHandle CompactionGrantHandle,
     244            2 : ) compaction {
     245            2 :         return newCompaction(
     246            2 :                 pc,
     247            2 :                 d.opts,
     248            2 :                 d.timeNow(),
     249            2 :                 d.ObjProvider(),
     250            2 :                 grantHandle,
     251            2 :                 d.shouldCreateShared(pc.outputLevel.level),
     252            2 :                 d.determineCompactionValueSeparation)
     253            2 : }
     254              : 
     255              : // WaitingCompaction returns a WaitingCompaction description of this compaction
     256              : // for consumption by the compaction scheduler.
     257            2 : func (pc *pickedTableCompaction) WaitingCompaction() WaitingCompaction {
     258            2 :         if pc.manualID > 0 {
     259            2 :                 return WaitingCompaction{Priority: manualCompactionPriority, Score: pc.score}
     260            2 :         }
     261            2 :         entry, ok := scheduledCompactionMap[pc.kind]
     262            2 :         if !ok {
     263            0 :                 panic(errors.AssertionFailedf("unexpected compactionKind %s", pc.kind))
     264              :         }
     265            2 :         return WaitingCompaction{
     266            2 :                 Optional: entry.optional,
     267            2 :                 Priority: entry.priority,
     268            2 :                 Score:    pc.score,
     269            2 :         }
     270              : }
     271              : 
     272            2 : func defaultOutputLevel(startLevel, baseLevel int) int {
     273            2 :         outputLevel := startLevel + 1
     274            2 :         if startLevel == 0 {
     275            2 :                 outputLevel = baseLevel
     276            2 :         }
     277            2 :         if outputLevel >= numLevels-1 {
     278            2 :                 outputLevel = numLevels - 1
     279            2 :         }
     280            2 :         return outputLevel
     281              : }
     282              : 
     283              : func newPickedTableCompaction(
     284              :         opts *Options,
     285              :         cur *manifest.Version,
     286              :         l0Organizer *manifest.L0Organizer,
     287              :         startLevel, outputLevel, baseLevel int,
     288            2 : ) *pickedTableCompaction {
     289            2 :         if outputLevel > 0 && baseLevel == 0 {
     290            0 :                 panic("base level cannot be 0")
     291              :         }
     292            2 :         if startLevel > 0 && startLevel < baseLevel {
     293            1 :                 panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
     294            1 :                         startLevel, baseLevel))
     295              :         }
     296              : 
     297            2 :         targetFileSize := opts.TargetFileSize(outputLevel, baseLevel)
     298            2 :         pc := &pickedTableCompaction{
     299            2 :                 version:                cur,
     300            2 :                 l0Organizer:            l0Organizer,
     301            2 :                 baseLevel:              baseLevel,
     302            2 :                 inputs:                 []compactionLevel{{level: startLevel}, {level: outputLevel}},
     303            2 :                 maxOutputFileSize:      uint64(targetFileSize),
     304            2 :                 maxOverlapBytes:        maxGrandparentOverlapBytes(targetFileSize),
     305            2 :                 maxReadCompactionBytes: maxReadCompactionBytes(targetFileSize),
     306            2 :         }
     307            2 :         pc.startLevel = &pc.inputs[0]
     308            2 :         pc.outputLevel = &pc.inputs[1]
     309            2 :         return pc
     310              : }
     311              : 
     312              : // adjustedOutputLevel is the output level used for the purpose of
     313              : // determining the target output file size, overlap bytes, and expanded
     314              : // bytes, taking into account the base level.
     315            1 : func adjustedOutputLevel(outputLevel int, baseLevel int) int {
     316            1 :         if outputLevel == 0 {
     317            1 :                 return 0
     318            1 :         }
     319            1 :         if baseLevel == 0 {
     320            0 :                 panic("base level cannot be 0")
     321              :         }
     322              :         // Output level is in the range [baseLevel, numLevels). For the purpose of
     323              :         // determining the target output file size, overlap bytes, and expanded
     324              :         // bytes, we want to adjust the range to [1, numLevels).
     325            1 :         return 1 + outputLevel - baseLevel
     326              : }
     327              : 
     328              : func newPickedCompactionFromL0(
     329              :         lcf *manifest.L0CompactionFiles,
     330              :         opts *Options,
     331              :         vers *manifest.Version,
     332              :         l0Organizer *manifest.L0Organizer,
     333              :         baseLevel int,
     334              :         isBase bool,
     335            2 : ) *pickedTableCompaction {
     336            2 :         outputLevel := baseLevel
     337            2 :         if !isBase {
     338            2 :                 outputLevel = 0 // Intra L0
     339            2 :         }
     340              : 
     341            2 :         pc := newPickedTableCompaction(opts, vers, l0Organizer, 0, outputLevel, baseLevel)
     342            2 :         pc.lcf = lcf
     343            2 : 
     344            2 :         // Manually build the compaction as opposed to calling
     345            2 :         // pickAutoHelper. This is because L0Sublevels has already added
     346            2 :         // any overlapping L0 SSTables that need to be added, and
     347            2 :         // because compactions built by L0SSTables do not necessarily
     348            2 :         // pick contiguous sequences of files in pc.version.Levels[0].
     349            2 :         pc.startLevel.files = manifest.NewLevelSliceSeqSorted(lcf.Files)
     350            2 :         return pc
     351              : }
     352              : 
     353            1 : func (pc *pickedTableCompaction) String() string {
     354            1 :         var builder strings.Builder
     355            1 :         builder.WriteString(fmt.Sprintf(`Score=%f, `, pc.score))
     356            1 :         builder.WriteString(fmt.Sprintf(`Kind=%s, `, pc.kind))
     357            1 :         builder.WriteString(fmt.Sprintf(`AdjustedOutputLevel=%d, `, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel)))
     358            1 :         builder.WriteString(fmt.Sprintf(`maxOutputFileSize=%d, `, pc.maxOutputFileSize))
     359            1 :         builder.WriteString(fmt.Sprintf(`maxReadCompactionBytes=%d, `, pc.maxReadCompactionBytes))
     360            1 :         builder.WriteString(fmt.Sprintf(`bounds=%s, `, pc.bounds))
     361            1 :         builder.WriteString(fmt.Sprintf(`version=%s, `, pc.version))
     362            1 :         builder.WriteString(fmt.Sprintf(`inputs=%s, `, pc.inputs))
     363            1 :         builder.WriteString(fmt.Sprintf(`startlevel=%s, `, pc.startLevel))
     364            1 :         builder.WriteString(fmt.Sprintf(`outputLevel=%s, `, pc.outputLevel))
     365            1 :         builder.WriteString(fmt.Sprintf(`l0SublevelInfo=%s, `, pc.startLevel.l0SublevelInfo))
     366            1 :         builder.WriteString(fmt.Sprintf(`lcf=%s`, pc.lcf))
     367            1 :         return builder.String()
     368            1 : }
     369              : 
     370              : // Clone creates a deep copy of the pickedCompaction
     371            2 : func (pc *pickedTableCompaction) clone() *pickedTableCompaction {
     372            2 : 
     373            2 :         // Quickly copy over fields that do not require special deep copy care, and
     374            2 :         // set all fields that will require a deep copy to nil.
     375            2 :         newPC := &pickedTableCompaction{
     376            2 :                 score:                  pc.score,
     377            2 :                 kind:                   pc.kind,
     378            2 :                 baseLevel:              pc.baseLevel,
     379            2 :                 maxOutputFileSize:      pc.maxOutputFileSize,
     380            2 :                 maxOverlapBytes:        pc.maxOverlapBytes,
     381            2 :                 maxReadCompactionBytes: pc.maxReadCompactionBytes,
     382            2 :                 bounds:                 pc.bounds.Clone(),
     383            2 : 
     384            2 :                 // TODO(msbutler): properly clone picker metrics
     385            2 :                 pickerMetrics: pc.pickerMetrics,
     386            2 : 
     387            2 :                 // Both copies see the same manifest, therefore, it's ok for them to share
     388            2 :                 // the same pc.version and pc.l0Organizer.
     389            2 :                 version:     pc.version,
     390            2 :                 l0Organizer: pc.l0Organizer,
     391            2 :         }
     392            2 : 
     393            2 :         newPC.inputs = make([]compactionLevel, len(pc.inputs))
     394            2 :         for i := range pc.inputs {
     395            2 :                 newPC.inputs[i] = pc.inputs[i].Clone()
     396            2 :                 if i == 0 {
     397            2 :                         newPC.startLevel = &newPC.inputs[i]
     398            2 :                 } else if i == len(pc.inputs)-1 {
     399            2 :                         newPC.outputLevel = &newPC.inputs[i]
     400            2 :                 }
     401              :         }
     402              : 
     403            2 :         if len(pc.startLevel.l0SublevelInfo) > 0 {
     404            2 :                 newPC.startLevel.l0SublevelInfo = make([]sublevelInfo, len(pc.startLevel.l0SublevelInfo))
     405            2 :                 for i := range pc.startLevel.l0SublevelInfo {
     406            2 :                         newPC.startLevel.l0SublevelInfo[i] = pc.startLevel.l0SublevelInfo[i].Clone()
     407            2 :                 }
     408              :         }
     409            2 :         if pc.lcf != nil {
     410            2 :                 newPC.lcf = pc.lcf.Clone()
     411            2 :         }
     412            2 :         return newPC
     413              : }
     414              : 
// setupInputs returns true if a compaction has been set up using the provided inputLevel and
// pc.outputLevel. It returns false if a concurrent compaction is occurring on the start or
// output level files. Note that inputLevel is not necessarily pc.startLevel. In multiLevel
// compactions, inputs are set by calling setupInputs once for each adjacent pair of levels.
// This will preserve level invariants when expanding the compaction. pc.bounds will be updated
// to reflect the key range of the inputs.
//
// It also returns false when canCompactTables rejects the input level files
// or the overlapping output level files. pc.bounds and pc.outputLevel.files
// may already have been modified by the time false is returned.
func (pc *pickedTableCompaction) setupInputs(
	opts *Options,
	diskAvailBytes uint64,
	inProgressCompactions []compactionInfo,
	inputLevel *compactionLevel,
	problemSpans *problemspans.ByLevel,
) bool {
	// Note: this local shadows the imported "cmp" package within the function.
	cmp := opts.Comparer.Compare
	if !canCompactTables(inputLevel.files, inputLevel.level, problemSpans) {
		return false
	}
	pc.bounds = manifest.ExtendKeyRange(cmp, pc.bounds, inputLevel.files.All())

	// Setup output files and attempt to grow the inputLevel files with
	// the expanded key range. No need to do this for intra-L0 compactions;
	// outputLevel.files is left empty for those.
	if inputLevel.level != pc.outputLevel.level {
		// Determine the sstables in the output level which overlap with the compaction
		// key range.
		pc.outputLevel.files = pc.version.Overlaps(pc.outputLevel.level, pc.bounds)
		if !canCompactTables(pc.outputLevel.files, pc.outputLevel.level, problemSpans) {
			return false
		}
		pc.bounds = manifest.ExtendKeyRange(cmp, pc.bounds, pc.outputLevel.files.All())

		// maxExpandedBytes is the maximum size of an expanded compaction. If
		// growing a compaction results in a larger size, the original compaction
		// is used instead.
		targetFileSize := opts.TargetFileSize(pc.outputLevel.level, pc.baseLevel)
		maxExpandedBytes := expandedCompactionByteSizeLimit(opts, targetFileSize, diskAvailBytes)

		// Grow the sstables in inputLevel.level as long as it doesn't affect the number
		// of sstables included from pc.outputLevel.level.
		if pc.lcf != nil && inputLevel.level == 0 {
			// L0 compactions generated by L0Sublevels use the L0-specific
			// growth path.
			pc.growL0ForBase(cmp, maxExpandedBytes)
		} else if pc.grow(cmp, pc.bounds, maxExpandedBytes, inputLevel, problemSpans) {
			// inputLevel was expanded, adjust key range if necessary.
			pc.bounds = manifest.ExtendKeyRange(cmp, pc.bounds, inputLevel.files.All())
		}
	}

	if inputLevel.level == 0 {
		// If L0 is involved, it should always be the startLevel of the compaction.
		pc.startLevel.l0SublevelInfo = generateSublevelInfo(cmp, pc.startLevel.files)
	}

	// The setup only succeeds if the resulting output key range is not
	// already being compacted by an in-progress compaction.
	return !outputKeyRangeAlreadyCompacting(cmp, inProgressCompactions, pc)
}
     469              : 
     470              : // grow expands inputLevel.files to every table in inputLevel.level overlapping
     471              : // bounds, provided that this leaves the number of pc.outputLevel files
     472              : // unchanged and stays below maxExpandedBytes. It returns whether the inputs grew.
     473              : func (pc *pickedTableCompaction) grow(
     474              :         cmp base.Compare,
     475              :         bounds base.UserKeyBounds,
     476              :         maxExpandedBytes uint64,
     477              :         inputLevel *compactionLevel,
     478              :         problemSpans *problemspans.ByLevel,
     479            2 : ) bool {
     480            2 :         if pc.outputLevel.files.Empty() {
     481            2 :                 return false
     482            2 :         }
     483            2 :         expandedInputLevel := pc.version.Overlaps(inputLevel.level, bounds)
     484            2 :         if !canCompactTables(expandedInputLevel, inputLevel.level, problemSpans) {
     485            1 :                 return false
     486            1 :         }
     487            2 :         if expandedInputLevel.Len() <= inputLevel.files.Len() { // nothing new to add
     488            2 :                 return false
     489            2 :         }
     490            2 :         if expandedInputLevel.AggregateSizeSum()+pc.outputLevel.files.AggregateSizeSum() >= maxExpandedBytes {
     491            2 :                 return false
     492            2 :         }
     493              :         // Check that expanding the input level does not change the number of overlapping files in the output level.
     494              :         // We need to include the outputLevel iter because without it, in a multiLevel scenario,
     495              :         // expandedInputLevel's key range may not fully cover all files currently in pc.outputLevel,
     496              :         // since pc.outputLevel was created using the entire key range which includes higher levels.
     497            2 :         expandedOutputLevel := pc.version.Overlaps(pc.outputLevel.level,
     498            2 :                 manifest.KeyRange(cmp, expandedInputLevel.All(), pc.outputLevel.files.All()))
     499            2 :         if expandedOutputLevel.Len() != pc.outputLevel.files.Len() {
     500            2 :                 return false
     501            2 :         }
     502            2 :         if !canCompactTables(expandedOutputLevel, pc.outputLevel.level, problemSpans) {
     503            0 :                 return false
     504            0 :         }
     505            2 :         inputLevel.files = expandedInputLevel
     506            2 :         return true
     507              : }
     508              : 
     509              : // growL0ForBase follows similar logic to pc.grow. Additional L0 files are
     510              : // optionally added to the compaction at this step. Note that the keys passed to
     511              : // ExtendL0ForBaseCompactionTo are not the bounds of the compaction, but rather
     512              : // the smallest and largest internal keys that the compaction cannot include
     513              : // from L0 without pulling in more Lbase files. Consider this example:
     514              : //
     515              : // L0:        c-d e-f g-h
     516              : // Lbase: a-b     e-f     i-j
     517              : //
     518              : //      a b c d e f g h i j
     519              : //
     520              : // The e-f files have already been chosen in the compaction. As pulling
     521              : // in more Lbase files is undesirable, the logic below will pass in
     522              : // smallest = b and largest = i to ExtendL0ForBaseCompactionTo, which
     523              : // will expand the compaction to include c-d and g-h from L0. The
     524              : // keys passed in are exclusive; the compaction cannot be expanded
     525              : // to include files that "touch" them. It returns whether the inputs grew.
     526            2 : func (pc *pickedTableCompaction) growL0ForBase(cmp base.Compare, maxExpandedBytes uint64) bool {
     527            2 :         if invariants.Enabled {
     528            2 :                 if pc.startLevel.level != 0 {
     529            0 :                         panic(fmt.Sprintf("pc.startLevel.level is %d, expected 0", pc.startLevel.level))
     530              :                 }
     531              :         }
     532              : 
     533            2 :         if pc.outputLevel.files.Empty() {
     534            2 :                 // If there are no overlapping files in the output level, we do not
     535            2 :                 // attempt to expand the compaction to encourage move compactions.
     536            2 :                 return false
     537            2 :         }
     538              : 
     539            2 :         smallestBaseKey := base.InvalidInternalKey
     540            2 :         largestBaseKey := base.InvalidInternalKey
     541            2 :         // NB: We use Reslice to access the underlying level's files, but
     542            2 :         // we discard the returned slice. The pc.outputLevel.files slice
     543            2 :         // is not modified.
     544            2 :         _ = pc.outputLevel.files.Reslice(func(start, end *manifest.LevelIterator) {
     545            2 :                 if sm := start.Prev(); sm != nil {
     546            2 :                         smallestBaseKey = sm.Largest()
     547            2 :                 }
     548            2 :                 if la := end.Next(); la != nil {
     549            2 :                         largestBaseKey = la.Smallest()
     550            2 :                 }
     551              :         })
     552            2 :         oldLcf := pc.lcf.Clone() // snapshot so the candidate can be restored below
     553            2 :         if !pc.l0Organizer.ExtendL0ForBaseCompactionTo(smallestBaseKey, largestBaseKey, pc.lcf) {
     554            2 :                 return false
     555            2 :         }
     556              : 
     557            2 :         var newStartLevelFiles []*manifest.TableMetadata
     558            2 :         iter := pc.version.Levels[0].Iter() // NB: shadows the imported iter package within this function
     559            2 :         var sizeSum uint64
     560            2 :         for j, f := 0, iter.First(); f != nil; j, f = j+1, iter.Next() { // NOTE(review): j is only incremented, never read
     561            2 :                 if pc.lcf.FilesIncluded[f.L0Index] {
     562            2 :                         newStartLevelFiles = append(newStartLevelFiles, f)
     563            2 :                         sizeSum += f.Size
     564            2 :                 }
     565              :         }
     566              : 
     567            2 :         if sizeSum+pc.outputLevel.files.AggregateSizeSum() >= maxExpandedBytes {
     568            1 :                 *pc.lcf = *oldLcf // over the size limit: restore the pre-extension candidate
     569            1 :                 return false
     570            1 :         }
     571              : 
     572            2 :         pc.startLevel.files = manifest.NewLevelSliceSeqSorted(newStartLevelFiles)
     573            2 :         pc.bounds = manifest.ExtendKeyRange(cmp, pc.bounds,
     574            2 :                 pc.startLevel.files.All(), pc.outputLevel.files.All())
     575            2 :         return true
     576              : }
     577              : 
     578              : // estimatedInputSize returns the sum of AggregateSizeSum over every level in
     579              : // pc.inputs. NOTE(review): this was documented as including the estimated
     580              : // physical size of blob references; confirm AggregateSizeSum covers them.
     581            2 : func (pc *pickedTableCompaction) estimatedInputSize() uint64 {
     582            2 :         var bytesToCompact uint64
     583            2 :         for i := range pc.inputs {
     584            2 :                 bytesToCompact += pc.inputs[i].files.AggregateSizeSum()
     585            2 :         }
     586            2 :         return bytesToCompact
     587              : }
     588              : 
     589              : // setupMultiLevelCandidate returns true if it successfully added another level
     590              : // to the compaction.
     591              : // Note that adding a new level will never change the startLevel inputs, but we
     592              : // will attempt to expand the inputs of the intermediate level to the output key range,
     593              : // if size constraints allow it.
     594              : // For example, consider the following LSM structure, with the initial compaction
     595              : // from L1->L2:
     596              : // startLevel: L1 [a-b]
     597              : // outputLevel: L2 [a-c]
     598              : // L1:  |a-b  | d--e
     599              : // L2:  |a---c| d----f
     600              : // L3:   a---------e
     601              : //
     602              : // When adding L3, we'll expand L2 to include d-f via a call to setupInputs with
     603              : // startLevel=L2. L1 will not be expanded.
     604              : // startLevel:        L1 [a-b]
     605              : // intermediateLevel: L2 [a-c, d-f]
     606              : // outputLevel:       L3 [a-e]
     607              : // L1:  |a-b  |   d--e
     608              : // L2:  |a---c  d----f|
     609              : // L3:  |a---------e  |
     610            2 : func (pc *pickedTableCompaction) setupMultiLevelCandidate(opts *Options, env compactionEnv) bool {
     611            2 :         pc.inputs = append(pc.inputs, compactionLevel{level: pc.outputLevel.level + 1})
     612            2 : 
     613            2 :         // Recalibrate startLevel and outputLevel:
     614            2 :         //  - startLevel and outputLevel pointers may be obsolete after appending to pc.inputs.
     615            2 :         //  - pc.inputs[2] (presumably the level just appended) becomes outputLevel; pc.inputs[1] is the input level handed to setupInputs.
     616            2 :         pc.startLevel = &pc.inputs[0]
     617            2 :         pc.outputLevel = &pc.inputs[2]
     618            2 :         return pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, &pc.inputs[1], nil /* TODO(radu) */)
     619            2 : }
     620              : 
     621              : // canCompactTables returns true if no table in inputs is already compacting and
     622              : // none overlaps a problem span for level (skipped when problemSpans is nil).
     623              : func canCompactTables(
     624              :         inputs manifest.LevelSlice, level int, problemSpans *problemspans.ByLevel,
     625            2 : ) bool {
     626            2 :         for f := range inputs.All() {
     627            2 :                 if f.IsCompacting() {
     628            2 :                         return false
     629            2 :                 }
     630            2 :                 if problemSpans != nil && problemSpans.Overlaps(level, f.UserKeyBounds()) {
     631            1 :                         return false
     632            1 :                 }
     633              :         }
     634            2 :         return true
     635              : }
     636              : 
     637              : // newCompactionPickerByScore creates a compactionPickerByScore associated with
     638              : // the newest version and runs initLevelMaxBytes on it. The picker is used
     639              : // under logLock (until a new version is installed).
     640              : func newCompactionPickerByScore(
     641              :         v *manifest.Version,
     642              :         lvs *latestVersionState,
     643              :         opts *Options,
     644              :         inProgressCompactions []compactionInfo,
     645            2 : ) *compactionPickerByScore {
     646            2 :         p := &compactionPickerByScore{
     647            2 :                 opts:               opts,
     648            2 :                 vers:               v,
     649            2 :                 latestVersionState: lvs,
     650            2 :         }
     651            2 :         p.initLevelMaxBytes(inProgressCompactions)
     652            2 :         return p
     653            2 : }
     654              : 
     655              : // candidateLevelInfo holds information about a candidate compaction level that
     656              : // has been identified by the compaction picker.
     657              : type candidateLevelInfo struct {
     658              :         // The fill factor of the level, calculated using uncompensated file sizes and
     659              :         // without any adjustments. A factor > 1 means that the level has more data
     660              :         // than the ideal size for that level.
     661              :         //
     662              :         // For L0, the fill factor is calculated based on the number of sublevels
     663              :         // (see calculateL0FillFactor).
     664              :         //
     665              :         // For L1+, the fill factor is the ratio between the total uncompensated file
     666              :         // size and the ideal size of the level (based on the total size of the DB).
     667              :         fillFactor float64
     668              : 
     669              :         // The score of the level, used to rank levels.
     670              :         //
     671              :         // If the level doesn't require compaction, the score is 0. Otherwise:
     672              :         //  - for L6 the score is equal to the fillFactor;
     673              :         //  - for L0-L5:
     674              :         //    - if the fillFactor is < 1: the score is equal to the fillFactor;
     675              :         //    - if the fillFactor is >= 1: the score is the ratio between the
     676              :         //                                 fillFactor and the next level's fillFactor.
     677              :         score float64
     678              : 
     679              :         // The fill factor of the level after accounting for level size compensation.
     680              :         //
     681              :         // For L0, the compensatedFillFactor is equal to the fillFactor as we don't
     682              :         // account for level size compensation in L0.
     683              :         //
     684              :         // For L1+, the compensatedFillFactor takes into account the estimated
     685              :         // savings in the lower levels because of deletions.
     686              :         //
     687              :         // The compensated fill factor is used to determine if the level should be
     688              :         // compacted (see calculateLevelScores).
     689              :         compensatedFillFactor float64
     690              : 
     691              :         level int // The level to compact from.
     692              :         // The level to compact to.
     693              :         outputLevel int
     694              :         // The file in level that will be compacted. Additional files may be
     695              :         // picked by the compaction, and a pickedCompaction created for the
     696              :         // compaction.
     697              :         file manifest.LevelFile
     698              : }
     699              : 
     700            2 : func (c *candidateLevelInfo) shouldCompact() bool {
     701            2 :         return c.score > 0 // a score of 0 marks a level that doesn't require compaction
     702            2 : }
     703              : 
     704            2 : func tableTombstoneCompensation(t *manifest.TableMetadata) uint64 {
     705            2 :         if stats, ok := t.Stats(); ok {
     706            2 :                 return stats.PointDeletionsBytesEstimate + stats.RangeDeletionsBytesEstimate // point + range deletion estimates
     707            2 :         }
     708            2 :         return 0 // t.Stats() reported no stats: no compensation
     709              : }
     710              : 
     711              : // tableCompensatedSize returns t's size, plus the estimated physical size of
     712              : // its external references, plus the tombstone compensation that inflates the
     713              : // size according to compaction priorities.
     714            2 : func tableCompensatedSize(t *manifest.TableMetadata) uint64 {
     715            2 :         // Add in the estimate of disk space that may be reclaimed by compacting the
     716            2 :         // table's tombstones (see tableTombstoneCompensation).
     717            2 :         return t.Size + t.EstimatedReferenceSize() + tableTombstoneCompensation(t)
     718            2 : }
     719              : 
     720              : // totalCompensatedSize sums tableCompensatedSize over a table metadata
     721              : // iterator. Note that this function is linear in the files available to the
     722              : // iterator. Use the compensatedSizeAnnotator if querying the total
     723              : // compensated size of a level.
     724            2 : func totalCompensatedSize(iter iter.Seq[*manifest.TableMetadata]) uint64 { // NB: the parameter shadows the iter package in the body
     725            2 :         var sz uint64
     726            2 :         for f := range iter {
     727            2 :                 sz += tableCompensatedSize(f)
     728            2 :         }
     729            2 :         return sz
     730              : }
     731              : 
     732              : // compactionPickerByScore holds the state and logic for picking a compaction. A
     733              : // compaction picker is associated with a single version. A new compaction
     734              : // picker is created and initialized every time a new version is installed.
     735              : type compactionPickerByScore struct {
     736              :         opts *Options
     737              :         vers *manifest.Version
     738              :         // Unlike vers, which is immutable and the latest version when this picker
     739              :         // is created, latestVersionState represents the mutable state of the latest
     740              :         // version. This means that at some point in the future a
     741              :         // compactionPickerByScore created in the past will have mutually
     742              :         // inconsistent state in vers and latestVersionState. This is not a problem
     743              :         // since (a) a new picker is created in UpdateVersionLocked when a new
     744              :         // version is installed, and (b) only the latest picker is used for picking
     745              :         // compactions. This is ensured by holding versionSet.logLock for both (a)
     746              :         // and (b).
     747              :         latestVersionState *latestVersionState
     748              :         // The level to target for L0 compactions. Levels L1 to baseLevel must be
     749              :         // empty.
     750              :         baseLevel int
     751              :         // levelMaxBytes holds the dynamically adjusted max bytes setting for each
     752              :         // level. It is computed by initLevelMaxBytes.
     753              :         levelMaxBytes [numLevels]int64
     754              :         dbSizeBytes   uint64 // total size of all levels, L0 included; set by initLevelMaxBytes
     755              : }
     756              : 
     757              : var _ compactionPicker = &compactionPickerByScore{} // compile-time check that the interface is satisfied
     758              : 
     759            1 : func (p *compactionPickerByScore) getMetrics(inProgress []compactionInfo) compactionPickerMetrics {
     760            1 :         var m compactionPickerMetrics // levels not returned by calculateLevelScores keep zero values
     761            1 :         for _, info := range p.calculateLevelScores(inProgress) {
     762            1 :                 m.levels[info.level].score = info.score
     763            1 :                 m.levels[info.level].fillFactor = info.fillFactor
     764            1 :                 m.levels[info.level].compensatedFillFactor = info.compensatedFillFactor
     765            1 :         }
     766            1 :         return m
     767              : }
     768              : 
     769            2 : func (p *compactionPickerByScore) getBaseLevel() int {
     770            2 :         if p == nil {
     771            0 :                 return 1 // a nil picker reports L1 as the base level
     772            0 :         }
     773            2 :         return p.baseLevel
     774              : }
     775              : 
     776              : // estimatedCompactionDebt estimates the number of bytes which need to be
     777              : // compacted before the LSM tree becomes stable. A nil picker reports 0.
     778            2 : func (p *compactionPickerByScore) estimatedCompactionDebt() uint64 {
     779            2 :         if p == nil {
     780            0 :                 return 0
     781            0 :         }
     782              : 
     783              :         // We assume that all the bytes in L0 need to be compacted to Lbase. This is
     784              :         // unlike the RocksDB logic that figures out whether L0 needs compaction.
     785            2 :         bytesAddedToNextLevel := p.vers.Levels[0].AggregateSize()
     786            2 :         lbaseSize := p.vers.Levels[p.baseLevel].AggregateSize()
     787            2 : 
     788            2 :         var compactionDebt uint64
     789            2 :         if bytesAddedToNextLevel > 0 && lbaseSize > 0 {
     790            2 :                 // We only incur compaction debt if both L0 and Lbase contain data. If L0
     791            2 :                 // is empty, no compaction is necessary. If Lbase is empty, a move-based
     792            2 :                 // compaction from L0 would occur.
     793            2 :                 compactionDebt += bytesAddedToNextLevel + lbaseSize
     794            2 :         }
     795              : 
     796              :         // Loop invariant: at the beginning of each iteration, bytesAddedToNextLevel
     797              :         // is the number of bytes added to `level` by the level above it.
     798            2 :         for level := p.baseLevel; level < numLevels-1; level++ {
     799            2 :                 levelSize := p.vers.Levels[level].AggregateSize() + bytesAddedToNextLevel
     800            2 :                 nextLevelSize := p.vers.Levels[level+1].AggregateSize()
     801            2 :                 if levelSize > uint64(p.levelMaxBytes[level]) {
     802            2 :                         bytesAddedToNextLevel = levelSize - uint64(p.levelMaxBytes[level])
     803            2 :                         if nextLevelSize > 0 {
     804            2 :                                 // We only incur compaction debt if the next level contains data. If the
     805            2 :                                 // next level is empty, a move-based compaction would be used.
     806            2 :                                 levelRatio := float64(nextLevelSize) / float64(levelSize)
     807            2 :                                 // The current level contributes bytesAddedToNextLevel to compactions.
     808            2 :                                 // The next level contributes levelRatio * bytesAddedToNextLevel.
     809            2 :                                 compactionDebt += uint64(float64(bytesAddedToNextLevel) * (levelRatio + 1))
     810            2 :                         }
     811            2 :                 } else {
     812            2 :                         // We're not moving any bytes to the next level.
     813            2 :                         bytesAddedToNextLevel = 0
     814            2 :                 }
     815              :         }
     816            2 :         return compactionDebt
     817              : }
     818              : 
     819            2 : func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []compactionInfo) {
     820            2 :         // The levelMaxBytes calculations here (which also set p.baseLevel and p.dbSizeBytes) differ from RocksDB in two ways:
     821            2 :         //
     822            2 :         // 1. The use of dbSize vs maxLevelSize. RocksDB uses the size of the maximum
     823            2 :         //    level in L1-L6, rather than determining the size of the bottom level
     824            2 :         //    based on the total amount of data in the DB. The RocksDB calculation is
     825            2 :         //    problematic if L0 contains a significant fraction of data, or if the
     826            2 :         //    level sizes are roughly equal and thus there is a significant fraction
     827            2 :         //    of data outside of the largest level.
     828            2 :         //
     829            2 :         // 2. Not adjusting the size of Lbase based on L0. RocksDB computes
     830            2 :         //    baseBytesMax as the maximum of the configured LBaseMaxBytes and the
     831            2 :         //    size of L0. This is problematic because baseBytesMax is used to compute
     832            2 :         //    the max size of lower levels. A very large baseBytesMax will result in
     833            2 :         //    an overly large value for the size of lower levels which will cause
     834            2 :         //    those levels not to be compacted even when they should be
     835            2 :         //    compacted. This often results in "inverted" LSM shapes where Ln is
     836            2 :         //    larger than Ln+1.
     837            2 : 
     838            2 :         // Determine the first non-empty level and the total DB size.
     839            2 :         firstNonEmptyLevel := -1
     840            2 :         var dbSize uint64
     841            2 :         for level := 1; level < numLevels; level++ {
     842            2 :                 if p.vers.Levels[level].AggregateSize() > 0 {
     843            2 :                         if firstNonEmptyLevel == -1 {
     844            2 :                                 firstNonEmptyLevel = level
     845            2 :                         }
     846            2 :                         dbSize += p.vers.Levels[level].AggregateSize()
     847              :                 }
     848              :         }
     849            2 :         for _, c := range inProgressCompactions {
     850            2 :                 if c.outputLevel == 0 || c.outputLevel == -1 {
     851            2 :                         continue
     852              :                 }
     853            2 :                 if c.inputs[0].level == 0 && (firstNonEmptyLevel == -1 || c.outputLevel < firstNonEmptyLevel) {
     854            2 :                         firstNonEmptyLevel = c.outputLevel // treat the target of an in-progress L0 compaction as non-empty
     855            2 :                 }
     856              :         }
     857              : 
     858              :         // Initialize the max-bytes setting for each level to "infinity" which will
     859              :         // disallow compaction for that level. We'll fill in the actual value below
     860              :         // for levels we want to allow compactions from.
     861            2 :         for level := 0; level < numLevels; level++ {
     862            2 :                 p.levelMaxBytes[level] = math.MaxInt64
     863            2 :         }
     864              : 
     865            2 :         dbSizeBelowL0 := dbSize
     866            2 :         dbSize += p.vers.Levels[0].AggregateSize()
     867            2 :         p.dbSizeBytes = dbSize
     868            2 :         if dbSizeBelowL0 == 0 {
     869            2 :                 // No levels for L1 and up contain any data. Target L0 compactions for the
     870            2 :                 // last level or to the level to which there is an ongoing L0 compaction.
     871            2 :                 p.baseLevel = numLevels - 1
     872            2 :                 if firstNonEmptyLevel >= 0 {
     873            2 :                         p.baseLevel = firstNonEmptyLevel
     874            2 :                 }
     875            2 :                 return
     876              :         }
     877              : 
     878            2 :         bottomLevelSize := dbSize - dbSize/uint64(p.opts.Experimental.LevelMultiplier) // NOTE(review): assumes LevelMultiplier is non-zero — confirm it is validated elsewhere
     879            2 : 
     880            2 :         curLevelSize := bottomLevelSize
     881            2 :         for level := numLevels - 2; level >= firstNonEmptyLevel; level-- {
     882            2 :                 curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
     883            2 :         }
     884              : 
     885              :         // Compute base level (where L0 data is compacted to).
     886            2 :         baseBytesMax := uint64(p.opts.LBaseMaxBytes)
     887            2 :         p.baseLevel = firstNonEmptyLevel
     888            2 :         for p.baseLevel > 1 && curLevelSize > baseBytesMax {
     889            2 :                 p.baseLevel--
     890            2 :                 curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
     891            2 :         }
     892              : 
     893            2 :         smoothedLevelMultiplier := 1.0
     894            2 :         if p.baseLevel < numLevels-1 {
     895            2 :                 smoothedLevelMultiplier = math.Pow(
     896            2 :                         float64(bottomLevelSize)/float64(baseBytesMax),
     897            2 :                         1.0/float64(numLevels-p.baseLevel-1))
     898            2 :         }
     899              : 
     900            2 :         levelSize := float64(baseBytesMax)
     901            2 :         for level := p.baseLevel; level < numLevels; level++ {
     902            2 :                 if level > p.baseLevel && levelSize > 0 {
     903            2 :                         levelSize *= smoothedLevelMultiplier
     904            2 :                 }
     905              :                 // Round the result since test cases use small target level sizes, which
     906              :                 // can be impacted by floating-point imprecision + integer truncation.
     907            2 :                 roundedLevelSize := math.Round(levelSize)
     908            2 :                 if roundedLevelSize > float64(math.MaxInt64) {
     909            0 :                         p.levelMaxBytes[level] = math.MaxInt64
     910            2 :                 } else {
     911            2 :                         p.levelMaxBytes[level] = int64(roundedLevelSize)
     912            2 :                 }
     913              :         }
     914              : }
     915              : 
     916              : type levelSizeAdjust struct {
     917              :         incomingActualBytes      uint64 // actual size of files entering the level via in-progress compactions
     918              :         outgoingActualBytes      uint64 // actual size of files leaving the level via in-progress compactions
     919              :         outgoingCompensatedBytes uint64 // compensated size of the files leaving the level
     920              : }
     921              : 
     922            2 : func (a levelSizeAdjust) compensated() uint64 {
     923            2 :         return a.incomingActualBytes - a.outgoingCompensatedBytes // NOTE(review): unsigned; wraps when outgoing > incoming — presumably added to a level size modulo 2^64, confirm at callers
     924            2 : }
     925              : 
     926            2 : func (a levelSizeAdjust) actual() uint64 {
     927            2 :         return a.incomingActualBytes - a.outgoingActualBytes // NOTE(review): unsigned; wraps when outgoing > incoming — presumably added to a level size modulo 2^64, confirm at callers
     928            2 : }
     929              : 
     930            2 : func calculateSizeAdjust(inProgressCompactions []compactionInfo) [numLevels]levelSizeAdjust {
     931            2 :         // Compute size adjustments for each level based on the in-progress
     932            2 :         // compactions. We sum the file sizes of all files leaving and entering each
     933            2 :         // level in in-progress compactions. For outgoing files, we also sum a
     934            2 :         // separate sum of 'compensated file sizes', which are inflated according
     935            2 :         // to deletion estimates.
     936            2 :         //
     937            2 :         // When we adjust a level's size according to these values during score
     938            2 :         // calculation, we subtract the compensated size of start level inputs to
     939            2 :         // account for the fact that score calculation uses compensated sizes.
     940            2 :         //
     941            2 :         // Since compensated file sizes may be compensated because they reclaim
     942            2 :         // space from the output level's files, we only add the real file size to
     943            2 :         // the output level.
     944            2 :         //
     945            2 :         // This is slightly different from RocksDB's behavior, which simply elides
     946            2 :         // compacting files from the level size calculation.
     947            2 :         var sizeAdjust [numLevels]levelSizeAdjust
     948            2 :         for i := range inProgressCompactions {
     949            2 :                 c := &inProgressCompactions[i]
     950            2 :                 // If this compaction's version edit has already been applied, there's
     951            2 :                 // no need to adjust: The LSM we'll examine will already reflect the
     952            2 :                 // new LSM state.
     953            2 :                 if c.versionEditApplied {
     954            2 :                         continue
     955              :                 }
     956              : 
     957            2 :                 for _, input := range c.inputs {
     958            2 :                         actualSize := input.files.AggregateSizeSum()
     959            2 :                         compensatedSize := totalCompensatedSize(input.files.All())
     960            2 : 
     961            2 :                         if input.level != c.outputLevel {
     962            2 :                                 sizeAdjust[input.level].outgoingCompensatedBytes += compensatedSize
     963            2 :                                 sizeAdjust[input.level].outgoingActualBytes += actualSize
     964            2 :                                 if c.outputLevel != -1 {
     965            2 :                                         sizeAdjust[c.outputLevel].incomingActualBytes += actualSize
     966            2 :                                 }
     967              :                         }
     968              :                 }
     969              :         }
     970            2 :         return sizeAdjust
     971              : }
     972              : 
     973              : // calculateLevelScores calculates the candidateLevelInfo for all levels and
     974              : // returns them in decreasing score order.
     975              : func (p *compactionPickerByScore) calculateLevelScores(
     976              :         inProgressCompactions []compactionInfo,
     977            2 : ) [numLevels]candidateLevelInfo {
     978            2 :         var scores [numLevels]candidateLevelInfo
     979            2 :         for i := range scores {
     980            2 :                 scores[i].level = i
     981            2 :                 scores[i].outputLevel = i + 1
     982            2 :         }
     983            2 :         l0FillFactor := calculateL0FillFactor(p.vers, p.latestVersionState.l0Organizer, p.opts, inProgressCompactions)
     984            2 :         scores[0] = candidateLevelInfo{
     985            2 :                 outputLevel:           p.baseLevel,
     986            2 :                 fillFactor:            l0FillFactor,
     987            2 :                 compensatedFillFactor: l0FillFactor, // No compensation for L0.
     988            2 :         }
     989            2 :         sizeAdjust := calculateSizeAdjust(inProgressCompactions)
     990            2 :         for level := 1; level < numLevels; level++ {
     991            2 :                 // Actual file size.
     992            2 :                 compensatedLevelSize := p.vers.Levels[level].AggregateSize()
     993            2 :                 // Deletions.
     994            2 :                 delBytes := deletionBytesAnnotator.LevelAnnotation(p.vers.Levels[level])
     995            2 :                 compensatedLevelSize += delBytes.PointDels + delBytes.RangeDels
     996            2 :                 // Adjustments for in-progress compactions.
     997            2 :                 compensatedLevelSize += sizeAdjust[level].compensated()
     998            2 :                 scores[level].compensatedFillFactor = float64(compensatedLevelSize) / float64(p.levelMaxBytes[level])
     999            2 :                 scores[level].fillFactor = float64(p.vers.Levels[level].AggregateSize()+sizeAdjust[level].actual()) / float64(p.levelMaxBytes[level])
    1000            2 :         }
    1001              : 
    1002              :         // Adjust each level's fill factor by the fill factor of the next level to get
    1003              :         // an (uncompensated) score; and each level's compensated fill factor by the
    1004              :         // fill factor of the next level to get a compensated score.
    1005              :         //
    1006              :         // The compensated score is used to determine if the level should be compacted
    1007              :         // at all. The (uncompensated) score is used as the value used to rank levels.
    1008              :         //
    1009              :         // If the next level has a high fill factor, and is thus a priority for
    1010              :         // compaction, this reduces the priority for compacting the current level. If
    1011              :         // the next level has a low fill factor (i.e. it is below its target size),
    1012              :         // this increases the priority for compacting the current level.
    1013              :         //
    1014              :         // The effect of this adjustment is to help prioritize compactions in lower
    1015              :         // levels. The following example shows the scores and the fill factors. In this
    1016              :         // scenario, L0 has 68 sublevels. L3 (a.k.a. Lbase) is significantly above its
    1017              :         // target size. The original score prioritizes compactions from those two
    1018              :         // levels, but doing so ends up causing a future problem: data piles up in the
    1019              :         // higher levels, starving L5->L6 compactions, and to a lesser degree starving
    1020              :         // L4->L5 compactions.
    1021              :         //
    1022              :         // Note that in the example shown there is no level size compensation so the
    1023              :         // compensatedFillFactor and fillFactor are the same for each level.
    1024              :         //
    1025              :         //        score   fillFactor   compensatedFillFactor   size   max-size
    1026              :         //   L0     3.2         68.0                    68.0  2.2 G          -
    1027              :         //   L3     3.2         21.1                    21.1  1.3 G       64 M
    1028              :         //   L4     3.4          6.7                     6.7  3.1 G      467 M
    1029              :         //   L5     3.4          2.0                     2.0  6.6 G      3.3 G
    1030              :         //   L6       0          0.6                     0.6   14 G       24 G
    1031              :         //
    1032              :         // TODO(radu): the way compensation works needs some rethinking. For example,
    1033              :         // if compacting L5 can free up a lot of space in L6, the score of L5 should
    1034              :         // go *up* with the fill factor of L6, not the other way around.
    1035            2 :         for level := 0; level < numLevels; level++ {
    1036            2 :                 if level > 0 && level < p.baseLevel {
    1037            2 :                         continue
    1038              :                 }
    1039            2 :                 const compensatedFillFactorThreshold = 1.0
    1040            2 :                 if scores[level].compensatedFillFactor < compensatedFillFactorThreshold {
    1041            2 :                         // No need to compact this level; score stays 0.
    1042            2 :                         continue
    1043              :                 }
    1044            2 :                 score := scores[level].fillFactor
    1045            2 :                 compensatedScore := scores[level].compensatedFillFactor
    1046            2 :                 if level < numLevels-1 {
    1047            2 :                         nextLevel := scores[level].outputLevel
    1048            2 :                         // Avoid absurdly large scores by placing a floor on the factor that we'll
    1049            2 :                         // adjust a level by. The value of 0.01 was chosen somewhat arbitrarily.
    1050            2 :                         denominator := max(0.01, scores[nextLevel].fillFactor)
    1051            2 :                         score /= denominator
    1052            2 :                         compensatedScore /= denominator
    1053            2 :                 }
    1054              : 
    1055              :                 // We calculated a compensated score above by dividing the
    1056              :                 // compensatedFillFactor by the next level's fill factor. Previous
    1057              :                 // versions of Pebble had a default behavior of only considering levels
    1058              :                 // with a compensatedScore >= 1.0 eligible for compaction. This wasn't a
    1059              :                 // principled decision and has been experimentally observed to limit
    1060              :                 // productive compactions that would reclaim disk space, but were
    1061              :                 // prohibited because the output level's fill factor was > 1.0.
    1062              :                 //
    1063              :                 // We allow the use of old behavior through the
    1064              :                 // UseDeprecatedCompensatedScore option, which if true, only considers
    1065              :                 // the level eligible for compaction iff both compensatedFillFactor and
    1066              :                 // compensatedScore are >= 1.0.
    1067              :                 //
    1068              :                 // Otherwise, only L0 requires the compensatedScore to be >= 1.0; all
    1069              :                 // other levels only require the compensatedFillFactor to be >= 1.0. L0
    1070              :                 // is treated exceptionally out of concern that a large, LBase that's
    1071              :                 // being compacted into the next level may prevent L0->Lbase compactions
    1072              :                 // and attempting to pick L0 compactions may result in intra-L0
    1073              :                 // compactions.
    1074            2 :                 const compensatedScoreThreshold = 1.0
    1075            2 :                 if (level == 0 || p.opts.Experimental.UseDeprecatedCompensatedScore()) &&
    1076            2 :                         compensatedScore < compensatedScoreThreshold {
    1077            2 :                         // No need to compact this level; score stays 0.
    1078            2 :                         continue
    1079              :                 }
    1080            2 :                 scores[level].score = score
    1081              :         }
    1082              :         // Sort by score (decreasing) and break ties by level (increasing).
    1083            2 :         slices.SortFunc(scores[:], func(a, b candidateLevelInfo) int {
    1084            2 :                 if a.score != b.score {
    1085            2 :                         return cmp.Compare(b.score, a.score)
    1086            2 :                 }
    1087            2 :                 return cmp.Compare(a.level, b.level)
    1088              :         })
    1089            2 :         return scores
    1090              : }
    1091              : 
    1092              : // calculateL0FillFactor calculates a float value representing the relative
    1093              : // priority of compacting L0. A value less than 1 indicates that L0 does not
    1094              : // need any compactions.
    1095              : //
    1096              : // L0 is special in that files within L0 may overlap one another, so a different
    1097              : // set of heuristics that take into account read amplification apply.
    1098              : func calculateL0FillFactor(
    1099              :         vers *manifest.Version,
    1100              :         l0Organizer *manifest.L0Organizer,
    1101              :         opts *Options,
    1102              :         inProgressCompactions []compactionInfo,
    1103            2 : ) float64 {
    1104            2 :         // Use the sublevel count to calculate the score. The base vs intra-L0
    1105            2 :         // compaction determination happens in pickAuto, not here.
    1106            2 :         score := float64(2*l0Organizer.MaxDepthAfterOngoingCompactions()) /
    1107            2 :                 float64(opts.L0CompactionThreshold)
    1108            2 : 
    1109            2 :         // Also calculate a score based on the file count but use it only if it
    1110            2 :         // produces a higher score than the sublevel-based one. This heuristic is
    1111            2 :         // designed to accommodate cases where L0 is accumulating non-overlapping
    1112            2 :         // files in L0. Letting too many non-overlapping files accumulate in few
    1113            2 :         // sublevels is undesirable, because:
    1114            2 :         // 1) we can produce a massive backlog to compact once files do overlap.
    1115            2 :         // 2) constructing L0 sublevels has a runtime that grows superlinearly with
    1116            2 :         //    the number of files in L0 and must be done while holding D.mu.
    1117            2 :         noncompactingFiles := vers.Levels[0].Len()
    1118            2 :         for _, c := range inProgressCompactions {
    1119            2 :                 for _, cl := range c.inputs {
    1120            2 :                         if cl.level == 0 {
    1121            2 :                                 noncompactingFiles -= cl.files.Len()
    1122            2 :                         }
    1123              :                 }
    1124              :         }
    1125            2 :         fileScore := float64(noncompactingFiles) / float64(opts.L0CompactionFileThreshold)
    1126            2 :         if score < fileScore {
    1127            2 :                 score = fileScore
    1128            2 :         }
    1129            2 :         return score
    1130              : }
    1131              : 
// pickCompactionSeedFile picks a file from `level` in the `vers` to build a
// compaction around. Currently, this function implements a heuristic similar to
// RocksDB's kMinOverlappingRatio, seeking to minimize write amplification. This
// function is linear with respect to the number of files in `level` and
// `outputLevel`.
//
// Candidate files in `level` are skipped if they are already compacting, if
// they overlap a problem span (when problemSpans is non-nil), or if any
// overlapping file in `outputLevel` is compacting or overlaps a problem span.
// earliestSnapshotSeqNum gates the range-deletion discount applied to files in
// the bottommost level.
//
// It returns the chosen file and true, or a zero manifest.LevelFile and false
// if no file in `level` is eligible.
func pickCompactionSeedFile(
	vers *manifest.Version,
	virtualBackings *manifest.VirtualBackings,
	opts *Options,
	level, outputLevel int,
	earliestSnapshotSeqNum base.SeqNum,
	problemSpans *problemspans.ByLevel,
) (manifest.LevelFile, bool) {
	// Select the file within the level to compact. We want to minimize write
	// amplification, but also ensure that (a) deletes are propagated to the
	// bottom level in a timely fashion, and (b) virtual sstables that are
	// pinning backing sstables where most of the data is garbage are compacted
	// away. Doing (a) and (b) reclaims disk space. A table's smallest sequence
	// number provides a measure of its age. The ratio of overlapping-bytes /
	// table-size gives an indication of write amplification (a smaller ratio is
	// preferable).
	//
	// The current heuristic is based off the RocksDB kMinOverlappingRatio
	// heuristic. It chooses the file with the minimum overlapping ratio with
	// the target level, which minimizes write amplification.
	//
	// The heuristic uses a "compensated size" for the denominator, which is the
	// file size inflated by (a) an estimate of the space that may be reclaimed
	// through compaction, and (b) a fraction of the amount of garbage in the
	// backing sstable pinned by this (virtual) sstable.
	//
	// TODO(peter): For concurrent compactions, we may want to try harder to
	// pick a seed file whose resulting compaction bounds do not overlap with
	// an in-progress compaction.

	// NB: within this function `cmp` is the user key comparison function and
	// shadows the imported "cmp" package.
	cmp := opts.Comparer.Compare
	startIter := vers.Levels[level].Iter()
	outputIter := vers.Levels[outputLevel].Iter()

	var file manifest.LevelFile
	smallestRatio := uint64(math.MaxUint64)

	// outputFile is shared across iterations of the outer loop: both levels
	// are walked in a single forward pass, merge-style.
	outputFile := outputIter.First()

	for f := startIter.First(); f != nil; f = startIter.Next() {
		var overlappingBytes uint64
		if f.IsCompacting() {
			// Move on if this file is already being compacted. We'll likely
			// still need to move past the overlapping output files regardless,
			// but in cases where all start-level files are compacting we won't.
			continue
		}
		if problemSpans != nil && problemSpans.Overlaps(level, f.UserKeyBounds()) {
			// File touches problem span which temporarily disallows auto compactions.
			continue
		}

		// Trim any output-level files smaller than f.
		for outputFile != nil && sstableKeyCompare(cmp, outputFile.Largest(), f.Smallest()) < 0 {
			outputFile = outputIter.Next()
		}

		// Accumulate the sizes of the output-level files that overlap f.
		skip := false
		for outputFile != nil && sstableKeyCompare(cmp, outputFile.Smallest(), f.Largest()) <= 0 {
			overlappingBytes += outputFile.Size
			if outputFile.IsCompacting() {
				// If one of the overlapping files is compacting, we're not going to be
				// able to compact f anyway, so skip it.
				skip = true
				break
			}
			if problemSpans != nil && problemSpans.Overlaps(outputLevel, outputFile.UserKeyBounds()) {
				// Overlapping file touches problem span which temporarily disallows auto compactions.
				skip = true
				break
			}

			// For files in the bottommost level of the LSM, the
			// Stats.RangeDeletionsBytesEstimate field is set to the estimate
			// of bytes /within/ the file itself that may be dropped by
			// recompacting the file. These bytes from obsolete keys would not
			// need to be rewritten if we compacted `f` into `outputFile`, so
			// they don't contribute to write amplification. Subtracting them
			// out of the overlapping bytes helps prioritize these compactions
			// that are cheaper than their file sizes suggest.
			//
			// NOTE(review): overlappingBytes is unsigned, so this subtraction
			// would wrap if the estimate ever exceeded the bytes accumulated
			// so far. This presumably relies on the estimate not exceeding the
			// file's size -- confirm.
			if outputLevel == numLevels-1 && outputFile.LargestSeqNum < earliestSnapshotSeqNum {
				if stats, ok := outputFile.Stats(); ok {
					overlappingBytes -= stats.RangeDeletionsBytesEstimate
				}
			}

			// If the file in the next level extends beyond f's largest key,
			// break out and don't advance outputIter because f's successor
			// might also overlap.
			//
			// Note, we stop as soon as we encounter an output-level file with a
			// largest key beyond the input-level file's largest bound. We
			// perform a simple user key comparison here using sstableKeyCompare
			// which handles the potential for exclusive largest key bounds.
			// There's some subtlety when the bounds are equal (eg, equal and
			// inclusive, or equal and exclusive). Current Pebble doesn't split
			// user keys across sstables within a level (and in format versions
			// FormatSplitUserKeysMarkedCompacted and later we guarantee no
			// split user keys exist within the entire LSM). In that case, we're
			// assured that neither the input level nor the output level's next
			// file shares the same user key, so compaction expansion will not
			// include them in any compaction compacting `f`.
			//
			// NB: If we /did/ allow split user keys, or we're running on an
			// old database with an earlier format major version where there are
			// existing split user keys, this logic would be incorrect. Consider
			//    L1: [a#120,a#100] [a#80,a#60]
			//    L2: [a#55,a#45] [a#35,a#25] [a#15,a#5]
			// While considering the first file in L1, [a#120,a#100], we'd skip
			// past all of the files in L2. When considering the second file in
			// L1, we'd improperly conclude that the second file overlaps
			// nothing in the second level and is cheap to compact, when in
			// reality we'd need to expand the compaction to include all 5
			// files.
			if sstableKeyCompare(cmp, outputFile.Largest(), f.Largest()) > 0 {
				break
			}
			outputFile = outputIter.Next()
		}
		if skip {
			continue
		}

		// The ratio is computed with integer arithmetic; the overlapping bytes
		// are multiplied by 1024 before dividing, presumably so that ratios
		// below 1 remain distinguishable. Only a strictly smaller ratio
		// replaces the current choice, so the earliest file wins ties.
		//
		// NOTE(review): assumes compSz is non-zero; a zero compensated size
		// would cause a division-by-zero panic -- confirm this cannot occur.
		compSz := tableCompensatedSize(f) + responsibleForGarbageBytes(virtualBackings, f)
		scaledRatio := overlappingBytes * 1024 / compSz
		if scaledRatio < smallestRatio {
			smallestRatio = scaledRatio
			file = startIter.Take()
		}
	}
	// file is still the zero value (nil TableMetadata) if no candidate was
	// eligible.
	return file, file.TableMetadata != nil
}
    1269              : 
    1270              : // responsibleForGarbageBytes returns the amount of garbage in the backing
    1271              : // sstable that we consider the responsibility of this virtual sstable. For
    1272              : // non-virtual sstables, this is of course 0. For virtual sstables, we equally
    1273              : // distribute the responsibility of the garbage across all the virtual
    1274              : // sstables that are referencing the same backing sstable. One could
    1275              : // alternatively distribute this in proportion to the virtual sst sizes, but
    1276              : // it isn't clear that more sophisticated heuristics are worth it, given that
    1277              : // the garbage cannot be reclaimed until all the referencing virtual sstables
    1278              : // are compacted.
    1279              : func responsibleForGarbageBytes(
    1280              :         virtualBackings *manifest.VirtualBackings, m *manifest.TableMetadata,
    1281            2 : ) uint64 {
    1282            2 :         if !m.Virtual {
    1283            2 :                 return 0
    1284            2 :         }
    1285            2 :         useCount, virtualizedSize := virtualBackings.Usage(m.TableBacking.DiskFileNum)
    1286            2 :         // Since virtualizedSize is the sum of the estimated size of all virtual
    1287            2 :         // ssts, we allow for the possibility that virtualizedSize could exceed
    1288            2 :         // m.TableBacking.Size.
    1289            2 :         totalGarbage := int64(m.TableBacking.Size) - int64(virtualizedSize)
    1290            2 :         if totalGarbage <= 0 {
    1291            1 :                 return 0
    1292            1 :         }
    1293            2 :         if useCount == 0 {
    1294            0 :                 // This cannot happen if m exists in the latest version. The call to
    1295            0 :                 // ResponsibleForGarbageBytes during compaction picking ensures that m
    1296            0 :                 // exists in the latest version by holding versionSet.logLock.
    1297            0 :                 panic(errors.AssertionFailedf("%s has zero useCount", m.String()))
    1298              :         }
    1299            2 :         return uint64(totalGarbage) / uint64(useCount)
    1300              : }
    1301              : 
    1302            2 : func (p *compactionPickerByScore) getCompactionConcurrency() int {
    1303            2 :         lower, upper := p.opts.CompactionConcurrencyRange()
    1304            2 :         if lower >= upper {
    1305            2 :                 return upper
    1306            2 :         }
    1307              :         // Compaction concurrency is controlled by L0 read-amp. We allow one
    1308              :         // additional compaction per L0CompactionConcurrency sublevels, as well as
    1309              :         // one additional compaction per CompactionDebtConcurrency bytes of
    1310              :         // compaction debt. Compaction concurrency is tied to L0 sublevels as that
    1311              :         // signal is independent of the database size. We tack on the compaction
    1312              :         // debt as a second signal to prevent compaction concurrency from dropping
    1313              :         // significantly right after a base compaction finishes, and before those
    1314              :         // bytes have been compacted further down the LSM.
    1315              :         //
    1316              :         // Let n be the number of in-progress compactions.
    1317              :         //
    1318              :         // l0ReadAmp >= ccSignal1 then can run another compaction, where
    1319              :         // ccSignal1 = n * p.opts.Experimental.L0CompactionConcurrency
    1320              :         // Rearranging,
    1321              :         // n <= l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency.
    1322              :         // So we can run up to
    1323              :         // l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency extra compactions.
    1324            2 :         l0ReadAmpCompactions := 0
    1325            2 :         if p.opts.Experimental.L0CompactionConcurrency > 0 {
    1326            2 :                 l0ReadAmp := p.latestVersionState.l0Organizer.MaxDepthAfterOngoingCompactions()
    1327            2 :                 l0ReadAmpCompactions = (l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency)
    1328            2 :         }
    1329              :         // compactionDebt >= ccSignal2 then can run another compaction, where
    1330              :         // ccSignal2 = uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
    1331              :         // Rearranging,
    1332              :         // n <= compactionDebt / p.opts.Experimental.CompactionDebtConcurrency
    1333              :         // So we can run up to
    1334              :         // compactionDebt / p.opts.Experimental.CompactionDebtConcurrency extra
    1335              :         // compactions.
    1336            2 :         compactionDebtCompactions := 0
    1337            2 :         if p.opts.Experimental.CompactionDebtConcurrency > 0 {
    1338            2 :                 compactionDebt := p.estimatedCompactionDebt()
    1339            2 :                 compactionDebtCompactions = int(compactionDebt / p.opts.Experimental.CompactionDebtConcurrency)
    1340            2 :         }
    1341              : 
    1342            2 :         compactableGarbageCompactions := 0
    1343            2 :         garbageFractionLimit := p.opts.Experimental.CompactionGarbageFractionForMaxConcurrency()
    1344            2 :         if garbageFractionLimit > 0 && p.dbSizeBytes > 0 {
    1345            2 :                 delBytes := deletionBytesAnnotator.MultiLevelAnnotation(p.vers.Levels[:])
    1346            2 :                 compactableGarbageBytes := delBytes.PointDels + delBytes.RangeDels
    1347            2 :                 garbageFraction := float64(compactableGarbageBytes) / float64(p.dbSizeBytes)
    1348            2 :                 compactableGarbageCompactions =
    1349            2 :                         int((garbageFraction / garbageFractionLimit) * float64(upper-lower))
    1350            2 :         }
    1351              : 
    1352            2 :         extraCompactions := max(l0ReadAmpCompactions, compactionDebtCompactions, compactableGarbageCompactions, 0)
    1353            2 : 
    1354            2 :         return min(lower+extraCompactions, upper)
    1355              : }
    1356              : 
    1357              : // TODO(sumeer): remove unless someone actually finds this useful.
    1358              : func (p *compactionPickerByScore) logCompactionForTesting(
    1359              :         env compactionEnv, scores [numLevels]candidateLevelInfo, pc *pickedTableCompaction,
    1360            0 : ) {
    1361            0 :         var buf bytes.Buffer
    1362            0 :         for i := 0; i < numLevels; i++ {
    1363            0 :                 if i != 0 && i < p.baseLevel {
    1364            0 :                         continue
    1365              :                 }
    1366              : 
    1367            0 :                 var info *candidateLevelInfo
    1368            0 :                 for j := range scores {
    1369            0 :                         if scores[j].level == i {
    1370            0 :                                 info = &scores[j]
    1371            0 :                                 break
    1372              :                         }
    1373              :                 }
    1374              : 
    1375            0 :                 marker := " "
    1376            0 :                 if pc.startLevel.level == info.level {
    1377            0 :                         marker = "*"
    1378            0 :                 }
    1379            0 :                 fmt.Fprintf(&buf, "  %sL%d: score:%5.1f  fillFactor:%5.1f  compensatedFillFactor:%5.1f %8s  %8s",
    1380            0 :                         marker, info.level, info.score, info.fillFactor, info.compensatedFillFactor,
    1381            0 :                         humanize.Bytes.Int64(int64(totalCompensatedSize(
    1382            0 :                                 p.vers.Levels[info.level].All(),
    1383            0 :                         ))),
    1384            0 :                         humanize.Bytes.Int64(p.levelMaxBytes[info.level]),
    1385            0 :                 )
    1386            0 : 
    1387            0 :                 count := 0
    1388            0 :                 for i := range env.inProgressCompactions {
    1389            0 :                         c := &env.inProgressCompactions[i]
    1390            0 :                         if c.inputs[0].level != info.level {
    1391            0 :                                 continue
    1392              :                         }
    1393            0 :                         count++
    1394            0 :                         if count == 1 {
    1395            0 :                                 fmt.Fprintf(&buf, "  [")
    1396            0 :                         } else {
    1397            0 :                                 fmt.Fprintf(&buf, " ")
    1398            0 :                         }
    1399            0 :                         fmt.Fprintf(&buf, "L%d->L%d", c.inputs[0].level, c.outputLevel)
    1400              :                 }
    1401            0 :                 if count > 0 {
    1402            0 :                         fmt.Fprintf(&buf, "]")
    1403            0 :                 }
    1404            0 :                 fmt.Fprintf(&buf, "\n")
    1405              :         }
    1406            0 :         p.opts.Logger.Infof("pickAuto: L%d->L%d\n%s",
    1407            0 :                 pc.startLevel.level, pc.outputLevel.level, buf.String())
    1408              : }
    1409              : 
    1410              : // pickHighPrioritySpaceCompaction checks for a high-priority space reclamation
    1411              : // compaction. Under some circumstances, we want to persue a compaction for the
    1412              : // purpose of reclaiming disk space even when there are eligible default
    1413              : // compactions.
    1414              : func (p *compactionPickerByScore) pickHighPrioritySpaceCompaction(
    1415              :         env compactionEnv,
    1416            2 : ) pickedCompaction {
    1417            2 :         if pc := p.pickBlobFileRewriteCompactionHighPriority(env); pc != nil {
    1418            2 :                 return pc
    1419            2 :         }
    1420              :         // NB: We can't just return the above result because the above func returns
    1421              :         // a *pickedBlobFileCompaction, not a pickedCompaction. We need to return an
    1422              :         // untyped nil.
    1423            2 :         return nil
    1424              : }
    1425              : 
    1426              : // pickAutoScore picks the best score-based compaction, if any.
    1427              : //
    1428              : // On each call, pickAutoScore computes per-level size adjustments based on
    1429              : // in-progress compactions, and computes a per-level score. The levels are
    1430              : // iterated over in decreasing score order trying to find a valid compaction
    1431              : // anchored at that level.
    1432              : //
    1433              : // If a score-based compaction cannot be found, pickAuto falls back to looking
    1434              : // for an elision-only compaction to remove obsolete keys.
    1435            2 : func (p *compactionPickerByScore) pickAutoScore(env compactionEnv) pickedCompaction {
    1436            2 :         scores := p.calculateLevelScores(env.inProgressCompactions)
    1437            2 : 
    1438            2 :         // Check for a score-based compaction. candidateLevelInfos are first sorted
    1439            2 :         // by whether they should be compacted, so if we find a level which shouldn't
    1440            2 :         // be compacted, we can break early.
    1441            2 :         for i := range scores {
    1442            2 :                 info := &scores[i]
    1443            2 :                 if !info.shouldCompact() {
    1444            2 :                         break
    1445              :                 }
    1446            2 :                 if info.level == numLevels-1 {
    1447            2 :                         continue
    1448              :                 }
    1449              : 
    1450            2 :                 if info.level == 0 {
    1451            2 :                         ptc := pickL0(env, p.opts, p.vers, p.latestVersionState.l0Organizer, p.baseLevel)
    1452            2 :                         if ptc != nil {
    1453            2 :                                 p.addScoresToPickedCompactionMetrics(ptc, scores)
    1454            2 :                                 ptc.score = info.score
    1455            2 :                                 if false {
    1456            0 :                                         p.logCompactionForTesting(env, scores, ptc)
    1457            0 :                                 }
    1458            2 :                                 return ptc
    1459              :                         }
    1460            2 :                         continue
    1461              :                 }
    1462              : 
    1463              :                 // info.level > 0
    1464            2 :                 var ok bool
    1465            2 :                 info.file, ok = pickCompactionSeedFile(p.vers, &p.latestVersionState.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum, env.problemSpans)
    1466            2 :                 if !ok {
    1467            2 :                         continue
    1468              :                 }
    1469              : 
    1470            2 :                 pc := pickAutoLPositive(env, p.opts, p.vers, p.latestVersionState.l0Organizer, *info, p.baseLevel)
    1471            2 :                 if pc != nil {
    1472            2 :                         p.addScoresToPickedCompactionMetrics(pc, scores)
    1473            2 :                         pc.score = info.score
    1474            2 :                         if false {
    1475            0 :                                 p.logCompactionForTesting(env, scores, pc)
    1476            0 :                         }
    1477            2 :                         return pc
    1478              :                 }
    1479              :         }
    1480            2 :         return nil
    1481              : }
    1482              : 
    1483              : // pickAutoNonScore picks the best non-score-based compaction, if any.
    1484            2 : func (p *compactionPickerByScore) pickAutoNonScore(env compactionEnv) (pc pickedCompaction) {
    1485            2 :         // Check for files which contain excessive point tombstones that could slow
    1486            2 :         // down reads. Unlike elision-only compactions, these compactions may select
    1487            2 :         // a file at any level rather than only the lowest level.
    1488            2 :         if pc := p.pickTombstoneDensityCompaction(env); pc != nil {
    1489            2 :                 return pc
    1490            2 :         }
    1491              : 
    1492              :         // Check for L6 files with tombstones that may be elided. These files may
    1493              :         // exist if a snapshot prevented the elision of a tombstone or because of
    1494              :         // a move compaction. These are low-priority compactions because they
    1495              :         // don't help us keep up with writes, just reclaim disk space.
    1496            2 :         if pc := p.pickElisionOnlyCompaction(env); pc != nil {
    1497            2 :                 return pc
    1498            2 :         }
    1499              : 
    1500              :         // Check for virtual SST rewrites. These compactions materialize virtual tables
    1501              :         // to reclaim space in backing files with low utilization.
    1502            2 :         if pc := p.pickVirtualRewriteCompaction(env); pc != nil {
    1503            2 :                 return pc
    1504            2 :         }
    1505              : 
    1506              :         // Check for blob file rewrites. These are low-priority compactions because
    1507              :         // they don't help us keep up with writes, just reclaim disk space.
    1508            2 :         if pc := p.pickBlobFileRewriteCompactionLowPriority(env); pc != nil {
    1509            2 :                 return pc
    1510            2 :         }
    1511              : 
    1512            2 :         if pc := p.pickReadTriggeredCompaction(env); pc != nil {
    1513            1 :                 return pc
    1514            1 :         }
    1515              : 
    1516              :         // NB: This should only be run if a read compaction wasn't
    1517              :         // scheduled.
    1518              :         //
    1519              :         // We won't be scheduling a read compaction right now, and in
    1520              :         // read heavy workloads, compactions won't be scheduled frequently
    1521              :         // because flushes aren't frequent. So we need to signal to the
    1522              :         // iterator to schedule a compaction when it adds compactions to
    1523              :         // the read compaction queue.
    1524              :         //
    1525              :         // We need the nil check here because without it, we have some
    1526              :         // tests which don't set that variable fail. Since there's a
    1527              :         // chance that one of those tests wouldn't want extra compactions
    1528              :         // to be scheduled, I added this check here, instead of
    1529              :         // setting rescheduleReadCompaction in those tests.
    1530            2 :         if env.readCompactionEnv.rescheduleReadCompaction != nil {
    1531            2 :                 *env.readCompactionEnv.rescheduleReadCompaction = true
    1532            2 :         }
    1533              : 
    1534              :         // At the lowest possible compaction-picking priority, look for files marked
    1535              :         // for compaction. Pebble will mark files for compaction if they have atomic
    1536              :         // compaction units that span multiple files. While current Pebble code does
    1537              :         // not construct such sstables, RocksDB and earlier versions of Pebble may
    1538              :         // have created them. These split user keys form sets of files that must be
    1539              :         // compacted together for correctness (referred to as "atomic compaction
    1540              :         // units" within the code). Rewrite them in-place.
    1541              :         //
    1542              :         // It's also possible that a file may have been marked for compaction by
    1543              :         // even earlier versions of Pebble code, since TableMetadata's
    1544              :         // MarkedForCompaction field is persisted in the manifest. That's okay. We
    1545              :         // previously would've ignored the designation, whereas now we'll re-compact
    1546              :         // the file in place.
    1547            2 :         if p.vers.MarkedForCompaction.Count() > 0 {
    1548            1 :                 if pc := p.pickRewriteCompaction(env); pc != nil {
    1549            1 :                         return pc
    1550            1 :                 }
    1551              :         }
    1552              : 
    1553            2 :         return nil
    1554              : }
    1555              : 
    1556              : func (p *compactionPickerByScore) addScoresToPickedCompactionMetrics(
    1557              :         pc *pickedTableCompaction, candInfo [numLevels]candidateLevelInfo,
    1558            2 : ) {
    1559            2 : 
    1560            2 :         // candInfo is sorted by score, not by compaction level.
    1561            2 :         infoByLevel := [numLevels]candidateLevelInfo{}
    1562            2 :         for i := range candInfo {
    1563            2 :                 level := candInfo[i].level
    1564            2 :                 infoByLevel[level] = candInfo[i]
    1565            2 :         }
    1566              :         // Gather the compaction scores for the levels participating in the compaction.
    1567            2 :         pc.pickerMetrics.scores = make([]float64, len(pc.inputs))
    1568            2 :         inputIdx := 0
    1569            2 :         for i := range infoByLevel {
    1570            2 :                 if pc.inputs[inputIdx].level == infoByLevel[i].level {
    1571            2 :                         pc.pickerMetrics.scores[inputIdx] = infoByLevel[i].score
    1572            2 :                         inputIdx++
    1573            2 :                 }
    1574            2 :                 if inputIdx == len(pc.inputs) {
    1575            2 :                         break
    1576              :                 }
    1577              :         }
    1578              : }
    1579              : 
    1580              : // elisionOnlyAnnotator is a manifest.TableAnnotator that annotates B-Tree
    1581              : // nodes with the *fileMetadata of a file meeting the obsolete keys criteria
    1582              : // for an elision-only compaction within the subtree. If multiple files meet
    1583              : // the criteria, it chooses whichever file has the lowest LargestSeqNum. The
    1584              : // lowest LargestSeqNum file will be the first eligible for an elision-only
    1585              : // compaction once snapshots less than or equal to its LargestSeqNum are closed.
    1586              : var elisionOnlyAnnotator = manifest.MakePickFileAnnotator(
    1587              :         manifest.NewTableAnnotationIdx(),
    1588              :         manifest.PickFileAnnotatorFuncs{
    1589            2 :                 Filter: func(f *manifest.TableMetadata) (eligible bool, cacheOK bool) {
    1590            2 :                         if f.IsCompacting() {
    1591            2 :                                 return false, true
    1592            2 :                         }
    1593              : 
    1594            2 :                         backingProps, backingPropsValid := f.TableBacking.Properties()
    1595            2 :                         stats, statsValid := f.Stats()
    1596            2 :                         if !backingPropsValid || !statsValid {
    1597            2 :                                 return false, false
    1598            2 :                         }
    1599              : 
    1600              :                         // Bottommost files are large and not worthwhile to compact just
    1601              :                         // to remove a few tombstones. Consider a file eligible only if
    1602              :                         // either its own range deletions delete at least 10% of its data or
    1603              :                         // its deletion tombstones make at least 10% of its entries.
    1604              :                         //
    1605              :                         // TODO(jackson): This does not account for duplicate user keys
    1606              :                         // which may be collapsed. Ideally, we would have 'obsolete keys'
    1607              :                         // statistics that would include tombstones, the keys that are
    1608              :                         // dropped by tombstones and duplicated user keys. See #847.
    1609              :                         //
    1610              :                         // Note that tables that contain exclusively range keys (i.e. no point keys,
    1611              :                         // `NumEntries` and `RangeDeletionsBytesEstimate` are both zero) are excluded
    1612              :                         // from elision-only compactions.
    1613              :                         // TODO(travers): Consider an alternative heuristic for elision of range-keys.
    1614            2 :                         eligible = stats.RangeDeletionsBytesEstimate*10 >= f.Size || backingProps.NumDeletions*10 > backingProps.NumEntries
    1615            2 :                         return eligible, true
    1616              :                 },
    1617            2 :                 Compare: func(f1 *manifest.TableMetadata, f2 *manifest.TableMetadata) bool {
    1618            2 :                         return f1.LargestSeqNum < f2.LargestSeqNum
    1619            2 :                 },
    1620              :         },
    1621              : )
    1622              : 
    1623              : // pickedCompactionFromCandidateFile creates a pickedCompaction from a *fileMetadata
    1624              : // with various checks to ensure that the file still exists in the expected level
    1625              : // and isn't already being compacted.
    1626              : func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(
    1627              :         candidate *manifest.TableMetadata,
    1628              :         env compactionEnv,
    1629              :         startLevel int,
    1630              :         outputLevel int,
    1631              :         kind compactionKind,
    1632            2 : ) *pickedTableCompaction {
    1633            2 :         if candidate == nil || candidate.IsCompacting() {
    1634            2 :                 return nil
    1635            2 :         }
    1636              : 
    1637            2 :         var inputs manifest.LevelSlice
    1638            2 :         if startLevel == 0 && outputLevel > 0 {
    1639            1 :                 // Overlapping L0 files must also be compacted alongside the candidate.
    1640            1 :                 // Some compactions attempt to rewrite a file in place (e.g. virtual rewrite)
    1641            1 :                 // so we only do this for L0->Lbase compactions.
    1642            1 :                 inputs = p.vers.Overlaps(0, candidate.UserKeyBounds())
    1643            2 :         } else {
    1644            2 :                 inputs = p.vers.Levels[startLevel].Find(p.opts.Comparer.Compare, candidate)
    1645            2 :         }
    1646            2 :         if invariants.Enabled {
    1647            2 :                 found := false
    1648            2 :                 for f := range inputs.All() {
    1649            2 :                         if f.TableNum == candidate.TableNum {
    1650            2 :                                 found = true
    1651            2 :                         }
    1652              :                 }
    1653            2 :                 if !found {
    1654            0 :                         panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.TableNum, startLevel))
    1655              :                 }
    1656              :         }
    1657              : 
    1658            2 :         pc := newPickedTableCompaction(p.opts, p.vers, p.latestVersionState.l0Organizer,
    1659            2 :                 startLevel, outputLevel, p.baseLevel)
    1660            2 :         pc.kind = kind
    1661            2 :         pc.startLevel.files = inputs
    1662            2 : 
    1663            2 :         if !pc.setupInputs(p.opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, env.problemSpans) {
    1664            1 :                 return nil
    1665            1 :         }
    1666            2 :         return pc
    1667              : }
    1668              : 
    1669              : // pickElisionOnlyCompaction looks for compactions of sstables in the
    1670              : // bottommost level containing obsolete records that may now be dropped.
    1671              : func (p *compactionPickerByScore) pickElisionOnlyCompaction(
    1672              :         env compactionEnv,
    1673            2 : ) (pc *pickedTableCompaction) {
    1674            2 :         if p.opts.private.disableElisionOnlyCompactions {
    1675            2 :                 return nil
    1676            2 :         }
    1677            2 :         candidate := elisionOnlyAnnotator.LevelAnnotation(p.vers.Levels[numLevels-1])
    1678            2 :         if candidate == nil {
    1679            2 :                 return nil
    1680            2 :         }
    1681            2 :         if candidate.LargestSeqNum >= env.earliestSnapshotSeqNum {
    1682            2 :                 return nil
    1683            2 :         }
    1684            2 :         return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly)
    1685              : }
    1686              : 
    1687              : // pickRewriteCompaction attempts to construct a compaction that
    1688              : // rewrites a file marked for compaction. pickRewriteCompaction will
    1689              : // pull in adjacent files in the file's atomic compaction unit if
    1690              : // necessary. A rewrite compaction outputs files to the same level as
    1691              : // the input level.
    1692              : func (p *compactionPickerByScore) pickRewriteCompaction(
    1693              :         env compactionEnv,
    1694            1 : ) (pc *pickedTableCompaction) {
    1695            1 :         for candidate, level := range p.vers.MarkedForCompaction.Ascending() {
    1696            1 :                 if pc := p.pickedCompactionFromCandidateFile(candidate, env, level, level, compactionKindRewrite); pc != nil {
    1697            1 :                         return pc
    1698            1 :                 }
    1699              :         }
    1700            0 :         return nil
    1701              : }
    1702              : 
    1703              : // pickVirtualRewriteCompaction looks for backing tables that have a low percentage
    1704              : // of referenced data and materializes their virtual sstables.
    1705              : func (p *compactionPickerByScore) pickVirtualRewriteCompaction(
    1706              :         env compactionEnv,
    1707            2 : ) *pickedTableCompaction {
    1708            2 : 
    1709            2 :         for _, c := range env.inProgressCompactions {
    1710            2 :                 // Allow only one virtual rewrite compaction at a time.
    1711            2 :                 if c.kind == compactionKindVirtualRewrite {
    1712            2 :                         return nil
    1713            2 :                 }
    1714              :         }
    1715              : 
    1716              :         // We'll pick one virtual table at a time to materialize. This works with our
    1717              :         // compaction system, which currently doesn't support outputting to multiple levels
    1718              :         // or selecting files that aren't contiguous in a level. Successfully materializing
    1719              :         // one of the backing's virtual table will also make the backing more likely to be
    1720              :         // picked again, since the space amp will increase.
    1721            2 :         referencedDataPct, _, vtablesByLevel := p.latestVersionState.virtualBackings.ReplacementCandidate()
    1722            2 : 
    1723            2 :         if 1-referencedDataPct < p.opts.Experimental.VirtualTableRewriteUnreferencedFraction() {
    1724            1 :                 return nil
    1725            1 :         }
    1726              : 
    1727            2 :         for level, tables := range vtablesByLevel {
    1728            2 :                 for _, vt := range tables {
    1729            2 :                         if vt.IsCompacting() {
    1730            2 :                                 continue
    1731              :                         }
    1732            2 :                         if pc := p.pickedCompactionFromCandidateFile(vt, env, level, level, compactionKindVirtualRewrite); pc != nil {
    1733            2 :                                 return pc
    1734            2 :                         }
    1735              :                 }
    1736              :         }
    1737              : 
    1738            2 :         return nil
    1739              : }
    1740              : 
    1741              : // pickBlobFileRewriteCompactionHighPriority picks a compaction that rewrites a
    1742              : // blob file to reclaim disk space if the heuristics for high-priority blob file
    1743              : // rewrites are met.
    1744              : func (p *compactionPickerByScore) pickBlobFileRewriteCompactionHighPriority(
    1745              :         env compactionEnv,
    1746            2 : ) (pc *pickedBlobFileCompaction) {
    1747            2 :         policy := p.opts.Experimental.ValueSeparationPolicy()
    1748            2 :         if policy.GarbageRatioHighPriority >= 1.0 {
    1749            1 :                 // High-priority blob file rewrite compactions are disabled.
    1750            1 :                 return nil
    1751            1 :         }
    1752            2 :         aggregateStats, heuristicStats := p.latestVersionState.blobFiles.Stats()
    1753            2 :         if heuristicStats.CountFilesEligible == 0 && heuristicStats.CountFilesTooRecent == 0 {
    1754            2 :                 // No blob files with any garbage to rewrite.
    1755            2 :                 return nil
    1756            2 :         }
    1757              : 
    1758            2 :         garbagePct := float64(aggregateStats.ValueSize-aggregateStats.ReferencedValueSize) /
    1759            2 :                 float64(aggregateStats.ValueSize)
    1760            2 :         if garbagePct <= policy.GarbageRatioHighPriority {
    1761            2 :                 // Not enough garbage to warrant a rewrite compaction.
    1762            2 :                 return nil
    1763            2 :         }
    1764            2 :         pc = p.pickBlobFileRewriteCandidate(env)
    1765            2 :         if pc != nil {
    1766            2 :                 pc.highPriority = true
    1767            2 :         }
    1768            2 :         return pc
    1769              : }
    1770              : 
    1771              : // pickBlobFileRewriteCompactionLowPriority picks a compaction that rewrites a
    1772              : // blob file to reclaim disk space if the heuristics for low-priority blob file
    1773              : // rewrites are met.
    1774              : func (p *compactionPickerByScore) pickBlobFileRewriteCompactionLowPriority(
    1775              :         env compactionEnv,
    1776            2 : ) (pc *pickedBlobFileCompaction) {
    1777            2 :         aggregateStats, heuristicStats := p.latestVersionState.blobFiles.Stats()
    1778            2 :         if heuristicStats.CountFilesEligible == 0 && heuristicStats.CountFilesTooRecent == 0 {
    1779            2 :                 // No blob files with any garbage to rewrite.
    1780            2 :                 return nil
    1781            2 :         }
    1782            2 :         policy := p.opts.Experimental.ValueSeparationPolicy()
    1783            2 :         if policy.GarbageRatioLowPriority >= 1.0 {
    1784            0 :                 // Blob file rewrite compactions are disabled.
    1785            0 :                 return nil
    1786            0 :         }
    1787              : 
    1788              :         // We want to use ReferencedValueSize here instead of ReferencedBackingValueSize
    1789              :         // to get the estimate of live data in blob files. We'll check below to see if there
    1790              :         // are actually any candidates with garbage to reclaim.
    1791            2 :         garbagePct := float64(aggregateStats.ValueSize-aggregateStats.ReferencedValueSize) /
    1792            2 :                 float64(aggregateStats.ValueSize)
    1793            2 :         if garbagePct <= policy.GarbageRatioLowPriority {
    1794            2 :                 // Not enough garbage to warrant a rewrite compaction.
    1795            2 :                 return nil
    1796            2 :         }
    1797            2 :         return p.pickBlobFileRewriteCandidate(env)
    1798              : }
    1799              : 
    1800              : func (p *compactionPickerByScore) pickBlobFileRewriteCandidate(
    1801              :         env compactionEnv,
    1802            2 : ) (pc *pickedBlobFileCompaction) {
    1803            2 :         // Check if there is an ongoing blob file rewrite compaction. If there is,
    1804            2 :         // don't schedule a new one.
    1805            2 :         for _, c := range env.inProgressCompactions {
    1806            2 :                 if c.kind == compactionKindBlobFileRewrite {
    1807            2 :                         return nil
    1808            2 :                 }
    1809              :         }
    1810            2 :         candidate, ok := p.latestVersionState.blobFiles.ReplacementCandidate()
    1811            2 :         if !ok {
    1812            2 :                 // None meet the heuristic.
    1813            2 :                 return nil
    1814            2 :         }
    1815            2 :         return &pickedBlobFileCompaction{
    1816            2 :                 vers:              p.vers,
    1817            2 :                 file:              candidate,
    1818            2 :                 referencingTables: p.latestVersionState.blobFiles.ReferencingTables(candidate.FileID),
    1819            2 :         }
    1820              : }
    1821              : 
    1822              : // pickTombstoneDensityCompaction looks for a compaction that eliminates
    1823              : // regions of extremely high point tombstone density. For each level, it picks
    1824              : // a file where the ratio of tombstone-dense blocks is at least
    1825              : // options.Experimental.MinTombstoneDenseRatio, prioritizing compaction of
    1826              : // files with higher ratios of tombstone-dense blocks.
    1827              : func (p *compactionPickerByScore) pickTombstoneDensityCompaction(
    1828              :         env compactionEnv,
    1829            2 : ) (pc *pickedTableCompaction) {
    1830            2 :         if p.opts.Experimental.TombstoneDenseCompactionThreshold <= 0 {
    1831            0 :                 // Tombstone density compactions are disabled.
    1832            0 :                 return nil
    1833            0 :         }
    1834              : 
    1835            2 :         var candidate *manifest.TableMetadata
    1836            2 :         var candidateTombstoneDenseBlocksRatio float64
    1837            2 :         var level int
    1838            2 :         // If a candidate file has a very high overlapping ratio, point tombstones
    1839            2 :         // in it are likely sparse in keyspace even if the sstable itself is tombstone
    1840            2 :         // dense. These tombstones likely wouldn't be slow to iterate over, so we exclude
    1841            2 :         // these files from tombstone density compactions. The threshold of 40.0 is
    1842            2 :         // chosen somewhat arbitrarily, after some observations around excessively large
    1843            2 :         // tombstone density compactions.
    1844            2 :         const maxOverlappingRatio = 40.0
    1845            2 :         // NB: we don't consider the lowest level because elision-only compactions
    1846            2 :         // handle that case.
    1847            2 :         lastNonEmptyLevel := numLevels - 1
    1848            2 :         for l := numLevels - 2; l >= 0; l-- {
    1849            2 :                 iter := p.vers.Levels[l].Iter()
    1850            2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1851            2 :                         if f.IsCompacting() || f.Size == 0 {
    1852            2 :                                 continue
    1853              :                         }
    1854            2 :                         props, propsValid := f.TableBacking.Properties()
    1855            2 :                         if !propsValid {
    1856            2 :                                 continue
    1857              :                         }
    1858            2 :                         if props.TombstoneDenseBlocksRatio < p.opts.Experimental.TombstoneDenseCompactionThreshold {
    1859            2 :                                 continue
    1860              :                         }
    1861            2 :                         overlaps := p.vers.Overlaps(lastNonEmptyLevel, f.UserKeyBounds())
    1862            2 :                         if float64(overlaps.AggregateSizeSum())/float64(f.Size) > maxOverlappingRatio {
    1863            2 :                                 continue
    1864              :                         }
    1865            2 :                         if candidate == nil || candidateTombstoneDenseBlocksRatio < props.TombstoneDenseBlocksRatio {
    1866            2 :                                 candidate = f
    1867            2 :                                 candidateTombstoneDenseBlocksRatio = props.TombstoneDenseBlocksRatio
    1868            2 :                                 level = l
    1869            2 :                         }
    1870              :                 }
    1871              :                 // We prefer lower level (ie. L5) candidates over higher level (ie. L4) ones.
    1872            2 :                 if candidate != nil {
    1873            2 :                         break
    1874              :                 }
    1875            2 :                 if !p.vers.Levels[l].Empty() {
    1876            2 :                         lastNonEmptyLevel = l
    1877            2 :                 }
    1878              :         }
    1879              : 
    1880            2 :         return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity)
    1881              : }
    1882              : 
    1883              : // pickAutoLPositive picks an automatic compaction for the candidate
    1884              : // file in a positive-numbered level. This function must not be used for
    1885              : // L0.
    1886              : func pickAutoLPositive(
    1887              :         env compactionEnv,
    1888              :         opts *Options,
    1889              :         vers *manifest.Version,
    1890              :         l0Organizer *manifest.L0Organizer,
    1891              :         cInfo candidateLevelInfo,
    1892              :         baseLevel int,
    1893            2 : ) (pc *pickedTableCompaction) {
    1894            2 :         if cInfo.level == 0 {
    1895            0 :                 panic("pebble: pickAutoLPositive called for L0")
    1896              :         }
    1897              : 
    1898            2 :         pc = newPickedTableCompaction(opts, vers, l0Organizer, cInfo.level, defaultOutputLevel(cInfo.level, baseLevel), baseLevel)
    1899            2 :         if pc.outputLevel.level != cInfo.outputLevel {
    1900            0 :                 panic("pebble: compaction picked unexpected output level")
    1901              :         }
    1902            2 :         pc.startLevel.files = cInfo.file.Slice()
    1903            2 : 
    1904            2 :         if !pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, env.problemSpans) {
    1905            2 :                 return nil
    1906            2 :         }
    1907            2 :         return pc.maybeAddLevel(opts, env)
    1908              : }
    1909              : 
    1910              : // maybeAddLevel maybe adds a level to the picked compaction.
    1911              : // Multilevel compactions are only allowed if the max compaction concurrency
    1912              : // is greater than 1, and there are no in-progress multi-level compactions.
    1913              : func (pc *pickedTableCompaction) maybeAddLevel(
    1914              :         opts *Options, env compactionEnv,
    1915            2 : ) *pickedTableCompaction {
    1916            2 :         pc.pickerMetrics.singleLevelOverlappingRatio = pc.overlappingRatio()
    1917            2 :         if pc.outputLevel.level == numLevels-1 {
    1918            2 :                 // Don't add a level if the current output level is in L6.
    1919            2 :                 return pc
    1920            2 :         }
    1921              :         // We allow at most one in-progress multiLevel compaction at any time.
    1922            2 :         for _, c := range env.inProgressCompactions {
    1923            2 :                 if len(c.inputs) > 2 {
    1924            2 :                         return pc
    1925            2 :                 }
    1926              :         }
    1927            2 :         _, upper := opts.CompactionConcurrencyRange()
    1928            2 :         if upper == 1 {
    1929            2 :                 // If the maximum compaction concurrency is 1, avoid picking a multi-level compactions
    1930            2 :                 // as they could block compactions from L0.
    1931            2 :                 return pc
    1932            2 :         }
    1933            2 :         if !opts.Experimental.MultiLevelCompactionHeuristic().allowL0() && pc.startLevel.level == 0 {
    1934            2 :                 return pc
    1935            2 :         }
    1936            2 :         targetFileSize := opts.TargetFileSize(pc.outputLevel.level, pc.baseLevel)
    1937            2 :         if pc.estimatedInputSize() > expandedCompactionByteSizeLimit(opts, targetFileSize, env.diskAvailBytes) {
    1938            1 :                 // Don't add a level if the current compaction exceeds the compaction size limit
    1939            1 :                 return pc
    1940            1 :         }
    1941            2 :         return opts.Experimental.MultiLevelCompactionHeuristic().pick(pc, opts, env)
    1942              : }
    1943              : 
// MultiLevelHeuristic evaluates whether to add files from the next level into
// the compaction.
type MultiLevelHeuristic interface {
	// pick returns the preferred compaction: either the compaction it was
	// given or a multi-level variant of it.
	pick(pc *pickedTableCompaction, opts *Options, env compactionEnv) *pickedTableCompaction

	// allowL0 reports whether the heuristic allows L0 to be involved in a
	// multi-level compaction.
	allowL0() bool

	// String implements fmt.Stringer.
	String() string
}
    1955              : 
// NoMultiLevel will never add an additional level to the compaction.
type NoMultiLevel struct{}

// Compile-time check that NoMultiLevel satisfies MultiLevelHeuristic.
var _ MultiLevelHeuristic = (*NoMultiLevel)(nil)

// OptionNoMultiLevel returns a MultiLevelHeuristic that never adds a level to
// a picked compaction.
func OptionNoMultiLevel() MultiLevelHeuristic {
	return NoMultiLevel{}
}

// pick implements MultiLevelHeuristic. It returns pc unchanged.
func (nml NoMultiLevel) pick(
	pc *pickedTableCompaction, opts *Options, env compactionEnv,
) *pickedTableCompaction {
	return pc
}

// allowL0 and String implement MultiLevelHeuristic: L0 is never allowed in a
// multi-level compaction, and the heuristic's name is "none".
func (nml NoMultiLevel) allowL0() bool  { return false }
func (nml NoMultiLevel) String() string { return "none" }
    1973              : 
    1974            2 : func (pc *pickedTableCompaction) predictedWriteAmp() float64 {
    1975            2 :         var bytesToCompact uint64
    1976            2 :         var higherLevelBytes uint64
    1977            2 :         for i := range pc.inputs {
    1978            2 :                 levelSize := pc.inputs[i].files.AggregateSizeSum()
    1979            2 :                 bytesToCompact += levelSize
    1980            2 :                 if i != len(pc.inputs)-1 {
    1981            2 :                         higherLevelBytes += levelSize
    1982            2 :                 }
    1983              :         }
    1984            2 :         return float64(bytesToCompact) / float64(higherLevelBytes)
    1985              : }
    1986              : 
    1987            2 : func (pc *pickedTableCompaction) overlappingRatio() float64 {
    1988            2 :         var higherLevelBytes uint64
    1989            2 :         var lowestLevelBytes uint64
    1990            2 :         for i := range pc.inputs {
    1991            2 :                 levelSize := pc.inputs[i].files.AggregateSizeSum()
    1992            2 :                 if i == len(pc.inputs)-1 {
    1993            2 :                         lowestLevelBytes += levelSize
    1994            2 :                         continue
    1995              :                 }
    1996            2 :                 higherLevelBytes += levelSize
    1997              :         }
    1998            2 :         return float64(lowestLevelBytes) / float64(higherLevelBytes)
    1999              : }
    2000              : 
// WriteAmpHeuristic defines a multi level compaction heuristic which will add
// an additional level to the picked compaction if the multi-level
// compaction's predicted write amp does not exceed the single-level
// compaction's predicted write amp plus the AddPropensity constant.
type WriteAmpHeuristic struct {
	// AddPropensity is a constant that affects the propensity to conduct
	// multilevel compactions. If positive, a multilevel compaction may get
	// picked even if the single level compaction has lower write amp, and
	// vice versa.
	AddPropensity float64

	// AllowL0 if true, allow l0 to be involved in a ML compaction.
	AllowL0 bool
}

// Compile-time check that WriteAmpHeuristic satisfies MultiLevelHeuristic.
var _ MultiLevelHeuristic = (*WriteAmpHeuristic)(nil)

// Default write amp heuristic with no propensity towards multi-level
// and no multilevel compactions involving L0.
var defaultWriteAmpHeuristic = &WriteAmpHeuristic{}

// OptionWriteAmpHeuristic returns the default write amp heuristic.
func OptionWriteAmpHeuristic() MultiLevelHeuristic {
	return defaultWriteAmpHeuristic
}
    2023              : 
    2024              : // TODO(msbutler): microbenchmark the extent to which multilevel compaction
    2025              : // picking slows down the compaction picking process.  This should be as fast as
    2026              : // possible since Compaction-picking holds d.mu, which prevents WAL rotations,
    2027              : // in-progress flushes and compactions from completing, etc. Consider ways to
    2028              : // deduplicate work, given that setupInputs has already been called.
    2029              : func (wa WriteAmpHeuristic) pick(
    2030              :         pcOrig *pickedTableCompaction, opts *Options, env compactionEnv,
    2031            2 : ) *pickedTableCompaction {
    2032            2 :         pcMulti := pcOrig.clone()
    2033            2 :         if !pcMulti.setupMultiLevelCandidate(opts, env) {
    2034            2 :                 return pcOrig
    2035            2 :         }
    2036              :         // We consider the addition of a level as an "expansion" of the compaction.
    2037              :         // If pcMulti is past the expanded compaction byte size limit already,
    2038              :         // we don't consider it.
    2039            2 :         targetFileSize := opts.TargetFileSize(pcMulti.outputLevel.level, pcMulti.baseLevel)
    2040            2 :         if pcMulti.estimatedInputSize() >= expandedCompactionByteSizeLimit(opts, targetFileSize, env.diskAvailBytes) {
    2041            0 :                 return pcOrig
    2042            0 :         }
    2043            2 :         picked := pcOrig
    2044            2 :         if pcMulti.predictedWriteAmp() <= pcOrig.predictedWriteAmp()+wa.AddPropensity {
    2045            2 :                 picked = pcMulti
    2046            2 :         }
    2047              :         // Regardless of what compaction was picked, log the multilevelOverlapping ratio.
    2048            2 :         picked.pickerMetrics.multiLevelOverlappingRatio = pcMulti.overlappingRatio()
    2049            2 :         return picked
    2050              : }
    2051              : 
// allowL0 implements MultiLevelHeuristic. It reports the AllowL0 setting.
func (wa WriteAmpHeuristic) allowL0() bool {
	return wa.AllowL0
}

// String implements fmt.Stringer. The result has the form
// "wamp(<AddPropensity>, <AllowL0>)", with AddPropensity printed to two
// decimal places.
func (wa WriteAmpHeuristic) String() string {
	return fmt.Sprintf("wamp(%.2f, %t)", wa.AddPropensity, wa.AllowL0)
}
    2060              : 
    2061              : // Helper method to pick compactions originating from L0. Uses information about
    2062              : // sublevels to generate a compaction.
    2063              : func pickL0(
    2064              :         env compactionEnv,
    2065              :         opts *Options,
    2066              :         vers *manifest.Version,
    2067              :         l0Organizer *manifest.L0Organizer,
    2068              :         baseLevel int,
    2069            2 : ) *pickedTableCompaction {
    2070            2 :         // It is important to pass information about Lbase files to L0Sublevels
    2071            2 :         // so it can pick a compaction that does not conflict with an Lbase => Lbase+1
    2072            2 :         // compaction. Without this, we observed reduced concurrency of L0=>Lbase
    2073            2 :         // compactions, and increasing read amplification in L0.
    2074            2 :         //
    2075            2 :         // TODO(bilal) Remove the minCompactionDepth parameter once fixing it at 1
    2076            2 :         // has been shown to not cause a performance regression.
    2077            2 :         lcf := l0Organizer.PickBaseCompaction(opts.Logger, 1, vers.Levels[baseLevel].Slice(), baseLevel, env.problemSpans)
    2078            2 :         if lcf != nil {
    2079            2 :                 pc := newPickedCompactionFromL0(lcf, opts, vers, l0Organizer, baseLevel, true)
    2080            2 :                 if pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, env.problemSpans) {
    2081            2 :                         if pc.startLevel.files.Empty() {
    2082            0 :                                 opts.Logger.Errorf("%v", base.AssertionFailedf("empty compaction chosen"))
    2083            0 :                         }
    2084            2 :                         return pc.maybeAddLevel(opts, env)
    2085              :                 }
    2086              :                 // TODO(radu): investigate why this happens.
    2087              :                 // opts.Logger.Errorf("%v", base.AssertionFailedf("setupInputs failed"))
    2088              :         }
    2089              : 
    2090              :         // Couldn't choose a base compaction. Try choosing an intra-L0
    2091              :         // compaction. Note that we pass in L0CompactionThreshold here as opposed to
    2092              :         // 1, since choosing a single sublevel intra-L0 compaction is
    2093              :         // counterproductive.
    2094            2 :         lcf = l0Organizer.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count, env.problemSpans)
    2095            2 :         if lcf != nil {
    2096            2 :                 pc := newPickedCompactionFromL0(lcf, opts, vers, l0Organizer, baseLevel, false)
    2097            2 :                 if pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, env.problemSpans) {
    2098            2 :                         if pc.startLevel.files.Empty() {
    2099            0 :                                 opts.Logger.Fatalf("empty compaction chosen")
    2100            0 :                         }
    2101              :                         // A single-file intra-L0 compaction is unproductive.
    2102            2 :                         if iter := pc.startLevel.files.Iter(); iter.First() != nil && iter.Next() != nil {
    2103            2 :                                 pc.bounds = manifest.KeyRange(opts.Comparer.Compare, pc.startLevel.files.All())
    2104            2 :                                 return pc
    2105            2 :                         }
    2106            0 :                 } else {
    2107            0 :                         // TODO(radu): investigate why this happens.
    2108            0 :                         // opts.Logger.Errorf("%v", base.AssertionFailedf("setupInputs failed"))
    2109            0 :                 }
    2110              :         }
    2111            2 :         return nil
    2112              : }
    2113              : 
    2114              : func newPickedManualCompaction(
    2115              :         vers *manifest.Version,
    2116              :         l0Organizer *manifest.L0Organizer,
    2117              :         opts *Options,
    2118              :         env compactionEnv,
    2119              :         baseLevel int,
    2120              :         manual *manualCompaction,
    2121            2 : ) (pc *pickedTableCompaction, retryLater bool) {
    2122            2 :         outputLevel := manual.level + 1
    2123            2 :         if manual.level == 0 {
    2124            2 :                 outputLevel = baseLevel
    2125            2 :         } else if manual.level < baseLevel {
    2126            2 :                 // The start level for a compaction must be >= Lbase. A manual
    2127            2 :                 // compaction could have been created adhering to that condition, and
    2128            2 :                 // then an automatic compaction came in and compacted all of the
    2129            2 :                 // sstables in Lbase to Lbase+1 which caused Lbase to change. Simply
    2130            2 :                 // ignore this manual compaction as there is nothing to do (manual.level
    2131            2 :                 // points to an empty level).
    2132            2 :                 return nil, false
    2133            2 :         }
    2134              :         // This conflictsWithInProgress call is necessary for the manual compaction to
    2135              :         // be retried when it conflicts with an ongoing automatic compaction. Without
    2136              :         // it, the compaction is dropped due to pc.setupInputs returning false since
    2137              :         // the input/output range is already being compacted, and the manual
    2138              :         // compaction ends with a non-compacted LSM.
    2139            2 :         if conflictsWithInProgress(manual, outputLevel, env.inProgressCompactions, opts.Comparer.Compare) {
    2140            2 :                 return nil, true
    2141            2 :         }
    2142            2 :         pc = newPickedTableCompaction(opts, vers, l0Organizer, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
    2143            2 :         pc.manualID = manual.id
    2144            2 :         manual.outputLevel = pc.outputLevel.level
    2145            2 :         pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end))
    2146            2 :         if pc.startLevel.files.Empty() {
    2147            2 :                 // Nothing to do
    2148            2 :                 return nil, false
    2149            2 :         }
    2150              :         // We use nil problemSpans because we don't want problem spans to prevent
    2151              :         // manual compactions.
    2152            2 :         if !pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, nil /* problemSpans */) {
    2153            2 :                 // setupInputs returned false indicating there's a conflicting
    2154            2 :                 // concurrent compaction.
    2155            2 :                 return nil, true
    2156            2 :         }
    2157            2 :         if pc = pc.maybeAddLevel(opts, env); pc == nil {
    2158            0 :                 return nil, false
    2159            0 :         }
    2160            2 :         if pc.outputLevel.level != outputLevel {
    2161            2 :                 if len(pc.inputs) > 2 {
    2162            2 :                         // Multilevel compactions relax this invariant.
    2163            2 :                 } else {
    2164            0 :                         panic("pebble: compaction picked unexpected output level")
    2165              :                 }
    2166              :         }
    2167            2 :         return pc, false
    2168              : }
    2169              : 
    2170              : // pickDownloadCompaction picks a download compaction for the downloadSpan,
    2171              : // which could be specified as being performed either by a copy compaction of
    2172              : // the backing file or a rewrite compaction.
    2173              : func pickDownloadCompaction(
    2174              :         vers *manifest.Version,
    2175              :         l0Organizer *manifest.L0Organizer,
    2176              :         opts *Options,
    2177              :         env compactionEnv,
    2178              :         baseLevel int,
    2179              :         kind compactionKind,
    2180              :         level int,
    2181              :         file *manifest.TableMetadata,
    2182            2 : ) (pc *pickedTableCompaction) {
    2183            2 :         // Check if the file is compacting already.
    2184            2 :         if file.CompactionState == manifest.CompactionStateCompacting {
    2185            0 :                 return nil
    2186            0 :         }
    2187            2 :         if kind != compactionKindCopy && kind != compactionKindRewrite {
    2188            0 :                 panic("invalid download/rewrite compaction kind")
    2189              :         }
    2190            2 :         pc = newPickedTableCompaction(opts, vers, l0Organizer, level, level, baseLevel)
    2191            2 :         pc.kind = kind
    2192            2 :         pc.startLevel.files = manifest.NewLevelSliceKeySorted(opts.Comparer.Compare, []*manifest.TableMetadata{file})
    2193            2 :         if !pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, nil /* problemSpans */) {
    2194            0 :                 // setupInputs returned false indicating there's a conflicting
    2195            0 :                 // concurrent compaction.
    2196            0 :                 return nil
    2197            0 :         }
    2198            2 :         if pc.outputLevel.level != level {
    2199            0 :                 panic("pebble: download compaction picked unexpected output level")
    2200              :         }
    2201            2 :         return pc
    2202              : }
    2203              : 
    2204              : func (p *compactionPickerByScore) pickReadTriggeredCompaction(
    2205              :         env compactionEnv,
    2206            2 : ) (pc *pickedTableCompaction) {
    2207            2 :         // If a flush is in-progress or expected to happen soon, it means more writes are taking place. We would
    2208            2 :         // soon be scheduling more write focussed compactions. In this case, skip read compactions as they are
    2209            2 :         // lower priority.
    2210            2 :         if env.readCompactionEnv.flushing || env.readCompactionEnv.readCompactions == nil {
    2211            2 :                 return nil
    2212            2 :         }
    2213            2 :         for env.readCompactionEnv.readCompactions.size > 0 {
    2214            1 :                 rc := env.readCompactionEnv.readCompactions.remove()
    2215            1 :                 if pc = pickReadTriggeredCompactionHelper(p, rc, env); pc != nil {
    2216            1 :                         break
    2217              :                 }
    2218              :         }
    2219            2 :         return pc
    2220              : }
    2221              : 
    2222              : func pickReadTriggeredCompactionHelper(
    2223              :         p *compactionPickerByScore, rc *readCompaction, env compactionEnv,
    2224            1 : ) (pc *pickedTableCompaction) {
    2225            1 :         overlapSlice := p.vers.Overlaps(rc.level, base.UserKeyBoundsInclusive(rc.start, rc.end))
    2226            1 :         var fileMatches bool
    2227            1 :         for f := range overlapSlice.All() {
    2228            1 :                 if f.TableNum == rc.tableNum {
    2229            1 :                         fileMatches = true
    2230            1 :                         break
    2231              :                 }
    2232              :         }
    2233            1 :         if !fileMatches {
    2234            1 :                 return nil
    2235            1 :         }
    2236              : 
    2237            1 :         pc = newPickedTableCompaction(p.opts, p.vers, p.latestVersionState.l0Organizer,
    2238            1 :                 rc.level, defaultOutputLevel(rc.level, p.baseLevel), p.baseLevel)
    2239            1 : 
    2240            1 :         pc.startLevel.files = overlapSlice
    2241            1 :         if !pc.setupInputs(p.opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, env.problemSpans) {
    2242            0 :                 return nil
    2243            0 :         }
    2244            1 :         pc.kind = compactionKindRead
    2245            1 : 
    2246            1 :         // Prevent read compactions which are too wide.
    2247            1 :         outputOverlaps := pc.version.Overlaps(pc.outputLevel.level, pc.bounds)
    2248            1 :         if outputOverlaps.AggregateSizeSum() > pc.maxReadCompactionBytes {
    2249            1 :                 return nil
    2250            1 :         }
    2251              : 
    2252              :         // Prevent compactions which start with a small seed file X, but overlap
    2253              :         // with over allowedCompactionWidth * X file sizes in the output layer.
    2254            1 :         const allowedCompactionWidth = 35
    2255            1 :         if outputOverlaps.AggregateSizeSum() > overlapSlice.AggregateSizeSum()*allowedCompactionWidth {
    2256            0 :                 return nil
    2257            0 :         }
    2258              : 
    2259            1 :         return pc
    2260              : }
    2261              : 
// forceBaseLevel1 overrides the picker's base level, setting it to 1.
func (p *compactionPickerByScore) forceBaseLevel1() {
	p.baseLevel = 1
}
    2265              : 
    2266              : // outputKeyRangeAlreadyCompacting reports whether pc.bounds overlaps, under
    2267              : // cmp, the bounds of an in-progress compaction whose output level matches pc's.
    2268              : func outputKeyRangeAlreadyCompacting(
    2269              :         cmp base.Compare, inProgressCompactions []compactionInfo, pc *pickedTableCompaction,
    2270            2 : ) bool {
    2271            2 :         // Look for active compactions outputting to the same region of the key
    2272            2 :         // space in the same output level. Two potential compactions may conflict
    2273            2 :         // without sharing input files if there are no files in the output level
    2274            2 :         // that overlap with the intersection of the compactions' key spaces.
    2275            2 :         //
    2276            2 :         // Consider an active L0->Lbase compaction compacting two L0 files, one
    2277            2 :         // [a-f] and the other [t-z], into Lbase.
    2278            2 :         //
    2279            2 :         // L0
    2280            2 :         //     ↦ 000100  ↤                           ↦  000101   ↤
    2281            2 :         // L1
    2282            2 :         //     ↦ 000004  ↤
    2283            2 :         //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    2284            2 :         //
    2285            2 :         // If a new file 000102 [j-p] is flushed while the existing compaction is
    2286            2 :         // still ongoing, the new file would not be in any compacting sublevel
    2287            2 :         // intervals and would not overlap with any Lbase files that are also
    2288            2 :         // compacting. However, this compaction cannot be picked because the
    2289            2 :         // compaction's output key space [j-p] would overlap the existing
    2290            2 :         // compaction's output key space [a-z].
    2291            2 :         //
    2292            2 :         // L0
    2293            2 :         //     ↦ 000100* ↤       ↦   000102  ↤       ↦  000101*  ↤
    2294            2 :         // L1
    2295            2 :         //     ↦ 000004* ↤
    2296            2 :         //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    2297            2 :         //
    2298            2 :         // * - currently compacting
    2299            2 :         if pc.outputLevel != nil && pc.outputLevel.level != 0 { // check is skipped (result false) when pc has no output level or targets level 0
    2300            2 :                 for _, c := range inProgressCompactions {
    2301            2 :                         if pc.outputLevel.level != c.outputLevel {
    2302            2 :                                 continue // c writes to a different level
    2303              :                         }
    2304            2 :                         if !c.bounds.Overlaps(cmp, &pc.bounds) {
    2305            2 :                                 continue // same output level, but the bounds do not overlap
    2306              :                         }
    2307              :                         // The picked compaction and the in-progress compaction c are
    2308              :                         // outputting to the same region of the key space of the same
    2309              :                         // level.
    2310            2 :                         return true
    2311              :                 }
    2312              :         }
    2313            2 :         return false
    2314              : }
    2315              : // conflictsWithInProgress reports whether the user-key range [manual.start, manual.end]
    2316              : // overlaps an in-progress compaction that outputs to, or has input files in, manual.level or outputLevel.
    2317              : func conflictsWithInProgress(
    2318              :         manual *manualCompaction, outputLevel int, inProgressCompactions []compactionInfo, cmp Compare,
    2319            2 : ) bool {
    2320            2 :         for _, c := range inProgressCompactions {
    2321            2 :                 if (c.outputLevel == manual.level || c.outputLevel == outputLevel) && // c writes into one of the two levels of interest
    2322            2 :                         areUserKeysOverlapping(manual.start, manual.end, c.bounds.Start, c.bounds.End.Key, cmp) { // NOTE(review): only End.Key is consulted, so the end of c.bounds is compared inclusively
    2323            2 :                         return true
    2324            2 :                 }
    2325            2 :                 for _, in := range c.inputs { // then examine each input level of c separately
    2326            2 :                         if in.files.Empty() {
    2327            2 :                                 continue
    2328              :                         }
    2329            2 :                         iter := in.files.Iter() // NOTE(review): this local shadows the imported iter package inside the loop body
    2330            2 :                         smallest := iter.First().Smallest().UserKey // smallest user key of the first file
    2331            2 :                         largest := iter.Last().Largest().UserKey // largest user key of the last file; presumably files iterate in key order so the pair brackets this level's inputs — TODO confirm
    2332            2 :                         if (in.level == manual.level || in.level == outputLevel) &&
    2333            2 :                                 areUserKeysOverlapping(manual.start, manual.end, smallest, largest, cmp) {
    2334            2 :                                 return true
    2335            2 :                         }
    2336              :                 }
    2337              :         }
    2338            2 :         return false
    2339              : }
    2340              : // areUserKeysOverlapping reports whether the ranges [x1, x2] and [y1, y2] overlap under cmp.
    2341            2 : func areUserKeysOverlapping(x1, x2, y1, y2 []byte, cmp Compare) bool {
    2342            2 :         return cmp(x1, y2) <= 0 && cmp(y1, x2) <= 0 // both ends inclusive: ranges that merely touch count as overlapping; presumes x1 <= x2 and y1 <= y2 (not checked here)
    2343            2 : }
        

Generated by: LCOV version 2.0-1