LCOV - code coverage report
Current view: top level - pebble - compaction_picker.go (source / functions) Hit Total Coverage
Test: 2024-12-11 08:18Z ab9741a3 - tests only.lcov Lines: 1076 1226 87.8 %
Date: 2024-12-11 08:18:35 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "fmt"
      11             :         "math"
      12             :         "sort"
      13             :         "strings"
      14             : 
      15             :         "github.com/cockroachdb/errors"
      16             :         "github.com/cockroachdb/pebble/internal/base"
      17             :         "github.com/cockroachdb/pebble/internal/humanize"
      18             :         "github.com/cockroachdb/pebble/internal/invariants"
      19             :         "github.com/cockroachdb/pebble/internal/manifest"
      20             : )
      21             : 
// minIntraL0Count is the minimum number of L0 files for an intra-L0
// compaction (one that reads from and writes back to L0). This matches the
// RocksDB heuristic.
const minIntraL0Count = 4
      25             : 
// compactionEnv is a snapshot of state gathered by the caller and handed to
// the compaction picker when picking a new compaction.
type compactionEnv struct {
	// diskAvailBytes holds a statistic on the number of bytes available on
	// disk, as reported by the filesystem. It's used to be more restrictive in
	// expanding compactions if available disk space is limited.
	//
	// The cached value (d.diskAvailBytes) is updated whenever a file is deleted
	// and whenever a compaction or flush completes. Since file removal is the
	// primary means of reclaiming space, there is a rough bound on the
	// statistic's staleness when available bytes is growing. Compactions and
	// flushes are longer, slower operations and provide a much looser bound
	// when available bytes is decreasing.
	diskAvailBytes uint64
	// earliestUnflushedSeqNum is presumably the sequence number of the
	// earliest key not yet flushed to an sstable — TODO confirm against the
	// caller that populates it.
	earliestUnflushedSeqNum base.SeqNum
	// earliestSnapshotSeqNum is presumably the sequence number of the oldest
	// open snapshot — TODO confirm against the caller that populates it.
	earliestSnapshotSeqNum base.SeqNum
	// inProgressCompactions describes the compactions currently running; the
	// picker uses them to constrain new compactions (see compactionInfo).
	inProgressCompactions []compactionInfo
	// readCompactionEnv carries the state needed to pick read-triggered
	// compactions.
	readCompactionEnv readCompactionEnv
}
      43             : 
// compactionPicker encapsulates the heuristics for choosing which compaction
// to run next, given a view of the LSM and the set of in-progress
// compactions.
//
// NOTE(review): per-method semantics below are inferred from the method
// names and signatures; confirm against the implementation in this package.
type compactionPicker interface {
	// getScores returns per-level compaction scores, accounting for the
	// given in-progress compactions.
	getScores([]compactionInfo) [numLevels]float64
	// getBaseLevel returns the current base level (LBase).
	getBaseLevel() int
	// estimatedCompactionDebt estimates the outstanding bytes of compaction
	// work; l0ExtraSize is added to L0's size.
	estimatedCompactionDebt(l0ExtraSize uint64) uint64
	// pickAuto picks the best score-based compaction, if any is warranted.
	pickAuto(env compactionEnv) (pc *pickedCompaction)
	// pickElisionOnlyCompaction picks a compaction intended to elide
	// obsolete keys, if any is warranted.
	pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)
	// pickRewriteCompaction picks a compaction that rewrites existing
	// tables, if any is warranted.
	pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction)
	// pickReadTriggeredCompaction picks a compaction driven by read
	// activity, if any is warranted.
	pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
	// forceBaseLevel1 forces the base level to L1.
	forceBaseLevel1()
}
      54             : 
// readCompactionEnv is used to hold data required to perform read compactions
type readCompactionEnv struct {
	// rescheduleReadCompaction, when set by the picker, presumably requests
	// that a read compaction be re-attempted later — TODO confirm the exact
	// protocol with the caller.
	rescheduleReadCompaction *bool
	// readCompactions is the queue of pending read-triggered compactions.
	readCompactions *readCompactionQueue
	// flushing presumably reports whether a flush is in progress — TODO
	// confirm against the caller that populates it.
	flushing bool
}
      61             : 
// Information about in-progress compactions provided to the compaction picker.
// These are used to constrain the new compactions that will be picked.
type compactionInfo struct {
	// versionEditApplied is true if this compaction's version edit has already
	// been committed. The compaction may still be in-progress deleting newly
	// obsolete files.
	versionEditApplied bool
	// inputs lists the compaction's input levels and their files.
	inputs []compactionLevel
	// outputLevel is the level the compaction writes into.
	outputLevel int
	// smallest and largest bound the compaction's input keyspace —
	// presumably inclusive internal-key bounds; TODO confirm.
	smallest InternalKey
	largest  InternalKey
}
      74             : 
      75           1 : func (info compactionInfo) String() string {
      76           1 :         var buf bytes.Buffer
      77           1 :         var largest int
      78           1 :         for i, in := range info.inputs {
      79           1 :                 if i > 0 {
      80           1 :                         fmt.Fprintf(&buf, " -> ")
      81           1 :                 }
      82           1 :                 fmt.Fprintf(&buf, "L%d", in.level)
      83           1 :                 in.files.Each(func(m *fileMetadata) {
      84           1 :                         fmt.Fprintf(&buf, " %s", m.FileNum)
      85           1 :                 })
      86           1 :                 if largest < in.level {
      87           1 :                         largest = in.level
      88           1 :                 }
      89             :         }
      90           1 :         if largest != info.outputLevel || len(info.inputs) == 1 {
      91           1 :                 fmt.Fprintf(&buf, " -> L%d", info.outputLevel)
      92           1 :         }
      93           1 :         return buf.String()
      94             : }
      95             : 
// sortCompactionLevelsByPriority sorts candidate levels so that the level
// most deserving of compaction is placed first (see Less for the ordering).
// It implements sort.Interface.
type sortCompactionLevelsByPriority []candidateLevelInfo

// Len returns the number of candidate levels; part of sort.Interface.
func (s sortCompactionLevelsByPriority) Len() int {
	return len(s)
}
     101             : 
     102             : // A level should be picked for compaction if the compensatedScoreRatio is >= the
     103             : // compactionScoreThreshold.
     104             : const compactionScoreThreshold = 1
     105             : 
     106             : // Less should return true if s[i] must be placed earlier than s[j] in the final
     107             : // sorted list. The candidateLevelInfo for the level placed earlier is more likely
     108             : // to be picked for a compaction.
     109           1 : func (s sortCompactionLevelsByPriority) Less(i, j int) bool {
     110           1 :         iShouldCompact := s[i].compensatedScoreRatio >= compactionScoreThreshold
     111           1 :         jShouldCompact := s[j].compensatedScoreRatio >= compactionScoreThreshold
     112           1 :         // Ordering is defined as decreasing on (shouldCompact, uncompensatedScoreRatio)
     113           1 :         // where shouldCompact is 1 for true and 0 for false.
     114           1 :         if iShouldCompact && !jShouldCompact {
     115           1 :                 return true
     116           1 :         }
     117           1 :         if !iShouldCompact && jShouldCompact {
     118           1 :                 return false
     119           1 :         }
     120             : 
     121           1 :         if s[i].uncompensatedScoreRatio != s[j].uncompensatedScoreRatio {
     122           1 :                 return s[i].uncompensatedScoreRatio > s[j].uncompensatedScoreRatio
     123           1 :         }
     124           1 :         return s[i].level < s[j].level
     125             : }
     126             : 
// Swap exchanges the candidate levels at indices i and j; part of
// sort.Interface.
func (s sortCompactionLevelsByPriority) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
     130             : 
// sublevelInfo is used to tag a LevelSlice for an L0 sublevel with the
// sublevel.
type sublevelInfo struct {
	// LevelSlice holds the files belonging to this sublevel.
	manifest.LevelSlice
	// sublevel identifies which L0 sublevel the files belong to.
	sublevel manifest.Layer
}
     137             : 
     138           1 : func (cl sublevelInfo) Clone() sublevelInfo {
     139           1 :         return sublevelInfo{
     140           1 :                 sublevel:   cl.sublevel,
     141           1 :                 LevelSlice: cl.LevelSlice,
     142           1 :         }
     143           1 : }
// String returns a human-readable description of the sublevel and its files.
func (cl sublevelInfo) String() string {
	return fmt.Sprintf(`Sublevel %s; Levels %s`, cl.sublevel, cl.LevelSlice)
}
     147             : 
     148             : // generateSublevelInfo will generate the level slices for each of the sublevels
     149             : // from the level slice for all of L0.
     150           1 : func generateSublevelInfo(cmp base.Compare, levelFiles manifest.LevelSlice) []sublevelInfo {
     151           1 :         sublevelMap := make(map[uint64][]*fileMetadata)
     152           1 :         it := levelFiles.Iter()
     153           1 :         for f := it.First(); f != nil; f = it.Next() {
     154           1 :                 sublevelMap[uint64(f.SubLevel)] = append(sublevelMap[uint64(f.SubLevel)], f)
     155           1 :         }
     156             : 
     157           1 :         var sublevels []int
     158           1 :         for level := range sublevelMap {
     159           1 :                 sublevels = append(sublevels, int(level))
     160           1 :         }
     161           1 :         sort.Ints(sublevels)
     162           1 : 
     163           1 :         var levelSlices []sublevelInfo
     164           1 :         for _, sublevel := range sublevels {
     165           1 :                 metas := sublevelMap[uint64(sublevel)]
     166           1 :                 levelSlices = append(
     167           1 :                         levelSlices,
     168           1 :                         sublevelInfo{
     169           1 :                                 manifest.NewLevelSliceKeySorted(cmp, metas),
     170           1 :                                 manifest.L0Sublevel(sublevel),
     171           1 :                         },
     172           1 :                 )
     173           1 :         }
     174           1 :         return levelSlices
     175             : }
     176             : 
// compactionPickerMetrics holds metrics related to the compaction picking process
type compactionPickerMetrics struct {
	// scores contains the compensatedScoreRatio from the candidateLevelInfo.
	scores []float64
	// singleLevelOverlappingRatio and multiLevelOverlappingRatio presumably
	// capture the output-overlap ratios of the single-level and multi-level
	// compaction alternatives — TODO confirm where they are populated.
	singleLevelOverlappingRatio float64
	multiLevelOverlappingRatio  float64
}
     184             : 
// pickedCompaction contains information about a compaction that has already
// been chosen, and is being constructed. Compaction construction info lives in
// this struct, and is copied over into the compaction struct when that's
// created.
type pickedCompaction struct {
	// cmp compares user keys; set from opts.Comparer.Compare in
	// newPickedCompaction.
	cmp Compare
	// score of the chosen compaction. This is the same as the
	// compensatedScoreRatio in the candidateLevelInfo.
	score float64
	// kind indicates the kind of compaction.
	kind compactionKind
	// startLevel is the level that is being compacted. Inputs from startLevel
	// and outputLevel will be merged to produce a set of outputLevel files.
	startLevel *compactionLevel
	// outputLevel is the level that files are being produced in. outputLevel is
	// equal to startLevel+1 except when:
	//    - if startLevel is 0, the output level equals compactionPicker.baseLevel().
	//    - in multilevel compaction, the output level is the lowest level involved in
	//      the compaction
	outputLevel *compactionLevel
	// extraLevels contain additional levels in between the input and output
	// levels that get compacted in multi level compactions
	extraLevels []*compactionLevel
	// inputs backs startLevel/outputLevel/extraLevels, which point into it.
	inputs []compactionLevel
	// LBase at the time of compaction picking.
	baseLevel int
	// L0-specific compaction info. Set to a non-nil value for all compactions
	// where startLevel == 0 that were generated by L0Sublevels.
	lcf *manifest.L0CompactionFiles
	// maxOutputFileSize is the maximum size of an individual table created
	// during compaction.
	maxOutputFileSize uint64
	// maxOverlapBytes is the maximum number of bytes of overlap allowed for a
	// single output table with the tables in the grandparent level.
	maxOverlapBytes uint64
	// maxReadCompactionBytes is the maximum bytes a read compaction is allowed to
	// overlap in its output level with. If the overlap is greater than
	// maxReadCompaction bytes, then we don't proceed with the compaction.
	maxReadCompactionBytes uint64

	// slot is the compaction slot used up by this compaction. Encapsulates any
	// limiting/pacing logic.
	slot base.CompactionSlot
	// The boundaries of the input data.
	smallest InternalKey
	largest  InternalKey
	// version is the LSM version the compaction was picked from; it is shared
	// with (not copied by) clone.
	version *version
	// pickerMetrics records metrics about the picking process itself.
	pickerMetrics compactionPickerMetrics
}
     234             : 
