LCOV - code coverage report
Current view: top level - pebble - compaction_picker.go (source / functions) Coverage Total Hit
Test: 2025-04-02 08:17Z 1d68b012 - tests + meta.lcov Lines: 91.4 % 1202 1099
Test Date: 2025-04-02 08:19:01 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "fmt"
      10              :         "iter"
      11              :         "math"
      12              :         "sort"
      13              :         "strings"
      14              : 
      15              :         "github.com/cockroachdb/errors"
      16              :         "github.com/cockroachdb/pebble/internal/base"
      17              :         "github.com/cockroachdb/pebble/internal/humanize"
      18              :         "github.com/cockroachdb/pebble/internal/invariants"
      19              :         "github.com/cockroachdb/pebble/internal/manifest"
      20              : )
      21              : 
      22              : // The minimum count for an intra-L0 compaction. This matches the RocksDB
      23              : // heuristic.
      24              : const minIntraL0Count = 4
      25              : 
      26              : type compactionEnv struct {
      27              :         // diskAvailBytes holds a statistic on the number of bytes available on
      28              :         // disk, as reported by the filesystem. It's used to be more restrictive in
      29              :         // expanding compactions if available disk space is limited.
      30              :         //
      31              :         // The cached value (d.diskAvailBytes) is updated whenever a file is deleted
      32              :         // and whenever a compaction or flush completes. Since file removal is the
      33              :         // primary means of reclaiming space, there is a rough bound on the
      34              :         // statistic's staleness when available bytes is growing. Compactions and
      35              :         // flushes are longer, slower operations and provide a much looser bound
      36              :         // when available bytes is decreasing.
      37              :         diskAvailBytes          uint64
      38              :         earliestUnflushedSeqNum base.SeqNum
      39              :         earliestSnapshotSeqNum  base.SeqNum
      40              :         inProgressCompactions   []compactionInfo
      41              :         readCompactionEnv       readCompactionEnv
      42              : }
      43              : 
      44              : type compactionPicker interface {
      45              :         getScores([]compactionInfo) [numLevels]float64
      46              :         getBaseLevel() int
      47              :         estimatedCompactionDebt(l0ExtraSize uint64) uint64
      48              :         pickAuto(env compactionEnv) (pc *pickedCompaction)
      49              :         pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)
      50              :         pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction)
      51              :         pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
      52              :         forceBaseLevel1()
      53              : }
      54              : 
      55              : // readCompactionEnv is used to hold data required to perform read compactions
      56              : type readCompactionEnv struct {
      57              :         rescheduleReadCompaction *bool
      58              :         readCompactions          *readCompactionQueue
      59              :         flushing                 bool
      60              : }
      61              : 
      62              : // Information about in-progress compactions provided to the compaction picker.
      63              : // These are used to constrain the new compactions that will be picked.
      64              : type compactionInfo struct {
      65              :         // versionEditApplied is true if this compaction's version edit has already
      66              :         // been committed. The compaction may still be in-progress deleting newly
      67              :         // obsolete files.
      68              :         versionEditApplied bool
      69              :         inputs             []compactionLevel
      70              :         outputLevel        int
      71              :         smallest           InternalKey
      72              :         largest            InternalKey
      73              : }
      74              : 
      75            1 : func (info compactionInfo) String() string {
      76            1 :         var buf bytes.Buffer
      77            1 :         var largest int
      78            1 :         for i, in := range info.inputs {
      79            1 :                 if i > 0 {
      80            1 :                         fmt.Fprintf(&buf, " -> ")
      81            1 :                 }
      82            1 :                 fmt.Fprintf(&buf, "L%d", in.level)
      83            1 :                 for f := range in.files.All() {
      84            1 :                         fmt.Fprintf(&buf, " %s", f.FileNum)
      85            1 :                 }
      86            1 :                 if largest < in.level {
      87            1 :                         largest = in.level
      88            1 :                 }
      89              :         }
      90            1 :         if largest != info.outputLevel || len(info.inputs) == 1 {
      91            1 :                 fmt.Fprintf(&buf, " -> L%d", info.outputLevel)
      92            1 :         }
      93            1 :         return buf.String()
      94              : }
      95              : 
      96              : type sortCompactionLevelsByPriority []candidateLevelInfo
      97              : 
      98            2 : func (s sortCompactionLevelsByPriority) Len() int {
      99            2 :         return len(s)
     100            2 : }
     101              : 
     102              : // A level should be picked for compaction if the compensatedScoreRatio is >= the
     103              : // compactionScoreThreshold.
     104              : const compactionScoreThreshold = 1
     105              : 
     106              : // Less should return true if s[i] must be placed earlier than s[j] in the final
     107              : // sorted list. The candidateLevelInfo for the level placed earlier is more likely
     108              : // to be picked for a compaction.
     109            2 : func (s sortCompactionLevelsByPriority) Less(i, j int) bool {
     110            2 :         iShouldCompact := s[i].compensatedScoreRatio >= compactionScoreThreshold
     111            2 :         jShouldCompact := s[j].compensatedScoreRatio >= compactionScoreThreshold
     112            2 :         // Ordering is defined as decreasing on (shouldCompact, uncompensatedScoreRatio)
     113            2 :         // where shouldCompact is 1 for true and 0 for false.
     114            2 :         if iShouldCompact && !jShouldCompact {
     115            2 :                 return true
     116            2 :         }
     117            2 :         if !iShouldCompact && jShouldCompact {
     118            2 :                 return false
     119            2 :         }
     120              : 
     121            2 :         if s[i].uncompensatedScoreRatio != s[j].uncompensatedScoreRatio {
     122            2 :                 return s[i].uncompensatedScoreRatio > s[j].uncompensatedScoreRatio
     123            2 :         }
     124            2 :         return s[i].level < s[j].level
     125              : }
     126              : 
     127            2 : func (s sortCompactionLevelsByPriority) Swap(i, j int) {
     128            2 :         s[i], s[j] = s[j], s[i]
     129            2 : }
     130              : 
     131              : // sublevelInfo is used to tag a LevelSlice for an L0 sublevel with the
     132              : // sublevel.
     133              : type sublevelInfo struct {
     134              :         manifest.LevelSlice
     135              :         sublevel manifest.Layer
     136              : }
     137              : 
     138            2 : func (cl sublevelInfo) Clone() sublevelInfo {
     139            2 :         return sublevelInfo{
     140            2 :                 sublevel:   cl.sublevel,
     141            2 :                 LevelSlice: cl.LevelSlice,
     142            2 :         }
     143            2 : }
     144            1 : func (cl sublevelInfo) String() string {
     145            1 :         return fmt.Sprintf(`Sublevel %s; Levels %s`, cl.sublevel, cl.LevelSlice)
     146            1 : }
     147              : 
     148              : // generateSublevelInfo will generate the level slices for each of the sublevels
     149              : // from the level slice for all of L0.
     150            2 : func generateSublevelInfo(cmp base.Compare, levelFiles manifest.LevelSlice) []sublevelInfo {
     151            2 :         sublevelMap := make(map[uint64][]*tableMetadata)
     152            2 :         for f := range levelFiles.All() {
     153            2 :                 sublevelMap[uint64(f.SubLevel)] = append(sublevelMap[uint64(f.SubLevel)], f)
     154            2 :         }
     155              : 
     156            2 :         var sublevels []int
     157            2 :         for level := range sublevelMap {
     158            2 :                 sublevels = append(sublevels, int(level))
     159            2 :         }
     160            2 :         sort.Ints(sublevels)
     161            2 : 
     162            2 :         var levelSlices []sublevelInfo
     163            2 :         for _, sublevel := range sublevels {
     164            2 :                 metas := sublevelMap[uint64(sublevel)]
     165            2 :                 levelSlices = append(
     166            2 :                         levelSlices,
     167            2 :                         sublevelInfo{
     168            2 :                                 manifest.NewLevelSliceKeySorted(cmp, metas),
     169            2 :                                 manifest.L0Sublevel(sublevel),
     170            2 :                         },
     171            2 :                 )
     172            2 :         }
     173            2 :         return levelSlices
     174              : }
     175              : 
     176              : // compactionPickerMetrics holds metrics related to the compaction picking process
     177              : type compactionPickerMetrics struct {
     178              :         // scores contains the compensatedScoreRatio from the candidateLevelInfo.
     179              :         scores                      []float64
     180              :         singleLevelOverlappingRatio float64
     181              :         multiLevelOverlappingRatio  float64
     182              : }
     183              : 
     184              : // pickedCompaction contains information about a compaction that has already
     185              : // been chosen, and is being constructed. Compaction construction info lives in
     186              : // this struct, and is copied over into the compaction struct when that's
     187              : // created.
     188              : type pickedCompaction struct {
     189              :         cmp Compare
     190              :         // score of the chosen compaction. This is the same as the
     191              :         // compensatedScoreRatio in the candidateLevelInfo.
     192              :         score float64
     193              :         // kind indicates the kind of compaction.
     194              :         kind compactionKind
     195              :         // manualID > 0 iff this is a manual compaction. It exists solely for
     196              :         // internal bookkeeping.
     197              :         manualID uint64
     198              :         // startLevel is the level that is being compacted. Inputs from startLevel
     199              :         // and outputLevel will be merged to produce a set of outputLevel files.
     200              :         startLevel *compactionLevel
     201              :         // outputLevel is the level that files are being produced in. outputLevel is
     202              :         // equal to startLevel+1 except when:
     203              :         //    - if startLevel is 0, the output level equals compactionPicker.getBaseLevel().
     204              :         //    - in multilevel compaction, the output level is the lowest level involved in
     205              :         //      the compaction
     206              :         outputLevel *compactionLevel
     207              :         // extraLevels contain additional levels in between the input and output
     208              :         // levels that get compacted in multi level compactions
     209              :         extraLevels []*compactionLevel
     210              :         inputs      []compactionLevel
     211              :         // LBase at the time of compaction picking.
     212              :         baseLevel int
     213              :         // L0-specific compaction info. Set to a non-nil value for all compactions
     214              :         // where startLevel == 0 that were generated by L0Sublevels.
     215              :         lcf *manifest.L0CompactionFiles
     216              :         // maxOutputFileSize is the maximum size of an individual table created
     217              :         // during compaction.
     218              :         maxOutputFileSize uint64
     219              :         // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
     220              :         // single output table with the tables in the grandparent level.
     221              :         maxOverlapBytes uint64
     222              :         // maxReadCompactionBytes is the maximum number of bytes that a read
     223              :         // compaction is allowed to overlap with in its output level. If the overlap
     224              :         // is greater than maxReadCompactionBytes, we don't proceed with the compaction.
     225              :         maxReadCompactionBytes uint64
     226              : 
     227              :         // The boundaries of the input data.
     228              :         smallest      InternalKey
     229              :         largest       InternalKey
     230              :         version       *version
     231              :         pickerMetrics compactionPickerMetrics
     232              : }
     233              : 
     234            2 : func (pc *pickedCompaction) userKeyBounds() base.UserKeyBounds {
     235            2 :         return base.UserKeyBoundsFromInternal(pc.smallest, pc.largest)
     236            2 : }
     237              : 
     238            2 : func defaultOutputLevel(startLevel, baseLevel int) int {
     239            2 :         outputLevel := startLevel + 1
     240            2 :         if startLevel == 0 {
     241            2 :                 outputLevel = baseLevel
     242            2 :         }
     243            2 :         if outputLevel >= numLevels-1 {
     244            2 :                 outputLevel = numLevels - 1
     245            2 :         }
     246            2 :         return outputLevel
     247              : }
     248              : 
     249              : func newPickedCompaction(
     250              :         opts *Options, cur *version, startLevel, outputLevel, baseLevel int,
     251            2 : ) *pickedCompaction {
     252            2 :         if startLevel > 0 && startLevel < baseLevel {
     253            1 :                 panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
     254            1 :                         startLevel, baseLevel))
     255              :         }
     256              : 
     257            2 :         adjustedLevel := adjustedOutputLevel(outputLevel, baseLevel)
     258            2 :         pc := &pickedCompaction{
     259            2 :                 cmp:                    opts.Comparer.Compare,
     260            2 :                 version:                cur,
     261            2 :                 baseLevel:              baseLevel,
     262            2 :                 inputs:                 []compactionLevel{{level: startLevel}, {level: outputLevel}},
     263            2 :                 maxOutputFileSize:      uint64(opts.Level(adjustedLevel).TargetFileSize),
     264            2 :                 maxOverlapBytes:        maxGrandparentOverlapBytes(opts, adjustedLevel),
     265            2 :                 maxReadCompactionBytes: maxReadCompactionBytes(opts, adjustedLevel),
     266            2 :         }
     267            2 :         pc.startLevel = &pc.inputs[0]
     268            2 :         pc.outputLevel = &pc.inputs[1]
     269            2 :         return pc
     270              : }
     271              : 
     272              : // adjustedOutputLevel is the output level used for the purpose of
     273              : // determining the target output file size, overlap bytes, and expanded
     274              : // bytes, taking into account the base level.
     275            2 : func adjustedOutputLevel(outputLevel int, baseLevel int) int {
     276            2 :         adjustedOutputLevel := outputLevel
     277            2 :         if adjustedOutputLevel > 0 {
     278            2 :                 // Output level is in the range [baseLevel, numLevels]. For the purpose of
     279            2 :                 // determining the target output file size, overlap bytes, and expanded
     280            2 :                 // bytes, we want to adjust the range to [1,numLevels].
     281            2 :                 adjustedOutputLevel = 1 + outputLevel - baseLevel
     282            2 :         }
     283            2 :         return adjustedOutputLevel
     284              : }
     285              : 
     286              : func newPickedCompactionFromL0(
     287              :         lcf *manifest.L0CompactionFiles, opts *Options, vers *version, baseLevel int, isBase bool,
     288            2 : ) *pickedCompaction {
     289            2 :         outputLevel := baseLevel
     290            2 :         if !isBase {
     291            2 :                 outputLevel = 0 // Intra L0
     292            2 :         }
     293              : 
     294            2 :         pc := newPickedCompaction(opts, vers, 0, outputLevel, baseLevel)
     295            2 :         pc.lcf = lcf
     296            2 :         pc.outputLevel.level = outputLevel
     297            2 : 
     298            2 :         // Manually build the compaction as opposed to calling
     299            2 :         // pickAutoHelper. This is because L0Sublevels has already added
     300            2 :         // any overlapping L0 SSTables that need to be added, and
     301            2 :         // because compactions built by L0Sublevels do not necessarily
     302            2 :         // pick contiguous sequences of files in pc.version.Levels[0].
     303            2 :         files := make([]*manifest.TableMetadata, 0, len(lcf.Files))
     304            2 :         for f := range vers.Levels[0].All() {
     305            2 :                 if lcf.FilesIncluded[f.L0Index] {
     306            2 :                         files = append(files, f)
     307            2 :                 }
     308              :         }
     309            2 :         pc.startLevel.files = manifest.NewLevelSliceSeqSorted(files)
     310            2 :         return pc
     311              : }
     312              : 
     313            1 : func (pc *pickedCompaction) String() string {
     314            1 :         var builder strings.Builder
     315            1 :         builder.WriteString(fmt.Sprintf(`Score=%f, `, pc.score))
     316            1 :         builder.WriteString(fmt.Sprintf(`Kind=%s, `, pc.kind))
     317            1 :         builder.WriteString(fmt.Sprintf(`AdjustedOutputLevel=%d, `, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel)))
     318            1 :         builder.WriteString(fmt.Sprintf(`maxOutputFileSize=%d, `, pc.maxOutputFileSize))
     319            1 :         builder.WriteString(fmt.Sprintf(`maxReadCompactionBytes=%d, `, pc.maxReadCompactionBytes))
     320            1 :         builder.WriteString(fmt.Sprintf(`smallest=%s, `, pc.smallest))
     321            1 :         builder.WriteString(fmt.Sprintf(`largest=%s, `, pc.largest))
     322            1 :         builder.WriteString(fmt.Sprintf(`version=%s, `, pc.version))
     323            1 :         builder.WriteString(fmt.Sprintf(`inputs=%s, `, pc.inputs))
     324            1 :         builder.WriteString(fmt.Sprintf(`startlevel=%s, `, pc.startLevel))
     325            1 :         builder.WriteString(fmt.Sprintf(`outputLevel=%s, `, pc.outputLevel))
     326            1 :         builder.WriteString(fmt.Sprintf(`extraLevels=%s, `, pc.extraLevels))
     327            1 :         builder.WriteString(fmt.Sprintf(`l0SublevelInfo=%s, `, pc.startLevel.l0SublevelInfo))
     328            1 :         builder.WriteString(fmt.Sprintf(`lcf=%s`, pc.lcf))
     329            1 :         return builder.String()
     330            1 : }
     331              : 
     332              : // clone creates a deep copy of the pickedCompaction.
     333            2 : func (pc *pickedCompaction) clone() *pickedCompaction {
     334            2 : 
     335            2 :         // Quickly copy over fields that do not require special deep copy care, and
     336            2 :         // set all fields that will require a deep copy to nil.
     337            2 :         newPC := &pickedCompaction{
     338            2 :                 cmp:                    pc.cmp,
     339            2 :                 score:                  pc.score,
     340            2 :                 kind:                   pc.kind,
     341            2 :                 baseLevel:              pc.baseLevel,
     342            2 :                 maxOutputFileSize:      pc.maxOutputFileSize,
     343            2 :                 maxOverlapBytes:        pc.maxOverlapBytes,
     344            2 :                 maxReadCompactionBytes: pc.maxReadCompactionBytes,
     345            2 :                 smallest:               pc.smallest.Clone(),
     346            2 :                 largest:                pc.largest.Clone(),
     347            2 : 
     348            2 :                 // TODO(msbutler): properly clone picker metrics
     349            2 :                 pickerMetrics: pc.pickerMetrics,
     350            2 : 
     351            2 :                 // Both copies see the same manifest; therefore, it's ok for them to
     352            2 :                 // share the same pc.version.
     353            2 :                 version: pc.version,
     354            2 :         }
     355            2 : 
     356            2 :         newPC.inputs = make([]compactionLevel, len(pc.inputs))
     357            2 :         newPC.extraLevels = make([]*compactionLevel, 0, len(pc.extraLevels))
     358            2 :         for i := range pc.inputs {
     359            2 :                 newPC.inputs[i] = pc.inputs[i].Clone()
     360            2 :                 if i == 0 {
     361            2 :                         newPC.startLevel = &newPC.inputs[i]
     362            2 :                 } else if i == len(pc.inputs)-1 {
     363            2 :                         newPC.outputLevel = &newPC.inputs[i]
     364            2 :                 } else {
     365            1 :                         newPC.extraLevels = append(newPC.extraLevels, &newPC.inputs[i])
     366            1 :                 }
     367              :         }
     368              : 
     369            2 :         if len(pc.startLevel.l0SublevelInfo) > 0 {
     370            2 :                 newPC.startLevel.l0SublevelInfo = make([]sublevelInfo, len(pc.startLevel.l0SublevelInfo))
     371            2 :                 for i := range pc.startLevel.l0SublevelInfo {
     372            2 :                         newPC.startLevel.l0SublevelInfo[i] = pc.startLevel.l0SublevelInfo[i].Clone()
     373            2 :                 }
     374              :         }
     375            2 :         if pc.lcf != nil {
     376            2 :                 newPC.lcf = pc.lcf.Clone()
     377            2 :         }
     378            2 :         return newPC
     379              : }
     380              : 
     381              : // maybeExpandBounds is a helper function for setupInputs which ensures the
     382              : // pickedCompaction's smallest and largest internal keys are updated iff
     383              : // the candidate keys expand the key span. This avoids a bug for multi-level
     384              : // compactions: during the second call to setupInputs, the picked compaction's
     385              : // smallest and largest keys should not decrease the key span.
     386            2 : func (pc *pickedCompaction) maybeExpandBounds(smallest InternalKey, largest InternalKey) {
     387            2 :         if len(smallest.UserKey) == 0 && len(largest.UserKey) == 0 {
     388            2 :                 return
     389            2 :         }
     390            2 :         if len(pc.smallest.UserKey) == 0 && len(pc.largest.UserKey) == 0 {
     391            2 :                 pc.smallest = smallest
     392            2 :                 pc.largest = largest
     393            2 :                 return
     394            2 :         }
     395            2 :         if base.InternalCompare(pc.cmp, pc.smallest, smallest) >= 0 {
     396            2 :                 pc.smallest = smallest
     397            2 :         }
     398            2 :         if base.InternalCompare(pc.cmp, pc.largest, largest) <= 0 {
     399            2 :                 pc.largest = largest
     400            2 :         }
     401              : }
     402              : 
     403              : // setupInputs returns true if a compaction has been set up. It returns false if
     404              : // a concurrent compaction is occurring on the start or output level files.
     405              : func (pc *pickedCompaction) setupInputs(
     406              :         opts *Options, diskAvailBytes uint64, startLevel *compactionLevel,
     407            2 : ) bool {
     408            2 :         // maxExpandedBytes is the maximum size of an expanded compaction. If
     409            2 :         // growing a compaction results in a larger size, the original compaction
     410            2 :         // is used instead.
     411            2 :         maxExpandedBytes := expandedCompactionByteSizeLimit(
     412            2 :                 opts, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel), diskAvailBytes,
     413            2 :         )
     414            2 : 
     415            2 :         if anyTablesCompacting(startLevel.files) {
     416            2 :                 return false
     417            2 :         }
     418              : 
     419            2 :         pc.maybeExpandBounds(manifest.KeyRange(pc.cmp, startLevel.files.All()))
     420            2 : 
     421            2 :         // Determine the sstables in the output level which overlap with the input
     422            2 :         // sstables. No need to do this for intra-L0 compactions; outputLevel.files is
     423            2 :         // left empty for those.
     424            2 :         if startLevel.level != pc.outputLevel.level {
     425            2 :                 pc.outputLevel.files = pc.version.Overlaps(pc.outputLevel.level, pc.userKeyBounds())
     426            2 :                 if anyTablesCompacting(pc.outputLevel.files) {
     427            2 :                         return false
     428            2 :                 }
     429              : 
     430            2 :                 pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
     431            2 :                         startLevel.files.All(), pc.outputLevel.files.All()))
     432              :         }
     433              : 
     434              :         // Grow the sstables in startLevel.level as long as it doesn't affect the number
     435              :         // of sstables included from pc.outputLevel.level.
     436            2 :         if pc.lcf != nil && startLevel.level == 0 && pc.outputLevel.level != 0 {
     437            2 :                 // Call the L0-specific compaction extension method. Similar logic as
     438            2 :                 // pc.grow. Additional L0 files are optionally added to the compaction at
     439            2 :                 // this step. Note that the bounds passed in are not the bounds of the
     440            2 :                 // compaction, but rather the smallest and largest internal keys that
     441            2 :                 // the compaction cannot include from L0 without pulling in more Lbase
     442            2 :                 // files. Consider this example:
     443            2 :                 //
     444            2 :                 // L0:        c-d e+f g-h
     445            2 :                 // Lbase: a-b     e+f     i-j
     446            2 :                 //        a b c d e f g h i j
     447            2 :                 //
     448            2 :                 // The e-f files have already been chosen in the compaction. As pulling
     449            2 :                 // in more LBase files is undesirable, the logic below will pass in
     450            2 :                 // smallest = b and largest = i to ExtendL0ForBaseCompactionTo, which
     451            2 :                 // will expand the compaction to include c-d and g-h from L0. The
     452            2 :                 // bounds passed in are exclusive; the compaction cannot be expanded
     453            2 :                 // to include files that "touch" it.
     454            2 :                 smallestBaseKey := base.InvalidInternalKey
     455            2 :                 largestBaseKey := base.InvalidInternalKey
     456            2 :                 if pc.outputLevel.files.Empty() {
     457            2 :                         baseIter := pc.version.Levels[pc.outputLevel.level].Iter()
     458            2 :                         if sm := baseIter.SeekLT(pc.cmp, pc.smallest.UserKey); sm != nil {
     459            2 :                                 smallestBaseKey = sm.Largest
     460            2 :                         }
     461            2 :                         if la := baseIter.SeekGE(pc.cmp, pc.largest.UserKey); la != nil {
     462            2 :                                 largestBaseKey = la.Smallest
     463            2 :                         }
     464            2 :                 } else {
     465            2 :                         // NB: We use Reslice to access the underlying level's files, but
     466            2 :                         // we discard the returned slice. The pc.outputLevel.files slice
     467            2 :                         // is not modified.
     468            2 :                         _ = pc.outputLevel.files.Reslice(func(start, end *manifest.LevelIterator) {
     469            2 :                                 if sm := start.Prev(); sm != nil {
     470            2 :                                         smallestBaseKey = sm.Largest
     471            2 :                                 }
     472            2 :                                 if la := end.Next(); la != nil {
     473            2 :                                         largestBaseKey = la.Smallest
     474            2 :                                 }
     475              :                         })
     476              :                 }
     477            2 :                 oldLcf := pc.lcf.Clone()
     478            2 :                 if pc.version.L0Sublevels.ExtendL0ForBaseCompactionTo(smallestBaseKey, largestBaseKey, pc.lcf) {
     479            2 :                         var newStartLevelFiles []*tableMetadata
     480            2 :                         iter := pc.version.Levels[0].Iter()
     481            2 :                         var sizeSum uint64
     482            2 :                         for j, f := 0, iter.First(); f != nil; j, f = j+1, iter.Next() {
     483            2 :                                 if pc.lcf.FilesIncluded[f.L0Index] {
     484            2 :                                         newStartLevelFiles = append(newStartLevelFiles, f)
     485            2 :                                         sizeSum += f.Size
     486            2 :                                 }
     487              :                         }
     488            2 :                         if sizeSum+pc.outputLevel.files.SizeSum() < maxExpandedBytes {
     489            2 :                                 startLevel.files = manifest.NewLevelSliceSeqSorted(newStartLevelFiles)
     490            2 :                                 pc.smallest, pc.largest = manifest.KeyRange(pc.cmp,
     491            2 :                                         startLevel.files.All(), pc.outputLevel.files.All())
     492            2 :                         } else {
     493            1 :                                 *pc.lcf = *oldLcf
     494            1 :                         }
     495              :                 }
     496            2 :         } else if pc.grow(pc.smallest, pc.largest, maxExpandedBytes, startLevel) {
     497            2 :                 pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
     498            2 :                         startLevel.files.All(), pc.outputLevel.files.All()))
     499            2 :         }
     500              : 
     501            2 :         if pc.startLevel.level == 0 {
     502            2 :                 // We don't change the input files for the compaction beyond this point.
     503            2 :                 pc.startLevel.l0SublevelInfo = generateSublevelInfo(pc.cmp, pc.startLevel.files)
     504            2 :         }
     505              : 
     506            2 :         return true
     507              : }
     508              : 
     509              : // grow grows the number of inputs at startLevel without changing the number
     510              : // of output-level files in the compaction, and returns whether the inputs
     511              : // grew. sm and la are the smallest and largest InternalKeys in all of the inputs.
     512              : func (pc *pickedCompaction) grow(
     513              :         sm, la InternalKey, maxExpandedBytes uint64, startLevel *compactionLevel,
     514            2 : ) bool {
     515            2 :         if pc.outputLevel.files.Empty() {
     516            2 :                 return false
     517            2 :         }
     518            2 :         grow0 := pc.version.Overlaps(startLevel.level, base.UserKeyBoundsFromInternal(sm, la))
     519            2 :         if anyTablesCompacting(grow0) {
     520            1 :                 return false
     521            1 :         }
     522            2 :         if grow0.Len() <= startLevel.files.Len() {
     523            2 :                 return false
     524            2 :         }
     525            2 :         if grow0.SizeSum()+pc.outputLevel.files.SizeSum() >= maxExpandedBytes {
     526            2 :                 return false
     527            2 :         }
     528              :         // We need to include the outputLevel iter because without it, in a multiLevel scenario,
     529              :         // sm1 and la1 could shift the output level keyspace when pc.outputLevel.files is set to grow1.
     530            2 :         sm1, la1 := manifest.KeyRange(pc.cmp, grow0.All(), pc.outputLevel.files.All())
     531            2 :         grow1 := pc.version.Overlaps(pc.outputLevel.level, base.UserKeyBoundsFromInternal(sm1, la1))
     532            2 :         if anyTablesCompacting(grow1) {
     533            1 :                 return false
     534            1 :         }
     535            2 :         if grow1.Len() != pc.outputLevel.files.Len() {
     536            2 :                 return false
     537            2 :         }
     538            2 :         startLevel.files = grow0
     539            2 :         pc.outputLevel.files = grow1
     540            2 :         return true
     541              : }
     542              : 
     543            2 : func (pc *pickedCompaction) compactionSize() uint64 {
     544            2 :         var bytesToCompact uint64
     545            2 :         for i := range pc.inputs {
     546            2 :                 bytesToCompact += pc.inputs[i].files.SizeSum()
     547            2 :         }
     548            2 :         return bytesToCompact
     549              : }
     550              : 
     551              : // setupMultiLevelCandidate returns true if it successfully added another level
     552              : // to the compaction.
     553            2 : func (pc *pickedCompaction) setupMultiLevelCandidate(opts *Options, diskAvailBytes uint64) bool {
     554            2 :         pc.inputs = append(pc.inputs, compactionLevel{level: pc.outputLevel.level + 1})
     555            2 : 
     556            2 :         // Recalibrate startLevel and outputLevel:
     557            2 :         //  - startLevel and outputLevel pointers may be obsolete after appending to pc.inputs.
     558            2 :         //  - push outputLevel to extraLevels and move the new level to outputLevel
     559            2 :         pc.startLevel = &pc.inputs[0]
     560            2 :         pc.extraLevels = []*compactionLevel{&pc.inputs[1]}
     561            2 :         pc.outputLevel = &pc.inputs[2]
     562            2 :         return pc.setupInputs(opts, diskAvailBytes, pc.extraLevels[len(pc.extraLevels)-1])
     563            2 : }
     564              : 
     565              : // anyTablesCompacting returns true if any tables in the level slice are
     566              : // compacting.
     567            2 : func anyTablesCompacting(inputs manifest.LevelSlice) bool {
     568            2 :         for f := range inputs.All() {
     569            2 :                 if f.IsCompacting() {
     570            2 :                         return true
     571            2 :                 }
     572              :         }
     573            2 :         return false
     574              : }
     575              : 
     576              : // newCompactionPickerByScore creates a compactionPickerByScore associated with
     577              : // the newest version. The picker is used under logLock (until a new version is
     578              : // installed).
     579              : func newCompactionPickerByScore(
     580              :         v *version,
     581              :         virtualBackings *manifest.VirtualBackings,
     582              :         opts *Options,
     583              :         inProgressCompactions []compactionInfo,
     584            2 : ) *compactionPickerByScore {
     585            2 :         p := &compactionPickerByScore{
     586            2 :                 opts:            opts,
     587            2 :                 vers:            v,
     588            2 :                 virtualBackings: virtualBackings,
     589            2 :         }
     590            2 :         p.initLevelMaxBytes(inProgressCompactions)
     591            2 :         return p
     592            2 : }
     593              : 
     594              : // Information about a candidate compaction level that has been identified by
     595              : // the compaction picker.
     596              : type candidateLevelInfo struct {
     597              :         // The compensatedScore of the level after adjusting according to the other
     598              :         // levels' sizes. For L0, the compensatedScoreRatio is equivalent to the
     599              :         // uncompensatedScoreRatio as we don't account for level size compensation in
     600              :         // L0.
     601              :         compensatedScoreRatio float64
     602              :         // The score of the level after accounting for level size compensation before
     603              :         // adjusting according to other levels' sizes. For L0, the compensatedScore
     604              :         // is equivalent to the uncompensatedScore as we don't account for level
     605              :         // size compensation in L0.
     606              :         compensatedScore float64
     607              :         // The score of the level to be compacted, calculated using uncompensated file
     608              :         // sizes and without any adjustments.
     609              :         uncompensatedScore float64
     610              :         // uncompensatedScoreRatio is the uncompensatedScore adjusted according to
     611              :         // the other levels' sizes.
     612              :         uncompensatedScoreRatio float64
     613              :         level                   int
     614              :         // The level to compact to.
     615              :         outputLevel int
     616              :         // The file in level that will be compacted. Additional files may be
     617              :         // picked by the compaction, and a pickedCompaction created for the
     618              :         // compaction.
     619              :         file manifest.LevelFile
     620              : }
     621              : 
     622            2 : func (c *candidateLevelInfo) shouldCompact() bool {
     623            2 :         return c.compensatedScoreRatio >= compactionScoreThreshold
     624            2 : }
     625              : 
     626            2 : func fileCompensation(f *tableMetadata) uint64 {
     627            2 :         return uint64(f.Stats.PointDeletionsBytesEstimate) + f.Stats.RangeDeletionsBytesEstimate
     628            2 : }
     629              : 
     630              : // compensatedSize returns f's file size, inflated according to compaction
     631              : // priorities.
     632            2 : func compensatedSize(f *tableMetadata) uint64 {
     633            2 :         // Add in the estimate of disk space that may be reclaimed by compacting the
     634            2 :         // file's tombstones.
     635            2 :         return f.Size + fileCompensation(f)
     636            2 : }
     637              : 
     638              : // compensatedSizeAnnotator is a manifest.Annotator that annotates B-Tree
     639              : // nodes with the sum of the files' compensated sizes. Compensated sizes may
     640              : // change once a table's stats are loaded asynchronously, so its values are
     641              : // marked as cacheable only if a file's stats have been loaded.
     642            2 : var compensatedSizeAnnotator = manifest.SumAnnotator(func(f *tableMetadata) (uint64, bool) {
     643            2 :         return compensatedSize(f), f.StatsValid()
     644            2 : })
     645              : 
     646              : // totalCompensatedSize computes the compensated size over a table metadata
     647              : // iterator. Note that this function is linear in the files available to the
     648              : // iterator. Use the compensatedSizeAnnotator if querying the total
     649              : // compensated size of a level.
     650            2 : func totalCompensatedSize(iter iter.Seq[*manifest.TableMetadata]) uint64 {
     651            2 :         var sz uint64
     652            2 :         for f := range iter {
     653            2 :                 sz += compensatedSize(f)
     654            2 :         }
     655            2 :         return sz
     656              : }
     657              : 
     658              : // compactionPickerByScore holds the state and logic for picking a compaction. A
     659              : // compaction picker is associated with a single version. A new compaction
     660              : // picker is created and initialized every time a new version is installed.
     661              : type compactionPickerByScore struct {
     662              :         opts            *Options
     663              :         vers            *version
     664              :         virtualBackings *manifest.VirtualBackings
     665              :         // The level to target for L0 compactions. Levels L1 to baseLevel-1 must be
     666              :         // empty.
     667              :         baseLevel int
     668              :         // levelMaxBytes holds the dynamically adjusted max bytes setting for each
     669              :         // level.
     670              :         levelMaxBytes [numLevels]int64
     671              : }
     672              : 
     673              : var _ compactionPicker = &compactionPickerByScore{}
     674              : 
     675            2 : func (p *compactionPickerByScore) getScores(inProgress []compactionInfo) [numLevels]float64 {
     676            2 :         var scores [numLevels]float64
     677            2 :         for _, info := range p.calculateLevelScores(inProgress) {
     678            2 :                 scores[info.level] = info.compensatedScoreRatio
     679            2 :         }
     680            2 :         return scores
     681              : }
     682              : 
     683            2 : func (p *compactionPickerByScore) getBaseLevel() int {
     684            2 :         if p == nil {
     685            0 :                 return 1
     686            0 :         }
     687            2 :         return p.baseLevel
     688              : }
     689              : 
     690              : // estimatedCompactionDebt estimates the number of bytes which need to be
     691              : // compacted before the LSM tree becomes stable.
     692            2 : func (p *compactionPickerByScore) estimatedCompactionDebt(l0ExtraSize uint64) uint64 {
     693            2 :         if p == nil {
     694            0 :                 return 0
     695            0 :         }
     696              : 
     697              :         // We assume that all the bytes in L0 need to be compacted to Lbase. This is
     698              :         // unlike the RocksDB logic that figures out whether L0 needs compaction.
     699            2 :         bytesAddedToNextLevel := l0ExtraSize + p.vers.Levels[0].Size()
     700            2 :         lbaseSize := p.vers.Levels[p.baseLevel].Size()
     701            2 : 
     702            2 :         var compactionDebt uint64
     703            2 :         if bytesAddedToNextLevel > 0 && lbaseSize > 0 {
     704            2 :                 // We only incur compaction debt if both L0 and Lbase contain data. If L0
     705            2 :                 // is empty, no compaction is necessary. If Lbase is empty, a move-based
     706            2 :                 // compaction from L0 would occur.
     707            2 :                 compactionDebt += bytesAddedToNextLevel + lbaseSize
     708            2 :         }
     709              : 
     710              :         // loop invariant: At the beginning of the loop, bytesAddedToNextLevel is the
     711              :         // bytes added to `level` in the loop.
     712            2 :         for level := p.baseLevel; level < numLevels-1; level++ {
     713            2 :                 levelSize := p.vers.Levels[level].Size() + bytesAddedToNextLevel
     714            2 :                 nextLevelSize := p.vers.Levels[level+1].Size()
     715            2 :                 if levelSize > uint64(p.levelMaxBytes[level]) {
     716            2 :                         bytesAddedToNextLevel = levelSize - uint64(p.levelMaxBytes[level])
     717            2 :                         if nextLevelSize > 0 {
     718            2 :                                 // We only incur compaction debt if the next level contains data. If the
     719            2 :                                 // next level is empty, a move-based compaction would be used.
     720            2 :                                 levelRatio := float64(nextLevelSize) / float64(levelSize)
     721            2 :                                 // The current level contributes bytesAddedToNextLevel to compactions.
     722            2 :                                 // The next level contributes levelRatio * bytesAddedToNextLevel.
     723            2 :                                 compactionDebt += uint64(float64(bytesAddedToNextLevel) * (levelRatio + 1))
     724            2 :                         }
     725            2 :                 } else {
     726            2 :                         // We're not moving any bytes to the next level.
     727            2 :                         bytesAddedToNextLevel = 0
     728            2 :                 }
     729              :         }
     730            2 :         return compactionDebt
     731              : }
     732              : 
     733            2 : func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []compactionInfo) {
     734            2 :         // The levelMaxBytes calculations here differ from RocksDB in two ways:
     735            2 :         //
     736            2 :         // 1. The use of dbSize vs maxLevelSize. RocksDB uses the size of the maximum
     737            2 :         //    level in L1-L6, rather than determining the size of the bottom level
     738            2 :         //    based on the total amount of data in the DB. The RocksDB calculation is
     739            2 :         //    problematic if L0 contains a significant fraction of data, or if the
     740            2 :         //    level sizes are roughly equal and thus there is a significant fraction
     741            2 :         //    of data outside of the largest level.
     742            2 :         //
     743            2 :         // 2. Not adjusting the size of Lbase based on L0. RocksDB computes
     744            2 :         //    baseBytesMax as the maximum of the configured LBaseMaxBytes and the
     745            2 :         //    size of L0. This is problematic because baseBytesMax is used to compute
     746            2 :         //    the max size of lower levels. A very large baseBytesMax will result in
     747            2 :         //    an overly large value for the size of lower levels which will cause
     748            2 :         //    those levels not to be compacted even when they should be
     749            2 :         //    compacted. This often results in "inverted" LSM shapes where Ln is
     750            2 :         //    larger than Ln+1.
     751            2 : 
     752            2 :         // Determine the first non-empty level and the total DB size.
     753            2 :         firstNonEmptyLevel := -1
     754            2 :         var dbSize uint64
     755            2 :         for level := 1; level < numLevels; level++ {
     756            2 :                 if p.vers.Levels[level].Size() > 0 {
     757            2 :                         if firstNonEmptyLevel == -1 {
     758            2 :                                 firstNonEmptyLevel = level
     759            2 :                         }
     760            2 :                         dbSize += p.vers.Levels[level].Size()
     761              :                 }
     762              :         }
     763            2 :         for _, c := range inProgressCompactions {
     764            2 :                 if c.outputLevel == 0 || c.outputLevel == -1 {
     765            2 :                         continue
     766              :                 }
     767            2 :                 if c.inputs[0].level == 0 && (firstNonEmptyLevel == -1 || c.outputLevel < firstNonEmptyLevel) {
     768            2 :                         firstNonEmptyLevel = c.outputLevel
     769            2 :                 }
     770              :         }
     771              : 
     772              :         // Initialize the max-bytes setting for each level to "infinity" which will
     773              :         // disallow compaction for that level. We'll fill in the actual value below
     774              :         // for levels we want to allow compactions from.
     775            2 :         for level := 0; level < numLevels; level++ {
     776            2 :                 p.levelMaxBytes[level] = math.MaxInt64
     777            2 :         }
     778              : 
     779            2 :         if dbSize == 0 {
     780            2 :                 // No levels for L1 and up contain any data. Target L0 compactions for the
     781            2 :                 // last level or to the level to which there is an ongoing L0 compaction.
     782            2 :                 p.baseLevel = numLevels - 1
     783            2 :                 if firstNonEmptyLevel >= 0 {
     784            2 :                         p.baseLevel = firstNonEmptyLevel
     785            2 :                 }
     786            2 :                 return
     787              :         }
     788              : 
     789            2 :         dbSize += p.vers.Levels[0].Size()
     790            2 :         bottomLevelSize := dbSize - dbSize/uint64(p.opts.Experimental.LevelMultiplier)
     791            2 : 
     792            2 :         curLevelSize := bottomLevelSize
     793            2 :         for level := numLevels - 2; level >= firstNonEmptyLevel; level-- {
     794            2 :                 curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
     795            2 :         }
     796              : 
     797              :         // Compute base level (where L0 data is compacted to).
     798            2 :         baseBytesMax := uint64(p.opts.LBaseMaxBytes)
     799            2 :         p.baseLevel = firstNonEmptyLevel
     800            2 :         for p.baseLevel > 1 && curLevelSize > baseBytesMax {
     801            2 :                 p.baseLevel--
     802            2 :                 curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
     803            2 :         }
     804              : 
     805            2 :         smoothedLevelMultiplier := 1.0
     806            2 :         if p.baseLevel < numLevels-1 {
     807            2 :                 smoothedLevelMultiplier = math.Pow(
     808            2 :                         float64(bottomLevelSize)/float64(baseBytesMax),
     809            2 :                         1.0/float64(numLevels-p.baseLevel-1))
     810            2 :         }
     811              : 
     812            2 :         levelSize := float64(baseBytesMax)
     813            2 :         for level := p.baseLevel; level < numLevels; level++ {
     814            2 :                 if level > p.baseLevel && levelSize > 0 {
     815            2 :                         levelSize *= smoothedLevelMultiplier
     816            2 :                 }
     817              :                 // Round the result since test cases use small target level sizes, which
     818              :                 // can be impacted by floating-point imprecision + integer truncation.
     819            2 :                 roundedLevelSize := math.Round(levelSize)
     820            2 :                 if roundedLevelSize > float64(math.MaxInt64) {
     821            0 :                         p.levelMaxBytes[level] = math.MaxInt64
     822            2 :                 } else {
     823            2 :                         p.levelMaxBytes[level] = int64(roundedLevelSize)
     824            2 :                 }
     825              :         }
     826              : }
     827              : 
     828              : type levelSizeAdjust struct {
     829              :         incomingActualBytes      uint64
     830              :         outgoingActualBytes      uint64
     831              :         outgoingCompensatedBytes uint64
     832              : }
     833              : 
     834            2 : func (a levelSizeAdjust) compensated() uint64 {
     835            2 :         return a.incomingActualBytes - a.outgoingCompensatedBytes
     836            2 : }
     837              : 
     838            2 : func (a levelSizeAdjust) actual() uint64 {
     839            2 :         return a.incomingActualBytes - a.outgoingActualBytes
     840            2 : }
     841              : 
     842            2 : func calculateSizeAdjust(inProgressCompactions []compactionInfo) [numLevels]levelSizeAdjust {
     843            2 :         // Compute size adjustments for each level based on the in-progress
     844            2 :         // compactions. We sum the file sizes of all files leaving and entering each
     845            2 :         // level in in-progress compactions. For outgoing files, we also keep a
     846            2 :         // separate sum of 'compensated file sizes', which are inflated according
     847            2 :         // to deletion estimates.
     848            2 :         //
     849            2 :         // When we adjust a level's size according to these values during score
     850            2 :         // calculation, we subtract the compensated size of start level inputs to
     851            2 :         // account for the fact that score calculation uses compensated sizes.
     852            2 :         //
     853            2 :         // Since compensated file sizes may be compensated because they reclaim
     854            2 :         // space from the output level's files, we only add the real file size to
     855            2 :         // the output level.
     856            2 :         //
     857            2 :         // This is slightly different from RocksDB's behavior, which simply elides
     858            2 :         // compacting files from the level size calculation.
     859            2 :         var sizeAdjust [numLevels]levelSizeAdjust
     860            2 :         for i := range inProgressCompactions {
     861            2 :                 c := &inProgressCompactions[i]
     862            2 :                 // If this compaction's version edit has already been applied, there's
     863            2 :                 // no need to adjust: The LSM we'll examine will already reflect the
     864            2 :                 // new LSM state.
     865            2 :                 if c.versionEditApplied {
     866            2 :                         continue
     867              :                 }
     868              : 
     869            2 :                 for _, input := range c.inputs {
     870            2 :                         actualSize := input.files.SizeSum()
     871            2 :                         compensatedSize := totalCompensatedSize(input.files.All())
     872            2 : 
     873            2 :                         if input.level != c.outputLevel {
     874            2 :                                 sizeAdjust[input.level].outgoingCompensatedBytes += compensatedSize
     875            2 :                                 sizeAdjust[input.level].outgoingActualBytes += actualSize
     876            2 :                                 if c.outputLevel != -1 {
     877            2 :                                         sizeAdjust[c.outputLevel].incomingActualBytes += actualSize
     878            2 :                                 }
     879              :                         }
     880              :                 }
     881              :         }
     882            2 :         return sizeAdjust
     883              : }
     884              : 
     885              : func (p *compactionPickerByScore) calculateLevelScores(
     886              :         inProgressCompactions []compactionInfo,
     887            2 : ) [numLevels]candidateLevelInfo {
     888            2 :         var scores [numLevels]candidateLevelInfo
     889            2 :         for i := range scores {
     890            2 :                 scores[i].level = i
     891            2 :                 scores[i].outputLevel = i + 1
     892            2 :         }
     893            2 :         l0UncompensatedScore := calculateL0UncompensatedScore(p.vers, p.opts, inProgressCompactions)
     894            2 :         scores[0] = candidateLevelInfo{
     895            2 :                 outputLevel:        p.baseLevel,
     896            2 :                 uncompensatedScore: l0UncompensatedScore,
     897            2 :                 compensatedScore:   l0UncompensatedScore, /* No level size compensation for L0 */
     898            2 :         }
     899            2 :         sizeAdjust := calculateSizeAdjust(inProgressCompactions)
     900            2 :         for level := 1; level < numLevels; level++ {
     901            2 :                 compensatedLevelSize := *compensatedSizeAnnotator.LevelAnnotation(p.vers.Levels[level]) + sizeAdjust[level].compensated()
     902            2 :                 scores[level].compensatedScore = float64(compensatedLevelSize) / float64(p.levelMaxBytes[level])
     903            2 :                 scores[level].uncompensatedScore = float64(p.vers.Levels[level].Size()+sizeAdjust[level].actual()) / float64(p.levelMaxBytes[level])
     904            2 :         }
     905              : 
     906              :         // Adjust each level's {compensated, uncompensated}Score by the uncompensatedScore
     907              :         // of the next level to get a {compensated, uncompensated}ScoreRatio. If the
     908              :         // next level has a high uncompensatedScore, and is thus a priority for compaction,
     909              :         // this reduces the priority for compacting the current level. If the next level
     910              :         // has a low uncompensatedScore (i.e. it is below its target size), this increases
     911              :         // the priority for compacting the current level.
     912              :         //
     913              :         // The effect of this adjustment is to help prioritize compactions in lower
     914              :         // levels. The following example shows the compensatedScoreRatio and the
     915              :         // compensatedScore. In this scenario, L0 has 68 sublevels. L3 (a.k.a. Lbase)
     916              :         // is significantly above its target size. The original score prioritizes
     917              :         // compactions from those two levels, but doing so ends up causing a future
     918              :         // problem: data piles up in the higher levels, starving L5->L6 compactions,
     919              :         // and to a lesser degree starving L4->L5 compactions.
     920              :         //
     921              :         // Note that in the example shown there is no level size compensation so the
     922              :         // compensatedScore and the uncompensatedScore are the same for each level.
     923              :         //
     924              :         //        compensatedScoreRatio   compensatedScore   uncompensatedScore   size   max-size
     925              :         //   L0                     3.2               68.0                 68.0  2.2 G          -
     926              :         //   L3                     3.2               21.1                 21.1  1.3 G       64 M
     927              :         //   L4                     3.4                6.7                  6.7  3.1 G      467 M
     928              :         //   L5                     3.4                2.0                  2.0  6.6 G      3.3 G
     929              :         //   L6                     0.6                0.6                  0.6   14 G       24 G
     930            2 :         var prevLevel int
     931            2 :         for level := p.baseLevel; level < numLevels; level++ {
     932            2 :                 // The compensated scores, and uncompensated scores will be turned into
     933            2 :                 // ratios as they're adjusted according to other levels' sizes.
     934            2 :                 scores[prevLevel].compensatedScoreRatio = scores[prevLevel].compensatedScore
     935            2 :                 scores[prevLevel].uncompensatedScoreRatio = scores[prevLevel].uncompensatedScore
     936            2 : 
     937            2 :                 // Avoid absurdly large scores by placing a floor on the score that we'll
     938            2 :                 // adjust a level by. The value of 0.01 was chosen somewhat arbitrarily.
     939            2 :                 const minScore = 0.01
     940            2 :                 if scores[prevLevel].compensatedScoreRatio >= compactionScoreThreshold {
     941            2 :                         if scores[level].uncompensatedScore >= minScore {
     942            2 :                                 scores[prevLevel].compensatedScoreRatio /= scores[level].uncompensatedScore
     943            2 :                         } else {
     944            2 :                                 scores[prevLevel].compensatedScoreRatio /= minScore
     945            2 :                         }
     946              :                 }
     947            2 :                 if scores[prevLevel].uncompensatedScoreRatio >= compactionScoreThreshold {
     948            2 :                         if scores[level].uncompensatedScore >= minScore {
     949            2 :                                 scores[prevLevel].uncompensatedScoreRatio /= scores[level].uncompensatedScore
     950            2 :                         } else {
     951            2 :                                 scores[prevLevel].uncompensatedScoreRatio /= minScore
     952            2 :                         }
     953              :                 }
     954            2 :                 prevLevel = level
     955              :         }
     956              :         // Set the score ratios for the lowest level.
     957              :         // INVARIANT: prevLevel == numLevels-1
     958            2 :         scores[prevLevel].compensatedScoreRatio = scores[prevLevel].compensatedScore
     959            2 :         scores[prevLevel].uncompensatedScoreRatio = scores[prevLevel].uncompensatedScore
     960            2 : 
     961            2 :         sort.Sort(sortCompactionLevelsByPriority(scores[:]))
     962            2 :         return scores
     963              : }
     964              : 
     965              : // calculateL0UncompensatedScore calculates a float score representing the
     966              : // relative priority of compacting L0. Level L0 is special in that files within
     967              : // L0 may overlap one another, so a different set of heuristics that take into
     968              : // account read amplification apply.
     969              : func calculateL0UncompensatedScore(
     970              :         vers *version, opts *Options, inProgressCompactions []compactionInfo,
     971            2 : ) float64 {
     972            2 :         // Use the sublevel count to calculate the score. The base vs intra-L0
     973            2 :         // compaction determination happens in pickAuto, not here.
     974            2 :         score := float64(2*vers.L0Sublevels.MaxDepthAfterOngoingCompactions()) /
     975            2 :                 float64(opts.L0CompactionThreshold)
     976            2 : 
     977            2 :         // Also calculate a score based on the file count but use it only if it
     978            2 :         // produces a higher score than the sublevel-based one. This heuristic is
     979            2 :         // designed to accommodate cases where L0 is accumulating non-overlapping
     980            2 :         // files. Letting too many non-overlapping files accumulate in few
     981            2 :         // sublevels is undesirable, because:
     982            2 :         // 1) we can produce a massive backlog to compact once files do overlap.
     983            2 :         // 2) constructing L0 sublevels has a runtime that grows superlinearly with
     984            2 :         //    the number of files in L0 and must be done while holding D.mu.
     985            2 :         noncompactingFiles := vers.Levels[0].Len()
     986            2 :         for _, c := range inProgressCompactions {
     987            2 :                 for _, cl := range c.inputs {
     988            2 :                         if cl.level == 0 {
     989            2 :                                 noncompactingFiles -= cl.files.Len()
     990            2 :                         }
     991              :                 }
     992              :         }
     993            2 :         fileScore := float64(noncompactingFiles) / float64(opts.L0CompactionFileThreshold)
     994            2 :         if score < fileScore {
     995            2 :                 score = fileScore
     996            2 :         }
     997            2 :         return score
     998              : }
     999              : 
    1000              : // pickCompactionSeedFile picks a file from `level` in the `vers` to build a
    1001              : // compaction around. Currently, this function implements a heuristic similar to
    1002              : // RocksDB's kMinOverlappingRatio, seeking to minimize write amplification. This
    1003              : // function is linear with respect to the number of files in `level` and
    1004              : // `outputLevel`.
    1005              : func pickCompactionSeedFile(
    1006              :         vers *version,
    1007              :         virtualBackings *manifest.VirtualBackings,
    1008              :         opts *Options,
    1009              :         level, outputLevel int,
    1010              :         earliestSnapshotSeqNum base.SeqNum,
    1011            2 : ) (manifest.LevelFile, bool) {
    1012            2 :         // Select the file within the level to compact. We want to minimize write
    1013            2 :         // amplification, but also ensure that (a) deletes are propagated to the
    1014            2 :         // bottom level in a timely fashion, and (b) virtual sstables that are
    1015            2 :         // pinning backing sstables where most of the data is garbage are compacted
    1016            2 :         // away. Doing (a) and (b) reclaims disk space. A table's smallest sequence
    1017            2 :         // number provides a measure of its age. The ratio of overlapping-bytes /
    1018            2 :         // table-size gives an indication of write amplification (a smaller ratio is
    1019            2 :         // preferable).
    1020            2 :         //
    1021            2 :         // The current heuristic is based on the RocksDB kMinOverlappingRatio
    1022            2 :         // heuristic. It chooses the file with the minimum overlapping ratio with
    1023            2 :         // the target level, which minimizes write amplification.
    1024            2 :         //
    1025            2 :         // The heuristic uses a "compensated size" for the denominator, which is the
    1026            2 :         // file size inflated by (a) an estimate of the space that may be reclaimed
    1027            2 :         // through compaction, and (b) a fraction of the amount of garbage in the
    1028            2 :         // backing sstable pinned by this (virtual) sstable.
    1029            2 :         //
    1030            2 :         // TODO(peter): For concurrent compactions, we may want to try harder to
    1031            2 :         // pick a seed file whose resulting compaction bounds do not overlap with
    1032            2 :         // an in-progress compaction.
    1033            2 : 
    1034            2 :         cmp := opts.Comparer.Compare
    1035            2 :         startIter := vers.Levels[level].Iter()
    1036            2 :         outputIter := vers.Levels[outputLevel].Iter()
    1037            2 : 
    1038            2 :         var file manifest.LevelFile
    1039            2 :         smallestRatio := uint64(math.MaxUint64)
    1040            2 : 
    1041            2 :         outputFile := outputIter.First()
    1042            2 : 
    1043            2 :         for f := startIter.First(); f != nil; f = startIter.Next() {
    1044            2 :                 var overlappingBytes uint64
    1045            2 :                 compacting := f.IsCompacting()
    1046            2 :                 if compacting {
    1047            2 :                         // Move on if this file is already being compacted. We'll likely
    1048            2 :                         // still need to move past the overlapping output files regardless,
    1049            2 :                         // but in cases where all start-level files are compacting we won't.
    1050            2 :                         continue
    1051              :                 }
    1052              : 
    1053              :                 // Trim any output-level files smaller than f.
    1054            2 :                 for outputFile != nil && sstableKeyCompare(cmp, outputFile.Largest, f.Smallest) < 0 {
    1055            2 :                         outputFile = outputIter.Next()
    1056            2 :                 }
    1057              : 
    1058            2 :                 for outputFile != nil && sstableKeyCompare(cmp, outputFile.Smallest, f.Largest) <= 0 && !compacting {
    1059            2 :                         overlappingBytes += outputFile.Size
    1060            2 :                         compacting = compacting || outputFile.IsCompacting()
    1061            2 : 
    1062            2 :                         // For files in the bottommost level of the LSM, the
    1063            2 :                         // Stats.RangeDeletionsBytesEstimate field is set to the estimate
    1064            2 :                         // of bytes /within/ the file itself that may be dropped by
    1065            2 :                         // recompacting the file. These bytes from obsolete keys would not
    1066            2 :                         // need to be rewritten if we compacted `f` into `outputFile`, so
    1067            2 :                         // they don't contribute to write amplification. Subtracting them
    1068            2 :                         // out of the overlapping bytes helps prioritize these compactions
    1069            2 :                         // that are cheaper than their file sizes suggest.
    1070            2 :                         if outputLevel == numLevels-1 && outputFile.LargestSeqNum < earliestSnapshotSeqNum {
    1071            2 :                                 overlappingBytes -= outputFile.Stats.RangeDeletionsBytesEstimate
    1072            2 :                         }
    1073              : 
    1074              :                         // If the file in the next level extends beyond f's largest key,
    1075              :                         // break out and don't advance outputIter because f's successor
    1076              :                         // might also overlap.
    1077              :                         //
    1078              :                         // Note, we stop as soon as we encounter an output-level file with a
    1079              :                         // largest key beyond the input-level file's largest bound. We
    1080              :                         // perform a simple user key comparison here using sstableKeyCompare
    1081              :                         // which handles the potential for exclusive largest key bounds.
    1082              :                         // There's some subtlety when the bounds are equal (eg, equal and
    1083              :                         // inclusive, or equal and exclusive). Current Pebble doesn't split
    1084              :                         // user keys across sstables within a level (and in format versions
    1085              :                         // FormatSplitUserKeysMarkedCompacted and later we guarantee no
    1086              :                         // split user keys exist within the entire LSM). In that case, we're
    1087              :                         // assured that neither the input level nor the output level's next
    1088              :                         // file shares the same user key, so compaction expansion will not
    1089              :                         // include them in any compaction compacting `f`.
    1090              :                         //
    1091              :                         // NB: If we /did/ allow split user keys, or we're running on an
    1092              :                         // old database with an earlier format major version where there are
    1093              :                         // existing split user keys, this logic would be incorrect. Consider
    1094              :                         //    L1: [a#120,a#100] [a#80,a#60]
    1095              :                         //    L2: [a#55,a#45] [a#35,a#25] [a#15,a#5]
    1096              :                         // While considering the first file in L1, [a#120,a#100], we'd skip
    1097              :                         // past all of the files in L2. When considering the second file in
    1098              :                         // L1, we'd improperly conclude that the second file overlaps
    1099              :                         // nothing in the second level and is cheap to compact, when in
    1100              :                         // reality we'd need to expand the compaction to include all 5
    1101              :                         // files.
    1102            2 :                         if sstableKeyCompare(cmp, outputFile.Largest, f.Largest) > 0 {
    1103            2 :                                 break
    1104              :                         }
    1105            2 :                         outputFile = outputIter.Next()
    1106              :                 }
    1107              : 
    1108              :                 // If the input level file or one of the overlapping files is
    1109              :                 // compacting, we're not going to be able to compact this file
    1110              :                 // anyways, so skip it.
    1111            2 :                 if compacting {
    1112            2 :                         continue
    1113              :                 }
    1114              : 
    1115            2 :                 compSz := compensatedSize(f) + responsibleForGarbageBytes(virtualBackings, f)
    1116            2 :                 scaledRatio := overlappingBytes * 1024 / compSz
    1117            2 :                 if scaledRatio < smallestRatio {
    1118            2 :                         smallestRatio = scaledRatio
    1119            2 :                         file = startIter.Take()
    1120            2 :                 }
    1121              :         }
    1122            2 :         return file, file.TableMetadata != nil
    1123              : }
    1124              : 
    1125              : // responsibleForGarbageBytes returns the amount of garbage in the backing
    1126              : // sstable that we consider the responsibility of this virtual sstable. For
    1127              : // non-virtual sstables, this is of course 0. For virtual sstables, we equally
    1128              : // distribute the responsibility of the garbage across all the virtual
    1129              : // sstables that are referencing the same backing sstable. One could
    1130              : // alternatively distribute this in proportion to the virtual sst sizes, but
    1131              : // it isn't clear that more sophisticated heuristics are worth it, given that
    1132              : // the garbage cannot be reclaimed until all the referencing virtual sstables
    1133              : // are compacted.
    1134              : func responsibleForGarbageBytes(
    1135              :         virtualBackings *manifest.VirtualBackings, m *tableMetadata,
    1136            2 : ) uint64 {
    1137            2 :         if !m.Virtual {
    1138            2 :                 return 0
    1139            2 :         }
    1140            2 :         useCount, virtualizedSize := virtualBackings.Usage(m.FileBacking.DiskFileNum)
    1141            2 :         // Since virtualizedSize is the sum of the estimated size of all virtual
    1142            2 :         // ssts, we allow for the possibility that virtualizedSize could exceed
    1143            2 :         // m.FileBacking.Size.
    1144            2 :         totalGarbage := int64(m.FileBacking.Size) - int64(virtualizedSize)
    1145            2 :         if totalGarbage <= 0 {
    1146            1 :                 return 0
    1147            1 :         }
    1148            2 :         if useCount == 0 {
    1149            0 :                 // This cannot happen if m exists in the latest version. The call to
    1150            0 :                 // responsibleForGarbageBytes during compaction picking ensures that m
    1151            0 :                 // exists in the latest version by holding versionSet.logLock.
    1152            0 :                 panic(errors.AssertionFailedf("%s has zero useCount", m.String()))
    1153              :         }
    1154            2 :         return uint64(totalGarbage) / uint64(useCount)
    1155              : }
    1156              : 
    1157            2 : func (p *compactionPickerByScore) getCompactionConcurrency() int {
    1158            2 :         maxConcurrentCompactions := p.opts.MaxConcurrentCompactions()
    1159            2 :         // Compaction concurrency is controlled by L0 read-amp. We allow one
    1160            2 :         // additional compaction per L0CompactionConcurrency sublevels, as well as
    1161            2 :         // one additional compaction per CompactionDebtConcurrency bytes of
    1162            2 :         // compaction debt. Compaction concurrency is tied to L0 sublevels as that
    1163            2 :         // signal is independent of the database size. We tack on the compaction
    1164            2 :         // debt as a second signal to prevent compaction concurrency from dropping
    1165            2 :         // significantly right after a base compaction finishes, and before those
    1166            2 :         // bytes have been compacted further down the LSM.
    1167            2 :         //
    1168            2 :         // Let n be the number of in-progress compactions.
    1169            2 :         //
    1170            2 :         // l0ReadAmp >= ccSignal1 then can run another compaction, where
    1171            2 :         // ccSignal1 = n * p.opts.Experimental.L0CompactionConcurrency
    1172            2 :         // Rearranging,
    1173            2 :         // n <= l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency.
    1174            2 :         // So we can run up to
    1175            2 :         // l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency + 1 compactions
    1176            2 :         l0ReadAmpCompactions := 1
    1177            2 :         if p.opts.Experimental.L0CompactionConcurrency > 0 {
    1178            2 :                 l0ReadAmp := p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()
    1179            2 :                 l0ReadAmpCompactions = (l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency) + 1
    1180            2 :         }
    1181              :         // compactionDebt >= ccSignal2 then can run another compaction, where
    1182              :         // ccSignal2 = uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
    1183              :         // Rearranging,
    1184              :         // n <= compactionDebt / p.opts.Experimental.CompactionDebtConcurrency
    1185              :         // So we can run up to
    1186              :         // compactionDebt / p.opts.Experimental.CompactionDebtConcurrency + 1 compactions.
    1187            2 :         compactionDebtCompactions := 1
    1188            2 :         if p.opts.Experimental.CompactionDebtConcurrency > 0 {
    1189            2 :                 compactionDebt := p.estimatedCompactionDebt(0)
    1190            2 :                 compactionDebtCompactions = int(compactionDebt/p.opts.Experimental.CompactionDebtConcurrency) + 1
    1191            2 :         }
    1192            2 :         return max(min(maxConcurrentCompactions, max(l0ReadAmpCompactions, compactionDebtCompactions)), 1)
    1193              : }
    1194              : 
    1195              : // pickAuto picks the best compaction, if any.
    1196              : //
    1197              : // On each call, pickAuto computes per-level size adjustments based on
    1198              : // in-progress compactions, and computes a per-level score. The levels are
    1199              : // iterated over in decreasing score order trying to find a valid compaction
    1200              : // anchored at that level.
    1201              : //
    1202              : // If a score-based compaction cannot be found, pickAuto falls back, in order, to
    1203              : // tombstone-density, elision-only (to remove obsolete keys) and read-triggered picks.
    1204            2 : func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
    1205            2 :         scores := p.calculateLevelScores(env.inProgressCompactions)
    1206            2 : 
    1207            2 :         // TODO(bananabrick): Either remove, or change this into an event sent to the
    1208            2 :         // EventListener.
    1209            2 :         logCompaction := func(pc *pickedCompaction) {
    1210            0 :                 var buf bytes.Buffer
    1211            0 :                 for i := 0; i < numLevels; i++ {
    1212            0 :                         if i != 0 && i < p.baseLevel {
    1213            0 :                                 continue
    1214              :                         }
    1215              : 
    1216            0 :                         var info *candidateLevelInfo
    1217            0 :                         for j := range scores {
    1218            0 :                                 if scores[j].level == i {
    1219            0 :                                         info = &scores[j]
    1220            0 :                                         break
    1221              :                                 }
    1222              :                         }
    1223              : 
    1224            0 :                         marker := " "
    1225            0 :                         if pc.startLevel.level == info.level {
    1226            0 :                                 marker = "*"
    1227            0 :                         }
    1228            0 :                         fmt.Fprintf(&buf, "  %sL%d: %5.1f  %5.1f  %5.1f  %5.1f %8s  %8s",
    1229            0 :                                 marker, info.level, info.compensatedScoreRatio, info.compensatedScore,
    1230            0 :                                 info.uncompensatedScoreRatio, info.uncompensatedScore,
    1231            0 :                                 humanize.Bytes.Int64(int64(totalCompensatedSize(
    1232            0 :                                         p.vers.Levels[info.level].All(),
    1233            0 :                                 ))),
    1234            0 :                                 humanize.Bytes.Int64(p.levelMaxBytes[info.level]),
    1235            0 :                         )
    1236            0 : 
    1237            0 :                         count := 0
    1238            0 :                         for i := range env.inProgressCompactions {
    1239            0 :                                 c := &env.inProgressCompactions[i]
    1240            0 :                                 if c.inputs[0].level != info.level {
    1241            0 :                                         continue
    1242              :                                 }
    1243            0 :                                 count++
    1244            0 :                                 if count == 1 {
    1245            0 :                                         fmt.Fprintf(&buf, "  [")
    1246            0 :                                 } else {
    1247            0 :                                         fmt.Fprintf(&buf, " ")
    1248            0 :                                 }
    1249            0 :                                 fmt.Fprintf(&buf, "L%d->L%d", c.inputs[0].level, c.outputLevel)
    1250              :                         }
    1251            0 :                         if count > 0 {
    1252            0 :                                 fmt.Fprintf(&buf, "]")
    1253            0 :                         }
    1254            0 :                         fmt.Fprintf(&buf, "\n")
    1255              :                 }
    1256            0 :                 p.opts.Logger.Infof("pickAuto: L%d->L%d\n%s",
    1257            0 :                         pc.startLevel.level, pc.outputLevel.level, buf.String())
    1258              :         }
    1259              : 
    1260              :         // Check for a score-based compaction. candidateLevelInfos are first sorted
    1261              :         // by whether they should be compacted, so if we find a level which shouldn't
    1262              :         // be compacted, we can break early.
    1263            2 :         for i := range scores {
    1264            2 :                 info := &scores[i]
    1265            2 :                 if !info.shouldCompact() {
    1266            2 :                         break
    1267              :                 }
    1268            2 :                 if info.level == numLevels-1 {
    1269            2 :                         continue
    1270              :                 }
    1271              : 
    1272            2 :                 if info.level == 0 {
    1273            2 :                         pc = pickL0(env, p.opts, p.vers, p.baseLevel)
    1274            2 :                         // Fail-safe to protect against compacting the same sstable
    1275            2 :                         // concurrently.
    1276            2 :                         if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
    1277            2 :                                 p.addScoresToPickedCompactionMetrics(pc, scores)
    1278            2 :                                 pc.score = info.compensatedScoreRatio
    1279            2 :                                 // TODO(bananabrick): Create an EventListener for logCompaction.
    1280            2 :                                 if false {
    1281            0 :                                         logCompaction(pc)
    1282            0 :                                 }
    1283            2 :                                 return pc
    1284              :                         }
    1285            2 :                         continue
    1286              :                 }
    1287              : 
    1288              :                 // info.level > 0
    1289            2 :                 var ok bool
    1290            2 :                 info.file, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum)
    1291            2 :                 if !ok {
    1292            2 :                         continue
    1293              :                 }
    1294              : 
    1295            2 :                 pc := pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel)
    1296            2 :                 // Fail-safe to protect against compacting the same sstable concurrently.
    1297            2 :                 if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
    1298            2 :                         p.addScoresToPickedCompactionMetrics(pc, scores)
    1299            2 :                         pc.score = info.compensatedScoreRatio
    1300            2 :                         // TODO(bananabrick): Create an EventListener for logCompaction.
    1301            2 :                         if false {
    1302            0 :                                 logCompaction(pc)
    1303            0 :                         }
    1304            2 :                         return pc
    1305              :                 }
    1306              :         }
    1307              : 
    1308              :         // Check for files which contain excessive point tombstones that could slow
    1309              :         // down reads. Unlike elision-only compactions, these compactions may select
    1310              :         // a file at any level rather than only the lowest level.
    1311            2 :         if pc := p.pickTombstoneDensityCompaction(env); pc != nil {
    1312            1 :                 return pc
    1313            1 :         }
    1314              : 
    1315              :         // Check for L6 files with tombstones that may be elided. These files may
    1316              :         // exist if a snapshot prevented the elision of a tombstone or because of
    1317              :         // a move compaction. These are low-priority compactions because they
    1318              :         // don't help us keep up with writes, just reclaim disk space.
    1319            2 :         if pc := p.pickElisionOnlyCompaction(env); pc != nil {
    1320            2 :                 return pc
    1321            2 :         }
    1322              : 
    1323            2 :         if pc := p.pickReadTriggeredCompaction(env); pc != nil {
    1324            1 :                 return pc
    1325            1 :         }
    1326              : 
    1327              :         // NB: This should only be run if a read compaction wasn't
    1328              :         // scheduled.
    1329              :         //
    1330              :         // We won't be scheduling a read compaction right now, and in
    1331              :         // read heavy workloads, compactions won't be scheduled frequently
    1332              :         // because flushes aren't frequent. So we need to signal to the
    1333              :         // iterator to schedule a compaction when it adds compactions to
    1334              :         // the read compaction queue.
    1335              :         //
    1336              :         // We need the nil check here because without it, we have some
    1337              :         // tests which don't set that variable fail. Since there's a
    1338              :         // chance that one of those tests wouldn't want extra compactions
    1339              :         // to be scheduled, I added this check here, instead of
    1340              :         // setting rescheduleReadCompaction in those tests.
    1341            2 :         if env.readCompactionEnv.rescheduleReadCompaction != nil {
    1342            2 :                 *env.readCompactionEnv.rescheduleReadCompaction = true
    1343            2 :         }
    1344              : 
    1345              :         // At the lowest possible compaction-picking priority, look for files marked
    1346              :         // for compaction. Pebble will mark files for compaction if they have atomic
    1347              :         // compaction units that span multiple files. While current Pebble code does
    1348              :         // not construct such sstables, RocksDB and earlier versions of Pebble may
    1349              :         // have created them. These split user keys form sets of files that must be
    1350              :         // compacted together for correctness (referred to as "atomic compaction
    1351              :         // units" within the code). Rewrite them in-place.
    1352              :         //
    1353              :         // It's also possible that a file may have been marked for compaction by
    1354              :         // even earlier versions of Pebble code, since TableMetadata's
    1355              :         // MarkedForCompaction field is persisted in the manifest. That's okay. We
    1356              :         // previously would've ignored the designation, whereas now we'll re-compact
    1357              :         // the file in place.
    1358            2 :         if p.vers.Stats.MarkedForCompaction > 0 {
    1359            1 :                 if pc := p.pickRewriteCompaction(env); pc != nil {
    1360            1 :                         return pc
    1361            1 :                 }
    1362              :         }
    1363              : 
    1364            2 :         return nil
    1365              : }
    1366              : 
    1367              : func (p *compactionPickerByScore) addScoresToPickedCompactionMetrics(
    1368              :         pc *pickedCompaction, candInfo [numLevels]candidateLevelInfo,
    1369            2 : ) {
    1370            2 : 
    1371            2 :         // candInfo is sorted by score, not by compaction level.
    1372            2 :         infoByLevel := [numLevels]candidateLevelInfo{}
    1373            2 :         for i := range candInfo {
    1374            2 :                 level := candInfo[i].level
    1375            2 :                 infoByLevel[level] = candInfo[i]
    1376            2 :         }
    1377              :         // Gather the compaction scores for the levels participating in the compaction.
    1378            2 :         pc.pickerMetrics.scores = make([]float64, len(pc.inputs))
    1379            2 :         inputIdx := 0
    1380            2 :         for i := range infoByLevel {
    1381            2 :                 if pc.inputs[inputIdx].level == infoByLevel[i].level {
    1382            2 :                         pc.pickerMetrics.scores[inputIdx] = infoByLevel[i].compensatedScoreRatio
    1383            2 :                         inputIdx++
    1384            2 :                 }
    1385            2 :                 if inputIdx == len(pc.inputs) {
    1386            2 :                         break
    1387              :                 }
    1388              :         }
    1389              : }
    1390              : 
    1391              : // elisionOnlyAnnotator is a manifest.Annotator that annotates B-Tree
    1392              : // nodes with the *tableMetadata of a file meeting the obsolete keys criteria
    1393              : // for an elision-only compaction within the subtree. If multiple files meet
    1394              : // the criteria, it chooses whichever file has the lowest LargestSeqNum. The
    1395              : // lowest LargestSeqNum file will be the first eligible for an elision-only
    1396              : // compaction once snapshots less than or equal to its LargestSeqNum are closed.
    1397              : var elisionOnlyAnnotator = &manifest.Annotator[tableMetadata]{
    1398              :         Aggregator: manifest.PickFileAggregator{
    1399            2 :                 Filter: func(f *tableMetadata) (eligible bool, cacheOK bool) {
    1400            2 :                         if f.IsCompacting() {
    1401            2 :                                 return false, true
    1402            2 :                         }
    1403            2 :                         if !f.StatsValid() {
    1404            2 :                                 return false, false
    1405            2 :                         }
    1406              :                         // Bottommost files are large and not worthwhile to compact just
    1407              :                         // to remove a few tombstones. Consider a file eligible only if
    1408              :                         // either its own range deletions delete at least 10% of its data or
    1409              :                         // its deletion tombstones make up more than 10% of its entries.
    1410              :                         //
    1411              :                         // TODO(jackson): This does not account for duplicate user keys
    1412              :                         // which may be collapsed. Ideally, we would have 'obsolete keys'
    1413              :                         // statistics that would include tombstones, the keys that are
    1414              :                         // dropped by tombstones and duplicated user keys. See #847.
    1415              :                         //
    1416              :                         // Note that tables that contain exclusively range keys (i.e. no point keys,
    1417              :                         // `NumEntries` and `RangeDeletionsBytesEstimate` are both zero) are excluded
    1418              :                         // from elision-only compactions.
    1419              :                         // TODO(travers): Consider an alternative heuristic for elision of range-keys.
    1420            2 :                         return f.Stats.RangeDeletionsBytesEstimate*10 >= f.Size || f.Stats.NumDeletions*10 > f.Stats.NumEntries, true
    1421              :                 },
    1422            2 :                 Compare: func(f1 *tableMetadata, f2 *tableMetadata) bool {
    1423            2 :                         return f1.LargestSeqNum < f2.LargestSeqNum
    1424            2 :                 },
    1425              :         },
    1426              : }
    1427              : 
    1428              : // markedForCompactionAnnotator is a manifest.Annotator that annotates B-Tree
    1429              : // nodes with the *tableMetadata of a file that is marked for compaction
    1430              : // within the subtree. If multiple files meet the criteria, it chooses
    1431              : // whichever file has the lowest LargestSeqNum.
    1432              : var markedForCompactionAnnotator = &manifest.Annotator[tableMetadata]{
    1433              :         Aggregator: manifest.PickFileAggregator{
    1434            1 :                 Filter: func(f *tableMetadata) (eligible bool, cacheOK bool) {
    1435            1 :                         return f.MarkedForCompaction, true
    1436            1 :                 },
    1437            0 :                 Compare: func(f1 *tableMetadata, f2 *tableMetadata) bool {
    1438            0 :                         return f1.LargestSeqNum < f2.LargestSeqNum
    1439            0 :                 },
    1440              :         },
    1441              : }
    1442              : 
    1443              : // pickedCompactionFromCandidateFile creates a pickedCompaction from a *tableMetadata
    1444              : // with various checks to ensure that the file still exists in the expected level
    1445              : // and isn't already being compacted.
    1446              : func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(
    1447              :         candidate *tableMetadata, env compactionEnv, startLevel int, outputLevel int, kind compactionKind,
    1448            2 : ) *pickedCompaction {
    1449            2 :         if candidate == nil || candidate.IsCompacting() {
    1450            2 :                 return nil
    1451            2 :         }
    1452              : 
    1453            2 :         var inputs manifest.LevelSlice
    1454            2 :         if startLevel == 0 {
    1455            2 :                 // Overlapping L0 files must also be compacted alongside the candidate.
    1456            2 :                 inputs = p.vers.Overlaps(0, candidate.UserKeyBounds())
    1457            2 :         } else {
    1458            2 :                 inputs = p.vers.Levels[startLevel].Find(p.opts.Comparer.Compare, candidate)
    1459            2 :         }
    1460            2 :         if invariants.Enabled {
    1461            2 :                 found := false
    1462            2 :                 for f := range inputs.All() {
    1463            2 :                         if f.FileNum == candidate.FileNum {
    1464            2 :                                 found = true
    1465            2 :                         }
    1466              :                 }
    1467            2 :                 if !found {
    1468            0 :                         panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, startLevel))
    1469              :                 }
    1470              :         }
    1471              : 
    1472            2 :         pc := newPickedCompaction(p.opts, p.vers, startLevel, outputLevel, p.baseLevel)
    1473            2 :         pc.kind = kind
    1474            2 :         pc.startLevel.files = inputs
    1475            2 :         pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.All())
    1476            2 : 
    1477            2 :         // Fail-safe to protect against compacting the same sstable concurrently.
    1478            2 :         if inputRangeAlreadyCompacting(env, pc) {
    1479            1 :                 return nil
    1480            1 :         }
    1481              : 
    1482            2 :         if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel) {
    1483            1 :                 // TODO(radu): do we expect this to happen? (it does seem to happen if I add
    1484            1 :                 // a log here).
    1485            1 :                 return nil
    1486            1 :         }
    1487              : 
    1488            2 :         return pc
    1489              : }
    1490              : 
    1491              : // pickElisionOnlyCompaction looks for compactions of sstables in the
    1492              : // bottommost level containing obsolete records that may now be dropped.
    1493              : func (p *compactionPickerByScore) pickElisionOnlyCompaction(
    1494              :         env compactionEnv,
    1495            2 : ) (pc *pickedCompaction) {
    1496            2 :         if p.opts.private.disableElisionOnlyCompactions {
    1497            1 :                 return nil
    1498            1 :         }
    1499            2 :         candidate := elisionOnlyAnnotator.LevelAnnotation(p.vers.Levels[numLevels-1])
    1500            2 :         if candidate == nil {
    1501            2 :                 return nil
    1502            2 :         }
    1503            2 :         if candidate.LargestSeqNum >= env.earliestSnapshotSeqNum {
    1504            2 :                 return nil
    1505            2 :         }
    1506            2 :         return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly)
    1507              : }
    1508              : 
    1509              : // pickRewriteCompaction attempts to construct a compaction that
    1510              : // rewrites a file marked for compaction. pickRewriteCompaction will
    1511              : // pull in adjacent files in the file's atomic compaction unit if
    1512              : // necessary. A rewrite compaction outputs files to the same level as
    1513              : // the input level.
    1514            1 : func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) {
    1515            1 :         if p.vers.Stats.MarkedForCompaction == 0 {
    1516            0 :                 return nil
    1517            0 :         }
    1518            1 :         for l := numLevels - 1; l >= 0; l-- {
    1519            1 :                 candidate := markedForCompactionAnnotator.LevelAnnotation(p.vers.Levels[l])
    1520            1 :                 if candidate == nil {
    1521            1 :                         // Try the next level.
    1522            1 :                         continue
    1523              :                 }
    1524            1 :                 pc := p.pickedCompactionFromCandidateFile(candidate, env, l, l, compactionKindRewrite)
    1525            1 :                 if pc != nil {
    1526            1 :                         return pc
    1527            1 :                 }
    1528              :         }
    1529            0 :         return nil
    1530              : }
    1531              : 
    1532              : // pickTombstoneDensityCompaction looks for a compaction that eliminates
    1533              : // regions of extremely high point tombstone density. For each level, it picks
    1534              : // a file where the ratio of tombstone-dense blocks is at least
    1535              : // options.Experimental.TombstoneDenseCompactionThreshold, prioritizing compaction of
    1536              : // files with higher ratios of tombstone-dense blocks.
    1537              : func (p *compactionPickerByScore) pickTombstoneDensityCompaction(
    1538              :         env compactionEnv,
    1539            2 : ) (pc *pickedCompaction) {
    1540            2 :         if p.opts.Experimental.TombstoneDenseCompactionThreshold <= 0 {
    1541            0 :                 // Tombstone density compactions are disabled.
    1542            0 :                 return nil
    1543            0 :         }
    1544              : 
    1545            2 :         var candidate *tableMetadata
    1546            2 :         var level int
    1547            2 :         // If a candidate file has a very high overlapping ratio, point tombstones
    1548            2 :         // in it are likely sparse in keyspace even if the sstable itself is tombstone
    1549            2 :         // dense. These tombstones likely wouldn't be slow to iterate over, so we exclude
    1550            2 :         // these files from tombstone density compactions. The threshold of 40.0 is
    1551            2 :         // chosen somewhat arbitrarily, after some observations around excessively large
    1552            2 :         // tombstone density compactions.
    1553            2 :         const maxOverlappingRatio = 40.0
    1554            2 :         // NB: we don't consider the lowest level because elision-only compactions
    1555            2 :         // handle that case.
    1556            2 :         lastNonEmptyLevel := numLevels - 1
    1557            2 :         for l := numLevels - 2; l >= 0; l-- {
    1558            2 :                 iter := p.vers.Levels[l].Iter()
    1559            2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1560            2 :                         if f.IsCompacting() || !f.StatsValid() || f.Size == 0 {
    1561            2 :                                 continue
    1562              :                         }
    1563            2 :                         if f.Stats.TombstoneDenseBlocksRatio < p.opts.Experimental.TombstoneDenseCompactionThreshold {
    1564            2 :                                 continue
    1565              :                         }
    1566            1 :                         overlaps := p.vers.Overlaps(lastNonEmptyLevel, f.UserKeyBounds())
    1567            1 :                         if float64(overlaps.SizeSum())/float64(f.Size) > maxOverlappingRatio {
    1568            0 :                                 continue
    1569              :                         }
    1570            1 :                         if candidate == nil || candidate.Stats.TombstoneDenseBlocksRatio < f.Stats.TombstoneDenseBlocksRatio {
    1571            1 :                                 candidate = f
    1572            1 :                                 level = l
    1573            1 :                         }
    1574              :                 }
    1575              :                 // We prefer lower level (e.g. L5) candidates over higher level (e.g. L4) ones.
    1576            2 :                 if candidate != nil {
    1577            1 :                         break
    1578              :                 }
    1579            2 :                 if !p.vers.Levels[l].Empty() {
    1580            2 :                         lastNonEmptyLevel = l
    1581            2 :                 }
    1582              :         }
    1583              : 
    1584            2 :         return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity)
    1585              : }
    1586              : 
    1587              : // pickAutoLPositive picks an automatic compaction for the candidate
    1588              : // file in a positive-numbered level. This function must not be used for
    1589              : // L0.
    1590              : func pickAutoLPositive(
    1591              :         env compactionEnv, opts *Options, vers *version, cInfo candidateLevelInfo, baseLevel int,
    1592            2 : ) (pc *pickedCompaction) {
    1593            2 :         if cInfo.level == 0 {
    1594            0 :                 panic("pebble: pickAutoLPositive called for L0")
    1595              :         }
    1596              : 
    1597            2 :         pc = newPickedCompaction(opts, vers, cInfo.level, defaultOutputLevel(cInfo.level, baseLevel), baseLevel)
    1598            2 :         if pc.outputLevel.level != cInfo.outputLevel {
    1599            0 :                 panic("pebble: compaction picked unexpected output level")
    1600              :         }
    1601            2 :         pc.startLevel.files = cInfo.file.Slice()
    1602            2 :         // Files in level 0 may overlap each other, so pick up all overlapping ones.
    1603            2 :         if pc.startLevel.level == 0 {
    1604            0 :                 cmp := opts.Comparer.Compare
    1605            0 :                 smallest, largest := manifest.KeyRange(cmp, pc.startLevel.files.All())
    1606            0 :                 pc.startLevel.files = vers.Overlaps(0, base.UserKeyBoundsFromInternal(smallest, largest))
    1607            0 :                 if pc.startLevel.files.Empty() {
    1608            0 :                         panic("pebble: empty compaction")
    1609              :                 }
    1610              :         }
    1611              : 
    1612            2 :         if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
    1613            0 :                 opts.Logger.Errorf("%v", base.AssertionFailedf("setupInputs failed"))
    1614            0 :                 return nil
    1615            0 :         }
    1616            2 :         return pc.maybeAddLevel(opts, env.diskAvailBytes)
    1617              : }
    1618              : 
    1619              : // maybeAddLevel maybe adds a level to the picked compaction.
    1620            2 : func (pc *pickedCompaction) maybeAddLevel(opts *Options, diskAvailBytes uint64) *pickedCompaction {
    1621            2 :         pc.pickerMetrics.singleLevelOverlappingRatio = pc.overlappingRatio()
    1622            2 :         if pc.outputLevel.level == numLevels-1 {
    1623            2 :                 // Don't add a level if the current output level is in L6
    1624            2 :                 return pc
    1625            2 :         }
    1626            2 :         if !opts.Experimental.MultiLevelCompactionHeuristic.allowL0() && pc.startLevel.level == 0 {
    1627            2 :                 return pc
    1628            2 :         }
    1629            2 :         if pc.compactionSize() > expandedCompactionByteSizeLimit(
    1630            2 :                 opts, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel), diskAvailBytes) {
    1631            2 :                 // Don't add a level if the current compaction exceeds the compaction size limit
    1632            2 :                 return pc
    1633            2 :         }
    1634            2 :         return opts.Experimental.MultiLevelCompactionHeuristic.pick(pc, opts, diskAvailBytes)
    1635              : }
    1636              : 
    1637              : // MultiLevelHeuristic evaluates whether to add files from the next level into the compaction.
    1638              : type MultiLevelHeuristic interface {
    1639              :         // pick returns the preferred compaction.
    1640              :         pick(pc *pickedCompaction, opts *Options, diskAvailBytes uint64) *pickedCompaction
    1641              : 
    1642              :         // allowL0 returns whether the heuristic allows L0 to be involved in a multilevel compaction.
    1643              :         allowL0() bool
    1644              : 
    1645              :         // String implements fmt.Stringer.
    1646              :         String() string
    1647              : }
    1648              : 
    1649              : // NoMultiLevel will never add an additional level to the compaction.
    1650              : type NoMultiLevel struct{}
    1651              : 
    1652              : var _ MultiLevelHeuristic = (*NoMultiLevel)(nil)
    1653              : 
    1654              : func (nml NoMultiLevel) pick(
    1655              :         pc *pickedCompaction, opts *Options, diskAvailBytes uint64,
    1656            2 : ) *pickedCompaction {
    1657            2 :         return pc
    1658            2 : }
    1659              : 
    1660            2 : func (nml NoMultiLevel) allowL0() bool  { return false }
    1661            2 : func (nml NoMultiLevel) String() string { return "none" }
    1662              : 
    1663            2 : func (pc *pickedCompaction) predictedWriteAmp() float64 {
    1664            2 :         var bytesToCompact uint64
    1665            2 :         var higherLevelBytes uint64
    1666            2 :         for i := range pc.inputs {
    1667            2 :                 levelSize := pc.inputs[i].files.SizeSum()
    1668            2 :                 bytesToCompact += levelSize
    1669            2 :                 if i != len(pc.inputs)-1 {
    1670            2 :                         higherLevelBytes += levelSize
    1671            2 :                 }
    1672              :         }
    1673            2 :         return float64(bytesToCompact) / float64(higherLevelBytes)
    1674              : }
    1675              : 
    1676            2 : func (pc *pickedCompaction) overlappingRatio() float64 {
    1677            2 :         var higherLevelBytes uint64
    1678            2 :         var lowestLevelBytes uint64
    1679            2 :         for i := range pc.inputs {
    1680            2 :                 levelSize := pc.inputs[i].files.SizeSum()
    1681            2 :                 if i == len(pc.inputs)-1 {
    1682            2 :                         lowestLevelBytes += levelSize
    1683            2 :                         continue
    1684              :                 }
    1685            2 :                 higherLevelBytes += levelSize
    1686              :         }
    1687            2 :         return float64(lowestLevelBytes) / float64(higherLevelBytes)
    1688              : }
    1689              : 
    1690              : // WriteAmpHeuristic defines a multilevel compaction heuristic which adds an
    1691              : // additional level to the picked compaction if the resulting predicted write
    1692              : // amp is at most the original's predicted write amp plus AddPropensity.
    1693              : type WriteAmpHeuristic struct {
    1694              :         // AddPropensity is a constant that affects the propensity to conduct multilevel
    1695              :         // compactions. If positive, a multilevel compaction may get picked even if
    1696              :         // the single level compaction has lower write amp, and vice versa.
    1697              :         AddPropensity float64
    1698              : 
    1699              :         // AllowL0 if true, allow l0 to be involved in a ML compaction.
    1700              :         AllowL0 bool
    1701              : }
    1702              : 
    1703              : var _ MultiLevelHeuristic = (*WriteAmpHeuristic)(nil)
    1704              : 
    1705              : // TODO(msbutler): microbenchmark the extent to which multilevel compaction
    1706              : // picking slows down the compaction picking process.  This should be as fast as
    1707              : // possible since Compaction-picking holds d.mu, which prevents WAL rotations,
    1708              : // in-progress flushes and compactions from completing, etc. Consider ways to
    1709              : // deduplicate work, given that setupInputs has already been called.
    1710              : func (wa WriteAmpHeuristic) pick(
    1711              :         pcOrig *pickedCompaction, opts *Options, diskAvailBytes uint64,
    1712            2 : ) *pickedCompaction {
    1713            2 :         pcMulti := pcOrig.clone()
    1714            2 :         if !pcMulti.setupMultiLevelCandidate(opts, diskAvailBytes) {
    1715            2 :                 return pcOrig
    1716            2 :         }
    1717              :         // We consider the addition of a level as an "expansion" of the compaction.
    1718              :         // If pcMulti is past the expanded compaction byte size limit already,
    1719              :         // we don't consider it.
    1720            2 :         if pcMulti.compactionSize() >= expandedCompactionByteSizeLimit(
    1721            2 :                 opts, adjustedOutputLevel(pcMulti.outputLevel.level, pcMulti.baseLevel), diskAvailBytes) {
    1722            2 :                 return pcOrig
    1723            2 :         }
    1724            2 :         picked := pcOrig
    1725            2 :         if pcMulti.predictedWriteAmp() <= pcOrig.predictedWriteAmp()+wa.AddPropensity {
    1726            2 :                 picked = pcMulti
    1727            2 :         }
    1728              :         // Regardless of what compaction was picked, log the multilevelOverlapping ratio.
    1729            2 :         picked.pickerMetrics.multiLevelOverlappingRatio = pcMulti.overlappingRatio()
    1730            2 :         return picked
    1731              : }
    1732              : 
    1733            2 : func (wa WriteAmpHeuristic) allowL0() bool {
    1734            2 :         return wa.AllowL0
    1735            2 : }
    1736              : 
    1737              : // String implements fmt.Stringer.
    1738            2 : func (wa WriteAmpHeuristic) String() string {
    1739            2 :         return fmt.Sprintf("wamp(%.2f, %t)", wa.AddPropensity, wa.AllowL0)
    1740            2 : }
    1741              : 
    1742              : // Helper method to pick compactions originating from L0. Uses information about
    1743              : // sublevels to generate a compaction.
    1744            2 : func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) *pickedCompaction {
    1745            2 :         // It is important to pass information about Lbase files to L0Sublevels
    1746            2 :         // so it can pick a compaction that does not conflict with an Lbase => Lbase+1
    1747            2 :         // compaction. Without this, we observed reduced concurrency of L0=>Lbase
    1748            2 :         // compactions, and increasing read amplification in L0.
    1749            2 :         //
    1750            2 :         // TODO(bilal) Remove the minCompactionDepth parameter once fixing it at 1
    1751            2 :         // has been shown to not cause a performance regression.
    1752            2 :         lcf := vers.L0Sublevels.PickBaseCompaction(opts.Logger, 1, vers.Levels[baseLevel].Slice())
    1753            2 :         if lcf != nil {
    1754            2 :                 pc := newPickedCompactionFromL0(lcf, opts, vers, baseLevel, true)
    1755            2 :                 if pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
    1756            2 :                         if pc.startLevel.files.Empty() {
    1757            0 :                                 opts.Logger.Errorf("%v", base.AssertionFailedf("empty compaction chosen"))
    1758            0 :                         }
    1759            2 :                         return pc.maybeAddLevel(opts, env.diskAvailBytes)
    1760              :                 }
    1761              :                 // TODO(radu): investigate why this happens.
    1762              :                 // opts.Logger.Errorf("%v", base.AssertionFailedf("setupInputs failed"))
    1763              :         }
    1764              : 
    1765              :         // Couldn't choose a base compaction. Try choosing an intra-L0
    1766              :         // compaction. Note that we pass in minIntraL0Count here as opposed to
    1767              :         // 1, since choosing a single sublevel intra-L0 compaction is
    1768              :         // counterproductive.
    1769            2 :         lcf = vers.L0Sublevels.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count)
    1770            2 :         if lcf != nil {
    1771            2 :                 pc := newPickedCompactionFromL0(lcf, opts, vers, 0, false)
    1772            2 :                 if pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
    1773            2 :                         if pc.startLevel.files.Empty() {
    1774            0 :                                 opts.Logger.Fatalf("empty compaction chosen")
    1775            0 :                         }
    1776              :                         // A single-file intra-L0 compaction is unproductive.
    1777            2 :                         if iter := pc.startLevel.files.Iter(); iter.First() != nil && iter.Next() != nil {
    1778            2 :                                 pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.All())
    1779            2 :                                 return pc
    1780            2 :                         }
    1781            0 :                 } else {
    1782            0 :                         // TODO(radu): investigate why this happens.
    1783            0 :                         // opts.Logger.Errorf("%v", base.AssertionFailedf("setupInputs failed"))
    1784            0 :                 }
    1785              :         }
    1786            2 :         return nil
    1787              : }
    1788              : 
                        // newPickedManualCompaction tries to construct a pickedCompaction for the
                        // manual compaction request manual against the version vers.
                        //
                        // Return values:
                        //   - (pc, false): a compaction was picked.
                        //   - (nil, false): there is nothing to do. This happens when manual.level is
                        //     non-zero and below baseLevel, when no file in manual.level overlaps the
                        //     requested key bounds, or when maybeAddLevel returns nil.
                        //   - (nil, true): the compaction could not be picked now and should be
                        //     retried later. This happens when conflictsWithInProgress reports a
                        //     conflict, when setupInputs returns false, or when
                        //     inputRangeAlreadyCompacting reports true.
                        //
                        // Once the initial conflict check has passed, manual.outputLevel is updated
                        // to the output level of the newly constructed compaction.
                        //
                        // It panics if the final output level differs from the expected one and the
                        // compaction has no extra levels.
    1789              : func newPickedManualCompaction(
    1790              :         vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction,
    1791            2 : ) (pc *pickedCompaction, retryLater bool) {
                                // Expected output level: manual.level+1, except that a compaction out
                                // of level 0 targets baseLevel.
    1792            2 :         outputLevel := manual.level + 1
    1793            2 :         if manual.level == 0 {
    1794            2 :                 outputLevel = baseLevel
    1795            2 :         } else if manual.level < baseLevel {
    1796            2 :                 // The start level for a compaction must be >= Lbase. A manual
    1797            2 :                 // compaction could have been created adhering to that condition, and
    1798            2 :                 // then an automatic compaction came in and compacted all of the
    1799            2 :                 // sstables in Lbase to Lbase+1 which caused Lbase to change. Simply
    1800            2 :                 // ignore this manual compaction as there is nothing to do (manual.level
    1801            2 :                 // points to an empty level).
    1802            2 :                 return nil, false
    1803            2 :         }
    1804              :         // This conflictsWithInProgress call is necessary for the manual compaction to
    1805              :         // be retried when it conflicts with an ongoing automatic compaction. Without
    1806              :         // it, the compaction is dropped due to pc.setupInputs returning false since
    1807              :         // the input/output range is already being compacted, and the manual
    1808              :         // compaction ends with a non-compacted LSM.
    1809            2 :         if conflictsWithInProgress(manual, outputLevel, env.inProgressCompactions, opts.Comparer.Compare) {
    1810            2 :                 return nil, true
    1811            2 :         }
                                // NOTE(review): the output level passed here comes from
                                // defaultOutputLevel rather than from outputLevel computed above;
                                // presumably the two agree. The check further down panics if the
                                // final output level differs (unless extra levels were added).
    1812            2 :         pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
    1813            2 :         pc.manualID = manual.id
    1814            2 :         manual.outputLevel = pc.outputLevel.level
                                // Start with every file in manual.level that overlaps the requested
                                // key bounds.
    1815            2 :         pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end))
    1816            2 :         if pc.startLevel.files.Empty() {
    1817            2 :                 // Nothing to do
    1818            2 :                 return nil, false
    1819            2 :         }
    1820            2 :         if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
    1821            2 :                 // setupInputs returned false indicating there's a conflicting
    1822            2 :                 // concurrent compaction.
    1823            2 :                 return nil, true
    1824            2 :         }
                                // maybeAddLevel may hand back a different compaction (or nil); continue
                                // with whatever it returns.
    1825            2 :         if pc = pc.maybeAddLevel(opts, env.diskAvailBytes); pc == nil {
    1826            0 :                 return nil, false
    1827            0 :         }
    1828            2 :         if pc.outputLevel.level != outputLevel {
    1829            2 :                 if len(pc.extraLevels) > 0 {
    1830            2 :                         // multilevel compactions relax this invariant
    1831            2 :                 } else {
    1832            0 :                         panic("pebble: compaction picked unexpected output level")
    1833              :                 }
    1834              :         }
    1835              :         // Fail-safe to protect against compacting the same sstable concurrently.
    1836            2 :         if inputRangeAlreadyCompacting(env, pc) {
    1837            0 :                 return nil, true
    1838            0 :         }
    1839            2 :         return pc, false
    1840              : }
    1841              : 
    1842              : // pickDownloadCompaction picks a download compaction for the downloadSpan,
    1843              : // which could be specified as being performed either by a copy compaction of
    1844              : // the backing file or a rewrite compaction.
                        //
                        // The compaction is built with file as its only start-level file and with
                        // level passed as both the start level and the output level. kind must be
                        // compactionKindCopy or compactionKindRewrite; any other value panics.
                        //
                        // It returns nil if file is already marked as compacting, if setupInputs
                        // returns false, or if inputRangeAlreadyCompacting reports true. It panics if
                        // the resulting output level is not level.
    1845              : func pickDownloadCompaction(
    1846              :         vers *version,
    1847              :         opts *Options,
    1848              :         env compactionEnv,
    1849              :         baseLevel int,
    1850              :         kind compactionKind,
    1851              :         level int,
    1852              :         file *tableMetadata,
    1853            2 : ) (pc *pickedCompaction) {
    1854            2 :         // Check if the file is compacting already.
    1855            2 :         if file.CompactionState == manifest.CompactionStateCompacting {
    1856            0 :                 return nil
    1857            0 :         }
    1858            2 :         if kind != compactionKindCopy && kind != compactionKindRewrite {
    1859            0 :                 panic("invalid download/rewrite compaction kind")
    1860              :         }
    1861            2 :         pc = newPickedCompaction(opts, vers, level, level, baseLevel)
    1862            2 :         pc.kind = kind
                                // The start level consists of exactly the one requested file.
    1863            2 :         pc.startLevel.files = manifest.NewLevelSliceKeySorted(opts.Comparer.Compare, []*tableMetadata{file})
    1864            2 :         if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
    1865            0 :                 // setupInputs returned false indicating there's a conflicting
    1866            0 :                 // concurrent compaction.
    1867            0 :                 return nil
    1868            0 :         }
    1869            2 :         if pc.outputLevel.level != level {
    1870            0 :                 panic("pebble: download compaction picked unexpected output level")
    1871              :         }
    1872              :         // Fail-safe to protect against compacting the same sstable concurrently.
    1873            2 :         if inputRangeAlreadyCompacting(env, pc) {
    1874            0 :                 return nil
    1875            0 :         }
    1876            2 :         return pc
    1877              : }
    1878              : 
                        // pickReadTriggeredCompaction removes pending read compactions from
                        // env.readCompactionEnv.readCompactions one at a time and returns the first
                        // one that pickReadTriggeredCompactionHelper turns into a compaction.
                        //
                        // It returns nil if env.readCompactionEnv.flushing is set, if
                        // env.readCompactionEnv.readCompactions is nil, or if every pending entry is
                        // removed without yielding a compaction. Entries that are removed but
                        // rejected are not put back by this method.
    1879              : func (p *compactionPickerByScore) pickReadTriggeredCompaction(
    1880              :         env compactionEnv,
    1881            2 : ) (pc *pickedCompaction) {
    1882            2 :         // If a flush is in-progress or expected to happen soon, it means more writes are taking place. We would
    1883            2 :         // soon be scheduling more write focussed compactions. In this case, skip read compactions as they are
    1884            2 :         // lower priority.
    1885            2 :         if env.readCompactionEnv.flushing || env.readCompactionEnv.readCompactions == nil {
    1886            2 :                 return nil
    1887            2 :         }
                                // Keep consuming candidates until one is accepted or none remain.
    1888            2 :         for env.readCompactionEnv.readCompactions.size > 0 {
    1889            1 :                 rc := env.readCompactionEnv.readCompactions.remove()
    1890            1 :                 if pc = pickReadTriggeredCompactionHelper(p, rc, env); pc != nil {
    1891            1 :                         break
    1892              :                 }
    1893              :         }
    1894            2 :         return pc
    1895              : }
    1896              : 
                        // pickReadTriggeredCompactionHelper tries to turn the read compaction
                        // candidate rc into a pickedCompaction of kind compactionKindRead, using the
                        // picker's current version, options and base level.
                        //
                        // It returns nil when any of the following holds:
                        //   - the file numbered rc.fileNum is not among the files of rc.level that
                        //     overlap the key bounds built from rc.start and rc.end;
                        //   - setupInputs returns false;
                        //   - inputRangeAlreadyCompacting reports true;
                        //   - the total size of the overlapping output-level files exceeds
                        //     pc.maxReadCompactionBytes;
                        //   - that total size exceeds allowedCompactionWidth times the total size of
                        //     the overlapping start-level files.
    1897              : func pickReadTriggeredCompactionHelper(
    1898              :         p *compactionPickerByScore, rc *readCompaction, env compactionEnv,
    1899            1 : ) (pc *pickedCompaction) {
    1900            1 :         overlapSlice := p.vers.Overlaps(rc.level, base.UserKeyBoundsInclusive(rc.start, rc.end))
                                // Only proceed if the file recorded in rc is still among the
                                // overlapping files of this version.
    1901            1 :         var fileMatches bool
    1902            1 :         for f := range overlapSlice.All() {
    1903            1 :                 if f.FileNum == rc.fileNum {
    1904            1 :                         fileMatches = true
    1905            1 :                         break
    1906              :                 }
    1907              :         }
    1908            1 :         if !fileMatches {
    1909            1 :                 return nil
    1910            1 :         }
    1911              : 
    1912            1 :         pc = newPickedCompaction(p.opts, p.vers, rc.level, defaultOutputLevel(rc.level, p.baseLevel), p.baseLevel)
    1913            1 : 
                                // All overlapping files, not just the matched one, form the start level.
    1914            1 :         pc.startLevel.files = overlapSlice
    1915            1 :         if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel) {
    1916            0 :                 return nil
    1917            0 :         }
    1918            1 :         if inputRangeAlreadyCompacting(env, pc) {
    1919            0 :                 return nil
    1920            0 :         }
    1921            1 :         pc.kind = compactionKindRead
    1922            1 : 
    1923            1 :         // Prevent read compactions which are too wide.
    1924            1 :         outputOverlaps := pc.version.Overlaps(pc.outputLevel.level, pc.userKeyBounds())
    1925            1 :         if outputOverlaps.SizeSum() > pc.maxReadCompactionBytes {
    1926            1 :                 return nil
    1927            1 :         }
    1928              : 
    1929              :         // Prevent compactions which start with a small seed file X, but overlap
    1930              :         // with over allowedCompactionWidth * X file sizes in the output layer.
    1931            1 :         const allowedCompactionWidth = 35
    1932            1 :         if outputOverlaps.SizeSum() > overlapSlice.SizeSum()*allowedCompactionWidth {
    1933            0 :                 return nil
    1934            0 :         }
    1935              : 
    1936            1 :         return pc
    1937              : }
    1938              : 
                        // forceBaseLevel1 sets the picker's base level to 1, overwriting whatever
                        // value p.baseLevel held before.
    1939            1 : func (p *compactionPickerByScore) forceBaseLevel1() {
    1940            1 :         p.baseLevel = 1
    1941            1 : }
    1942              : 
                        // inputRangeAlreadyCompacting reports whether the picked compaction pc
                        // conflicts with compaction work that is already underway.
                        //
                        // It returns true if any file in any of pc's input levels reports
                        // IsCompacting(). It also returns true if pc has an output level other than
                        // level 0 and some compaction in env.inProgressCompactions outputs to that
                        // same level with a [smallest, largest] key range that intersects pc's
                        // [smallest, largest] range, compared with base.InternalCompare. Otherwise it
                        // returns false.
    1943            2 : func inputRangeAlreadyCompacting(env compactionEnv, pc *pickedCompaction) bool {
                                // An input file that is already being compacted is a direct conflict.
    1944            2 :         for _, cl := range pc.inputs {
    1945            2 :                 for f := range cl.files.All() {
    1946            2 :                         if f.IsCompacting() {
    1947            1 :                                 return true
    1948            1 :                         }
    1949              :                 }
    1950              :         }
    1951              : 
    1952              :         // Look for active compactions outputting to the same region of the key
    1953              :         // space in the same output level. Two potential compactions may conflict
    1954              :         // without sharing input files if there are no files in the output level
    1955              :         // that overlap with the intersection of the compactions' key spaces.
    1956              :         //
    1957              :         // Consider an active L0->Lbase compaction compacting two L0 files one
    1958              :         // [a-f] and the other [t-z] into Lbase.
    1959              :         //
    1960              :         // L0
    1961              :         //     ↦ 000100  ↤                           ↦  000101   ↤
    1962              :         // L1
    1963              :         //     ↦ 000004  ↤
    1964              :         //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    1965              :         //
    1966              :         // If a new file 000102 [j-p] is flushed while the existing compaction is
    1967              :         // still ongoing, new file would not be in any compacting sublevel
    1968              :         // intervals and would not overlap with any Lbase files that are also
    1969              :         // compacting. However, this compaction cannot be picked because the
    1970              :         // compaction's output key space [j-p] would overlap the existing
    1971              :         // compaction's output key space [a-z].
    1972              :         //
    1973              :         // L0
    1974              :         //     ↦ 000100* ↤       ↦   000102  ↤       ↦  000101*  ↤
    1975              :         // L1
    1976              :         //     ↦ 000004* ↤
    1977              :         //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    1978              :         //
    1979              :         // * - currently compacting
    1980            2 :         if pc.outputLevel != nil && pc.outputLevel.level != 0 {
    1981            2 :                 for _, c := range env.inProgressCompactions {
    1982            2 :                         if pc.outputLevel.level != c.outputLevel {
    1983            2 :                                 continue
    1984              :                         }
                                                // Skip in-progress compactions whose key range lies
                                                // entirely before or entirely after pc's key range.
    1985            2 :                         if base.InternalCompare(pc.cmp, c.largest, pc.smallest) < 0 ||
    1986            2 :                                 base.InternalCompare(pc.cmp, c.smallest, pc.largest) > 0 {
    1987            2 :                                 continue
    1988              :                         }
    1989              : 
    1990              :                         // The picked compaction and the in-progress compaction c are
    1991              :                         // outputting to the same region of the key space of the same
    1992              :                         // level.
    1993            2 :                         return true
    1994              :                 }
    1995              :         }
    1996            2 :         return false
    1997              : }
    1998              : 
    1999              : // conflictsWithInProgress checks if there are any in-progress compactions with overlapping keyspace.
                        //
                        // For each in-progress compaction c, a conflict is reported when either:
                        //   - c.outputLevel equals manual.level or outputLevel, and the user-key range
                        //     [c.smallest.UserKey, c.largest.UserKey] overlaps [manual.start,
                        //     manual.end]; or
                        //   - one of c's non-empty inputs has a level equal to manual.level or
                        //     outputLevel, and the user-key range spanning from the first file's
                        //     smallest key to the last file's largest key (in iteration order)
                        //     overlaps [manual.start, manual.end].
                        //
                        // Overlap is decided by isUserKeysOverlapping using cmp. Returns false if no
                        // in-progress compaction matches.
    2000              : func conflictsWithInProgress(
    2001              :         manual *manualCompaction, outputLevel int, inProgressCompactions []compactionInfo, cmp Compare,
    2002            2 : ) bool {
    2003            2 :         for _, c := range inProgressCompactions {
    2004            2 :                 if (c.outputLevel == manual.level || c.outputLevel == outputLevel) &&
    2005            2 :                         isUserKeysOverlapping(manual.start, manual.end, c.smallest.UserKey, c.largest.UserKey, cmp) {
    2006            2 :                         return true
    2007            2 :                 }
    2008            2 :                 for _, in := range c.inputs {
    2009            2 :                         if in.files.Empty() {
    2010            2 :                                 continue
    2011              :                         }
                                                // NOTE(review): this assumes the first and last files in
                                                // iteration order bound the input's user-key range --
                                                // confirm this also holds for level-0 inputs.
    2012            2 :                         iter := in.files.Iter()
    2013            2 :                         smallest := iter.First().Smallest.UserKey
    2014            2 :                         largest := iter.Last().Largest.UserKey
    2015            2 :                         if (in.level == manual.level || in.level == outputLevel) &&
    2016            2 :                                 isUserKeysOverlapping(manual.start, manual.end, smallest, largest, cmp) {
    2017            2 :                                 return true
    2018            2 :                         }
    2019              :                 }
    2020              :         }
    2021            2 :         return false
    2022              : }
    2023              : 
                        // isUserKeysOverlapping reports whether the user-key ranges [x1, x2] and
                        // [y1, y2] intersect under cmp. Both comparisons are non-strict, so two ranges
                        // that share only an endpoint count as overlapping. The result is only
                        // meaningful when x1 <= x2 and y1 <= y2; that ordering is not checked here.
    2024            2 : func isUserKeysOverlapping(x1, x2, y1, y2 []byte, cmp Compare) bool {
    2025            2 :         return cmp(x1, y2) <= 0 && cmp(y1, x2) <= 0
    2026            2 : }
        

Generated by: LCOV version 2.0-1