LCOV - code coverage report
Current view: top level - pebble - compaction.go (source / functions)
Test:      2025-02-12 08:16Z 419f2391 - meta test only.lcov
Test Date: 2025-02-12 08:17:57

            Coverage    Total    Hit
Lines:      89.2 %      2312     2062
Functions:  -           0        0

            Line data    Source code
       1              : // Copyright 2013 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         "context"
      10              :         "fmt"
      11              :         "math"
      12              :         "runtime/pprof"
      13              :         "slices"
      14              :         "sort"
      15              :         "sync/atomic"
      16              :         "time"
      17              :         "unsafe"
      18              : 
      19              :         "github.com/cockroachdb/crlib/crtime"
      20              :         "github.com/cockroachdb/errors"
      21              :         "github.com/cockroachdb/pebble/internal/base"
      22              :         "github.com/cockroachdb/pebble/internal/compact"
      23              :         "github.com/cockroachdb/pebble/internal/keyspan"
      24              :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      25              :         "github.com/cockroachdb/pebble/internal/manifest"
      26              :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      27              :         "github.com/cockroachdb/pebble/objstorage"
      28              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      29              :         "github.com/cockroachdb/pebble/objstorage/remote"
      30              :         "github.com/cockroachdb/pebble/sstable"
      31              :         "github.com/cockroachdb/pebble/sstable/block"
      32              :         "github.com/cockroachdb/pebble/vfs"
      33              : )
      34              : 
      35              : var errEmptyTable = errors.New("pebble: empty table")
      36              : 
      37              : // ErrCancelledCompaction is returned if a compaction is cancelled by a
      38              : // concurrent excise or ingest-split operation.
      39              : var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
      40              : 
      41              : var flushLabels = pprof.Labels("pebble", "flush", "output-level", "L0")
      42              : var gcLabels = pprof.Labels("pebble", "gc")
      43              : 
      44              : // expandedCompactionByteSizeLimit is the maximum number of bytes in all
      45              : // compacted files. We avoid expanding the lower level file set of a compaction
      46              : // if it would make the total compaction cover more than this many bytes.
      47            1 : func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64) uint64 {
      48            1 :         v := uint64(25 * opts.Level(level).TargetFileSize)
      49            1 : 
      50            1 :         // Never expand a compaction beyond half the available capacity, divided
      51            1 :         // by the maximum number of concurrent compactions. Each of the concurrent
      52            1 :         // compactions may expand up to this limit, so this attempts to limit
      53            1 :         // compactions to half of available disk space. Note that this will not
      54            1 :         // prevent compaction picking from pursuing compactions that are larger
      55            1 :         // than this threshold before expansion.
      56            1 :         diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions())
      57            1 :         if v > diskMax {
      58            0 :                 v = diskMax
      59            0 :         }
      60            1 :         return v
      61              : }
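                        // A worked example of the limit above, using assumed figures (a 64 MiB
                        // target file size, 100 GiB of free disk and 4 concurrent compactions;
                        // the numbers are illustrative, not taken from this report):
                        //
                        //      v       := uint64(25 * 64 << 20)        // 1600 MiB, from the target file size
                        //      diskMax := (uint64(100 << 30) / 2) / 4  // 12.5 GiB, from available capacity
                        //
                        // Here v < diskMax, so the 1600 MiB bound applies; diskMax only clamps
                        // the result when the disk is nearly full (with 8 GiB free it is 1 GiB).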
      62              : 
      63              : // maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
      64              : // before we stop building a single file in a level-1 to level compaction.
      65            1 : func maxGrandparentOverlapBytes(opts *Options, level int) uint64 {
      66            1 :         return uint64(10 * opts.Level(level).TargetFileSize)
      67            1 : }
      68              : 
      69              : // maxReadCompactionBytes is used to prevent read compactions which
      70              : // are too wide.
      71            1 : func maxReadCompactionBytes(opts *Options, level int) uint64 {
      72            1 :         return uint64(10 * opts.Level(level).TargetFileSize)
      73            1 : }
      74              : 
      75              : // noCloseIter wraps around a FragmentIterator, intercepting and eliding
      76              : // calls to Close. It is used during compaction to ensure that rangeDelIters
      77              : // are not closed prematurely.
      78              : type noCloseIter struct {
      79              :         keyspan.FragmentIterator
      80              : }
      81              : 
      82            1 : func (i *noCloseIter) Close() {}
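                        // A minimal sketch of how the wrapper behaves (hypothetical variable
                        // names; the real wiring happens in newInputIters via c.closers below):
                        //
                        //      wrapped := &noCloseIter{FragmentIterator: rangeDelIter}
                        //      wrapped.Close()                  // no-op; the underlying iterator stays open
                        //      wrapped.FragmentIterator.Close() // the real Close, deferred to compaction teardown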
      83              : 
      84              : type compactionLevel struct {
      85              :         level int
      86              :         files manifest.LevelSlice
      87              :         // l0SublevelInfo contains information about L0 sublevels being compacted.
      88              :         // It's only set for the start level of a compaction starting out of L0 and
      89              :         // is nil for all other compactions.
      90              :         l0SublevelInfo []sublevelInfo
      91              : }
      92              : 
      93            1 : func (cl compactionLevel) Clone() compactionLevel {
      94            1 :         newCL := compactionLevel{
      95            1 :                 level: cl.level,
      96            1 :                 files: cl.files,
      97            1 :         }
      98            1 :         return newCL
      99            1 : }
     100            0 : func (cl compactionLevel) String() string {
     101            0 :         return fmt.Sprintf(`Level %d, Files %s`, cl.level, cl.files)
     102            0 : }
     103              : 
     104              : // compactionWritable is an objstorage.Writable wrapper that, on every write,
     105              : // updates a metric in `versions` on bytes written by in-progress compactions so
     106              : // far. It also increments a per-compaction `written` int.
     107              : type compactionWritable struct {
     108              :         objstorage.Writable
     109              : 
     110              :         versions *versionSet
     111              :         written  *int64
     112              : }
     113              : 
     114              : // Write is part of the objstorage.Writable interface.
     115            1 : func (c *compactionWritable) Write(p []byte) error {
     116            1 :         if err := c.Writable.Write(p); err != nil {
     117            0 :                 return err
     118            0 :         }
     119              : 
     120            1 :         *c.written += int64(len(p))
     121            1 :         c.versions.incrementCompactionBytes(int64(len(p)))
     122            1 :         return nil
     123              : }
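                        // A usage sketch (hypothetical call site; the actual wiring lives where
                        // compaction outputs are created, outside this excerpt): the wrapper is
                        // layered over the objstorage.Writable of a new output table so that
                        // every Write advances both the per-compaction byte counter and the
                        // versionSet's in-progress compaction metric.
                        //
                        //      writable = &compactionWritable{
                        //              Writable: writable,
                        //              versions: d.mu.versions,
                        //              written:  &c.bytesWritten,
                        //      }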
     124              : 
     125              : type compactionKind int
     126              : 
     127              : const (
     128              :         compactionKindDefault compactionKind = iota
     129              :         compactionKindFlush
     130              :         // compactionKindMove denotes a move compaction where the input file is
     131              :         // retained and linked in a new level without being obsoleted.
     132              :         compactionKindMove
     133              :         // compactionKindCopy denotes a copy compaction where the input file is
     134              :         // copied byte-by-byte into a new file with a new FileNum in the output level.
     135              :         compactionKindCopy
     136              :         // compactionKindDeleteOnly denotes a compaction that only deletes input
     137              :         // files. It can occur when wide range tombstones completely contain sstables.
     138              :         compactionKindDeleteOnly
     139              :         compactionKindElisionOnly
     140              :         compactionKindRead
     141              :         compactionKindTombstoneDensity
     142              :         compactionKindRewrite
     143              :         compactionKindIngestedFlushable
     144              : )
     145              : 
     146            1 : func (k compactionKind) String() string {
     147            1 :         switch k {
     148            1 :         case compactionKindDefault:
     149            1 :                 return "default"
     150            0 :         case compactionKindFlush:
     151            0 :                 return "flush"
     152            1 :         case compactionKindMove:
     153            1 :                 return "move"
     154            1 :         case compactionKindDeleteOnly:
     155            1 :                 return "delete-only"
     156            1 :         case compactionKindElisionOnly:
     157            1 :                 return "elision-only"
     158            0 :         case compactionKindRead:
     159            0 :                 return "read"
     160            1 :         case compactionKindTombstoneDensity:
     161            1 :                 return "tombstone-density"
     162            1 :         case compactionKindRewrite:
     163            1 :                 return "rewrite"
     164            0 :         case compactionKindIngestedFlushable:
     165            0 :                 return "ingested-flushable"
     166            1 :         case compactionKindCopy:
     167            1 :                 return "copy"
     168              :         }
     169            0 :         return "?"
     170              : }
     171              : 
     172              : // compaction is a table compaction from one level to the next, starting from a
     173              : // given version.
     174              : type compaction struct {
     175              :         // cancel is a bool that can be used by other goroutines to signal a compaction
     176              :         // to cancel, such as if a conflicting excise operation raced it to manifest
     177              :         // application. Only holders of the manifest lock will write to this atomic.
     178              :         cancel atomic.Bool
     179              : 
     180              :         kind compactionKind
     181              :         // isDownload is true if this compaction was started as part of a Download
     182              :         // operation. In this case kind is compactionKindCopy or
     183              :         // compactionKindRewrite.
     184              :         isDownload bool
     185              : 
     186              :         cmp       Compare
     187              :         equal     Equal
     188              :         comparer  *base.Comparer
     189              :         formatKey base.FormatKey
     190              :         logger    Logger
     191              :         version   *version
     192              :         stats     base.InternalIteratorStats
     193              :         beganAt   time.Time
     194              :         // versionEditApplied is set to true when a compaction has completed and the
     195              :         // resulting version has been installed (if successful), but the compaction
     196              :         // goroutine is still cleaning up (eg, deleting obsolete files).
     197              :         versionEditApplied bool
     198              :         bufferPool         sstable.BufferPool
     199              : 
     200              :         // startLevel is the level that is being compacted. Inputs from startLevel
     201              :         // and outputLevel will be merged to produce a set of outputLevel files.
     202              :         startLevel *compactionLevel
     203              : 
     204              :         // outputLevel is the level that files are being produced in. outputLevel is
     205              :         // equal to startLevel+1 except when:
     206              :         //    - if startLevel is 0, the output level equals compactionPicker.baseLevel().
     207              :         //    - in multilevel compaction, the output level is the lowest level involved in
     208              :         //      the compaction
     209              :         // A compaction's outputLevel is nil for delete-only compactions.
     210              :         outputLevel *compactionLevel
     211              : 
     212              :         // extraLevels points to additional levels in between the input and output
     213              :         // levels that get compacted in multilevel compactions.
     214              :         extraLevels []*compactionLevel
     215              : 
     216              :         inputs []compactionLevel
     217              : 
     218              :         // maxOutputFileSize is the maximum size of an individual table created
     219              :         // during compaction.
     220              :         maxOutputFileSize uint64
     221              :         // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
     222              :         // single output table with the tables in the grandparent level.
     223              :         maxOverlapBytes uint64
     224              : 
     225              :         // flushing contains the flushables (aka memtables) that are being flushed.
     226              :         flushing flushableList
     227              :         // bytesWritten contains the number of bytes that have been written to outputs.
     228              :         bytesWritten int64
     229              : 
     230              :         // The boundaries of the input data.
     231              :         smallest InternalKey
     232              :         largest  InternalKey
     233              : 
     234              :         // A list of fragment iterators to close when the compaction finishes. Used by
     235              :         // input iteration to keep rangeDelIters open for the lifetime of the
     236              :         // compaction, and only close them when the compaction finishes.
     237              :         closers []*noCloseIter
     238              : 
     239              :         // grandparents are the tables in level+2 that overlap with the files being
     240              :         // compacted. Used to determine output table boundaries. Do not assume that the actual
     241              :         // files in the grandparent level will be the same when this compaction finishes.
     242              :         grandparents manifest.LevelSlice
     243              : 
     244              :         // Boundaries at which flushes to L0 should be split. Determined by
     245              :         // L0Sublevels. If nil, flushes aren't split.
     246              :         l0Limits [][]byte
     247              : 
     248              :         delElision      compact.TombstoneElision
     249              :         rangeKeyElision compact.TombstoneElision
     250              : 
     251              :         // allowedZeroSeqNum is true if seqnums can be zeroed if there are no
     252              :         // snapshots requiring them to be kept. This determination is made by
     253              :         // looking for an sstable which overlaps the bounds of the compaction at a
     254              :         // lower level in the LSM during runCompaction.
     255              :         allowedZeroSeqNum bool
     256              : 
     257              :         // deletionHints are set if this is a compactionKindDeleteOnly. Used to figure
     258              :         // out whether an input must be deleted in its entirety, or excised into
     259              :         // virtual sstables.
     260              :         deletionHints []deleteCompactionHint
     261              : 
     262              :         // exciseEnabled is set to true if this is a compactionKindDeleteOnly and
     263              :         // this compaction is allowed to excise files.
     264              :         exciseEnabled bool
     265              : 
     266              :         metrics map[int]*LevelMetrics
     267              : 
     268              :         pickerMetrics compactionPickerMetrics
     269              : 
     270              :         slot base.CompactionSlot
     271              : }
     272              : 
     273              : // inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any
     274              : // input sstables.
     275            1 : func (c *compaction) inputLargestSeqNumAbsolute() base.SeqNum {
     276            1 :         var seqNum base.SeqNum
     277            1 :         for _, cl := range c.inputs {
     278            1 :                 cl.files.Each(func(m *manifest.FileMetadata) {
     279            1 :                         seqNum = max(seqNum, m.LargestSeqNumAbsolute)
     280            1 :                 })
     281              :         }
     282            1 :         return seqNum
     283              : }
     284              : 
     285            1 : func (c *compaction) makeInfo(jobID JobID) CompactionInfo {
     286            1 :         info := CompactionInfo{
     287            1 :                 JobID:       int(jobID),
     288            1 :                 Reason:      c.kind.String(),
     289            1 :                 Input:       make([]LevelInfo, 0, len(c.inputs)),
     290            1 :                 Annotations: []string{},
     291            1 :         }
     292            1 :         if c.isDownload {
     293            1 :                 info.Reason = "download," + info.Reason
     294            1 :         }
     295            1 :         for _, cl := range c.inputs {
     296            1 :                 inputInfo := LevelInfo{Level: cl.level, Tables: nil}
     297            1 :                 iter := cl.files.Iter()
     298            1 :                 for m := iter.First(); m != nil; m = iter.Next() {
     299            1 :                         inputInfo.Tables = append(inputInfo.Tables, m.TableInfo())
     300            1 :                 }
     301            1 :                 info.Input = append(info.Input, inputInfo)
     302              :         }
     303            1 :         if c.outputLevel != nil {
     304            1 :                 info.Output.Level = c.outputLevel.level
     305            1 : 
     306            1 :                 // If there are no inputs from the output level (eg, a move
     307            1 :                 // compaction), add an empty LevelInfo to info.Input.
     308            1 :                 if len(c.inputs) > 0 && c.inputs[len(c.inputs)-1].level != c.outputLevel.level {
     309            0 :                         info.Input = append(info.Input, LevelInfo{Level: c.outputLevel.level})
     310            0 :                 }
     311            1 :         } else {
     312            1 :                 // For a delete-only compaction, set the output level to L6. The
     313            1 :                 // output level is not meaningful here, but complicating the
     314            1 :                 // info.Output interface with a pointer doesn't seem worth the
     315            1 :                 // semantic distinction.
     316            1 :                 info.Output.Level = numLevels - 1
     317            1 :         }
     318              : 
     319            1 :         for i, score := range c.pickerMetrics.scores {
     320            1 :                 info.Input[i].Score = score
     321            1 :         }
     322            1 :         info.SingleLevelOverlappingRatio = c.pickerMetrics.singleLevelOverlappingRatio
     323            1 :         info.MultiLevelOverlappingRatio = c.pickerMetrics.multiLevelOverlappingRatio
     324            1 :         if len(info.Input) > 2 {
     325            1 :                 info.Annotations = append(info.Annotations, "multilevel")
     326            1 :         }
     327            1 :         return info
     328              : }
     329              : 
     330            1 : func (c *compaction) userKeyBounds() base.UserKeyBounds {
     331            1 :         return base.UserKeyBoundsFromInternal(c.smallest, c.largest)
     332            1 : }
     333              : 
     334              : func newCompaction(
     335              :         pc *pickedCompaction,
     336              :         opts *Options,
     337              :         beganAt time.Time,
     338              :         provider objstorage.Provider,
     339              :         slot base.CompactionSlot,
     340            1 : ) *compaction {
     341            1 :         c := &compaction{
     342            1 :                 kind:              compactionKindDefault,
     343            1 :                 cmp:               pc.cmp,
     344            1 :                 equal:             opts.Comparer.Equal,
     345            1 :                 comparer:          opts.Comparer,
     346            1 :                 formatKey:         opts.Comparer.FormatKey,
     347            1 :                 inputs:            pc.inputs,
     348            1 :                 smallest:          pc.smallest,
     349            1 :                 largest:           pc.largest,
     350            1 :                 logger:            opts.Logger,
     351            1 :                 version:           pc.version,
     352            1 :                 beganAt:           beganAt,
     353            1 :                 maxOutputFileSize: pc.maxOutputFileSize,
     354            1 :                 maxOverlapBytes:   pc.maxOverlapBytes,
     355            1 :                 pickerMetrics:     pc.pickerMetrics,
     356            1 :                 slot:              slot,
     357            1 :         }
     358            1 :         c.startLevel = &c.inputs[0]
     359            1 :         if pc.startLevel.l0SublevelInfo != nil {
     360            1 :                 c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo
     361            1 :         }
     362            1 :         c.outputLevel = &c.inputs[1]
     363            1 :         if c.slot == nil {
     364            1 :                 c.slot = opts.Experimental.CompactionLimiter.TookWithoutPermission(context.TODO())
     365            1 :                 c.slot.CompactionSelected(c.startLevel.level, c.outputLevel.level, c.startLevel.files.SizeSum())
     366            1 :         }
     367              : 
     368            1 :         if len(pc.extraLevels) > 0 {
     369            1 :                 c.extraLevels = pc.extraLevels
     370            1 :                 c.outputLevel = &c.inputs[len(c.inputs)-1]
     371            1 :         }
     372              :         // Compute the set of outputLevel+1 files that overlap this compaction (these
     373              :         // are the grandparent sstables).
     374            1 :         if c.outputLevel.level+1 < numLevels {
     375            1 :                 c.grandparents = c.version.Overlaps(c.outputLevel.level+1, c.userKeyBounds())
     376            1 :         }
     377            1 :         c.delElision, c.rangeKeyElision = compact.SetupTombstoneElision(
     378            1 :                 c.cmp, c.version, c.outputLevel.level, base.UserKeyBoundsFromInternal(c.smallest, c.largest),
     379            1 :         )
     380            1 :         c.kind = pc.kind
     381            1 : 
     382            1 :         if c.kind == compactionKindDefault && c.outputLevel.files.Empty() && !c.hasExtraLevelData() &&
     383            1 :                 c.startLevel.files.Len() == 1 && c.grandparents.SizeSum() <= c.maxOverlapBytes {
     384            1 :                 // This compaction can be converted into a move or copy from one level
     385            1 :                 // to the next. We avoid such a move if there is a lot of overlapping
     386            1 :                 // grandparent data. Otherwise, the move could create a parent file
     387            1 :                 // that will require a very expensive merge later on.
     388            1 :                 iter := c.startLevel.files.Iter()
     389            1 :                 meta := iter.First()
     390            1 :                 isRemote := false
     391            1 :                 // We should always be passed a provider, except in some unit tests.
     392            1 :                 if provider != nil {
     393            1 :                         isRemote = !objstorage.IsLocalTable(provider, meta.FileBacking.DiskFileNum)
     394            1 :                 }
     395              :                 // Avoid a trivial move or copy if all of these are true, as rewriting a
     396              :                 // new file is better:
     397              :                 //
     398              :                 // 1) The source file is a virtual sstable
     399              :                 // 2) The existing file `meta` is on non-remote storage
     400              :                 // 3) The output level prefers shared storage
     401            1 :                 mustCopy := !isRemote && remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
     402            1 :                 if mustCopy {
     403            1 :                         // If the source is virtual, it's best to just rewrite the file as all
     404            1 :                         // conditions in the above comment are met.
     405            1 :                         if !meta.Virtual {
     406            1 :                                 c.kind = compactionKindCopy
     407            1 :                         }
     408            1 :                 } else {
     409            1 :                         c.kind = compactionKindMove
     410            1 :                 }
     411              :         }
     412            1 :         return c
     413              : }
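                        // The trivial move/copy decision above, summarized (this restates the
                        // code paths; the table is an illustrative aid and assumes the
                        // single-file, low-grandparent-overlap precondition already holds):
                        //
                        //      prefers shared output?   source remote?   source virtual?   resulting kind
                        //      no                       any              any               move
                        //      yes                      yes              any               move
                        //      yes                      no               no                copy
                        //      yes                      no               yes               default (rewrite)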
     414              : 
     415              : func newDeleteOnlyCompaction(
     416              :         opts *Options,
     417              :         cur *version,
     418              :         inputs []compactionLevel,
     419              :         beganAt time.Time,
     420              :         hints []deleteCompactionHint,
     421              :         exciseEnabled bool,
     422            1 : ) *compaction {
     423            1 :         c := &compaction{
     424            1 :                 kind:          compactionKindDeleteOnly,
     425            1 :                 cmp:           opts.Comparer.Compare,
     426            1 :                 equal:         opts.Comparer.Equal,
     427            1 :                 comparer:      opts.Comparer,
     428            1 :                 formatKey:     opts.Comparer.FormatKey,
     429            1 :                 logger:        opts.Logger,
     430            1 :                 version:       cur,
     431            1 :                 beganAt:       beganAt,
     432            1 :                 inputs:        inputs,
     433            1 :                 deletionHints: hints,
     434            1 :                 exciseEnabled: exciseEnabled,
     435            1 :         }
     436            1 : 
     437            1 :         // Set c.smallest, c.largest.
     438            1 :         files := make([]manifest.LevelIterator, 0, len(inputs))
     439            1 :         for _, in := range inputs {
     440            1 :                 files = append(files, in.files.Iter())
     441            1 :         }
     442            1 :         c.smallest, c.largest = manifest.KeyRange(opts.Comparer.Compare, files...)
     443            1 :         return c
     444              : }
     445              : 
     446            1 : func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64) {
     447            1 :         // Heuristic to place a lower bound on compaction output file size
     448            1 :         // caused by Lbase. Prior to this heuristic we have observed an L0 in
     449            1 :         // production with 310K files of which 290K files were < 10KB in size.
     450            1 :         // Our hypothesis is that it was caused by L1 having 2600 files and
     451            1 :         // ~10GB, such that each flush got split into many tiny files due to
     452            1 :         // overlapping with most of the files in Lbase.
     453            1 :         //
     454            1 :         // The computation below is general in that it accounts
     455            1 :         // for flushing different volumes of data (e.g. we may be flushing
     456            1 :         // many memtables). For illustration, we consider the typical
     457            1 :         // example of flushing a 64MB memtable. So 12.8MB output,
     458            1 :         // based on the compression guess below. If the compressed bytes
     459            1 :         // guess is an over-estimate we will end up with smaller files,
     460            1 :         // and if an under-estimate we will end up with larger files.
     461            1 :         // With a 2MB target file size, 7 files. We are willing to accept
     462            1 :         // 4x the number of files, if it results in better write amplification
     463            1 :         // when later compacting to Lbase, i.e., ~450KB files (target file
     464            1 :         // size / 4).
     465            1 :         //
     466            1 :         // Note that this is a pessimistic heuristic in that
     467            1 :         // fileCountUpperBoundDueToGrandparents could be far from the actual
     468            1 :         // number of files produced due to the grandparent limits. For
     469            1 :         // example, in the extreme, consider a flush that overlaps with 1000
     470            1 :         // files in Lbase f0...f999, and the initially calculated value of
     471            1 :         // maxOverlapBytes will cause splits at f10, f20,..., f990, which
     472            1 :         // means an upper bound file count of 100 files. Say the input bytes
     473            1 :         // in the flush are such that acceptableFileCount=10. We will fatten
     474            1 :         // up maxOverlapBytes by 10x to ensure that the upper bound file count
     475            1 :         // drops to 10. However, it is possible that in practice, even without
     476            1 :         // this change, we would have produced no more than 10 files, and that
     477            1 :         // this change makes the files unnecessarily wide. Say the input bytes
     478            1 :         // are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
     479            1 :         // 10% in f80...f89 and 10% in f990...f999. The original value of
     480            1 :         // maxOverlapBytes would have actually produced only 10 sstables. But
     481            1 :         // by increasing maxOverlapBytes by 10x, we may produce 1 sstable that
     482            1 :         // spans f0...f89, i.e., a much wider sstable than necessary.
     483            1 :         //
     484            1 :         // We could produce a tighter estimate of
     485            1 :         // fileCountUpperBoundDueToGrandparents if we had knowledge of the key
     486            1 :         // distribution of the flush. The 4x multiplier mentioned earlier is
     487            1 :         // a way to try to compensate for this pessimism.
     488            1 :         //
     489            1 :         // TODO(sumeer): we don't have compression info for the data being
     490            1 :         // flushed, but it is likely that existing files that overlap with
     491            1 :         // this flush in Lbase are representative wrt compression ratio. We
     492            1 :         // could store the uncompressed size in FileMetadata and estimate
     493            1 :         // the compression ratio.
     494            1 :         const approxCompressionRatio = 0.2
     495            1 :         approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
     496            1 :         approxNumFilesBasedOnTargetSize :=
     497            1 :                 int(math.Ceil(approxOutputBytes / float64(c.maxOutputFileSize)))
     498            1 :         acceptableFileCount := float64(4 * approxNumFilesBasedOnTargetSize)
     499            1 :         // The byte calculation is linear in numGrandparentFiles, but we will
     500            1 :         // incur this linear cost in compact.Runner.TableSplitLimit() too, so we are
     501            1 :         // also willing to pay it now. We could approximate this cheaply by using the
     502            1 :         // mean file size of Lbase.
     503            1 :         grandparentFileBytes := c.grandparents.SizeSum()
     504            1 :         fileCountUpperBoundDueToGrandparents :=
     505            1 :                 float64(grandparentFileBytes) / float64(c.maxOverlapBytes)
     506            1 :         if fileCountUpperBoundDueToGrandparents > acceptableFileCount {
     507            1 :                 c.maxOverlapBytes = uint64(
     508            1 :                         float64(c.maxOverlapBytes) *
     509            1 :                                 (fileCountUpperBoundDueToGrandparents / acceptableFileCount))
     510            1 :         }
     511              : }
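                        // A worked pass through the arithmetic above with assumed inputs (the
                        // 64 MiB memtable mirrors the comment's own illustration; the Lbase
                        // figures are invented for the example): flushingBytes = 64 MiB,
                        // c.maxOutputFileSize = 2 MiB, c.maxOverlapBytes = 20 MiB, and 5600 MiB
                        // of overlapping grandparent data.
                        //
                        //      approxOutputBytes                    = 0.2 * 64 MiB    = 12.8 MiB
                        //      approxNumFilesBasedOnTargetSize      = ceil(12.8 / 2)  = 7
                        //      acceptableFileCount                  = 4 * 7           = 28
                        //      fileCountUpperBoundDueToGrandparents = 5600 / 20       = 280
                        //
                        // Since 280 > 28, maxOverlapBytes is scaled up by 280/28 = 10x, to 200 MiB.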
     512              : 
     513              : func newFlush(
     514              :         opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time,
     515            1 : ) (*compaction, error) {
     516            1 :         c := &compaction{
     517            1 :                 kind:              compactionKindFlush,
     518            1 :                 cmp:               opts.Comparer.Compare,
     519            1 :                 equal:             opts.Comparer.Equal,
     520            1 :                 comparer:          opts.Comparer,
     521            1 :                 formatKey:         opts.Comparer.FormatKey,
     522            1 :                 logger:            opts.Logger,
     523            1 :                 version:           cur,
     524            1 :                 beganAt:           beganAt,
     525            1 :                 inputs:            []compactionLevel{{level: -1}, {level: 0}},
     526            1 :                 maxOutputFileSize: math.MaxUint64,
     527            1 :                 maxOverlapBytes:   math.MaxUint64,
     528            1 :                 flushing:          flushing,
     529            1 :         }
     530            1 :         c.startLevel = &c.inputs[0]
     531            1 :         c.outputLevel = &c.inputs[1]
     532            1 : 
     533            1 :         // Flush slots are always taken without permission.
     534            1 :         //
     535            1 :         // NB: CompactionLimiter defaults to a no-op limiter unless one is implemented
     536            1 :         // and passed-in as an option during Open.
     537            1 :         slot := opts.Experimental.CompactionLimiter.TookWithoutPermission(context.TODO())
     538            1 :         var flushingSize uint64
     539            1 :         for i := range flushing {
     540            1 :                 flushingSize += flushing[i].totalBytes()
     541            1 :         }
     542            1 :         slot.CompactionSelected(-1, 0, flushingSize)
     543            1 :         c.slot = slot
     544            1 : 
     545            1 :         if len(flushing) > 0 {
     546            1 :                 if _, ok := flushing[0].flushable.(*ingestedFlushable); ok {
     547            1 :                         if len(flushing) != 1 {
     548            0 :                                 panic("pebble: ingestedFlushable must be flushed one at a time.")
     549              :                         }
     550            1 :                         c.kind = compactionKindIngestedFlushable
     551            1 :                         return c, nil
     552              :                 }
     553              :         }
     554              : 
     555              :         // Make sure there's no ingestedFlushable after the first flushable in the
     556              :         // list.
     557            1 :         for _, f := range flushing {
     558            1 :                 if _, ok := f.flushable.(*ingestedFlushable); ok {
     559            0 :                         panic("pebble: flushing shouldn't contain ingestedFlushable flushable")
     560              :                 }
     561              :         }
     562              : 
     563            1 :         if cur.L0Sublevels != nil {
     564            1 :                 c.l0Limits = cur.L0Sublevels.FlushSplitKeys()
     565            1 :         }
     566              : 
     567            1 :         smallestSet, largestSet := false, false
     568            1 :         updatePointBounds := func(iter internalIterator) {
     569            1 :                 if kv := iter.First(); kv != nil {
     570            1 :                         if !smallestSet ||
     571            1 :                                 base.InternalCompare(c.cmp, c.smallest, kv.K) > 0 {
     572            1 :                                 smallestSet = true
     573            1 :                                 c.smallest = kv.K.Clone()
     574            1 :                         }
     575              :                 }
     576            1 :                 if kv := iter.Last(); kv != nil {
     577            1 :                         if !largestSet ||
     578            1 :                                 base.InternalCompare(c.cmp, c.largest, kv.K) < 0 {
     579            1 :                                 largestSet = true
     580            1 :                                 c.largest = kv.K.Clone()
     581            1 :                         }
     582              :                 }
     583              :         }
     584              : 
     585            1 :         updateRangeBounds := func(iter keyspan.FragmentIterator) error {
     586            1 :                 // File bounds require s != nil && !s.Empty(). We only need to check for
     587            1 :                 // s != nil here, as the memtable's FragmentIterator would never surface
     588            1 :                 // empty spans.
     589            1 :                 if s, err := iter.First(); err != nil {
     590            0 :                         return err
     591            1 :                 } else if s != nil {
     592            1 :                         if key := s.SmallestKey(); !smallestSet ||
     593            1 :                                 base.InternalCompare(c.cmp, c.smallest, key) > 0 {
     594            1 :                                 smallestSet = true
     595            1 :                                 c.smallest = key.Clone()
     596            1 :                         }
     597              :                 }
     598            1 :                 if s, err := iter.Last(); err != nil {
     599            0 :                         return err
     600            1 :                 } else if s != nil {
     601            1 :                         if key := s.LargestKey(); !largestSet ||
     602            1 :                                 base.InternalCompare(c.cmp, c.largest, key) < 0 {
     603            1 :                                 largestSet = true
     604            1 :                                 c.largest = key.Clone()
     605            1 :                         }
     606              :                 }
     607            1 :                 return nil
     608              :         }
     609              : 
     610            1 :         var flushingBytes uint64
     611            1 :         for i := range flushing {
     612            1 :                 f := flushing[i]
     613            1 :                 updatePointBounds(f.newIter(nil))
     614            1 :                 if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
     615            1 :                         if err := updateRangeBounds(rangeDelIter); err != nil {
     616            0 :                                 c.slot.Release(0)
     617            0 :                                 c.slot = nil
     618            0 :                                 return nil, err
     619            0 :                         }
     620              :                 }
     621            1 :                 if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
     622            1 :                         if err := updateRangeBounds(rangeKeyIter); err != nil {
     623            0 :                                 c.slot.Release(0)
     624            0 :                                 c.slot = nil
     625            0 :                                 return nil, err
     626            0 :                         }
     627              :                 }
     628            1 :                 flushingBytes += f.inuseBytes()
     629              :         }
     630              : 
     631            1 :         if opts.FlushSplitBytes > 0 {
     632            1 :                 c.maxOutputFileSize = uint64(opts.Level(0).TargetFileSize)
     633            1 :                 c.maxOverlapBytes = maxGrandparentOverlapBytes(opts, 0)
     634            1 :                 c.grandparents = c.version.Overlaps(baseLevel, c.userKeyBounds())
     635            1 :                 adjustGrandparentOverlapBytesForFlush(c, flushingBytes)
     636            1 :         }
     637              : 
     638              :         // We don't elide tombstones for flushes.
     639            1 :         c.delElision, c.rangeKeyElision = compact.NoTombstoneElision(), compact.NoTombstoneElision()
     640            1 :         return c, nil
     641              : }
     642              : 
     643            1 : func (c *compaction) hasExtraLevelData() bool {
     644            1 :         if len(c.extraLevels) == 0 {
     645            1 :                 // not a multi level compaction
     646            1 :                 return false
     647            1 :         } else if c.extraLevels[0].files.Empty() {
     648            1 :                 // a multi level compaction without data in the intermediate input level;
     649            1 :                 // e.g. for a multi level compaction with levels 4, 5, and 6, this could
     650            1 :                 // occur if there are no files to compact in 5, or in 5 and 6 (i.e. a move).
     651            1 :                 return false
     652            1 :         }
     653            1 :         return true
     654              : }
     655              : 
     656              : // errorOnUserKeyOverlap returns an error if the last two written sstables in
     657              : // this compaction have revisions of the same user key present in both sstables,
     658              : // when they shouldn't (e.g. when splitting flushes).
     659            0 : func (c *compaction) errorOnUserKeyOverlap(ve *versionEdit) error {
     660            0 :         if n := len(ve.NewFiles); n > 1 {
     661            0 :                 meta := ve.NewFiles[n-1].Meta
     662            0 :                 prevMeta := ve.NewFiles[n-2].Meta
     663            0 :                 if !prevMeta.Largest.IsExclusiveSentinel() &&
     664            0 :                         c.cmp(prevMeta.Largest.UserKey, meta.Smallest.UserKey) >= 0 {
     665            0 :                         return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
     666            0 :                                 prevMeta.Largest.Pretty(c.formatKey),
     667            0 :                                 prevMeta.FileNum,
     668            0 :                                 meta.FileNum)
     669            0 :                 }
     670              :         }
     671            0 :         return nil
     672              : }
     673              : 
     674              : // allowZeroSeqNum returns true if seqnums can be zeroed if there are no
     675              : // snapshots requiring them to be kept. It performs this determination by
     676              : // looking at the TombstoneElision values which are set up based on sstables
     677              : // which overlap the bounds of the compaction at a lower level in the LSM.
     678            1 : func (c *compaction) allowZeroSeqNum() bool {
     679            1 :         // TODO(peter): we disable zeroing of seqnums during flushing to match
     680            1 :         // RocksDB behavior and to avoid generating overlapping sstables during
     681            1 :         // DB.replayWAL. When replaying WAL files at startup, we flush after each
     682            1 :         // WAL is replayed building up a single version edit that is
     683            1 :         // applied. Because we don't apply the version edit after each flush, this
     684            1 :         // code doesn't know that L0 contains files and zeroing of seqnums should
     685            1 :         // be disabled. That is fixable, but it seems safer to just match the
     686            1 :         // RocksDB behavior for now.
     687            1 :         return len(c.flushing) == 0 && c.delElision.ElidesEverything() && c.rangeKeyElision.ElidesEverything()
     688            1 : }
     689              : 
     690              : // newInputIters returns the point, range deletion, and range key iterators over all the input tables in a compaction.
     691              : func (c *compaction) newInputIters(
     692              :         newIters tableNewIters, newRangeKeyIter keyspanimpl.TableNewSpanIter, iiopts internalIterOpts,
     693              : ) (
     694              :         pointIter internalIterator,
     695              :         rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     696              :         retErr error,
     697            1 : ) {
     698            1 :         // Validate the ordering of compaction input files for defense in depth.
     699            1 :         if len(c.flushing) == 0 {
     700            1 :                 if c.startLevel.level >= 0 {
     701            1 :                         err := manifest.CheckOrdering(c.cmp, c.formatKey,
     702            1 :                                 manifest.Level(c.startLevel.level), c.startLevel.files.Iter())
     703            1 :                         if err != nil {
     704            0 :                                 return nil, nil, nil, err
     705            0 :                         }
     706              :                 }
     707            1 :                 err := manifest.CheckOrdering(c.cmp, c.formatKey,
     708            1 :                         manifest.Level(c.outputLevel.level), c.outputLevel.files.Iter())
     709            1 :                 if err != nil {
     710            0 :                         return nil, nil, nil, err
     711            0 :                 }
     712            1 :                 if c.startLevel.level == 0 {
     713            1 :                         if c.startLevel.l0SublevelInfo == nil {
     714            0 :                                 panic("l0SublevelInfo not created for compaction out of L0")
     715              :                         }
     716            1 :                         for _, info := range c.startLevel.l0SublevelInfo {
     717            1 :                                 err := manifest.CheckOrdering(c.cmp, c.formatKey,
     718            1 :                                         info.sublevel, info.Iter())
     719            1 :                                 if err != nil {
     720            0 :                                         return nil, nil, nil, err
     721            0 :                                 }
     722              :                         }
     723              :                 }
     724            1 :                 if len(c.extraLevels) > 0 {
     725            1 :                         if len(c.extraLevels) > 1 {
     726            0 :                                 panic("n>2 multi level compaction not implemented yet")
     727              :                         }
     728            1 :                         interLevel := c.extraLevels[0]
     729            1 :                         err := manifest.CheckOrdering(c.cmp, c.formatKey,
     730            1 :                                 manifest.Level(interLevel.level), interLevel.files.Iter())
     731            1 :                         if err != nil {
     732            0 :                                 return nil, nil, nil, err
     733            0 :                         }
     734              :                 }
     735              :         }
     736              : 
     737              :         // There are three classes of keys that a compaction needs to process: point
     738              :         // keys, range deletion tombstones and range keys. Collect all iterators for
     739              :         // all these classes of keys from all the levels. We'll aggregate them
     740              :         // together farther below.
     741              :         //
     742              :         // numInputLevels is an approximation of the number of iterator levels. Due
     743              :         // to idiosyncrasies in iterator construction, we may (rarely) exceed this
     744              :         // initial capacity.
     745            1 :         numInputLevels := max(len(c.flushing), len(c.inputs))
     746            1 :         iters := make([]internalIterator, 0, numInputLevels)
     747            1 :         rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
     748            1 :         rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
     749            1 : 
     750            1 :         // If construction of the iterator inputs fails, ensure that we close all
     751            1 :         // the constituent iterators.
     752            1 :         defer func() {
     753            1 :                 if retErr != nil {
     754            0 :                         for _, iter := range iters {
     755            0 :                                 if iter != nil {
     756            0 :                                         iter.Close()
     757            0 :                                 }
     758              :                         }
     759            0 :                         for _, rangeDelIter := range rangeDelIters {
     760            0 :                                 rangeDelIter.Close()
     761            0 :                         }
     762              :                 }
     763              :         }()
     764            1 :         iterOpts := IterOptions{
     765            1 :                 Category: categoryCompaction,
     766            1 :                 logger:   c.logger,
     767            1 :         }
     768            1 : 
     769            1 :         // Populate iters, rangeDelIters and rangeKeyIters with the appropriate
     770            1 :         // constituent iterators. This depends on whether this is a flush or a
     771            1 :         // compaction.
     772            1 :         if len(c.flushing) != 0 {
     773            1 :                 // If flushing, we need to build the input iterators over the memtables
     774            1 :                 // stored in c.flushing.
     775            1 :                 for i := range c.flushing {
     776            1 :                         f := c.flushing[i]
     777            1 :                         iters = append(iters, f.newFlushIter(nil))
     778            1 :                         rangeDelIter := f.newRangeDelIter(nil)
     779            1 :                         if rangeDelIter != nil {
     780            1 :                                 rangeDelIters = append(rangeDelIters, rangeDelIter)
     781            1 :                         }
     782            1 :                         if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
     783            1 :                                 rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
     784            1 :                         }
     785              :                 }
     786            1 :         } else {
     787            1 :                 addItersForLevel := func(level *compactionLevel, l manifest.Layer) error {
     788            1 :                         // Add a *levelIter for point iterators. Because we don't call
     789            1 :                         // initRangeDel, the levelIter will close and forget the range
     790            1 :                         // deletion iterator when it steps on to a new file. Surfacing range
     791            1 :                         // deletions to compactions are handled below.
     792            1 :                         iters = append(iters, newLevelIter(context.Background(),
     793            1 :                                 iterOpts, c.comparer, newIters, level.files.Iter(), l, iiopts))
     794            1 :                         // TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
     795            1 :                         // deletions into memory upfront. (See #2015, which reverted this.) There
     796            1 :                         // deletion iterator when it steps onto a new file. Surfacing range
     797            1 :                         // deletions to compactions is handled below.
     798            1 : 
     799            1 :                         // Add the range deletion iterator for each file as an independent level
     800            1 :                         // in mergingIter, as opposed to making a levelIter out of those. This
     801            1 :                         // is safer as levelIter expects all keys coming from underlying
     802            1 :                         // iterators to be in order. Due to compaction / tombstone writing
     803            1 :                         // logic in finishOutput(), it is possible for range tombstones to not
     804            1 :                         // be strictly ordered across all files in one level.
     805            1 :                         //
     806            1 :                         // Consider this example from the metamorphic tests (also repeated in
     807            1 :                         // finishOutput()), consisting of three L3 files with their bounds
     808            1 :                         // specified in square brackets next to the file name:
     809            1 :                         //
     810            1 :                         // ./000240.sst   [tmgc#391,MERGE-tmgc#391,MERGE]
     811            1 :                         // tmgc#391,MERGE [786e627a]
     812            1 :                         // tmgc-udkatvs#331,RANGEDEL
     813            1 :                         //
     814            1 :                         // ./000241.sst   [tmgc#384,MERGE-tmgc#384,MERGE]
     815            1 :                         // tmgc#384,MERGE [666c7070]
     816            1 :                         // tmgc-tvsalezade#383,RANGEDEL
     817            1 :                         // tmgc-tvsalezade#331,RANGEDEL
     818            1 :                         //
     819            1 :                         // ./000242.sst   [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
     820            1 :                         // tmgc-tvsalezade#383,RANGEDEL
     821            1 :                         // tmgc#375,SET [72646c78766965616c72776865676e79]
     822            1 :                         // tmgc-tvsalezade#356,RANGEDEL
     823            1 :                         //
     824            1 :                         // Here, the range tombstone in 000240.sst falls "after" one in
     825            1 :                         // 000241.sst, despite 000240.sst being ordered "before" 000241.sst for
     826            1 :                         // levelIter's purposes. While each file is still consistent within its
     827            1 :                         // bounds, it's safer to have all rangedel iterators be visible to
     828            1 :                         // mergingIter.
     829            1 :                         iter := level.files.Iter()
     830            1 :                         for f := iter.First(); f != nil; f = iter.Next() {
     831            1 :                                 rangeDelIter, err := c.newRangeDelIter(newIters, iter.Take(), iterOpts, iiopts, l)
     832            1 :                                 if err != nil {
     833            0 :                                         // The error will already be annotated with the BackingFileNum, so
     834            0 :                                         // we annotate it with the FileNum.
     835            0 :                                         return errors.Wrapf(err, "pebble: could not open table %s", errors.Safe(f.FileNum))
     836            0 :                                 }
     837            1 :                                 if rangeDelIter == nil {
     838            1 :                                         continue
     839              :                                 }
     840            1 :                                 rangeDelIters = append(rangeDelIters, rangeDelIter)
     841            1 :                                 c.closers = append(c.closers, rangeDelIter)
     842              :                         }
     843              : 
     844              :                         // Check if this level has any range keys.
     845            1 :                         hasRangeKeys := false
     846            1 :                         for f := iter.First(); f != nil; f = iter.Next() {
     847            1 :                                 if f.HasRangeKeys {
     848            1 :                                         hasRangeKeys = true
     849            1 :                                         break
     850              :                                 }
     851              :                         }
     852            1 :                         if hasRangeKeys {
     853            1 :                                 newRangeKeyIterWrapper := func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
     854            1 :                                         rangeKeyIter, err := newRangeKeyIter(ctx, file, iterOptions)
     855            1 :                                         if err != nil {
     856            0 :                                                 return nil, err
     857            1 :                                         } else if rangeKeyIter == nil {
     858            0 :                                                 return emptyKeyspanIter, nil
     859            0 :                                         }
     860              :                                         // Ensure that the range key iter is not closed until the compaction is
     861              :                                         // finished. This is necessary because range key processing
     862              :                                         // requires the range keys to be held in memory for up to the
     863              :                                         // lifetime of the compaction.
     864            1 :                                         noCloseIter := &noCloseIter{rangeKeyIter}
     865            1 :                                         c.closers = append(c.closers, noCloseIter)
     866            1 : 
     867            1 :                                         // We do not need to truncate range keys to sstable boundaries, or
     868            1 :                                         // only read within the file's atomic compaction units, unlike with
     869            1 :                                         // range tombstones. This is because range keys were added after we
     870            1 :                                         // stopped splitting user keys across sstables, so all the range keys
     871            1 :                                         // in this sstable must wholly lie within the file's bounds.
     872            1 :                                         return noCloseIter, err
     873              :                                 }
     874            1 :                                 li := keyspanimpl.NewLevelIter(
     875            1 :                                         context.Background(), keyspan.SpanIterOptions{}, c.cmp,
     876            1 :                                         newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange,
     877            1 :                                 )
     878            1 :                                 rangeKeyIters = append(rangeKeyIters, li)
     879              :                         }
     880            1 :                         return nil
     881              :                 }
     882              : 
     883            1 :                 for i := range c.inputs {
     884            1 :                         // If the level is annotated with l0SublevelInfo, expand it into one
     885            1 :                         // level per sublevel.
     886            1 :                         // TODO(jackson): Perform this expansion even earlier when we pick the
     887            1 :                         // compaction?
     888            1 :                         if len(c.inputs[i].l0SublevelInfo) > 0 {
     889            1 :                                 for _, info := range c.startLevel.l0SublevelInfo {
     890            1 :                                         sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
     891            1 :                                         if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
     892            0 :                                                 return nil, nil, nil, err
     893            0 :                                         }
     894              :                                 }
     895            1 :                                 continue
     896              :                         }
     897            1 :                         if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
     898            0 :                                 return nil, nil, nil, err
     899            0 :                         }
     900              :                 }
     901              :         }
     902              : 
     903              :         // If there's only one constituent point iterator, we can avoid the overhead
     904              :         // of a *mergingIter. This is possible, for example, when performing a flush
     905              :         // of a single memtable. Otherwise, combine all the iterators into a merging
     906              :         // iter.
     907            1 :         pointIter = iters[0]
     908            1 :         if len(iters) > 1 {
     909            1 :                 pointIter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
     910            1 :         }
     911              : 
     912              :         // In normal operation, levelIter iterates over the point operations in a
     913              :         // level, and initializes a rangeDelIter pointer for the range deletions in
     914              :         // each table. During compaction, we want to iterate over the merged view of
     915              :         // point operations and range deletions. In order to do this we create one
     916              :         // levelIter per level to iterate over the point operations, and collect up
     917              :         // all the range deletion files.
     918              :         //
     919              :         // The range deletion levels are combined with a keyspanimpl.MergingIter. The
     920              :         // resulting merged rangedel iterator is then included using an
     921              :         // InterleavingIter.
     922              :         // TODO(jackson): Consider using a defragmenting iterator to stitch together
     923              :         // logical range deletions that were fragmented due to previous file
     924              :         // boundaries.
     925            1 :         if len(rangeDelIters) > 0 {
     926            1 :                 mi := &keyspanimpl.MergingIter{}
     927            1 :                 mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
     928            1 :                 rangeDelIter = mi
     929            1 :         }
     930              : 
     931              :         // If there are range key iterators, we need to combine them using
     932              :         // keyspanimpl.MergingIter, and then interleave them among the points.
     933            1 :         if len(rangeKeyIters) > 0 {
     934            1 :                 mi := &keyspanimpl.MergingIter{}
     935            1 :                 mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeKeyIters...)
     936            1 :                 // TODO(radu): why do we have a defragmenter here but not above?
     937            1 :                 di := &keyspan.DefragmentingIter{}
     938            1 :                 di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
     939            1 :                 rangeKeyIter = di
     940            1 :         }
     941            1 :         return pointIter, rangeDelIter, rangeKeyIter, nil
     942              : }
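
// Illustrative sketch (added commentary, not part of the original
// compaction.go): the "skip the merging iterator when there is only one
// input" decision above, restated over a minimal, hypothetical iterator
// interface. The names exampleIter and combineExampleIters are invented
// for this example.
type exampleIter interface {
        // Next returns the next key and reports whether one was available.
        Next() (key []byte, ok bool)
}

func combineExampleIters(newMerging func(...exampleIter) exampleIter, iters ...exampleIter) exampleIter {
        if len(iters) == 1 {
                // A flush of a single memtable produces exactly one point
                // iterator; returning it directly avoids the per-step heap
                // maintenance of a merging iterator.
                return iters[0]
        }
        return newMerging(iters...)
}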
     943              : 
     944              : func (c *compaction) newRangeDelIter(
     945              :         newIters tableNewIters,
     946              :         f manifest.LevelFile,
     947              :         opts IterOptions,
     948              :         iiopts internalIterOpts,
     949              :         l manifest.Layer,
     950            1 : ) (*noCloseIter, error) {
     951            1 :         opts.layer = l
     952            1 :         iterSet, err := newIters(context.Background(), f.FileMetadata, &opts,
     953            1 :                 iiopts, iterRangeDeletions)
     954            1 :         if err != nil {
     955            0 :                 return nil, err
     956            1 :         } else if iterSet.rangeDeletion == nil {
     957            1 :                 // The file doesn't contain any range deletions.
     958            1 :                 return nil, nil
     959            1 :         }
     960              :         // Ensure that rangeDelIter is not closed until the compaction is
     961              :         // finished. This is necessary because range tombstone processing
     962              :         // requires the range tombstones to be held in memory for up to the
     963              :         // lifetime of the compaction.
     964            1 :         return &noCloseIter{iterSet.rangeDeletion}, nil
     965              : }
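
// Illustrative sketch (added commentary, not part of the original source):
// the general shape of the "suppress Close until the compaction is done"
// wrapper returned above. The names exampleCloser, keepOpen and
// CloseUnderlying are invented; in the real code the wrapped iterators are
// released later via c.closers.
type exampleCloser interface {
        Close() error
}

// keepOpen ignores Close calls from intermediate consumers so the wrapped
// resource stays usable for the lifetime of a longer-running operation.
type keepOpen struct {
        wrapped exampleCloser
}

// Close is deliberately a no-op.
func (k keepOpen) Close() error { return nil }

// CloseUnderlying releases the wrapped resource once the owner is done.
func (k keepOpen) CloseUnderlying() error { return k.wrapped.Close() }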
     966              : 
     967            0 : func (c *compaction) String() string {
     968            0 :         if len(c.flushing) != 0 {
     969            0 :                 return "flush\n"
     970            0 :         }
     971              : 
     972            0 :         var buf bytes.Buffer
     973            0 :         for level := c.startLevel.level; level <= c.outputLevel.level; level++ {
     974            0 :                 i := level - c.startLevel.level
     975            0 :                 fmt.Fprintf(&buf, "%d:", level)
     976            0 :                 iter := c.inputs[i].files.Iter()
     977            0 :                 for f := iter.First(); f != nil; f = iter.Next() {
     978            0 :                         fmt.Fprintf(&buf, " %s:%s-%s", f.FileNum, f.Smallest, f.Largest)
     979            0 :                 }
     980            0 :                 fmt.Fprintf(&buf, "\n")
     981              :         }
     982            0 :         return buf.String()
     983              : }
     984              : 
     985              : type manualCompaction struct {
     986              :         // Count of retries, either due to too many concurrent compactions or to a
     987              :         // concurrent compaction over overlapping levels.
     988              :         retries     int
     989              :         level       int
     990              :         outputLevel int
     991              :         done        chan error
     992              :         start       []byte
     993              :         end         []byte
     994              :         split       bool
     995              : }
     996              : 
     997              : type readCompaction struct {
     998              :         level int
     999              :         // [start, end] key ranges are used for de-duping.
    1000              :         start []byte
    1001              :         end   []byte
    1002              : 
    1003              :         // The file associated with the compaction.
    1004              :         // If the file no longer belongs in the same
    1005              :         // level, then we skip the compaction.
    1006              :         fileNum base.FileNum
    1007              : }
    1008              : 
    1009            1 : func (d *DB) addInProgressCompaction(c *compaction) {
    1010            1 :         d.mu.compact.inProgress[c] = struct{}{}
    1011            1 :         var isBase, isIntraL0 bool
    1012            1 :         for _, cl := range c.inputs {
    1013            1 :                 iter := cl.files.Iter()
    1014            1 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1015            1 :                         if f.IsCompacting() {
    1016            0 :                                 d.opts.Logger.Fatalf("L%d->L%d: %s already being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
    1017            0 :                         }
    1018            1 :                         f.SetCompactionState(manifest.CompactionStateCompacting)
    1019            1 :                         if c.startLevel != nil && c.outputLevel != nil && c.startLevel.level == 0 {
    1020            1 :                                 if c.outputLevel.level == 0 {
    1021            1 :                                         f.IsIntraL0Compacting = true
    1022            1 :                                         isIntraL0 = true
    1023            1 :                                 } else {
    1024            1 :                                         isBase = true
    1025            1 :                                 }
    1026              :                         }
    1027              :                 }
    1028              :         }
    1029              : 
    1030            1 :         if (isIntraL0 || isBase) && c.version.L0Sublevels != nil {
    1031            1 :                 l0Inputs := []manifest.LevelSlice{c.startLevel.files}
    1032            1 :                 if isIntraL0 {
    1033            1 :                         l0Inputs = append(l0Inputs, c.outputLevel.files)
    1034            1 :                 }
    1035            1 :                 if err := c.version.L0Sublevels.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
    1036            0 :                         d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
    1037            0 :                 }
    1038              :         }
    1039              : }
    1040              : 
    1041              : // Removes compaction markers from files in a compaction. The rollback parameter
    1042              : // indicates whether the compaction state should be rolled back to its original
    1043              : // state in the case of an unsuccessful compaction.
    1044              : //
    1045              : // DB.mu must be held when calling this method; however, this method can drop and
    1046              : // re-acquire that mutex. All writes to the manifest for this compaction should
    1047              : // have completed by this point.
    1048            1 : func (d *DB) clearCompactingState(c *compaction, rollback bool) {
    1049            1 :         c.versionEditApplied = true
    1050            1 :         if c.slot != nil {
    1051            0 :                 panic("pebble: compaction slot should have been released before clearing compacting state")
    1052              :         }
    1053            1 :         for _, cl := range c.inputs {
    1054            1 :                 iter := cl.files.Iter()
    1055            1 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1056            1 :                         if !f.IsCompacting() {
    1057            0 :                                 d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
    1058            0 :                         }
    1059            1 :                         if !rollback {
    1060            1 :                                 // On success, all compactions other than move and delete-only compactions
    1061            1 :                                 // transition the file into the Compacted state. Move-compacted files
    1062            1 :                                 // become eligible for compaction again and transition back to NotCompacting.
    1063            1 :                                 // Delete-only compactions could, on rare occasion, leave files untouched
    1064            1 :                                 // (e.g. if files have a loose bound), so we revert them all to NotCompacting
    1065            1 :                                 // just in case they need to be compacted again.
    1066            1 :                                 if c.kind != compactionKindMove && c.kind != compactionKindDeleteOnly {
    1067            1 :                                         f.SetCompactionState(manifest.CompactionStateCompacted)
    1068            1 :                                 } else {
    1069            1 :                                         f.SetCompactionState(manifest.CompactionStateNotCompacting)
    1070            1 :                                 }
    1071            1 :                         } else {
    1072            1 :                                 // Else, on rollback, all input files unconditionally transition back to
    1073            1 :                                 // NotCompacting.
    1074            1 :                                 f.SetCompactionState(manifest.CompactionStateNotCompacting)
    1075            1 :                         }
    1076            1 :                         f.IsIntraL0Compacting = false
    1077              :                 }
    1078              :         }
    1079            1 :         l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c))
    1080            1 :         func() {
    1081            1 :                 // InitCompactingFileInfo requires that no other manifest writes be
    1082            1 :                 // happening in parallel with it, i.e. we're not in the midst of installing
    1083            1 :                 // another version. Otherwise, it's possible that we've created another
    1084            1 :                 // L0Sublevels instance, but not added it to the versions list, causing
    1085            1 :                 // all the indices in FileMetadata to be inaccurate. To ensure this,
    1086            1 :                 // grab the manifest lock.
    1087            1 :                 d.mu.versions.logLock()
    1088            1 :                 defer d.mu.versions.logUnlock()
    1089            1 :                 d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress)
    1090            1 :         }()
    1091              : }
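
// Summary added for clarity (not in the original source): how input files
// transition when clearCompactingState runs, per the logic above.
//
//      rollback   compaction kind        resulting state
//      false      move or delete-only    NotCompacting (eligible again)
//      false      any other kind         Compacted
//      true       any kind               NotCompacting
//
// In every case IsIntraL0Compacting is reset to false.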
    1092              : 
    1093            1 : func (d *DB) calculateDiskAvailableBytes() uint64 {
    1094            1 :         space, err := d.opts.FS.GetDiskUsage(d.dirname)
    1095            1 :         if err != nil {
    1096            1 :                 if !errors.Is(err, vfs.ErrUnsupported) {
    1097            0 :                         d.opts.EventListener.BackgroundError(err)
    1098            0 :                 }
    1099              :                 // Return the last value we managed to obtain.
    1100            1 :                 return d.diskAvailBytes.Load()
    1101              :         }
    1102              : 
    1103            1 :         d.lowDiskSpaceReporter.Report(space.AvailBytes, space.TotalBytes, d.opts.EventListener)
    1104            1 :         d.diskAvailBytes.Store(space.AvailBytes)
    1105            1 :         return space.AvailBytes
    1106              : }
    1107              : 
    1108              : // maybeScheduleFlush schedules a flush if necessary.
    1109              : //
    1110              : // d.mu must be held when calling this.
    1111            1 : func (d *DB) maybeScheduleFlush() {
    1112            1 :         if d.mu.compact.flushing || d.closed.Load() != nil || d.opts.ReadOnly {
    1113            1 :                 return
    1114            1 :         }
    1115            1 :         if len(d.mu.mem.queue) <= 1 {
    1116            1 :                 return
    1117            1 :         }
    1118              : 
    1119            1 :         if !d.passedFlushThreshold() {
    1120            1 :                 return
    1121            1 :         }
    1122              : 
    1123            1 :         d.mu.compact.flushing = true
    1124            1 :         go d.flush()
    1125              : }
    1126              : 
    1127            1 : func (d *DB) passedFlushThreshold() bool {
    1128            1 :         var n int
    1129            1 :         var size uint64
    1130            1 :         for ; n < len(d.mu.mem.queue)-1; n++ {
    1131            1 :                 if !d.mu.mem.queue[n].readyForFlush() {
    1132            1 :                         break
    1133              :                 }
    1134            1 :                 if d.mu.mem.queue[n].flushForced {
    1135            1 :                         // A flush was forced. Pretend the memtable size is the configured
    1136            1 :                         // size. See minFlushSize below.
    1137            1 :                         size += d.opts.MemTableSize
    1138            1 :                 } else {
    1139            1 :                         size += d.mu.mem.queue[n].totalBytes()
    1140            1 :                 }
    1141              :         }
    1142            1 :         if n == 0 {
    1143            1 :                 // None of the immutable memtables are ready for flushing.
    1144            1 :                 return false
    1145            1 :         }
    1146              : 
    1147              :         // Only flush once the sum of the queued memtable sizes exceeds half the
    1148              :         // configured memtable size. This prevents flushing of memtables at startup
    1149              :         // while we're undergoing the ramp period on the memtable size. See
    1150              :         // DB.newMemTable().
    1151            1 :         minFlushSize := d.opts.MemTableSize / 2
    1152            1 :         return size >= minFlushSize
    1153              : }
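
// Illustrative sketch (added, not part of the original source): the
// threshold arithmetic above, restated as a standalone helper. With a
// hypothetical MemTableSize of 64 MiB, minFlushSize is 32 MiB; ready
// memtables of 12 MiB and 16 MiB sum to 28 MiB and do not trigger a flush,
// while a forced flush counts as the full 64 MiB and does.
func passedFlushThresholdExample(readySizes []uint64, memTableSize uint64) bool {
        if len(readySizes) == 0 {
                // Nothing is ready to flush.
                return false
        }
        var size uint64
        for _, s := range readySizes {
                size += s
        }
        // Flush only once the queued total reaches half the configured size.
        return size >= memTableSize/2
}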
    1154              : 
    1155            1 : func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
    1156            1 :         var mem *flushableEntry
    1157            1 :         for _, m := range d.mu.mem.queue {
    1158            1 :                 if m.flushable == tbl {
    1159            1 :                         mem = m
    1160            1 :                         break
    1161              :                 }
    1162              :         }
    1163            1 :         if mem == nil || mem.flushForced {
    1164            1 :                 return
    1165            1 :         }
    1166            1 :         deadline := d.timeNow().Add(dur)
    1167            1 :         if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
    1168            1 :                 // Already scheduled to flush sooner than within `dur`.
    1169            1 :                 return
    1170            1 :         }
    1171            1 :         mem.delayedFlushForcedAt = deadline
    1172            1 :         go func() {
    1173            1 :                 timer := time.NewTimer(dur)
    1174            1 :                 defer timer.Stop()
    1175            1 : 
    1176            1 :                 select {
    1177            1 :                 case <-d.closedCh:
    1178            1 :                         return
    1179            1 :                 case <-mem.flushed:
    1180            1 :                         return
    1181            1 :                 case <-timer.C:
    1182            1 :                         d.commit.mu.Lock()
    1183            1 :                         defer d.commit.mu.Unlock()
    1184            1 :                         d.mu.Lock()
    1185            1 :                         defer d.mu.Unlock()
    1186            1 : 
    1187            1 :                         // NB: The timer may fire concurrently with a call to Close.  If a
    1188            1 :                         // Close call beat us to acquiring d.mu, d.closed holds ErrClosed,
    1189            1 :                         // and it's too late to flush anything. Otherwise, the Close call
    1190            1 :                         // will block on locking d.mu until we've finished scheduling the
    1191            1 :                         // flush and set `d.mu.compact.flushing` to true. Close will wait
    1192            1 :                         // for the current flush to complete.
    1193            1 :                         if d.closed.Load() != nil {
    1194            0 :                                 return
    1195            0 :                         }
    1196              : 
    1197            1 :                         if d.mu.mem.mutable == tbl {
    1198            1 :                                 d.makeRoomForWrite(nil)
    1199            1 :                         } else {
    1200            1 :                                 mem.flushForced = true
    1201            1 :                         }
    1202            1 :                         d.maybeScheduleFlush()
    1203              :                 }
    1204              :         }()
    1205              : }
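
// Illustrative sketch (added, not part of the original source): the
// cancellation-aware timer pattern used by the goroutine above, reduced to
// its essentials. done is a hypothetical stand-in for d.closedCh and
// mem.flushed; action stands in for the flush-scheduling work.
func delayedActionExample(dur time.Duration, done <-chan struct{}, action func()) {
        go func() {
                timer := time.NewTimer(dur)
                // Stop the timer so its resources are released promptly if
                // done fires first.
                defer timer.Stop()
                select {
                case <-done:
                        // The work became unnecessary (e.g. the DB closed or
                        // the memtable already flushed); drop the action.
                        return
                case <-timer.C:
                        action()
                }
        }()
}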
    1206              : 
    1207            1 : func (d *DB) flush() {
    1208            1 :         pprof.Do(context.Background(), flushLabels, func(context.Context) {
    1209            1 :                 flushingWorkStart := crtime.NowMono()
    1210            1 :                 d.mu.Lock()
    1211            1 :                 defer d.mu.Unlock()
    1212            1 :                 idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime)
    1213            1 :                 var bytesFlushed uint64
    1214            1 :                 var err error
    1215            1 :                 if bytesFlushed, err = d.flush1(); err != nil {
    1216            1 :                         // TODO(peter): count consecutive flush errors and backoff.
    1217            1 :                         d.opts.EventListener.BackgroundError(err)
    1218            1 :                 }
    1219            1 :                 d.mu.compact.flushing = false
    1220            1 :                 d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
    1221            1 :                 workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
    1222            1 :                 d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
    1223            1 :                 d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
    1224            1 :                 d.mu.compact.flushWriteThroughput.IdleDuration += idleDuration
    1225            1 :                 // More flush work may have arrived while we were flushing, so schedule
    1226            1 :                 // another flush if needed.
    1227            1 :                 d.maybeScheduleFlush()
    1228            1 :                 // The flush may have produced too many files in a level, so schedule a
    1229            1 :                 // compaction if needed.
    1230            1 :                 d.maybeScheduleCompaction()
    1231            1 :                 d.mu.compact.cond.Broadcast()
    1232              :         })
    1233              : }
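
// Worked example added for clarity (not in the original source): the
// throughput bookkeeping above splits wall-clock time into idle time
// (since the previous flush finished) and work time (spent in flush1).
// With hypothetical numbers, a 64 MiB flush taking 2s of work after 6s of
// idleness contributes 32 MiB/s of work-rate throughput and a 25% duty
// cycle (2s of work out of 8s total).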
    1234              : 
    1235              : // runIngestFlush is used to generate a flush version edit for sstables which
    1236              : // were ingested as flushables. Both DB.mu and the manifest lock must be held
    1237              : // while runIngestFlush is called.
    1238            1 : func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
    1239            1 :         if len(c.flushing) != 1 {
    1240            0 :                 panic("pebble: ingestedFlushable must be flushed one at a time.")
    1241              :         }
    1242            1 :         defer func() {
    1243            1 :                 c.slot.Release(0 /* totalBytesWritten */)
    1244            1 :                 c.slot = nil
    1245            1 :         }()
    1246              : 
    1247              :         // Construct the VersionEdit, levelMetrics etc.
    1248            1 :         c.metrics = make(map[int]*LevelMetrics, numLevels)
    1249            1 :         // Finding the target level for ingestion must use the latest version
    1250            1 :         // after the logLock has been acquired.
    1251            1 :         c.version = d.mu.versions.currentVersion()
    1252            1 : 
    1253            1 :         baseLevel := d.mu.versions.picker.getBaseLevel()
    1254            1 :         ve := &versionEdit{}
    1255            1 :         var ingestSplitFiles []ingestSplitFile
    1256            1 :         ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
    1257            1 : 
    1258            1 :         updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
    1259            1 :                 levelMetrics := c.metrics[level]
    1260            1 :                 if levelMetrics == nil {
    1261            1 :                         levelMetrics = &LevelMetrics{}
    1262            1 :                         c.metrics[level] = levelMetrics
    1263            1 :                 }
    1264            1 :                 levelMetrics.NumFiles--
    1265            1 :                 levelMetrics.Size -= int64(m.Size)
    1266            1 :                 for i := range added {
    1267            1 :                         levelMetrics.NumFiles++
    1268            1 :                         levelMetrics.Size += int64(added[i].Meta.Size)
    1269            1 :                 }
    1270              :         }
    1271              : 
    1272            1 :         suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
    1273            1 :                 d.FormatMajorVersion() >= FormatVirtualSSTables
    1274            1 : 
    1275            1 :         if suggestSplit || ingestFlushable.exciseSpan.Valid() {
    1276            1 :                 // This flush may delete files (due to an ingest-time split or excise), so allocate ve.DeletedFiles.
    1277            1 :                 ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
    1278            1 :         }
    1279              : 
    1280            1 :         ctx := context.Background()
    1281            1 :         overlapChecker := &overlapChecker{
    1282            1 :                 comparer: d.opts.Comparer,
    1283            1 :                 newIters: d.newIters,
    1284            1 :                 opts: IterOptions{
    1285            1 :                         logger:   d.opts.Logger,
    1286            1 :                         Category: categoryIngest,
    1287            1 :                 },
    1288            1 :                 v: c.version,
    1289            1 :         }
    1290            1 :         replacedFiles := make(map[base.FileNum][]newFileEntry)
    1291            1 :         for _, file := range ingestFlushable.files {
    1292            1 :                 var fileToSplit *fileMetadata
    1293            1 :                 var level int
    1294            1 : 
    1295            1 :                 // If this file fits entirely within the excise span, we can slot it directly at L6.
    1296            1 :                 if ingestFlushable.exciseSpan.Valid() &&
    1297            1 :                         ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) &&
    1298            1 :                         ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) {
    1299            1 :                         level = 6
    1300            1 :                 } else {
    1301            1 :                         // TODO(radu): this can perform I/O; we should not do this while holding DB.mu.
    1302            1 :                         lsmOverlap, err := overlapChecker.DetermineLSMOverlap(ctx, file.UserKeyBounds())
    1303            1 :                         if err != nil {
    1304            0 :                                 return nil, err
    1305            0 :                         }
    1306            1 :                         level, fileToSplit, err = ingestTargetLevel(
    1307            1 :                                 ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, file.FileMetadata, suggestSplit,
    1308            1 :                         )
    1309            1 :                         if err != nil {
    1310            0 :                                 return nil, err
    1311            0 :                         }
    1312              :                 }
    1313              : 
    1314              :                 // Add the current flushableIngest file to the version.
    1315            1 :                 ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
    1316            1 :                 if fileToSplit != nil {
    1317            1 :                         ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
    1318            1 :                                 ingestFile: file.FileMetadata,
    1319            1 :                                 splitFile:  fileToSplit,
    1320            1 :                                 level:      level,
    1321            1 :                         })
    1322            1 :                 }
    1323            1 :                 levelMetrics := c.metrics[level]
    1324            1 :                 if levelMetrics == nil {
    1325            1 :                         levelMetrics = &LevelMetrics{}
    1326            1 :                         c.metrics[level] = levelMetrics
    1327            1 :                 }
    1328            1 :                 levelMetrics.BytesIngested += file.Size
    1329            1 :                 levelMetrics.TablesIngested++
    1330              :         }
    1331            1 :         if ingestFlushable.exciseSpan.Valid() {
    1332            1 :                 // Iterate through all levels and find files that intersect with exciseSpan.
    1333            1 :                 for l := range c.version.Levels {
    1334            1 :                         overlaps := c.version.Overlaps(l, base.UserKeyBoundsEndExclusive(ingestFlushable.exciseSpan.Start, ingestFlushable.exciseSpan.End))
    1335            1 :                         iter := overlaps.Iter()
    1336            1 : 
    1337            1 :                         for m := iter.First(); m != nil; m = iter.Next() {
    1338            1 :                                 newFiles, err := d.excise(context.TODO(), ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
    1339            1 :                                 if err != nil {
    1340            0 :                                         return nil, err
    1341            0 :                                 }
    1342              : 
    1343            1 :                                 if _, ok := ve.DeletedFiles[deletedFileEntry{
    1344            1 :                                         Level:   l,
    1345            1 :                                         FileNum: m.FileNum,
    1346            1 :                                 }]; !ok {
    1347            1 :                                         // We did not excise this file.
    1348            1 :                                         continue
    1349              :                                 }
    1350            1 :                                 replacedFiles[m.FileNum] = newFiles
    1351            1 :                                 updateLevelMetricsOnExcise(m, l, newFiles)
    1352              :                         }
    1353              :                 }
    1354              :         }
    1355              : 
    1356            1 :         if len(ingestSplitFiles) > 0 {
    1357            1 :                 if err := d.ingestSplit(context.TODO(), ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
    1358            0 :                         return nil, err
    1359            0 :                 }
    1360              :         }
    1361              : 
    1362            1 :         return ve, nil
    1363              : }
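
// Worked example added for clarity (not in the original source): for a
// given level, updateLevelMetricsOnExcise above nets out to
// (len(added) - 1) files and (sum of added sizes - excised size) bytes.
// For instance, excising a hypothetical 100 MB file into remnants of
// 30 MB and 45 MB changes that level's metrics by +1 file and -25 MB.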
    1364              : 
    1365              : // flush runs a compaction that copies the immutable memtables from memory to
    1366              : // disk.
    1367              : //
    1368              : // d.mu must be held when calling this, but the mutex may be dropped and
    1369              : // re-acquired during the course of this method.
    1370            1 : func (d *DB) flush1() (bytesFlushed uint64, err error) {
    1371            1 :         // NB: The flushable queue can contain flushables of type ingestedFlushable.
    1372            1 :         // The sstables in ingestedFlushable.files must be placed into the appropriate
    1373            1 :         // level in the lsm. Let's say the flushable queue contains a prefix of
    1374            1 :         // regular immutable memtables, then an ingestedFlushable, and then the
    1375            1 :         // mutable memtable. When the flush of the ingestedFlushable is performed,
    1376            1 :         // it needs an updated view of the lsm. That is, the prefix of immutable
    1377            1 :         // memtables must have already been flushed. Similarly, if there are two
    1378            1 :         // contiguous ingestedFlushables in the queue, then the first flushable must
    1379            1 :         // be flushed, so that the second flushable can see an updated view of the
    1380            1 :         // lsm.
    1381            1 :         //
    1382            1 :         // Given the above, we restrict flushes to either some prefix of regular
    1383            1 :         // memtables, or a single flushable of type ingestedFlushable. The DB.flush
    1384            1 :         // function will call DB.maybeScheduleFlush again, so a new flush to finish
    1385            1 :         // the remaining flush work should be scheduled right away.
    1386            1 :         //
    1387            1 :         // NB: Large batches placed in the flushable queue share the WAL with the
    1388            1 :         // previous memtable in the queue. We must ensure the property that both the
    1389            1 :         // large batch and the memtable with which it shares a WAL are flushed
    1390            1 :         // together. The property ensures that the minimum unflushed log number
    1391            1 :         // isn't incremented incorrectly. Since a flushableBatch.readyToFlush always
    1392            1 :         // returns true, and since the large batch will always be placed right after
    1393            1 :         // the memtable with which it shares a WAL, the property is naturally
    1394            1 :         // ensured. The large batch will always be placed after the memtable with
    1395            1 :         // which it shares a WAL because we ensure it in DB.commitWrite by holding
    1396            1 :         // the commitPipeline.mu and then holding DB.mu. As an extra defensive
    1397            1 :         // measure, if we try to flush the memtable without also flushing the
    1398            1 :         // flushable batch in the same flush, since the memtable and flushableBatch
    1399            1 :         // have the same logNum, the logNum invariant check below will trigger.
    1400            1 :         var n, inputs int
    1401            1 :         var inputBytes uint64
    1402            1 :         var ingest bool
    1403            1 :         for ; n < len(d.mu.mem.queue)-1; n++ {
    1404            1 :                 if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
    1405            1 :                         if n == 0 {
    1406            1 :                                 // The first flushable is of type ingestedFlushable. Since these
    1407            1 :                                 // must be flushed individually, we perform a flush for just
    1408            1 :                                 // this.
    1409            1 :                                 if !f.readyForFlush() {
    1410            0 :                                         // This check is almost unnecessary, but we guard against it
    1411            0 :                                         // just in case this invariant changes in the future.
    1412            0 :                                         panic("pebble: ingestedFlushable should always be ready to flush.")
    1413              :                                 }
    1414              :                                 // By setting n = 1, we ensure that the first flushable (n == 0)
    1415              :                                 // is scheduled for a flush. The number of tables added is equal to the
    1416              :                                 // number of files in the ingest operation.
    1417            1 :                                 n = 1
    1418            1 :                                 inputs = len(f.files)
    1419            1 :                                 ingest = true
    1420            1 :                                 break
    1421            1 :                         } else {
    1422            1 :                                 // There was some prefix of flushables which weren't of type
    1423            1 :                                 // ingestedFlushable. So, perform a flush for those.
    1424            1 :                                 break
    1425              :                         }
    1426              :                 }
    1427            1 :                 if !d.mu.mem.queue[n].readyForFlush() {
    1428            0 :                         break
    1429              :                 }
    1430            1 :                 inputBytes += d.mu.mem.queue[n].inuseBytes()
    1431              :         }
    1432            1 :         if n == 0 {
    1433            0 :                 // None of the immutable memtables are ready for flushing.
    1434            0 :                 return 0, nil
    1435            0 :         }
    1436            1 :         if !ingest {
    1437            1 :                 // Flushes of memtables add the prefix of n memtables from the flushable
    1438            1 :                 // queue.
    1439            1 :                 inputs = n
    1440            1 :         }
    1441              : 
    1442              :         // Require that every memtable being flushed has a log number less than the
    1443              :         // new minimum unflushed log number.
    1444            1 :         minUnflushedLogNum := d.mu.mem.queue[n].logNum
    1445            1 :         if !d.opts.DisableWAL {
    1446            1 :                 for i := 0; i < n; i++ {
    1447            1 :                         if logNum := d.mu.mem.queue[i].logNum; logNum >= minUnflushedLogNum {
    1448            0 :                                 panic(errors.AssertionFailedf("logNum invariant violated: flushing %d items; %d:type=%T,logNum=%d; %d:type=%T,logNum=%d",
    1449            0 :                                         n,
    1450            0 :                                         i, d.mu.mem.queue[i].flushable, logNum,
    1451            0 :                                         n, d.mu.mem.queue[n].flushable, minUnflushedLogNum))
    1452              :                         }
    1453              :                 }
    1454              :         }
    1455              : 
    1456            1 :         c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
    1457            1 :                 d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow())
    1458            1 :         if err != nil {
    1459            0 :                 return 0, err
    1460            0 :         }
    1461            1 :         d.addInProgressCompaction(c)
    1462            1 : 
    1463            1 :         jobID := d.newJobIDLocked()
    1464            1 :         d.opts.EventListener.FlushBegin(FlushInfo{
    1465            1 :                 JobID:      int(jobID),
    1466            1 :                 Input:      inputs,
    1467            1 :                 InputBytes: inputBytes,
    1468            1 :                 Ingest:     ingest,
    1469            1 :         })
    1470            1 :         startTime := d.timeNow()
    1471            1 : 
    1472            1 :         var ve *manifest.VersionEdit
    1473            1 :         var stats compact.Stats
    1474            1 :         // To determine the target level of the files in the ingestedFlushable, we
    1475            1 :         // need to acquire the logLock, and not release it for that duration. Since
    1476            1 :         // we need to acquire the logLock below to perform the logAndApply step
    1477            1 :         // anyway, we create the VersionEdit for ingestedFlushable outside of
    1478            1 :         // runCompaction. For all other flush cases, we construct the VersionEdit
    1479            1 :         // inside runCompaction.
    1480            1 :         if c.kind != compactionKindIngestedFlushable {
    1481            1 :                 ve, stats, err = d.runCompaction(jobID, c)
    1482            1 :         }
    1483              : 
    1484              :         // Acquire logLock. This will be released either on an error, by way of
    1485              :         // logUnlock, or through a call to logAndApply if there is no error.
    1486            1 :         d.mu.versions.logLock()
    1487            1 : 
    1488            1 :         if c.kind == compactionKindIngestedFlushable {
    1489            1 :                 ve, err = d.runIngestFlush(c)
    1490            1 :         }
    1491              : 
    1492            1 :         info := FlushInfo{
    1493            1 :                 JobID:      int(jobID),
    1494            1 :                 Input:      inputs,
    1495            1 :                 InputBytes: inputBytes,
    1496            1 :                 Duration:   d.timeNow().Sub(startTime),
    1497            1 :                 Done:       true,
    1498            1 :                 Ingest:     ingest,
    1499            1 :                 Err:        err,
    1500            1 :         }
    1501            1 :         if err == nil {
    1502            1 :                 validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
    1503            1 :                 for i := range ve.NewFiles {
    1504            1 :                         e := &ve.NewFiles[i]
    1505            1 :                         info.Output = append(info.Output, e.Meta.TableInfo())
    1506            1 :                         // Ingested tables are not necessarily flushed to L0. Record the level of
    1507            1 :                         // each ingested file explicitly.
    1508            1 :                         if ingest {
    1509            1 :                                 info.IngestLevels = append(info.IngestLevels, e.Level)
    1510            1 :                         }
    1511              :                 }
    1512            1 :                 if len(ve.NewFiles) == 0 {
    1513            1 :                         info.Err = errEmptyTable
    1514            1 :                 }
    1515              : 
    1516              :                 // The flush succeeded or it produced an empty sstable. In either case we
    1517              :                 // want to bump the minimum unflushed log number to the log number of the
    1518              :                 // oldest unflushed memtable.
    1519            1 :                 ve.MinUnflushedLogNum = minUnflushedLogNum
    1520            1 :                 if c.kind != compactionKindIngestedFlushable {
    1521            1 :                         metrics := c.metrics[0]
    1522            1 :                         if d.opts.DisableWAL {
    1523            1 :                                 // If the WAL is disabled, every flushable has a zero [logSize],
    1524            1 :                                 // resulting in zero bytes in. Instead, use the number of bytes we
    1525            1 :                                 // flushed as the BytesIn. This ensures we get a reasonable w-amp
    1526            1 :                                 // calculation even when the WAL is disabled.
    1527            1 :                                 metrics.BytesIn = metrics.BytesFlushed
    1528            1 :                         } else {
    1529            1 :                                 for i := 0; i < n; i++ {
    1530            1 :                                         metrics.BytesIn += d.mu.mem.queue[i].logSize
    1531            1 :                                 }
    1532              :                         }
    1533            1 :                 } else {
    1534            1 :                         // c.kind == compactionKindIngestedFlushable && we could have deleted files due
    1535            1 :                         // to ingest-time splits or excises.
    1536            1 :                         ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
    1537            1 :                         for c2 := range d.mu.compact.inProgress {
    1538            1 :                                 // Check if this compaction overlaps with the excise span. Note that just
    1539            1 :                                 // checking if the inputs individually overlap with the excise span
    1540            1 :                                 // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
    1541            1 :                                 // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
    1542            1 :                                 // doing a [c,d) excise at the same time as this compaction, we will have
    1543            1 :                                 // to error out the whole compaction as we can't guarantee it hasn't/won't
    1544            1 :                                 // write a file overlapping with the excise span.
    1545            1 :                                 if ingestFlushable.exciseSpan.OverlapsInternalKeyRange(d.cmp, c2.smallest, c2.largest) {
    1546            1 :                                         c2.cancel.Store(true)
    1547            1 :                                         continue
    1548              :                                 }
    1549              :                         }
    1550              : 
    1551            1 :                         if len(ve.DeletedFiles) > 0 {
    1552            1 :                                 // Iterate through all other compactions, and check if their inputs have
    1553            1 :                                 // been replaced due to an ingest-time split or excise. In that case,
    1554            1 :                                 // cancel the compaction.
    1555            1 :                                 for c2 := range d.mu.compact.inProgress {
    1556            1 :                                         for i := range c2.inputs {
    1557            1 :                                                 iter := c2.inputs[i].files.Iter()
    1558            1 :                                                 for f := iter.First(); f != nil; f = iter.Next() {
    1559            1 :                                                         if _, ok := ve.DeletedFiles[deletedFileEntry{FileNum: f.FileNum, Level: c2.inputs[i].level}]; ok {
    1560            1 :                                                                 c2.cancel.Store(true)
    1561            1 :                                                                 break
    1562              :                                                         }
    1563              :                                                 }
    1564              :                                         }
    1565              :                                 }
    1566              :                         }
    1567              :                 }
    1568            1 :                 err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */
    1569            1 :                         func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
    1570            1 :                 if err != nil {
    1571            0 :                         info.Err = err
    1572            0 :                 }
    1573            1 :         } else {
    1574            1 :                 // We won't be performing the logAndApply step because of the error,
    1575            1 :                 // so logUnlock.
    1576            1 :                 d.mu.versions.logUnlock()
    1577            1 :         }
    1578              : 
    1579              :         // If err != nil, then the flush will be retried, and we will recalculate
    1580              :         // these metrics.
    1581            1 :         if err == nil {
    1582            1 :                 d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
    1583            1 :                 d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
    1584            1 :                 d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
    1585            1 :         }
    1586              : 
    1587            1 :         d.clearCompactingState(c, err != nil)
    1588            1 :         delete(d.mu.compact.inProgress, c)
    1589            1 :         d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
    1590            1 : 
    1591            1 :         var flushed flushableList
    1592            1 :         if err == nil {
    1593            1 :                 flushed = d.mu.mem.queue[:n]
    1594            1 :                 d.mu.mem.queue = d.mu.mem.queue[n:]
    1595            1 :                 d.updateReadStateLocked(d.opts.DebugCheck)
    1596            1 :                 d.updateTableStatsLocked(ve.NewFiles)
    1597            1 :                 if ingest {
    1598            1 :                         d.mu.versions.metrics.Flush.AsIngestCount++
    1599            1 :                         for _, l := range c.metrics {
    1600            1 :                                 d.mu.versions.metrics.Flush.AsIngestBytes += l.BytesIngested
    1601            1 :                                 d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
    1602            1 :                         }
    1603              :                 }
    1604            1 :                 d.maybeTransitionSnapshotsToFileOnlyLocked()
    1605              : 
    1606              :         }
    1607              : // Signal FlushEnd after installing the new readState. This helps unit
    1608              :         // tests that use the callback to trigger a read using an iterator with
    1609              :         // IterOptions.OnlyReadGuaranteedDurable.
    1610            1 :         info.TotalDuration = d.timeNow().Sub(startTime)
    1611            1 :         d.opts.EventListener.FlushEnd(info)
    1612            1 : 
    1613            1 :         // The order of these operations matters here for ease of testing.
    1614            1 :         // Removing the reader reference first guarantees to tests that the
    1615            1 :         // memtable reservation has been released by the time a synchronous
    1616            1 :         // flush returns. readerUnrefLocked may also produce obsolete files, so
    1617            1 :         // the call to deleteObsoleteFiles must happen after it.
    1618            1 :         for i := range flushed {
    1619            1 :                 flushed[i].readerUnrefLocked(true)
    1620            1 :         }
    1621              : 
    1622            1 :         d.deleteObsoleteFiles(jobID)
    1623            1 : 
    1624            1 :         // Mark all the memtables we flushed as flushed.
    1625            1 :         for i := range flushed {
    1626            1 :                 close(flushed[i].flushed)
    1627            1 :         }
    1628              : 
    1629            1 :         return inputBytes, err
    1630              : }
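
The excise-overlap check near the top of the flush above compares the excise span against each in-progress compaction's overall [smallest, largest] bounds rather than its individual inputs. A minimal standalone sketch (not Pebble code; plain byte-slice user keys and a hypothetical overlaps helper, with the span end treated as exclusive) of why a per-input check would not be enough:

package main

import (
	"bytes"
	"fmt"
)

// overlaps reports whether the excise span [spanStart, spanEnd) overlaps the
// compaction's overall output key range [cSmallest, cLargest].
func overlaps(spanStart, spanEnd, cSmallest, cLargest []byte) bool {
	return bytes.Compare(spanStart, cLargest) <= 0 && bytes.Compare(spanEnd, cSmallest) > 0
}

func main() {
	// Inputs [a,b] and [e,f] may be written out as a single sstable spanning
	// [a,f], so an excise of [c,d) must cancel the compaction even though
	// neither input overlaps [c,d) on its own.
	fmt.Println(overlaps([]byte("c"), []byte("d"), []byte("a"), []byte("f"))) // true
	fmt.Println(overlaps([]byte("c"), []byte("d"), []byte("a"), []byte("b"))) // false
	fmt.Println(overlaps([]byte("c"), []byte("d"), []byte("e"), []byte("f"))) // false
}
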
    1631              : 
    1632              : // maybeTransitionSnapshotsToFileOnlyLocked transitions any "eventually
    1633              : // file-only" snapshots to be file-only if all their visible state has been
    1634              : // flushed to sstables.
    1635              : //
    1636              : // REQUIRES: d.mu.
    1637            1 : func (d *DB) maybeTransitionSnapshotsToFileOnlyLocked() {
    1638            1 :         earliestUnflushedSeqNum := d.getEarliestUnflushedSeqNumLocked()
    1639            1 :         currentVersion := d.mu.versions.currentVersion()
    1640            1 :         for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; {
    1641            1 :                 if s.efos == nil {
    1642            1 :                         s = s.next
    1643            1 :                         continue
    1644              :                 }
    1645            1 :                 overlapsFlushable := false
    1646            1 :                 if base.Visible(earliestUnflushedSeqNum, s.efos.seqNum, base.SeqNumMax) {
    1647            1 :                         // There are some unflushed keys that are still visible to the EFOS.
    1648            1 :                         // Check if any memtables older than the EFOS contain keys within a
    1649            1 :                         // protected range of the EFOS. If no, we can transition.
    1650            1 :                         protectedRanges := make([]bounded, len(s.efos.protectedRanges))
    1651            1 :                         for i := range s.efos.protectedRanges {
    1652            1 :                                 protectedRanges[i] = s.efos.protectedRanges[i]
    1653            1 :                         }
    1654            1 :                         for i := range d.mu.mem.queue {
    1655            1 :                                 if !base.Visible(d.mu.mem.queue[i].logSeqNum, s.efos.seqNum, base.SeqNumMax) {
    1656            1 :                                         // All keys in this memtable are newer than the EFOS. Skip this
    1657            1 :                                         // memtable.
    1658            1 :                                         continue
    1659              :                                 }
    1660              :                                 // NB: computePossibleOverlaps could have false positives, such as if
    1661              :                                 // the flushable is a flushable ingest and not a memtable. In that
    1662              :                                 // case we don't open the sstables to check; we just pessimistically
    1663              :                                 // assume an overlap.
    1664            1 :                                 d.mu.mem.queue[i].computePossibleOverlaps(func(b bounded) shouldContinue {
    1665            1 :                                         overlapsFlushable = true
    1666            1 :                                         return stopIteration
    1667            1 :                                 }, protectedRanges...)
    1668            1 :                                 if overlapsFlushable {
    1669            1 :                                         break
    1670              :                                 }
    1671              :                         }
    1672              :                 }
    1673            1 :                 if overlapsFlushable {
    1674            1 :                         s = s.next
    1675            1 :                         continue
    1676              :                 }
    1677            1 :                 currentVersion.Ref()
    1678            1 : 
    1679            1 :                 // NB: s.efos.transitionToFileOnlySnapshot could close s, in which
    1680            1 :                 // case s.next would be nil. Save it before calling it.
    1681            1 :                 next := s.next
    1682            1 :                 _ = s.efos.transitionToFileOnlySnapshot(currentVersion)
    1683            1 :                 s = next
    1684              :         }
    1685              : }
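
maybeTransitionSnapshotsToFileOnlyLocked walks the snapshot list even though transitionToFileOnlySnapshot may unlink the node being visited, which is why the successor is saved before the call. A minimal sketch of that saved-next pattern, assuming a toy circular list with a sentinel root that merely stands in for the snapshot list:

package main

import "fmt"

// node is a circular doubly-linked list element with a sentinel root. It is
// an illustrative stand-in, not Pebble's snapshot type.
type node struct {
	val        int
	prev, next *node
}

func main() {
	// Build a ring: root <-> 1 <-> 2 <-> 3 <-> root.
	root := &node{}
	root.prev, root.next = root, root
	for i := 1; i <= 3; i++ {
		n := &node{val: i, prev: root.prev, next: root}
		root.prev.next = n
		root.prev = n
	}
	// Visit every element. Unlinking a node invalidates n.next, so the
	// successor is captured before the visit, mirroring the loop above.
	for n := root.next; n != root; {
		next := n.next
		if n.val == 2 { // pretend this node "transitions" and is removed
			n.prev.next, n.next.prev = n.next, n.prev
			n.prev, n.next = nil, nil
		}
		n = next
	}
	for n := root.next; n != root; n = n.next {
		fmt.Println(n.val) // prints 1, then 3
	}
}
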
    1686              : 
    1687              : // maybeScheduleCompactionAsync should be used when we want to possibly
    1688              : // schedule a compaction but don't want to pay the cost of running
    1689              : // maybeScheduleCompaction synchronously on the calling goroutine.
    1690              : // This method should be launched in a separate goroutine.
    1691              : // d.mu must not be held when this is called.
    1692            0 : func (d *DB) maybeScheduleCompactionAsync() {
    1693            0 :         defer d.compactionSchedulers.Done()
    1694            0 : 
    1695            0 :         d.mu.Lock()
    1696            0 :         d.maybeScheduleCompaction()
    1697            0 :         d.mu.Unlock()
    1698            0 : }
    1699              : 
    1700              : // maybeScheduleCompaction schedules a compaction if necessary.
    1701              : //
    1702              : // d.mu must be held when calling this.
    1703            1 : func (d *DB) maybeScheduleCompaction() {
    1704            1 :         d.maybeScheduleCompactionPicker(pickAuto)
    1705            1 : }
    1706              : 
    1707            1 : func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction {
    1708            1 :         return picker.pickAuto(env)
    1709            1 : }
    1710              : 
    1711            1 : func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction {
    1712            1 :         return picker.pickElisionOnlyCompaction(env)
    1713            1 : }
    1714              : 
    1715              : // tryScheduleDownloadCompaction tries to start a download compaction.
    1716              : //
    1717              : // Returns true if we started a download compaction (or completed it
    1718              : // immediately because it is a no-op or we hit an error).
    1719              : //
    1720              : // Requires d.mu to be held. Updates d.mu.compact.downloads.
    1721            1 : func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownloads int) bool {
    1722            1 :         vers := d.mu.versions.currentVersion()
    1723            1 :         for i := 0; i < len(d.mu.compact.downloads); {
    1724            1 :                 download := d.mu.compact.downloads[i]
    1725            1 :                 switch d.tryLaunchDownloadCompaction(download, vers, env, maxConcurrentDownloads) {
    1726            1 :                 case launchedCompaction:
    1727            1 :                         return true
    1728            1 :                 case didNotLaunchCompaction:
    1729            1 :                         // See if we can launch a compaction for another download task.
    1730            1 :                         i++
    1731            1 :                 case downloadTaskCompleted:
    1732            1 :                         // Task is completed and must be removed.
    1733            1 :                         d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1)
    1734              :                 }
    1735              :         }
    1736            1 :         return false
    1737              : }
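
tryScheduleDownloadCompaction advances i only when a download task stays queued; a completed task is removed in place with slices.Delete, so the following element shifts into index i and is examined next. A standalone sketch of that removal pattern using a hypothetical []int queue in place of the download tasks:

package main

import (
	"fmt"
	"slices"
)

func main() {
	queue := []int{1, 2, 3, 4}
	i := 0
	for i < len(queue) {
		if queue[i]%2 == 0 { // pretend even-numbered tasks just completed
			// Remove in place; the next element shifts into index i,
			// so i is intentionally not advanced.
			queue = slices.Delete(queue, i, i+1)
			continue
		}
		i++
	}
	fmt.Println(queue) // [1 3]
}
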
    1738              : 
    1739              : // maybeScheduleCompactionPicker schedules a compaction if necessary,
    1740              : // calling `pickFunc` to pick automatic compactions.
    1741              : //
    1742              : // Requires d.mu to be held.
    1743              : func (d *DB) maybeScheduleCompactionPicker(
    1744              :         pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
    1745            1 : ) {
    1746            1 :         if d.closed.Load() != nil || d.opts.ReadOnly {
    1747            1 :                 return
    1748            1 :         }
    1749            1 :         maxCompactions := d.opts.MaxConcurrentCompactions()
    1750            1 :         maxDownloads := d.opts.MaxConcurrentDownloads()
    1751            1 : 
    1752            1 :         if d.mu.compact.compactingCount >= maxCompactions &&
    1753            1 :                 (len(d.mu.compact.downloads) == 0 || d.mu.compact.downloadingCount >= maxDownloads) {
    1754            1 :                 if len(d.mu.compact.manual) > 0 {
    1755            1 :                         // Inability to run the head of the queue blocks later manual compactions.
    1756            1 :                         d.mu.compact.manual[0].retries++
    1757            1 :                 }
    1758            1 :                 return
    1759              :         }
    1760              : 
    1761              :         // Compaction picking needs a coherent view of a Version. In particular, we
    1762              :         // need to prevent concurrent ingestions from choosing a target level that
    1763              :         // conflicts with our compaction decision. versionSet.logLock provides the
    1764              :         // necessary mutual exclusion.
    1765            1 :         d.mu.versions.logLock()
    1766            1 :         defer d.mu.versions.logUnlock()
    1767            1 : 
    1768            1 :         // Check for the closed flag again, in case the DB was closed while we were
    1769            1 :         // waiting for logLock().
    1770            1 :         if d.closed.Load() != nil {
    1771            1 :                 return
    1772            1 :         }
    1773              : 
    1774            1 :         env := compactionEnv{
    1775            1 :                 diskAvailBytes:          d.diskAvailBytes.Load(),
    1776            1 :                 earliestSnapshotSeqNum:  d.mu.snapshots.earliest(),
    1777            1 :                 earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
    1778            1 :         }
    1779            1 : 
    1780            1 :         if d.mu.compact.compactingCount < maxCompactions {
    1781            1 :                 // Check for delete-only compactions first, because they're expected to be
    1782            1 :                 // cheap and reduce future compaction work.
    1783            1 :                 if !d.opts.private.disableDeleteOnlyCompactions &&
    1784            1 :                         !d.opts.DisableAutomaticCompactions &&
    1785            1 :                         len(d.mu.compact.deletionHints) > 0 {
    1786            1 :                         d.tryScheduleDeleteOnlyCompaction()
    1787            1 :                 }
    1788              : 
    1789            1 :                 for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxCompactions {
    1790            1 :                         if manual := d.mu.compact.manual[0]; !d.tryScheduleManualCompaction(env, manual) {
    1791            1 :                                 // Inability to run the head of the queue blocks later manual compactions.
    1792            1 :                                 manual.retries++
    1793            1 :                                 break
    1794              :                         }
    1795            1 :                         d.mu.compact.manual = d.mu.compact.manual[1:]
    1796              :                 }
    1797              : 
    1798            1 :                 for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxCompactions &&
    1799            1 :                         d.tryScheduleAutoCompaction(env, pickFunc) {
    1800            1 :                 }
    1801              :         }
    1802              : 
    1803            1 :         for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads &&
    1804            1 :                 d.tryScheduleDownloadCompaction(env, maxDownloads) {
    1805            1 :         }
    1806              : }
    1807              : 
    1808              : // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction
    1809              : // for all files that can be deleted as suggested by deletionHints.
    1810              : //
    1811              : // Requires d.mu to be held. Updates d.mu.compact.deletionHints.
    1812            1 : func (d *DB) tryScheduleDeleteOnlyCompaction() {
    1813            1 :         v := d.mu.versions.currentVersion()
    1814            1 :         snapshots := d.mu.snapshots.toSlice()
    1815            1 :         // We need to save the value of exciseEnabled in the compaction itself, as
    1816            1 :         // it can change dynamically between now and when the compaction runs.
    1817            1 :         exciseEnabled := d.FormatMajorVersion() >= FormatVirtualSSTables &&
    1818            1 :                 d.opts.Experimental.EnableDeleteOnlyCompactionExcises != nil && d.opts.Experimental.EnableDeleteOnlyCompactionExcises()
    1819            1 :         // NB: CompactionLimiter defaults to a no-op limiter unless one is implemented
    1820            1 :         // and passed in as an option during Open.
    1821            1 :         limiter := d.opts.Experimental.CompactionLimiter
    1822            1 :         var slot base.CompactionSlot
    1823            1 :         // TODO(bilal): Should we always take a slot without permission?
    1824            1 :         if n := len(d.getInProgressCompactionInfoLocked(nil)); n == 0 {
    1825            1 :                 // We are not running a compaction at the moment. We should take a compaction slot
    1826            1 :                 // without permission.
    1827            1 :                 slot = limiter.TookWithoutPermission(context.TODO())
    1828            1 :         } else {
    1829            1 :                 var err error
    1830            1 :                 slot, err = limiter.RequestSlot(context.TODO())
    1831            1 :                 if err != nil {
    1832            0 :                         d.opts.EventListener.BackgroundError(err)
    1833            0 :                         return
    1834            0 :                 }
    1835            1 :                 if slot == nil {
    1836            0 :                         // The limiter is denying us a compaction slot. Yield to other work.
    1837            0 :                         return
    1838            0 :                 }
    1839              :         }
    1840            1 :         inputs, resolvedHints, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots, exciseEnabled)
    1841            1 :         d.mu.compact.deletionHints = unresolvedHints
    1842            1 : 
    1843            1 :         if len(inputs) > 0 {
    1844            1 :                 c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow(), resolvedHints, exciseEnabled)
    1845            1 :                 c.slot = slot
    1846            1 :                 d.mu.compact.compactingCount++
    1847            1 :                 d.addInProgressCompaction(c)
    1848            1 :                 go d.compact(c, nil)
    1849            1 :         } else {
    1850            1 :                 slot.Release(0 /* totalBytesWritten */)
    1851            1 :         }
    1852              : }
    1853              : 
    1854              : // tryScheduleManualCompaction tries to kick off the given manual compaction.
    1855              : //
    1856              : // Returns false if we are not able to run this compaction at this time.
    1857              : //
    1858              : // Requires d.mu to be held.
    1859            1 : func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompaction) bool {
    1860            1 :         v := d.mu.versions.currentVersion()
    1861            1 :         env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
    1862            1 :         pc, retryLater := pickManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
    1863            1 :         if pc == nil {
    1864            1 :                 if !retryLater {
    1865            1 :                         // Manual compaction is a no-op. Signal completion and exit.
    1866            1 :                         manual.done <- nil
    1867            1 :                         return true
    1868            1 :                 }
    1869              :                 // We are not able to run this manual compaction at this time.
    1870            1 :                 return false
    1871              :         }
    1872              : 
    1873            1 :         c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider(), nil /* compactionSlot */)
    1874            1 :         d.mu.compact.compactingCount++
    1875            1 :         d.addInProgressCompaction(c)
    1876            1 :         go d.compact(c, manual.done)
    1877            1 :         return true
    1878              : }
    1879              : 
    1880              : // tryScheduleAutoCompaction tries to kick off an automatic compaction.
    1881              : //
    1882              : // Returns false if no automatic compactions are necessary or able to run at
    1883              : // this time.
    1884              : //
    1885              : // Requires d.mu to be held.
    1886              : func (d *DB) tryScheduleAutoCompaction(
    1887              :         env compactionEnv, pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
    1888            1 : ) bool {
    1889            1 :         env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
    1890            1 :         env.readCompactionEnv = readCompactionEnv{
    1891            1 :                 readCompactions:          &d.mu.compact.readCompactions,
    1892            1 :                 flushing:                 d.mu.compact.flushing || d.passedFlushThreshold(),
    1893            1 :                 rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
    1894            1 :         }
    1895            1 :         // NB: CompactionLimiter defaults to a no-op limiter unless one is implemented
    1896            1 :         // and passed in as an option during Open.
    1897            1 :         limiter := d.opts.Experimental.CompactionLimiter
    1898            1 :         var slot base.CompactionSlot
    1899            1 :         if n := len(env.inProgressCompactions); n == 0 {
    1900            1 :                 // We are not running a compaction at the moment. We should take a compaction slot
    1901            1 :                 // without permission.
    1902            1 :                 slot = limiter.TookWithoutPermission(context.TODO())
    1903            1 :         } else {
    1904            1 :                 var err error
    1905            1 :                 slot, err = limiter.RequestSlot(context.TODO())
    1906            1 :                 if err != nil {
    1907            0 :                         d.opts.EventListener.BackgroundError(err)
    1908            0 :                         return false
    1909            0 :                 }
    1910            1 :                 if slot == nil {
    1911            0 :                         // The limiter is denying us a compaction slot. Yield to other work.
    1912            0 :                         return false
    1913            0 :                 }
    1914              :         }
    1915            1 :         pc := pickFunc(d.mu.versions.picker, env)
    1916            1 :         if pc == nil {
    1917            1 :                 slot.Release(0 /* bytesWritten */)
    1918            1 :                 return false
    1919            1 :         }
    1920            1 :         var inputSize uint64
    1921            1 :         for i := range pc.inputs {
    1922            1 :                 inputSize += pc.inputs[i].files.SizeSum()
    1923            1 :         }
    1924            1 :         slot.CompactionSelected(pc.startLevel.level, pc.outputLevel.level, inputSize)
    1925            1 : 
    1926            1 :         // Responsibility for releasing slot passes over to the compaction.
    1927            1 :         c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider(), slot)
    1928            1 :         d.mu.compact.compactingCount++
    1929            1 :         d.addInProgressCompaction(c)
    1930            1 :         go d.compact(c, nil)
    1931            1 :         return true
    1932              : }
    1933              : 
    1934              : // deleteCompactionHintType indicates whether the deleteCompactionHint was
    1935              : // generated from a span containing a range del (point key only), a range key
    1936              : // delete (range key only), or both a point and range key.
    1937              : type deleteCompactionHintType uint8
    1938              : 
    1939              : const (
    1940              :         // NOTE: While these are primarily used as enumeration types, they are also
    1941              :         // used for some bitwise operations. Care should be taken when updating.
    1942              :         deleteCompactionHintTypeUnknown deleteCompactionHintType = iota
    1943              :         deleteCompactionHintTypePointKeyOnly
    1944              :         deleteCompactionHintTypeRangeKeyOnly
    1945              :         deleteCompactionHintTypePointAndRangeKey
    1946              : )
    1947              : 
    1948              : // String implements fmt.Stringer.
    1949            0 : func (h deleteCompactionHintType) String() string {
    1950            0 :         switch h {
    1951            0 :         case deleteCompactionHintTypeUnknown:
    1952            0 :                 return "unknown"
    1953            0 :         case deleteCompactionHintTypePointKeyOnly:
    1954            0 :                 return "point-key-only"
    1955            0 :         case deleteCompactionHintTypeRangeKeyOnly:
    1956            0 :                 return "range-key-only"
    1957            0 :         case deleteCompactionHintTypePointAndRangeKey:
    1958            0 :                 return "point-and-range-key"
    1959            0 :         default:
    1960            0 :                 panic(fmt.Sprintf("unknown hint type: %d", h))
    1961              :         }
    1962              : }
    1963              : 
    1964              : // compactionHintFromKeys returns a deleteCompactionHintType given a slice of
    1965              : // keyspan.Keys.
    1966            1 : func compactionHintFromKeys(keys []keyspan.Key) deleteCompactionHintType {
    1967            1 :         var hintType deleteCompactionHintType
    1968            1 :         for _, k := range keys {
    1969            1 :                 switch k.Kind() {
    1970            1 :                 case base.InternalKeyKindRangeDelete:
    1971            1 :                         hintType |= deleteCompactionHintTypePointKeyOnly
    1972            1 :                 case base.InternalKeyKindRangeKeyDelete:
    1973            1 :                         hintType |= deleteCompactionHintTypeRangeKeyOnly
    1974            0 :                 default:
    1975            0 :                         panic(fmt.Sprintf("unsupported key kind: %s", k.Kind()))
    1976              :                 }
    1977              :         }
    1978            1 :         return hintType
    1979              : }
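
Because the hint-type constants above are assigned by iota, point-key-only (1) and range-key-only (2) OR together into point-and-range-key (3), which is what compactionHintFromKeys relies on when a span carries both a RANGEDEL and a RANGEKEYDEL. A tiny standalone sketch with re-declared constants assumed to mirror those values:

package main

import "fmt"

type hintType uint8

const (
	unknown hintType = iota // 0
	pointKeyOnly            // 1
	rangeKeyOnly            // 2
	pointAndRangeKey        // 3 == pointKeyOnly | rangeKeyOnly
)

func main() {
	var h hintType
	h |= pointKeyOnly // saw a RANGEDEL key
	h |= rangeKeyOnly // saw a RANGEKEYDEL key
	fmt.Println(h == pointAndRangeKey) // true
}
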
    1980              : 
    1981              : // A deleteCompactionHint records a user key and sequence number span that has been
    1982              : // deleted by a range tombstone. A hint is recorded if at least one sstable
    1983              : // falls completely within both the user key and sequence number spans.
    1984              : // Once the tombstones and the observed completely-contained sstables fall
    1985              : // into the same snapshot stripe, a delete-only compaction may delete any
    1986              : // sstables within the range.
    1987              : type deleteCompactionHint struct {
    1988              :         // The type of key span that generated this hint (point key, range key, or
    1989              :         // both).
    1990              :         hintType deleteCompactionHintType
    1991              :         // start and end are user keys specifying a key range [start, end) of
    1992              :         // deleted keys.
    1993              :         start []byte
    1994              :         end   []byte
    1995              :         // The level of the file containing the range tombstone(s) when the hint
    1996              :         // was created. Only lower levels need to be searched for files that may
    1997              :         // be deleted.
    1998              :         tombstoneLevel int
    1999              :         // The file containing the range tombstone(s) that created the hint.
    2000              :         tombstoneFile *fileMetadata
    2001              :         // The smallest and largest sequence numbers of the abutting tombstones
    2002              :         // merged to form this hint. All of a table's keys must be less than the
    2003              :         // tombstone's smallest sequence number for the table to be deleted. All of
    2004              :         // a table's sequence numbers must fall into the same snapshot stripe as
    2005              :         // the tombstone's largest sequence number for the table to be deleted.
    2006              :         tombstoneLargestSeqNum  base.SeqNum
    2007              :         tombstoneSmallestSeqNum base.SeqNum
    2008              :         // The smallest sequence number of a sstable that was found to be covered
    2009              :         // by this hint. The hint cannot be resolved until this sequence number is
    2010              :         // in the same snapshot stripe as the largest tombstone sequence number.
    2011              :         // This is set when a hint is created, so the LSM may look different and
    2012              :         // notably no longer contain the sstable that contained the key at this
    2013              :         // sequence number.
    2014              :         fileSmallestSeqNum base.SeqNum
    2015              : }
    2016              : 
    2017              : type deletionHintOverlap int8
    2018              : 
    2019              : const (
    2020              :         // hintDoesNotApply indicates that the hint does not apply to the file.
    2021              :         hintDoesNotApply deletionHintOverlap = iota
    2022              :         // hintExcisesFile indicates that the hint excises a portion of the file,
    2023              :         // and the format major version of the DB supports excises.
    2024              :         hintExcisesFile
    2025              :         // hintDeletesFile indicates that the hint deletes the entirety of the file.
    2026              :         hintDeletesFile
    2027              : )
    2028              : 
    2029            0 : func (h deleteCompactionHint) String() string {
    2030            0 :         return fmt.Sprintf(
    2031            0 :                 "L%d.%s %s-%s seqnums(tombstone=%d-%d, file-smallest=%d, type=%s)",
    2032            0 :                 h.tombstoneLevel, h.tombstoneFile.FileNum, h.start, h.end,
    2033            0 :                 h.tombstoneSmallestSeqNum, h.tombstoneLargestSeqNum, h.fileSmallestSeqNum,
    2034            0 :                 h.hintType,
    2035            0 :         )
    2036            0 : }
    2037              : 
    2038              : func (h *deleteCompactionHint) canDeleteOrExcise(
    2039              :         cmp Compare, m *fileMetadata, snapshots compact.Snapshots, exciseEnabled bool,
    2040            1 : ) deletionHintOverlap {
    2041            1 :         // The file can only be deleted if all of its keys are older than the
    2042            1 :         // earliest tombstone aggregated into the hint. Note that we use
    2043            1 :         // m.LargestSeqNumAbsolute, not m.LargestSeqNum. Consider a compaction that
    2044            1 :         // zeroes sequence numbers. A compaction may zero the sequence number of a
    2045            1 :         // key with a sequence number > h.tombstoneSmallestSeqNum and set it to
    2046            1 :         // zero. If we looked at m.LargestSeqNum, the resulting output file would
    2047            1 :         // appear to not contain any keys more recent than the oldest tombstone. To
    2048            1 :         // avoid this error, the largest pre-zeroing sequence number is maintained
    2049            1 :         // in LargestSeqNumAbsolute and used here to determine whether
    2050            1 :         // the file's keys are older than all of the hint's tombstones.
    2051            1 :         if m.LargestSeqNumAbsolute >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
    2052            1 :                 return hintDoesNotApply
    2053            1 :         }
    2054              : 
    2055              :         // The file's oldest key must be in the same snapshot stripe as the
    2056              :         // newest tombstone. NB: We already checked the hint's sequence numbers,
    2057              :         // but this file's oldest sequence number might be lower than the hint's
    2058              :         // smallest sequence number despite the file falling within the key range
    2059              :         // if this file was constructed after the hint by a compaction.
    2060            1 :         if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(m.SmallestSeqNum) {
    2061            0 :                 return hintDoesNotApply
    2062            0 :         }
    2063              : 
    2064            1 :         switch h.hintType {
    2065            1 :         case deleteCompactionHintTypePointKeyOnly:
    2066            1 :                 // A hint generated by a range del span cannot delete tables that contain
    2067            1 :                 // range keys.
    2068            1 :                 if m.HasRangeKeys {
    2069            1 :                         return hintDoesNotApply
    2070            1 :                 }
    2071            1 :         case deleteCompactionHintTypeRangeKeyOnly:
    2072            1 :                 // A hint generated by a range key del span cannot delete tables that
    2073            1 :                 // contain point keys.
    2074            1 :                 if m.HasPointKeys {
    2075            1 :                         return hintDoesNotApply
    2076            1 :                 }
    2077            1 :         case deleteCompactionHintTypePointAndRangeKey:
    2078              :                 // A hint from a span that contains both range dels *and* range keys can
    2079              :                 // only be deleted if both bounds fall within the hint. The next check takes
    2080              :                 // care of this.
    2081            0 :         default:
    2082            0 :                 panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
    2083              :         }
    2084            1 :         if cmp(h.start, m.Smallest.UserKey) <= 0 &&
    2085            1 :                 base.UserKeyExclusive(h.end).CompareUpperBounds(cmp, m.UserKeyBounds().End) >= 0 {
    2086            1 :                 return hintDeletesFile
    2087            1 :         }
    2088            1 :         if !exciseEnabled {
    2089            1 :                 // The file's keys must be completely contained within the hint range; excises
    2090            1 :                 // aren't allowed.
    2091            1 :                 return hintDoesNotApply
    2092            1 :         }
    2093              :         // Check for any overlap. In cases of partial overlap, we can excise the part of the file
    2094              :         // that overlaps with the deletion hint.
    2095            1 :         if cmp(h.end, m.Smallest.UserKey) > 0 &&
    2096            1 :                 (m.UserKeyBounds().End.CompareUpperBounds(cmp, base.UserKeyInclusive(h.start)) >= 0) {
    2097            1 :                 return hintExcisesFile
    2098            1 :         }
    2099            1 :         return hintDoesNotApply
    2100              : }
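
canDeleteOrExcise repeatedly asks whether two sequence numbers fall in the same snapshot stripe, i.e. whether any open snapshot separates them. The sketch below uses a simplified stand-in for compact.Snapshots.Index (an assumption about its behavior, not the real implementation) over a sorted slice of snapshot sequence numbers:

package main

import (
	"fmt"
	"sort"
)

// stripeIndex returns the number of snapshots at or below seq. Two sequence
// numbers are in the same snapshot stripe exactly when their indices are
// equal, meaning no open snapshot falls between them.
func stripeIndex(snapshots []uint64, seq uint64) int {
	return sort.Search(len(snapshots), func(i int) bool { return snapshots[i] > seq })
}

func main() {
	snaps := []uint64{100, 180, 210} // open snapshots, sorted ascending
	fmt.Println(stripeIndex(snaps, 90) == stripeIndex(snaps, 95))  // true: same stripe
	fmt.Println(stripeIndex(snaps, 90) == stripeIndex(snaps, 230)) // false: snapshots separate them
}
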
    2101              : 
    2102              : // checkDeleteCompactionHints checks the passed-in deleteCompactionHints for those that
    2103              : // can be resolved and those that cannot. A hint is considered resolved when its largest
    2104              : // tombstone sequence number and the smallest sequence number of covered files fall in
    2105              : // the same snapshot stripe. No more than maxHintsPerDeleteOnlyCompaction will be resolved
    2106              : // per method call. Resolved and unresolved hints are returned in separate return values.
    2107              : // The files that the resolved hints apply to are returned as compactionLevels.
    2108              : func checkDeleteCompactionHints(
    2109              :         cmp Compare,
    2110              :         v *version,
    2111              :         hints []deleteCompactionHint,
    2112              :         snapshots compact.Snapshots,
    2113              :         exciseEnabled bool,
    2114            1 : ) (levels []compactionLevel, resolved, unresolved []deleteCompactionHint) {
    2115            1 :         var files map[*fileMetadata]bool
    2116            1 :         var byLevel [numLevels][]*fileMetadata
    2117            1 : 
    2118            1 :         // Delete-only compactions can require O(m*n) runtime, where m is the
    2119            1 :         // number of files in the delete-only compaction and n is the number of
    2120            1 :         // resolved hints. To prevent this product from growing unbounded, we cap
    2121            1 :         // the number of hints we resolve for one delete-only compaction. This
    2122            1 :         // cap only applies if exciseEnabled == true.
    2123            1 :         const maxHintsPerDeleteOnlyCompaction = 10
    2124            1 : 
    2125            1 :         unresolvedHints := hints[:0]
    2126            1 :         // Lazily populate resolvedHints, similar to files above.
    2127            1 :         resolvedHints := make([]deleteCompactionHint, 0)
    2128            1 :         for _, h := range hints {
    2129            1 :                 // Check each compaction hint to see if it's resolvable. Resolvable
    2130            1 :                 // hints are removed and trigger a delete-only compaction if any files
    2131            1 :                 // in the current LSM still meet their criteria. Unresolvable hints
    2132            1 :                 // are saved and don't trigger a delete-only compaction.
    2133            1 :                 //
    2134            1 :                 // When a compaction hint is created, the sequence numbers of the
    2135            1 :                 // range tombstones and the covered file with the oldest key are
    2136            1 :                 // recorded. The largest tombstone sequence number and the smallest
    2137            1 :                 // file sequence number must be in the same snapshot stripe for the
    2138            1 :                 // hint to be resolved. The below graphic models a compaction hint
    2139            1 :                 // covering the keyspace [b, r). The hint completely contains two
    2140            1 :                 // files, 000002 and 000003. The file 000003 contains the lowest
    2141            1 :                 // covered sequence number at #90. The tombstone b.RANGEDEL.230:h has
    2142            1 :                 // the highest tombstone sequence number incorporated into the hint.
    2143            1 :                 // The hint may be resolved only once the snapshots at #100, #180 and
    2144            1 :                 // #210 are all closed. File 000001 is not included within the hint
    2145            1 :                 // because it extends beyond the range tombstones in user key space.
    2146            1 :                 //
    2147            1 :                 // 250
    2148            1 :                 //
    2149            1 :                 //       |-b...230:h-|
    2150            1 :                 // _____________________________________________________ snapshot #210
    2151            1 :                 // 200               |--h.RANGEDEL.200:r--|
    2152            1 :                 //
    2153            1 :                 // _____________________________________________________ snapshot #180
    2154            1 :                 //
    2155            1 :                 // 150                     +--------+
    2156            1 :                 //           +---------+   | 000003 |
    2157            1 :                 //           | 000002  |   |        |
    2158            1 :                 //           +_________+   |        |
    2159            1 :                 // 100_____________________|________|___________________ snapshot #100
    2160            1 :                 //                         +--------+
    2161            1 :                 // _____________________________________________________ snapshot #70
    2162            1 :                 //                             +---------------+
    2163            1 :                 //  50                         | 000001        |
    2164            1 :                 //                             |               |
    2165            1 :                 //                             +---------------+
    2166            1 :                 // ______________________________________________________________
    2167            1 :                 //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    2168            1 : 
    2169            1 :                 if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(h.fileSmallestSeqNum) ||
    2170            1 :                         (len(resolvedHints) >= maxHintsPerDeleteOnlyCompaction && exciseEnabled) {
    2171            1 :                         // Cannot resolve yet.
    2172            1 :                         unresolvedHints = append(unresolvedHints, h)
    2173            1 :                         continue
    2174              :                 }
    2175              : 
    2176              :                 // The hint h will be resolved and dropped if it either affects no files at
    2177              :                 // all, or if the number of files it creates (e.g. through excision) is less
    2178              :                 // than or equal to the number of files it deletes. First, determine how many
    2179              :                 // files are affected by this hint.
    2180            1 :                 filesDeletedByCurrentHint := 0
    2181            1 :                 var filesDeletedByLevel [7][]*fileMetadata
    2182            1 :                 for l := h.tombstoneLevel + 1; l < numLevels; l++ {
    2183            1 :                         overlaps := v.Overlaps(l, base.UserKeyBoundsEndExclusive(h.start, h.end))
    2184            1 :                         iter := overlaps.Iter()
    2185            1 : 
    2186            1 :                         for m := iter.First(); m != nil; m = iter.Next() {
    2187            1 :                                 doesHintApply := h.canDeleteOrExcise(cmp, m, snapshots, exciseEnabled)
    2188            1 :                                 if m.IsCompacting() || doesHintApply == hintDoesNotApply || files[m] {
    2189            1 :                                         continue
    2190              :                                 }
    2191            1 :                                 switch doesHintApply {
    2192            1 :                                 case hintDeletesFile:
    2193            1 :                                         filesDeletedByCurrentHint++
    2194            1 :                                 case hintExcisesFile:
    2195            1 :                                         // Account for the original file being deleted.
    2196            1 :                                         filesDeletedByCurrentHint++
    2197            1 :                                         // An excise could produce up to 2 new files. If the hint
    2198            1 :                                         // leaves a fragment of the file on the left, decrement
    2199            1 :                                         // the counter once. If the hint leaves a fragment of the
    2200            1 :                                         // file on the right, decrement the counter once.
    2201            1 :                                         if cmp(h.start, m.Smallest.UserKey) > 0 {
    2202            1 :                                                 filesDeletedByCurrentHint--
    2203            1 :                                         }
    2204            1 :                                         if m.UserKeyBounds().End.IsUpperBoundFor(cmp, h.end) {
    2205            1 :                                                 filesDeletedByCurrentHint--
    2206            1 :                                         }
    2207              :                                 }
    2208            1 :                                 filesDeletedByLevel[l] = append(filesDeletedByLevel[l], m)
    2209              :                         }
    2210              :                 }
    2211            1 :                 if filesDeletedByCurrentHint < 0 {
    2212            1 :                         // This hint does not delete a sufficient number of files to warrant
    2213            1 :                         // a delete-only compaction at this stage. Drop it (i.e., don't add it
    2214            1 :                         // to either resolved or unresolved hints) so it doesn't stick around
    2215            1 :                         // forever.
    2216            1 :                         continue
    2217              :                 }
    2218              :                 // This hint will be resolved and dropped.
    2219            1 :                 for l := h.tombstoneLevel + 1; l < numLevels; l++ {
    2220            1 :                         byLevel[l] = append(byLevel[l], filesDeletedByLevel[l]...)
    2221            1 :                         for _, m := range filesDeletedByLevel[l] {
    2222            1 :                                 if files == nil {
    2223            1 :                                         // Construct files lazily, assuming most calls will not
    2224            1 :                                         // produce delete-only compactions.
    2225            1 :                                         files = make(map[*fileMetadata]bool)
    2226            1 :                                 }
    2227            1 :                                 files[m] = true
    2228              :                         }
    2229              :                 }
    2230            1 :                 resolvedHints = append(resolvedHints, h)
    2231              :         }
    2232              : 
    2233            1 :         var compactLevels []compactionLevel
    2234            1 :         for l, files := range byLevel {
    2235            1 :                 if len(files) == 0 {
    2236            1 :                         continue
    2237              :                 }
    2238            1 :                 compactLevels = append(compactLevels, compactionLevel{
    2239            1 :                         level: l,
    2240            1 :                         files: manifest.NewLevelSliceKeySorted(cmp, files),
    2241            1 :                 })
    2242              :         }
    2243            1 :         return compactLevels, resolvedHints, unresolvedHints
    2244              : }
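
The filesDeletedByCurrentHint accounting above treats an excise as removing the original file while possibly leaving a fragment on each side of the hint, so the net count is decremented once per surviving side. A simplified standalone sketch of that arithmetic, assuming inclusive byte-slice bounds on both ends (the real code distinguishes the exclusive hint end via UserKeyBounds):

package main

import (
	"bytes"
	"fmt"
)

// netFilesDeleted models the accounting above: deleting or excising a file
// removes the original (count +1), but an excise can leave a fragment to the
// left and/or right of the hint (count -1 each).
func netFilesDeleted(hintStart, hintEnd, fileStart, fileEnd []byte) int {
	n := 1 // the original file goes away
	if bytes.Compare(hintStart, fileStart) > 0 {
		n-- // a fragment survives to the left of the hint
	}
	if bytes.Compare(fileEnd, hintEnd) > 0 {
		n-- // a fragment survives to the right of the hint
	}
	return n
}

func main() {
	// Hint [c,f) excising the middle of file [a,z]: two fragments remain.
	fmt.Println(netFilesDeleted([]byte("c"), []byte("f"), []byte("a"), []byte("z"))) // -1
	// Hint [a,z) fully covering file [c,f]: the file is simply deleted.
	fmt.Println(netFilesDeleted([]byte("a"), []byte("z"), []byte("c"), []byte("f"))) // 1
}
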
    2245              : 
    2246            1 : func (d *DB) compactionPprofLabels(c *compaction) pprof.LabelSet {
    2247            1 :         activity := "compact"
    2248            1 :         if len(c.flushing) != 0 {
    2249            0 :                 activity = "flush"
    2250            0 :         }
    2251            1 :         level := "L?"
    2252            1 :         // Delete-only compactions don't have an output level.
    2253            1 :         if c.outputLevel != nil {
    2254            1 :                 level = fmt.Sprintf("L%d", c.outputLevel.level)
    2255            1 :         }
    2256            1 :         if kc := d.opts.Experimental.UserKeyCategories; kc.Len() > 0 {
    2257            0 :                 cat := kc.CategorizeKeyRange(c.smallest.UserKey, c.largest.UserKey)
    2258            0 :                 return pprof.Labels("pebble", activity, "output-level", level, "key-type", cat)
    2259            0 :         }
    2260            1 :         return pprof.Labels("pebble", activity, "output-level", level)
    2261              : }
    2262              : 
    2263              : // compact runs one compaction and maybe schedules another call to compact.
    2264            1 : func (d *DB) compact(c *compaction, errChannel chan error) {
    2265            1 :         pprof.Do(context.Background(), d.compactionPprofLabels(c), func(context.Context) {
    2266            1 :                 d.mu.Lock()
    2267            1 :                 defer d.mu.Unlock()
    2268            1 :                 if err := d.compact1(c, errChannel); err != nil {
    2269            1 :                         // TODO(peter): count consecutive compaction errors and backoff.
    2270            1 :                         d.opts.EventListener.BackgroundError(err)
    2271            1 :                 }
    2272            1 :                 if c.isDownload {
    2273            1 :                         d.mu.compact.downloadingCount--
    2274            1 :                 } else {
    2275            1 :                         d.mu.compact.compactingCount--
    2276            1 :                 }
    2277            1 :                 delete(d.mu.compact.inProgress, c)
    2278            1 :                 // Add this compaction's duration to the cumulative duration. NB: This
    2279            1 :                 // must be atomic with the above removal of c from
    2280            1 :                 // d.mu.compact.inProgress to ensure Metrics.Compact.Duration does not
    2281            1 :                 // miss or double count a completing compaction's duration.
    2282            1 :                 d.mu.compact.duration += d.timeNow().Sub(c.beganAt)
    2283            1 : 
    2284            1 :                 // The previous compaction may have produced too many files in a
    2285            1 :                 // level, so reschedule another compaction if needed.
    2286            1 :                 d.maybeScheduleCompaction()
    2287            1 :                 d.mu.compact.cond.Broadcast()
    2288              :         })
    2289              : }
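
compact wraps its work in pprof.Do so that CPU profile samples taken during the compaction carry the labels built by compactionPprofLabels. A minimal standalone demonstration of the same runtime/pprof API; the label values here are illustrative only:

package main

import (
	"context"
	"fmt"
	"runtime/pprof"
)

func main() {
	labels := pprof.Labels("pebble", "compact", "output-level", "L6")
	pprof.Do(context.Background(), labels, func(ctx context.Context) {
		// CPU profile samples taken while this function runs are tagged
		// with the labels attached to ctx.
		if v, ok := pprof.Label(ctx, "output-level"); ok {
			fmt.Println("running with output-level =", v)
		}
	})
}
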
    2290              : 
    2291              : // cleanupVersionEdit cleans up any on-disk artifacts that were created
    2292              : // for the application of a versionEdit that is no longer going to be applied.
    2293              : //
    2294              : // d.mu must be held when calling this method.
    2295            1 : func (d *DB) cleanupVersionEdit(ve *versionEdit) {
    2296            1 :         obsoleteFiles := make([]*fileBacking, 0, len(ve.NewFiles))
    2297            1 :         deletedFiles := make(map[base.FileNum]struct{})
    2298            1 :         for key := range ve.DeletedFiles {
    2299            1 :                 deletedFiles[key.FileNum] = struct{}{}
    2300            1 :         }
    2301            1 :         for i := range ve.NewFiles {
    2302            1 :                 if ve.NewFiles[i].Meta.Virtual {
    2303            1 :                         // We handle backing files separately.
    2304            1 :                         continue
    2305              :                 }
    2306            1 :                 if _, ok := deletedFiles[ve.NewFiles[i].Meta.FileNum]; ok {
    2307            1 :                         // This file is being moved in this ve to a different level.
    2308            1 :                         // Don't mark it as obsolete.
    2309            1 :                         continue
    2310              :                 }
    2311            1 :                 obsoleteFiles = append(obsoleteFiles, ve.NewFiles[i].Meta.PhysicalMeta().FileBacking)
    2312              :         }
    2313            1 :         for i := range ve.CreatedBackingTables {
    2314            1 :                 if ve.CreatedBackingTables[i].IsUnused() {
    2315            0 :                         obsoleteFiles = append(obsoleteFiles, ve.CreatedBackingTables[i])
    2316            0 :                 }
    2317              :         }
    2318            1 :         for i := range obsoleteFiles {
    2319            1 :                 // Add this file to zombie tables as well, as the versionSet
    2320            1 :                 // asserts on whether every obsolete file was at one point
    2321            1 :                 // marked zombie.
    2322            1 :                 d.mu.versions.zombieTables[obsoleteFiles[i].DiskFileNum] = objectInfo{
    2323            1 :                         fileInfo: fileInfo{
    2324            1 :                                 FileNum:  obsoleteFiles[i].DiskFileNum,
    2325            1 :                                 FileSize: obsoleteFiles[i].Size,
    2326            1 :                         },
    2327            1 :                         // TODO(bilal): This is harmless if it's wrong, as it only causes
    2328            1 :                         // incorrect size accounting for this file in metrics. Currently
    2329            1 :                         // all compactions only write to local files anyway except with
    2330            1 :                         // disaggregated storage; if this becomes the norm, we should do
    2331            1 :                         // an objprovider lookup here.
    2332            1 :                         isLocal: true,
    2333            1 :                 }
    2334            1 :         }
    2335            1 :         d.mu.versions.addObsoleteLocked(obsoleteFiles)
    2336              : }
    2337              : 
    2338              : // compact1 runs one compaction.
    2339              : //
    2340              : // d.mu must be held when calling this, but the mutex may be dropped and
    2341              : // re-acquired during the course of this method.
    2342            1 : func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
    2343            1 :         if errChannel != nil {
    2344            1 :                 defer func() {
    2345            1 :                         errChannel <- err
    2346            1 :                 }()
    2347              :         }
    2348              : 
    2349            1 :         jobID := d.newJobIDLocked()
    2350            1 :         info := c.makeInfo(jobID)
    2351            1 :         d.opts.EventListener.CompactionBegin(info)
    2352            1 :         startTime := d.timeNow()
    2353            1 : 
    2354            1 :         ve, stats, err := d.runCompaction(jobID, c)
    2355            1 : 
    2356            1 :         info.Duration = d.timeNow().Sub(startTime)
    2357            1 :         if err == nil {
    2358            1 :                 validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
    2359            1 :                 err = func() error {
    2360            1 :                         var err error
    2361            1 :                         d.mu.versions.logLock()
    2362            1 :                         // Check if this compaction had a conflicting operation (e.g. a d.excise())
    2363            1 :                         // that necessitates restarting it from scratch. Note that since we hold
    2364            1 :                         // the manifest lock, we don't expect this bool to change its value,
    2365            1 :                         // as only the holder of the manifest lock will ever write to it.
    2366            1 :                         if c.cancel.Load() {
    2367            1 :                                 d.mu.versions.metrics.Compact.CancelledCount++
    2368            1 :                                 d.mu.versions.metrics.Compact.CancelledBytes += c.bytesWritten
    2369            1 : 
    2370            1 :                                 err = firstError(err, ErrCancelledCompaction)
    2371            1 :                                 // This is the first time we've seen a cancellation during the
    2372            1 :                                 // life of this compaction (or the original condition on err == nil
    2373            1 :                                 // would not have been true). We should delete any tables already
    2374            1 :                                 // created, as d.runCompaction did not do that.
    2375            1 :                                 d.cleanupVersionEdit(ve)
    2376            1 :                                 // logAndApply calls logUnlock. If we didn't call it, we need to call
    2377            1 :                                 // logUnlock ourselves.
    2378            1 :                                 d.mu.versions.logUnlock()
    2379            1 :                                 return err
    2380            1 :                         }
    2381            1 :                         return d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
    2382            1 :                                 return d.getInProgressCompactionInfoLocked(c)
    2383            1 :                         })
    2384              :                 }()
    2385              :         }
    2386              : 
    2387            1 :         info.Done = true
    2388            1 :         info.Err = err
    2389            1 :         if err == nil {
    2390            1 :                 for i := range ve.NewFiles {
    2391            1 :                         e := &ve.NewFiles[i]
    2392            1 :                         info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
    2393            1 :                 }
    2394            1 :                 d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
    2395            1 :                 d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
    2396            1 :                 d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
    2397              :         }
    2398              : 
    2399              :         // NB: clearing compacting state must occur before updating the read state;
    2400              :         // L0Sublevels initialization depends on it.
    2401            1 :         d.clearCompactingState(c, err != nil)
    2402            1 :         if err != nil && errors.Is(err, ErrCancelledCompaction) {
    2403            1 :                 d.mu.versions.metrics.Compact.CancelledCount++
    2404            1 :                 d.mu.versions.metrics.Compact.CancelledBytes += c.bytesWritten
    2405            1 :         }
    2406            1 :         d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
    2407            1 :         d.mu.versions.incrementCompactionBytes(-c.bytesWritten)
    2408            1 : 
    2409            1 :         info.TotalDuration = d.timeNow().Sub(c.beganAt)
    2410            1 :         d.opts.EventListener.CompactionEnd(info)
    2411            1 : 
    2412            1 :         // Update the read state before deleting obsolete files because the
    2413            1 :         // read-state update will cause the previous version to be unref'd and if
    2414            1 :         // there are no references obsolete tables will be added to the obsolete
    2415            1 :         // table list.
    2416            1 :         if err == nil {
    2417            1 :                 d.updateReadStateLocked(d.opts.DebugCheck)
    2418            1 :                 d.updateTableStatsLocked(ve.NewFiles)
    2419            1 :         }
    2420            1 :         d.deleteObsoleteFiles(jobID)
    2421            1 : 
    2422            1 :         return err
    2423              : }
    2424              : 
    2425              : // runCopyCompaction runs a copy compaction where a new FileNum is created that
    2426              : // is a byte-for-byte copy of the input file (or, in some cases, of a span of
    2427              : // it). This is used in lieu of a move compaction when a file is being moved
    2428              : // across the local/remote storage boundary. It can also be used in lieu of a
    2429              : // rewrite compaction as part of a Download() call, which allows copying only a
    2430              : // span of the external file, provided the file does not contain range keys or
    2431              : // value blocks (see sstable.CopySpan).
    2432              : //
    2433              : // d.mu must be held when calling this method. The mutex will be released when
    2434              : // doing IO.
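                      : //
                      : // There are two cases below: for an external source object, a span of the
                      : // input is copied into the new table via sstable.CopySpan; for a local source
                      : // object, the whole file is copied (or linked, where possible) to shared
                      : // storage via the object provider's LinkOrCopyFromLocal.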
    2435              : func (d *DB) runCopyCompaction(
    2436              :         jobID JobID, c *compaction,
    2437            1 : ) (ve *versionEdit, stats compact.Stats, _ error) {
    2438            1 :         iter := c.startLevel.files.Iter()
    2439            1 :         inputMeta := iter.First()
    2440            1 :         if iter.Next() != nil {
    2441            0 :                 return nil, compact.Stats{}, base.AssertionFailedf("got more than one file for a move compaction")
    2442            0 :         }
    2443            1 :         if c.cancel.Load() {
    2444            0 :                 return nil, compact.Stats{}, ErrCancelledCompaction
    2445            0 :         }
    2446            1 :         ve = &versionEdit{
    2447            1 :                 DeletedFiles: map[deletedFileEntry]*fileMetadata{
    2448            1 :                         {Level: c.startLevel.level, FileNum: inputMeta.FileNum}: inputMeta,
    2449            1 :                 },
    2450            1 :         }
    2451            1 : 
    2452            1 :         objMeta, err := d.objProvider.Lookup(base.FileTypeTable, inputMeta.FileBacking.DiskFileNum)
    2453            1 :         if err != nil {
    2454            0 :                 return nil, compact.Stats{}, err
    2455            0 :         }
    2456            1 :         if !objMeta.IsExternal() {
    2457            1 :                 if objMeta.IsRemote() || !remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level) {
    2458            0 :                         panic("pebble: scheduled a copy compaction that is not actually moving files to shared storage")
    2459              :                 }
    2460              :                 // Note that based on logic in the compaction picker, we're guaranteed
    2461              :                 // inputMeta.Virtual is false.
    2462            1 :                 if inputMeta.Virtual {
    2463            0 :                         panic(errors.AssertionFailedf("cannot do a copy compaction of a virtual sstable across local/remote storage"))
    2464              :                 }
    2465              :         }
    2466              : 
    2467              :         // We are in the relatively more complex case where we need to copy this
    2468              :         // file to remote storage. Drop the db mutex while we do the copy.
    2469              :         //
    2470              :         // To ease cleanup of the local file and tracking of refs, we create a
    2471              :         // new FileNum. This has the potential of making the block cache less
    2472              :         // effective, however.
    2473            1 :         newMeta := &fileMetadata{
    2474            1 :                 Size:                     inputMeta.Size,
    2475            1 :                 CreationTime:             inputMeta.CreationTime,
    2476            1 :                 SmallestSeqNum:           inputMeta.SmallestSeqNum,
    2477            1 :                 LargestSeqNum:            inputMeta.LargestSeqNum,
    2478            1 :                 LargestSeqNumAbsolute:    inputMeta.LargestSeqNumAbsolute,
    2479            1 :                 Stats:                    inputMeta.Stats,
    2480            1 :                 Virtual:                  inputMeta.Virtual,
    2481            1 :                 SyntheticPrefixAndSuffix: inputMeta.SyntheticPrefixAndSuffix,
    2482            1 :         }
    2483            1 :         if inputMeta.HasPointKeys {
    2484            1 :                 newMeta.ExtendPointKeyBounds(c.cmp, inputMeta.SmallestPointKey, inputMeta.LargestPointKey)
    2485            1 :         }
    2486            1 :         if inputMeta.HasRangeKeys {
    2487            1 :                 newMeta.ExtendRangeKeyBounds(c.cmp, inputMeta.SmallestRangeKey, inputMeta.LargestRangeKey)
    2488            1 :         }
    2489            1 :         newMeta.FileNum = d.mu.versions.getNextFileNum()
    2490            1 :         if objMeta.IsExternal() {
    2491            1 :                 // external -> local/shared copy. File must be virtual.
    2492            1 :                 // We will update this size later after we produce the new backing file.
    2493            1 :                 newMeta.InitProviderBacking(base.DiskFileNum(newMeta.FileNum), inputMeta.FileBacking.Size)
    2494            1 :         } else {
    2495            1 :                 // local -> shared copy. New file is guaranteed to not be virtual.
    2496            1 :                 newMeta.InitPhysicalBacking()
    2497            1 :         }
    2498              : 
    2499              :         // Before dropping the db mutex, grab a ref to the current version. This
    2500              :         // prevents any concurrent excises from deleting files that this compaction
    2501              :         // needs to read/maintain a reference to.
    2502            1 :         vers := d.mu.versions.currentVersion()
    2503            1 :         vers.Ref()
    2504            1 :         defer vers.UnrefLocked()
    2505            1 : 
    2506            1 :         // NB: The order here is reversed, lock after unlock. This is similar to
    2507            1 :         // runCompaction.
    2508            1 :         d.mu.Unlock()
    2509            1 :         defer d.mu.Lock()
    2510            1 : 
    2511            1 :         deleteOnExit := false
    2512            1 :         defer func() {
    2513            1 :                 if deleteOnExit {
    2514            0 :                         _ = d.objProvider.Remove(base.FileTypeTable, newMeta.FileBacking.DiskFileNum)
    2515            0 :                 }
    2516              :         }()
    2517              : 
    2518              :         // If the src obj is external, we're doing an external to local/shared copy.
    2519            1 :         if objMeta.IsExternal() {
    2520            1 :                 ctx := context.TODO()
    2521            1 :                 src, err := d.objProvider.OpenForReading(
    2522            1 :                         ctx, base.FileTypeTable, inputMeta.FileBacking.DiskFileNum, objstorage.OpenOptions{},
    2523            1 :                 )
    2524            1 :                 if err != nil {
    2525            0 :                         return nil, compact.Stats{}, err
    2526            0 :                 }
    2527            1 :                 defer func() {
    2528            1 :                         if src != nil {
    2529            0 :                                 src.Close()
    2530            0 :                         }
    2531              :                 }()
    2532              : 
    2533            1 :                 w, _, err := d.objProvider.Create(
    2534            1 :                         ctx, base.FileTypeTable, newMeta.FileBacking.DiskFileNum,
    2535            1 :                         objstorage.CreateOptions{
    2536            1 :                                 PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
    2537            1 :                         },
    2538            1 :                 )
    2539            1 :                 if err != nil {
    2540            0 :                         return nil, compact.Stats{}, err
    2541            0 :                 }
    2542            1 :                 deleteOnExit = true
    2543            1 : 
    2544            1 :                 start, end := newMeta.Smallest, newMeta.Largest
    2545            1 :                 if newMeta.SyntheticPrefixAndSuffix.HasPrefix() {
    2546            1 :                         syntheticPrefix := newMeta.SyntheticPrefixAndSuffix.Prefix()
    2547            1 :                         start.UserKey = syntheticPrefix.Invert(start.UserKey)
    2548            1 :                         end.UserKey = syntheticPrefix.Invert(end.UserKey)
    2549            1 :                 }
    2550            1 :                 if newMeta.SyntheticPrefixAndSuffix.HasSuffix() {
    2551            1 :                         // Extend the bounds as necessary so that the keys don't include suffixes.
    2552            1 :                         start.UserKey = start.UserKey[:c.comparer.Split(start.UserKey)]
    2553            1 :                         if n := c.comparer.Split(end.UserKey); n < len(end.UserKey) {
    2554            0 :                                 end = base.MakeRangeDeleteSentinelKey(c.comparer.ImmediateSuccessor(nil, end.UserKey[:n]))
    2555            0 :                         }
    2556              :                 }
    2557              : 
    2558              :                 // NB: external files are always virtual.
    2559            1 :                 var wrote uint64
    2560            1 :                 err = d.fileCache.withVirtualReader(inputMeta.VirtualMeta(), func(r sstable.VirtualReader) error {
    2561            1 :                         var err error
    2562            1 :                         wrote, err = sstable.CopySpan(ctx,
    2563            1 :                                 src, r.UnsafeReader(), d.opts.MakeReaderOptions(),
    2564            1 :                                 w, d.opts.MakeWriterOptions(c.outputLevel.level, d.TableFormat()),
    2565            1 :                                 start, end,
    2566            1 :                         )
    2567            1 :                         return err
    2568            1 :                 })
    2569              : 
    2570            1 :                 src = nil // We passed src to CopySpan; it's responsible for closing it.
    2571            1 :                 if err != nil {
    2572            0 :                         if errors.Is(err, sstable.ErrEmptySpan) {
    2573            0 :                                 // The virtual table was empty. Just remove the backing file.
    2574            0 :                                 // Note that deleteOnExit is true so we will delete the created object.
    2575            0 :                                 c.metrics = map[int]*LevelMetrics{
    2576            0 :                                         c.outputLevel.level: {
    2577            0 :                                                 BytesIn: inputMeta.Size,
    2578            0 :                                         },
    2579            0 :                                 }
    2580            0 :                                 return ve, compact.Stats{}, nil
    2581            0 :                         }
    2582            0 :                         return nil, compact.Stats{}, err
    2583              :                 }
    2584            1 :                 newMeta.FileBacking.Size = wrote
    2585            1 :                 newMeta.Size = wrote
    2586            1 :         } else {
    2587            1 :                 _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
    2588            1 :                         d.objProvider.Path(objMeta), base.FileTypeTable, newMeta.FileBacking.DiskFileNum,
    2589            1 :                         objstorage.CreateOptions{PreferSharedStorage: true})
    2590            1 :                 if err != nil {
    2591            0 :                         return nil, compact.Stats{}, err
    2592            0 :                 }
    2593            1 :                 deleteOnExit = true
    2594              :         }
    2595            1 :         ve.NewFiles = []newFileEntry{{
    2596            1 :                 Level: c.outputLevel.level,
    2597            1 :                 Meta:  newMeta,
    2598            1 :         }}
    2599            1 :         if newMeta.Virtual {
    2600            1 :                 ve.CreatedBackingTables = []*fileBacking{newMeta.FileBacking}
    2601            1 :         }
    2602            1 :         c.metrics = map[int]*LevelMetrics{
    2603            1 :                 c.outputLevel.level: {
    2604            1 :                         BytesIn:         inputMeta.Size,
    2605            1 :                         BytesCompacted:  newMeta.Size,
    2606            1 :                         TablesCompacted: 1,
    2607            1 :                 },
    2608            1 :         }
    2609            1 : 
    2610            1 :         if err := d.objProvider.Sync(); err != nil {
    2611            0 :                 return nil, compact.Stats{}, err
    2612            0 :         }
    2613            1 :         deleteOnExit = false
    2614            1 :         return ve, compact.Stats{}, nil
    2615              : }
    2616              : 
    2617              : // applyHintOnFile applies a deleteCompactionHint to a file and updates the
    2618              : // versionEdit accordingly. It returns a list of new files that were created
    2619              : // if the hint was applied only partially to the file (e.g. through an excise
    2620              : // as opposed to an outright deletion). levelMetrics is kept up-to-date with
    2621              : // the number of tables deleted or excised.
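                      : //
                      : // To summarize the three deletionHintOverlap outcomes handled below:
                      : // hintDoesNotApply leaves the file untouched, hintDeletesFile records the
                      : // whole file in ve.DeletedFiles, and hintExcisesFile carves the hinted span
                      : // out of the file via d.excise, producing replacement virtual files.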
    2622              : func (d *DB) applyHintOnFile(
    2623              :         h deleteCompactionHint,
    2624              :         f *fileMetadata,
    2625              :         level int,
    2626              :         levelMetrics *LevelMetrics,
    2627              :         ve *versionEdit,
    2628              :         hintOverlap deletionHintOverlap,
    2629            1 : ) (newFiles []manifest.NewFileEntry, err error) {
    2630            1 :         if hintOverlap == hintDoesNotApply {
    2631            0 :                 return nil, nil
    2632            0 :         }
    2633              : 
    2634              :         // The hint overlaps with at least part of the file.
    2635            1 :         if hintOverlap == hintDeletesFile {
    2636            1 :                 // The hint deletes the entirety of this file.
    2637            1 :                 ve.DeletedFiles[deletedFileEntry{
    2638            1 :                         Level:   level,
    2639            1 :                         FileNum: f.FileNum,
    2640            1 :                 }] = f
    2641            1 :                 levelMetrics.TablesDeleted++
    2642            1 :                 return nil, nil
    2643            1 :         }
    2644              :         // The hint overlaps with only a part of the file, not the entirety of it. We need
    2645              :         // to use d.excise. (hintOverlap == hintExcisesFile)
    2646            1 :         if d.FormatMajorVersion() < FormatVirtualSSTables {
    2647            0 :                 panic("pebble: delete-only compaction hint excising a file is not supported in this version")
    2648              :         }
    2649              : 
    2650            1 :         levelMetrics.TablesExcised++
    2651            1 :         newFiles, err = d.excise(context.TODO(), base.UserKeyBoundsEndExclusive(h.start, h.end), f, ve, level)
    2652            1 :         if err != nil {
    2653            0 :                 return nil, errors.Wrap(err, "error when running excise for delete-only compaction")
    2654            0 :         }
    2655            1 :         if _, ok := ve.DeletedFiles[deletedFileEntry{
    2656            1 :                 Level:   level,
    2657            1 :                 FileNum: f.FileNum,
    2658            1 :         }]; !ok {
    2659            0 :                 panic("pebble: delete-only compaction hint overlapping a file did not excise that file")
    2660              :         }
    2661            1 :         return newFiles, nil
    2662              : }
    2663              : 
    2664              : func (d *DB) runDeleteOnlyCompactionForLevel(
    2665              :         cl compactionLevel,
    2666              :         levelMetrics *LevelMetrics,
    2667              :         ve *versionEdit,
    2668              :         snapshots compact.Snapshots,
    2669              :         fragments []deleteCompactionHintFragment,
    2670              :         exciseEnabled bool,
    2671            1 : ) error {
    2672            1 :         curFragment := 0
    2673            1 :         iter := cl.files.Iter()
    2674            1 :         if cl.level == 0 {
    2675            0 :                 panic("cannot run delete-only compaction for L0")
    2676              :         }
    2677              : 
    2678              :         // The outer loop iterates over files, the middle loop over fragments, and
    2679              :         // the inner loop over the hints within a fragment. The number of fragments
    2680              :         // is bounded by the number of hints this compaction was created with, which
    2681              :         // is capped in the compaction picker to avoid very CPU-hot loops here.
    2682            1 :         for f := iter.First(); f != nil; f = iter.Next() {
    2683            1 :                 // curFile usually matches f, except if f got excised in which case
    2684            1 :                 // it maps to a virtual file that replaces f, or nil if f got removed
    2685            1 :                 // in its entirety.
    2686            1 :                 curFile := f
    2687            1 :                 for curFragment < len(fragments) && d.cmp(fragments[curFragment].start, f.Smallest.UserKey) <= 0 {
    2688            1 :                         curFragment++
    2689            1 :                 }
    2690            1 :                 if curFragment > 0 {
    2691            1 :                         curFragment--
    2692            1 :                 }
    2693              : 
    2694            1 :                 for ; curFragment < len(fragments); curFragment++ {
    2695            1 :                         if f.UserKeyBounds().End.CompareUpperBounds(d.cmp, base.UserKeyInclusive(fragments[curFragment].start)) < 0 {
    2696            1 :                                 break
    2697              :                         }
    2698              :                         // Process all hints overlapping with this file. Note that applying
    2699              :                         // a hint twice is idempotent; curFile should have already been excised
    2700              :                         // the first time, resulting in no change the second time.
    2701            1 :                         for _, h := range fragments[curFragment].hints {
    2702            1 :                                 if h.tombstoneLevel >= cl.level {
    2703            1 :                                         // We cannot excise out the deletion tombstone itself, or anything
    2704            1 :                                         // above it.
    2705            1 :                                         continue
    2706              :                                 }
    2707            1 :                                 hintOverlap := h.canDeleteOrExcise(d.cmp, curFile, snapshots, exciseEnabled)
    2708            1 :                                 if hintOverlap == hintDoesNotApply {
    2709            1 :                                         continue
    2710              :                                 }
    2711            1 :                                 newFiles, err := d.applyHintOnFile(h, curFile, cl.level, levelMetrics, ve, hintOverlap)
    2712            1 :                                 if err != nil {
    2713            0 :                                         return err
    2714            0 :                                 }
    2715            1 :                                 if _, ok := ve.DeletedFiles[manifest.DeletedFileEntry{Level: cl.level, FileNum: curFile.FileNum}]; ok {
    2716            1 :                                         curFile = nil
    2717            1 :                                 }
    2718            1 :                                 if len(newFiles) > 0 {
    2719            1 :                                         curFile = newFiles[len(newFiles)-1].Meta
    2720            1 :                                 } else if curFile == nil {
    2721            1 :                                         // Nothing remains of the file.
    2722            1 :                                         break
    2723              :                                 }
    2724              :                         }
    2725            1 :                         if curFile == nil {
    2726            1 :                                 // Nothing remains of the file.
    2727            1 :                                 break
    2728              :                         }
    2729              :                 }
    2730            1 :                 if _, ok := ve.DeletedFiles[deletedFileEntry{
    2731            1 :                         Level:   cl.level,
    2732            1 :                         FileNum: f.FileNum,
    2733            1 :                 }]; !ok {
    2734            0 :                         panic("pebble: delete-only compaction scheduled with hints that did not delete or excise a file")
    2735              :                 }
    2736              :         }
    2737            1 :         return nil
    2738              : }
    2739              : 
    2740              : // deleteCompactionHintFragment represents a fragment of the key space and
    2741              : // contains a set of deleteCompactionHints that apply to that fragment; a
    2742              : // fragment starts at the start field and ends where the next fragment starts.
    2743              : type deleteCompactionHintFragment struct {
    2744              :         start []byte
    2745              :         hints []deleteCompactionHint
    2746              : }
    2747              : 
    2748              : // fragmentDeleteCompactionHints fragments a list of delete compaction hints.
    2749              : // Hints can overlap each other, and multiple hints can apply to a single file.
    2750              : // Fragmenting makes it easier to apply the hints to the non-overlapping files
    2751              : // occupying a level: files and hint fragments can then be iterated on in
    2752              : // lockstep, while efficiently applying all hints that overlap a given file.
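                      : //
                      : // For illustration only (hypothetical keys, not drawn from any test): two
                      : // overlapping hints covering [a, d) and [b, f) fragment at the boundaries
                      : // a, b, d and f. The fragment starting at a carries only the first hint,
                      : // the fragment at b carries both, the fragment at d carries only the second,
                      : // and the fragment at f carries none and merely terminates the one before it.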
    2753              : func fragmentDeleteCompactionHints(
    2754              :         cmp Compare, hints []deleteCompactionHint,
    2755            1 : ) []deleteCompactionHintFragment {
    2756            1 :         fragments := make([]deleteCompactionHintFragment, 0, len(hints)*2)
    2757            1 :         for i := range hints {
    2758            1 :                 fragments = append(fragments, deleteCompactionHintFragment{start: hints[i].start},
    2759            1 :                         deleteCompactionHintFragment{start: hints[i].end})
    2760            1 :         }
    2761            1 :         slices.SortFunc(fragments, func(i, j deleteCompactionHintFragment) int {
    2762            1 :                 return cmp(i.start, j.start)
    2763            1 :         })
    2764            1 :         fragments = slices.CompactFunc(fragments, func(i, j deleteCompactionHintFragment) bool {
    2765            1 :                 return bytes.Equal(i.start, j.start)
    2766            1 :         })
    2767            1 :         for _, h := range hints {
    2768            1 :                 startIdx := sort.Search(len(fragments), func(i int) bool {
    2769            1 :                         return cmp(fragments[i].start, h.start) >= 0
    2770            1 :                 })
    2771            1 :                 endIdx := sort.Search(len(fragments), func(i int) bool {
    2772            1 :                         return cmp(fragments[i].start, h.end) >= 0
    2773            1 :                 })
    2774            1 :                 for i := startIdx; i < endIdx; i++ {
    2775            1 :                         fragments[i].hints = append(fragments[i].hints, h)
    2776            1 :                 }
    2777              :         }
    2778            1 :         return fragments
    2779              : }
    2780              : 
    2781              : // runDeleteOnlyCompaction runs a delete-only compaction.
    2782              : //
    2783              : // d.mu must *not* be held when calling this.
    2784              : func (d *DB) runDeleteOnlyCompaction(
    2785              :         jobID JobID, c *compaction, snapshots compact.Snapshots,
    2786            1 : ) (ve *versionEdit, stats compact.Stats, retErr error) {
    2787            1 :         c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
    2788            1 :         fragments := fragmentDeleteCompactionHints(d.cmp, c.deletionHints)
    2789            1 :         ve = &versionEdit{
    2790            1 :                 DeletedFiles: map[deletedFileEntry]*fileMetadata{},
    2791            1 :         }
    2792            1 :         for _, cl := range c.inputs {
    2793            1 :                 levelMetrics := &LevelMetrics{}
    2794            1 :                 if err := d.runDeleteOnlyCompactionForLevel(cl, levelMetrics, ve, snapshots, fragments, c.exciseEnabled); err != nil {
    2795            0 :                         return nil, stats, err
    2796            0 :                 }
    2797            1 :                 c.metrics[cl.level] = levelMetrics
    2798              :         }
    2799              :         // Remove any files that were added and deleted in the same versionEdit.
    2800            1 :         ve.NewFiles = slices.DeleteFunc(ve.NewFiles, func(e manifest.NewFileEntry) bool {
    2801            1 :                 deletedFileEntry := manifest.DeletedFileEntry{Level: e.Level, FileNum: e.Meta.FileNum}
    2802            1 :                 if _, deleted := ve.DeletedFiles[deletedFileEntry]; deleted {
    2803            1 :                         delete(ve.DeletedFiles, deletedFileEntry)
    2804            1 :                         return true
    2805            1 :                 }
    2806            1 :                 return false
    2807              :         })
    2808              :         // Remove any entries from CreatedBackingTables that are not used in any
    2809              :         // NewFiles.
    2810            1 :         usedBackingFiles := make(map[base.DiskFileNum]struct{})
    2811            1 :         for _, e := range ve.NewFiles {
    2812            1 :                 if e.Meta.Virtual {
    2813            1 :                         usedBackingFiles[e.Meta.FileBacking.DiskFileNum] = struct{}{}
    2814            1 :                 }
    2815              :         }
    2816            1 :         ve.CreatedBackingTables = slices.DeleteFunc(ve.CreatedBackingTables, func(b *fileBacking) bool {
    2817            1 :                 _, used := usedBackingFiles[b.DiskFileNum]
    2818            1 :                 return !used
    2819            1 :         })
    2820              :         // Refresh the disk available statistic whenever a compaction/flush
    2821              :         // completes, before re-acquiring the mutex.
    2822            1 :         d.calculateDiskAvailableBytes()
    2823            1 :         return ve, stats, nil
    2824              : }
    2825              : 
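                      : // runMoveCompaction runs a move compaction: the single input table is moved
                      : // to the output level by deleting it from the start level and re-adding it,
                      : // unchanged, to the output level in the version edit; no table data is
                      : // rewritten.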
    2826              : func (d *DB) runMoveCompaction(
    2827              :         jobID JobID, c *compaction,
    2828            1 : ) (ve *versionEdit, stats compact.Stats, _ error) {
    2829            1 :         iter := c.startLevel.files.Iter()
    2830            1 :         meta := iter.First()
    2831            1 :         if iter.Next() != nil {
    2832            0 :                 return nil, stats, base.AssertionFailedf("got more than one file for a move compaction")
    2833            0 :         }
    2834            1 :         if c.cancel.Load() {
    2835            0 :                 return ve, stats, ErrCancelledCompaction
    2836            0 :         }
    2837            1 :         c.metrics = map[int]*LevelMetrics{
    2838            1 :                 c.outputLevel.level: {
    2839            1 :                         BytesMoved:  meta.Size,
    2840            1 :                         TablesMoved: 1,
    2841            1 :                 },
    2842            1 :         }
    2843            1 :         ve = &versionEdit{
    2844            1 :                 DeletedFiles: map[deletedFileEntry]*fileMetadata{
    2845            1 :                         {Level: c.startLevel.level, FileNum: meta.FileNum}: meta,
    2846            1 :                 },
    2847            1 :                 NewFiles: []newFileEntry{
    2848            1 :                         {Level: c.outputLevel.level, Meta: meta},
    2849            1 :                 },
    2850            1 :         }
    2851            1 : 
    2852            1 :         return ve, stats, nil
    2853              : }
    2854              : 
    2855              : // runCompaction runs a compaction that produces new on-disk tables from
    2856              : // memtables or old on-disk tables.
    2857              : //
    2858              : // runCompaction cannot be used for compactionKindIngestedFlushable.
    2859              : //
    2860              : // d.mu must be held when calling this, but the mutex may be dropped and
    2861              : // re-acquired during the course of this method.
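                      : //
                      : // Delete-only, move and copy compactions are dispatched to dedicated helpers
                      : // (runDeleteOnlyCompaction, runMoveCompaction, runCopyCompaction); all other
                      : // kinds are handled by compactAndWrite followed by makeVersionEdit.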
    2862              : func (d *DB) runCompaction(
    2863              :         jobID JobID, c *compaction,
    2864            1 : ) (ve *versionEdit, stats compact.Stats, retErr error) {
    2865            1 :         defer func() {
    2866            1 :                 c.slot.Release(stats.CumulativeWrittenSize)
    2867            1 :                 c.slot = nil
    2868            1 :         }()
    2869            1 :         if c.cancel.Load() {
    2870            1 :                 return ve, stats, ErrCancelledCompaction
    2871            1 :         }
    2872            1 :         switch c.kind {
    2873            1 :         case compactionKindDeleteOnly:
    2874            1 :                 // Before dropping the db mutex, grab a ref to the current version. This
    2875            1 :                 // prevents any concurrent excises from deleting files that this compaction
    2876            1 :                 // needs to read/maintain a reference to.
    2877            1 :                 //
    2878            1 :                 // Note that delete-only compactions can call excise(), which needs to be able
    2879            1 :                 // to read these files.
    2880            1 :                 vers := d.mu.versions.currentVersion()
    2881            1 :                 vers.Ref()
    2882            1 :                 defer vers.UnrefLocked()
    2883            1 :                 // Release the d.mu lock while doing I/O.
    2884            1 :                 // Note the unusual order: Unlock and then Lock.
    2885            1 :                 snapshots := d.mu.snapshots.toSlice()
    2886            1 :                 d.mu.Unlock()
    2887            1 :                 defer d.mu.Lock()
    2888            1 :                 return d.runDeleteOnlyCompaction(jobID, c, snapshots)
    2889            1 :         case compactionKindMove:
    2890            1 :                 return d.runMoveCompaction(jobID, c)
    2891            1 :         case compactionKindCopy:
    2892            1 :                 return d.runCopyCompaction(jobID, c)
    2893            0 :         case compactionKindIngestedFlushable:
    2894            0 :                 panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
    2895              :         }
    2896              : 
    2897            1 :         snapshots := d.mu.snapshots.toSlice()
    2898            1 : 
    2899            1 :         if c.flushing == nil {
    2900            1 :                 // Before dropping the db mutex, grab a ref to the current version. This
    2901            1 :                 // prevents any concurrent excises from deleting files that this compaction
    2902            1 :                 // needs to read/maintain a reference to.
    2903            1 :                 //
    2904            1 :                 // Note that unlike user iterators, compactionIter does not maintain a ref
    2905            1 :                 // of the version or read state.
    2906            1 :                 vers := d.mu.versions.currentVersion()
    2907            1 :                 vers.Ref()
    2908            1 :                 defer vers.UnrefLocked()
    2909            1 :         }
    2910              : 
    2911              :         // The table is typically written at the maximum allowable format implied by
    2912              :         // the current format major version of the DB, but Options may define
    2913              :         // additional constraints.
    2914            1 :         tableFormat := d.TableFormat()
    2915            1 : 
    2916            1 :         // Release the d.mu lock while doing I/O.
    2917            1 :         // Note the unusual order: Unlock and then Lock.
    2918            1 :         d.mu.Unlock()
    2919            1 :         defer d.mu.Lock()
    2920            1 : 
    2921            1 :         result := d.compactAndWrite(jobID, c, snapshots, tableFormat)
    2922            1 :         if result.Err == nil {
    2923            1 :                 ve, result.Err = c.makeVersionEdit(result)
    2924            1 :         }
    2925            1 :         if result.Err != nil {
    2926            1 :                 // Delete any created tables.
    2927            1 :                 obsoleteFiles := make([]*fileBacking, 0, len(result.Tables))
    2928            1 :                 d.mu.Lock()
    2929            1 :                 for i := range result.Tables {
    2930            1 :                         backing := &fileBacking{
    2931            1 :                                 DiskFileNum: result.Tables[i].ObjMeta.DiskFileNum,
    2932            1 :                                 Size:        result.Tables[i].WriterMeta.Size,
    2933            1 :                         }
    2934            1 :                         obsoleteFiles = append(obsoleteFiles, backing)
    2935            1 :                         // Add this file to zombie tables as well, as the versionSet
    2936            1 :                         // asserts on whether every obsolete file was at one point
    2937            1 :                         // marked zombie.
    2938            1 :                         d.mu.versions.zombieTables[backing.DiskFileNum] = objectInfo{
    2939            1 :                                 fileInfo: fileInfo{
    2940            1 :                                         FileNum:  backing.DiskFileNum,
    2941            1 :                                         FileSize: backing.Size,
    2942            1 :                                 },
    2943            1 :                                 isLocal: true,
    2944            1 :                         }
    2945            1 :                 }
    2946            1 :                 d.mu.versions.addObsoleteLocked(obsoleteFiles)
    2947            1 :                 d.mu.Unlock()
    2948              :         }
    2949              :         // Refresh the disk available statistic whenever a compaction/flush
    2950              :         // completes, before re-acquiring the mutex.
    2951            1 :         d.calculateDiskAvailableBytes()
    2952            1 :         return ve, result.Stats, result.Err
    2953              : }
    2954              : 
    2955              : // compactAndWrite runs the data part of a compaction, where we set up a
    2956              : // compaction iterator and use it to write output tables.
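                      : //
                      : // The body below proceeds in three steps: initialize a per-compaction block
                      : // buffer pool and the merged input iterators, wrap them in a compact.Iter
                      : // that applies snapshot and tombstone-elision logic, and then drive a
                      : // compact.Runner that writes new output tables until there is no more data
                      : // (or the compaction is cancelled).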
    2957              : func (d *DB) compactAndWrite(
    2958              :         jobID JobID, c *compaction, snapshots compact.Snapshots, tableFormat sstable.TableFormat,
    2959            1 : ) (result compact.Result) {
    2960            1 :         // Compactions use a pool of buffers to read blocks, avoiding polluting the
    2961            1 :         // block cache with blocks that will not be read again. We initialize the
    2962            1 :         // buffer pool with a size of 12. This initial size does not need to be
    2963            1 :         // accurate, because the pool will grow to accommodate the maximum number of
    2964            1 :         // blocks allocated at a given time over the course of the compaction. But
    2965            1 :         // choosing a size larger than that working set avoids any additional
    2966            1 :         // allocations to grow the size of the pool over the course of iteration.
    2967            1 :         //
    2968            1 :         // Justification for initial size 12: In a two-level compaction, at any
    2969            1 :         // given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
    2970            1 :         // Additionally, when decoding a compressed block, we'll temporarily
    2971            1 :         // allocate 1 additional block to hold the compressed buffer. In the worst
    2972            1 :         // case that all input sstables have two-level index blocks (+2), value
    2973            1 :         // blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
    2974            1 :         // additionally require 2n+4 blocks where n is the number of input sstables.
    2975            1 :         // Range deletion and range key blocks are relatively rare, and the cost of
    2976            1 :         // an additional allocation or two over the course of the compaction is
    2977            1 :         // considered to be okay. A larger initial size would cause the pool to hold
    2978            1 :         // on to more memory, even when it's not in-use because the pool will
    2979            1 :         // recycle buffers up to the current capacity of the pool. The memory use of
    2980            1 :         // a 12-buffer pool is expected to be within reason, even if all the buffers
    2981            1 :         // grow to the typical size of an index block (256 KiB) which would
    2982            1 :         // translate to 3 MiB per compaction.
    2983            1 :         c.bufferPool.Init(12)
    2984            1 :         defer c.bufferPool.Release()
    2985            1 :         iiopts := internalIterOpts{
    2986            1 :                 compaction: true,
    2987            1 :                 readEnv: block.ReadEnv{
    2988            1 :                         BufferPool: &c.bufferPool,
    2989            1 :                         Stats:      &c.stats,
    2990            1 :                         IterStats: d.fileCache.SSTStatsCollector().Accumulator(
    2991            1 :                                 uint64(uintptr(unsafe.Pointer(c))),
    2992            1 :                                 categoryCompaction,
    2993            1 :                         ),
    2994            1 :                 },
    2995            1 :         }
    2996            1 : 
    2997            1 :         pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter, iiopts)
    2998            1 :         defer func() {
    2999            1 :                 for _, closer := range c.closers {
    3000            1 :                         closer.FragmentIterator.Close()
    3001            1 :                 }
    3002              :         }()
    3003            1 :         if err != nil {
    3004            0 :                 return compact.Result{Err: err}
    3005            0 :         }
    3006            1 :         c.allowedZeroSeqNum = c.allowZeroSeqNum()
    3007            1 :         cfg := compact.IterConfig{
    3008            1 :                 Comparer:         c.comparer,
    3009            1 :                 Merge:            d.merge,
    3010            1 :                 TombstoneElision: c.delElision,
    3011            1 :                 RangeKeyElision:  c.rangeKeyElision,
    3012            1 :                 Snapshots:        snapshots,
    3013            1 :                 AllowZeroSeqNum:  c.allowedZeroSeqNum,
    3014            1 :                 IneffectualSingleDeleteCallback: func(userKey []byte) {
    3015            1 :                         d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
    3016            1 :                                 Kind:    IneffectualSingleDelete,
    3017            1 :                                 UserKey: slices.Clone(userKey),
    3018            1 :                         })
    3019            1 :                 },
    3020            0 :                 NondeterministicSingleDeleteCallback: func(userKey []byte) {
    3021            0 :                         d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
    3022            0 :                                 Kind:    NondeterministicSingleDelete,
    3023            0 :                                 UserKey: slices.Clone(userKey),
    3024            0 :                         })
    3025            0 :                 },
    3026              :         }
    3027            1 :         iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter)
    3028            1 : 
    3029            1 :         runnerCfg := compact.RunnerConfig{
    3030            1 :                 CompactionBounds:           base.UserKeyBoundsFromInternal(c.smallest, c.largest),
    3031            1 :                 L0SplitKeys:                c.l0Limits,
    3032            1 :                 Grandparents:               c.grandparents,
    3033            1 :                 MaxGrandparentOverlapBytes: c.maxOverlapBytes,
    3034            1 :                 TargetOutputFileSize:       c.maxOutputFileSize,
    3035            1 :                 Slot:                       c.slot,
    3036            1 :                 IteratorStats:              &c.stats,
    3037            1 :         }
    3038            1 :         runner := compact.NewRunner(runnerCfg, iter)
    3039            1 :         for runner.MoreDataToWrite() {
    3040            1 :                 if c.cancel.Load() {
    3041            1 :                         return runner.Finish().WithError(ErrCancelledCompaction)
    3042            1 :                 }
    3043              :                 // Create a new table.
    3044            1 :                 writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
    3045            1 :                 objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts)
    3046            1 :                 if err != nil {
    3047            0 :                         return runner.Finish().WithError(err)
    3048            0 :                 }
    3049            1 :                 runner.WriteTable(objMeta, tw)
    3050            1 :                 d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
    3051              :         }
    3052            1 :         result = runner.Finish()
    3053            1 :         if result.Err == nil {
    3054            1 :                 result.Err = d.objProvider.Sync()
    3055            1 :         }
    3056            1 :         return result
    3057              : }
    3058              : 
    3059              : // makeVersionEdit creates the version edit for a compaction, based on the
    3060              : // tables in compact.Result.
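                      : //
                      : // In short: every input table is recorded as deleted, every output table in
                      : // result.Tables becomes a NewFiles entry with bounds and stats taken from its
                      : // writer metadata, and per-level LevelMetrics are populated on c.metrics as a
                      : // side effect.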
    3061            1 : func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error) {
    3062            1 :         ve := &versionEdit{
    3063            1 :                 DeletedFiles: map[deletedFileEntry]*fileMetadata{},
    3064            1 :         }
    3065            1 :         for _, cl := range c.inputs {
    3066            1 :                 iter := cl.files.Iter()
    3067            1 :                 for f := iter.First(); f != nil; f = iter.Next() {
    3068            1 :                         ve.DeletedFiles[deletedFileEntry{
    3069            1 :                                 Level:   cl.level,
    3070            1 :                                 FileNum: f.FileNum,
    3071            1 :                         }] = f
    3072            1 :                 }
    3073              :         }
    3074              : 
    3075            1 :         startLevelBytes := c.startLevel.files.SizeSum()
    3076            1 :         outputMetrics := &LevelMetrics{
    3077            1 :                 BytesIn:   startLevelBytes,
    3078            1 :                 BytesRead: c.outputLevel.files.SizeSum(),
    3079            1 :         }
    3080            1 :         if len(c.extraLevels) > 0 {
    3081            1 :                 outputMetrics.BytesIn += c.extraLevels[0].files.SizeSum()
    3082            1 :         }
    3083            1 :         outputMetrics.BytesRead += outputMetrics.BytesIn
    3084            1 : 
    3085            1 :         c.metrics = map[int]*LevelMetrics{
    3086            1 :                 c.outputLevel.level: outputMetrics,
    3087            1 :         }
    3088            1 :         if len(c.flushing) == 0 && c.metrics[c.startLevel.level] == nil {
    3089            1 :                 c.metrics[c.startLevel.level] = &LevelMetrics{}
    3090            1 :         }
    3091            1 :         if len(c.extraLevels) > 0 {
    3092            1 :                 c.metrics[c.extraLevels[0].level] = &LevelMetrics{}
    3093            1 :                 outputMetrics.MultiLevel.BytesInTop = startLevelBytes
    3094            1 :                 outputMetrics.MultiLevel.BytesIn = outputMetrics.BytesIn
    3095            1 :                 outputMetrics.MultiLevel.BytesRead = outputMetrics.BytesRead
    3096            1 :         }
    3097              : 
    3098            1 :         inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute()
    3099            1 :         ve.NewFiles = make([]newFileEntry, len(result.Tables))
    3100            1 :         for i := range result.Tables {
    3101            1 :                 t := &result.Tables[i]
    3102            1 : 
    3103            1 :                 fileMeta := &fileMetadata{
    3104            1 :                         FileNum:        base.PhysicalTableFileNum(t.ObjMeta.DiskFileNum),
    3105            1 :                         CreationTime:   t.CreationTime.Unix(),
    3106            1 :                         Size:           t.WriterMeta.Size,
    3107            1 :                         SmallestSeqNum: t.WriterMeta.SmallestSeqNum,
    3108            1 :                         LargestSeqNum:  t.WriterMeta.LargestSeqNum,
    3109            1 :                 }
    3110            1 :                 if c.flushing == nil {
    3111            1 :                         // Set the file's LargestSeqNumAbsolute to be the maximum value of any
    3112            1 :                         // of the compaction's input sstables.
    3113            1 :                         // TODO(jackson): This could be narrowed to be the maximum of input
    3114            1 :                         // sstables that overlap the output sstable's key range.
    3115            1 :                         fileMeta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute
    3116            1 :                 } else {
    3117            1 :                         fileMeta.LargestSeqNumAbsolute = t.WriterMeta.LargestSeqNum
    3118            1 :                 }
    3119            1 :                 fileMeta.InitPhysicalBacking()
    3120            1 : 
    3121            1 :                 // If the file didn't contain any range deletions, we can fill its
    3122            1 :                 // table stats now, avoiding unnecessarily loading the table later.
    3123            1 :                 maybeSetStatsFromProperties(
    3124            1 :                         fileMeta.PhysicalMeta(), &t.WriterMeta.Properties,
    3125            1 :                 )
    3126            1 : 
    3127            1 :                 if t.WriterMeta.HasPointKeys {
    3128            1 :                         fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestPoint, t.WriterMeta.LargestPoint)
    3129            1 :                 }
    3130            1 :                 if t.WriterMeta.HasRangeDelKeys {
    3131            1 :                         fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestRangeDel, t.WriterMeta.LargestRangeDel)
    3132            1 :                 }
    3133            1 :                 if t.WriterMeta.HasRangeKeys {
    3134            1 :                         fileMeta.ExtendRangeKeyBounds(c.cmp, t.WriterMeta.SmallestRangeKey, t.WriterMeta.LargestRangeKey)
    3135            1 :                 }
    3136              : 
    3137            1 :                 ve.NewFiles[i] = newFileEntry{
    3138            1 :                         Level: c.outputLevel.level,
    3139            1 :                         Meta:  fileMeta,
    3140            1 :                 }
    3141            1 : 
    3142            1 :                 // Update metrics.
    3143            1 :                 if c.flushing == nil {
    3144            1 :                         outputMetrics.TablesCompacted++
    3145            1 :                         outputMetrics.BytesCompacted += fileMeta.Size
    3146            1 :                 } else {
    3147            1 :                         outputMetrics.TablesFlushed++
    3148            1 :                         outputMetrics.BytesFlushed += fileMeta.Size
    3149            1 :                 }
    3150            1 :                 outputMetrics.Size += int64(fileMeta.Size)
    3151            1 :                 outputMetrics.NumFiles++
    3152            1 :                 outputMetrics.Additional.BytesWrittenDataBlocks += t.WriterMeta.Properties.DataSize
    3153            1 :                 outputMetrics.Additional.BytesWrittenValueBlocks += t.WriterMeta.Properties.ValueBlocksSize
    3154              :         }
    3155              : 
    3156              :         // Sanity check that the tables are ordered and don't overlap.
    3157            1 :         for i := 1; i < len(ve.NewFiles); i++ {
    3158            1 :                 if ve.NewFiles[i-1].Meta.UserKeyBounds().End.IsUpperBoundFor(c.cmp, ve.NewFiles[i].Meta.Smallest.UserKey) {
    3159            0 :                         return nil, base.AssertionFailedf("pebble: compaction output tables overlap: %s and %s",
    3160            0 :                                 ve.NewFiles[i-1].Meta.DebugString(c.formatKey, true),
    3161            0 :                                 ve.NewFiles[i].Meta.DebugString(c.formatKey, true),
    3162            0 :                         )
    3163            0 :                 }
    3164              :         }
    3165              : 
    3166            1 :         return ve, nil
    3167              : }
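
The sanity check at the end of this function enforces an invariant on compaction output: the new tables must be ordered by user key and must not overlap one another. The following is a minimal, self-contained Go sketch of that check. The tableBounds type and checkOrderedAndDisjoint function are hypothetical stand-ins, not Pebble's manifest.FileMetadata / UserKeyBounds API, and the sketch treats the largest key as inclusive rather than modeling exclusive end bounds the way IsUpperBoundFor does.

package main

import (
	"bytes"
	"fmt"
)

// tableBounds is a simplified stand-in for one output table's user-key range.
// Largest is treated as an inclusive upper bound.
type tableBounds struct {
	Smallest, Largest []byte
}

// checkOrderedAndDisjoint returns an error if any adjacent pair of output
// tables overlaps in user-key space, i.e. if the previous table's Largest
// key is >= the next table's Smallest key.
func checkOrderedAndDisjoint(tables []tableBounds) error {
	for i := 1; i < len(tables); i++ {
		if bytes.Compare(tables[i-1].Largest, tables[i].Smallest) >= 0 {
			return fmt.Errorf("output tables overlap: [%s, %s] and [%s, %s]",
				tables[i-1].Smallest, tables[i-1].Largest,
				tables[i].Smallest, tables[i].Largest)
		}
	}
	return nil
}

func main() {
	ok := []tableBounds{{[]byte("a"), []byte("c")}, {[]byte("d"), []byte("f")}}
	bad := []tableBounds{{[]byte("a"), []byte("d")}, {[]byte("d"), []byte("f")}}
	fmt.Println(checkOrderedAndDisjoint(ok))  // <nil>
	fmt.Println(checkOrderedAndDisjoint(bad)) // overlap error
}

In the real code, a violation is reported via base.AssertionFailedf rather than a plain error, since overlapping outputs indicate a bug in the compaction itself.
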
    3168              : 
    3169              : // newCompactionOutput creates an object for a new table produced by a
    3170              : // compaction or flush.
    3171              : func (d *DB) newCompactionOutput(
    3172              :         jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
    3173            1 : ) (objstorage.ObjectMetadata, sstable.RawWriter, CPUWorkHandle, error) {
    3174            1 :         diskFileNum := d.mu.versions.getNextDiskFileNum()
    3175            1 : 
    3176            1 :         var writeCategory vfs.DiskWriteCategory
    3177            1 :         if d.opts.EnableSQLRowSpillMetrics {
    3178            0 :                 // If the Pebble engine is used for SQL row spills, the data
    3179            0 :                 // written to the memtable corresponds to spills to disk and
    3180            0 :                 // should be categorized as such.
    3181            0 :                 writeCategory = "sql-row-spill"
    3182            1 :         } else if c.kind == compactionKindFlush {
    3183            1 :                 writeCategory = "pebble-memtable-flush"
    3184            1 :         } else {
    3185            1 :                 writeCategory = "pebble-compaction"
    3186            1 :         }
    3187              : 
    3188            1 :         var reason string
    3189            1 :         if c.kind == compactionKindFlush {
    3190            1 :                 reason = "flushing"
    3191            1 :         } else {
    3192            1 :                 reason = "compacting"
    3193            1 :         }
    3194              : 
    3195            1 :         ctx := context.TODO()
    3196            1 :         if objiotracing.Enabled {
    3197            0 :                 ctx = objiotracing.WithLevel(ctx, c.outputLevel.level)
    3198            0 :                 if c.kind == compactionKindFlush {
    3199            0 :                         ctx = objiotracing.WithReason(ctx, objiotracing.ForFlush)
    3200            0 :                 } else {
    3201            0 :                         ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
    3202            0 :                 }
    3203              :         }
    3204              : 
    3205              :         // Prefer shared storage if present.
    3206            1 :         createOpts := objstorage.CreateOptions{
    3207            1 :                 PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
    3208            1 :                 WriteCategory:       writeCategory,
    3209            1 :         }
    3210            1 :         writable, objMeta, err := d.objProvider.Create(ctx, base.FileTypeTable, diskFileNum, createOpts)
    3211            1 :         if err != nil {
    3212            0 :                 return objstorage.ObjectMetadata{}, nil, nil, err
    3213            0 :         }
    3214              : 
    3215            1 :         if c.kind != compactionKindFlush {
    3216            1 :                 writable = &compactionWritable{
    3217            1 :                         Writable: writable,
    3218            1 :                         versions: d.mu.versions,
    3219            1 :                         written:  &c.bytesWritten,
    3220            1 :                 }
    3221            1 :         }
    3222            1 :         d.opts.EventListener.TableCreated(TableCreateInfo{
    3223            1 :                 JobID:   int(jobID),
    3224            1 :                 Reason:  reason,
    3225            1 :                 Path:    d.objProvider.Path(objMeta),
    3226            1 :                 FileNum: diskFileNum,
    3227            1 :         })
    3228            1 : 
    3229            1 :         writerOpts.SetInternal(sstableinternal.WriterOptions{
    3230            1 :                 CacheOpts: sstableinternal.CacheOptions{
    3231            1 :                         CacheHandle: d.cacheHandle,
    3232            1 :                         FileNum:     diskFileNum,
    3233            1 :                 },
    3234            1 :         })
    3235            1 : 
    3236            1 :         const MaxFileWriteAdditionalCPUTime = time.Millisecond * 100
    3237            1 :         cpuWorkHandle := d.opts.Experimental.CPUWorkPermissionGranter.GetPermission(
    3238            1 :                 MaxFileWriteAdditionalCPUTime,
    3239            1 :         )
    3240            1 :         writerOpts.Parallelism =
    3241            1 :                 d.opts.Experimental.MaxWriterConcurrency > 0 &&
    3242            1 :                         (cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)
    3243            1 : 
    3244            1 :         // TODO(jackson): Make the compaction body generic over the RawWriter type,
    3245            1 :         // so that we don't need to pay the cost of dynamic dispatch?
    3246            1 :         tw := sstable.NewRawWriter(writable, writerOpts)
    3247            1 :         return objMeta, tw, cpuWorkHandle, nil
    3248              : }
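
The branching at the top of newCompactionOutput derives both a disk-write category and an event reason from the compaction kind, with SQL row spills taking precedence over the flush category. Below is a minimal sketch of that selection logic; it uses plain strings instead of vfs.DiskWriteCategory, and the compactionKind constants are hypothetical stand-ins for this package's unexported values.

package main

import "fmt"

type compactionKind int

const (
	kindFlush compactionKind = iota
	kindDefault
)

// writeCategoryFor mirrors the branching in newCompactionOutput: SQL row
// spills take precedence for the write category, then flushes, then all
// other writes are attributed to compaction. The event reason depends only
// on whether the operation is a flush.
func writeCategoryFor(kind compactionKind, sqlRowSpillMetrics bool) (category, reason string) {
	switch {
	case sqlRowSpillMetrics:
		category = "sql-row-spill"
	case kind == kindFlush:
		category = "pebble-memtable-flush"
	default:
		category = "pebble-compaction"
	}
	if kind == kindFlush {
		reason = "flushing"
	} else {
		reason = "compacting"
	}
	return category, reason
}

func main() {
	fmt.Println(writeCategoryFor(kindFlush, false))   // pebble-memtable-flush flushing
	fmt.Println(writeCategoryFor(kindDefault, false)) // pebble-compaction compacting
}
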
    3249              : 
    3250              : // validateVersionEdit validates that start and end keys across new and deleted
    3251              : // files in a versionEdit pass the given validation function.
    3252              : func validateVersionEdit(
    3253              :         ve *versionEdit, vk base.ValidateKey, format base.FormatKey, logger Logger,
    3254            1 : ) {
    3255            1 :         validateKey := func(f *manifest.FileMetadata, key []byte) {
    3256            1 :                 if err := vk.Validate(key); err != nil {
    3257            0 :                         logger.Fatalf("pebble: version edit validation failed (key=%s file=%s): %v", format(key), f, err)
    3258            0 :                 }
    3259              :         }
    3260              : 
    3261              :         // Validate both new and deleted files.
    3262            1 :         for _, f := range ve.NewFiles {
    3263            1 :                 validateKey(f.Meta, f.Meta.Smallest.UserKey)
    3264            1 :                 validateKey(f.Meta, f.Meta.Largest.UserKey)
    3265            1 :         }
    3266            1 :         for _, m := range ve.DeletedFiles {
    3267            1 :                 validateKey(m, m.Smallest.UserKey)
    3268            1 :                 validateKey(m, m.Largest.UserKey)
    3269            1 :         }
    3270              : }
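
validateVersionEdit applies the same key-validation callback to the smallest and largest user key of every new and deleted file. A self-contained sketch of that pattern follows; the fileBounds and validateBounds names are hypothetical (not versionEdit or manifest.FileMetadata), the validator here is a simple UTF-8 check chosen only for illustration, and the sketch returns an error where the real function calls logger.Fatalf.

package main

import (
	"fmt"
	"unicode/utf8"
)

// fileBounds is a simplified stand-in for a file's boundary keys.
type fileBounds struct {
	Name              string
	Smallest, Largest []byte
}

// validateBounds applies validate to the smallest and largest user key of
// every file, stopping at the first invalid key.
func validateBounds(files []fileBounds, validate func([]byte) error) error {
	for _, f := range files {
		for _, k := range [][]byte{f.Smallest, f.Largest} {
			if err := validate(k); err != nil {
				return fmt.Errorf("validation failed (key=%q, file=%s): %w", k, f.Name, err)
			}
		}
	}
	return nil
}

func main() {
	validUTF8 := func(k []byte) error {
		if !utf8.Valid(k) {
			return fmt.Errorf("key is not valid UTF-8")
		}
		return nil
	}
	files := []fileBounds{{Name: "000001.sst", Smallest: []byte("a"), Largest: []byte("z")}}
	fmt.Println(validateBounds(files, validUTF8)) // <nil>
}
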
        

Generated by: LCOV version 2.0-1