// userKeyBounds returns the user-key bounds of the compaction, derived from
// its smallest and largest internal keys.
func (pc *pickedCompaction) userKeyBounds() base.UserKeyBounds {
	return base.UserKeyBoundsFromInternal(pc.smallest, pc.largest)
}
     238             : 
     239           1 : func defaultOutputLevel(startLevel, baseLevel int) int {
     240           1 :         outputLevel := startLevel + 1
     241           1 :         if startLevel == 0 {
     242           1 :                 outputLevel = baseLevel
     243           1 :         }
     244           1 :         if outputLevel >= numLevels-1 {
     245           1 :                 outputLevel = numLevels - 1
     246           1 :         }
     247           1 :         return outputLevel
     248             : }
     249             : 
     250             : func newPickedCompaction(
     251             :         opts *Options, cur *version, startLevel, outputLevel, baseLevel int,
     252           1 : ) *pickedCompaction {
     253           1 :         if startLevel > 0 && startLevel < baseLevel {
     254           1 :                 panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
     255           1 :                         startLevel, baseLevel))
     256             :         }
     257             : 
     258           1 :         adjustedLevel := adjustedOutputLevel(outputLevel, baseLevel)
     259           1 :         pc := &pickedCompaction{
     260           1 :                 cmp:                    opts.Comparer.Compare,
     261           1 :                 version:                cur,
     262           1 :                 baseLevel:              baseLevel,
     263           1 :                 inputs:                 []compactionLevel{{level: startLevel}, {level: outputLevel}},
     264           1 :                 maxOutputFileSize:      uint64(opts.Level(adjustedLevel).TargetFileSize),
     265           1 :                 maxOverlapBytes:        maxGrandparentOverlapBytes(opts, adjustedLevel),
     266           1 :                 maxReadCompactionBytes: maxReadCompactionBytes(opts, adjustedLevel),
     267           1 :         }
     268           1 :         pc.startLevel = &pc.inputs[0]
     269           1 :         pc.outputLevel = &pc.inputs[1]
     270           1 :         return pc
     271             : }
     272             : 
     273             : // adjustedOutputLevel is the output level used for the purpose of
     274             : // determining the target output file size, overlap bytes, and expanded
     275             : // bytes, taking into account the base level.
     276           1 : func adjustedOutputLevel(outputLevel int, baseLevel int) int {
     277           1 :         adjustedOutputLevel := outputLevel
     278           1 :         if adjustedOutputLevel > 0 {
     279           1 :                 // Output level is in the range [baseLevel, numLevels]. For the purpose of
     280           1 :                 // determining the target output file size, overlap bytes, and expanded
     281           1 :                 // bytes, we want to adjust the range to [1,numLevels].
     282           1 :                 adjustedOutputLevel = 1 + outputLevel - baseLevel
     283           1 :         }
     284           1 :         return adjustedOutputLevel
     285             : }
     286             : 
     287             : func newPickedCompactionFromL0(
     288             :         lcf *manifest.L0CompactionFiles, opts *Options, vers *version, baseLevel int, isBase bool,
     289           1 : ) *pickedCompaction {
     290           1 :         outputLevel := baseLevel
     291           1 :         if !isBase {
     292           1 :                 outputLevel = 0 // Intra L0
     293           1 :         }
     294             : 
     295           1 :         pc := newPickedCompaction(opts, vers, 0, outputLevel, baseLevel)
     296           1 :         pc.lcf = lcf
     297           1 :         pc.outputLevel.level = outputLevel
     298           1 : 
     299           1 :         // Manually build the compaction as opposed to calling
     300           1 :         // pickAutoHelper. This is because L0Sublevels has already added
     301           1 :         // any overlapping L0 SSTables that need to be added, and
     302           1 :         // because compactions built by L0SSTables do not necessarily
     303           1 :         // pick contiguous sequences of files in pc.version.Levels[0].
     304           1 :         files := make([]*manifest.FileMetadata, 0, len(lcf.Files))
     305           1 :         iter := vers.Levels[0].Iter()
     306           1 :         for f := iter.First(); f != nil; f = iter.Next() {
     307           1 :                 if lcf.FilesIncluded[f.L0Index] {
     308           1 :                         files = append(files, f)
     309           1 :                 }
     310             :         }
     311           1 :         pc.startLevel.files = manifest.NewLevelSliceSeqSorted(files)
     312           1 :         return pc
     313             : }
     314             : 
     315           1 : func (pc *pickedCompaction) String() string {
     316           1 :         var builder strings.Builder
     317           1 :         builder.WriteString(fmt.Sprintf(`Score=%f, `, pc.score))
     318           1 :         builder.WriteString(fmt.Sprintf(`Kind=%s, `, pc.kind))
     319           1 :         builder.WriteString(fmt.Sprintf(`AdjustedOutputLevel=%d, `, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel)))
     320           1 :         builder.WriteString(fmt.Sprintf(`maxOutputFileSize=%d, `, pc.maxOutputFileSize))
     321           1 :         builder.WriteString(fmt.Sprintf(`maxReadCompactionBytes=%d, `, pc.maxReadCompactionBytes))
     322           1 :         builder.WriteString(fmt.Sprintf(`smallest=%s, `, pc.smallest))
     323           1 :         builder.WriteString(fmt.Sprintf(`largest=%s, `, pc.largest))
     324           1 :         builder.WriteString(fmt.Sprintf(`version=%s, `, pc.version))
     325           1 :         builder.WriteString(fmt.Sprintf(`inputs=%s, `, pc.inputs))
     326           1 :         builder.WriteString(fmt.Sprintf(`startlevel=%s, `, pc.startLevel))
     327           1 :         builder.WriteString(fmt.Sprintf(`outputLevel=%s, `, pc.outputLevel))
     328           1 :         builder.WriteString(fmt.Sprintf(`extraLevels=%s, `, pc.extraLevels))
     329           1 :         builder.WriteString(fmt.Sprintf(`l0SublevelInfo=%s, `, pc.startLevel.l0SublevelInfo))
     330           1 :         builder.WriteString(fmt.Sprintf(`lcf=%s`, pc.lcf))
     331           1 :         return builder.String()
     332           1 : }
     333             : 
// clone creates a deep copy of the pickedCompaction, suitable for independent
// mutation by the caller.
func (pc *pickedCompaction) clone() *pickedCompaction {

	// Quickly copy over fields that do not require special deep copy care, and
	// set all fields that will require a deep copy to nil.
	newPC := &pickedCompaction{
		cmp:                    pc.cmp,
		score:                  pc.score,
		kind:                   pc.kind,
		baseLevel:              pc.baseLevel,
		maxOutputFileSize:      pc.maxOutputFileSize,
		maxOverlapBytes:        pc.maxOverlapBytes,
		maxReadCompactionBytes: pc.maxReadCompactionBytes,
		smallest:               pc.smallest.Clone(),
		largest:                pc.largest.Clone(),

		// TODO(msbutler): properly clone picker metrics
		pickerMetrics: pc.pickerMetrics,

		// Both copies see the same manifest, therefore it's ok for them to
		// share the same pc.version.
		version: pc.version,
	}

	// Deep-copy the inputs and re-point startLevel/outputLevel/extraLevels at
	// the copies: the first input is the start level, the last is the output
	// level, and anything in between is an extra (multi-level) input.
	newPC.inputs = make([]compactionLevel, len(pc.inputs))
	newPC.extraLevels = make([]*compactionLevel, 0, len(pc.extraLevels))
	for i := range pc.inputs {
		newPC.inputs[i] = pc.inputs[i].Clone()
		if i == 0 {
			newPC.startLevel = &newPC.inputs[i]
		} else if i == len(pc.inputs)-1 {
			newPC.outputLevel = &newPC.inputs[i]
		} else {
			newPC.extraLevels = append(newPC.extraLevels, &newPC.inputs[i])
		}
	}

	// Deep-copy the L0 sublevel metadata, if any.
	if len(pc.startLevel.l0SublevelInfo) > 0 {
		newPC.startLevel.l0SublevelInfo = make([]sublevelInfo, len(pc.startLevel.l0SublevelInfo))
		for i := range pc.startLevel.l0SublevelInfo {
			newPC.startLevel.l0SublevelInfo[i] = pc.startLevel.l0SublevelInfo[i].Clone()
		}
	}
	if pc.lcf != nil {
		newPC.lcf = pc.lcf.Clone()
	}
	return newPC
}
     382             : 
