LCOV - code coverage report
Current view: top level - pebble - compaction_picker.go (source / functions) Hit Total Coverage
Test: 2023-11-20 08:16Z b9be64b8 - tests + meta.lcov Lines: 1143 1233 92.7 %
Date: 2023-11-20 08:17:03 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "fmt"
      10             :         "math"
      11             :         "sort"
      12             :         "strings"
      13             : 
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/humanize"
      16             :         "github.com/cockroachdb/pebble/internal/manifest"
      17             : )
      18             : 
// The minimum count for an intra-L0 compaction. This matches the RocksDB
// heuristic. Intra-L0 compactions with fewer input files than this are not
// worth the write amplification (used by the L0 picker; usage is outside
// this chunk).
const minIntraL0Count = 4
      22             : 
// compactionEnv is the snapshot of state the compaction picker consults when
// choosing a compaction.
type compactionEnv struct {
	// diskAvailBytes holds a statistic on the number of bytes available on
	// disk, as reported by the filesystem. It's used to be more restrictive in
	// expanding compactions if available disk space is limited.
	//
	// The cached value (d.diskAvailBytes) is updated whenever a file is deleted
	// and whenever a compaction or flush completes. Since file removal is the
	// primary means of reclaiming space, there is a rough bound on the
	// statistic's staleness when available bytes is growing. Compactions and
	// flushes are longer, slower operations and provide a much looser bound
	// when available bytes is decreasing.
	diskAvailBytes uint64
	// earliestUnflushedSeqNum / earliestSnapshotSeqNum bound the sequence
	// numbers visible to compactions (presumably the earliest unflushed entry
	// and the earliest open snapshot, per their names — confirm at call sites).
	earliestUnflushedSeqNum uint64
	earliestSnapshotSeqNum  uint64
	// inProgressCompactions describes compactions that are already running;
	// they constrain what new compactions may be picked (see compactionInfo).
	inProgressCompactions []compactionInfo
	// readCompactionEnv holds the data required to perform read compactions.
	readCompactionEnv readCompactionEnv
}
      40             : 
// compactionPicker decides which compaction, if any, should run next. It
// exposes per-level scores and the base level, and offers specialized pickers
// for automatic, elision-only, rewrite, and read-triggered compactions. Each
// pick* method returns nil when no suitable compaction exists.
type compactionPicker interface {
	// getScores returns a score per level, given the in-progress compactions.
	getScores([]compactionInfo) [numLevels]float64
	// getBaseLevel returns the current base level (Lbase).
	getBaseLevel() int
	// estimatedCompactionDebt estimates outstanding compaction work in bytes;
	// l0ExtraSize is presumably counted as additional L0 bytes — confirm at
	// the implementation.
	estimatedCompactionDebt(l0ExtraSize uint64) uint64
	pickAuto(env compactionEnv) (pc *pickedCompaction)
	pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)
	pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction)
	pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
	// forceBaseLevel1 forces Lbase to L1 — confirm intended callers (likely
	// tests/tools) at the implementation.
	forceBaseLevel1()
}
      51             : 
// readCompactionEnv is used to hold data required to perform read compactions.
type readCompactionEnv struct {
	// rescheduleReadCompaction, when set through this pointer, presumably
	// signals that a read compaction should be retried later — confirm at
	// call sites.
	rescheduleReadCompaction *bool
	// readCompactions is the queue of pending read-triggered compactions.
	readCompactions *readCompactionQueue
	// flushing reports whether a flush is currently in progress.
	flushing bool
}
      58             : 
// Information about in-progress compactions provided to the compaction picker.
// These are used to constrain the new compactions that will be picked.
type compactionInfo struct {
	// versionEditApplied is true if this compaction's version edit has already
	// been committed. The compaction may still be in-progress deleting newly
	// obsolete files.
	versionEditApplied bool
	// inputs holds one entry per level participating in the compaction.
	inputs []compactionLevel
	// outputLevel is the level the compaction writes into.
	outputLevel int
	// smallest and largest bound the internal keys covered by the compaction.
	smallest InternalKey
	largest  InternalKey
}
      71             : 
      72           1 : func (info compactionInfo) String() string {
      73           1 :         var buf bytes.Buffer
      74           1 :         var largest int
      75           1 :         for i, in := range info.inputs {
      76           1 :                 if i > 0 {
      77           1 :                         fmt.Fprintf(&buf, " -> ")
      78           1 :                 }
      79           1 :                 fmt.Fprintf(&buf, "L%d", in.level)
      80           1 :                 in.files.Each(func(m *fileMetadata) {
      81           1 :                         fmt.Fprintf(&buf, " %s", m.FileNum)
      82           1 :                 })
      83           1 :                 if largest < in.level {
      84           1 :                         largest = in.level
      85           1 :                 }
      86             :         }
      87           1 :         if largest != info.outputLevel || len(info.inputs) == 1 {
      88           1 :                 fmt.Fprintf(&buf, " -> L%d", info.outputLevel)
      89           1 :         }
      90           1 :         return buf.String()
      91             : }
      92             : 
// sortCompactionLevelsByPriority implements sort.Interface over a slice of
// candidateLevelInfo, ordering levels from most to least attractive for
// compaction (see Less).
type sortCompactionLevelsByPriority []candidateLevelInfo

// Len returns the number of candidate levels (sort.Interface).
func (s sortCompactionLevelsByPriority) Len() int {
	return len(s)
}
      98             : 
// A level should be picked for compaction if the compensatedScoreRatio is >= the
// compactionScoreThreshold. Candidates below this threshold sort after all
// candidates at or above it (see Less).
const compactionScoreThreshold = 1
     102             : 
     103             : // Less should return true if s[i] must be placed earlier than s[j] in the final
     104             : // sorted list. The candidateLevelInfo for the level placed earlier is more likely
     105             : // to be picked for a compaction.
     106           2 : func (s sortCompactionLevelsByPriority) Less(i, j int) bool {
     107           2 :         iShouldCompact := s[i].compensatedScoreRatio >= compactionScoreThreshold
     108           2 :         jShouldCompact := s[j].compensatedScoreRatio >= compactionScoreThreshold
     109           2 :         // Ordering is defined as decreasing on (shouldCompact, uncompensatedScoreRatio)
     110           2 :         // where shouldCompact is 1 for true and 0 for false.
     111           2 :         if iShouldCompact && !jShouldCompact {
     112           2 :                 return true
     113           2 :         }
     114           2 :         if !iShouldCompact && jShouldCompact {
     115           2 :                 return false
     116           2 :         }
     117             : 
     118           2 :         if s[i].uncompensatedScoreRatio != s[j].uncompensatedScoreRatio {
     119           2 :                 return s[i].uncompensatedScoreRatio > s[j].uncompensatedScoreRatio
     120           2 :         }
     121           2 :         return s[i].level < s[j].level
     122             : }
     123             : 
     124           2 : func (s sortCompactionLevelsByPriority) Swap(i, j int) {
     125           2 :         s[i], s[j] = s[j], s[i]
     126           2 : }
     127             : 
// sublevelInfo is used to tag a LevelSlice for an L0 sublevel with the
// sublevel.
type sublevelInfo struct {
	manifest.LevelSlice
	// sublevel identifies which L0 sublevel the embedded slice belongs to.
	sublevel manifest.Level
}
     134             : 
     135           1 : func (cl sublevelInfo) Clone() sublevelInfo {
     136           1 :         return sublevelInfo{
     137           1 :                 sublevel:   cl.sublevel,
     138           1 :                 LevelSlice: cl.LevelSlice.Reslice(func(start, end *manifest.LevelIterator) {}),
     139             :         }
     140             : }
     141           1 : func (cl sublevelInfo) String() string {
     142           1 :         return fmt.Sprintf(`Sublevel %s; Levels %s`, cl.sublevel, cl.LevelSlice)
     143           1 : }
     144             : 
     145             : // generateSublevelInfo will generate the level slices for each of the sublevels
     146             : // from the level slice for all of L0.
     147           2 : func generateSublevelInfo(cmp base.Compare, levelFiles manifest.LevelSlice) []sublevelInfo {
     148           2 :         sublevelMap := make(map[uint64][]*fileMetadata)
     149           2 :         it := levelFiles.Iter()
     150           2 :         for f := it.First(); f != nil; f = it.Next() {
     151           2 :                 sublevelMap[uint64(f.SubLevel)] = append(sublevelMap[uint64(f.SubLevel)], f)
     152           2 :         }
     153             : 
     154           2 :         var sublevels []int
     155           2 :         for level := range sublevelMap {
     156           2 :                 sublevels = append(sublevels, int(level))
     157           2 :         }
     158           2 :         sort.Ints(sublevels)
     159           2 : 
     160           2 :         var levelSlices []sublevelInfo
     161           2 :         for _, sublevel := range sublevels {
     162           2 :                 metas := sublevelMap[uint64(sublevel)]
     163           2 :                 levelSlices = append(
     164           2 :                         levelSlices,
     165           2 :                         sublevelInfo{
     166           2 :                                 manifest.NewLevelSliceKeySorted(cmp, metas),
     167           2 :                                 manifest.L0Sublevel(sublevel),
     168           2 :                         },
     169           2 :                 )
     170           2 :         }
     171           2 :         return levelSlices
     172             : }
     173             : 
// compactionPickerMetrics holds metrics related to the compaction picking process
type compactionPickerMetrics struct {
	// scores contains the compensatedScoreRatio from the candidateLevelInfo.
	scores []float64
	// singleLevelOverlappingRatio and multiLevelOverlappingRatio record
	// overlap statistics for single- and multi-level compaction candidates
	// (populated by the picker; computation is outside this chunk).
	singleLevelOverlappingRatio float64
	multiLevelOverlappingRatio  float64
}
     181             : 