// maybeExpandBounds is a helper function for setupInputs which ensures the
// pickedCompaction's smallest and largest internal keys are updated iff
// the candidate keys expand the key span. This avoids a bug for multi-level
// compactions: during the second call to setupInputs, the picked compaction's
// smallest and largest keys should not decrease the key span.
func (pc *pickedCompaction) maybeExpandBounds(smallest InternalKey, largest InternalKey) {
	// An empty candidate span carries no information; leave bounds alone.
	if len(smallest.UserKey) == 0 && len(largest.UserKey) == 0 {
		return
	}
	// No bounds have been set yet; adopt the candidate span wholesale.
	if len(pc.smallest.UserKey) == 0 && len(pc.largest.UserKey) == 0 {
		pc.smallest = smallest
		pc.largest = largest
		return
	}
	// Expand each bound only outward. Note the >= / <= (rather than strict
	// inequality): on an exact tie the candidate key replaces the existing
	// one, leaving the span unchanged.
	if base.InternalCompare(pc.cmp, pc.smallest, smallest) >= 0 {
		pc.smallest = smallest
	}
	if base.InternalCompare(pc.cmp, pc.largest, largest) <= 0 {
		pc.largest = largest
	}
}
     404             : 
     405             : // setupInputs returns true if a compaction has been set up. It returns false if
     406             : // a concurrent compaction is occurring on the start or output level files.
     407             : func (pc *pickedCompaction) setupInputs(
     408             :         opts *Options, diskAvailBytes uint64, startLevel *compactionLevel,
     409           1 : ) bool {
     410           1 :         // maxExpandedBytes is the maximum size of an expanded compaction. If
     411           1 :         // growing a compaction results in a larger size, the original compaction
     412           1 :         // is used instead.
     413           1 :         maxExpandedBytes := expandedCompactionByteSizeLimit(
     414           1 :                 opts, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel), diskAvailBytes,
     415           1 :         )
     416           1 : 
     417           1 :         if anyTablesCompacting(startLevel.files) {
     418           1 :                 return false
     419           1 :         }
     420             : 
     421           1 :         pc.maybeExpandBounds(manifest.KeyRange(pc.cmp, startLevel.files.Iter()))
     422           1 : 
     423           1 :         // Determine the sstables in the output level which overlap with the input
     424           1 :         // sstables. No need to do this for intra-L0 compactions; outputLevel.files is
     425           1 :         // left empty for those.
     426           1 :         if startLevel.level != pc.outputLevel.level {
     427           1 :                 pc.outputLevel.files = pc.version.Overlaps(pc.outputLevel.level, pc.userKeyBounds())
     428           1 :                 if anyTablesCompacting(pc.outputLevel.files) {
     429           1 :                         return false
     430           1 :                 }
     431             : 
     432           1 :                 pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
     433           1 :                         startLevel.files.Iter(), pc.outputLevel.files.Iter()))
     434             :         }
     435             : 
     436             :         // Grow the sstables in startLevel.level as long as it doesn't affect the number
     437             :         // of sstables included from pc.outputLevel.level.
     438           1 :         if pc.lcf != nil && startLevel.level == 0 && pc.outputLevel.level != 0 {
     439           1 :                 // Call the L0-specific compaction extension method. Similar logic as
     440           1 :                 // pc.grow. Additional L0 files are optionally added to the compaction at
     441           1 :                 // this step. Note that the bounds passed in are not the bounds of the
     442           1 :                 // compaction, but rather the smallest and largest internal keys that
     443           1 :                 // the compaction cannot include from L0 without pulling in more Lbase
     444           1 :                 // files. Consider this example:
     445           1 :                 //
     446           1 :                 // L0:        c-d e+f g-h
     447           1 :                 // Lbase: a-b     e+f     i-j
     448           1 :                 //        a b c d e f g h i j
     449           1 :                 //
     450           1 :                 // The e-f files have already been chosen in the compaction. As pulling
     451           1 :                 // in more LBase files is undesirable, the logic below will pass in
     452           1 :                 // smallest = b and largest = i to ExtendL0ForBaseCompactionTo, which
     453           1 :                 // will expand the compaction to include c-d and g-h from L0. The
     454           1 :                 // bounds passed in are exclusive; the compaction cannot be expanded
     455           1 :                 // to include files that "touch" it.
     456           1 :                 smallestBaseKey := base.InvalidInternalKey
     457           1 :                 largestBaseKey := base.InvalidInternalKey
     458           1 :                 if pc.outputLevel.files.Empty() {
     459           1 :                         baseIter := pc.version.Levels[pc.outputLevel.level].Iter()
     460           1 :                         if sm := baseIter.SeekLT(pc.cmp, pc.smallest.UserKey); sm != nil {
     461           1 :                                 smallestBaseKey = sm.Largest
     462           1 :                         }
     463           1 :                         if la := baseIter.SeekGE(pc.cmp, pc.largest.UserKey); la != nil {
     464           1 :                                 largestBaseKey = la.Smallest
     465           1 :                         }
     466           1 :                 } else {
     467           1 :                         // NB: We use Reslice to access the underlying level's files, but
     468           1 :                         // we discard the returned slice. The pc.outputLevel.files slice
     469           1 :                         // is not modified.
     470           1 :                         _ = pc.outputLevel.files.Reslice(func(start, end *manifest.LevelIterator) {
     471           1 :                                 if sm := start.Prev(); sm != nil {
     472           1 :                                         smallestBaseKey = sm.Largest
     473           1 :                                 }
     474           1 :                                 if la := end.Next(); la != nil {
     475           1 :                                         largestBaseKey = la.Smallest
     476           1 :                                 }
     477             :                         })
     478             :                 }
     479           1 :                 oldLcf := pc.lcf.Clone()
     480           1 :                 if pc.version.L0Sublevels.ExtendL0ForBaseCompactionTo(smallestBaseKey, largestBaseKey, pc.lcf) {
     481           1 :                         var newStartLevelFiles []*fileMetadata
     482           1 :                         iter := pc.version.Levels[0].Iter()
     483           1 :                         var sizeSum uint64
     484           1 :                         for j, f := 0, iter.First(); f != nil; j, f = j+1, iter.Next() {
     485           1 :                                 if pc.lcf.FilesIncluded[f.L0Index] {
     486           1 :                                         newStartLevelFiles = append(newStartLevelFiles, f)
     487           1 :                                         sizeSum += f.Size
     488           1 :                                 }
     489             :                         }
     490           1 :                         if sizeSum+pc.outputLevel.files.SizeSum() < maxExpandedBytes {
     491           1 :                                 startLevel.files = manifest.NewLevelSliceSeqSorted(newStartLevelFiles)
     492           1 :                                 pc.smallest, pc.largest = manifest.KeyRange(pc.cmp,
     493           1 :                                         startLevel.files.Iter(), pc.outputLevel.files.Iter())
     494           1 :                         } else {
     495           1 :                                 *pc.lcf = *oldLcf
     496           1 :                         }
     497             :                 }
     498           1 :         } else if pc.grow(pc.smallest, pc.largest, maxExpandedBytes, startLevel) {
     499           1 :                 pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
     500           1 :                         startLevel.files.Iter(), pc.outputLevel.files.Iter()))
     501           1 :         }
     502             : 
     503           1 :         if pc.startLevel.level == 0 {
     504           1 :                 // We don't change the input files for the compaction beyond this point.
     505           1 :                 pc.startLevel.l0SublevelInfo = generateSublevelInfo(pc.cmp, pc.startLevel.files)
     506           1 :         }
     507             : 
     508           1 :         return true
     509             : }
     510             : 
     511             : // grow grows the number of inputs at c.level without changing the number of
     512             : // c.level+1 files in the compaction, and returns whether the inputs grew. sm
     513             : // and la are the smallest and largest InternalKeys in all of the inputs.
     514             : func (pc *pickedCompaction) grow(
     515             :         sm, la InternalKey, maxExpandedBytes uint64, startLevel *compactionLevel,
     516           1 : ) bool {
     517           1 :         if pc.outputLevel.files.Empty() {
     518           1 :                 return false
     519           1 :         }
     520           1 :         grow0 := pc.version.Overlaps(startLevel.level, base.UserKeyBoundsFromInternal(sm, la))
     521           1 :         if anyTablesCompacting(grow0) {
     522           0 :                 return false
     523           0 :         }
     524           1 :         if grow0.Len() <= startLevel.files.Len() {
     525           1 :                 return false
     526           1 :         }
     527           1 :         if grow0.SizeSum()+pc.outputLevel.files.SizeSum() >= maxExpandedBytes {
     528           1 :                 return false
     529           1 :         }
     530             :         // We need to include the outputLevel iter because without it, in a multiLevel scenario,
     531             :         // sm1 and la1 could shift the output level keyspace when pc.outputLevel.files is set to grow1.
     532           1 :         sm1, la1 := manifest.KeyRange(pc.cmp, grow0.Iter(), pc.outputLevel.files.Iter())
     533           1 :         grow1 := pc.version.Overlaps(pc.outputLevel.level, base.UserKeyBoundsFromInternal(sm1, la1))
     534           1 :         if anyTablesCompacting(grow1) {
     535           0 :                 return false
     536           0 :         }
     537           1 :         if grow1.Len() != pc.outputLevel.files.Len() {
     538           1 :                 return false
     539           1 :         }
     540           1 :         startLevel.files = grow0
     541           1 :         pc.outputLevel.files = grow1
     542           1 :         return true
     543             : }
     544             : 
     545           1 : func (pc *pickedCompaction) compactionSize() uint64 {
     546           1 :         var bytesToCompact uint64
     547           1 :         for i := range pc.inputs {
     548           1 :                 bytesToCompact += pc.inputs[i].files.SizeSum()
     549           1 :         }
     550           1 :         return bytesToCompact
     551             : }
     552             : 
     553             : // setupMultiLevelCandidated returns true if it successfully added another level
     554             : // to the compaction.
     555           1 : func (pc *pickedCompaction) setupMultiLevelCandidate(opts *Options, diskAvailBytes uint64) bool {
     556           1 :         pc.inputs = append(pc.inputs, compactionLevel{level: pc.outputLevel.level + 1})
     557           1 : 
     558           1 :         // Recalibrate startLevel and outputLevel:
     559           1 :         //  - startLevel and outputLevel pointers may be obsolete after appending to pc.inputs.
     560           1 :         //  - push outputLevel to extraLevels and move the new level to outputLevel
     561           1 :         pc.startLevel = &pc.inputs[0]
     562           1 :         pc.extraLevels = []*compactionLevel{&pc.inputs[1]}
     563           1 :         pc.outputLevel = &pc.inputs[2]
     564           1 :         return pc.setupInputs(opts, diskAvailBytes, pc.extraLevels[len(pc.extraLevels)-1])
     565           1 : }
     566             : 
     567             : // anyTablesCompacting returns true if any tables in the level slice are
     568             : // compacting.
     569           1 : func anyTablesCompacting(inputs manifest.LevelSlice) bool {
     570           1 :         it := inputs.Iter()
     571           1 :         for f := it.First(); f != nil; f = it.Next() {
     572           1 :                 if f.IsCompacting() {
     573           1 :                         return true
     574           1 :                 }
     575             :         }
     576           1 :         return false
     577             : }
     578             : 
     579             : // newCompactionPickerByScore creates a compactionPickerByScore associated with
     580             : // the newest version. The picker is used under logLock (until a new version is
     581             : // installed).
     582             : func newCompactionPickerByScore(
     583             :         v *version,
     584             :         virtualBackings *manifest.VirtualBackings,
     585             :         opts *Options,
     586             :         inProgressCompactions []compactionInfo,
     587           1 : ) *compactionPickerByScore {
     588           1 :         p := &compactionPickerByScore{
     589           1 :                 opts:            opts,
     590           1 :                 vers:            v,
     591           1 :                 virtualBackings: virtualBackings,
     592           1 :         }
     593           1 :         p.initLevelMaxBytes(inProgressCompactions)
     594           1 :         return p
     595           1 : }
     596             : 
// candidateLevelInfo holds information about a candidate compaction level
// that has been identified by the compaction picker.
type candidateLevelInfo struct {
	// compensatedScoreRatio is the compensatedScore of the level after
	// adjusting according to the other levels' sizes. For L0, the
	// compensatedScoreRatio is equivalent to the uncompensatedScoreRatio as we
	// don't account for level size compensation in L0.
	compensatedScoreRatio float64
	// compensatedScore is the score of the level after accounting for level
	// size compensation but before adjusting according to other levels'
	// sizes. For L0, the compensatedScore is equivalent to the
	// uncompensatedScore as we don't account for level size compensation in
	// L0.
	compensatedScore float64
	// uncompensatedScore is the score of the level to be compacted, calculated
	// using uncompensated file sizes and without any adjustments.
	uncompensatedScore float64
	// uncompensatedScoreRatio is the uncompensatedScore adjusted according to
	// the other levels' sizes.
	uncompensatedScoreRatio float64
	// level is the candidate level under consideration for compaction.
	level int
	// outputLevel is the level to compact to.
	outputLevel int
	// file is the file in level that will be compacted. Additional files may
	// be picked by the compaction, and a pickedCompaction created for the
	// compaction.
	file manifest.LevelFile
}
     624             : 
     625           1 : func (c *candidateLevelInfo) shouldCompact() bool {
     626           1 :         return c.compensatedScoreRatio >= compactionScoreThreshold
     627           1 : }
     628             : 
     629           1 : func fileCompensation(f *fileMetadata) uint64 {
     630           1 :         return uint64(f.Stats.PointDeletionsBytesEstimate) + f.Stats.RangeDeletionsBytesEstimate
     631           1 : }
     632             : 
     633             : // compensatedSize returns f's file size, inflated according to compaction
     634             : // priorities.
     635           1 : func compensatedSize(f *fileMetadata) uint64 {
     636           1 :         // Add in the estimate of disk space that may be reclaimed by compacting the
     637           1 :         // file's tombstones.
     638           1 :         return f.Size + fileCompensation(f)
     639           1 : }
     640             : 
// compensatedSizeAnnotator is a manifest.Annotator that annotates B-Tree
// nodes with the sum of the files' compensated sizes. Compensated sizes may
// change once a table's stats are loaded asynchronously, so a file's value is
// marked as cacheable only if its stats have been loaded (StatsValid).
var compensatedSizeAnnotator = manifest.SumAnnotator(func(f *fileMetadata) (uint64, bool) {
	return compensatedSize(f), f.StatsValid()
})
     648             : 
     649             : // totalCompensatedSize computes the compensated size over a file metadata
     650             : // iterator. Note that this function is linear in the files available to the
     651             : // iterator. Use the compensatedSizeAnnotator if querying the total
     652             : // compensated size of a level.
     653           1 : func totalCompensatedSize(iter manifest.LevelIterator) uint64 {
     654           1 :         var sz uint64
     655           1 :         for f := iter.First(); f != nil; f = iter.Next() {
     656           1 :                 sz += compensatedSize(f)
     657           1 :         }
     658           1 :         return sz
     659             : }
     660             : 
// compactionPickerByScore holds the state and logic for picking a compaction. A
// compaction picker is associated with a single version. A new compaction
// picker is created and initialized every time a new version is installed.
type compactionPickerByScore struct {
	// opts holds the DB options consulted when scoring levels.
	opts *Options
	// vers is the version this picker was created for.
	vers *version
	// virtualBackings is the set of virtual sstable backings associated with
	// vers (see manifest.VirtualBackings).
	virtualBackings *manifest.VirtualBackings
	// The level to target for L0 compactions. Levels L1 to baseLevel must be
	// empty.
	baseLevel int
	// levelMaxBytes holds the dynamically adjusted max bytes setting for each
	// level.
	levelMaxBytes [numLevels]int64
}

// Statically assert that compactionPickerByScore implements compactionPicker.
var _ compactionPicker = &compactionPickerByScore{}
     677             : 
     678           1 : func (p *compactionPickerByScore) getScores(inProgress []compactionInfo) [numLevels]float64 {
     679           1 :         var scores [numLevels]float64
     680           1 :         for _, info := range p.calculateLevelScores(inProgress) {
     681           1 :                 scores[info.level] = info.compensatedScoreRatio
     682           1 :         }
     683           1 :         return scores
     684             : }
     685             : 
     686           1 : func (p *compactionPickerByScore) getBaseLevel() int {
     687           1 :         if p == nil {
     688           0 :                 return 1
     689           0 :         }
     690           1 :         return p.baseLevel
     691             : }
     692             : 
// estimatedCompactionDebt estimates the number of bytes which need to be
// compacted before the LSM tree becomes stable.
//
// l0ExtraSize is added on top of L0's current size — presumably bytes that
// are about to land in L0 (e.g. pending flushes); confirm at call sites.
func (p *compactionPickerByScore) estimatedCompactionDebt(l0ExtraSize uint64) uint64 {
	if p == nil {
		return 0
	}

	// We assume that all the bytes in L0 need to be compacted to Lbase. This is
	// unlike the RocksDB logic that figures out whether L0 needs compaction.
	bytesAddedToNextLevel := l0ExtraSize + p.vers.Levels[0].Size()
	lbaseSize := p.vers.Levels[p.baseLevel].Size()

	var compactionDebt uint64
	if bytesAddedToNextLevel > 0 && lbaseSize > 0 {
		// We only incur compaction debt if both L0 and Lbase contain data. If L0
		// is empty, no compaction is necessary. If Lbase is empty, a move-based
		// compaction from L0 would occur.
		compactionDebt += bytesAddedToNextLevel + lbaseSize
	}

	// loop invariant: At the beginning of the loop, bytesAddedToNextLevel is the
	// bytes added to `level` in the loop.
	for level := p.baseLevel; level < numLevels-1; level++ {
		levelSize := p.vers.Levels[level].Size() + bytesAddedToNextLevel
		nextLevelSize := p.vers.Levels[level+1].Size()
		if levelSize > uint64(p.levelMaxBytes[level]) {
			// The level exceeds its target size; the excess spills into the
			// next level.
			bytesAddedToNextLevel = levelSize - uint64(p.levelMaxBytes[level])
			if nextLevelSize > 0 {
				// We only incur compaction debt if the next level contains data. If the
				// next level is empty, a move-based compaction would be used.
				levelRatio := float64(nextLevelSize) / float64(levelSize)
				// The current level contributes bytesAddedToNextLevel to compactions.
				// The next level contributes levelRatio * bytesAddedToNextLevel.
				compactionDebt += uint64(float64(bytesAddedToNextLevel) * (levelRatio + 1))
			}
		} else {
			// We're not moving any bytes to the next level.
			bytesAddedToNextLevel = 0
		}
	}
	return compactionDebt
}
     735             : 
// initLevelMaxBytes computes the dynamically adjusted per-level target sizes
// (p.levelMaxBytes) and the base level (p.baseLevel) for the picker's
// version, taking in-progress compactions into account.
func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []compactionInfo) {
	// The levelMaxBytes calculations here differ from RocksDB in two ways:
	//
	// 1. The use of dbSize vs maxLevelSize. RocksDB uses the size of the maximum
	//    level in L1-L6, rather than determining the size of the bottom level
	//    based on the total amount of data in the dB. The RocksDB calculation is
	//    problematic if L0 contains a significant fraction of data, or if the
	//    level sizes are roughly equal and thus there is a significant fraction
	//    of data outside of the largest level.
	//
	// 2. Not adjusting the size of Lbase based on L0. RocksDB computes
	//    baseBytesMax as the maximum of the configured LBaseMaxBytes and the
	//    size of L0. This is problematic because baseBytesMax is used to compute
	//    the max size of lower levels. A very large baseBytesMax will result in
	//    an overly large value for the size of lower levels which will caused
	//    those levels not to be compacted even when they should be
	//    compacted. This often results in "inverted" LSM shapes where Ln is
	//    larger than Ln+1.

	// Determine the first non-empty level and the total DB size.
	firstNonEmptyLevel := -1
	var dbSize uint64
	for level := 1; level < numLevels; level++ {
		if p.vers.Levels[level].Size() > 0 {
			if firstNonEmptyLevel == -1 {
				firstNonEmptyLevel = level
			}
			dbSize += p.vers.Levels[level].Size()
		}
	}
	// Treat the output level of any in-progress L0 compaction as non-empty:
	// data is about to land there.
	for _, c := range inProgressCompactions {
		if c.outputLevel == 0 || c.outputLevel == -1 {
			continue
		}
		if c.inputs[0].level == 0 && (firstNonEmptyLevel == -1 || c.outputLevel < firstNonEmptyLevel) {
			firstNonEmptyLevel = c.outputLevel
		}
	}

	// Initialize the max-bytes setting for each level to "infinity" which will
	// disallow compaction for that level. We'll fill in the actual value below
	// for levels we want to allow compactions from.
	for level := 0; level < numLevels; level++ {
		p.levelMaxBytes[level] = math.MaxInt64
	}

	if dbSize == 0 {
		// No levels for L1 and up contain any data. Target L0 compactions for the
		// last level or to the level to which there is an ongoing L0 compaction.
		p.baseLevel = numLevels - 1
		if firstNonEmptyLevel >= 0 {
			p.baseLevel = firstNonEmptyLevel
		}
		return
	}

	dbSize += p.vers.Levels[0].Size()
	// The bottom level holds (multiplier-1)/multiplier of the total data.
	bottomLevelSize := dbSize - dbSize/uint64(p.opts.Experimental.LevelMultiplier)

	// Work upward from the bottom level to derive the implied size of the
	// first non-empty level.
	curLevelSize := bottomLevelSize
	for level := numLevels - 2; level >= firstNonEmptyLevel; level-- {
		curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
	}

	// Compute base level (where L0 data is compacted to).
	baseBytesMax := uint64(p.opts.LBaseMaxBytes)
	p.baseLevel = firstNonEmptyLevel
	for p.baseLevel > 1 && curLevelSize > baseBytesMax {
		p.baseLevel--
		curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
	}

	// Smooth the multiplier so the ratio between consecutive level targets is
	// constant from Lbase down to the bottom level.
	smoothedLevelMultiplier := 1.0
	if p.baseLevel < numLevels-1 {
		smoothedLevelMultiplier = math.Pow(
			float64(bottomLevelSize)/float64(baseBytesMax),
			1.0/float64(numLevels-p.baseLevel-1))
	}

	levelSize := float64(baseBytesMax)
	for level := p.baseLevel; level < numLevels; level++ {
		if level > p.baseLevel && levelSize > 0 {
			levelSize *= smoothedLevelMultiplier
		}
		// Round the result since test cases use small target level sizes, which
		// can be impacted by floating-point imprecision + integer truncation.
		roundedLevelSize := math.Round(levelSize)
		if roundedLevelSize > float64(math.MaxInt64) {
			p.levelMaxBytes[level] = math.MaxInt64
		} else {
			p.levelMaxBytes[level] = int64(roundedLevelSize)
		}
	}
}
     830             : 
// levelSizeAdjust accumulates, for a single level, the bytes entering and
// leaving it due to in-progress compactions. Outgoing bytes are tracked both
// as actual on-disk sizes and as compensated sizes (see compensatedSize).
type levelSizeAdjust struct {
	// incomingActualBytes is the sum of actual sizes of files entering the
	// level from other levels.
	incomingActualBytes uint64
	// outgoingActualBytes is the sum of actual sizes of files leaving the
	// level.
	outgoingActualBytes uint64
	// outgoingCompensatedBytes is the sum of compensated sizes of files
	// leaving the level.
	outgoingCompensatedBytes uint64
}

// compensated returns the level's net size adjustment, subtracting the
// compensated size of the outgoing files.
func (a levelSizeAdjust) compensated() uint64 {
	return a.incomingActualBytes - a.outgoingCompensatedBytes
}

// actual returns the level's net size adjustment using actual file sizes for
// both incoming and outgoing files.
func (a levelSizeAdjust) actual() uint64 {
	return a.incomingActualBytes - a.outgoingActualBytes
}
     844             : 
     845           1 : func calculateSizeAdjust(inProgressCompactions []compactionInfo) [numLevels]levelSizeAdjust {
     846           1 :         // Compute size adjustments for each level based on the in-progress
     847           1 :         // compactions. We sum the file sizes of all files leaving and entering each
     848           1 :         // level in in-progress compactions. For outgoing files, we also sum a
     849           1 :         // separate sum of 'compensated file sizes', which are inflated according
     850           1 :         // to deletion estimates.
     851           1 :         //
     852           1 :         // When we adjust a level's size according to these values during score
     853           1 :         // calculation, we subtract the compensated size of start level inputs to
     854           1 :         // account for the fact that score calculation uses compensated sizes.
     855           1 :         //
     856           1 :         // Since compensated file sizes may be compensated because they reclaim
     857           1 :         // space from the output level's files, we only add the real file size to
     858           1 :         // the output level.
     859           1 :         //
     860           1 :         // This is slightly different from RocksDB's behavior, which simply elides
     861           1 :         // compacting files from the level size calculation.
     862           1 :         var sizeAdjust [numLevels]levelSizeAdjust
     863           1 :         for i := range inProgressCompactions {
     864           1 :                 c := &inProgressCompactions[i]
     865           1 :                 // If this compaction's version edit has already been applied, there's
     866           1 :                 // no need to adjust: The LSM we'll examine will already reflect the
     867           1 :                 // new LSM state.
     868           1 :                 if c.versionEditApplied {
     869           1 :                         continue
     870             :                 }
     871             : 
     872           1 :                 for _, input := range c.inputs {
     873           1 :                         actualSize := input.files.SizeSum()
     874           1 :                         compensatedSize := totalCompensatedSize(input.files.Iter())
     875           1 : 
     876           1 :                         if input.level != c.outputLevel {
     877           1 :                                 sizeAdjust[input.level].outgoingCompensatedBytes += compensatedSize
     878           1 :                                 sizeAdjust[input.level].outgoingActualBytes += actualSize
     879           1 :                                 if c.outputLevel != -1 {
     880           1 :                                         sizeAdjust[c.outputLevel].incomingActualBytes += actualSize
     881           1 :                                 }
     882             :                         }
     883             :                 }
     884             :         }
     885           1 :         return sizeAdjust
     886             : }
     887             : 
     888             : func (p *compactionPickerByScore) calculateLevelScores(
     889             :         inProgressCompactions []compactionInfo,
     890           1 : ) [numLevels]candidateLevelInfo {
     891           1 :         var scores [numLevels]candidateLevelInfo
     892           1 :         for i := range scores {
     893           1 :                 scores[i].level = i
     894           1 :                 scores[i].outputLevel = i + 1
     895           1 :         }
     896           1 :         l0UncompensatedScore := calculateL0UncompensatedScore(p.vers, p.opts, inProgressCompactions)
     897           1 :         scores[0] = candidateLevelInfo{
     898           1 :                 outputLevel:        p.baseLevel,
     899           1 :                 uncompensatedScore: l0UncompensatedScore,
     900           1 :                 compensatedScore:   l0UncompensatedScore, /* No level size compensation for L0 */
     901           1 :         }
     902           1 :         sizeAdjust := calculateSizeAdjust(inProgressCompactions)
     903           1 :         for level := 1; level < numLevels; level++ {
     904           1 :                 compensatedLevelSize := *compensatedSizeAnnotator.LevelAnnotation(p.vers.Levels[level]) + sizeAdjust[level].compensated()
     905           1 :                 scores[level].compensatedScore = float64(compensatedLevelSize) / float64(p.levelMaxBytes[level])
     906           1 :                 scores[level].uncompensatedScore = float64(p.vers.Levels[level].Size()+sizeAdjust[level].actual()) / float64(p.levelMaxBytes[level])
     907           1 :         }
     908             : 
     909             :         // Adjust each level's {compensated, uncompensated}Score by the uncompensatedScore
     910             :         // of the next level to get a {compensated, uncompensated}ScoreRatio. If the
     911             :         // next level has a high uncompensatedScore, and is thus a priority for compaction,
     912             :         // this reduces the priority for compacting the current level. If the next level
     913             :         // has a low uncompensatedScore (i.e. it is below its target size), this increases
     914             :         // the priority for compacting the current level.
     915             :         //
     916             :         // The effect of this adjustment is to help prioritize compactions in lower
     917             :         // levels. The following example shows the compensatedScoreRatio and the
     918             :         // compensatedScore. In this scenario, L0 has 68 sublevels. L3 (a.k.a. Lbase)
     919             :         // is significantly above its target size. The original score prioritizes
     920             :         // compactions from those two levels, but doing so ends up causing a future
     921             :         // problem: data piles up in the higher levels, starving L5->L6 compactions,
     922             :         // and to a lesser degree starving L4->L5 compactions.
     923             :         //
     924             :         // Note that in the example shown there is no level size compensation so the
     925             :         // compensatedScore and the uncompensatedScore is the same for each level.
     926             :         //
     927             :         //        compensatedScoreRatio   compensatedScore   uncompensatedScore   size   max-size
     928             :         //   L0                     3.2               68.0                 68.0  2.2 G          -
     929             :         //   L3                     3.2               21.1                 21.1  1.3 G       64 M
     930             :         //   L4                     3.4                6.7                  6.7  3.1 G      467 M
     931             :         //   L5                     3.4                2.0                  2.0  6.6 G      3.3 G
     932             :         //   L6                     0.6                0.6                  0.6   14 G       24 G
     933           1 :         var prevLevel int
     934           1 :         for level := p.baseLevel; level < numLevels; level++ {
     935           1 :                 // The compensated scores, and uncompensated scores will be turned into
     936           1 :                 // ratios as they're adjusted according to other levels' sizes.
     937           1 :                 scores[prevLevel].compensatedScoreRatio = scores[prevLevel].compensatedScore
     938           1 :                 scores[prevLevel].uncompensatedScoreRatio = scores[prevLevel].uncompensatedScore
     939           1 : 
     940           1 :                 // Avoid absurdly large scores by placing a floor on the score that we'll
     941           1 :                 // adjust a level by. The value of 0.01 was chosen somewhat arbitrarily.
     942           1 :                 const minScore = 0.01
     943           1 :                 if scores[prevLevel].compensatedScoreRatio >= compactionScoreThreshold {
     944           1 :                         if scores[level].uncompensatedScore >= minScore {
     945           1 :                                 scores[prevLevel].compensatedScoreRatio /= scores[level].uncompensatedScore
     946           1 :                         } else {
     947           1 :                                 scores[prevLevel].compensatedScoreRatio /= minScore
     948           1 :                         }
     949             :                 }
     950           1 :                 if scores[prevLevel].uncompensatedScoreRatio >= compactionScoreThreshold {
     951           1 :                         if scores[level].uncompensatedScore >= minScore {
     952           1 :                                 scores[prevLevel].uncompensatedScoreRatio /= scores[level].uncompensatedScore
     953           1 :                         } else {
     954           1 :                                 scores[prevLevel].uncompensatedScoreRatio /= minScore
     955           1 :                         }
     956             :                 }
     957           1 :                 prevLevel = level
     958             :         }
     959             :         // Set the score ratios for the lowest level.
     960             :         // INVARIANT: prevLevel == numLevels-1
     961           1 :         scores[prevLevel].compensatedScoreRatio = scores[prevLevel].compensatedScore
     962           1 :         scores[prevLevel].uncompensatedScoreRatio = scores[prevLevel].uncompensatedScore
     963           1 : 
     964           1 :         sort.Sort(sortCompactionLevelsByPriority(scores[:]))
     965           1 :         return scores
     966             : }
     967             : 
     968             : // calculateL0UncompensatedScore calculates a float score representing the
     969             : // relative priority of compacting L0. Level L0 is special in that files within
     970             : // L0 may overlap one another, so a different set of heuristics that take into
     971             : // account read amplification apply.
     972             : func calculateL0UncompensatedScore(
     973             :         vers *version, opts *Options, inProgressCompactions []compactionInfo,
     974           1 : ) float64 {
     975           1 :         // Use the sublevel count to calculate the score. The base vs intra-L0
     976           1 :         // compaction determination happens in pickAuto, not here.
     977           1 :         score := float64(2*vers.L0Sublevels.MaxDepthAfterOngoingCompactions()) /
     978           1 :                 float64(opts.L0CompactionThreshold)
     979           1 : 
     980           1 :         // Also calculate a score based on the file count but use it only if it
     981           1 :         // produces a higher score than the sublevel-based one. This heuristic is
     982           1 :         // designed to accommodate cases where L0 is accumulating non-overlapping
     983           1 :         // files in L0. Letting too many non-overlapping files accumulate in few
     984           1 :         // sublevels is undesirable, because:
     985           1 :         // 1) we can produce a massive backlog to compact once files do overlap.
     986           1 :         // 2) constructing L0 sublevels has a runtime that grows superlinearly with
     987           1 :         //    the number of files in L0 and must be done while holding D.mu.
     988           1 :         noncompactingFiles := vers.Levels[0].Len()
     989           1 :         for _, c := range inProgressCompactions {
     990           1 :                 for _, cl := range c.inputs {
     991           1 :                         if cl.level == 0 {
     992           1 :                                 noncompactingFiles -= cl.files.Len()
     993           1 :                         }
     994             :                 }
     995             :         }
     996           1 :         fileScore := float64(noncompactingFiles) / float64(opts.L0CompactionFileThreshold)
     997           1 :         if score < fileScore {
     998           1 :                 score = fileScore
     999           1 :         }
    1000           1 :         return score
    1001             : }
    1002             : 