// pickedCompaction contains information about a compaction that has already
// been chosen, and is being constructed. Compaction construction info lives in
// this struct, and is copied over into the compaction struct when that's
// created.
type pickedCompaction struct {
	// cmp is the user-key comparator (from Options.Comparer.Compare).
	cmp Compare
	// score of the chosen compaction. This is the same as the
	// compensatedScoreRatio in the candidateLevelInfo.
	score float64
	// kind indicates the kind of compaction.
	kind compactionKind
	// startLevel is the level that is being compacted. Inputs from startLevel
	// and outputLevel will be merged to produce a set of outputLevel files.
	startLevel *compactionLevel
	// outputLevel is the level that files are being produced in. outputLevel is
	// equal to startLevel+1 except when:
	//    - if startLevel is 0, the output level equals compactionPicker.baseLevel().
	//    - in multilevel compaction, the output level is the lowest level involved in
	//      the compaction
	outputLevel *compactionLevel
	// extraLevels contain additional levels in between the input and output
	// levels that get compacted in multi level compactions
	extraLevels []*compactionLevel
	// inputs backs startLevel, outputLevel and extraLevels: those pointers
	// alias entries of this slice (see newPickedCompaction and clone).
	inputs []compactionLevel
	// LBase at the time of compaction picking.
	baseLevel int
	// L0-specific compaction info. Set to a non-nil value for all compactions
	// where startLevel == 0 that were generated by L0Sublevels.
	lcf *manifest.L0CompactionFiles
	// maxOutputFileSize is the maximum size of an individual table created
	// during compaction.
	maxOutputFileSize uint64
	// maxOverlapBytes is the maximum number of bytes of overlap allowed for a
	// single output table with the tables in the grandparent level.
	maxOverlapBytes uint64
	// maxReadCompactionBytes is the maximum bytes a read compaction is allowed to
	// overlap in its output level with. If the overlap is greater than
	// maxReadCompaction bytes, then we don't proceed with the compaction.
	maxReadCompactionBytes uint64
	// The boundaries of the input data.
	smallest InternalKey
	largest  InternalKey
	// version is the LSM version this compaction was picked against.
	version *version
	// pickerMetrics records details of the picking decision for observability.
	pickerMetrics compactionPickerMetrics
}
     227             : 
     228           2 : func defaultOutputLevel(startLevel, baseLevel int) int {
     229           2 :         outputLevel := startLevel + 1
     230           2 :         if startLevel == 0 {
     231           2 :                 outputLevel = baseLevel
     232           2 :         }
     233           2 :         if outputLevel >= numLevels-1 {
     234           2 :                 outputLevel = numLevels - 1
     235           2 :         }
     236           2 :         return outputLevel
     237             : }
     238             : 
     239             : func newPickedCompaction(
     240             :         opts *Options, cur *version, startLevel, outputLevel, baseLevel int,
     241           2 : ) *pickedCompaction {
     242           2 :         if startLevel > 0 && startLevel < baseLevel {
     243           1 :                 panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
     244           1 :                         startLevel, baseLevel))
     245             :         }
     246             : 
     247           2 :         adjustedLevel := adjustedOutputLevel(outputLevel, baseLevel)
     248           2 :         pc := &pickedCompaction{
     249           2 :                 cmp:                    opts.Comparer.Compare,
     250           2 :                 version:                cur,
     251           2 :                 baseLevel:              baseLevel,
     252           2 :                 inputs:                 []compactionLevel{{level: startLevel}, {level: outputLevel}},
     253           2 :                 maxOutputFileSize:      uint64(opts.Level(adjustedLevel).TargetFileSize),
     254           2 :                 maxOverlapBytes:        maxGrandparentOverlapBytes(opts, adjustedLevel),
     255           2 :                 maxReadCompactionBytes: maxReadCompactionBytes(opts, adjustedLevel),
     256           2 :         }
     257           2 :         pc.startLevel = &pc.inputs[0]
     258           2 :         pc.outputLevel = &pc.inputs[1]
     259           2 :         return pc
     260             : }
     261             : 
     262             : // adjustedOutputLevel is the output level used for the purpose of
     263             : // determining the target output file size, overlap bytes, and expanded
     264             : // bytes, taking into account the base level.
     265           2 : func adjustedOutputLevel(outputLevel int, baseLevel int) int {
     266           2 :         adjustedOutputLevel := outputLevel
     267           2 :         if adjustedOutputLevel > 0 {
     268           2 :                 // Output level is in the range [baseLevel, numLevels]. For the purpose of
     269           2 :                 // determining the target output file size, overlap bytes, and expanded
     270           2 :                 // bytes, we want to adjust the range to [1,numLevels].
     271           2 :                 adjustedOutputLevel = 1 + outputLevel - baseLevel
     272           2 :         }
     273           2 :         return adjustedOutputLevel
     274             : }
     275             : 
     276             : func newPickedCompactionFromL0(
     277             :         lcf *manifest.L0CompactionFiles, opts *Options, vers *version, baseLevel int, isBase bool,
     278           2 : ) *pickedCompaction {
     279           2 :         outputLevel := baseLevel
     280           2 :         if !isBase {
     281           2 :                 outputLevel = 0 // Intra L0
     282           2 :         }
     283             : 
     284           2 :         pc := newPickedCompaction(opts, vers, 0, outputLevel, baseLevel)
     285           2 :         pc.lcf = lcf
     286           2 :         pc.outputLevel.level = outputLevel
     287           2 : 
     288           2 :         // Manually build the compaction as opposed to calling
     289           2 :         // pickAutoHelper. This is because L0Sublevels has already added
     290           2 :         // any overlapping L0 SSTables that need to be added, and
     291           2 :         // because compactions built by L0SSTables do not necessarily
     292           2 :         // pick contiguous sequences of files in pc.version.Levels[0].
     293           2 :         files := make([]*manifest.FileMetadata, 0, len(lcf.Files))
     294           2 :         iter := vers.Levels[0].Iter()
     295           2 :         for f := iter.First(); f != nil; f = iter.Next() {
     296           2 :                 if lcf.FilesIncluded[f.L0Index] {
     297           2 :                         files = append(files, f)
     298           2 :                 }
     299             :         }
     300           2 :         pc.startLevel.files = manifest.NewLevelSliceSeqSorted(files)
     301           2 :         return pc
     302             : }
     303             : 
     304           1 : func (pc *pickedCompaction) String() string {
     305           1 :         var builder strings.Builder
     306           1 :         builder.WriteString(fmt.Sprintf(`Score=%f, `, pc.score))
     307           1 :         builder.WriteString(fmt.Sprintf(`Kind=%s, `, pc.kind))
     308           1 :         builder.WriteString(fmt.Sprintf(`AdjustedOutputLevel=%d, `, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel)))
     309           1 :         builder.WriteString(fmt.Sprintf(`maxOutputFileSize=%d, `, pc.maxOutputFileSize))
     310           1 :         builder.WriteString(fmt.Sprintf(`maxReadCompactionBytes=%d, `, pc.maxReadCompactionBytes))
     311           1 :         builder.WriteString(fmt.Sprintf(`smallest=%s, `, pc.smallest))
     312           1 :         builder.WriteString(fmt.Sprintf(`largest=%s, `, pc.largest))
     313           1 :         builder.WriteString(fmt.Sprintf(`version=%s, `, pc.version))
     314           1 :         builder.WriteString(fmt.Sprintf(`inputs=%s, `, pc.inputs))
     315           1 :         builder.WriteString(fmt.Sprintf(`startlevel=%s, `, pc.startLevel))
     316           1 :         builder.WriteString(fmt.Sprintf(`outputLevel=%s, `, pc.outputLevel))
     317           1 :         builder.WriteString(fmt.Sprintf(`extraLevels=%s, `, pc.extraLevels))
     318           1 :         builder.WriteString(fmt.Sprintf(`l0SublevelInfo=%s, `, pc.startLevel.l0SublevelInfo))
     319           1 :         builder.WriteString(fmt.Sprintf(`lcf=%s`, pc.lcf))
     320           1 :         return builder.String()
     321           1 : }
     322             : 
     323             : // Clone creates a deep copy of the pickedCompaction
     324           2 : func (pc *pickedCompaction) clone() *pickedCompaction {
     325           2 : 
     326           2 :         // Quickly copy over fields that do not require special deep copy care, and
     327           2 :         // set all fields that will require a deep copy to nil.
     328           2 :         newPC := &pickedCompaction{
     329           2 :                 cmp:                    pc.cmp,
     330           2 :                 score:                  pc.score,
     331           2 :                 kind:                   pc.kind,
     332           2 :                 baseLevel:              pc.baseLevel,
     333           2 :                 maxOutputFileSize:      pc.maxOutputFileSize,
     334           2 :                 maxOverlapBytes:        pc.maxOverlapBytes,
     335           2 :                 maxReadCompactionBytes: pc.maxReadCompactionBytes,
     336           2 :                 smallest:               pc.smallest.Clone(),
     337           2 :                 largest:                pc.largest.Clone(),
     338           2 : 
     339           2 :                 // TODO(msbutler): properly clone picker metrics
     340           2 :                 pickerMetrics: pc.pickerMetrics,
     341           2 : 
     342           2 :                 // Both copies see the same manifest, therefore, it's ok for them to se
     343           2 :                 // share the same pc. version.
     344           2 :                 version: pc.version,
     345           2 :         }
     346           2 : 
     347           2 :         newPC.inputs = make([]compactionLevel, len(pc.inputs))
     348           2 :         newPC.extraLevels = make([]*compactionLevel, 0, len(pc.extraLevels))
     349           2 :         for i := range pc.inputs {
     350           2 :                 newPC.inputs[i] = pc.inputs[i].Clone()
     351           2 :                 if i == 0 {
     352           2 :                         newPC.startLevel = &newPC.inputs[i]
     353           2 :                 } else if i == len(pc.inputs)-1 {
     354           2 :                         newPC.outputLevel = &newPC.inputs[i]
     355           2 :                 } else {
     356           1 :                         newPC.extraLevels = append(newPC.extraLevels, &newPC.inputs[i])
     357           1 :                 }
     358             :         }
     359             : 
     360           2 :         if len(pc.startLevel.l0SublevelInfo) > 0 {
     361           1 :                 newPC.startLevel.l0SublevelInfo = make([]sublevelInfo, len(pc.startLevel.l0SublevelInfo))
     362           1 :                 for i := range pc.startLevel.l0SublevelInfo {
     363           1 :                         newPC.startLevel.l0SublevelInfo[i] = pc.startLevel.l0SublevelInfo[i].Clone()
     364           1 :                 }
     365             :         }
     366           2 :         if pc.lcf != nil {
     367           1 :                 newPC.lcf = pc.lcf.Clone()
     368           1 :         }
     369           2 :         return newPC
     370             : }
     371             : 
     372             : // maybeExpandedBounds is a helper function for setupInputs which ensures the
     373             : // pickedCompaction's smallest and largest internal keys are updated iff
     374             : // the candidate keys expand the key span. This avoids a bug for multi-level
     375             : // compactions: during the second call to setupInputs, the picked compaction's
     376             : // smallest and largest keys should not decrease the key span.
     377           2 : func (pc *pickedCompaction) maybeExpandBounds(smallest InternalKey, largest InternalKey) {
     378           2 :         emptyKey := InternalKey{}
     379           2 :         if base.InternalCompare(pc.cmp, smallest, emptyKey) == 0 {
     380           2 :                 if base.InternalCompare(pc.cmp, largest, emptyKey) != 0 {
     381           0 :                         panic("either both candidate keys are empty or neither are empty")
     382             :                 }
     383           2 :                 return
     384             :         }
     385           2 :         if base.InternalCompare(pc.cmp, pc.smallest, emptyKey) == 0 {
     386           2 :                 if base.InternalCompare(pc.cmp, pc.largest, emptyKey) != 0 {
     387           0 :                         panic("either both pc keys are empty or neither are empty")
     388             :                 }
     389           2 :                 pc.smallest = smallest
     390           2 :                 pc.largest = largest
     391           2 :                 return
     392             :         }
     393           2 :         if base.InternalCompare(pc.cmp, pc.smallest, smallest) >= 0 {
     394           2 :                 pc.smallest = smallest
     395           2 :         }
     396           2 :         if base.InternalCompare(pc.cmp, pc.largest, largest) <= 0 {
     397           2 :                 pc.largest = largest
     398           2 :         }
     399             : }
     400             : 
     401             : // setupInputs returns true if a compaction has been set up. It returns false if
     402             : // a concurrent compaction is occurring on the start or output level files.
     403             : func (pc *pickedCompaction) setupInputs(
     404             :         opts *Options, diskAvailBytes uint64, startLevel *compactionLevel,
     405           2 : ) bool {
     406           2 :         // maxExpandedBytes is the maximum size of an expanded compaction. If
     407           2 :         // growing a compaction results in a larger size, the original compaction
     408           2 :         // is used instead.
     409           2 :         maxExpandedBytes := expandedCompactionByteSizeLimit(
     410           2 :                 opts, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel), diskAvailBytes,
     411           2 :         )
     412           2 : 
     413           2 :         // Expand the initial inputs to a clean cut.
     414           2 :         var isCompacting bool
     415           2 :         startLevel.files, isCompacting = expandToAtomicUnit(pc.cmp, startLevel.files, false /* disableIsCompacting */)
     416           2 :         if isCompacting {
     417           2 :                 return false
     418           2 :         }
     419           2 :         pc.maybeExpandBounds(manifest.KeyRange(pc.cmp, startLevel.files.Iter()))
     420           2 : 
     421           2 :         // Determine the sstables in the output level which overlap with the input
     422           2 :         // sstables, and then expand those tables to a clean cut. No need to do
     423           2 :         // this for intra-L0 compactions; outputLevel.files is left empty for those.
     424           2 :         if startLevel.level != pc.outputLevel.level {
     425           2 :                 pc.outputLevel.files = pc.version.Overlaps(pc.outputLevel.level, pc.cmp, pc.smallest.UserKey,
     426           2 :                         pc.largest.UserKey, pc.largest.IsExclusiveSentinel())
     427           2 :                 pc.outputLevel.files, isCompacting = expandToAtomicUnit(pc.cmp, pc.outputLevel.files,
     428           2 :                         false /* disableIsCompacting */)
     429           2 :                 if isCompacting {
     430           2 :                         return false
     431           2 :                 }
     432           2 :                 pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
     433           2 :                         startLevel.files.Iter(), pc.outputLevel.files.Iter()))
     434             :         }
     435             : 
     436             :         // Grow the sstables in startLevel.level as long as it doesn't affect the number
     437             :         // of sstables included from pc.outputLevel.level.
     438           2 :         if pc.lcf != nil && startLevel.level == 0 && pc.outputLevel.level != 0 {
     439           2 :                 // Call the L0-specific compaction extension method. Similar logic as
     440           2 :                 // pc.grow. Additional L0 files are optionally added to the compaction at
     441           2 :                 // this step. Note that the bounds passed in are not the bounds of the
     442           2 :                 // compaction, but rather the smallest and largest internal keys that
     443           2 :                 // the compaction cannot include from L0 without pulling in more Lbase
     444           2 :                 // files. Consider this example:
     445           2 :                 //
     446           2 :                 // L0:        c-d e+f g-h
     447           2 :                 // Lbase: a-b     e+f     i-j
     448           2 :                 //        a b c d e f g h i j
     449           2 :                 //
     450           2 :                 // The e-f files have already been chosen in the compaction. As pulling
     451           2 :                 // in more LBase files is undesirable, the logic below will pass in
     452           2 :                 // smallest = b and largest = i to ExtendL0ForBaseCompactionTo, which
     453           2 :                 // will expand the compaction to include c-d and g-h from L0. The
     454           2 :                 // bounds passed in are exclusive; the compaction cannot be expanded
     455           2 :                 // to include files that "touch" it.
     456           2 :                 smallestBaseKey := base.InvalidInternalKey
     457           2 :                 largestBaseKey := base.InvalidInternalKey
     458           2 :                 if pc.outputLevel.files.Empty() {
     459           2 :                         baseIter := pc.version.Levels[pc.outputLevel.level].Iter()
     460           2 :                         if sm := baseIter.SeekLT(pc.cmp, pc.smallest.UserKey); sm != nil {
     461           2 :                                 smallestBaseKey = sm.Largest
     462           2 :                         }
     463           2 :                         if la := baseIter.SeekGE(pc.cmp, pc.largest.UserKey); la != nil {
     464           2 :                                 largestBaseKey = la.Smallest
     465           2 :                         }
     466           2 :                 } else {
     467           2 :                         // NB: We use Reslice to access the underlying level's files, but
     468           2 :                         // we discard the returned slice. The pc.outputLevel.files slice
     469           2 :                         // is not modified.
     470           2 :                         _ = pc.outputLevel.files.Reslice(func(start, end *manifest.LevelIterator) {
     471           2 :                                 if sm := start.Prev(); sm != nil {
     472           2 :                                         smallestBaseKey = sm.Largest
     473           2 :                                 }
     474           2 :                                 if la := end.Next(); la != nil {
     475           2 :                                         largestBaseKey = la.Smallest
     476           2 :                                 }
     477             :                         })
     478             :                 }
     479           2 :                 oldLcf := pc.lcf.Clone()
     480           2 :                 if pc.version.L0Sublevels.ExtendL0ForBaseCompactionTo(smallestBaseKey, largestBaseKey, pc.lcf) {
     481           2 :                         var newStartLevelFiles []*fileMetadata
     482           2 :                         iter := pc.version.Levels[0].Iter()
     483           2 :                         var sizeSum uint64
     484           2 :                         for j, f := 0, iter.First(); f != nil; j, f = j+1, iter.Next() {
     485           2 :                                 if pc.lcf.FilesIncluded[f.L0Index] {
     486           2 :                                         newStartLevelFiles = append(newStartLevelFiles, f)
     487           2 :                                         sizeSum += f.Size
     488           2 :                                 }
     489             :                         }
     490           2 :                         if sizeSum+pc.outputLevel.files.SizeSum() < maxExpandedBytes {
     491           2 :                                 startLevel.files = manifest.NewLevelSliceSeqSorted(newStartLevelFiles)
     492           2 :                                 pc.smallest, pc.largest = manifest.KeyRange(pc.cmp,
     493           2 :                                         startLevel.files.Iter(), pc.outputLevel.files.Iter())
     494           2 :                         } else {
     495           2 :                                 *pc.lcf = *oldLcf
     496           2 :                         }
     497             :                 }
     498           2 :         } else if pc.grow(pc.smallest, pc.largest, maxExpandedBytes, startLevel) {
     499           2 :                 pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
     500           2 :                         startLevel.files.Iter(), pc.outputLevel.files.Iter()))
     501           2 :         }
     502             : 
     503           2 :         if pc.startLevel.level == 0 {
     504           2 :                 // We don't change the input files for the compaction beyond this point.
     505           2 :                 pc.startLevel.l0SublevelInfo = generateSublevelInfo(pc.cmp, pc.startLevel.files)
     506           2 :         }
     507             : 
     508           2 :         return true
     509             : }
     510             : 
     511             : // grow grows the number of inputs at c.level without changing the number of
     512             : // c.level+1 files in the compaction, and returns whether the inputs grew. sm
     513             : // and la are the smallest and largest InternalKeys in all of the inputs.
     514             : func (pc *pickedCompaction) grow(
     515             :         sm, la InternalKey, maxExpandedBytes uint64, startLevel *compactionLevel,
     516           2 : ) bool {
     517           2 :         if pc.outputLevel.files.Empty() {
     518           2 :                 return false
     519           2 :         }
     520           2 :         grow0 := pc.version.Overlaps(startLevel.level, pc.cmp, sm.UserKey,
     521           2 :                 la.UserKey, la.IsExclusiveSentinel())
     522           2 :         grow0, isCompacting := expandToAtomicUnit(pc.cmp, grow0, false /* disableIsCompacting */)
     523           2 :         if isCompacting {
     524           1 :                 return false
     525           1 :         }
     526           2 :         if grow0.Len() <= startLevel.files.Len() {
     527           2 :                 return false
     528           2 :         }
     529           2 :         if grow0.SizeSum()+pc.outputLevel.files.SizeSum() >= maxExpandedBytes {
     530           2 :                 return false
     531           2 :         }
     532             :         // We need to include the outputLevel iter because without it, in a multiLevel scenario,
     533             :         // sm1 and la1 could shift the output level keyspace when pc.outputLevel.files is set to grow1.
     534           2 :         sm1, la1 := manifest.KeyRange(pc.cmp, grow0.Iter(), pc.outputLevel.files.Iter())
     535           2 :         grow1 := pc.version.Overlaps(pc.outputLevel.level, pc.cmp, sm1.UserKey,
     536           2 :                 la1.UserKey, la1.IsExclusiveSentinel())
     537           2 :         grow1, isCompacting = expandToAtomicUnit(pc.cmp, grow1, false /* disableIsCompacting */)
     538           2 :         if isCompacting {
     539           1 :                 return false
     540           1 :         }
     541           2 :         if grow1.Len() != pc.outputLevel.files.Len() {
     542           2 :                 return false
     543           2 :         }
     544           2 :         startLevel.files = grow0
     545           2 :         pc.outputLevel.files = grow1
     546           2 :         return true
     547             : }
     548             : 
     549           2 : func (pc *pickedCompaction) compactionSize() uint64 {
     550           2 :         var bytesToCompact uint64
     551           2 :         for i := range pc.inputs {
     552           2 :                 bytesToCompact += pc.inputs[i].files.SizeSum()
     553           2 :         }
     554           2 :         return bytesToCompact
     555             : }
     556             : 
// setupMultiLevelCandidate returns true if it successfully added another level
// to the compaction.
func (pc *pickedCompaction) setupMultiLevelCandidate(opts *Options, diskAvailBytes uint64) bool {
	pc.inputs = append(pc.inputs, compactionLevel{level: pc.outputLevel.level + 1})

	// Recalibrate startLevel and outputLevel:
	//  - startLevel and outputLevel pointers may be obsolete after appending to pc.inputs.
	//  - push outputLevel to extraLevels and move the new level to outputLevel
	pc.startLevel = &pc.inputs[0]
	pc.extraLevels = []*compactionLevel{&pc.inputs[1]}
	pc.outputLevel = &pc.inputs[2]
	// Re-run input setup with the former output level now acting as an
	// intermediate (extra) level.
	return pc.setupInputs(opts, diskAvailBytes, pc.extraLevels[len(pc.extraLevels)-1])
}
     570             : 
// expandToAtomicUnit expands the provided level slice within its level both
// forwards and backwards to its "atomic compaction unit" boundaries, if
// necessary.
//
// While picking compaction inputs, this is required to maintain the invariant
// that the versions of keys at level+1 are older than the versions of keys at
// level. Tables are added to the right of the current slice tables such that
// the rightmost table has a "clean cut". A clean cut is either a change in
// user keys, or when the largest key in the left sstable is a range tombstone
// sentinel key (InternalKeyRangeDeleteSentinel).
//
// In addition to maintaining the seqnum invariant, expandToAtomicUnit is used
// to provide clean boundaries for range tombstone truncation during
// compaction. In order to achieve these clean boundaries, expandToAtomicUnit
// needs to find a "clean cut" on the left edge of the compaction as well.
// This is necessary in order for "atomic compaction units" to always be
// compacted as a unit. Failure to do this leads to a subtle bug with
// truncation of range tombstones to atomic compaction unit boundaries.
// Consider the scenario:
//
//	L3:
//	  12:[a#2,15-b#1,1]
//	  13:[b#0,15-d#72057594037927935,15]
//
// These sstables contain a range tombstone [a-d)#2 which spans the two
// sstables. The two sstables need to always be kept together. Compacting
// sstable 13 independently of sstable 12 would result in:
//
//	L3:
//	  12:[a#2,15-b#1,1]
//	L4:
//	  14:[b#0,15-d#72057594037927935,15]
//
// This state is still ok, but when sstable 12 is next compacted, its range
// tombstones will be truncated at "b" (the largest key in its atomic
// compaction unit). In the scenario here, that could result in b#1 becoming
// visible when it should be deleted.
//
// isCompacting is returned true for any atomic units that contain files that
// have in-progress compactions, i.e. FileMetadata.Compacting == true. If
// disableIsCompacting is true, isCompacting always returns false. This helps
// avoid spurious races from being detected when this method is used outside
// of compaction picking code.
//
// TODO(jackson): Compactions and flushes no longer split a user key between two
// sstables. We could perform a migration, re-compacting any sstables with split
// user keys, which would allow us to remove atomic compaction unit expansion
// code.
func expandToAtomicUnit(
	cmp Compare, inputs manifest.LevelSlice, disableIsCompacting bool,
) (slice manifest.LevelSlice, isCompacting bool) {
	// NB: Inputs for L0 can't be expanded and *version.Overlaps guarantees
	// that we get a 'clean cut.' For L0, Overlaps will return a slice without
	// access to the rest of the L0 files, so it's OK to try to reslice.
	if inputs.Empty() {
		// Nothing to expand.
		return inputs, false
	}

	// TODO(jackson): Update to avoid use of LevelIterator.Current(). The
	// Reslice interface will require some tweaking, because we currently rely
	// on Reslice having already positioned the LevelIterator appropriately.

	inputs = inputs.Reslice(func(start, end *manifest.LevelIterator) {
		// Walk left from the slice's first file, pulling in each predecessor
		// whose largest user key matches the current file's smallest.
		iter := start.Clone()
		iter.Prev()
		for cur, prev := start.Current(), iter.Current(); prev != nil; cur, prev = start.Prev(), iter.Prev() {
			if cur.IsCompacting() {
				isCompacting = true
			}
			if cmp(prev.Largest.UserKey, cur.Smallest.UserKey) < 0 {
				// Clean cut: the user keys do not overlap.
				break
			}
			if prev.Largest.IsExclusiveSentinel() {
				// The table prev has a largest key indicating that the user key
				// prev.largest.UserKey doesn't actually exist in the table.
				break
			}
			// prev.Largest.UserKey == cur.Smallest.UserKey, so we need to
			// include prev in the compaction.
		}

		// Symmetrically, walk right from the slice's last file, pulling in
		// each successor that shares a boundary user key.
		iter = end.Clone()
		iter.Next()
		for cur, next := end.Current(), iter.Current(); next != nil; cur, next = end.Next(), iter.Next() {
			if cur.IsCompacting() {
				isCompacting = true
			}
			if cmp(cur.Largest.UserKey, next.Smallest.UserKey) < 0 {
				// Clean cut: the user keys do not overlap.
				break
			}
			if cur.Largest.IsExclusiveSentinel() {
				// The table cur has a largest key indicating that the user key
				// cur.largest.UserKey doesn't actually exist in the table.
				break
			}
			// cur.Largest.UserKey == next.Smallest.UserKey, so we need to
			// include next in the compaction.
		}
	})
	// The loops above only inspect files strictly between the endpoints; check
	// the (possibly resliced) endpoints themselves.
	inputIter := inputs.Iter()
	isCompacting = !disableIsCompacting &&
		(isCompacting || inputIter.First().IsCompacting() || inputIter.Last().IsCompacting())
	return inputs, isCompacting
}
     676             : 
     677             : func newCompactionPicker(
     678             :         v *version, opts *Options, inProgressCompactions []compactionInfo,
     679           2 : ) compactionPicker {
     680           2 :         p := &compactionPickerByScore{
     681           2 :                 opts: opts,
     682           2 :                 vers: v,
     683           2 :         }
     684           2 :         p.initLevelMaxBytes(inProgressCompactions)
     685           2 :         return p
     686           2 : }
     687             : 
// Information about a candidate compaction level that has been identified by
// the compaction picker.
type candidateLevelInfo struct {
	// The compensatedScore of the level after adjusting according to the other
	// levels' sizes. For L0, the compensatedScoreRatio is equivalent to the
	// uncompensatedScoreRatio as we don't account for level size compensation in
	// L0.
	compensatedScoreRatio float64
	// The score of the level after accounting for level size compensation before
	// adjusting according to other levels' sizes. For L0, the compensatedScore
	// is equivalent to the uncompensatedScore as we don't account for level
	// size compensation in L0.
	compensatedScore float64
	// The score of the level to be compacted, calculated using uncompensated file
	// sizes and without any adjustments.
	uncompensatedScore float64
	// uncompensatedScoreRatio is the uncompensatedScore adjusted according to
	// the other levels' sizes.
	uncompensatedScoreRatio float64
	// The level being considered for compaction.
	level int
	// The level to compact to.
	outputLevel int
	// The file in level that will be compacted. Additional files may be
	// picked by the compaction, and a pickedCompaction created for the
	// compaction.
	file manifest.LevelFile
}
     715             : 
// shouldCompact reports whether the level's compensated score ratio meets the
// threshold required for it to be picked for compaction.
func (c *candidateLevelInfo) shouldCompact() bool {
	return c.compensatedScoreRatio >= compactionScoreThreshold
}
     719             : 
// fileCompensation returns the number of bytes by which f's size is inflated
// for compaction scoring: the estimated disk space reclaimable by compacting
// the file's point and range tombstones, taken from the file's stats.
func fileCompensation(f *fileMetadata) uint64 {
	return uint64(f.Stats.PointDeletionsBytesEstimate) + f.Stats.RangeDeletionsBytesEstimate
}
     723             : 
// compensatedSize returns f's file size, inflated according to compaction
// priorities. See fileCompensation for the inflation term.
func compensatedSize(f *fileMetadata) uint64 {
	// Add in the estimate of disk space that may be reclaimed by compacting the
	// file's tombstones.
	return f.Size + fileCompensation(f)
}
     731             : 
// compensatedSizeAnnotator implements manifest.Annotator, annotating B-Tree
// nodes with the sum of the files' compensated sizes. Its annotation type is
// a *uint64. Compensated sizes may change once a table's stats are loaded
// asynchronously, so its values are marked as cacheable only if a file's
// stats have been loaded.
type compensatedSizeAnnotator struct {
}