// pickCompactionSeedFile picks a file from `level` in the `vers` to build a
// compaction around. Currently, this function implements a heuristic similar to
// RocksDB's kMinOverlappingRatio, seeking to minimize write amplification. This
// function is linear with respect to the number of files in `level` and
// `outputLevel`.
//
// It returns (file, true) on success, or a zero LevelFile and false when no
// candidate exists (the level is empty, or every candidate — or one of its
// overlapping output-level files — is already compacting).
func pickCompactionSeedFile(
	vers *version,
	virtualBackings *manifest.VirtualBackings,
	opts *Options,
	level, outputLevel int,
	earliestSnapshotSeqNum base.SeqNum,
) (manifest.LevelFile, bool) {
	// Select the file within the level to compact. We want to minimize write
	// amplification, but also ensure that (a) deletes are propagated to the
	// bottom level in a timely fashion, and (b) virtual sstables that are
	// pinning backing sstables where most of the data is garbage are compacted
	// away. Doing (a) and (b) reclaims disk space. A table's smallest sequence
	// number provides a measure of its age. The ratio of overlapping-bytes /
	// table-size gives an indication of write amplification (a smaller ratio is
	// preferable).
	//
	// The current heuristic is based off the RocksDB kMinOverlappingRatio
	// heuristic. It chooses the file with the minimum overlapping ratio with
	// the target level, which minimizes write amplification.
	//
	// The heuristic uses a "compensated size" for the denominator, which is the
	// file size inflated by (a) an estimate of the space that may be reclaimed
	// through compaction, and (b) a fraction of the amount of garbage in the
	// backing sstable pinned by this (virtual) sstable.
	//
	// TODO(peter): For concurrent compactions, we may want to try harder to
	// pick a seed file whose resulting compaction bounds do not overlap with
	// an in-progress compaction.

	cmp := opts.Comparer.Compare
	startIter := vers.Levels[level].Iter()
	outputIter := vers.Levels[outputLevel].Iter()

	var file manifest.LevelFile
	smallestRatio := uint64(math.MaxUint64)

	// NB: outputFile is deliberately NOT reset per input file. Both levels are
	// iterated in key order, so the output-level cursor only ever advances,
	// keeping the whole scan linear in len(level)+len(outputLevel).
	outputFile := outputIter.First()

	for f := startIter.First(); f != nil; f = startIter.Next() {
		var overlappingBytes uint64
		compacting := f.IsCompacting()
		if compacting {
			// Move on if this file is already being compacted. We'll likely
			// still need to move past the overlapping output files regardless,
			// but in cases where all start-level files are compacting we won't.
			continue
		}

		// Trim any output-level files smaller than f.
		for outputFile != nil && sstableKeyCompare(cmp, outputFile.Largest, f.Smallest) < 0 {
			outputFile = outputIter.Next()
		}

		// Accumulate the sizes of the output-level files overlapping f. The
		// !compacting term stops the accumulation early once any overlapping
		// file is itself compacting, since f is then skipped below anyway.
		for outputFile != nil && sstableKeyCompare(cmp, outputFile.Smallest, f.Largest) <= 0 && !compacting {
			overlappingBytes += outputFile.Size
			compacting = compacting || outputFile.IsCompacting()

			// For files in the bottommost level of the LSM, the
			// Stats.RangeDeletionsBytesEstimate field is set to the estimate
			// of bytes /within/ the file itself that may be dropped by
			// recompacting the file. These bytes from obsolete keys would not
			// need to be rewritten if we compacted `f` into `outputFile`, so
			// they don't contribute to write amplification. Subtracting them
			// out of the overlapping bytes helps prioritize these compactions
			// that are cheaper than their file sizes suggest.
			if outputLevel == numLevels-1 && outputFile.LargestSeqNum < earliestSnapshotSeqNum {
				overlappingBytes -= outputFile.Stats.RangeDeletionsBytesEstimate
			}

			// If the file in the next level extends beyond f's largest key,
			// break out and don't advance outputIter because f's successor
			// might also overlap.
			//
			// Note, we stop as soon as we encounter an output-level file with a
			// largest key beyond the input-level file's largest bound. We
			// perform a simple user key comparison here using sstableKeyCompare
			// which handles the potential for exclusive largest key bounds.
			// There's some subtlety when the bounds are equal (eg, equal and
			// inclusive, or equal and exclusive). Current Pebble doesn't split
			// user keys across sstables within a level (and in format versions
			// FormatSplitUserKeysMarkedCompacted and later we guarantee no
			// split user keys exist within the entire LSM). In that case, we're
			// assured that neither the input level nor the output level's next
			// file shares the same user key, so compaction expansion will not
			// include them in any compaction compacting `f`.
			//
			// NB: If we /did/ allow split user keys, or we're running on an
			// old database with an earlier format major version where there are
			// existing split user keys, this logic would be incorrect. Consider
			//    L1: [a#120,a#100] [a#80,a#60]
			//    L2: [a#55,a#45] [a#35,a#25] [a#15,a#5]
			// While considering the first file in L1, [a#120,a#100], we'd skip
			// past all of the files in L2. When considering the second file in
			// L1, we'd improperly conclude that the second file overlaps
			// nothing in the second level and is cheap to compact, when in
			// reality we'd need to expand the compaction to include all 5
			// files.
			if sstableKeyCompare(cmp, outputFile.Largest, f.Largest) > 0 {
				break
			}
			outputFile = outputIter.Next()
		}

		// If the input level file or one of the overlapping files is
		// compacting, we're not going to be able to compact this file
		// anyways, so skip it.
		if compacting {
			continue
		}

		compSz := compensatedSize(f) + responsibleForGarbageBytes(virtualBackings, f)
		// Fixed-point arithmetic: scale overlappingBytes by 1024 before the
		// integer division so that small overlap ratios remain distinguishable.
		// NB(review): presumably compSz > 0 for any live sstable — confirm,
		// since a zero compensated size would divide by zero here.
		scaledRatio := overlappingBytes * 1024 / compSz
		if scaledRatio < smallestRatio {
			smallestRatio = scaledRatio
			file = startIter.Take()
		}
	}
	return file, file.FileMetadata != nil
}
    1127             : 
    1128             : // responsibleForGarbageBytes returns the amount of garbage in the backing
    1129             : // sstable that we consider the responsibility of this virtual sstable. For
    1130             : // non-virtual sstables, this is of course 0. For virtual sstables, we equally
    1131             : // distribute the responsibility of the garbage across all the virtual
    1132             : // sstables that are referencing the same backing sstable. One could
    1133             : // alternatively distribute this in proportion to the virtual sst sizes, but
    1134             : // it isn't clear that more sophisticated heuristics are worth it, given that
    1135             : // the garbage cannot be reclaimed until all the referencing virtual sstables
    1136             : // are compacted.
    1137           1 : func responsibleForGarbageBytes(virtualBackings *manifest.VirtualBackings, m *fileMetadata) uint64 {
    1138           1 :         if !m.Virtual {
    1139           1 :                 return 0
    1140           1 :         }
    1141           1 :         useCount, virtualizedSize := virtualBackings.Usage(m.FileBacking.DiskFileNum)
    1142           1 :         // Since virtualizedSize is the sum of the estimated size of all virtual
    1143           1 :         // ssts, we allow for the possibility that virtualizedSize could exceed
    1144           1 :         // m.FileBacking.Size.
    1145           1 :         totalGarbage := int64(m.FileBacking.Size) - int64(virtualizedSize)
    1146           1 :         if totalGarbage <= 0 {
    1147           0 :                 return 0
    1148           0 :         }
    1149           1 :         if useCount == 0 {
    1150           0 :                 // This cannot happen if m exists in the latest version. The call to
    1151           0 :                 // ResponsibleForGarbageBytes during compaction picking ensures that m
    1152           0 :                 // exists in the latest version by holding versionSet.logLock.
    1153           0 :                 panic(errors.AssertionFailedf("%s has zero useCount", m.String()))
    1154             :         }
    1155           1 :         return uint64(totalGarbage) / uint64(useCount)
    1156             : }
    1157             : 
    1158             : // pickAuto picks the best compaction, if any.
    1159             : //
    1160             : // On each call, pickAuto computes per-level size adjustments based on
    1161             : // in-progress compactions, and computes a per-level score. The levels are
    1162             : // iterated over in decreasing score order trying to find a valid compaction
    1163             : // anchored at that level.
    1164             : //
    1165             : // If a score-based compaction cannot be found, pickAuto falls back to looking
    1166             : // for an elision-only compaction to remove obsolete keys.
    1167           1 : func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
    1168           1 :         // Compaction concurrency is controlled by L0 read-amp. We allow one
    1169           1 :         // additional compaction per L0CompactionConcurrency sublevels, as well as
    1170           1 :         // one additional compaction per CompactionDebtConcurrency bytes of
    1171           1 :         // compaction debt. Compaction concurrency is tied to L0 sublevels as that
    1172           1 :         // signal is independent of the database size. We tack on the compaction
    1173           1 :         // debt as a second signal to prevent compaction concurrency from dropping
    1174           1 :         // significantly right after a base compaction finishes, and before those
    1175           1 :         // bytes have been compacted further down the LSM.
    1176           1 :         if n := len(env.inProgressCompactions); n > 0 {
    1177           1 :                 l0ReadAmp := p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()
    1178           1 :                 compactionDebt := p.estimatedCompactionDebt(0)
    1179           1 :                 ccSignal1 := n * p.opts.Experimental.L0CompactionConcurrency
    1180           1 :                 ccSignal2 := uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
    1181           1 :                 if l0ReadAmp < ccSignal1 && compactionDebt < ccSignal2 {
    1182           1 :                         return nil
    1183           1 :                 }
    1184             :         }
    1185             : 
    1186             :         // NB: CompactionLimiter defaults to a no-op limiter unless one is implemented
    1187             :         // and passed-in as an option during Open.
    1188           1 :         limiter := p.opts.Experimental.CompactionLimiter
    1189           1 :         var slot base.CompactionSlot
    1190           1 :         if n := len(env.inProgressCompactions); n == 0 {
    1191           1 :                 // We are not running a compaction at the moment. We should take a compaction slot
    1192           1 :                 // without permission.
    1193           1 :                 slot = limiter.TookWithoutPermission(context.TODO())
    1194           1 :         } else {
    1195           1 :                 var err error
    1196           1 :                 slot, err = limiter.RequestSlot(context.TODO())
    1197           1 :                 if err != nil {
    1198           0 :                         p.opts.EventListener.BackgroundError(err)
    1199           0 :                         return nil
    1200           0 :                 }
    1201           1 :                 if slot == nil {
    1202           0 :                         // The limiter is denying us a compaction slot. Yield to other work.
    1203           0 :                         return nil
    1204           0 :                 }
    1205             :         }
    1206           1 :         defer func() {
    1207           1 :                 if pc != nil {
    1208           1 :                         var inputSize uint64
    1209           1 :                         for i := range pc.inputs {
    1210           1 :                                 inputSize += pc.inputs[i].files.SizeSum()
    1211           1 :                         }
    1212           1 :                         slot.CompactionSelected(pc.startLevel.level, pc.outputLevel.level, inputSize)
    1213           1 :                         pc.slot = slot
    1214             :                 }
    1215             :         }()
    1216             : 
    1217           1 :         scores := p.calculateLevelScores(env.inProgressCompactions)
    1218           1 : 
    1219           1 :         // TODO(bananabrick): Either remove, or change this into an event sent to the
    1220           1 :         // EventListener.
    1221           1 :         logCompaction := func(pc *pickedCompaction) {
    1222           0 :                 var buf bytes.Buffer
    1223           0 :                 for i := 0; i < numLevels; i++ {
    1224           0 :                         if i != 0 && i < p.baseLevel {
    1225           0 :                                 continue
    1226             :                         }
    1227             : 
    1228           0 :                         var info *candidateLevelInfo
    1229           0 :                         for j := range scores {
    1230           0 :                                 if scores[j].level == i {
    1231           0 :                                         info = &scores[j]
    1232           0 :                                         break
    1233             :                                 }
    1234             :                         }
    1235             : 
    1236           0 :                         marker := " "
    1237           0 :                         if pc.startLevel.level == info.level {
    1238           0 :                                 marker = "*"
    1239           0 :                         }
    1240           0 :                         fmt.Fprintf(&buf, "  %sL%d: %5.1f  %5.1f  %5.1f  %5.1f %8s  %8s",
    1241           0 :                                 marker, info.level, info.compensatedScoreRatio, info.compensatedScore,
    1242           0 :                                 info.uncompensatedScoreRatio, info.uncompensatedScore,
    1243           0 :                                 humanize.Bytes.Int64(int64(totalCompensatedSize(
    1244           0 :                                         p.vers.Levels[info.level].Iter(),
    1245           0 :                                 ))),
    1246           0 :                                 humanize.Bytes.Int64(p.levelMaxBytes[info.level]),
    1247           0 :                         )
    1248           0 : 
    1249           0 :                         count := 0
    1250           0 :                         for i := range env.inProgressCompactions {
    1251           0 :                                 c := &env.inProgressCompactions[i]
    1252           0 :                                 if c.inputs[0].level != info.level {
    1253           0 :                                         continue
    1254             :                                 }
    1255           0 :                                 count++
    1256           0 :                                 if count == 1 {
    1257           0 :                                         fmt.Fprintf(&buf, "  [")
    1258           0 :                                 } else {
    1259           0 :                                         fmt.Fprintf(&buf, " ")
    1260           0 :                                 }
    1261           0 :                                 fmt.Fprintf(&buf, "L%d->L%d", c.inputs[0].level, c.outputLevel)
    1262             :                         }
    1263           0 :                         if count > 0 {
    1264           0 :                                 fmt.Fprintf(&buf, "]")
    1265           0 :                         }
    1266           0 :                         fmt.Fprintf(&buf, "\n")
    1267             :                 }
    1268           0 :                 p.opts.Logger.Infof("pickAuto: L%d->L%d\n%s",
    1269           0 :                         pc.startLevel.level, pc.outputLevel.level, buf.String())
    1270             :         }
    1271             : 
    1272             :         // Check for a score-based compaction. candidateLevelInfos are first sorted
    1273             :         // by whether they should be compacted, so if we find a level which shouldn't
    1274             :         // be compacted, we can break early.
    1275           1 :         for i := range scores {
    1276           1 :                 info := &scores[i]
    1277           1 :                 if !info.shouldCompact() {
    1278           1 :                         break
    1279             :                 }
    1280           1 :                 if info.level == numLevels-1 {
    1281           1 :                         continue
    1282             :                 }
    1283             : 
    1284           1 :                 if info.level == 0 {
    1285           1 :                         pc = pickL0(env, p.opts, p.vers, p.baseLevel)
    1286           1 :                         // Fail-safe to protect against compacting the same sstable
    1287           1 :                         // concurrently.
    1288           1 :                         if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
    1289           1 :                                 p.addScoresToPickedCompactionMetrics(pc, scores)
    1290           1 :                                 pc.score = info.compensatedScoreRatio
    1291           1 :                                 // TODO(bananabrick): Create an EventListener for logCompaction.
    1292           1 :                                 if false {
    1293           0 :                                         logCompaction(pc)
    1294           0 :                                 }
    1295           1 :                                 return pc
    1296             :                         }
    1297           1 :                         continue
    1298             :                 }
    1299             : 
    1300             :                 // info.level > 0
    1301           1 :                 var ok bool
    1302           1 :                 info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum)
    1303           1 :                 if !ok {
    1304           1 :                         continue
    1305             :                 }
    1306             : 
    1307           1 :                 pc := pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel)
    1308           1 :                 // Fail-safe to protect against compacting the same sstable concurrently.
    1309           1 :                 if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
    1310           1 :                         p.addScoresToPickedCompactionMetrics(pc, scores)
    1311           1 :                         pc.score = info.compensatedScoreRatio
    1312           1 :                         // TODO(bananabrick): Create an EventListener for logCompaction.
    1313           1 :                         if false {
    1314           0 :                                 logCompaction(pc)
    1315           0 :                         }
    1316           1 :                         return pc
    1317             :                 }
    1318             :         }
    1319             : 
    1320             :         // Check for files which contain excessive point tombstones that could slow
    1321             :         // down reads. Unlike elision-only compactions, these compactions may select
    1322             :         // a file at any level rather than only the lowest level.
    1323           1 :         if pc := p.pickTombstoneDensityCompaction(env); pc != nil {
    1324           0 :                 return pc
    1325           0 :         }
    1326             : 
    1327             :         // Check for L6 files with tombstones that may be elided. These files may
    1328             :         // exist if a snapshot prevented the elision of a tombstone or because of
    1329             :         // a move compaction. These are low-priority compactions because they
    1330             :         // don't help us keep up with writes, just reclaim disk space.
    1331           1 :         if pc := p.pickElisionOnlyCompaction(env); pc != nil {
    1332           1 :                 return pc
    1333           1 :         }
    1334             : 
    1335           1 :         if pc := p.pickReadTriggeredCompaction(env); pc != nil {
    1336           1 :                 return pc
    1337           1 :         }
    1338             : 
    1339             :         // NB: This should only be run if a read compaction wasn't
    1340             :         // scheduled.
    1341             :         //
    1342             :         // We won't be scheduling a read compaction right now, and in
    1343             :         // read heavy workloads, compactions won't be scheduled frequently
    1344             :         // because flushes aren't frequent. So we need to signal to the
    1345             :         // iterator to schedule a compaction when it adds compactions to
    1346             :         // the read compaction queue.
    1347             :         //
    1348             :         // We need the nil check here because without it, we have some
    1349             :         // tests which don't set that variable fail. Since there's a
    1350             :         // chance that one of those tests wouldn't want extra compactions
    1351             :         // to be scheduled, I added this check here, instead of
    1352             :         // setting rescheduleReadCompaction in those tests.
    1353           1 :         if env.readCompactionEnv.rescheduleReadCompaction != nil {
    1354           1 :                 *env.readCompactionEnv.rescheduleReadCompaction = true
    1355           1 :         }
    1356             : 
    1357             :         // At the lowest possible compaction-picking priority, look for files marked
    1358             :         // for compaction. Pebble will mark files for compaction if they have atomic
    1359             :         // compaction units that span multiple files. While current Pebble code does
    1360             :         // not construct such sstables, RocksDB and earlier versions of Pebble may
    1361             :         // have created them. These split user keys form sets of files that must be
    1362             :         // compacted together for correctness (referred to as "atomic compaction
    1363             :         // units" within the code). Rewrite them in-place.
    1364             :         //
    1365             :         // It's also possible that a file may have been marked for compaction by
    1366             :         // even earlier versions of Pebble code, since FileMetadata's
    1367             :         // MarkedForCompaction field is persisted in the manifest. That's okay. We
    1368             :         // previously would've ignored the designation, whereas now we'll re-compact
    1369             :         // the file in place.
    1370           1 :         if p.vers.Stats.MarkedForCompaction > 0 {
    1371           1 :                 if pc := p.pickRewriteCompaction(env); pc != nil {
    1372           1 :                         return pc
    1373           1 :                 }
    1374             :         }
    1375             : 
    1376           1 :         return nil
    1377             : }
    1378             : 
    1379             : func (p *compactionPickerByScore) addScoresToPickedCompactionMetrics(
    1380             :         pc *pickedCompaction, candInfo [numLevels]candidateLevelInfo,
    1381           1 : ) {
    1382           1 : 
    1383           1 :         // candInfo is sorted by score, not by compaction level.
    1384           1 :         infoByLevel := [numLevels]candidateLevelInfo{}
    1385           1 :         for i := range candInfo {
    1386           1 :                 level := candInfo[i].level
    1387           1 :                 infoByLevel[level] = candInfo[i]
    1388           1 :         }
    1389             :         // Gather the compaction scores for the levels participating in the compaction.
    1390           1 :         pc.pickerMetrics.scores = make([]float64, len(pc.inputs))
    1391           1 :         inputIdx := 0
    1392           1 :         for i := range infoByLevel {
    1393           1 :                 if pc.inputs[inputIdx].level == infoByLevel[i].level {
    1394           1 :                         pc.pickerMetrics.scores[inputIdx] = infoByLevel[i].compensatedScoreRatio
    1395           1 :                         inputIdx++
    1396           1 :                 }
    1397           1 :                 if inputIdx == len(pc.inputs) {
    1398           1 :                         break
    1399             :                 }
    1400             :         }
    1401             : }
    1402             : 