// Compile-time assertion that compensatedSizeAnnotator satisfies
// manifest.Annotator.
var _ manifest.Annotator = compensatedSizeAnnotator{}
     741             : 
     742           2 : func (a compensatedSizeAnnotator) Zero(dst interface{}) interface{} {
     743           2 :         if dst == nil {
     744           2 :                 return new(uint64)
     745           2 :         }
     746           2 :         v := dst.(*uint64)
     747           2 :         *v = 0
     748           2 :         return v
     749             : }
     750             : 
     751             : func (a compensatedSizeAnnotator) Accumulate(
     752             :         f *fileMetadata, dst interface{},
     753           2 : ) (v interface{}, cacheOK bool) {
     754           2 :         vptr := dst.(*uint64)
     755           2 :         *vptr = *vptr + compensatedSize(f)
     756           2 :         return vptr, f.StatsValid()
     757           2 : }
     758             : 
     759           2 : func (a compensatedSizeAnnotator) Merge(src interface{}, dst interface{}) interface{} {
     760           2 :         srcV := src.(*uint64)
     761           2 :         dstV := dst.(*uint64)
     762           2 :         *dstV = *dstV + *srcV
     763           2 :         return dstV
     764           2 : }
     765             : 
     766             : // totalCompensatedSize computes the compensated size over a file metadata
     767             : // iterator. Note that this function is linear in the files available to the
     768             : // iterator. Use the compensatedSizeAnnotator if querying the total
     769             : // compensated size of a level.
     770           2 : func totalCompensatedSize(iter manifest.LevelIterator) uint64 {
     771           2 :         var sz uint64
     772           2 :         for f := iter.First(); f != nil; f = iter.Next() {
     773           2 :                 sz += compensatedSize(f)
     774           2 :         }
     775           2 :         return sz
     776             : }
     777             : 
// compactionPickerByScore holds the state and logic for picking a compaction. A
// compaction picker is associated with a single version. A new compaction
// picker is created and initialized every time a new version is installed.
type compactionPickerByScore struct {
	opts *Options
	vers *version
	// The level to target for L0 compactions. Levels L1 to baseLevel must be
	// empty.
	baseLevel int
	// levelMaxBytes holds the dynamically adjusted max bytes setting for each
	// level.
	levelMaxBytes [numLevels]int64
}

// Compile-time assertion that compactionPickerByScore satisfies the
// compactionPicker interface.
var _ compactionPicker = &compactionPickerByScore{}
     793             : 
     794           2 : func (p *compactionPickerByScore) getScores(inProgress []compactionInfo) [numLevels]float64 {
     795           2 :         var scores [numLevels]float64
     796           2 :         for _, info := range p.calculateLevelScores(inProgress) {
     797           2 :                 scores[info.level] = info.compensatedScoreRatio
     798           2 :         }
     799           2 :         return scores
     800             : }
     801             : 
     802           2 : func (p *compactionPickerByScore) getBaseLevel() int {
     803           2 :         if p == nil {
     804           0 :                 return 1
     805           0 :         }
     806           2 :         return p.baseLevel
     807             : }
     808             : 
// estimatedCompactionDebt estimates the number of bytes which need to be
// compacted before the LSM tree becomes stable. l0ExtraSize is treated as
// additional bytes already present in L0 (NOTE(review): presumably bytes
// about to be flushed — confirm against callers).
func (p *compactionPickerByScore) estimatedCompactionDebt(l0ExtraSize uint64) uint64 {
	if p == nil {
		return 0
	}

	// We assume that all the bytes in L0 need to be compacted to Lbase. This is
	// unlike the RocksDB logic that figures out whether L0 needs compaction.
	bytesAddedToNextLevel := l0ExtraSize + p.vers.Levels[0].Size()
	lbaseSize := p.vers.Levels[p.baseLevel].Size()

	var compactionDebt uint64
	if bytesAddedToNextLevel > 0 && lbaseSize > 0 {
		// We only incur compaction debt if both L0 and Lbase contain data. If L0
		// is empty, no compaction is necessary. If Lbase is empty, a move-based
		// compaction from L0 would occur.
		compactionDebt += bytesAddedToNextLevel + lbaseSize
	}

	// loop invariant: At the beginning of the loop, bytesAddedToNextLevel is the
	// bytes added to `level` in the loop.
	for level := p.baseLevel; level < numLevels-1; level++ {
		levelSize := p.vers.Levels[level].Size() + bytesAddedToNextLevel
		nextLevelSize := p.vers.Levels[level+1].Size()
		if levelSize > uint64(p.levelMaxBytes[level]) {
			// The level is over its target size; the excess spills downward.
			bytesAddedToNextLevel = levelSize - uint64(p.levelMaxBytes[level])
			if nextLevelSize > 0 {
				// We only incur compaction debt if the next level contains data. If the
				// next level is empty, a move-based compaction would be used.
				levelRatio := float64(nextLevelSize) / float64(levelSize)
				// The current level contributes bytesAddedToNextLevel to compactions.
				// The next level contributes levelRatio * bytesAddedToNextLevel.
				compactionDebt += uint64(float64(bytesAddedToNextLevel) * (levelRatio + 1))
			}
		} else {
			// We're not moving any bytes to the next level.
			bytesAddedToNextLevel = 0
		}
	}
	return compactionDebt
}
     851             : 
// initLevelMaxBytes computes p.levelMaxBytes — the dynamically sized
// per-level byte targets — and p.baseLevel (the level L0 compacts into),
// based on the current version's level sizes and any in-progress L0
// compactions.
func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []compactionInfo) {
	// The levelMaxBytes calculations here differ from RocksDB in two ways:
	//
	// 1. The use of dbSize vs maxLevelSize. RocksDB uses the size of the maximum
	//    level in L1-L6, rather than determining the size of the bottom level
	//    based on the total amount of data in the DB. The RocksDB calculation is
	//    problematic if L0 contains a significant fraction of data, or if the
	//    level sizes are roughly equal and thus there is a significant fraction
	//    of data outside of the largest level.
	//
	// 2. Not adjusting the size of Lbase based on L0. RocksDB computes
	//    baseBytesMax as the maximum of the configured LBaseMaxBytes and the
	//    size of L0. This is problematic because baseBytesMax is used to compute
	//    the max size of lower levels. A very large baseBytesMax will result in
	//    an overly large value for the size of lower levels which will cause
	//    those levels not to be compacted even when they should be
	//    compacted. This often results in "inverted" LSM shapes where Ln is
	//    larger than Ln+1.

	// Determine the first non-empty level and the total DB size.
	firstNonEmptyLevel := -1
	var dbSize uint64
	for level := 1; level < numLevels; level++ {
		if p.vers.Levels[level].Size() > 0 {
			if firstNonEmptyLevel == -1 {
				firstNonEmptyLevel = level
			}
			dbSize += p.vers.Levels[level].Size()
		}
	}
	// An in-progress compaction out of L0 may be filling a level that is
	// currently empty; treat its output level as non-empty so the base level
	// does not move below it.
	for _, c := range inProgressCompactions {
		if c.outputLevel == 0 || c.outputLevel == -1 {
			continue
		}
		if c.inputs[0].level == 0 && (firstNonEmptyLevel == -1 || c.outputLevel < firstNonEmptyLevel) {
			firstNonEmptyLevel = c.outputLevel
		}
	}

	// Initialize the max-bytes setting for each level to "infinity" which will
	// disallow compaction for that level. We'll fill in the actual value below
	// for levels we want to allow compactions from.
	for level := 0; level < numLevels; level++ {
		p.levelMaxBytes[level] = math.MaxInt64
	}

	if dbSize == 0 {
		// No levels for L1 and up contain any data. Target L0 compactions for the
		// last level or to the level to which there is an ongoing L0 compaction.
		p.baseLevel = numLevels - 1
		if firstNonEmptyLevel >= 0 {
			p.baseLevel = firstNonEmptyLevel
		}
		return
	}

	dbSize += p.vers.Levels[0].Size()
	// The bottom level holds all data except the fraction expected to live in
	// the levels above it (one level-multiplier's worth).
	bottomLevelSize := dbSize - dbSize/uint64(p.opts.Experimental.LevelMultiplier)

	curLevelSize := bottomLevelSize
	for level := numLevels - 2; level >= firstNonEmptyLevel; level-- {
		curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
	}

	// Compute base level (where L0 data is compacted to).
	baseBytesMax := uint64(p.opts.LBaseMaxBytes)
	p.baseLevel = firstNonEmptyLevel
	for p.baseLevel > 1 && curLevelSize > baseBytesMax {
		p.baseLevel--
		curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
	}

	// Smooth the multiplier so that the geometric progression from baseBytesMax
	// at the base level lands exactly on bottomLevelSize at the bottom level.
	smoothedLevelMultiplier := 1.0
	if p.baseLevel < numLevels-1 {
		smoothedLevelMultiplier = math.Pow(
			float64(bottomLevelSize)/float64(baseBytesMax),
			1.0/float64(numLevels-p.baseLevel-1))
	}

	levelSize := float64(baseBytesMax)
	for level := p.baseLevel; level < numLevels; level++ {
		if level > p.baseLevel && levelSize > 0 {
			levelSize *= smoothedLevelMultiplier
		}
		// Round the result since test cases use small target level sizes, which
		// can be impacted by floating-point imprecision + integer truncation.
		roundedLevelSize := math.Round(levelSize)
		if roundedLevelSize > float64(math.MaxInt64) {
			p.levelMaxBytes[level] = math.MaxInt64
		} else {
			p.levelMaxBytes[level] = int64(roundedLevelSize)
		}
	}
}
     946             : 
     947             : type levelSizeAdjust struct {
     948             :         incomingActualBytes      uint64
     949             :         outgoingActualBytes      uint64
     950             :         outgoingCompensatedBytes uint64
     951             : }
     952             : 
     953           2 : func (a levelSizeAdjust) compensated() uint64 {
     954           2 :         return a.incomingActualBytes - a.outgoingCompensatedBytes
     955           2 : }
     956             : 
     957           2 : func (a levelSizeAdjust) actual() uint64 {
     958           2 :         return a.incomingActualBytes - a.outgoingActualBytes
     959           2 : }
     960             : 
     961           2 : func calculateSizeAdjust(inProgressCompactions []compactionInfo) [numLevels]levelSizeAdjust {
     962           2 :         // Compute size adjustments for each level based on the in-progress
     963           2 :         // compactions. We sum the file sizes of all files leaving and entering each
     964           2 :         // level in in-progress compactions. For outgoing files, we also sum a
     965           2 :         // separate sum of 'compensated file sizes', which are inflated according
     966           2 :         // to deletion estimates.
     967           2 :         //
     968           2 :         // When we adjust a level's size according to these values during score
     969           2 :         // calculation, we subtract the compensated size of start level inputs to
     970           2 :         // account for the fact that score calculation uses compensated sizes.
     971           2 :         //
     972           2 :         // Since compensated file sizes may be compensated because they reclaim
     973           2 :         // space from the output level's files, we only add the real file size to
     974           2 :         // the output level.
     975           2 :         //
     976           2 :         // This is slightly different from RocksDB's behavior, which simply elides
     977           2 :         // compacting files from the level size calculation.
     978           2 :         var sizeAdjust [numLevels]levelSizeAdjust
     979           2 :         for i := range inProgressCompactions {
     980           2 :                 c := &inProgressCompactions[i]
     981           2 :                 // If this compaction's version edit has already been applied, there's
     982           2 :                 // no need to adjust: The LSM we'll examine will already reflect the
     983           2 :                 // new LSM state.
     984           2 :                 if c.versionEditApplied {
     985           2 :                         continue
     986             :                 }
     987             : 
     988           2 :                 for _, input := range c.inputs {
     989           2 :                         actualSize := input.files.SizeSum()
     990           2 :                         compensatedSize := totalCompensatedSize(input.files.Iter())
     991           2 : 
     992           2 :                         if input.level != c.outputLevel {
     993           2 :                                 sizeAdjust[input.level].outgoingCompensatedBytes += compensatedSize
     994           2 :                                 sizeAdjust[input.level].outgoingActualBytes += actualSize
     995           2 :                                 if c.outputLevel != -1 {
     996           2 :                                         sizeAdjust[c.outputLevel].incomingActualBytes += actualSize
     997           2 :                                 }
     998             :                         }
     999             :                 }
    1000             :         }
    1001           2 :         return sizeAdjust
    1002             : }
    1003             : 
// levelCompensatedSize returns the total compensated size of the level's
// files — i.e. file sizes inflated by deletion estimates — as computed by
// the compensatedSizeAnnotator annotation on the level metadata.
func levelCompensatedSize(lm manifest.LevelMetadata) uint64 {
	return *lm.Annotation(compensatedSizeAnnotator{}).(*uint64)
}
    1007             : 