// elisionOnlyAnnotator is a manifest.Annotator that annotates B-Tree
// nodes with the *fileMetadata of a file meeting the obsolete keys criteria
// for an elision-only compaction within the subtree. If multiple files meet
// the criteria, it chooses whichever file has the lowest LargestSeqNum. The
// lowest LargestSeqNum file will be the first eligible for an elision-only
// compaction once snapshots less than or equal to its LargestSeqNum are closed.
var elisionOnlyAnnotator = &manifest.Annotator[fileMetadata]{
	Aggregator: manifest.PickFileAggregator{
		Filter: func(f *fileMetadata) (eligible bool, cacheOK bool) {
			// Files already being compacted are ineligible; that answer is
			// safe to cache.
			if f.IsCompacting() {
				return false, true
			}
			// Without valid stats we cannot evaluate eligibility yet. Return
			// cacheOK=false so the file is re-examined once stats are loaded.
			if !f.StatsValid() {
				return false, false
			}
			// Bottommost files are large and not worthwhile to compact just
			// to remove a few tombstones. Consider a file eligible only if
			// either its own range deletions delete at least 10% of its data or
			// its deletion tombstones make at least 10% of its entries.
			//
			// TODO(jackson): This does not account for duplicate user keys
			// which may be collapsed. Ideally, we would have 'obsolete keys'
			// statistics that would include tombstones, the keys that are
			// dropped by tombstones and duplicated user keys. See #847.
			//
			// Note that tables that contain exclusively range keys (i.e. no point keys,
			// `NumEntries` and `RangeDeletionsBytesEstimate` are both zero) are excluded
			// from elision-only compactions.
			// TODO(travers): Consider an alternative heuristic for elision of range-keys.
			return f.Stats.RangeDeletionsBytesEstimate*10 >= f.Size || f.Stats.NumDeletions*10 > f.Stats.NumEntries, true
		},
		// Tie-break toward the file with the lowest LargestSeqNum: it is the
		// first to become eligible as snapshots are closed.
		Compare: func(f1 *fileMetadata, f2 *fileMetadata) bool {
			return f1.LargestSeqNum < f2.LargestSeqNum
		},
	},
}
    1439             : 
// markedForCompactionAnnotator is a manifest.Annotator that annotates B-Tree
// nodes with the *fileMetadata of a file that is marked for compaction
// within the subtree. If multiple files meet the criteria, it chooses
// whichever file has the lowest LargestSeqNum.
var markedForCompactionAnnotator = &manifest.Annotator[fileMetadata]{
	Aggregator: manifest.PickFileAggregator{
		// Eligibility is exactly the persisted MarkedForCompaction flag; the
		// result is always safe to cache.
		Filter: func(f *fileMetadata) (eligible bool, cacheOK bool) {
			return f.MarkedForCompaction, true
		},
		// Among marked files, prefer the one with the lowest LargestSeqNum.
		Compare: func(f1 *fileMetadata, f2 *fileMetadata) bool {
			return f1.LargestSeqNum < f2.LargestSeqNum
		},
	},
}
    1454             : 
// pickedCompactionFromCandidateFile creates a pickedCompaction from a *fileMetadata
// with various checks to ensure that the file still exists in the expected level
// and isn't already being compacted. A nil candidate is tolerated and yields a
// nil pickedCompaction.
func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(
	candidate *fileMetadata, env compactionEnv, startLevel int, outputLevel int, kind compactionKind,
) *pickedCompaction {
	if candidate == nil || candidate.IsCompacting() {
		return nil
	}

	var inputs manifest.LevelSlice
	if startLevel == 0 {
		// Overlapping L0 files must also be compacted alongside the candidate.
		inputs = p.vers.Overlaps(0, candidate.UserKeyBounds())
	} else {
		inputs = p.vers.Levels[startLevel].Find(p.opts.Comparer.Compare, candidate)
	}
	if invariants.Enabled {
		// Invariant-build-only sanity check: the candidate must be a member of
		// the input slice computed above.
		found := false
		inputs.Each(func(f *fileMetadata) {
			if f.FileNum == candidate.FileNum {
				found = true
			}
		})
		if !found {
			panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, startLevel))
		}
	}

	pc := newPickedCompaction(p.opts, p.vers, startLevel, outputLevel, p.baseLevel)
	pc.kind = kind
	pc.startLevel.files = inputs
	pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())

	// Fail-safe to protect against compacting the same sstable concurrently.
	if inputRangeAlreadyCompacting(env, pc) {
		return nil
	}

	if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel) {
		// setupInputs reported a conflict with a concurrent compaction over
		// the input/output range; abandon this pick.
		return nil
	}

	return pc
}
    1500             : 
    1501             : // pickElisionOnlyCompaction looks for compactions of sstables in the
    1502             : // bottommost level containing obsolete records that may now be dropped.
    1503             : func (p *compactionPickerByScore) pickElisionOnlyCompaction(
    1504             :         env compactionEnv,
    1505           1 : ) (pc *pickedCompaction) {
    1506           1 :         if p.opts.private.disableElisionOnlyCompactions {
    1507           1 :                 return nil
    1508           1 :         }
    1509           1 :         candidate := elisionOnlyAnnotator.LevelAnnotation(p.vers.Levels[numLevels-1])
    1510           1 :         if candidate == nil {
    1511           1 :                 return nil
    1512           1 :         }
    1513           1 :         if candidate.LargestSeqNum >= env.earliestSnapshotSeqNum {
    1514           1 :                 return nil
    1515           1 :         }
    1516           1 :         return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly)
    1517             : }
    1518             : 
    1519             : // pickRewriteCompaction attempts to construct a compaction that
    1520             : // rewrites a file marked for compaction. pickRewriteCompaction will
    1521             : // pull in adjacent files in the file's atomic compaction unit if
    1522             : // necessary. A rewrite compaction outputs files to the same level as
    1523             : // the input level.
    1524           1 : func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) {
    1525           1 :         for l := numLevels - 1; l >= 0; l-- {
    1526           1 :                 candidate := markedForCompactionAnnotator.LevelAnnotation(p.vers.Levels[l])
    1527           1 :                 if candidate == nil {
    1528           1 :                         // Try the next level.
    1529           1 :                         continue
    1530             :                 }
    1531           1 :                 pc := p.pickedCompactionFromCandidateFile(candidate, env, l, l, compactionKindRewrite)
    1532           1 :                 if pc != nil {
    1533           1 :                         return pc
    1534           1 :                 }
    1535             :         }
    1536           0 :         return nil
    1537             : }
    1538             : 