// calculateLevelScores computes the compensated and uncompensated scores
// (and the derived score ratios) for every level, with level sizes adjusted
// for the provided in-progress compactions. The returned array is sorted by
// compaction priority, highest priority first.
func (p *compactionPickerByScore) calculateLevelScores(
	inProgressCompactions []compactionInfo,
) [numLevels]candidateLevelInfo {
	var scores [numLevels]candidateLevelInfo
	for i := range scores {
		scores[i].level = i
		scores[i].outputLevel = i + 1
	}
	// L0 is special: it is scored by read amplification rather than size,
	// and it compacts into the base level rather than level 1.
	l0UncompensatedScore := calculateL0UncompensatedScore(p.vers, p.opts, inProgressCompactions)
	scores[0] = candidateLevelInfo{
		outputLevel:        p.baseLevel,
		uncompensatedScore: l0UncompensatedScore,
		compensatedScore:   l0UncompensatedScore, /* No level size compensation for L0 */
	}
	// For L1+, the score is the ratio of the level's (adjusted) size to its
	// target size.
	sizeAdjust := calculateSizeAdjust(inProgressCompactions)
	for level := 1; level < numLevels; level++ {
		compensatedLevelSize := levelCompensatedSize(p.vers.Levels[level]) + sizeAdjust[level].compensated()
		scores[level].compensatedScore = float64(compensatedLevelSize) / float64(p.levelMaxBytes[level])
		scores[level].uncompensatedScore = float64(p.vers.Levels[level].Size()+sizeAdjust[level].actual()) / float64(p.levelMaxBytes[level])
	}

	// Adjust each level's {compensated, uncompensated}Score by the uncompensatedScore
	// of the next level to get a {compensated, uncompensated}ScoreRatio. If the
	// next level has a high uncompensatedScore, and is thus a priority for compaction,
	// this reduces the priority for compacting the current level. If the next level
	// has a low uncompensatedScore (i.e. it is below its target size), this increases
	// the priority for compacting the current level.
	//
	// The effect of this adjustment is to help prioritize compactions in lower
	// levels. The following example shows the compensatedScoreRatio and the
	// compensatedScore. In this scenario, L0 has 68 sublevels. L3 (a.k.a. Lbase)
	// is significantly above its target size. The original score prioritizes
	// compactions from those two levels, but doing so ends up causing a future
	// problem: data piles up in the higher levels, starving L5->L6 compactions,
	// and to a lesser degree starving L4->L5 compactions.
	//
	// Note that in the example shown there is no level size compensation so the
	// compensatedScore and the uncompensatedScore is the same for each level.
	//
	//        compensatedScoreRatio   compensatedScore   uncompensatedScore   size   max-size
	//   L0                     3.2               68.0                 68.0  2.2 G          -
	//   L3                     3.2               21.1                 21.1  1.3 G       64 M
	//   L4                     3.4                6.7                  6.7  3.1 G      467 M
	//   L5                     3.4                2.0                  2.0  6.6 G      3.3 G
	//   L6                     0.6                0.6                  0.6   14 G       24 G
	//
	// NB: prevLevel starts at 0 so that the first iteration (level ==
	// p.baseLevel) adjusts L0's scores by the base level's
	// uncompensatedScore: L0 compacts into the base level, not into L1.
	var prevLevel int
	for level := p.baseLevel; level < numLevels; level++ {
		// The compensated scores, and uncompensated scores will be turned into
		// ratios as they're adjusted according to other levels' sizes.
		scores[prevLevel].compensatedScoreRatio = scores[prevLevel].compensatedScore
		scores[prevLevel].uncompensatedScoreRatio = scores[prevLevel].uncompensatedScore

		// Avoid absurdly large scores by placing a floor on the score that we'll
		// adjust a level by. The value of 0.01 was chosen somewhat arbitrarily.
		const minScore = 0.01
		if scores[prevLevel].compensatedScoreRatio >= compactionScoreThreshold {
			if scores[level].uncompensatedScore >= minScore {
				scores[prevLevel].compensatedScoreRatio /= scores[level].uncompensatedScore
			} else {
				scores[prevLevel].compensatedScoreRatio /= minScore
			}
		}
		if scores[prevLevel].uncompensatedScoreRatio >= compactionScoreThreshold {
			if scores[level].uncompensatedScore >= minScore {
				scores[prevLevel].uncompensatedScoreRatio /= scores[level].uncompensatedScore
			} else {
				scores[prevLevel].uncompensatedScoreRatio /= minScore
			}
		}
		prevLevel = level
	}
	// Set the score ratios for the lowest level.
	// INVARIANT: prevLevel == numLevels-1
	scores[prevLevel].compensatedScoreRatio = scores[prevLevel].compensatedScore
	scores[prevLevel].uncompensatedScoreRatio = scores[prevLevel].uncompensatedScore

	sort.Sort(sortCompactionLevelsByPriority(scores[:]))
	return scores
}
    1087             : 
    1088             : // calculateL0UncompensatedScore calculates a float score representing the
    1089             : // relative priority of compacting L0. Level L0 is special in that files within
    1090             : // L0 may overlap one another, so a different set of heuristics that take into
    1091             : // account read amplification apply.
    1092             : func calculateL0UncompensatedScore(
    1093             :         vers *version, opts *Options, inProgressCompactions []compactionInfo,
    1094           2 : ) float64 {
    1095           2 :         // Use the sublevel count to calculate the score. The base vs intra-L0
    1096           2 :         // compaction determination happens in pickAuto, not here.
    1097           2 :         score := float64(2*vers.L0Sublevels.MaxDepthAfterOngoingCompactions()) /
    1098           2 :                 float64(opts.L0CompactionThreshold)
    1099           2 : 
    1100           2 :         // Also calculate a score based on the file count but use it only if it
    1101           2 :         // produces a higher score than the sublevel-based one. This heuristic is
    1102           2 :         // designed to accommodate cases where L0 is accumulating non-overlapping
    1103           2 :         // files in L0. Letting too many non-overlapping files accumulate in few
    1104           2 :         // sublevels is undesirable, because:
    1105           2 :         // 1) we can produce a massive backlog to compact once files do overlap.
    1106           2 :         // 2) constructing L0 sublevels has a runtime that grows superlinearly with
    1107           2 :         //    the number of files in L0 and must be done while holding D.mu.
    1108           2 :         noncompactingFiles := vers.Levels[0].Len()
    1109           2 :         for _, c := range inProgressCompactions {
    1110           2 :                 for _, cl := range c.inputs {
    1111           2 :                         if cl.level == 0 {
    1112           2 :                                 noncompactingFiles -= cl.files.Len()
    1113           2 :                         }
    1114             :                 }
    1115             :         }
    1116           2 :         fileScore := float64(noncompactingFiles) / float64(opts.L0CompactionFileThreshold)
    1117           2 :         if score < fileScore {
    1118           2 :                 score = fileScore
    1119           2 :         }
    1120           2 :         return score
    1121             : }
    1122             : 