// pickTombstoneDensityCompaction looks for a compaction that eliminates
// regions of extremely high point tombstone density. For each level, it picks
// a file where the ratio of tombstone-dense blocks is at least
// options.Experimental.MinTombstoneDenseRatio, prioritizing compaction of
// files with higher ratios of tombstone-dense blocks.
func (p *compactionPickerByScore) pickTombstoneDensityCompaction(
	env compactionEnv,
) (pc *pickedCompaction) {
	if p.opts.Experimental.TombstoneDenseCompactionThreshold <= 0 {
		// Tombstone density compactions are disabled.
		return nil
	}

	var candidate *fileMetadata
	var level int
	// If a candidate file has a very high overlapping ratio, point tombstones
	// in it are likely sparse in keyspace even if the sstable itself is tombstone
	// dense. These tombstones likely wouldn't be slow to iterate over, so we exclude
	// these files from tombstone density compactions. The threshold of 40.0 is
	// chosen somewhat arbitrarily, after some observations around excessively large
	// tombstone density compactions.
	const maxOverlappingRatio = 40.0
	// NB: we don't consider the lowest level because elision-only compactions
	// handle that case.
	lastNonEmptyLevel := numLevels - 1
	for l := numLevels - 2; l >= 0; l-- {
		iter := p.vers.Levels[l].Iter()
		for f := iter.First(); f != nil; f = iter.Next() {
			// Skip files that are already compacting, lack valid stats, or
			// are empty (the ratio below would divide by zero).
			if f.IsCompacting() || !f.StatsValid() || f.Size == 0 {
				continue
			}
			if f.Stats.TombstoneDenseBlocksRatio < p.opts.Experimental.TombstoneDenseCompactionThreshold {
				continue
			}
			// Measure overlap against the nearest non-empty lower level seen
			// so far, and reject files whose overlap dwarfs their own size.
			overlaps := p.vers.Overlaps(lastNonEmptyLevel, f.UserKeyBounds())
			if float64(overlaps.SizeSum())/float64(f.Size) > maxOverlappingRatio {
				continue
			}
			// Within a level, keep the file with the highest dense-block ratio.
			if candidate == nil || candidate.Stats.TombstoneDenseBlocksRatio < f.Stats.TombstoneDenseBlocksRatio {
				candidate = f
				level = l
			}
		}
		// We prefer lower level (ie. L5) candidates over higher level (ie. L4) ones.
		if candidate != nil {
			break
		}
		if !p.vers.Levels[l].Empty() {
			lastNonEmptyLevel = l
		}
	}

	// candidate may still be nil here; pickedCompactionFromCandidateFile
	// handles that case and returns nil in turn.
	return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity)
}
    1593             : 
// pickAutoLPositive picks an automatic compaction for the candidate
// file in a positive-numbered level. This function must not be used for
// L0.
func pickAutoLPositive(
	env compactionEnv, opts *Options, vers *version, cInfo candidateLevelInfo, baseLevel int,
) (pc *pickedCompaction) {
	if cInfo.level == 0 {
		panic("pebble: pickAutoLPositive called for L0")
	}

	pc = newPickedCompaction(opts, vers, cInfo.level, defaultOutputLevel(cInfo.level, baseLevel), baseLevel)
	if pc.outputLevel.level != cInfo.outputLevel {
		panic("pebble: compaction picked unexpected output level")
	}
	pc.startLevel.files = cInfo.file.Slice()
	// Files in level 0 may overlap each other, so pick up all overlapping ones.
	//
	// NOTE(review): cInfo.level == 0 panics above and the start level is
	// constructed from cInfo.level, so this branch looks unreachable —
	// confirm before relying on it.
	if pc.startLevel.level == 0 {
		cmp := opts.Comparer.Compare
		smallest, largest := manifest.KeyRange(cmp, pc.startLevel.files.Iter())
		pc.startLevel.files = vers.Overlaps(0, base.UserKeyBoundsFromInternal(smallest, largest))
		if pc.startLevel.files.Empty() {
			panic("pebble: empty compaction")
		}
	}

	if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
		// setupInputs reported a conflicting concurrent compaction; give up
		// rather than double-compact the range.
		return nil
	}
	// Optionally grow this into a multilevel compaction.
	return pc.maybeAddLevel(opts, env.diskAvailBytes)
}
    1624             : 
    1625             : // maybeAddLevel maybe adds a level to the picked compaction.
    1626           1 : func (pc *pickedCompaction) maybeAddLevel(opts *Options, diskAvailBytes uint64) *pickedCompaction {
    1627           1 :         pc.pickerMetrics.singleLevelOverlappingRatio = pc.overlappingRatio()
    1628           1 :         if pc.outputLevel.level == numLevels-1 {
    1629           1 :                 // Don't add a level if the current output level is in L6
    1630           1 :                 return pc
    1631           1 :         }
    1632           1 :         if !opts.Experimental.MultiLevelCompactionHeuristic.allowL0() && pc.startLevel.level == 0 {
    1633           1 :                 return pc
    1634           1 :         }
    1635           1 :         if pc.compactionSize() > expandedCompactionByteSizeLimit(
    1636           1 :                 opts, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel), diskAvailBytes) {
    1637           1 :                 // Don't add a level if the current compaction exceeds the compaction size limit
    1638           1 :                 return pc
    1639           1 :         }
    1640           1 :         return opts.Experimental.MultiLevelCompactionHeuristic.pick(pc, opts, diskAvailBytes)
    1641             : }
    1642             : 
// MultiLevelHeuristic evaluates whether to add files from the next level into the compaction.
type MultiLevelHeuristic interface {
	// pick returns the preferred compaction: either pc unchanged or an
	// expanded multilevel variant of it.
	pick(pc *pickedCompaction, opts *Options, diskAvailBytes uint64) *pickedCompaction

	// allowL0 reports whether the heuristic allows L0 to be involved in a
	// multilevel (ML) compaction.
	allowL0() bool

	// String implements fmt.Stringer.
	String() string
}
    1654             : 
    1655             : // NoMultiLevel will never add an additional level to the compaction.
    1656             : type NoMultiLevel struct{}
    1657             : 
    1658             : var _ MultiLevelHeuristic = (*NoMultiLevel)(nil)
    1659             : 
    1660             : func (nml NoMultiLevel) pick(
    1661             :         pc *pickedCompaction, opts *Options, diskAvailBytes uint64,
    1662           1 : ) *pickedCompaction {
    1663           1 :         return pc
    1664           1 : }
    1665             : 
    1666           1 : func (nml NoMultiLevel) allowL0() bool  { return false }
    1667           1 : func (nml NoMultiLevel) String() string { return "none" }
    1668             : 
    1669           1 : func (pc *pickedCompaction) predictedWriteAmp() float64 {
    1670           1 :         var bytesToCompact uint64
    1671           1 :         var higherLevelBytes uint64
    1672           1 :         for i := range pc.inputs {
    1673           1 :                 levelSize := pc.inputs[i].files.SizeSum()
    1674           1 :                 bytesToCompact += levelSize
    1675           1 :                 if i != len(pc.inputs)-1 {
    1676           1 :                         higherLevelBytes += levelSize
    1677           1 :                 }
    1678             :         }
    1679           1 :         return float64(bytesToCompact) / float64(higherLevelBytes)
    1680             : }
    1681             : 
    1682           1 : func (pc *pickedCompaction) overlappingRatio() float64 {
    1683           1 :         var higherLevelBytes uint64
    1684           1 :         var lowestLevelBytes uint64
    1685           1 :         for i := range pc.inputs {
    1686           1 :                 levelSize := pc.inputs[i].files.SizeSum()
    1687           1 :                 if i == len(pc.inputs)-1 {
    1688           1 :                         lowestLevelBytes += levelSize
    1689           1 :                         continue
    1690             :                 }
    1691           1 :                 higherLevelBytes += levelSize
    1692             :         }
    1693           1 :         return float64(lowestLevelBytes) / float64(higherLevelBytes)
    1694             : }
    1695             : 
// WriteAmpHeuristic defines a multi level compaction heuristic which will add
// an additional level to the picked compaction if it reduces predicted write
// amp of the compaction + the addPropensity constant.
type WriteAmpHeuristic struct {
	// AddPropensity is a constant that affects the propensity to conduct multilevel
	// compactions. If positive, a multilevel compaction may get picked even if
	// the single level compaction has lower write amp, and vice versa.
	AddPropensity float64

	// AllowL0 if true, allow l0 to be involved in a ML compaction.
	AllowL0 bool
}

// WriteAmpHeuristic must satisfy the MultiLevelHeuristic interface.
var _ MultiLevelHeuristic = (*WriteAmpHeuristic)(nil)
    1710             : 
// pick returns the multilevel expansion of pcOrig when its predicted write
// amplification is within AddPropensity of the single-level compaction's
// write amp; otherwise it returns pcOrig unchanged.
//
// TODO(msbutler): microbenchmark the extent to which multilevel compaction
// picking slows down the compaction picking process.  This should be as fast as
// possible since Compaction-picking holds d.mu, which prevents WAL rotations,
// in-progress flushes and compactions from completing, etc. Consider ways to
// deduplicate work, given that setupInputs has already been called.
func (wa WriteAmpHeuristic) pick(
	pcOrig *pickedCompaction, opts *Options, diskAvailBytes uint64,
) *pickedCompaction {
	// Work on a clone so pcOrig stays valid if the expansion is rejected.
	pcMulti := pcOrig.clone()
	if !pcMulti.setupMultiLevelCandidate(opts, diskAvailBytes) {
		return pcOrig
	}
	// We consider the addition of a level as an "expansion" of the compaction.
	// If pcMulti is past the expanded compaction byte size limit already,
	// we don't consider it.
	if pcMulti.compactionSize() >= expandedCompactionByteSizeLimit(
		opts, adjustedOutputLevel(pcMulti.outputLevel.level, pcMulti.baseLevel), diskAvailBytes) {
		return pcOrig
	}
	picked := pcOrig
	if pcMulti.predictedWriteAmp() <= pcOrig.predictedWriteAmp()+wa.AddPropensity {
		picked = pcMulti
	}
	// Regardless of what compaction was picked, log the multilevelOverlapping ratio.
	picked.pickerMetrics.multiLevelOverlappingRatio = pcMulti.overlappingRatio()
	return picked
}
    1738             : 
    1739           1 : func (wa WriteAmpHeuristic) allowL0() bool {
    1740           1 :         return wa.AllowL0
    1741           1 : }
    1742             : 
    1743             : // String implements fmt.Stringer.
    1744           1 : func (wa WriteAmpHeuristic) String() string {
    1745           1 :         return fmt.Sprintf("wamp(%.2f, %t)", wa.AddPropensity, wa.AllowL0)
    1746           1 : }
    1747             : 