// pickCompactionSeedFile picks a file from `level` in the `vers` to build a
// compaction around. Currently, this function implements a heuristic similar to
// RocksDB's kMinOverlappingRatio, seeking to minimize write amplification. This
// function is linear with respect to the number of files in `level` and
// `outputLevel`.
func pickCompactionSeedFile(
	vers *version, opts *Options, level, outputLevel int, earliestSnapshotSeqNum uint64,
) (manifest.LevelFile, bool) {
	// Select the file within the level to compact. We want to minimize write
	// amplification, but also ensure that deletes are propagated to the
	// bottom level in a timely fashion so as to reclaim disk space. A table's
	// smallest sequence number provides a measure of its age. The ratio of
	// overlapping-bytes / table-size gives an indication of write
	// amplification (a smaller ratio is preferable).
	//
	// The current heuristic is based off the RocksDB kMinOverlappingRatio
	// heuristic. It chooses the file with the minimum overlapping ratio with
	// the target level, which minimizes write amplification.
	//
	// It uses a "compensated size" for the denominator, which is the file
	// size but artificially inflated by an estimate of the space that may be
	// reclaimed through compaction. Currently, we only compensate for range
	// deletions and only with a rough estimate of the reclaimable bytes. This
	// differs from RocksDB which only compensates for point tombstones and
	// only if they exceed the number of non-deletion entries in table.
	//
	// TODO(peter): For concurrent compactions, we may want to try harder to
	// pick a seed file whose resulting compaction bounds do not overlap with
	// an in-progress compaction.

	cmp := opts.Comparer.Compare
	startIter := vers.Levels[level].Iter()
	outputIter := vers.Levels[outputLevel].Iter()

	var file manifest.LevelFile
	// Sentinel: any real candidate's ratio is below MaxUint64.
	smallestRatio := uint64(math.MaxUint64)

	// NB: outputFile persists across iterations of the outer loop. Both
	// levels are sorted by key, so the output-level cursor only ever needs
	// to move forward as f advances.
	outputFile := outputIter.First()

	for f := startIter.First(); f != nil; f = startIter.Next() {
		var overlappingBytes uint64
		compacting := f.IsCompacting()
		if compacting {
			// Move on if this file is already being compacted. We'll likely
			// still need to move past the overlapping output files regardless,
			// but in cases where all start-level files are compacting we won't.
			continue
		}

		// Trim any output-level files smaller than f.
		for outputFile != nil && sstableKeyCompare(cmp, outputFile.Largest, f.Smallest) < 0 {
			outputFile = outputIter.Next()
		}

		// Accumulate the sizes of the output-level files overlapping f's key
		// range, stopping early if f or any overlapping file turns out to be
		// compacting.
		for outputFile != nil && sstableKeyCompare(cmp, outputFile.Smallest, f.Largest) <= 0 && !compacting {
			overlappingBytes += outputFile.Size
			compacting = compacting || outputFile.IsCompacting()

			// For files in the bottommost level of the LSM, the
			// Stats.RangeDeletionsBytesEstimate field is set to the estimate
			// of bytes /within/ the file itself that may be dropped by
			// recompacting the file. These bytes from obsolete keys would not
			// need to be rewritten if we compacted `f` into `outputFile`, so
			// they don't contribute to write amplification. Subtracting them
			// out of the overlapping bytes helps prioritize these compactions
			// that are cheaper than their file sizes suggest.
			if outputLevel == numLevels-1 && outputFile.LargestSeqNum < earliestSnapshotSeqNum {
				overlappingBytes -= outputFile.Stats.RangeDeletionsBytesEstimate
			}

			// If the file in the next level extends beyond f's largest key,
			// break out and don't advance outputIter because f's successor
			// might also overlap.
			//
			// Note, we stop as soon as we encounter an output-level file with a
			// largest key beyond the input-level file's largest bound. We
			// perform a simple user key comparison here using sstableKeyCompare
			// which handles the potential for exclusive largest key bounds.
			// There's some subtlety when the bounds are equal (eg, equal and
			// inclusive, or equal and exclusive). Current Pebble doesn't split
			// user keys across sstables within a level (and in format versions
			// FormatSplitUserKeysMarkedCompacted and later we guarantee no
			// split user keys exist within the entire LSM). In that case, we're
			// assured that neither the input level nor the output level's next
			// file shares the same user key, so compaction expansion will not
			// include them in any compaction compacting `f`.
			//
			// NB: If we /did/ allow split user keys, or we're running on an
			// old database with an earlier format major version where there are
			// existing split user keys, this logic would be incorrect. Consider
			//    L1: [a#120,a#100] [a#80,a#60]
			//    L2: [a#55,a#45] [a#35,a#25] [a#15,a#5]
			// While considering the first file in L1, [a#120,a#100], we'd skip
			// past all of the files in L2. When considering the second file in
			// L1, we'd improperly conclude that the second file overlaps
			// nothing in the second level and is cheap to compact, when in
			// reality we'd need to expand the compaction to include all 5
			// files.
			if sstableKeyCompare(cmp, outputFile.Largest, f.Largest) > 0 {
				break
			}
			outputFile = outputIter.Next()
		}

		// If the input level file or one of the overlapping files is
		// compacting, we're not going to be able to compact this file
		// anyways, so skip it.
		if compacting {
			continue
		}

		// NOTE(review): assumes compensatedSize(f) > 0 (sstables presumably
		// always have nonzero size); a zero value would divide by zero —
		// confirm.
		compSz := compensatedSize(f)
		scaledRatio := overlappingBytes * 1024 / compSz
		if scaledRatio < smallestRatio {
			smallestRatio = scaledRatio
			file = startIter.Take()
		}
	}
	// A nil FileMetadata indicates no eligible (non-compacting) seed file
	// was found.
	return file, file.FileMetadata != nil
}
    1243             : 
    1244             : // pickAuto picks the best compaction, if any.
    1245             : //
    1246             : // On each call, pickAuto computes per-level size adjustments based on
    1247             : // in-progress compactions, and computes a per-level score. The levels are
    1248             : // iterated over in decreasing score order trying to find a valid compaction
    1249             : // anchored at that level.
    1250             : //
    1251             : // If a score-based compaction cannot be found, pickAuto falls back to looking
    1252             : // for an elision-only compaction to remove obsolete keys.
    1253           2 : func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
    1254           2 :         // Compaction concurrency is controlled by L0 read-amp. We allow one
    1255           2 :         // additional compaction per L0CompactionConcurrency sublevels, as well as
    1256           2 :         // one additional compaction per CompactionDebtConcurrency bytes of
    1257           2 :         // compaction debt. Compaction concurrency is tied to L0 sublevels as that
    1258           2 :         // signal is independent of the database size. We tack on the compaction
    1259           2 :         // debt as a second signal to prevent compaction concurrency from dropping
    1260           2 :         // significantly right after a base compaction finishes, and before those
    1261           2 :         // bytes have been compacted further down the LSM.
    1262           2 :         if n := len(env.inProgressCompactions); n > 0 {
    1263           2 :                 l0ReadAmp := p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()
    1264           2 :                 compactionDebt := p.estimatedCompactionDebt(0)
    1265           2 :                 ccSignal1 := n * p.opts.Experimental.L0CompactionConcurrency
    1266           2 :                 ccSignal2 := uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
    1267           2 :                 if l0ReadAmp < ccSignal1 && compactionDebt < ccSignal2 {
    1268           2 :                         return nil
    1269           2 :                 }
    1270             :         }
    1271             : 
    1272           2 :         scores := p.calculateLevelScores(env.inProgressCompactions)
    1273           2 : 
    1274           2 :         // TODO(bananabrick): Either remove, or change this into an event sent to the
    1275           2 :         // EventListener.
    1276           2 :         logCompaction := func(pc *pickedCompaction) {
    1277           0 :                 var buf bytes.Buffer
    1278           0 :                 for i := 0; i < numLevels; i++ {
    1279           0 :                         if i != 0 && i < p.baseLevel {
    1280           0 :                                 continue
    1281             :                         }
    1282             : 
    1283           0 :                         var info *candidateLevelInfo
    1284           0 :                         for j := range scores {
    1285           0 :                                 if scores[j].level == i {
    1286           0 :                                         info = &scores[j]
    1287           0 :                                         break
    1288             :                                 }
    1289             :                         }
    1290             : 
    1291           0 :                         marker := " "
    1292           0 :                         if pc.startLevel.level == info.level {
    1293           0 :                                 marker = "*"
    1294           0 :                         }
    1295           0 :                         fmt.Fprintf(&buf, "  %sL%d: %5.1f  %5.1f  %5.1f  %5.1f %8s  %8s",
    1296           0 :                                 marker, info.level, info.compensatedScoreRatio, info.compensatedScore,
    1297           0 :                                 info.uncompensatedScoreRatio, info.uncompensatedScore,
    1298           0 :                                 humanize.Bytes.Int64(int64(totalCompensatedSize(
    1299           0 :                                         p.vers.Levels[info.level].Iter(),
    1300           0 :                                 ))),
    1301           0 :                                 humanize.Bytes.Int64(p.levelMaxBytes[info.level]),
    1302           0 :                         )
    1303           0 : 
    1304           0 :                         count := 0
    1305           0 :                         for i := range env.inProgressCompactions {
    1306           0 :                                 c := &env.inProgressCompactions[i]
    1307           0 :                                 if c.inputs[0].level != info.level {
    1308           0 :                                         continue
    1309             :                                 }
    1310           0 :                                 count++
    1311           0 :                                 if count == 1 {
    1312           0 :                                         fmt.Fprintf(&buf, "  [")
    1313           0 :                                 } else {
    1314           0 :                                         fmt.Fprintf(&buf, " ")
    1315           0 :                                 }
    1316           0 :                                 fmt.Fprintf(&buf, "L%d->L%d", c.inputs[0].level, c.outputLevel)
    1317             :                         }
    1318           0 :                         if count > 0 {
    1319           0 :                                 fmt.Fprintf(&buf, "]")
    1320           0 :                         }
    1321           0 :                         fmt.Fprintf(&buf, "\n")
    1322             :                 }
    1323           0 :                 p.opts.Logger.Infof("pickAuto: L%d->L%d\n%s",
    1324           0 :                         pc.startLevel.level, pc.outputLevel.level, buf.String())
    1325             :         }
    1326             : 
    1327             :         // Check for a score-based compaction. candidateLevelInfos are first sorted
    1328             :         // by whether they should be compacted, so if we find a level which shouldn't
    1329             :         // be compacted, we can break early.
    1330           2 :         for i := range scores {
    1331           2 :                 info := &scores[i]
    1332           2 :                 if !info.shouldCompact() {
    1333           2 :                         break
    1334             :                 }
    1335           2 :                 if info.level == numLevels-1 {
    1336           2 :                         continue
    1337             :                 }
    1338             : 
    1339           2 :                 if info.level == 0 {
    1340           2 :                         pc = pickL0(env, p.opts, p.vers, p.baseLevel)
    1341           2 :                         // Fail-safe to protect against compacting the same sstable
    1342           2 :                         // concurrently.
    1343           2 :                         if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
    1344           2 :                                 p.addScoresToPickedCompactionMetrics(pc, scores)
    1345           2 :                                 pc.score = info.compensatedScoreRatio
    1346           2 :                                 // TODO(bananabrick): Create an EventListener for logCompaction.
    1347           2 :                                 if false {
    1348           0 :                                         logCompaction(pc)
    1349           0 :                                 }
    1350           2 :                                 return pc
    1351             :                         }
    1352           2 :                         continue
    1353             :                 }
    1354             : 
    1355             :                 // info.level > 0
    1356           2 :                 var ok bool
    1357           2 :                 info.file, ok = pickCompactionSeedFile(p.vers, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum)
    1358           2 :                 if !ok {
    1359           2 :                         continue
    1360             :                 }
    1361             : 
    1362           2 :                 pc := pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel, p.levelMaxBytes)
    1363           2 :                 // Fail-safe to protect against compacting the same sstable concurrently.
    1364           2 :                 if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
    1365           2 :                         p.addScoresToPickedCompactionMetrics(pc, scores)
    1366           2 :                         pc.score = info.compensatedScoreRatio
    1367           2 :                         // TODO(bananabrick): Create an EventListener for logCompaction.
    1368           2 :                         if false {
    1369           0 :                                 logCompaction(pc)
    1370           0 :                         }
    1371           2 :                         return pc
    1372             :                 }
    1373             :         }
    1374             : 
    1375             :         // Check for L6 files with tombstones that may be elided. These files may
    1376             :         // exist if a snapshot prevented the elision of a tombstone or because of
    1377             :         // a move compaction. These are low-priority compactions because they
    1378             :         // don't help us keep up with writes, just reclaim disk space.
    1379           2 :         if pc := p.pickElisionOnlyCompaction(env); pc != nil {
    1380           2 :                 return pc
    1381           2 :         }
    1382             : 
    1383           2 :         if pc := p.pickReadTriggeredCompaction(env); pc != nil {
    1384           1 :                 return pc
    1385           1 :         }
    1386             : 
    1387             :         // NB: This should only be run if a read compaction wasn't
    1388             :         // scheduled.
    1389             :         //
    1390             :         // We won't be scheduling a read compaction right now, and in
    1391             :         // read heavy workloads, compactions won't be scheduled frequently
    1392             :         // because flushes aren't frequent. So we need to signal to the
    1393             :         // iterator to schedule a compaction when it adds compactions to
    1394             :         // the read compaction queue.
    1395             :         //
    1396             :         // We need the nil check here because without it, we have some
    1397             :         // tests which don't set that variable fail. Since there's a
    1398             :         // chance that one of those tests wouldn't want extra compactions
    1399             :         // to be scheduled, I added this check here, instead of
    1400             :         // setting rescheduleReadCompaction in those tests.
    1401           2 :         if env.readCompactionEnv.rescheduleReadCompaction != nil {
    1402           2 :                 *env.readCompactionEnv.rescheduleReadCompaction = true
    1403           2 :         }
    1404             : 
    1405             :         // At the lowest possible compaction-picking priority, look for files marked
    1406             :         // for compaction. Pebble will mark files for compaction if they have atomic
    1407             :         // compaction units that span multiple files. While current Pebble code does
    1408             :         // not construct such sstables, RocksDB and earlier versions of Pebble may
    1409             :         // have created them. These split user keys form sets of files that must be
    1410             :         // compacted together for correctness (referred to as "atomic compaction
    1411             :         // units" within the code). Rewrite them in-place.
    1412             :         //
    1413             :         // It's also possible that a file may have been marked for compaction by
    1414             :         // even earlier versions of Pebble code, since FileMetadata's
    1415             :         // MarkedForCompaction field is persisted in the manifest. That's okay. We
    1416             :         // previously would've ignored the designation, whereas now we'll re-compact
    1417             :         // the file in place.
    1418           2 :         if p.vers.Stats.MarkedForCompaction > 0 {
    1419           1 :                 if pc := p.pickRewriteCompaction(env); pc != nil {
    1420           1 :                         return pc
    1421           1 :                 }
    1422             :         }
    1423             : 
    1424           2 :         return nil
    1425             : }
    1426             : 
    1427             : func (p *compactionPickerByScore) addScoresToPickedCompactionMetrics(
    1428             :         pc *pickedCompaction, candInfo [numLevels]candidateLevelInfo,
    1429           2 : ) {
    1430           2 : 
    1431           2 :         // candInfo is sorted by score, not by compaction level.
    1432           2 :         infoByLevel := [numLevels]candidateLevelInfo{}
    1433           2 :         for i := range candInfo {
    1434           2 :                 level := candInfo[i].level
    1435           2 :                 infoByLevel[level] = candInfo[i]
    1436           2 :         }
    1437             :         // Gather the compaction scores for the levels participating in the compaction.
    1438           2 :         pc.pickerMetrics.scores = make([]float64, len(pc.inputs))
    1439           2 :         inputIdx := 0
    1440           2 :         for i := range infoByLevel {
    1441           2 :                 if pc.inputs[inputIdx].level == infoByLevel[i].level {
    1442           2 :                         pc.pickerMetrics.scores[inputIdx] = infoByLevel[i].compensatedScoreRatio
    1443           2 :                         inputIdx++
    1444           2 :                 }
    1445           2 :                 if inputIdx == len(pc.inputs) {
    1446           2 :                         break
    1447             :                 }
    1448             :         }
    1449             : }
    1450             : 
// elisionOnlyAnnotator implements the manifest.Annotator interface,
// annotating B-Tree nodes with the *fileMetadata of a file meeting the
// obsolete keys criteria for an elision-only compaction within the subtree.
// If multiple files meet the criteria, it chooses whichever file has the
// lowest LargestSeqNum. The lowest LargestSeqNum file will be the first
// eligible for an elision-only compaction once snapshots less than or equal
// to its LargestSeqNum are closed.
type elisionOnlyAnnotator struct{}

var _ manifest.Annotator = elisionOnlyAnnotator{}

// Zero returns the zero annotation value: nil, meaning no eligible file has
// been found yet.
func (a elisionOnlyAnnotator) Zero(interface{}) interface{} {
	return nil
}
    1465             : 
// Accumulate folds file f into the subtree annotation dst, tracking the file
// eligible for an elision-only compaction with the lowest LargestSeqNum. The
// boolean result is false only when f's table stats have not been loaded —
// presumably signalling that the subtree's annotation is not yet cacheable
// and should be recomputed once stats become available (NOTE(review):
// confirm against the manifest.Annotator contract).
func (a elisionOnlyAnnotator) Accumulate(f *fileMetadata, dst interface{}) (interface{}, bool) {
	// A file already being compacted cannot be picked again.
	if f.IsCompacting() {
		return dst, true
	}
	if !f.StatsValid() {
		return dst, false
	}
	// Bottommost files are large and not worthwhile to compact just
	// to remove a few tombstones. Consider a file ineligible if its
	// own range deletions delete less than 10% of its data and its
	// deletion tombstones make up less than 10% of its entries.
	//
	// TODO(jackson): This does not account for duplicate user keys
	// which may be collapsed. Ideally, we would have 'obsolete keys'
	// statistics that would include tombstones, the keys that are
	// dropped by tombstones and duplicated user keys. See #847.
	//
	// Note that tables that contain exclusively range keys (i.e. no point keys,
	// `NumEntries` and `RangeDeletionsBytesEstimate` are both zero) are excluded
	// from elision-only compactions.
	// TODO(travers): Consider an alternative heuristic for elision of range-keys.
	if f.Stats.RangeDeletionsBytesEstimate*10 < f.Size &&
		f.Stats.NumDeletions*10 <= f.Stats.NumEntries {
		return dst, true
	}
	// f is eligible: keep whichever of f and the current annotation has the
	// lower LargestSeqNum.
	if dst == nil {
		return f, true
	} else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum {
		return f, true
	}
	return dst, true
}
    1498             : 
    1499           2 : func (a elisionOnlyAnnotator) Merge(v interface{}, accum interface{}) interface{} {
    1500           2 :         if v == nil {
    1501           2 :                 return accum
    1502           2 :         }
    1503             :         // If we haven't accumulated an eligible file yet, or f's LargestSeqNum is
    1504             :         // less than the accumulated file's, use f.
    1505           1 :         if accum == nil {
    1506           1 :                 return v
    1507           1 :         }
    1508           1 :         f := v.(*fileMetadata)
    1509           1 :         accumV := accum.(*fileMetadata)
    1510           1 :         if accumV == nil || accumV.LargestSeqNum > f.LargestSeqNum {
    1511           1 :                 return f
    1512           1 :         }
    1513           1 :         return accumV
    1514             : }
    1515             : 
// markedForCompactionAnnotator implements the manifest.Annotator interface,
// annotating B-Tree nodes with the *fileMetadata of a file that is marked for
// compaction within the subtree. If multiple files meet the criteria, it
// chooses whichever file has the lowest LargestSeqNum.
type markedForCompactionAnnotator struct{}

var _ manifest.Annotator = markedForCompactionAnnotator{}