// pickL0 is a helper method to pick compactions originating from L0. Uses
// information about sublevels to generate a compaction: first an L0=>Lbase
// compaction, and failing that, an intra-L0 compaction.
func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc *pickedCompaction) {
	// It is important to pass information about Lbase files to L0Sublevels
	// so it can pick a compaction that does not conflict with an Lbase => Lbase+1
	// compaction. Without this, we observed reduced concurrency of L0=>Lbase
	// compactions, and increasing read amplification in L0.
	//
	// TODO(bilal) Remove the minCompactionDepth parameter once fixing it at 1
	// has been shown to not cause a performance regression.
	lcf, err := vers.L0Sublevels.PickBaseCompaction(1, vers.Levels[baseLevel].Slice())
	if err != nil {
		opts.Logger.Errorf("error when picking base compaction: %s", err)
		return
	}
	if lcf != nil {
		pc = newPickedCompactionFromL0(lcf, opts, vers, baseLevel, true)
		// NOTE(review): setupInputs' boolean result is ignored here, unlike
		// the intra-L0 path below — confirm a conflict is impossible for a
		// freshly picked base compaction.
		pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel)
		if pc.startLevel.files.Empty() {
			opts.Logger.Fatalf("empty compaction chosen")
		}
		return pc.maybeAddLevel(opts, env.diskAvailBytes)
	}

	// Couldn't choose a base compaction. Try choosing an intra-L0
	// compaction. Note that we pass in minIntraL0Count here as opposed to
	// 1, since choosing a single sublevel intra-L0 compaction is
	// counterproductive.
	lcf, err = vers.L0Sublevels.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count)
	if err != nil {
		opts.Logger.Errorf("error when picking intra-L0 compaction: %s", err)
		return
	}
	if lcf != nil {
		pc = newPickedCompactionFromL0(lcf, opts, vers, 0, false)
		if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
			return nil
		}
		if pc.startLevel.files.Empty() {
			opts.Logger.Fatalf("empty compaction chosen")
		}
		{
			iter := pc.startLevel.files.Iter()
			if iter.First() == nil || iter.Next() == nil {
				// A single-file intra-L0 compaction is unproductive.
				return nil
			}
		}

		pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
	}
	return pc
}
    1801             : 
    1802             : func pickManualCompaction(
    1803             :         vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction,
    1804           1 : ) (pc *pickedCompaction, retryLater bool) {
    1805           1 :         outputLevel := manual.level + 1
    1806           1 :         if manual.level == 0 {
    1807           1 :                 outputLevel = baseLevel
    1808           1 :         } else if manual.level < baseLevel {
    1809           0 :                 // The start level for a compaction must be >= Lbase. A manual
    1810           0 :                 // compaction could have been created adhering to that condition, and
    1811           0 :                 // then an automatic compaction came in and compacted all of the
    1812           0 :                 // sstables in Lbase to Lbase+1 which caused Lbase to change. Simply
    1813           0 :                 // ignore this manual compaction as there is nothing to do (manual.level
    1814           0 :                 // points to an empty level).
    1815           0 :                 return nil, false
    1816           0 :         }
    1817             :         // This conflictsWithInProgress call is necessary for the manual compaction to
    1818             :         // be retried when it conflicts with an ongoing automatic compaction. Without
    1819             :         // it, the compaction is dropped due to pc.setupInputs returning false since
    1820             :         // the input/output range is already being compacted, and the manual
    1821             :         // compaction ends with a non-compacted LSM.
    1822           1 :         if conflictsWithInProgress(manual, outputLevel, env.inProgressCompactions, opts.Comparer.Compare) {
    1823           1 :                 return nil, true
    1824           1 :         }
    1825           1 :         pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
    1826           1 :         manual.outputLevel = pc.outputLevel.level
    1827           1 :         pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end))
    1828           1 :         if pc.startLevel.files.Empty() {
    1829           1 :                 // Nothing to do
    1830           1 :                 return nil, false
    1831           1 :         }
    1832           1 :         if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
    1833           0 :                 // setupInputs returned false indicating there's a conflicting
    1834           0 :                 // concurrent compaction.
    1835           0 :                 return nil, true
    1836           0 :         }
    1837           1 :         if pc = pc.maybeAddLevel(opts, env.diskAvailBytes); pc == nil {
    1838           0 :                 return nil, false
    1839           0 :         }
    1840           1 :         if pc.outputLevel.level != outputLevel {
    1841           1 :                 if len(pc.extraLevels) > 0 {
    1842           1 :                         // multilevel compactions relax this invariant
    1843           1 :                 } else {
    1844           0 :                         panic("pebble: compaction picked unexpected output level")
    1845             :                 }
    1846             :         }
    1847             :         // Fail-safe to protect against compacting the same sstable concurrently.
    1848           1 :         if inputRangeAlreadyCompacting(env, pc) {
    1849           0 :                 return nil, true
    1850           0 :         }
    1851           1 :         return pc, false
    1852             : }
    1853             : 
    1854             : // pickDownloadCompaction picks a download compaction for the downloadSpan,
    1855             : // which could be specified as being performed either by a copy compaction of
    1856             : // the backing file or a rewrite compaction.
    1857             : func pickDownloadCompaction(
    1858             :         vers *version,
    1859             :         opts *Options,
    1860             :         env compactionEnv,
    1861             :         baseLevel int,
    1862             :         kind compactionKind,
    1863             :         level int,
    1864             :         file *fileMetadata,
    1865           1 : ) (pc *pickedCompaction) {
    1866           1 :         // Check if the file is compacting already.
    1867           1 :         if file.CompactionState == manifest.CompactionStateCompacting {
    1868           0 :                 return nil
    1869           0 :         }
    1870           1 :         if kind != compactionKindCopy && kind != compactionKindRewrite {
    1871           0 :                 panic("invalid download/rewrite compaction kind")
    1872             :         }
    1873           1 :         pc = newPickedCompaction(opts, vers, level, level, baseLevel)
    1874           1 :         pc.kind = kind
    1875           1 :         pc.startLevel.files = manifest.NewLevelSliceKeySorted(opts.Comparer.Compare, []*fileMetadata{file})
    1876           1 :         if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
    1877           0 :                 // setupInputs returned false indicating there's a conflicting
    1878           0 :                 // concurrent compaction.
    1879           0 :                 return nil
    1880           0 :         }
    1881           1 :         if pc.outputLevel.level != level {
    1882           0 :                 panic("pebble: download compaction picked unexpected output level")
    1883             :         }
    1884             :         // Fail-safe to protect against compacting the same sstable concurrently.
    1885           1 :         if inputRangeAlreadyCompacting(env, pc) {
    1886           0 :                 return nil
    1887           0 :         }
    1888           1 :         return pc
    1889             : }
    1890             : 
    1891             : func (p *compactionPickerByScore) pickReadTriggeredCompaction(
    1892             :         env compactionEnv,
    1893           1 : ) (pc *pickedCompaction) {
    1894           1 :         // If a flush is in-progress or expected to happen soon, it means more writes are taking place. We would
    1895           1 :         // soon be scheduling more write focussed compactions. In this case, skip read compactions as they are
    1896           1 :         // lower priority.
    1897           1 :         if env.readCompactionEnv.flushing || env.readCompactionEnv.readCompactions == nil {
    1898           1 :                 return nil
    1899           1 :         }
    1900           1 :         for env.readCompactionEnv.readCompactions.size > 0 {
    1901           1 :                 rc := env.readCompactionEnv.readCompactions.remove()
    1902           1 :                 if pc = pickReadTriggeredCompactionHelper(p, rc, env); pc != nil {
    1903           1 :                         break
    1904             :                 }
    1905             :         }
    1906           1 :         return pc
    1907             : }
    1908             : 
    1909             : func pickReadTriggeredCompactionHelper(
    1910             :         p *compactionPickerByScore, rc *readCompaction, env compactionEnv,
    1911           1 : ) (pc *pickedCompaction) {
    1912           1 :         overlapSlice := p.vers.Overlaps(rc.level, base.UserKeyBoundsInclusive(rc.start, rc.end))
    1913           1 :         if overlapSlice.Empty() {
    1914           1 :                 // If there is no overlap, then the file with the key range
    1915           1 :                 // must have been compacted away. So, we don't proceed to
    1916           1 :                 // compact the same key range again.
    1917           1 :                 return nil
    1918           1 :         }
    1919             : 
    1920           1 :         iter := overlapSlice.Iter()
    1921           1 :         var fileMatches bool
    1922           1 :         for f := iter.First(); f != nil; f = iter.Next() {
    1923           1 :                 if f.FileNum == rc.fileNum {
    1924           1 :                         fileMatches = true
    1925           1 :                         break
    1926             :                 }
    1927             :         }
    1928           1 :         if !fileMatches {
    1929           1 :                 return nil
    1930           1 :         }
    1931             : 
    1932           1 :         pc = newPickedCompaction(p.opts, p.vers, rc.level, defaultOutputLevel(rc.level, p.baseLevel), p.baseLevel)
    1933           1 : 
    1934           1 :         pc.startLevel.files = overlapSlice
    1935           1 :         if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel) {
    1936           0 :                 return nil
    1937           0 :         }
    1938           1 :         if inputRangeAlreadyCompacting(env, pc) {
    1939           0 :                 return nil
    1940           0 :         }
    1941           1 :         pc.kind = compactionKindRead
    1942           1 : 
    1943           1 :         // Prevent read compactions which are too wide.
    1944           1 :         outputOverlaps := pc.version.Overlaps(pc.outputLevel.level, pc.userKeyBounds())
    1945           1 :         if outputOverlaps.SizeSum() > pc.maxReadCompactionBytes {
    1946           1 :                 return nil
    1947           1 :         }
    1948             : 
    1949             :         // Prevent compactions which start with a small seed file X, but overlap
    1950             :         // with over allowedCompactionWidth * X file sizes in the output layer.
    1951           1 :         const allowedCompactionWidth = 35
    1952           1 :         if outputOverlaps.SizeSum() > overlapSlice.SizeSum()*allowedCompactionWidth {
    1953           0 :                 return nil
    1954           0 :         }
    1955             : 
    1956           1 :         return pc
    1957             : }
    1958             : 
// forceBaseLevel1 pins the picker's base level (Lbase) to L1, overriding
// whatever base level was previously computed.
func (p *compactionPickerByScore) forceBaseLevel1() {
        p.baseLevel = 1
}
    1962             : 
    1963           1 : func inputRangeAlreadyCompacting(env compactionEnv, pc *pickedCompaction) bool {
    1964           1 :         for _, cl := range pc.inputs {
    1965           1 :                 iter := cl.files.Iter()
    1966           1 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1967           1 :                         if f.IsCompacting() {
    1968           0 :                                 return true
    1969           0 :                         }
    1970             :                 }
    1971             :         }
    1972             : 
    1973             :         // Look for active compactions outputting to the same region of the key
    1974             :         // space in the same output level. Two potential compactions may conflict
    1975             :         // without sharing input files if there are no files in the output level
    1976             :         // that overlap with the intersection of the compactions' key spaces.
    1977             :         //
    1978             :         // Consider an active L0->Lbase compaction compacting two L0 files one
    1979             :         // [a-f] and the other [t-z] into Lbase.
    1980             :         //
    1981             :         // L0
    1982             :         //     ↦ 000100  ↤                           ↦  000101   ↤
    1983             :         // L1
    1984             :         //     ↦ 000004  ↤
    1985             :         //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    1986             :         //
    1987             :         // If a new file 000102 [j-p] is flushed while the existing compaction is
    1988             :         // still ongoing, new file would not be in any compacting sublevel
    1989             :         // intervals and would not overlap with any Lbase files that are also
    1990             :         // compacting. However, this compaction cannot be picked because the
    1991             :         // compaction's output key space [j-p] would overlap the existing
    1992             :         // compaction's output key space [a-z].
    1993             :         //
    1994             :         // L0
    1995             :         //     ↦ 000100* ↤       ↦   000102  ↤       ↦  000101*  ↤
    1996             :         // L1
    1997             :         //     ↦ 000004* ↤
    1998             :         //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    1999             :         //
    2000             :         // * - currently compacting
    2001           1 :         if pc.outputLevel != nil && pc.outputLevel.level != 0 {
    2002           1 :                 for _, c := range env.inProgressCompactions {
    2003           1 :                         if pc.outputLevel.level != c.outputLevel {
    2004           1 :                                 continue
    2005             :                         }
    2006           1 :                         if base.InternalCompare(pc.cmp, c.largest, pc.smallest) < 0 ||
    2007           1 :                                 base.InternalCompare(pc.cmp, c.smallest, pc.largest) > 0 {
    2008           1 :                                 continue
    2009             :                         }
    2010             : 
    2011             :                         // The picked compaction and the in-progress compaction c are
    2012             :                         // outputting to the same region of the key space of the same
    2013             :                         // level.
    2014           1 :                         return true
    2015             :                 }
    2016             :         }
    2017           1 :         return false
    2018             : }
    2019             : 
    2020             : // conflictsWithInProgress checks if there are any in-progress compactions with overlapping keyspace.
    2021             : func conflictsWithInProgress(
    2022             :         manual *manualCompaction, outputLevel int, inProgressCompactions []compactionInfo, cmp Compare,
    2023           1 : ) bool {
    2024           1 :         for _, c := range inProgressCompactions {
    2025           1 :                 if (c.outputLevel == manual.level || c.outputLevel == outputLevel) &&
    2026           1 :                         isUserKeysOverlapping(manual.start, manual.end, c.smallest.UserKey, c.largest.UserKey, cmp) {
    2027           1 :                         return true
    2028           1 :                 }
    2029           1 :                 for _, in := range c.inputs {
    2030           1 :                         if in.files.Empty() {
    2031           1 :                                 continue
    2032             :                         }
    2033           0 :                         iter := in.files.Iter()
    2034           0 :                         smallest := iter.First().Smallest.UserKey
    2035           0 :                         largest := iter.Last().Largest.UserKey
    2036           0 :                         if (in.level == manual.level || in.level == outputLevel) &&
    2037           0 :                                 isUserKeysOverlapping(manual.start, manual.end, smallest, largest, cmp) {
    2038           0 :                                 return true
    2039           0 :                         }
    2040             :                 }
    2041             :         }
    2042           1 :         return false
    2043             : }
    2044             : 
    2045           1 : func isUserKeysOverlapping(x1, x2, y1, y2 []byte, cmp Compare) bool {
    2046           1 :         return cmp(x1, y2) <= 0 && cmp(y1, x2) <= 0
    2047           1 : }

Generated by: LCOV version 1.14