// Zero returns the zero annotation value: nil, meaning no marked file has
// been found yet.
func (a markedForCompactionAnnotator) Zero(interface{}) interface{} {
	return nil
}
    1527             : 
    1528             : func (a markedForCompactionAnnotator) Accumulate(
    1529             :         f *fileMetadata, dst interface{},
    1530           1 : ) (interface{}, bool) {
    1531           1 :         if !f.MarkedForCompaction {
    1532           1 :                 // Not marked for compaction; return dst.
    1533           1 :                 return dst, true
    1534           1 :         }
    1535           1 :         return markedMergeHelper(f, dst)
    1536             : }
    1537             : 
    1538           1 : func (a markedForCompactionAnnotator) Merge(v interface{}, accum interface{}) interface{} {
    1539           1 :         if v == nil {
    1540           1 :                 return accum
    1541           1 :         }
    1542           0 :         accum, _ = markedMergeHelper(v.(*fileMetadata), accum)
    1543           0 :         return accum
    1544             : }
    1545             : 
    1546             : // REQUIRES: f is non-nil, and f.MarkedForCompaction=true.
    1547           1 : func markedMergeHelper(f *fileMetadata, dst interface{}) (interface{}, bool) {
    1548           1 :         if dst == nil {
    1549           1 :                 return f, true
    1550           1 :         } else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum {
    1551           1 :                 return f, true
    1552           1 :         }
    1553           1 :         return dst, true
    1554             : }
    1555             : 
// pickElisionOnlyCompaction looks for compactions of sstables in the
// bottommost level containing obsolete records that may now be dropped.
// It returns nil when no eligible candidate exists (or when such
// compactions are disabled via Options).
func (p *compactionPickerByScore) pickElisionOnlyCompaction(
	env compactionEnv,
) (pc *pickedCompaction) {
	if p.opts.private.disableElisionOnlyCompactions {
		return nil
	}
	// elisionOnlyAnnotator tracks, per subtree, the eligible file with the
	// lowest LargestSeqNum; nil means no file in the level qualifies.
	v := p.vers.Levels[numLevels-1].Annotation(elisionOnlyAnnotator{})
	if v == nil {
		return nil
	}
	candidate := v.(*fileMetadata)
	// Skip the candidate if it is already being compacted, or if an open
	// snapshot could still observe its obsolete records (eliding them would
	// be premature until such snapshots close).
	if candidate.IsCompacting() || candidate.LargestSeqNum >= env.earliestSnapshotSeqNum {
		return nil
	}
	lf := p.vers.Levels[numLevels-1].Find(p.opts.Comparer.Compare, candidate)
	if lf == nil {
		panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, numLevels-1))
	}

	// Construct a picked compaction of the elision candidate's atomic
	// compaction unit.
	pc = newPickedCompaction(p.opts, p.vers, numLevels-1, numLevels-1, p.baseLevel)
	pc.kind = compactionKindElisionOnly
	var isCompacting bool
	pc.startLevel.files, isCompacting = expandToAtomicUnit(p.opts.Comparer.Compare, lf.Slice(), false /* disableIsCompacting */)
	if isCompacting {
		return nil
	}
	pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
	// Fail-safe to protect against compacting the same sstable concurrently.
	if !inputRangeAlreadyCompacting(env, pc) {
		return pc
	}
	return nil
}
    1593             : 
    1594             : // pickRewriteCompaction attempts to construct a compaction that
    1595             : // rewrites a file marked for compaction. pickRewriteCompaction will
    1596             : // pull in adjacent files in the file's atomic compaction unit if
    1597             : // necessary. A rewrite compaction outputs files to the same level as
    1598             : // the input level.
    1599           1 : func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) {
    1600           1 :         for l := numLevels - 1; l >= 0; l-- {
    1601           1 :                 v := p.vers.Levels[l].Annotation(markedForCompactionAnnotator{})
    1602           1 :                 if v == nil {
    1603           1 :                         // Try the next level.
    1604           1 :                         continue
    1605             :                 }
    1606           1 :                 candidate := v.(*fileMetadata)
    1607           1 :                 if candidate.IsCompacting() {
    1608           1 :                         // Try the next level.
    1609           1 :                         continue
    1610             :                 }
    1611           1 :                 lf := p.vers.Levels[l].Find(p.opts.Comparer.Compare, candidate)
    1612           1 :                 if lf == nil {
    1613           0 :                         panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, numLevels-1))
    1614             :                 }
    1615             : 
    1616           1 :                 inputs := lf.Slice()
    1617           1 :                 // L0 files generated by a flush have never been split such that
    1618           1 :                 // adjacent files can contain the same user key. So we do not need to
    1619           1 :                 // rewrite an atomic compaction unit for L0. Note that there is nothing
    1620           1 :                 // preventing two different flushes from producing files that are
    1621           1 :                 // non-overlapping from an InternalKey perspective, but span the same
    1622           1 :                 // user key. However, such files cannot be in the same L0 sublevel,
    1623           1 :                 // since each sublevel requires non-overlapping user keys (unlike other
    1624           1 :                 // levels).
    1625           1 :                 if l > 0 {
    1626           1 :                         // Find this file's atomic compaction unit. This is only relevant
    1627           1 :                         // for levels L1+.
    1628           1 :                         var isCompacting bool
    1629           1 :                         inputs, isCompacting = expandToAtomicUnit(
    1630           1 :                                 p.opts.Comparer.Compare,
    1631           1 :                                 inputs,
    1632           1 :                                 false, /* disableIsCompacting */
    1633           1 :                         )
    1634           1 :                         if isCompacting {
    1635           0 :                                 // Try the next level.
    1636           0 :                                 continue
    1637             :                         }
    1638             :                 }
    1639             : 
    1640           1 :                 pc = newPickedCompaction(p.opts, p.vers, l, l, p.baseLevel)
    1641           1 :                 pc.outputLevel.level = l
    1642           1 :                 pc.kind = compactionKindRewrite
    1643           1 :                 pc.startLevel.files = inputs
    1644           1 :                 pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
    1645           1 : 
    1646           1 :                 // Fail-safe to protect against compacting the same sstable concurrently.
    1647           1 :                 if !inputRangeAlreadyCompacting(env, pc) {
    1648           1 :                         if pc.startLevel.level == 0 {
    1649           1 :                                 pc.startLevel.l0SublevelInfo = generateSublevelInfo(pc.cmp, pc.startLevel.files)
    1650           1 :                         }
    1651           1 :                         return pc
    1652             :                 }
    1653             :         }
    1654           1 :         return nil
    1655             : }
    1656             : 
    1657             : // pickAutoLPositive picks an automatic compaction for the candidate
    1658             : // file in a positive-numbered level. This function must not be used for
    1659             : // L0.
    1660             : func pickAutoLPositive(
    1661             :         env compactionEnv,
    1662             :         opts *Options,
    1663             :         vers *version,
    1664             :         cInfo candidateLevelInfo,
    1665             :         baseLevel int,
    1666             :         levelMaxBytes [numLevels]int64,
    1667           2 : ) (pc *pickedCompaction) {
    1668           2 :         if cInfo.level == 0 {
    1669           0 :                 panic("pebble: pickAutoLPositive called for L0")
    1670             :         }
    1671             : 
    1672           2 :         pc = newPickedCompaction(opts, vers, cInfo.level, defaultOutputLevel(cInfo.level, baseLevel), baseLevel)
    1673           2 :         if pc.outputLevel.level != cInfo.outputLevel {
    1674           0 :                 panic("pebble: compaction picked unexpected output level")
    1675             :         }
    1676           2 :         pc.startLevel.files = cInfo.file.Slice()
    1677           2 :         // Files in level 0 may overlap each other, so pick up all overlapping ones.
    1678           2 :         if pc.startLevel.level == 0 {
    1679           0 :                 cmp := opts.Comparer.Compare
    1680           0 :                 smallest, largest := manifest.KeyRange(cmp, pc.startLevel.files.Iter())
    1681           0 :                 pc.startLevel.files = vers.Overlaps(0, cmp, smallest.UserKey,
    1682           0 :                         largest.UserKey, largest.IsExclusiveSentinel())
    1683           0 :                 if pc.startLevel.files.Empty() {
    1684           0 :                         panic("pebble: empty compaction")
    1685             :                 }
    1686             :         }
    1687             : 
    1688           2 :         if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
    1689           0 :                 return nil
    1690           0 :         }
    1691           2 :         return pc.maybeAddLevel(opts, env.diskAvailBytes)
    1692             : }
    1693             : 
    1694             : // maybeAddLevel maybe adds a level to the picked compaction.
    1695           2 : func (pc *pickedCompaction) maybeAddLevel(opts *Options, diskAvailBytes uint64) *pickedCompaction {
    1696           2 :         pc.pickerMetrics.singleLevelOverlappingRatio = pc.overlappingRatio()
    1697           2 :         if pc.outputLevel.level == numLevels-1 {
    1698           2 :                 // Don't add a level if the current output level is in L6
    1699           2 :                 return pc
    1700           2 :         }
    1701           2 :         if !opts.Experimental.MultiLevelCompactionHeuristic.allowL0() && pc.startLevel.level == 0 {
    1702           2 :                 return pc
    1703           2 :         }
    1704           2 :         if pc.compactionSize() > expandedCompactionByteSizeLimit(
    1705           2 :                 opts, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel), diskAvailBytes) {
    1706           2 :                 // Don't add a level if the current compaction exceeds the compaction size limit
    1707           2 :                 return pc
    1708           2 :         }
    1709           2 :         return opts.Experimental.MultiLevelCompactionHeuristic.pick(pc, opts, diskAvailBytes)
    1710             : }
    1711             : 
// MultiLevelHeuristic evaluates whether to add files from the next level into the compaction.
type MultiLevelHeuristic interface {
	// pick returns the preferred compaction: either pc unchanged, or a
	// variant extended with an additional input level.
	pick(pc *pickedCompaction, opts *Options, diskAvailBytes uint64) *pickedCompaction

	// allowL0 reports whether the heuristic allows L0 to be involved in a
	// multilevel compaction.
	allowL0() bool
}
    1720             : 
// NoMultiLevel will never add an additional level to the compaction.
type NoMultiLevel struct{}

var _ MultiLevelHeuristic = (*NoMultiLevel)(nil)

// pick implements MultiLevelHeuristic; it always returns the compaction
// unchanged.
func (nml NoMultiLevel) pick(
	pc *pickedCompaction, opts *Options, diskAvailBytes uint64,
) *pickedCompaction {
	return pc
}

// allowL0 implements MultiLevelHeuristic; L0 is never eligible.
func (nml NoMultiLevel) allowL0() bool {
	return false
}
    1735             : 
    1736           2 : func (pc *pickedCompaction) predictedWriteAmp() float64 {
    1737           2 :         var bytesToCompact uint64
    1738           2 :         var higherLevelBytes uint64
    1739           2 :         for i := range pc.inputs {
    1740           2 :                 levelSize := pc.inputs[i].files.SizeSum()
    1741           2 :                 bytesToCompact += levelSize
    1742           2 :                 if i != len(pc.inputs)-1 {
    1743           2 :                         higherLevelBytes += levelSize
    1744           2 :                 }
    1745             :         }
    1746           2 :         return float64(bytesToCompact) / float64(higherLevelBytes)
    1747             : }
    1748             : 
    1749           2 : func (pc *pickedCompaction) overlappingRatio() float64 {
    1750           2 :         var higherLevelBytes uint64
    1751           2 :         var lowestLevelBytes uint64
    1752           2 :         for i := range pc.inputs {
    1753           2 :                 levelSize := pc.inputs[i].files.SizeSum()
    1754           2 :                 if i == len(pc.inputs)-1 {
    1755           2 :                         lowestLevelBytes += levelSize
    1756           2 :                         continue
    1757             :                 }
    1758           2 :                 higherLevelBytes += levelSize
    1759             :         }
    1760           2 :         return float64(lowestLevelBytes) / float64(higherLevelBytes)
    1761             : }
    1762             : 
// WriteAmpHeuristic defines a multi level compaction heuristic which will add
// an additional level to the picked compaction if it reduces predicted write
// amp of the compaction + the addPropensity constant.
type WriteAmpHeuristic struct {
	// AddPropensity is a constant that affects the propensity to conduct
	// multilevel compactions. If positive, a multilevel compaction may get
	// picked even if the single level compaction has lower write amp, and
	// vice versa.
	AddPropensity float64

	// AllowL0 if true, allow l0 to be involved in a ML compaction.
	AllowL0 bool
}

var _ MultiLevelHeuristic = (*WriteAmpHeuristic)(nil)
    1777             : 
    1778             : // TODO(msbutler): microbenchmark the extent to which multilevel compaction
    1779             : // picking slows down the compaction picking process.  This should be as fast as
    1780             : // possible since Compaction-picking holds d.mu, which prevents WAL rotations,
    1781             : // in-progress flushes and compactions from completing, etc. Consider ways to
    1782             : // deduplicate work, given that setupInputs has already been called.
    1783             : func (wa WriteAmpHeuristic) pick(
    1784             :         pcOrig *pickedCompaction, opts *Options, diskAvailBytes uint64,
    1785           2 : ) *pickedCompaction {
    1786           2 :         pcMulti := pcOrig.clone()
    1787           2 :         if !pcMulti.setupMultiLevelCandidate(opts, diskAvailBytes) {
    1788           2 :                 return pcOrig
    1789           2 :         }
    1790           2 :         picked := pcOrig
    1791           2 :         if pcMulti.predictedWriteAmp() <= pcOrig.predictedWriteAmp()+wa.AddPropensity {
    1792           2 :                 picked = pcMulti
    1793           2 :         }
    1794             :         // Regardless of what compaction was picked, log the multilevelOverlapping ratio.
    1795           2 :         picked.pickerMetrics.multiLevelOverlappingRatio = pcMulti.overlappingRatio()
    1796           2 :         return picked
    1797             : }
    1798             : 
// allowL0 implements MultiLevelHeuristic, deferring to the configured
// AllowL0 setting.
func (wa WriteAmpHeuristic) allowL0() bool {
	return wa.AllowL0
}
    1802             : 
// pickL0 picks a compaction originating from L0, using information about
// sublevels to generate it. It prefers an L0=>Lbase compaction, falling back
// to an intra-L0 compaction; it returns nil when neither can be formed.
func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc *pickedCompaction) {
	// It is important to pass information about Lbase files to L0Sublevels
	// so it can pick a compaction that does not conflict with an Lbase => Lbase+1
	// compaction. Without this, we observed reduced concurrency of L0=>Lbase
	// compactions, and increasing read amplification in L0.
	//
	// TODO(bilal) Remove the minCompactionDepth parameter once fixing it at 1
	// has been shown to not cause a performance regression.
	lcf, err := vers.L0Sublevels.PickBaseCompaction(1, vers.Levels[baseLevel].Slice())
	if err != nil {
		opts.Logger.Errorf("error when picking base compaction: %s", err)
		return
	}
	if lcf != nil {
		pc = newPickedCompactionFromL0(lcf, opts, vers, baseLevel, true)
		pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel)
		if pc.startLevel.files.Empty() {
			opts.Logger.Fatalf("empty compaction chosen")
		}
		return pc.maybeAddLevel(opts, env.diskAvailBytes)
	}

	// Couldn't choose a base compaction. Try choosing an intra-L0
	// compaction. Note that we pass in minIntraL0Count (rather than 1) as the
	// minimum number of files, since choosing a single sublevel intra-L0
	// compaction is counterproductive.
	lcf, err = vers.L0Sublevels.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count)
	if err != nil {
		opts.Logger.Errorf("error when picking intra-L0 compaction: %s", err)
		return
	}
	if lcf != nil {
		pc = newPickedCompactionFromL0(lcf, opts, vers, 0, false)
		if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
			return nil
		}
		if pc.startLevel.files.Empty() {
			opts.Logger.Fatalf("empty compaction chosen")
		}
		{
			iter := pc.startLevel.files.Iter()
			if iter.First() == nil || iter.Next() == nil {
				// A single-file intra-L0 compaction is unproductive.
				return nil
			}
		}

		pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
	}
	return pc
}
    1856             : 
// pickManualCompaction constructs a compaction for the range described by
// manual. It returns (nil, false) when there is nothing to do (empty input
// range, or manual.level fell below Lbase), (nil, true) when the compaction
// conflicts with an in-progress compaction and should be retried later, and
// (pc, false) on success.
func pickManualCompaction(
	vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction,
) (pc *pickedCompaction, retryLater bool) {
	outputLevel := manual.level + 1
	if manual.level == 0 {
		outputLevel = baseLevel
	} else if manual.level < baseLevel {
		// The start level for a compaction must be >= Lbase. A manual
		// compaction could have been created adhering to that condition, and
		// then an automatic compaction came in and compacted all of the
		// sstables in Lbase to Lbase+1 which caused Lbase to change. Simply
		// ignore this manual compaction as there is nothing to do (manual.level
		// points to an empty level).
		return nil, false
	}
	// This conflictsWithInProgress call is necessary for the manual compaction to
	// be retried when it conflicts with an ongoing automatic compaction. Without
	// it, the compaction is dropped due to pc.setupInputs returning false since
	// the input/output range is already being compacted, and the manual
	// compaction ends with a non-compacted LSM.
	if conflictsWithInProgress(manual, outputLevel, env.inProgressCompactions, opts.Comparer.Compare) {
		return nil, true
	}
	pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
	manual.outputLevel = pc.outputLevel.level
	pc.startLevel.files = vers.Overlaps(manual.level, opts.Comparer.Compare, manual.start, manual.end, false)
	if pc.startLevel.files.Empty() {
		// Nothing to do
		return nil, false
	}
	if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
		// setupInputs returned false indicating there's a conflicting
		// concurrent compaction.
		return nil, true
	}
	if pc = pc.maybeAddLevel(opts, env.diskAvailBytes); pc == nil {
		return nil, false
	}
	if pc.outputLevel.level != outputLevel {
		if len(pc.extraLevels) > 0 {
			// multilevel compactions relax this invariant
		} else {
			panic("pebble: compaction picked unexpected output level")
		}
	}
	// Fail-safe to protect against compacting the same sstable concurrently.
	if inputRangeAlreadyCompacting(env, pc) {
		return nil, true
	}
	return pc, false
}
    1908             : 
    1909             : func (p *compactionPickerByScore) pickReadTriggeredCompaction(
    1910             :         env compactionEnv,
    1911           2 : ) (pc *pickedCompaction) {
    1912           2 :         // If a flush is in-progress or expected to happen soon, it means more writes are taking place. We would
    1913           2 :         // soon be scheduling more write focussed compactions. In this case, skip read compactions as they are
    1914           2 :         // lower priority.
    1915           2 :         if env.readCompactionEnv.flushing || env.readCompactionEnv.readCompactions == nil {
    1916           2 :                 return nil
    1917           2 :         }
    1918           2 :         for env.readCompactionEnv.readCompactions.size > 0 {
    1919           1 :                 rc := env.readCompactionEnv.readCompactions.remove()
    1920           1 :                 if pc = pickReadTriggeredCompactionHelper(p, rc, env); pc != nil {
    1921           1 :                         break
    1922             :                 }
    1923             :         }
    1924           2 :         return pc
    1925             : }
    1926             : 
    1927             : func pickReadTriggeredCompactionHelper(
    1928             :         p *compactionPickerByScore, rc *readCompaction, env compactionEnv,
    1929           1 : ) (pc *pickedCompaction) {
    1930           1 :         cmp := p.opts.Comparer.Compare
    1931           1 :         overlapSlice := p.vers.Overlaps(rc.level, cmp, rc.start, rc.end, false /* exclusiveEnd */)
    1932           1 :         if overlapSlice.Empty() {
    1933           1 :                 // If there is no overlap, then the file with the key range
    1934           1 :                 // must have been compacted away. So, we don't proceed to
    1935           1 :                 // compact the same key range again.
    1936           1 :                 return nil
    1937           1 :         }
    1938             : 
    1939           1 :         iter := overlapSlice.Iter()
    1940           1 :         var fileMatches bool
    1941           1 :         for f := iter.First(); f != nil; f = iter.Next() {
    1942           1 :                 if f.FileNum == rc.fileNum {
    1943           1 :                         fileMatches = true
    1944           1 :                         break
    1945             :                 }
    1946             :         }
    1947           1 :         if !fileMatches {
    1948           1 :                 return nil
    1949           1 :         }
    1950             : 
    1951           1 :         pc = newPickedCompaction(p.opts, p.vers, rc.level, defaultOutputLevel(rc.level, p.baseLevel), p.baseLevel)
    1952           1 : 
    1953           1 :         pc.startLevel.files = overlapSlice
    1954           1 :         if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel) {
    1955           0 :                 return nil
    1956           0 :         }
    1957           1 :         if inputRangeAlreadyCompacting(env, pc) {
    1958           0 :                 return nil
    1959           0 :         }
    1960           1 :         pc.kind = compactionKindRead
    1961           1 : 
    1962           1 :         // Prevent read compactions which are too wide.
    1963           1 :         outputOverlaps := pc.version.Overlaps(
    1964           1 :                 pc.outputLevel.level, pc.cmp, pc.smallest.UserKey,
    1965           1 :                 pc.largest.UserKey, pc.largest.IsExclusiveSentinel())
    1966           1 :         if outputOverlaps.SizeSum() > pc.maxReadCompactionBytes {
    1967           1 :                 return nil
    1968           1 :         }
    1969             : 
    1970             :         // Prevent compactions which start with a small seed file X, but overlap
    1971             :         // with over allowedCompactionWidth * X file sizes in the output layer.
    1972           1 :         const allowedCompactionWidth = 35
    1973           1 :         if outputOverlaps.SizeSum() > overlapSlice.SizeSum()*allowedCompactionWidth {
    1974           0 :                 return nil
    1975           0 :         }
    1976             : 
    1977           1 :         return pc
    1978             : }
    1979             : 
// forceBaseLevel1 pins the picker's base level to L1, overriding whatever
// base level was previously computed. NOTE(review): callers are not visible
// from here — presumably a testing/override hook; confirm.
func (p *compactionPickerByScore) forceBaseLevel1() {
	p.baseLevel = 1
}
    1983             : 
    1984           2 : func inputRangeAlreadyCompacting(env compactionEnv, pc *pickedCompaction) bool {
    1985           2 :         for _, cl := range pc.inputs {
    1986           2 :                 iter := cl.files.Iter()
    1987           2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1988           2 :                         if f.IsCompacting() {
    1989           1 :                                 return true
    1990           1 :                         }
    1991             :                 }
    1992             :         }
    1993             : 
    1994             :         // Look for active compactions outputting to the same region of the key
    1995             :         // space in the same output level. Two potential compactions may conflict
    1996             :         // without sharing input files if there are no files in the output level
    1997             :         // that overlap with the intersection of the compactions' key spaces.
    1998             :         //
    1999             :         // Consider an active L0->Lbase compaction compacting two L0 files one
    2000             :         // [a-f] and the other [t-z] into Lbase.
    2001             :         //
    2002             :         // L0
    2003             :         //     ↦ 000100  ↤                           ↦  000101   ↤
    2004             :         // L1
    2005             :         //     ↦ 000004  ↤
    2006             :         //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    2007             :         //
    2008             :         // If a new file 000102 [j-p] is flushed while the existing compaction is
    2009             :         // still ongoing, new file would not be in any compacting sublevel
    2010             :         // intervals and would not overlap with any Lbase files that are also
    2011             :         // compacting. However, this compaction cannot be picked because the
    2012             :         // compaction's output key space [j-p] would overlap the existing
    2013             :         // compaction's output key space [a-z].
    2014             :         //
    2015             :         // L0
    2016             :         //     ↦ 000100* ↤       ↦   000102  ↤       ↦  000101*  ↤
    2017             :         // L1
    2018             :         //     ↦ 000004* ↤
    2019             :         //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    2020             :         //
    2021             :         // * - currently compacting
    2022           2 :         if pc.outputLevel != nil && pc.outputLevel.level != 0 {
    2023           2 :                 for _, c := range env.inProgressCompactions {
    2024           2 :                         if pc.outputLevel.level != c.outputLevel {
    2025           2 :                                 continue
    2026             :                         }
    2027           2 :                         if base.InternalCompare(pc.cmp, c.largest, pc.smallest) < 0 ||
    2028           2 :                                 base.InternalCompare(pc.cmp, c.smallest, pc.largest) > 0 {
    2029           2 :                                 continue
    2030             :                         }
    2031             : 
    2032             :                         // The picked compaction and the in-progress compaction c are
    2033             :                         // outputting to the same region of the key space of the same
    2034             :                         // level.
    2035           2 :                         return true
    2036             :                 }
    2037             :         }
    2038           2 :         return false
    2039             : }
    2040             : 
    2041             : // conflictsWithInProgress checks if there are any in-progress compactions with overlapping keyspace.
    2042             : func conflictsWithInProgress(
    2043             :         manual *manualCompaction, outputLevel int, inProgressCompactions []compactionInfo, cmp Compare,
    2044           2 : ) bool {
    2045           2 :         for _, c := range inProgressCompactions {
    2046           2 :                 if (c.outputLevel == manual.level || c.outputLevel == outputLevel) &&
    2047           2 :                         isUserKeysOverlapping(manual.start, manual.end, c.smallest.UserKey, c.largest.UserKey, cmp) {
    2048           2 :                         return true
    2049           2 :                 }
    2050           2 :                 for _, in := range c.inputs {
    2051           2 :                         if in.files.Empty() {
    2052           2 :                                 continue
    2053             :                         }
    2054           2 :                         iter := in.files.Iter()
    2055           2 :                         smallest := iter.First().Smallest.UserKey
    2056           2 :                         largest := iter.Last().Largest.UserKey
    2057           2 :                         if (in.level == manual.level || in.level == outputLevel) &&
    2058           2 :                                 isUserKeysOverlapping(manual.start, manual.end, smallest, largest, cmp) {
    2059           1 :                                 return true
    2060           1 :                         }
    2061             :                 }
    2062             :         }
    2063           2 :         return false
    2064             : }
    2065             : 
    2066           2 : func isUserKeysOverlapping(x1, x2, y1, y2 []byte, cmp Compare) bool {
    2067           2 :         return cmp(x1, y2) <= 0 && cmp(y1, x2) <= 0
    2068           2 : }

Generated by: LCOV version 1.14