LCOV - code coverage report
Current view: top level - pebble - compaction.go (source / functions) Hit Total Coverage
Test: 2024-10-08 08:17Z 3f7527ff - tests + meta.lcov Lines: 1835 1961 93.6 %
Date: 2024-10-08 08:18:11 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2013 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "context"
      10             :         "fmt"
      11             :         "math"
      12             :         "runtime/pprof"
      13             :         "slices"
      14             :         "sync/atomic"
      15             :         "time"
      16             : 
      17             :         "github.com/cockroachdb/errors"
      18             :         "github.com/cockroachdb/pebble/internal/base"
      19             :         "github.com/cockroachdb/pebble/internal/compact"
      20             :         "github.com/cockroachdb/pebble/internal/keyspan"
      21             :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      22             :         "github.com/cockroachdb/pebble/internal/manifest"
      23             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      24             :         "github.com/cockroachdb/pebble/objstorage"
      25             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      26             :         "github.com/cockroachdb/pebble/objstorage/remote"
      27             :         "github.com/cockroachdb/pebble/sstable"
      28             :         "github.com/cockroachdb/pebble/vfs"
      29             : )
      30             : 
      31             : var errEmptyTable = errors.New("pebble: empty table") // sentinel error; not referenced within this excerpt
      32             : 
      33             : // ErrCancelledCompaction is returned if a compaction is cancelled by a
      34             : // concurrent excise or ingest-split operation.
      35             : var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
      36             : // pprof label sets under the "pebble" key; presumably attached to the matching background goroutines — confirm at call sites.
      37             : var compactLabels = pprof.Labels("pebble", "compact")
      38             : var flushLabels = pprof.Labels("pebble", "flush")
      39             : var gcLabels = pprof.Labels("pebble", "gc")
      40             : 
      41             : // expandedCompactionByteSizeLimit returns the maximum number of bytes in all
      42             : // compacted files (25x the level's target file size, capped by disk space). We
      43             : // avoid expanding the lower level file set of a compaction beyond this size.
      44           2 : func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64) uint64 {
      45           2 :         v := uint64(25 * opts.Level(level).TargetFileSize) // baseline: 25x the level's target file size
      46           2 : 
      47           2 :         // Never expand a compaction beyond half the available capacity, divided
      48           2 :         // by the maximum number of concurrent compactions. Each of the concurrent
      49           2 :         // compactions may expand up to this limit, so this attempts to limit
      50           2 :         // compactions to half of available disk space. Note that this will not
      51           2 :         // prevent compaction picking from pursuing compactions that are larger
      52           2 :         // than this threshold before expansion.
      53           2 :         diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions()) // NOTE(review): integer division panics if MaxConcurrentCompactions() returns 0 — confirm it is always >= 1
      54           2 :         if v > diskMax {
      55           1 :                 v = diskMax
      56           1 :         }
      57           2 :         return v
      58             : }
      59             : 
      60             : // maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
      61             : // before we stop building a single file in a level-1 to level compaction.
      62           2 : func maxGrandparentOverlapBytes(opts *Options, level int) uint64 {
      63           2 :         return uint64(10 * opts.Level(level).TargetFileSize) // 10x the level's target file size
      64           2 : }
      65             : 
      66             : // maxReadCompactionBytes is the byte limit used to prevent read compactions
      67             : // which are too wide.
      68           2 : func maxReadCompactionBytes(opts *Options, level int) uint64 {
      69           2 :         return uint64(10 * opts.Level(level).TargetFileSize) // 10x the level's target file size (same formula as maxGrandparentOverlapBytes)
      70           2 : }
      71             : 
      72             : // noCloseIter wraps around a FragmentIterator, intercepting and eliding
      73             : // calls to Close. It is used during compaction to ensure that rangeDelIters
      74             : // are not closed prematurely.
      75             : type noCloseIter struct {
      76             :         keyspan.FragmentIterator // embedded; every method except Close is promoted unchanged
      77             : }
      78             : // Close is deliberately a no-op: it shadows the embedded iterator's Close so the wrapped iterator stays open.
      79           2 : func (i *noCloseIter) Close() {}
      80             : 
      81             : type compactionLevel struct { // one level's worth of compaction input: the level number and its files
      82             :         level int
      83             :         files manifest.LevelSlice
      84             :         // l0SublevelInfo contains information about L0 sublevels being compacted.
      85             :         // It's only set for the start level of a compaction starting out of L0 and
      86             :         // is nil for all other compactions.
      87             :         l0SublevelInfo []sublevelInfo
      88             : }
      89             : // Clone returns a copy carrying level and files only; l0SublevelInfo is left nil in the copy. NOTE(review): confirm the omission is intended.
      90           2 : func (cl compactionLevel) Clone() compactionLevel {
      91           2 :         newCL := compactionLevel{
      92           2 :                 level: cl.level,
      93           2 :                 files: cl.files,
      94           2 :         }
      95           2 :         return newCL
      96           2 : }
      97           1 : func (cl compactionLevel) String() string { // String renders the level number and the file slice.
      98           1 :         return fmt.Sprintf(`Level %d, Files %s`, cl.level, cl.files)
      99           1 : }
     100             : 
     101             : // compactionWritable is an objstorage.Writable wrapper that, on every write,
     102             : // updates a metric in `versions` on bytes written by in-progress compactions so
     103             : // far. It also increments a per-compaction `written` int.
     104             : type compactionWritable struct {
     105             :         objstorage.Writable
     106             : 
     107             :         versions *versionSet // receives incrementCompactionBytes for each successful write
     108             :         written  *int64      // per-compaction byte counter, updated through the pointer
     109             : }
     110             : 
     111             : // Write is part of the objstorage.Writable interface.
     112           2 : func (c *compactionWritable) Write(p []byte) error {
     113           2 :         if err := c.Writable.Write(p); err != nil {
     114           0 :                 return err
     115           0 :         }
     116             :         // Only account for the bytes once the underlying write has succeeded.
     117           2 :         *c.written += int64(len(p)) // NOTE(review): plain (non-atomic) add; presumably a single writer per compaction — confirm
     118           2 :         c.versions.incrementCompactionBytes(int64(len(p)))
     119           2 :         return nil
     120             : }
     121             : 
     122             : type compactionKind int // compactionKind enumerates the kinds of compaction; see String for the names.
     123             : 
     124             : const (
     125             :         compactionKindDefault compactionKind = iota // initial kind in newCompaction before pc.kind is applied
     126             :         compactionKindFlush // kind set by newFlush
     127             :         // compactionKindMove denotes a move compaction where the input file is
     128             :         // retained and linked in a new level without being obsoleted.
     129             :         compactionKindMove
     130             :         // compactionKindCopy denotes a copy compaction where the input file is
     131             :         // copied byte-by-byte into a new file with a new FileNum in the output level.
     132             :         compactionKindCopy
     133             :         // compactionKindDeleteOnly denotes a compaction that only deletes input
     134             :         // files. It can occur when wide range tombstones completely contain sstables.
     135             :         compactionKindDeleteOnly
     136             :         compactionKindElisionOnly
     137             :         compactionKindRead
     138             :         compactionKindTombstoneDensity
     139             :         compactionKindRewrite
     140             :         compactionKindIngestedFlushable
     141             : )
     142             : // String returns the short name of the kind (used as CompactionInfo.Reason by makeInfo), or "?" for an unknown value.
     143           2 : func (k compactionKind) String() string {
     144           2 :         switch k {
     145           2 :         case compactionKindDefault:
     146           2 :                 return "default"
     147           0 :         case compactionKindFlush:
     148           0 :                 return "flush"
     149           2 :         case compactionKindMove:
     150           2 :                 return "move"
     151           2 :         case compactionKindDeleteOnly:
     152           2 :                 return "delete-only"
     153           2 :         case compactionKindElisionOnly:
     154           2 :                 return "elision-only"
     155           1 :         case compactionKindRead:
     156           1 :                 return "read"
     157           1 :         case compactionKindTombstoneDensity:
     158           1 :                 return "tombstone-density"
     159           2 :         case compactionKindRewrite:
     160           2 :                 return "rewrite"
     161           0 :         case compactionKindIngestedFlushable:
     162           0 :                 return "ingested-flushable"
     163           2 :         case compactionKindCopy:
     164           2 :                 return "copy"
     165             :         }
     166           0 :         return "?" // fallback for values outside the declared constants
     167             : }
     168             : 
     169             : // compaction is a table compaction from one level to the next, starting from a
     170             : // given version.
     171             : type compaction struct {
     172             :         // cancel is a bool that can be used by other goroutines to signal a compaction
     173             :         // to cancel, such as if a conflicting excise operation raced it to manifest
     174             :         // application. Only holders of the manifest lock will write to this atomic.
     175             :         cancel atomic.Bool
     176             : 
     177             :         kind compactionKind
     178             :         // isDownload is true if this compaction was started as part of a Download
     179             :         // operation. In this case kind is compactionKindCopy or
     180             :         // compactionKindRewrite.
     181             :         isDownload bool
     182             : 
     183             :         cmp       Compare        // key comparison function (pc.cmp or opts.Comparer.Compare, depending on the constructor)
     184             :         equal     Equal          // opts.Comparer.Equal
     185             :         comparer  *base.Comparer // opts.Comparer
     186             :         formatKey base.FormatKey // opts.Comparer.FormatKey
     187             :         logger    Logger         // opts.Logger
     188             :         version   *version       // version the compaction was constructed against
     189             :         stats     base.InternalIteratorStats
     190             :         beganAt   time.Time // start time supplied by the constructor's caller
     191             :         // versionEditApplied is set to true when a compaction has completed and the
     192             :         // resulting version has been installed (if successful), but the compaction
     193             :         // goroutine is still cleaning up (eg, deleting obsolete files).
     194             :         versionEditApplied bool
     195             :         bufferPool         sstable.BufferPool
     196             : 
     197             :         // startLevel is the level that is being compacted. Inputs from startLevel
     198             :         // and outputLevel will be merged to produce a set of outputLevel files.
     199             :         startLevel *compactionLevel
     200             : 
     201             :         // outputLevel is the level that files are being produced in. outputLevel is
     202             :         // equal to startLevel+1 except when:
     203             :         //    - if startLevel is 0, the output level equals compactionPicker.baseLevel().
     204             :         //    - in multilevel compaction, the output level is the lowest level involved in
     205             :         //      the compaction
     206             :         // A compaction's outputLevel is nil for delete-only compactions.
     207             :         outputLevel *compactionLevel
     208             : 
     209             :         // extraLevels point to additional levels in between the input and output
     210             :         // levels that get compacted in multilevel compactions
     211             :         extraLevels []*compactionLevel
     212             : 
     213             :         inputs []compactionLevel // per-level input file sets; startLevel and outputLevel point at elements of this slice
     214             : 
     215             :         // maxOutputFileSize is the maximum size of an individual table created
     216             :         // during compaction.
     217             :         maxOutputFileSize uint64
     218             :         // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
     219             :         // single output table with the tables in the grandparent level.
     220             :         maxOverlapBytes uint64
     221             : 
     222             :         // flushing contains the flushables (aka memtables) that are being flushed.
     223             :         flushing flushableList
     224             :         // bytesWritten contains the number of bytes that have been written to outputs.
     225             :         bytesWritten int64
     226             : 
     227             :         // The boundaries of the input data.
     228             :         smallest InternalKey
     229             :         largest  InternalKey
     230             : 
     231             :         // A list of fragment iterators to close when the compaction finishes. Used by
     232             :         // input iteration to keep rangeDelIters open for the lifetime of the
     233             :         // compaction, and only close them when the compaction finishes.
     234             :         closers []*noCloseIter
     235             : 
     236             :         // grandparents are the tables in outputLevel+1 (level+2 of a single-level
     237             :         // compaction) that overlap with the files being compacted. Used to determine output table
     238             :         // boundaries. Do not assume the grandparent files are unchanged when this compaction finishes.
     239             :         grandparents manifest.LevelSlice
     240             : 
     241             :         // Boundaries at which flushes to L0 should be split. Determined by
     242             :         // L0Sublevels. If nil, flushes aren't split.
     243             :         l0Limits [][]byte
     244             : 
     245             :         delElision      compact.TombstoneElision // both elision fields are produced by compact.SetupTombstoneElision in newCompaction
     246             :         rangeKeyElision compact.TombstoneElision
     247             : 
     248             :         // allowedZeroSeqNum is true if seqnums can be zeroed if there are no
     249             :         // snapshots requiring them to be kept. This determination is made by
     250             :         // looking for an sstable which overlaps the bounds of the compaction at a
     251             :         // lower level in the LSM during runCompaction.
     252             :         allowedZeroSeqNum bool
     253             : 
     254             :         metrics map[int]*LevelMetrics // presumably keyed by level number — confirm
     255             : 
     256             :         pickerMetrics compactionPickerMetrics // copied from the pickedCompaction; read by makeInfo
     257             : }
     258             : 
     259             : // inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any
     260             : // input sstables (the zero SeqNum when there are no input files).
     261           2 : func (c *compaction) inputLargestSeqNumAbsolute() base.SeqNum {
     262           2 :         var seqNum base.SeqNum
     263           2 :         for _, cl := range c.inputs {
     264           2 :                 cl.files.Each(func(m *manifest.FileMetadata) {
     265           2 :                         seqNum = max(seqNum, m.LargestSeqNumAbsolute) // running maximum across every file of every input level
     266           2 :                 })
     267             :         }
     268           2 :         return seqNum
     269             : }
     270             : 
     271           2 : func (c *compaction) makeInfo(jobID JobID) CompactionInfo { // makeInfo builds the CompactionInfo describing this compaction for the given job.
     272           2 :         info := CompactionInfo{
     273           2 :                 JobID:       int(jobID),
     274           2 :                 Reason:      c.kind.String(),
     275           2 :                 Input:       make([]LevelInfo, 0, len(c.inputs)),
     276           2 :                 Annotations: []string{},
     277           2 :         }
     278           2 :         if c.isDownload {
     279           2 :                 info.Reason = "download," + info.Reason // download compactions get a "download," prefix on the kind name
     280           2 :         }
     281           2 :         for _, cl := range c.inputs {
     282           2 :                 inputInfo := LevelInfo{Level: cl.level, Tables: nil}
     283           2 :                 iter := cl.files.Iter()
     284           2 :                 for m := iter.First(); m != nil; m = iter.Next() {
     285           2 :                         inputInfo.Tables = append(inputInfo.Tables, m.TableInfo())
     286           2 :                 }
     287           2 :                 info.Input = append(info.Input, inputInfo)
     288             :         }
     289           2 :         if c.outputLevel != nil {
     290           2 :                 info.Output.Level = c.outputLevel.level
     291           2 : 
     292           2 :                 // If there are no inputs from the output level (eg, a move
     293           2 :                 // compaction), add an empty LevelInfo to info.Input.
     294           2 :                 if len(c.inputs) > 0 && c.inputs[len(c.inputs)-1].level != c.outputLevel.level {
     295           0 :                         info.Input = append(info.Input, LevelInfo{Level: c.outputLevel.level})
     296           0 :                 }
     297           2 :         } else {
     298           2 :                 // For a delete-only compaction, set the output level to L6. The
     299           2 :                 // output level is not meaningful here, but complicating the
     300           2 :                 // info.Output interface with a pointer doesn't seem worth the
     301           2 :                 // semantic distinction.
     302           2 :                 info.Output.Level = numLevels - 1
     303           2 :         }
     304             :         // Copy picker scores onto the Input entries by position. NOTE(review): assumes len(scores) <= len(info.Input) — confirm.
     305           2 :         for i, score := range c.pickerMetrics.scores {
     306           2 :                 info.Input[i].Score = score
     307           2 :         }
     308           2 :         info.SingleLevelOverlappingRatio = c.pickerMetrics.singleLevelOverlappingRatio
     309           2 :         info.MultiLevelOverlappingRatio = c.pickerMetrics.multiLevelOverlappingRatio
     310           2 :         if len(info.Input) > 2 { // more than two input levels is annotated as a multilevel compaction
     311           2 :                 info.Annotations = append(info.Annotations, "multilevel")
     312           2 :         }
     313           2 :         return info
     314             : }
     315             : 
     316           2 : func (c *compaction) userKeyBounds() base.UserKeyBounds { // userKeyBounds converts the compaction's smallest/largest internal keys into user-key bounds.
     317           2 :         return base.UserKeyBoundsFromInternal(c.smallest, c.largest)
     318           2 : }
     319             : 
     320             : func newCompaction( // newCompaction builds a compaction from a picked compaction; a default one may be converted into a move or copy.
     321             :         pc *pickedCompaction, opts *Options, beganAt time.Time, provider objstorage.Provider,
     322           2 : ) *compaction {
     323           2 :         c := &compaction{
     324           2 :                 kind:              compactionKindDefault,
     325           2 :                 cmp:               pc.cmp,
     326           2 :                 equal:             opts.Comparer.Equal,
     327           2 :                 comparer:          opts.Comparer,
     328           2 :                 formatKey:         opts.Comparer.FormatKey,
     329           2 :                 inputs:            pc.inputs,
     330           2 :                 smallest:          pc.smallest,
     331           2 :                 largest:           pc.largest,
     332           2 :                 logger:            opts.Logger,
     333           2 :                 version:           pc.version,
     334           2 :                 beganAt:           beganAt,
     335           2 :                 maxOutputFileSize: pc.maxOutputFileSize,
     336           2 :                 maxOverlapBytes:   pc.maxOverlapBytes,
     337           2 :                 pickerMetrics:     pc.pickerMetrics,
     338           2 :         }
     339           2 :         c.startLevel = &c.inputs[0] // startLevel/outputLevel point into c.inputs, which shares its backing array with pc.inputs
     340           2 :         if pc.startLevel.l0SublevelInfo != nil {
     341           2 :                 c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo
     342           2 :         }
     343           2 :         c.outputLevel = &c.inputs[1]
     344           2 :         // With extra (multilevel) levels present, the output level is the last input instead.
     345           2 :         if len(pc.extraLevels) > 0 {
     346           2 :                 c.extraLevels = pc.extraLevels
     347           2 :                 c.outputLevel = &c.inputs[len(c.inputs)-1]
     348           2 :         }
     349             :         // Compute the set of outputLevel+1 files that overlap this compaction (these
     350             :         // are the grandparent sstables).
     351           2 :         if c.outputLevel.level+1 < numLevels {
     352           2 :                 c.grandparents = c.version.Overlaps(c.outputLevel.level+1, c.userKeyBounds())
     353           2 :         }
     354           2 :         c.delElision, c.rangeKeyElision = compact.SetupTombstoneElision(
     355           2 :                 c.cmp, c.version, c.outputLevel.level, base.UserKeyBoundsFromInternal(c.smallest, c.largest),
     356           2 :         )
     357           2 :         c.kind = pc.kind
     358           2 :         // A default compaction of a single start-level file into an empty output range may become a move or copy.
     359           2 :         if c.kind == compactionKindDefault && c.outputLevel.files.Empty() && !c.hasExtraLevelData() &&
     360           2 :                 c.startLevel.files.Len() == 1 && c.grandparents.SizeSum() <= c.maxOverlapBytes {
     361           2 :                 // This compaction can be converted into a move or copy from one level
     362           2 :                 // to the next. We avoid such a move if there is lots of overlapping
     363           2 :                 // grandparent data. Otherwise, the move could create a parent file
     364           2 :                 // that will require a very expensive merge later on.
     365           2 :                 iter := c.startLevel.files.Iter()
     366           2 :                 meta := iter.First()
     367           2 :                 isRemote := false
     368           2 :                 // We should always be passed a provider, except in some unit tests.
     369           2 :                 if provider != nil {
     370           2 :                         isRemote = !objstorage.IsLocalTable(provider, meta.FileBacking.DiskFileNum)
     371           2 :                 }
     372             :                 // Avoid a trivial move or copy if all of these are true, as rewriting a
     373             :                 // new file is better:
     374             :                 //
     375             :                 // 1) The source file is a virtual sstable
     376             :                 // 2) The existing file `meta` is on non-remote storage
     377             :                 // 3) The output level prefers shared storage
     378           2 :                 mustCopy := !isRemote && remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
     379           2 :                 if mustCopy {
     380           2 :                         // If the source is virtual, it's best to just rewrite the file as all
     381           2 :                         // conditions in the above comment are met.
     382           2 :                         if !meta.Virtual {
     383           2 :                                 c.kind = compactionKindCopy
     384           2 :                         }
     385           2 :                 } else {
     386           2 :                         c.kind = compactionKindMove
     387           2 :                 }
     388             :         }
     389           2 :         return c
     390             : }
     391             : 
     392             : func newDeleteOnlyCompaction( // newDeleteOnlyCompaction builds a delete-only compaction over inputs; startLevel and outputLevel are left nil.
     393             :         opts *Options, cur *version, inputs []compactionLevel, beganAt time.Time,
     394           2 : ) *compaction {
     395           2 :         c := &compaction{
     396           2 :                 kind:      compactionKindDeleteOnly,
     397           2 :                 cmp:       opts.Comparer.Compare,
     398           2 :                 equal:     opts.Comparer.Equal,
     399           2 :                 comparer:  opts.Comparer,
     400           2 :                 formatKey: opts.Comparer.FormatKey,
     401           2 :                 logger:    opts.Logger,
     402           2 :                 version:   cur,
     403           2 :                 beganAt:   beganAt,
     404           2 :                 inputs:    inputs,
     405           2 :         }
     406           2 : 
     407           2 :         // Set c.smallest, c.largest from the key range spanned by all input levels.
     408           2 :         files := make([]manifest.LevelIterator, 0, len(inputs))
     409           2 :         for _, in := range inputs {
     410           2 :                 files = append(files, in.files.Iter())
     411           2 :         }
     412           2 :         c.smallest, c.largest = manifest.KeyRange(opts.Comparer.Compare, files...)
     413           2 :         return c
     414             : }
     415             : 
     416           2 : func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64) { // scales up c.maxOverlapBytes so grandparent overlap does not split a flush into too many files
     417           2 :         // Heuristic to place a lower bound on compaction output file size
     418           2 :         // caused by Lbase. Prior to this heuristic we have observed an L0 in
     419           2 :         // production with 310K files of which 290K files were < 10KB in size.
     420           2 :         // Our hypothesis is that it was caused by L1 having 2600 files and
     421           2 :         // ~10GB, such that each flush got split into many tiny files due to
     422           2 :         // overlapping with most of the files in Lbase.
     423           2 :         //
     424           2 :         // The computation below is general in that it accounts
     425           2 :         // for flushing different volumes of data (e.g. we may be flushing
     426           2 :         // many memtables). For illustration, we consider the typical
     427           2 :         // example of flushing a 64MB memtable. So 12.8MB output,
     428           2 :         // based on the compression guess below. If the compressed bytes
     429           2 :         // guess is an over-estimate we will end up with smaller files,
     430           2 :         // and if an under-estimate we will end up with larger files.
     431           2 :         // With a 2MB target file size, 7 files. We are willing to accept
     432           2 :         // 4x the number of files, if it results in better write amplification
     433           2 :         // when later compacting to Lbase, i.e., ~450KB files (target file
     434           2 :         // size / 4).
     435           2 :         //
     436           2 :         // Note that this is a pessimistic heuristic in that
     437           2 :         // fileCountUpperBoundDueToGrandparents could be far from the actual
     438           2 :         // number of files produced due to the grandparent limits. For
     439           2 :         // example, in the extreme, consider a flush that overlaps with 1000
     440           2 :         // files in Lbase f0...f999, and the initially calculated value of
     441           2 :         // maxOverlapBytes will cause splits at f10, f20,..., f990, which
     442           2 :         // means an upper bound file count of 100 files. Say the input bytes
     443           2 :         // in the flush are such that acceptableFileCount=10. We will fatten
     444           2 :         // up maxOverlapBytes by 10x to ensure that the upper bound file count
     445           2 :         // drops to 10. However, it is possible that in practice, even without
     446           2 :         // this change, we would have produced no more than 10 files, and that
     447           2 :         // this change makes the files unnecessarily wide. Say the input bytes
     448           2 :         // are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
     449           2 :         // 10% in f80...f89 and 10% in f990...f999. The original value of
     450           2 :         // maxOverlapBytes would have actually produced only 10 sstables. But
     451           2 :         // by increasing maxOverlapBytes by 10x, we may produce 1 sstable that
     452           2 :         // spans f0...f89, i.e., a much wider sstable than necessary.
     453           2 :         //
     454           2 :         // We could produce a tighter estimate of
     455           2 :         // fileCountUpperBoundDueToGrandparents if we had knowledge of the key
     456           2 :         // distribution of the flush. The 4x multiplier mentioned earlier is
     457           2 :         // a way to try to compensate for this pessimism.
     458           2 :         //
     459           2 :         // TODO(sumeer): we don't have compression info for the data being
     460           2 :         // flushed, but it is likely that existing files that overlap with
     461           2 :         // this flush in Lbase are representative wrt compression ratio. We
     462           2 :         // could store the uncompressed size in FileMetadata and estimate
     463           2 :         // the compression ratio.
     464           2 :         const approxCompressionRatio = 0.2
     465           2 :         approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
     466           2 :         approxNumFilesBasedOnTargetSize :=
     467           2 :                 int(math.Ceil(approxOutputBytes / float64(c.maxOutputFileSize)))
     468           2 :         acceptableFileCount := float64(4 * approxNumFilesBasedOnTargetSize) // evaluates to 0 when flushingBytes is 0
     469           2 :         // The byte calculation is linear in numGrandparentFiles, but we will
     470           2 :         // incur this linear cost in compact.Runner.TableSplitLimit() too, so we are
     471           2 :         // also willing to pay it now. We could approximate this cheaply by using the
     472           2 :         // mean file size of Lbase.
     473           2 :         grandparentFileBytes := c.grandparents.SizeSum()
     474           2 :         fileCountUpperBoundDueToGrandparents :=
     475           2 :                 float64(grandparentFileBytes) / float64(c.maxOverlapBytes)
     476           2 :         if fileCountUpperBoundDueToGrandparents > acceptableFileCount { // NOTE(review): with acceptableFileCount == 0 the ratio below is +Inf — confirm callers never pass flushingBytes == 0
     477           2 :                 c.maxOverlapBytes = uint64(
     478           2 :                         float64(c.maxOverlapBytes) *
     479           2 :                                 (fileCountUpperBoundDueToGrandparents / acceptableFileCount))
     480           2 :         }
     481             : }
     482             : 
     483             : func newFlush(
     484             :         opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time,
     485           2 : ) (*compaction, error) {
     486           2 :         c := &compaction{
     487           2 :                 kind:              compactionKindFlush,
     488           2 :                 cmp:               opts.Comparer.Compare,
     489           2 :                 equal:             opts.Comparer.Equal,
     490           2 :                 comparer:          opts.Comparer,
     491           2 :                 formatKey:         opts.Comparer.FormatKey,
     492           2 :                 logger:            opts.Logger,
     493           2 :                 version:           cur,
     494           2 :                 beganAt:           beganAt,
     495           2 :                 inputs:            []compactionLevel{{level: -1}, {level: 0}},
     496           2 :                 maxOutputFileSize: math.MaxUint64,
     497           2 :                 maxOverlapBytes:   math.MaxUint64,
     498           2 :                 flushing:          flushing,
     499           2 :         }
     500           2 :         c.startLevel = &c.inputs[0]
     501           2 :         c.outputLevel = &c.inputs[1]
     502           2 : 
     503           2 :         if len(flushing) > 0 {
     504           2 :                 if _, ok := flushing[0].flushable.(*ingestedFlushable); ok {
     505           2 :                         if len(flushing) != 1 {
     506           0 :                                 panic("pebble: ingestedFlushable must be flushed one at a time.")
     507             :                         }
     508           2 :                         c.kind = compactionKindIngestedFlushable
     509           2 :                         return c, nil
     510             :                 }
     511             :         }
     512             : 
     513             :         // Make sure there's no ingestedFlushable after the first flushable in the
     514             :         // list.
     515           2 :         for _, f := range flushing {
     516           2 :                 if _, ok := f.flushable.(*ingestedFlushable); ok {
     517           0 :                         panic("pebble: flushing shouldn't contain ingestedFlushable flushable")
     518             :                 }
     519             :         }
     520             : 
     521           2 :         if cur.L0Sublevels != nil {
     522           2 :                 c.l0Limits = cur.L0Sublevels.FlushSplitKeys()
     523           2 :         }
     524             : 
     525           2 :         smallestSet, largestSet := false, false
     526           2 :         updatePointBounds := func(iter internalIterator) {
     527           2 :                 if kv := iter.First(); kv != nil {
     528           2 :                         if !smallestSet ||
     529           2 :                                 base.InternalCompare(c.cmp, c.smallest, kv.K) > 0 {
     530           2 :                                 smallestSet = true
     531           2 :                                 c.smallest = kv.K.Clone()
     532           2 :                         }
     533             :                 }
     534           2 :                 if kv := iter.Last(); kv != nil {
     535           2 :                         if !largestSet ||
     536           2 :                                 base.InternalCompare(c.cmp, c.largest, kv.K) < 0 {
     537           2 :                                 largestSet = true
     538           2 :                                 c.largest = kv.K.Clone()
     539           2 :                         }
     540             :                 }
     541             :         }
     542             : 
     543           2 :         updateRangeBounds := func(iter keyspan.FragmentIterator) error {
     544           2 :                 // File bounds require s != nil && !s.Empty(). We only need to check for
     545           2 :                 // s != nil here, as the memtable's FragmentIterator would never surface
     546           2 :                 // empty spans.
     547           2 :                 if s, err := iter.First(); err != nil {
     548           0 :                         return err
     549           2 :                 } else if s != nil {
     550           2 :                         if key := s.SmallestKey(); !smallestSet ||
     551           2 :                                 base.InternalCompare(c.cmp, c.smallest, key) > 0 {
     552           2 :                                 smallestSet = true
     553           2 :                                 c.smallest = key.Clone()
     554           2 :                         }
     555             :                 }
     556           2 :                 if s, err := iter.Last(); err != nil {
     557           0 :                         return err
     558           2 :                 } else if s != nil {
     559           2 :                         if key := s.LargestKey(); !largestSet ||
     560           2 :                                 base.InternalCompare(c.cmp, c.largest, key) < 0 {
     561           2 :                                 largestSet = true
     562           2 :                                 c.largest = key.Clone()
     563           2 :                         }
     564             :                 }
     565           2 :                 return nil
     566             :         }
     567             : 
     568           2 :         var flushingBytes uint64
     569           2 :         for i := range flushing {
     570           2 :                 f := flushing[i]
     571           2 :                 updatePointBounds(f.newIter(nil))
     572           2 :                 if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
     573           2 :                         if err := updateRangeBounds(rangeDelIter); err != nil {
     574           0 :                                 return nil, err
     575           0 :                         }
     576             :                 }
     577           2 :                 if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
     578           2 :                         if err := updateRangeBounds(rangeKeyIter); err != nil {
     579           0 :                                 return nil, err
     580           0 :                         }
     581             :                 }
     582           2 :                 flushingBytes += f.inuseBytes()
     583             :         }
     584             : 
     585           2 :         if opts.FlushSplitBytes > 0 {
     586           2 :                 c.maxOutputFileSize = uint64(opts.Level(0).TargetFileSize)
     587           2 :                 c.maxOverlapBytes = maxGrandparentOverlapBytes(opts, 0)
     588           2 :                 c.grandparents = c.version.Overlaps(baseLevel, c.userKeyBounds())
     589           2 :                 adjustGrandparentOverlapBytesForFlush(c, flushingBytes)
     590           2 :         }
     591             : 
     592             :         // We don't elide tombstones for flushes.
     593           2 :         c.delElision, c.rangeKeyElision = compact.NoTombstoneElision(), compact.NoTombstoneElision()
     594           2 :         return c, nil
     595             : }
     596             : 
     597           2 : func (c *compaction) hasExtraLevelData() bool {
     598           2 :         if len(c.extraLevels) == 0 {
     599           2 :                 // not a multi level compaction
     600           2 :                 return false
     601           2 :         } else if c.extraLevels[0].files.Empty() {
     602           2 :                 // a multi level compaction without data in the intermediate input level;
     603           2 :                 // e.g. for a multi level compaction with levels 4,5, and 6, this could
     604           2 :                 // occur if there is no files to compact in 5, or in 5 and 6 (i.e. a move).
     605           2 :                 return false
     606           2 :         }
     607           2 :         return true
     608             : }
     609             : 
     610             : // errorOnUserKeyOverlap returns an error if the last two written sstables in
     611             : // this compaction have revisions of the same user key present in both sstables,
     612             : // when it shouldn't (eg. when splitting flushes).
     613           1 : func (c *compaction) errorOnUserKeyOverlap(ve *versionEdit) error {
     614           1 :         if n := len(ve.NewFiles); n > 1 {
     615           1 :                 meta := ve.NewFiles[n-1].Meta
     616           1 :                 prevMeta := ve.NewFiles[n-2].Meta
     617           1 :                 if !prevMeta.Largest.IsExclusiveSentinel() &&
     618           1 :                         c.cmp(prevMeta.Largest.UserKey, meta.Smallest.UserKey) >= 0 {
     619           1 :                         return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
     620           1 :                                 prevMeta.Largest.Pretty(c.formatKey),
     621           1 :                                 prevMeta.FileNum,
     622           1 :                                 meta.FileNum)
     623           1 :                 }
     624             :         }
     625           1 :         return nil
     626             : }
     627             : 
     628             : // allowZeroSeqNum returns true if seqnum's can be zeroed if there are no
     629             : // snapshots requiring them to be kept. It performs this determination by
     630             : // looking at the TombstoneElision values which are set up based on sstables
     631             : // which overlap the bounds of the compaction at a lower level in the LSM.
     632           2 : func (c *compaction) allowZeroSeqNum() bool {
     633           2 :         // TODO(peter): we disable zeroing of seqnums during flushing to match
     634           2 :         // RocksDB behavior and to avoid generating overlapping sstables during
     635           2 :         // DB.replayWAL. When replaying WAL files at startup, we flush after each
     636           2 :         // WAL is replayed building up a single version edit that is
     637           2 :         // applied. Because we don't apply the version edit after each flush, this
     638           2 :         // code doesn't know that L0 contains files and zeroing of seqnums should
     639           2 :         // be disabled. That is fixable, but it seems safer to just match the
     640           2 :         // RocksDB behavior for now.
     641           2 :         return len(c.flushing) == 0 && c.delElision.ElidesEverything() && c.rangeKeyElision.ElidesEverything()
     642           2 : }
     643             : 
     644             : // newInputIters returns an iterator over all the input tables in a compaction.
     645             : func (c *compaction) newInputIters(
     646             :         newIters tableNewIters, newRangeKeyIter keyspanimpl.TableNewSpanIter,
     647             : ) (
     648             :         pointIter internalIterator,
     649             :         rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     650             :         retErr error,
     651           2 : ) {
     652           2 :         // Validate the ordering of compaction input files for defense in depth.
     653           2 :         if len(c.flushing) == 0 {
     654           2 :                 if c.startLevel.level >= 0 {
     655           2 :                         err := manifest.CheckOrdering(c.cmp, c.formatKey,
     656           2 :                                 manifest.Level(c.startLevel.level), c.startLevel.files.Iter())
     657           2 :                         if err != nil {
     658           1 :                                 return nil, nil, nil, err
     659           1 :                         }
     660             :                 }
     661           2 :                 err := manifest.CheckOrdering(c.cmp, c.formatKey,
     662           2 :                         manifest.Level(c.outputLevel.level), c.outputLevel.files.Iter())
     663           2 :                 if err != nil {
     664           1 :                         return nil, nil, nil, err
     665           1 :                 }
     666           2 :                 if c.startLevel.level == 0 {
     667           2 :                         if c.startLevel.l0SublevelInfo == nil {
     668           0 :                                 panic("l0SublevelInfo not created for compaction out of L0")
     669             :                         }
     670           2 :                         for _, info := range c.startLevel.l0SublevelInfo {
     671           2 :                                 err := manifest.CheckOrdering(c.cmp, c.formatKey,
     672           2 :                                         info.sublevel, info.Iter())
     673           2 :                                 if err != nil {
     674           1 :                                         return nil, nil, nil, err
     675           1 :                                 }
     676             :                         }
     677             :                 }
     678           2 :                 if len(c.extraLevels) > 0 {
     679           2 :                         if len(c.extraLevels) > 1 {
     680           0 :                                 panic("n>2 multi level compaction not implemented yet")
     681             :                         }
     682           2 :                         interLevel := c.extraLevels[0]
     683           2 :                         err := manifest.CheckOrdering(c.cmp, c.formatKey,
     684           2 :                                 manifest.Level(interLevel.level), interLevel.files.Iter())
     685           2 :                         if err != nil {
     686           0 :                                 return nil, nil, nil, err
     687           0 :                         }
     688             :                 }
     689             :         }
     690             : 
     691             :         // There are three classes of keys that a compaction needs to process: point
     692             :         // keys, range deletion tombstones and range keys. Collect all iterators for
     693             :         // all these classes of keys from all the levels. We'll aggregate them
     694             :         // together farther below.
     695             :         //
     696             :         // numInputLevels is an approximation of the number of iterator levels. Due
     697             :         // to idiosyncrasies in iterator construction, we may (rarely) exceed this
     698             :         // initial capacity.
     699           2 :         numInputLevels := max(len(c.flushing), len(c.inputs))
     700           2 :         iters := make([]internalIterator, 0, numInputLevels)
     701           2 :         rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
     702           2 :         rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
     703           2 : 
     704           2 :         // If construction of the iterator inputs fails, ensure that we close all
     705           2 :         // the consitutent iterators.
     706           2 :         defer func() {
     707           2 :                 if retErr != nil {
     708           0 :                         for _, iter := range iters {
     709           0 :                                 if iter != nil {
     710           0 :                                         iter.Close()
     711           0 :                                 }
     712             :                         }
     713           0 :                         for _, rangeDelIter := range rangeDelIters {
     714           0 :                                 rangeDelIter.Close()
     715           0 :                         }
     716             :                 }
     717             :         }()
     718           2 :         iterOpts := IterOptions{
     719           2 :                 CategoryAndQoS: sstable.CategoryAndQoS{
     720           2 :                         Category: "pebble-compaction",
     721           2 :                         QoSLevel: sstable.NonLatencySensitiveQoSLevel,
     722           2 :                 },
     723           2 :                 logger: c.logger,
     724           2 :         }
     725           2 : 
     726           2 :         // Populate iters, rangeDelIters and rangeKeyIters with the appropriate
     727           2 :         // constituent iterators. This depends on whether this is a flush or a
     728           2 :         // compaction.
     729           2 :         if len(c.flushing) != 0 {
     730           2 :                 // If flushing, we need to build the input iterators over the memtables
     731           2 :                 // stored in c.flushing.
     732           2 :                 for i := range c.flushing {
     733           2 :                         f := c.flushing[i]
     734           2 :                         iters = append(iters, f.newFlushIter(nil))
     735           2 :                         rangeDelIter := f.newRangeDelIter(nil)
     736           2 :                         if rangeDelIter != nil {
     737           2 :                                 rangeDelIters = append(rangeDelIters, rangeDelIter)
     738           2 :                         }
     739           2 :                         if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
     740           2 :                                 rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
     741           2 :                         }
     742             :                 }
     743           2 :         } else {
     744           2 :                 addItersForLevel := func(level *compactionLevel, l manifest.Layer) error {
     745           2 :                         // Add a *levelIter for point iterators. Because we don't call
     746           2 :                         // initRangeDel, the levelIter will close and forget the range
     747           2 :                         // deletion iterator when it steps on to a new file. Surfacing range
     748           2 :                         // deletions to compactions are handled below.
     749           2 :                         iters = append(iters, newLevelIter(context.Background(),
     750           2 :                                 iterOpts, c.comparer, newIters, level.files.Iter(), l, internalIterOpts{
     751           2 :                                         compaction: true,
     752           2 :                                         bufferPool: &c.bufferPool,
     753           2 :                                 }))
     754           2 :                         // TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
     755           2 :                         // deletions into memory upfront. (See #2015, which reverted this.) There
     756           2 :                         // will be no user keys that are split between sstables within a level in
     757           2 :                         // Cockroach 23.1, which unblocks this optimization.
     758           2 : 
     759           2 :                         // Add the range deletion iterator for each file as an independent level
     760           2 :                         // in mergingIter, as opposed to making a levelIter out of those. This
     761           2 :                         // is safer as levelIter expects all keys coming from underlying
     762           2 :                         // iterators to be in order. Due to compaction / tombstone writing
     763           2 :                         // logic in finishOutput(), it is possible for range tombstones to not
     764           2 :                         // be strictly ordered across all files in one level.
     765           2 :                         //
     766           2 :                         // Consider this example from the metamorphic tests (also repeated in
     767           2 :                         // finishOutput()), consisting of three L3 files with their bounds
     768           2 :                         // specified in square brackets next to the file name:
     769           2 :                         //
     770           2 :                         // ./000240.sst   [tmgc#391,MERGE-tmgc#391,MERGE]
     771           2 :                         // tmgc#391,MERGE [786e627a]
     772           2 :                         // tmgc-udkatvs#331,RANGEDEL
     773           2 :                         //
     774           2 :                         // ./000241.sst   [tmgc#384,MERGE-tmgc#384,MERGE]
     775           2 :                         // tmgc#384,MERGE [666c7070]
     776           2 :                         // tmgc-tvsalezade#383,RANGEDEL
     777           2 :                         // tmgc-tvsalezade#331,RANGEDEL
     778           2 :                         //
     779           2 :                         // ./000242.sst   [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
     780           2 :                         // tmgc-tvsalezade#383,RANGEDEL
     781           2 :                         // tmgc#375,SET [72646c78766965616c72776865676e79]
     782           2 :                         // tmgc-tvsalezade#356,RANGEDEL
     783           2 :                         //
     784           2 :                         // Here, the range tombstone in 000240.sst falls "after" one in
     785           2 :                         // 000241.sst, despite 000240.sst being ordered "before" 000241.sst for
     786           2 :                         // levelIter's purposes. While each file is still consistent before its
     787           2 :                         // bounds, it's safer to have all rangedel iterators be visible to
     788           2 :                         // mergingIter.
     789           2 :                         iter := level.files.Iter()
     790           2 :                         for f := iter.First(); f != nil; f = iter.Next() {
     791           2 :                                 rangeDelIter, err := c.newRangeDelIter(newIters, iter.Take(), iterOpts, l)
     792           2 :                                 if err != nil {
     793           0 :                                         // The error will already be annotated with the BackingFileNum, so
     794           0 :                                         // we annotate it with the FileNum.
     795           0 :                                         return errors.Wrapf(err, "pebble: could not open table %s", errors.Safe(f.FileNum))
     796           0 :                                 }
     797           2 :                                 if rangeDelIter == nil {
     798           2 :                                         continue
     799             :                                 }
     800           2 :                                 rangeDelIters = append(rangeDelIters, rangeDelIter)
     801           2 :                                 c.closers = append(c.closers, rangeDelIter)
     802             :                         }
     803             : 
     804             :                         // Check if this level has any range keys.
     805           2 :                         hasRangeKeys := false
     806           2 :                         for f := iter.First(); f != nil; f = iter.Next() {
     807           2 :                                 if f.HasRangeKeys {
     808           2 :                                         hasRangeKeys = true
     809           2 :                                         break
     810             :                                 }
     811             :                         }
     812           2 :                         if hasRangeKeys {
     813           2 :                                 newRangeKeyIterWrapper := func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
     814           2 :                                         rangeKeyIter, err := newRangeKeyIter(ctx, file, iterOptions)
     815           2 :                                         if err != nil {
     816           0 :                                                 return nil, err
     817           2 :                                         } else if rangeKeyIter == nil {
     818           0 :                                                 return emptyKeyspanIter, nil
     819           0 :                                         }
     820             :                                         // Ensure that the range key iter is not closed until the compaction is
     821             :                                         // finished. This is necessary because range key processing
     822             :                                         // requires the range keys to be held in memory for up to the
     823             :                                         // lifetime of the compaction.
     824           2 :                                         noCloseIter := &noCloseIter{rangeKeyIter}
     825           2 :                                         c.closers = append(c.closers, noCloseIter)
     826           2 : 
     827           2 :                                         // We do not need to truncate range keys to sstable boundaries, or
     828           2 :                                         // only read within the file's atomic compaction units, unlike with
     829           2 :                                         // range tombstones. This is because range keys were added after we
     830           2 :                                         // stopped splitting user keys across sstables, so all the range keys
     831           2 :                                         // in this sstable must wholly lie within the file's bounds.
     832           2 :                                         return noCloseIter, err
     833             :                                 }
     834           2 :                                 li := keyspanimpl.NewLevelIter(
     835           2 :                                         context.Background(), keyspan.SpanIterOptions{}, c.cmp,
     836           2 :                                         newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange,
     837           2 :                                 )
     838           2 :                                 rangeKeyIters = append(rangeKeyIters, li)
     839             :                         }
     840           2 :                         return nil
     841             :                 }
     842             : 
     843           2 :                 for i := range c.inputs {
     844           2 :                         // If the level is annotated with l0SublevelInfo, expand it into one
     845           2 :                         // level per sublevel.
     846           2 :                         // TODO(jackson): Perform this expansion even earlier when we pick the
     847           2 :                         // compaction?
     848           2 :                         if len(c.inputs[i].l0SublevelInfo) > 0 {
     849           2 :                                 for _, info := range c.startLevel.l0SublevelInfo {
     850           2 :                                         sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
     851           2 :                                         if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
     852           0 :                                                 return nil, nil, nil, err
     853           0 :                                         }
     854             :                                 }
     855           2 :                                 continue
     856             :                         }
     857           2 :                         if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
     858           0 :                                 return nil, nil, nil, err
     859           0 :                         }
     860             :                 }
     861             :         }
     862             : 
     863             :         // If there's only one constituent point iterator, we can avoid the overhead
     864             :         // of a *mergingIter. This is possible, for example, when performing a flush
     865             :         // of a single memtable. Otherwise, combine all the iterators into a merging
     866             :         // iter.
     867           2 :         pointIter = iters[0]
     868           2 :         if len(iters) > 1 {
     869           2 :                 pointIter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
     870           2 :         }
     871             : 
     872             :         // In normal operation, levelIter iterates over the point operations in a
     873             :         // level, and initializes a rangeDelIter pointer for the range deletions in
     874             :         // each table. During compaction, we want to iterate over the merged view of
     875             :         // point operations and range deletions. In order to do this we create one
     876             :         // levelIter per level to iterate over the point operations, and collect up
     877             :         // all the range deletion files.
     878             :         //
     879             :         // The range deletion levels are combined with a keyspanimpl.MergingIter. The
     880             :         // resulting merged rangedel iterator is then included using an
     881             :         // InterleavingIter.
     882             :         // TODO(jackson): Consider using a defragmenting iterator to stitch together
     883             :         // logical range deletions that were fragmented due to previous file
     884             :         // boundaries.
     885           2 :         if len(rangeDelIters) > 0 {
     886           2 :                 mi := &keyspanimpl.MergingIter{}
     887           2 :                 mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
     888           2 :                 rangeDelIter = mi
     889           2 :         }
     890             : 
     891             :         // If there are range key iterators, we need to combine them using
     892             :         // keyspanimpl.MergingIter, and then interleave them among the points.
     893           2 :         if len(rangeKeyIters) > 0 {
     894           2 :                 mi := &keyspanimpl.MergingIter{}
     895           2 :                 mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeKeyIters...)
     896           2 :                 // TODO(radu): why do we have a defragmenter here but not above?
     897           2 :                 di := &keyspan.DefragmentingIter{}
     898           2 :                 di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
     899           2 :                 rangeKeyIter = di
     900           2 :         }
     901           2 :         return pointIter, rangeDelIter, rangeKeyIter, nil
     902             : }
     903             : // newRangeDelIter returns a no-close iterator over f's range deletions, or (nil, nil) if f has none.
     904             : func (c *compaction) newRangeDelIter(
     905             :         newIters tableNewIters, f manifest.LevelFile, opts IterOptions, l manifest.Layer,
     906           2 : ) (*noCloseIter, error) {
     907           2 :         opts.layer = l // opts was passed by value, so the caller's options are untouched
     908           2 :         iterSet, err := newIters(context.Background(), f.FileMetadata, &opts,
     909           2 :                 internalIterOpts{
     910           2 :                         compaction: true,
     911           2 :                         bufferPool: &c.bufferPool,
     912           2 :                 }, iterRangeDeletions)
     913           2 :         if err != nil {
     914           0 :                 return nil, err
     915           2 :         } else if iterSet.rangeDeletion == nil {
     916           2 :                 // The file doesn't contain any range deletions; nil is not an error.
     917           2 :                 return nil, nil
     918           2 :         }
     919             :         // Ensure that rangeDelIter is not closed until the compaction is
     920             :         // finished. This is necessary because range tombstone processing
     921             :         // requires the range tombstones to be held in memory for up to the
     922             :         // lifetime of the compaction.
     923           2 :         return &noCloseIter{iterSet.rangeDeletion}, nil
     924             : }
     925             : // String lists each input level from startLevel to outputLevel with its files, one level per line; flushes yield "flush\n".
     926           1 : func (c *compaction) String() string {
     927           1 :         if len(c.flushing) != 0 {
     928           0 :                 return "flush\n"
     929           0 :         }
     930             : 
     931           1 :         var buf bytes.Buffer
     932           1 :         for level := c.startLevel.level; level <= c.outputLevel.level; level++ {
     933           1 :                 i := level - c.startLevel.level // position of this level within c.inputs
     934           1 :                 fmt.Fprintf(&buf, "%d:", level)
     935           1 :                 iter := c.inputs[i].files.Iter()
     936           1 :                 for f := iter.First(); f != nil; f = iter.Next() {
     937           1 :                         fmt.Fprintf(&buf, " %s:%s-%s", f.FileNum, f.Smallest, f.Largest) // file number, then smallest-largest bounds
     938           1 :                 }
     939           1 :                 fmt.Fprintf(&buf, "\n")
     940             :         }
     941           1 :         return buf.String()
     942             : }
     943             : // manualCompaction holds the levels, key range, retry count and done channel of one manual compaction.
     944             : type manualCompaction struct {
     945             :         // Count of the retries either due to too many concurrent compactions, or a
     946             :         // concurrent compaction to overlapping levels.
     947             :         retries     int
     948             :         level       int
     949             :         outputLevel int
     950             :         done        chan error // presumably delivers the compaction's outcome to the requester — TODO confirm
     951             :         start       []byte
     952             :         end         []byte
     953             :         split       bool
     954             : }
     955             : // readCompaction records the level, key range and file of a compaction (presumably read-triggered; TODO confirm).
     956             : type readCompaction struct {
     957             :         level int
     958             :         // The inclusive [start, end] key range is used for de-duping.
     959             :         start []byte
     960             :         end   []byte
     961             : 
     962             :         // The file associated with the compaction.
     963             :         // If the file no longer belongs in the same
     964             :         // level, then we skip the compaction.
     965             :         fileNum base.FileNum
     966             : }
     967             : // addInProgressCompaction registers c as in progress, marks its input files Compacting and, for compactions out of L0, updates L0Sublevels state.
     968           2 : func (d *DB) addInProgressCompaction(c *compaction) {
     969           2 :         d.mu.compact.inProgress[c] = struct{}{} // NOTE(review): writes d.mu state; presumably d.mu must be held — confirm
     970           2 :         var isBase, isIntraL0 bool
     971           2 :         for _, cl := range c.inputs {
     972           2 :                 iter := cl.files.Iter()
     973           2 :                 for f := iter.First(); f != nil; f = iter.Next() {
     974           2 :                         if f.IsCompacting() {
     975           0 :                                 d.opts.Logger.Fatalf("L%d->L%d: %s already being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
     976           0 :                         }
     977           2 :                         f.SetCompactionState(manifest.CompactionStateCompacting)
     978           2 :                         if c.startLevel != nil && c.outputLevel != nil && c.startLevel.level == 0 { // NOTE(review): the Fatalf above dereferences both levels without this nil check
     979           2 :                                 if c.outputLevel.level == 0 { // intra-L0: the output level is also L0
     980           2 :                                         f.IsIntraL0Compacting = true
     981           2 :                                         isIntraL0 = true
     982           2 :                                 } else {
     983           2 :                                         isBase = true
     984           2 :                                 }
     985             :                         }
     986             :                 }
     987             :         }
     988             : 
     989           2 :         if (isIntraL0 || isBase) && c.version.L0Sublevels != nil {
     990           2 :                 l0Inputs := []manifest.LevelSlice{c.startLevel.files}
     991           2 :                 if isIntraL0 {
     992           2 :                         l0Inputs = append(l0Inputs, c.outputLevel.files) // intra-L0 compactions also pass the output level's files
     993           2 :                 }
     994           2 :                 if err := c.version.L0Sublevels.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
     995           0 :                         d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
     996           0 :                 }
     997             :         }
     998             : }
     999             : 
    1000             : // clearCompactingState removes compaction markers from the input files of c
    1001             : // and sets c.versionEditApplied. The rollback parameter indicates whether the
    1002             : // files should return to NotCompacting because the compaction was unsuccessful.
    1003             : //
    1004             : // DB.mu must be held when calling this method, however this method can drop and
    1005             : // re-acquire that mutex. All writes to the manifest for this compaction should
    1006             : // have completed by this point.
    1007           2 : func (d *DB) clearCompactingState(c *compaction, rollback bool) {
    1008           2 :         c.versionEditApplied = true
    1009           2 :         for _, cl := range c.inputs {
    1010           2 :                 iter := cl.files.Iter()
    1011           2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1012           2 :                         if !f.IsCompacting() {
    1013           0 :                                 d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
    1014           0 :                         }
    1015           2 :                         if !rollback {
    1016           2 :                                 // On success all compactions other than move-compactions transition the
    1017           2 :                                 // file into the Compacted state. Move-compacted files become eligible
    1018           2 :                                 // for compaction again and transition back to NotCompacting.
    1019           2 :                                 if c.kind != compactionKindMove {
    1020           2 :                                         f.SetCompactionState(manifest.CompactionStateCompacted)
    1021           2 :                                 } else {
    1022           2 :                                         f.SetCompactionState(manifest.CompactionStateNotCompacting)
    1023           2 :                                 }
    1024           2 :                         } else {
    1025           2 :                                 // Else, on rollback, all input files unconditionally transition back to
    1026           2 :                                 // NotCompacting.
    1027           2 :                                 f.SetCompactionState(manifest.CompactionStateNotCompacting)
    1028           2 :                         }
    1029           2 :                         f.IsIntraL0Compacting = false // cleared on both the success and the rollback path
    1030             :                 }
    1031             :         }
    1032           2 :         l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c)) // NOTE(review): c is presumably excluded as the finishing compaction — confirm
    1033           2 :         func() {
    1034           2 :                 // InitCompactingFileInfo requires that no other manifest writes be
    1035           2 :                 // happening in parallel with it, i.e. we're not in the midst of installing
    1036           2 :                 // another version. Otherwise, it's possible that we've created another
    1037           2 :                 // L0Sublevels instance, but not added it to the versions list, causing
    1038           2 :                 // all the indices in FileMetadata to be inaccurate. To ensure this,
    1039           2 :                 // grab the manifest lock.
    1040           2 :                 d.mu.versions.logLock()
    1041           2 :                 defer d.mu.versions.logUnlock() // the closure scopes the manifest lock to this single call
    1042           2 :                 d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress)
    1043           2 :         }()
    1044             : }
    1045             : // calculateDiskAvailableBytes queries d.opts.FS for d.dirname, stores and returns the available bytes; on failure it returns the stored value.
    1046           2 : func (d *DB) calculateDiskAvailableBytes() uint64 {
    1047           2 :         if space, err := d.opts.FS.GetDiskUsage(d.dirname); err == nil {
    1048           2 :                 d.diskAvailBytes.Store(space.AvailBytes)
    1049           2 :                 return space.AvailBytes
    1050           2 :         } else if !errors.Is(err, vfs.ErrUnsupported) {
    1051           1 :                 d.opts.EventListener.BackgroundError(err) // vfs.ErrUnsupported is ignored; any other error is reported
    1052           1 :         }
    1053           2 :         return d.diskAvailBytes.Load() // fall back to the last stored value
    1054             : }
    1055             : 
    1056             : // maybeScheduleFlush starts a flush goroutine unless one is already running,
    1057             : // the DB is closed or read-only, or the queue does not pass the flush threshold.
    1058             : // d.mu must be held when calling this.
    1059           2 : func (d *DB) maybeScheduleFlush() {
    1060           2 :         if d.mu.compact.flushing || d.closed.Load() != nil || d.opts.ReadOnly {
    1061           2 :                 return
    1062           2 :         }
    1063           2 :         if len(d.mu.mem.queue) <= 1 { // at most one entry queued (presumably just the mutable memtable)
    1064           2 :                 return
    1065           2 :         }
    1066             : 
    1067           2 :         if !d.passedFlushThreshold() {
    1068           2 :                 return
    1069           2 :         }
    1070             : 
    1071           2 :         d.mu.compact.flushing = true // reset to false by DB.flush once the flush finishes
    1072           2 :         go d.flush()
    1073             : }
    1074             : // passedFlushThreshold reports whether the ready-to-flush prefix of the queue totals at least half of MemTableSize.
    1075           2 : func (d *DB) passedFlushThreshold() bool {
    1076           2 :         var n int
    1077           2 :         var size uint64
    1078           2 :         for ; n < len(d.mu.mem.queue)-1; n++ { // the last queue entry is never considered
    1079           2 :                 if !d.mu.mem.queue[n].readyForFlush() {
    1080           2 :                         break
    1081             :                 }
    1082           2 :                 if d.mu.mem.queue[n].flushForced {
    1083           2 :                         // A flush was forced. Pretend the memtable size is the configured
    1084           2 :                         // size. See minFlushSize below.
    1085           2 :                         size += d.opts.MemTableSize
    1086           2 :                 } else {
    1087           2 :                         size += d.mu.mem.queue[n].totalBytes()
    1088           2 :                 }
    1089             :         }
    1090           2 :         if n == 0 {
    1091           2 :                 // None of the immutable memtables are ready for flushing.
    1092           2 :                 return false
    1093           2 :         }
    1094             : 
    1095             :         // Only flush once the sum of the queued memtable sizes reaches half the
    1096             :         // configured memtable size. This prevents flushing of memtables at startup
    1097             :         // while we're undergoing the ramp period on the memtable size. See
    1098             :         // DB.newMemTable().
    1099           2 :         minFlushSize := d.opts.MemTableSize / 2
    1100           2 :         return size >= minFlushSize
    1101             : }
    1102             : // maybeScheduleDelayedFlush arranges for tbl to be force-flushed after dur unless it is flushed, or the DB closes, first.
    1103           2 : func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
    1104           2 :         var mem *flushableEntry
    1105           2 :         for _, m := range d.mu.mem.queue {
    1106           2 :                 if m.flushable == tbl {
    1107           2 :                         mem = m
    1108           2 :                         break
    1109             :                 }
    1110             :         }
    1111           2 :         if mem == nil || mem.flushForced { // tbl is not queued, or its flush is already forced
    1112           2 :                 return
    1113           2 :         }
    1114           2 :         deadline := d.timeNow().Add(dur)
    1115           2 :         if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
    1116           2 :                 // Already scheduled to flush sooner than within `dur`.
    1117           2 :                 return
    1118           2 :         }
    1119           2 :         mem.delayedFlushForcedAt = deadline
    1120           2 :         go func() {
    1121           2 :                 timer := time.NewTimer(dur)
    1122           2 :                 defer timer.Stop()
    1123           2 : 
    1124           2 :                 select {
    1125           2 :                 case <-d.closedCh:
    1126           2 :                         return
    1127           2 :                 case <-mem.flushed:
    1128           2 :                         return
    1129           2 :                 case <-timer.C:
    1130           2 :                         d.commit.mu.Lock() // lock order here: d.commit.mu first, then d.mu
    1131           2 :                         defer d.commit.mu.Unlock()
    1132           2 :                         d.mu.Lock()
    1133           2 :                         defer d.mu.Unlock()
    1134           2 : 
    1135           2 :                         // NB: The timer may fire concurrently with a call to Close.  If a
    1136           2 :                         // Close call beat us to acquiring d.mu, d.closed holds ErrClosed,
    1137           2 :                         // and it's too late to flush anything. Otherwise, the Close call
    1138           2 :                         // will block on locking d.mu until we've finished scheduling the
    1139           2 :                         // flush and set `d.mu.compact.flushing` to true. Close will wait
    1140           2 :                         // for the current flush to complete.
    1141           2 :                         if d.closed.Load() != nil {
    1142           2 :                                 return
    1143           2 :                         }
    1144             : 
    1145           2 :                         if d.mu.mem.mutable == tbl {
    1146           2 :                                 d.makeRoomForWrite(nil) // presumably rotates tbl out of the mutable slot — TODO confirm
    1147           2 :                         } else {
    1148           2 :                                 mem.flushForced = true
    1149           2 :                         }
    1150           2 :                         d.maybeScheduleFlush()
    1151             :                 }
    1152             :         }()
    1153             : }
    1154             : // flush runs flush1 under d.mu with the flush pprof labels, records throughput, then reschedules flushes and compactions.
    1155           2 : func (d *DB) flush() {
    1156           2 :         pprof.Do(context.Background(), flushLabels, func(context.Context) {
    1157           2 :                 flushingWorkStart := time.Now()
    1158           2 :                 d.mu.Lock()
    1159           2 :                 defer d.mu.Unlock()
    1160           2 :                 idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime) // time since the previous flush ended
    1161           2 :                 var bytesFlushed uint64
    1162           2 :                 var err error
    1163           2 :                 if bytesFlushed, err = d.flush1(); err != nil {
    1164           2 :                         // TODO(peter): count consecutive flush errors and backoff.
    1165           2 :                         d.opts.EventListener.BackgroundError(err)
    1166           2 :                 }
    1167           2 :                 d.mu.compact.flushing = false
    1168           2 :                 d.mu.compact.noOngoingFlushStartTime = time.Now()
    1169           2 :                 workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
    1170           2 :                 d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
    1171           2 :                 d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
    1172           2 :                 d.mu.compact.flushWriteThroughput.IdleDuration += idleDuration
    1173           2 :                 // More flush work may have arrived while we were flushing, so schedule
    1174           2 :                 // another flush if needed.
    1175           2 :                 d.maybeScheduleFlush()
    1176           2 :                 // The flush may have produced too many files in a level, so schedule a
    1177           2 :                 // compaction if needed.
    1178           2 :                 d.maybeScheduleCompaction()
    1179           2 :                 d.mu.compact.cond.Broadcast() // wake goroutines waiting on the compaction condition variable
    1180             :         })
    1181             : }
    1182             : 
    1183             : // runIngestFlush builds the flush version edit for sstables ingested as a
    1184             : // flushable; c.flushing must hold exactly one entry or it panics. Both DB.mu
    1185             : // and the manifest lock must be held while runIngestFlush is called.
    1186           2 : func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
    1187           2 :         if len(c.flushing) != 1 {
    1188           0 :                 panic("pebble: ingestedFlushable must be flushed one at a time.")
    1189             :         }
    1190             : 
    1191             :         // Construct the VersionEdit, levelMetrics etc.
    1192           2 :         c.metrics = make(map[int]*LevelMetrics, numLevels)
    1193           2 :         // Finding the target level for ingestion must use the latest version
    1194           2 :         // after the logLock has been acquired.
    1195           2 :         c.version = d.mu.versions.currentVersion()
    1196           2 : 
    1197           2 :         baseLevel := d.mu.versions.picker.getBaseLevel()
    1198           2 :         ve := &versionEdit{}
    1199           2 :         var ingestSplitFiles []ingestSplitFile
    1200           2 :         ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
    1201           2 : 
    1202           2 :         updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) { // swaps m for its replacement files in the level's metrics
    1203           2 :                 levelMetrics := c.metrics[level]
    1204           2 :                 if levelMetrics == nil {
    1205           2 :                         levelMetrics = &LevelMetrics{}
    1206           2 :                         c.metrics[level] = levelMetrics
    1207           2 :                 }
    1208           2 :                 levelMetrics.NumFiles--
    1209           2 :                 levelMetrics.Size -= int64(m.Size)
    1210           2 :                 for i := range added {
    1211           2 :                         levelMetrics.NumFiles++
    1212           2 :                         levelMetrics.Size += int64(added[i].Meta.Size)
    1213           2 :                 }
    1214             :         }
    1215             : 
    1216           2 :         suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
    1217           2 :                 d.FormatMajorVersion() >= FormatVirtualSSTables
    1218           2 : 
    1219           2 :         if suggestSplit || ingestFlushable.exciseSpan.Valid() {
    1220           2 :                 // Splitting or excising may add deleted files to ve, so allocate the map.
    1221           2 :                 ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
    1222           2 :         }
    1223             : 
    1224           2 :         ctx := context.Background()
    1225           2 :         overlapChecker := &overlapChecker{
    1226           2 :                 comparer: d.opts.Comparer,
    1227           2 :                 newIters: d.newIters,
    1228           2 :                 opts: IterOptions{
    1229           2 :                         logger: d.opts.Logger,
    1230           2 :                         CategoryAndQoS: sstable.CategoryAndQoS{
    1231           2 :                                 Category: "pebble-ingest",
    1232           2 :                                 QoSLevel: sstable.LatencySensitiveQoSLevel,
    1233           2 :                         },
    1234           2 :                 },
    1235           2 :                 v: c.version,
    1236           2 :         }
    1237           2 :         replacedFiles := make(map[base.FileNum][]newFileEntry) // excised file number -> files that replaced it
    1238           2 :         for _, file := range ingestFlushable.files {
    1239           2 :                 var fileToSplit *fileMetadata
    1240           2 :                 var level int
    1241           2 : 
    1242           2 :                 // This file fits perfectly within the excise span, so we can slot it at L6.
    1243           2 :                 if ingestFlushable.exciseSpan.Valid() &&
    1244           2 :                         ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) &&
    1245           2 :                         ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) {
    1246           2 :                         level = 6 // NOTE(review): hard-coded; presumably the bottom level (numLevels-1) — confirm
    1247           2 :                 } else {
    1248           2 :                         // TODO(radu): this can perform I/O; we should not do this while holding DB.mu.
    1249           2 :                         lsmOverlap, err := overlapChecker.DetermineLSMOverlap(ctx, file.UserKeyBounds())
    1250           2 :                         if err != nil {
    1251           0 :                                 return nil, err
    1252           0 :                         }
    1253           2 :                         level, fileToSplit, err = ingestTargetLevel(
    1254           2 :                                 ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, file.FileMetadata, suggestSplit,
    1255           2 :                         )
    1256           2 :                         if err != nil {
    1257           0 :                                 return nil, err
    1258           0 :                         }
    1259             :                 }
    1260             : 
    1261             :                 // Add the current flushableIngest file to the version.
    1262           2 :                 ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
    1263           2 :                 if fileToSplit != nil {
    1264           1 :                         ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
    1265           1 :                                 ingestFile: file.FileMetadata,
    1266           1 :                                 splitFile:  fileToSplit,
    1267           1 :                                 level:      level,
    1268           1 :                         })
    1269           1 :                 }
    1270           2 :                 levelMetrics := c.metrics[level]
    1271           2 :                 if levelMetrics == nil {
    1272           2 :                         levelMetrics = &LevelMetrics{}
    1273           2 :                         c.metrics[level] = levelMetrics
    1274           2 :                 }
    1275           2 :                 levelMetrics.BytesIngested += file.Size
    1276           2 :                 levelMetrics.TablesIngested++
    1277             :         }
    1278           2 :         if ingestFlushable.exciseSpan.Valid() {
    1279           2 :                 // Iterate through all levels and find files that intersect with exciseSpan.
    1280           2 :                 for l := range c.version.Levels {
    1281           2 :                         overlaps := c.version.Overlaps(l, base.UserKeyBoundsEndExclusive(ingestFlushable.exciseSpan.Start, ingestFlushable.exciseSpan.End))
    1282           2 :                         iter := overlaps.Iter()
    1283           2 : 
    1284           2 :                         for m := iter.First(); m != nil; m = iter.Next() {
    1285           2 :                                 newFiles, err := d.excise(context.TODO(), ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l) // NOTE(review): ctx declared above could be passed here
    1286           2 :                                 if err != nil {
    1287           0 :                                         return nil, err
    1288           0 :                                 }
    1289             : 
    1290           2 :                                 if _, ok := ve.DeletedFiles[deletedFileEntry{
    1291           2 :                                         Level:   l,
    1292           2 :                                         FileNum: m.FileNum,
    1293           2 :                                 }]; !ok {
    1294           1 :                                         // We did not excise this file.
    1295           1 :                                         continue
    1296             :                                 }
    1297           2 :                                 replacedFiles[m.FileNum] = newFiles
    1298           2 :                                 updateLevelMetricsOnExcise(m, l, newFiles)
    1299             :                         }
    1300             :                 }
    1301             :         }
    1302             : 
    1303           2 :         if len(ingestSplitFiles) > 0 {
    1304           1 :                 if err := d.ingestSplit(context.TODO(), ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
    1305           0 :                         return nil, err
    1306           0 :                 }
    1307             :         }
    1308             : 
    1309           2 :         return ve, nil
    1310             : }
    1311             : 
    1312             : // flush1 runs a compaction that copies the immutable memtables from memory to
    1313             : // disk.
    1314             : //
    1315             : // d.mu must be held when calling this, but the mutex may be dropped and
    1316             : // re-acquired during the course of this method.
    1317           2 : func (d *DB) flush1() (bytesFlushed uint64, err error) {
    1318           2 :         // NB: The flushable queue can contain flushables of type ingestedFlushable.
    1319           2 :         // The sstables in ingestedFlushable.files must be placed into the appropriate
    1320           2 :         // level in the lsm. Let's say the flushable queue contains a prefix of
    1321           2 :         // regular immutable memtables, then an ingestedFlushable, and then the
    1322           2 :         // mutable memtable. When the flush of the ingestedFlushable is performed,
    1323           2 :         // it needs an updated view of the lsm. That is, the prefix of immutable
    1324           2 :         // memtables must have already been flushed. Similarly, if there are two
    1325           2 :         // contiguous ingestedFlushables in the queue, then the first flushable must
    1326           2 :         // be flushed, so that the second flushable can see an updated view of the
    1327           2 :         // lsm.
    1328           2 :         //
    1329           2 :         // Given the above, we restrict flushes to either some prefix of regular
    1330           2 :         // memtables, or a single flushable of type ingestedFlushable. The DB.flush
    1331           2 :         // function will call DB.maybeScheduleFlush again, so a new flush to finish
    1332           2 :         // the remaining flush work should be scheduled right away.
    1333           2 :         //
    1334           2 :         // NB: Large batches placed in the flushable queue share the WAL with the
    1335           2 :         // previous memtable in the queue. We must ensure the property that both the
    1336           2 :         // large batch and the memtable with which it shares a WAL are flushed
    1337           2 :         // together. The property ensures that the minimum unflushed log number
    1338           2 :         // isn't incremented incorrectly. Since a flushableBatch.readyForFlush always
    1339           2 :         // returns true, and since the large batch will always be placed right after
    1340           2 :         // the memtable with which it shares a WAL, the property is naturally
    1341           2 :         // ensured. The large batch will always be placed after the memtable with
    1342           2 :         // which it shares a WAL because we ensure it in DB.commitWrite by holding
    1343           2 :         // the commitPipeline.mu and then holding DB.mu. As an extra defensive
    1344           2 :         // measure, if we try to flush the memtable without also flushing the
    1345           2 :         // flushable batch in the same flush, since the memtable and flushableBatch
    1346           2 :         // have the same logNum, the logNum invariant check below will trigger.
    1347           2 :         var n, inputs int
    1348           2 :         var inputBytes uint64
    1349           2 :         var ingest bool
    1350           2 :         for ; n < len(d.mu.mem.queue)-1; n++ {
    1351           2 :                 if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
    1352           2 :                         if n == 0 {
    1353           2 :                                 // The first flushable is of type ingestedFlushable. Since these
    1354           2 :                                 // must be flushed individually, we perform a flush for just
    1355           2 :                                 // this.
    1356           2 :                                 if !f.readyForFlush() {
    1357           0 :                                         // This check is almost unnecessary, but we guard against it
    1358           0 :                                         // just in case this invariant changes in the future.
    1359           0 :                                         panic("pebble: ingestedFlushable should always be ready to flush.")
    1360             :                                 }
    1361             :                                 // By setting n = 1, we ensure that the first flushable(n == 0)
    1362             :                                 // is scheduled for a flush. The number of tables added is equal to the
    1363             :                                 // number of files in the ingest operation.
    1364           2 :                                 n = 1
    1365           2 :                                 inputs = len(f.files)
    1366           2 :                                 ingest = true
    1367           2 :                                 break
    1368           2 :                         } else {
    1369           2 :                                 // There was some prefix of flushables which weren't of type
    1370           2 :                                 // ingestedFlushable. So, perform a flush for those.
    1371           2 :                                 break
    1372             :                         }
    1373             :                 }
    1374           2 :                 if !d.mu.mem.queue[n].readyForFlush() {
    1375           1 :                         break
    1376             :                 }
    1377           2 :                 inputBytes += d.mu.mem.queue[n].inuseBytes()
    1378             :         }
    1379           2 :         if n == 0 {
    1380           0 :                 // None of the immutable memtables are ready for flushing.
    1381           0 :                 return 0, nil
    1382           0 :         }
    1383           2 :         if !ingest {
    1384           2 :                 // Flushes of memtables add the prefix of n memtables from the flushable
    1385           2 :                 // queue.
    1386           2 :                 inputs = n
    1387           2 :         }
    1388             : 
    1389             :         // Require that every memtable being flushed has a log number less than the
    1390             :         // new minimum unflushed log number.
    1391           2 :         minUnflushedLogNum := d.mu.mem.queue[n].logNum
    1392           2 :         if !d.opts.DisableWAL {
    1393           2 :                 for i := 0; i < n; i++ {
    1394           2 :                         if logNum := d.mu.mem.queue[i].logNum; logNum >= minUnflushedLogNum {
    1395           0 :                                 panic(errors.AssertionFailedf("logNum invariant violated: flushing %d items; %d:type=%T,logNum=%d; %d:type=%T,logNum=%d",
    1396           0 :                                         n,
    1397           0 :                                         i, d.mu.mem.queue[i].flushable, logNum,
    1398           0 :                                         n, d.mu.mem.queue[n].flushable, minUnflushedLogNum))
    1399             :                         }
    1400             :                 }
    1401             :         }
    1402             : 
    1403           2 :         c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
    1404           2 :                 d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow())
    1405           2 :         if err != nil {
    1406           0 :                 return 0, err
    1407           0 :         }
    1408           2 :         d.addInProgressCompaction(c)
    1409           2 : 
    1410           2 :         jobID := d.newJobIDLocked()
    1411           2 :         d.opts.EventListener.FlushBegin(FlushInfo{
    1412           2 :                 JobID:      int(jobID),
    1413           2 :                 Input:      inputs,
    1414           2 :                 InputBytes: inputBytes,
    1415           2 :                 Ingest:     ingest,
    1416           2 :         })
    1417           2 :         startTime := d.timeNow()
    1418           2 : 
    1419           2 :         var ve *manifest.VersionEdit
    1420           2 :         var stats compact.Stats
    1421           2 :         // To determine the target level of the files in the ingestedFlushable, we
    1422           2 :         // need to acquire the logLock, and not release it for that duration. Since,
    1423           2 :         // we need to acquire the logLock below to perform the logAndApply step
    1424           2 :         // anyway, we create the VersionEdit for ingestedFlushable outside of
    1425           2 :         // runCompaction. For all other flush cases, we construct the VersionEdit
    1426           2 :         // inside runCompaction.
    1427           2 :         if c.kind != compactionKindIngestedFlushable {
    1428           2 :                 ve, stats, err = d.runCompaction(jobID, c)
    1429           2 :         }
    1430             : 
    1431             :         // Acquire logLock. This will be released either on an error, by way of
    1432             :         // logUnlock, or through a call to logAndApply if there is no error.
    1433           2 :         d.mu.versions.logLock()
    1434           2 : 
    1435           2 :         if c.kind == compactionKindIngestedFlushable {
    1436           2 :                 ve, err = d.runIngestFlush(c)
    1437           2 :         }
    1438             : 
    1439           2 :         info := FlushInfo{
    1440           2 :                 JobID:      int(jobID),
    1441           2 :                 Input:      inputs,
    1442           2 :                 InputBytes: inputBytes,
    1443           2 :                 Duration:   d.timeNow().Sub(startTime),
    1444           2 :                 Done:       true,
    1445           2 :                 Ingest:     ingest,
    1446           2 :                 Err:        err,
    1447           2 :         }
    1448           2 :         if err == nil {
    1449           2 :                 validateVersionEdit(ve, d.opts.Experimental.KeyValidationFunc, d.opts.Comparer.FormatKey, d.opts.Logger)
    1450           2 :                 for i := range ve.NewFiles {
    1451           2 :                         e := &ve.NewFiles[i]
    1452           2 :                         info.Output = append(info.Output, e.Meta.TableInfo())
    1453           2 :                         // Ingested tables are not necessarily flushed to L0. Record the level of
    1454           2 :                         // each ingested file explicitly.
    1455           2 :                         if ingest {
    1456           2 :                                 info.IngestLevels = append(info.IngestLevels, e.Level)
    1457           2 :                         }
    1458             :                 }
    1459           2 :                 if len(ve.NewFiles) == 0 {
    1460           2 :                         info.Err = errEmptyTable
    1461           2 :                 }
    1462             : 
    1463             :                 // The flush succeeded or it produced an empty sstable. In either case we
    1464             :                 // want to bump the minimum unflushed log number to the log number of the
    1465             :                 // oldest unflushed memtable.
    1466           2 :                 ve.MinUnflushedLogNum = minUnflushedLogNum
    1467           2 :                 if c.kind != compactionKindIngestedFlushable {
    1468           2 :                         metrics := c.metrics[0]
    1469           2 :                         if d.opts.DisableWAL {
    1470           2 :                                 // If the WAL is disabled, every flushable has a zero [logSize],
    1471           2 :                                 // resulting in zero bytes in. Instead, use the number of bytes we
    1472           2 :                                 // flushed as the BytesIn. This ensures we get a reasonable w-amp
    1473           2 :                                 // calculation even when the WAL is disabled.
    1474           2 :                                 metrics.BytesIn = metrics.BytesFlushed
    1475           2 :                         } else {
    1476           2 :                                 for i := 0; i < n; i++ {
    1477           2 :                                         metrics.BytesIn += d.mu.mem.queue[i].logSize
    1478           2 :                                 }
    1479             :                         }
    1480           2 :                 } else {
    1481           2 :                         // c.kind == compactionKindIngestedFlushable && we could have deleted files due
    1482           2 :                         // to ingest-time splits or excises.
    1483           2 :                         ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
    1484           2 :                         for c2 := range d.mu.compact.inProgress {
    1485           2 :                                 // Check if this compaction overlaps with the excise span. Note that just
    1486           2 :                                 // checking if the inputs individually overlap with the excise span
    1487           2 :                                 // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
    1488           2 :                                 // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
    1489           2 :                                 // doing a [c,d) excise at the same time as this compaction, we will have
    1490           2 :                                 // to error out the whole compaction as we can't guarantee it hasn't/won't
    1491           2 :                                 // write a file overlapping with the excise span.
    1492           2 :                                 if ingestFlushable.exciseSpan.OverlapsInternalKeyRange(d.cmp, c2.smallest, c2.largest) {
    1493           2 :                                         c2.cancel.Store(true)
    1494           2 :                                         continue
    1495             :                                 }
    1496             :                         }
    1497             : 
    1498           2 :                         if len(ve.DeletedFiles) > 0 {
    1499           2 :                                 // Iterate through all other compactions, and check if their inputs have
    1500           2 :                                 // been replaced due to an ingest-time split or excise. In that case,
    1501           2 :                                 // cancel the compaction.
    1502           2 :                                 for c2 := range d.mu.compact.inProgress {
    1503           2 :                                         for i := range c2.inputs {
    1504           2 :                                                 iter := c2.inputs[i].files.Iter()
    1505           2 :                                                 for f := iter.First(); f != nil; f = iter.Next() {
    1506           2 :                                                         if _, ok := ve.DeletedFiles[deletedFileEntry{FileNum: f.FileNum, Level: c2.inputs[i].level}]; ok {
    1507           2 :                                                                 c2.cancel.Store(true)
    1508           2 :                                                                 break
    1509             :                                                         }
    1510             :                                                 }
    1511             :                                         }
    1512             :                                 }
    1513             :                         }
    1514             :                 }
    1515           2 :                 err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */
    1516           2 :                         func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
    1517           2 :                 if err != nil {
    1518           1 :                         info.Err = err
    1519           1 :                 }
    1520           2 :         } else {
    1521           2 :                 // We won't be performing the logAndApply step because of the error,
    1522           2 :                 // so logUnlock.
    1523           2 :                 d.mu.versions.logUnlock()
    1524           2 :         }
    1525             : 
    1526             :         // If err != nil, then the flush will be retried, and we will recalculate
    1527             :         // these metrics.
    1528           2 :         if err == nil {
    1529           2 :                 d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
    1530           2 :                 d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
    1531           2 :                 d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
    1532           2 :         }
    1533             : 
    1534           2 :         d.clearCompactingState(c, err != nil)
    1535           2 :         delete(d.mu.compact.inProgress, c)
    1536           2 :         d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
    1537           2 : 
    1538           2 :         var flushed flushableList
    1539           2 :         if err == nil {
    1540           2 :                 flushed = d.mu.mem.queue[:n]
    1541           2 :                 d.mu.mem.queue = d.mu.mem.queue[n:]
    1542           2 :                 d.updateReadStateLocked(d.opts.DebugCheck)
    1543           2 :                 d.updateTableStatsLocked(ve.NewFiles)
    1544           2 :                 if ingest {
    1545           2 :                         d.mu.versions.metrics.Flush.AsIngestCount++
    1546           2 :                         for _, l := range c.metrics {
    1547           2 :                                 d.mu.versions.metrics.Flush.AsIngestBytes += l.BytesIngested
    1548           2 :                                 d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
    1549           2 :                         }
    1550             :                 }
    1551           2 :                 d.maybeTransitionSnapshotsToFileOnlyLocked()
    1552             : 
    1553             :         }
    1554             :         // Signal FlushEnd after installing the new readState. This helps for unit
    1555             :         // tests that use the callback to trigger a read using an iterator with
    1556             :         // IterOptions.OnlyReadGuaranteedDurable.
    1557           2 :         info.TotalDuration = d.timeNow().Sub(startTime)
    1558           2 :         d.opts.EventListener.FlushEnd(info)
    1559           2 : 
    1560           2 :         // The order of these operations matters here for ease of testing.
    1561           2 :         // Removing the reader reference first allows tests to be guaranteed that
    1562           2 :         // the memtable reservation has been released by the time a synchronous
    1563           2 :         // flush returns. readerUnrefLocked may also produce obsolete files so the
    1564           2 :         // call to deleteObsoleteFiles must happen after it.
    1565           2 :         for i := range flushed {
    1566           2 :                 flushed[i].readerUnrefLocked(true)
    1567           2 :         }
    1568             : 
    1569           2 :         d.deleteObsoleteFiles(jobID)
    1570           2 : 
    1571           2 :         // Mark all the memtables we flushed as flushed.
    1572           2 :         for i := range flushed {
    1573           2 :                 close(flushed[i].flushed)
    1574           2 :         }
    1575             : 
    1576           2 :         return inputBytes, err
    1577             : }
    1578             : 
    1579             : // maybeTransitionSnapshotsToFileOnlyLocked transitions any "eventually
    1580             : // file-only" snapshots to be file-only if all their visible state has been
    1581             : // flushed to sstables.
    1582             : //
    1583             : // REQUIRES: d.mu.
    1584           2 : func (d *DB) maybeTransitionSnapshotsToFileOnlyLocked() {
    1585           2 :         earliestUnflushedSeqNum := d.getEarliestUnflushedSeqNumLocked()
    1586           2 :         currentVersion := d.mu.versions.currentVersion()
    1587           2 :         for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; {
    1588           2 :                 if s.efos == nil {
    1589           2 :                         s = s.next
    1590           2 :                         continue
    1591             :                 }
    1592           2 :                 overlapsFlushable := false
    1593           2 :                 if base.Visible(earliestUnflushedSeqNum, s.efos.seqNum, base.SeqNumMax) {
    1594           2 :                         // There are some unflushed keys that are still visible to the EFOS.
    1595           2 :                         // Check if any memtables older than the EFOS contain keys within a
    1596           2 :                         // protected range of the EFOS. If no, we can transition.
    1597           2 :                         protectedRanges := make([]bounded, len(s.efos.protectedRanges))
    1598           2 :                         for i := range s.efos.protectedRanges {
    1599           2 :                                 protectedRanges[i] = s.efos.protectedRanges[i]
    1600           2 :                         }
    1601           2 :                         for i := range d.mu.mem.queue {
    1602           2 :                                 if !base.Visible(d.mu.mem.queue[i].logSeqNum, s.efos.seqNum, base.SeqNumMax) {
    1603           1 :                                         // All keys in this memtable are newer than the EFOS. Skip this
    1604           1 :                                         // memtable.
    1605           1 :                                         continue
    1606             :                                 }
    1607             :                                 // NB: computePossibleOverlaps could have false positives, such as if
    1608             :                                 // the flushable is a flushable ingest and not a memtable. In that
    1609             :                                 // case we don't open the sstables to check; we just pessimistically
    1610             :                                 // assume an overlap.
    1611           2 :                                 d.mu.mem.queue[i].computePossibleOverlaps(func(b bounded) shouldContinue {
    1612           2 :                                         overlapsFlushable = true
    1613           2 :                                         return stopIteration
    1614           2 :                                 }, protectedRanges...)
    1615           2 :                                 if overlapsFlushable {
    1616           2 :                                         break
    1617             :                                 }
    1618             :                         }
    1619             :                 }
    1620           2 :                 if overlapsFlushable {
    1621           2 :                         s = s.next
    1622           2 :                         continue
    1623             :                 }
    1624           2 :                 currentVersion.Ref()
    1625           2 : 
    1626           2 :                 // NB: s.efos.transitionToFileOnlySnapshot could close s, in which
    1627           2 :                 // case s.next would be nil. Save it before calling it.
    1628           2 :                 next := s.next
    1629           2 :                 _ = s.efos.transitionToFileOnlySnapshot(currentVersion)
    1630           2 :                 s = next
    1631             :         }
    1632             : }
    1633             : 
// maybeScheduleCompactionAsync should be used when we want to possibly
// schedule a compaction, but don't want to eat the cost of running
// maybeScheduleCompaction on the calling goroutine. It acquires d.mu, calls
// maybeScheduleCompaction, and releases d.mu again.
//
// This method should be launched in a separate goroutine. d.mu must not be
// held when this is called.
//
// NOTE(review): d.compactionSchedulers.Done() is deferred here, so presumably
// the launcher registers the goroutine with d.compactionSchedulers before
// starting it; confirm against the call sites.
func (d *DB) maybeScheduleCompactionAsync() {
	// Deferred so that it runs at function return, after d.mu is released.
	defer d.compactionSchedulers.Done()

	d.mu.Lock()
	d.maybeScheduleCompaction()
	d.mu.Unlock()
}
    1646             : 
// maybeScheduleCompaction schedules a compaction if necessary. It delegates to
// maybeScheduleCompactionPicker, passing pickAuto as the function used to pick
// automatic compactions.
//
// d.mu must be held when calling this.
func (d *DB) maybeScheduleCompaction() {
	d.maybeScheduleCompactionPicker(pickAuto)
}
    1653             : 
// pickAuto is a pick function suitable for passing to
// maybeScheduleCompactionPicker. It returns the result of picker.pickAuto(env).
func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction {
	return picker.pickAuto(env)
}
    1657             : 
// pickElisionOnly has the same signature as pickAuto, so it can be used
// wherever a pick function is expected. It returns the result of
// picker.pickElisionOnlyCompaction(env).
func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction {
	return picker.pickElisionOnlyCompaction(env)
}
    1661             : 
    1662             : // tryScheduleDownloadCompaction tries to start a download compaction.
    1663             : //
    1664             : // Returns true if we started a download compaction (or completed it
    1665             : // immediately because it is a no-op or we hit an error).
    1666             : //
    1667             : // Requires d.mu to be held. Updates d.mu.compact.downloads.
    1668           2 : func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownloads int) bool {
    1669           2 :         vers := d.mu.versions.currentVersion()
    1670           2 :         for i := 0; i < len(d.mu.compact.downloads); {
    1671           2 :                 download := d.mu.compact.downloads[i]
    1672           2 :                 switch d.tryLaunchDownloadCompaction(download, vers, env, maxConcurrentDownloads) {
    1673           2 :                 case launchedCompaction:
    1674           2 :                         return true
    1675           1 :                 case didNotLaunchCompaction:
    1676           1 :                         // See if we can launch a compaction for another download task.
    1677           1 :                         i++
    1678           2 :                 case downloadTaskCompleted:
    1679           2 :                         // Task is completed and must be removed.
    1680           2 :                         d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1)
    1681             :                 }
    1682             :         }
    1683           2 :         return false
    1684             : }
    1685             : 
    1686             : // maybeScheduleCompactionPicker schedules a compaction if necessary,
    1687             : // calling `pickFunc` to pick automatic compactions.
    1688             : //
    1689             : // Requires d.mu to be held.
    1690             : func (d *DB) maybeScheduleCompactionPicker(
    1691             :         pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
    1692           2 : ) {
    1693           2 :         if d.closed.Load() != nil || d.opts.ReadOnly {
    1694           2 :                 return
    1695           2 :         }
    1696           2 :         maxCompactions := d.opts.MaxConcurrentCompactions()
    1697           2 :         maxDownloads := d.opts.MaxConcurrentDownloads()
    1698           2 : 
    1699           2 :         if d.mu.compact.compactingCount >= maxCompactions &&
    1700           2 :                 (len(d.mu.compact.downloads) == 0 || d.mu.compact.downloadingCount >= maxDownloads) {
    1701           2 :                 if len(d.mu.compact.manual) > 0 {
    1702           2 :                         // Inability to run head blocks later manual compactions.
    1703           2 :                         d.mu.compact.manual[0].retries++
    1704           2 :                 }
    1705           2 :                 return
    1706             :         }
    1707             : 
    1708             :         // Compaction picking needs a coherent view of a Version. In particular, we
    1709             :         // need to exclude concurrent ingestions from making a decision on which level
    1710             :         // to ingest into that conflicts with our compaction
    1711             :         // decision. versionSet.logLock provides the necessary mutual exclusion.
    1712           2 :         d.mu.versions.logLock()
    1713           2 :         defer d.mu.versions.logUnlock()
    1714           2 : 
    1715           2 :         // Check for the closed flag again, in case the DB was closed while we were
    1716           2 :         // waiting for logLock().
    1717           2 :         if d.closed.Load() != nil {
    1718           2 :                 return
    1719           2 :         }
    1720             : 
    1721           2 :         env := compactionEnv{
    1722           2 :                 diskAvailBytes:          d.diskAvailBytes.Load(),
    1723           2 :                 earliestSnapshotSeqNum:  d.mu.snapshots.earliest(),
    1724           2 :                 earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
    1725           2 :         }
    1726           2 : 
    1727           2 :         if d.mu.compact.compactingCount < maxCompactions {
    1728           2 :                 // Check for delete-only compactions first, because they're expected to be
    1729           2 :                 // cheap and reduce future compaction work.
    1730           2 :                 if !d.opts.private.disableDeleteOnlyCompactions &&
    1731           2 :                         !d.opts.DisableAutomaticCompactions &&
    1732           2 :                         len(d.mu.compact.deletionHints) > 0 {
    1733           2 :                         d.tryScheduleDeleteOnlyCompaction()
    1734           2 :                 }
    1735             : 
    1736           2 :                 for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxCompactions {
    1737           2 :                         if manual := d.mu.compact.manual[0]; !d.tryScheduleManualCompaction(env, manual) {
    1738           2 :                                 // Inability to run head blocks later manual compactions.
    1739           2 :                                 manual.retries++
    1740           2 :                                 break
    1741             :                         }
    1742           2 :                         d.mu.compact.manual = d.mu.compact.manual[1:]
    1743             :                 }
    1744             : 
    1745           2 :                 for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxCompactions &&
    1746           2 :                         d.tryScheduleAutoCompaction(env, pickFunc) {
    1747           2 :                 }
    1748             :         }
    1749             : 
    1750           2 :         for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads &&
    1751           2 :                 d.tryScheduleDownloadCompaction(env, maxDownloads) {
    1752           2 :         }
    1753             : }
    1754             : 
    1755             : // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction
    1756             : // for all files that can be deleted as suggested by deletionHints.
    1757             : //
    1758             : // Requires d.mu to be held. Updates d.mu.compact.deletionHints.
    1759           2 : func (d *DB) tryScheduleDeleteOnlyCompaction() {
    1760           2 :         v := d.mu.versions.currentVersion()
    1761           2 :         snapshots := d.mu.snapshots.toSlice()
    1762           2 :         inputs, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots)
    1763           2 :         d.mu.compact.deletionHints = unresolvedHints
    1764           2 : 
    1765           2 :         if len(inputs) > 0 {
    1766           2 :                 c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow())
    1767           2 :                 d.mu.compact.compactingCount++
    1768           2 :                 d.addInProgressCompaction(c)
    1769           2 :                 go d.compact(c, nil)
    1770           2 :         }
    1771             : }
    1772             : 
    1773             : // tryScheduleManualCompaction tries to kick off the given manual compaction.
    1774             : //
    1775             : // Returns false if we are not able to run this compaction at this time.
    1776             : //
    1777             : // Requires d.mu to be held.
    1778           2 : func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompaction) bool {
    1779           2 :         v := d.mu.versions.currentVersion()
    1780           2 :         env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
    1781           2 :         pc, retryLater := pickManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
    1782           2 :         if pc == nil {
    1783           2 :                 if !retryLater {
    1784           2 :                         // Manual compaction is a no-op. Signal completion and exit.
    1785           2 :                         manual.done <- nil
    1786           2 :                         return true
    1787           2 :                 }
    1788             :                 // We are not able to run this manual compaction at this time.
    1789           2 :                 return false
    1790             :         }
    1791             : 
    1792           2 :         c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
    1793           2 :         d.mu.compact.compactingCount++
    1794           2 :         d.addInProgressCompaction(c)
    1795           2 :         go d.compact(c, manual.done)
    1796           2 :         return true
    1797             : }
    1798             : 
    1799             : // tryScheduleAutoCompaction tries to kick off an automatic compaction picked
    1800             : // by pickFunc, which receives the DB's compaction picker and the filled-in env.
    1801             : // Returns false if pickFunc picks nothing (no automatic compaction is necessary
    1802             : // or able to run at this time); otherwise starts it on a new goroutine.
    1803             : //
    1804             : // Requires d.mu to be held.
    1805             : func (d *DB) tryScheduleAutoCompaction(
    1806             :         env compactionEnv, pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
    1807           2 : ) bool {
    1808           2 :         env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
    1809           2 :         env.readCompactionEnv = readCompactionEnv{
    1810           2 :                 readCompactions:          &d.mu.compact.readCompactions,
    1811           2 :                 flushing:                 d.mu.compact.flushing || d.passedFlushThreshold(),
    1812           2 :                 rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
    1813           2 :         }
    1814           2 :         pc := pickFunc(d.mu.versions.picker, env)
    1815           2 :         if pc == nil {
    1816           2 :                 return false
    1817           2 :         }
    1818           2 :         c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
    1819           2 :         d.mu.compact.compactingCount++ // (*DB).compact decrements this again for non-download compactions
    1820           2 :         d.addInProgressCompaction(c)
    1821           2 :         go d.compact(c, nil) // nil error channel: compact1 only reports its result when the channel is non-nil
    1822           2 :         return true
    1823             : }
    1824             : 
    1825             : // deleteCompactionHintType indicates whether the deleteCompactionHint was
    1826             : // generated from a span containing a range del (point key only), a range key
    1827             : // delete (range key only), or both a point and range key.
    1828             : type deleteCompactionHintType uint8
    1829             :
    1830             : const (
    1831             :         // NOTE: While these are primarily used as enumeration types, they are also
    1832             :         // OR-ed together by compactionHintFromKeys. Care should be taken when updating.
    1833             :         deleteCompactionHintTypeUnknown deleteCompactionHintType = iota // 0: also the zero value
    1834             :         deleteCompactionHintTypePointKeyOnly // 1
    1835             :         deleteCompactionHintTypeRangeKeyOnly // 2
    1836             :         deleteCompactionHintTypePointAndRangeKey // 3 == PointKeyOnly | RangeKeyOnly
    1837             : )
    1838             : 
    1839             : // String implements fmt.Stringer. It panics on a value that is not one of the declared constants.
    1840           1 : func (h deleteCompactionHintType) String() string {
    1841           1 :         switch h {
    1842           0 :         case deleteCompactionHintTypeUnknown:
    1843           0 :                 return "unknown"
    1844           1 :         case deleteCompactionHintTypePointKeyOnly:
    1845           1 :                 return "point-key-only"
    1846           1 :         case deleteCompactionHintTypeRangeKeyOnly:
    1847           1 :                 return "range-key-only"
    1848           1 :         case deleteCompactionHintTypePointAndRangeKey:
    1849           1 :                 return "point-and-range-key"
    1850           0 :         default:
    1851           0 :                 panic(fmt.Sprintf("unknown hint type: %d", h))
    1852             :         }
    1853             : }
    1854             : 
    1855             : // compactionHintFromKeys ORs together the hint types implied by keys: range dels
    1856             : // give point-key-only, range key dels give range-key-only. Other kinds panic.
    1857           2 : func compactionHintFromKeys(keys []keyspan.Key) deleteCompactionHintType {
    1858           2 :         var hintType deleteCompactionHintType // zero value is deleteCompactionHintTypeUnknown
    1859           2 :         for _, k := range keys {
    1860           2 :                 switch k.Kind() {
    1861           2 :                 case base.InternalKeyKindRangeDelete:
    1862           2 :                         hintType |= deleteCompactionHintTypePointKeyOnly
    1863           2 :                 case base.InternalKeyKindRangeKeyDelete:
    1864           2 :                         hintType |= deleteCompactionHintTypeRangeKeyOnly
    1865           0 :                 default:
    1866           0 :                         panic(fmt.Sprintf("unsupported key kind: %s", k.Kind()))
    1867             :                 }
    1868             :         }
    1869           2 :         return hintType // both kinds seen => PointKeyOnly|RangeKeyOnly == deleteCompactionHintTypePointAndRangeKey
    1870             : }
    1871             : 
    1872             : // A deleteCompactionHint records a user key and sequence number span that has been
    1873             : // deleted by a range tombstone. A hint is recorded if at least one sstable
    1874             : // falls completely within both the user key and sequence number spans.
    1875             : // Once the tombstones and the observed completely-contained sstables fall
    1876             : // into the same snapshot stripe, a delete-only compaction may delete any
    1877             : // sstables within the range.
    1878             : type deleteCompactionHint struct {
    1879             :         // The type of key span that generated this hint (point key, range key, or
    1880             :         // both).
    1881             :         hintType deleteCompactionHintType
    1882             :         // start and end are user keys specifying a key range [start, end) of
    1883             :         // deleted keys.
    1884             :         start []byte
    1885             :         end   []byte
    1886             :         // The level of the file containing the range tombstone(s) when the hint
    1887             :         // was created. Only lower levels need to be searched for files that may
    1888             :         // be deleted.
    1889             :         tombstoneLevel int
    1890             :         // The file containing the range tombstone(s) that created the hint.
    1891             :         tombstoneFile *fileMetadata
    1892             :         // The smallest and largest sequence numbers of the abutting tombstones
    1893             :         // merged to form this hint. All of a table's keys must be older than the
    1894             :         // tombstone smallest sequence number for the table to be deleted. All of a
    1895             :         // table's sequence numbers must fall into the same snapshot stripe as the
    1896             :         // tombstone largest sequence number for the table to be deleted.
    1897             :         tombstoneLargestSeqNum  base.SeqNum
    1898             :         tombstoneSmallestSeqNum base.SeqNum
    1899             :         // The smallest sequence number of a sstable that was found to be covered
    1900             :         // by this hint. The hint cannot be resolved until this sequence number is
    1901             :         // in the same snapshot stripe as the largest tombstone sequence number.
    1902             :         // This is set when a hint is created, so the LSM may look different and
    1903             :         // notably no longer contain the sstable that contained the key at this
    1904             :         // sequence number.
    1905             :         fileSmallestSeqNum base.SeqNum
    1906             : }
    1907             : 
    1908           1 : func (h deleteCompactionHint) String() string { // String implements fmt.Stringer.
    1909           1 :         return fmt.Sprintf(
    1910           1 :                 "L%d.%s %s-%s seqnums(tombstone=%d-%d, file-smallest=%d, type=%s)",
    1911           1 :                 h.tombstoneLevel, h.tombstoneFile.FileNum, h.start, h.end, // NOTE(review): dereferences h.tombstoneFile; assumes it is non-nil — confirm
    1912           1 :                 h.tombstoneSmallestSeqNum, h.tombstoneLargestSeqNum, h.fileSmallestSeqNum,
    1913           1 :                 h.hintType,
    1914           1 :         )
    1915           1 : }
    1916             : 
    1917             : func (h *deleteCompactionHint) canDelete( // reports whether table m may be deleted on the strength of hint h
    1918             :         cmp Compare, m *fileMetadata, snapshots compact.Snapshots,
    1919           2 : ) bool {
    1920           2 :         // The file can only be deleted if all of its keys are older than the
    1921           2 :         // earliest tombstone aggregated into the hint, and its smallest sequence
    1922           2 :         // number is not below h.fileSmallestSeqNum. Note that we use
    1923           2 :         // m.LargestSeqNumAbsolute, not m.LargestSeqNum. Consider a compaction that
    1924           2 :         // zeroes sequence numbers: it may zero the sequence number of a key with a
    1925           2 :         // sequence number > h.tombstoneSmallestSeqNum. If we looked at
    1926           2 :         // m.LargestSeqNum, the resulting output file would appear to not contain
    1927           2 :         // any keys more recent than the oldest tombstone. To avoid this error, the
    1928           2 :         // largest pre-zeroing sequence number is maintained in LargestSeqNumAbsolute
    1929           2 :         // and used here to decide whether the file's keys are older than all of the hint's tombstones.
    1930           2 :         if m.LargestSeqNumAbsolute >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
    1931           2 :                 return false
    1932           2 :         }
    1933             :
    1934             :         // The file's oldest key must be in the same snapshot stripe as the
    1935             :         // newest tombstone. NB: We already checked the hint's sequence numbers,
    1936             :         // but this file's oldest sequence number might be lower than the hint's
    1937             :         // smallest sequence number despite the file falling within the key range
    1938             :         // if this file was constructed after the hint by a compaction.
    1939           2 :         if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(m.SmallestSeqNum) { // NOTE(review): body has 0 hits in this report; may be redundant with the guard above — confirm
    1940           0 :                 return false
    1941           0 :         }
    1942             :
    1943           2 :         switch h.hintType {
    1944           2 :         case deleteCompactionHintTypePointKeyOnly:
    1945           2 :                 // A hint generated by a range del span cannot delete tables that contain
    1946           2 :                 // range keys.
    1947           2 :                 if m.HasRangeKeys {
    1948           1 :                         return false
    1949           1 :                 }
    1950           2 :         case deleteCompactionHintTypeRangeKeyOnly:
    1951           2 :                 // A hint generated by a range key del span cannot delete tables that
    1952           2 :                 // contain point keys.
    1953           2 :                 if m.HasPointKeys {
    1954           2 :                         return false
    1955           2 :                 }
    1956           2 :         case deleteCompactionHintTypePointAndRangeKey:
    1957             :                 // A hint from a span that contains both range dels *and* range key dels
    1958             :                 // places no restriction on the kinds of keys in the table; it only needs
    1959             :                 // both table bounds to fall within the hint. The next check takes care of this.
    1960           0 :         default:
    1961           0 :                 panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
    1962             :         }
    1963             :
    1964             :         // The file's user key bounds must be completely contained within the hint range [start, end).
    1965           2 :         return cmp(h.start, m.Smallest.UserKey) <= 0 && cmp(m.Largest.UserKey, h.end) < 0
    1966             : }
    1967             : 
    1968             : func checkDeleteCompactionHints( // returns the deletable files grouped by level, plus the hints that remain unresolved
    1969             :         cmp Compare, v *version, hints []deleteCompactionHint, snapshots compact.Snapshots,
    1970           2 : ) ([]compactionLevel, []deleteCompactionHint) {
    1971           2 :         var files map[*fileMetadata]bool // nil until the first deletable file is found; reading a nil map yields false
    1972           2 :         var byLevel [numLevels][]*fileMetadata
    1973           2 :
    1974           2 :         unresolvedHints := hints[:0] // filtered in place: shares the backing array of hints
    1975           2 :         for _, h := range hints {
    1976           2 :                 // Check each compaction hint to see if it's resolvable. Resolvable
    1977           2 :                 // hints are removed and trigger a delete-only compaction if any files
    1978           2 :                 // in the current LSM still meet their criteria. Unresolvable hints
    1979           2 :                 // are saved and don't trigger a delete-only compaction.
    1980           2 :                 //
    1981           2 :                 // When a compaction hint is created, the sequence numbers of the
    1982           2 :                 // range tombstones and the covered file with the oldest key are
    1983           2 :                 // recorded. The largest tombstone sequence number and the smallest
    1984           2 :                 // file sequence number must be in the same snapshot stripe for the
    1985           2 :                 // hint to be resolved. The below graphic models a compaction hint
    1986           2 :                 // covering the keyspace [b, r). The hint completely contains two
    1987           2 :                 // files, 000002 and 000003. The file 000003 contains the lowest
    1988           2 :                 // covered sequence number at #90. The tombstone b.RANGEDEL.230:h has
    1989           2 :                 // the highest tombstone sequence number incorporated into the hint.
    1990           2 :                 // The hint may be resolved only once the snapshots at #100, #180 and
    1991           2 :                 // #210 are all closed. File 000001 is not included within the hint
    1992           2 :                 // because it extends beyond the range tombstones in user key space.
    1993           2 :                 //
    1994           2 :                 // 250
    1995           2 :                 //
    1996           2 :                 //       |-b...230:h-|
    1997           2 :                 // _____________________________________________________ snapshot #210
    1998           2 :                 // 200               |--h.RANGEDEL.200:r--|
    1999           2 :                 //
    2000           2 :                 // _____________________________________________________ snapshot #180
    2001           2 :                 //
    2002           2 :                 // 150                     +--------+
    2003           2 :                 //           +---------+   | 000003 |
    2004           2 :                 //           | 000002  |   |        |
    2005           2 :                 //           +_________+   |        |
    2006           2 :                 // 100_____________________|________|___________________ snapshot #100
    2007           2 :                 //                         +--------+
    2008           2 :                 // _____________________________________________________ snapshot #70
    2009           2 :                 //                             +---------------+
    2010           2 :                 //  50                         | 000001        |
    2011           2 :                 //                             |               |
    2012           2 :                 //                             +---------------+
    2013           2 :                 // ______________________________________________________________
    2014           2 :                 //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    2015           2 :
    2016           2 :                 if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(h.fileSmallestSeqNum) {
    2017           2 :                         // Cannot resolve yet: keep the hint for a later call.
    2018           2 :                         unresolvedHints = append(unresolvedHints, h)
    2019           2 :                         continue
    2020             :                 }
    2021             :
    2022             :                 // The hint h will be resolved and dropped, regardless of whether
    2023             :                 // there are any tables that can be deleted.
    2024           2 :                 for l := h.tombstoneLevel + 1; l < numLevels; l++ { // only levels below the tombstone's level are searched
    2025           2 :                         overlaps := v.Overlaps(l, base.UserKeyBoundsEndExclusive(h.start, h.end))
    2026           2 :                         iter := overlaps.Iter()
    2027           2 :                         for m := iter.First(); m != nil; m = iter.Next() {
    2028           2 :                                 if m.IsCompacting() || !h.canDelete(cmp, m, snapshots) || files[m] { // files[m]: already collected via another hint
    2029           2 :                                         continue
    2030             :                                 }
    2031           2 :                                 if files == nil {
    2032           2 :                                         // Construct files lazily, assuming most calls will not
    2033           2 :                                         // produce delete-only compactions.
    2034           2 :                                         files = make(map[*fileMetadata]bool)
    2035           2 :                                 }
    2036           2 :                                 files[m] = true
    2037           2 :                                 byLevel[l] = append(byLevel[l], m)
    2038             :                         }
    2039             :                 }
    2040             :         }
    2041             :
    2042           2 :         var compactLevels []compactionLevel // stays nil when no file can be deleted
    2043           2 :         for l, files := range byLevel { // this files shadows the map above: here it is level l's file list
    2044           2 :                 if len(files) == 0 {
    2045           2 :                         continue
    2046             :                 }
    2047           2 :                 compactLevels = append(compactLevels, compactionLevel{
    2048           2 :                         level: l,
    2049           2 :                         files: manifest.NewLevelSliceKeySorted(cmp, files),
    2050           2 :                 })
    2051             :         }
    2052           2 :         return compactLevels, unresolvedHints
    2053             : }
    2054             : 
    2055             : // compact runs one compaction (its result is sent on errChannel if non-nil) and maybe schedules another call to compact.
    2056           2 : func (d *DB) compact(c *compaction, errChannel chan error) {
    2057           2 :         pprof.Do(context.Background(), compactLabels, func(context.Context) {
    2058           2 :                 d.mu.Lock() // compact1 requires d.mu held, though it may drop and re-acquire it
    2059           2 :                 defer d.mu.Unlock()
    2060           2 :                 if err := d.compact1(c, errChannel); err != nil {
    2061           2 :                         // TODO(peter): count consecutive compaction errors and backoff.
    2062           2 :                         d.opts.EventListener.BackgroundError(err)
    2063           2 :                 }
    2064           2 :                 if c.isDownload {
    2065           2 :                         d.mu.compact.downloadingCount--
    2066           2 :                 } else {
    2067           2 :                         d.mu.compact.compactingCount--
    2068           2 :                 }
    2069           2 :                 delete(d.mu.compact.inProgress, c)
    2070           2 :                 // Add this compaction's duration to the cumulative duration. NB: This
    2071           2 :                 // must be atomic with the above removal of c from
    2072           2 :                 // d.mu.compact.inProgress to ensure Metrics.Compact.Duration does not
    2073           2 :                 // miss or double count a completing compaction's duration.
    2074           2 :                 d.mu.compact.duration += d.timeNow().Sub(c.beganAt)
    2075           2 :
    2076           2 :                 // The previous compaction may have produced too many files in a
    2077           2 :                 // level, so reschedule another compaction if needed.
    2078           2 :                 d.maybeScheduleCompaction()
    2079           2 :                 d.mu.compact.cond.Broadcast()
    2080             :         })
    2081             : }
    2082             : 
    2083             : // compact1 runs one compaction and, if errChannel is non-nil, sends the
    2084             : // resulting error (nil on success) on it when returning.
    2085             : // d.mu must be held when calling this, but the mutex may be dropped and
    2086             : // re-acquired during the course of this method.
    2087           2 : func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
    2088           2 :         if errChannel != nil {
    2089           2 :                 defer func() {
    2090           2 :                         errChannel <- err // err is the named result, so this sends the final return value
    2091           2 :                 }()
    2092             :         }
    2093             :
    2094           2 :         jobID := d.newJobIDLocked()
    2095           2 :         info := c.makeInfo(jobID)
    2096           2 :         d.opts.EventListener.CompactionBegin(info)
    2097           2 :         startTime := d.timeNow()
    2098           2 :
    2099           2 :         ve, stats, err := d.runCompaction(jobID, c)
    2100           2 :
    2101           2 :         info.Duration = d.timeNow().Sub(startTime)
    2102           2 :         if err == nil {
    2103           2 :                 validateVersionEdit(ve, d.opts.Experimental.KeyValidationFunc, d.opts.Comparer.FormatKey, d.opts.Logger)
    2104           2 :                 err = func() error {
    2105           2 :                         var err error // shadows compact1's err; the closure's result is assigned to the outer err
    2106           2 :                         d.mu.versions.logLock()
    2107           2 :                         // Check if this compaction had a conflicting operation (eg. a d.excise())
    2108           2 :                         // that necessitates it restarting from scratch. Note that since we hold
    2109           2 :                         // the manifest lock, we don't expect this bool to change its value
    2110           2 :                         // as only the holder of the manifest lock will ever write to it.
    2111           2 :                         if c.cancel.Load() {
    2112           2 :                                 err = firstError(err, ErrCancelledCompaction)
    2113           2 :                         }
    2114           2 :                         if err != nil {
    2115           2 :                                 // logAndApply calls logUnlock. If we didn't call it, we need to call
    2116           2 :                                 // logUnlock ourselves.
    2117           2 :                                 d.mu.versions.logUnlock()
    2118           2 :                                 return err
    2119           2 :                         }
    2120           2 :                         return d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
    2121           2 :                                 return d.getInProgressCompactionInfoLocked(c)
    2122           2 :                         })
    2123             :                 }()
    2124             :         }
    2125             :
    2126           2 :         info.Done = true
    2127           2 :         info.Err = err
    2128           2 :         if err == nil {
    2129           2 :                 for i := range ve.NewFiles {
    2130           2 :                         e := &ve.NewFiles[i]
    2131           2 :                         info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
    2132           2 :                 }
    2133           2 :                 d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
    2134           2 :                 d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
    2135           2 :                 d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
    2136             :         }
    2137             :
    2138             :         // NB: clearing compacting state must occur before updating the read state;
    2139             :         // L0Sublevels initialization depends on it.
    2140           2 :         d.clearCompactingState(c, err != nil)
    2141           2 :         d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
    2142           2 :         d.mu.versions.incrementCompactionBytes(-c.bytesWritten)
    2143           2 :
    2144           2 :         info.TotalDuration = d.timeNow().Sub(c.beganAt)
    2145           2 :         d.opts.EventListener.CompactionEnd(info)
    2146           2 :
    2147           2 :         // Update the read state before deleting obsolete files because the
    2148           2 :         // read-state update will cause the previous version to be unref'd and if
    2149           2 :         // there are no references obsolete tables will be added to the obsolete
    2150           2 :         // table list.
    2151           2 :         if err == nil {
    2152           2 :                 d.updateReadStateLocked(d.opts.DebugCheck)
    2153           2 :                 d.updateTableStatsLocked(ve.NewFiles)
    2154           2 :         }
    2155           2 :         d.deleteObsoleteFiles(jobID) // runs on both the success and the error path
    2156           2 :
    2157           2 :         return err
    2158             : }
    2159             : 
    2160             : // runCopyCompaction runs a copy compaction where a new FileNum is created that
    2161             : // is a byte-for-byte copy of the input file or span thereof in some cases. This
    2162             : // is used in lieu of a move compaction when a file is being moved across the
    2163             : // local/remote storage boundary. It could also be used in lieu of a rewrite
    2164             : // compaction as part of a Download() call, which allows copying only a span of
    2165             : // the external file, provided the file does not contain range keys or value
    2166             : // blocks (see sstable.CopySpan).
    2167             : //
    2168             : // d.mu must be held when calling this method. The mutex will be released when
    2169             : // doing IO.
    2170             : func (d *DB) runCopyCompaction(
    2171             :         jobID JobID, c *compaction,
    2172           2 : ) (ve *versionEdit, stats compact.Stats, _ error) {
    2173           2 :         iter := c.startLevel.files.Iter()
    2174           2 :         inputMeta := iter.First()
    2175           2 :         if iter.Next() != nil {
    2176           0 :                 return nil, compact.Stats{}, base.AssertionFailedf("got more than one file for a move compaction")
    2177           0 :         }
    2178           2 :         if c.cancel.Load() {
    2179           1 :                 return nil, compact.Stats{}, ErrCancelledCompaction
    2180           1 :         }
    2181           2 :         ve = &versionEdit{
    2182           2 :                 DeletedFiles: map[deletedFileEntry]*fileMetadata{
    2183           2 :                         {Level: c.startLevel.level, FileNum: inputMeta.FileNum}: inputMeta,
    2184           2 :                 },
    2185           2 :         }
    2186           2 : 
    2187           2 :         objMeta, err := d.objProvider.Lookup(fileTypeTable, inputMeta.FileBacking.DiskFileNum)
    2188           2 :         if err != nil {
    2189           0 :                 return nil, compact.Stats{}, err
    2190           0 :         }
    2191           2 :         if !objMeta.IsExternal() {
    2192           2 :                 if objMeta.IsRemote() || !remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level) {
    2193           0 :                         panic("pebble: scheduled a copy compaction that is not actually moving files to shared storage")
    2194             :                 }
    2195             :                 // Note that based on logic in the compaction picker, we're guaranteed
    2196             :                 // inputMeta.Virtual is false.
    2197           2 :                 if inputMeta.Virtual {
    2198           0 :                         panic(errors.AssertionFailedf("cannot do a copy compaction of a virtual sstable across local/remote storage"))
    2199             :                 }
    2200             :         }
    2201             : 
    2202             :         // We are in the relatively more complex case where we need to copy this
    2203             :         // file to remote storage. Drop the db mutex while we do the copy
    2204             :         //
    2205             :         // To ease up cleanup of the local file and tracking of refs, we create
    2206             :         // a new FileNum. This has the potential of making the block cache less
    2207             :         // effective, however.
    2208           2 :         newMeta := &fileMetadata{
    2209           2 :                 Size:                  inputMeta.Size,
    2210           2 :                 CreationTime:          inputMeta.CreationTime,
    2211           2 :                 SmallestSeqNum:        inputMeta.SmallestSeqNum,
    2212           2 :                 LargestSeqNum:         inputMeta.LargestSeqNum,
    2213           2 :                 LargestSeqNumAbsolute: inputMeta.LargestSeqNumAbsolute,
    2214           2 :                 Stats:                 inputMeta.Stats,
    2215           2 :                 Virtual:               inputMeta.Virtual,
    2216           2 :                 SyntheticPrefix:       inputMeta.SyntheticPrefix,
    2217           2 :                 SyntheticSuffix:       inputMeta.SyntheticSuffix,
    2218           2 :         }
    2219           2 :         if inputMeta.HasPointKeys {
    2220           2 :                 newMeta.ExtendPointKeyBounds(c.cmp, inputMeta.SmallestPointKey, inputMeta.LargestPointKey)
    2221           2 :         }
    2222           2 :         if inputMeta.HasRangeKeys {
    2223           2 :                 newMeta.ExtendRangeKeyBounds(c.cmp, inputMeta.SmallestRangeKey, inputMeta.LargestRangeKey)
    2224           2 :         }
    2225           2 :         newMeta.FileNum = d.mu.versions.getNextFileNum()
    2226           2 :         if objMeta.IsExternal() {
    2227           2 :                 // external -> local/shared copy. File must be virtual.
    2228           2 :                 // We will update this size later after we produce the new backing file.
    2229           2 :                 newMeta.InitProviderBacking(base.DiskFileNum(newMeta.FileNum), inputMeta.FileBacking.Size)
    2230           2 :         } else {
    2231           2 :                 // local -> shared copy. New file is guaranteed to not be virtual.
    2232           2 :                 newMeta.InitPhysicalBacking()
    2233           2 :         }
    2234             : 
    2235             :         // Before dropping the db mutex, grab a ref to the current version. This
    2236             :         // prevents any concurrent excises from deleting files that this compaction
    2237             :         // needs to read/maintain a reference to.
    2238           2 :         vers := d.mu.versions.currentVersion()
    2239           2 :         vers.Ref()
    2240           2 :         defer vers.UnrefLocked()
    2241           2 : 
    2242           2 :         // NB: The order here is reversed, lock after unlock. This is similar to
    2243           2 :         // runCompaction.
    2244           2 :         d.mu.Unlock()
    2245           2 :         defer d.mu.Lock()
    2246           2 : 
    2247           2 :         deleteOnExit := false
    2248           2 :         defer func() {
    2249           2 :                 if deleteOnExit {
    2250           1 :                         _ = d.objProvider.Remove(fileTypeTable, newMeta.FileBacking.DiskFileNum)
    2251           1 :                 }
    2252             :         }()
    2253             : 
    2254             :         // If the src obj is external, we're doing an external to local/shared copy.
    2255           2 :         if objMeta.IsExternal() {
    2256           2 :                 ctx := context.TODO()
    2257           2 :                 src, err := d.objProvider.OpenForReading(
    2258           2 :                         ctx, fileTypeTable, inputMeta.FileBacking.DiskFileNum, objstorage.OpenOptions{},
    2259           2 :                 )
    2260           2 :                 if err != nil {
    2261           0 :                         return nil, compact.Stats{}, err
    2262           0 :                 }
    2263           2 :                 defer func() {
    2264           2 :                         if src != nil {
    2265           0 :                                 src.Close()
    2266           0 :                         }
    2267             :                 }()
    2268             : 
    2269           2 :                 w, _, err := d.objProvider.Create(
    2270           2 :                         ctx, fileTypeTable, newMeta.FileBacking.DiskFileNum,
    2271           2 :                         objstorage.CreateOptions{
    2272           2 :                                 PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
    2273           2 :                         },
    2274           2 :                 )
    2275           2 :                 if err != nil {
    2276           0 :                         return nil, compact.Stats{}, err
    2277           0 :                 }
    2278           2 :                 deleteOnExit = true
    2279           2 : 
    2280           2 :                 start, end := newMeta.Smallest, newMeta.Largest
    2281           2 :                 if newMeta.SyntheticPrefix.IsSet() {
    2282           2 :                         start.UserKey = newMeta.SyntheticPrefix.Invert(start.UserKey)
    2283           2 :                         end.UserKey = newMeta.SyntheticPrefix.Invert(end.UserKey)
    2284           2 :                 }
    2285           2 :                 if newMeta.SyntheticSuffix.IsSet() {
    2286           1 :                         // Extend the bounds as necessary so that the keys don't include suffixes.
    2287           1 :                         start.UserKey = start.UserKey[:c.comparer.Split(start.UserKey)]
    2288           1 :                         if n := c.comparer.Split(end.UserKey); n < len(end.UserKey) {
    2289           0 :                                 end = base.MakeRangeDeleteSentinelKey(c.comparer.ImmediateSuccessor(nil, end.UserKey[:n]))
    2290           0 :                         }
    2291             :                 }
    2292             : 
    2293             :                 // NB: external files are always virtual.
    2294           2 :                 var wrote uint64
    2295           2 :                 err = d.tableCache.withVirtualReader(inputMeta.VirtualMeta(), func(r sstable.VirtualReader) error {
    2296           2 :                         var err error
    2297           2 :                         wrote, err = sstable.CopySpan(ctx,
    2298           2 :                                 src, r.UnsafeReader(), d.opts.MakeReaderOptions(),
    2299           2 :                                 w, d.opts.MakeWriterOptions(c.outputLevel.level, d.FormatMajorVersion().MaxTableFormat()),
    2300           2 :                                 start, end,
    2301           2 :                         )
    2302           2 :                         return err
    2303           2 :                 })
    2304             : 
    2305           2 :                 src = nil // We passed src to CopySpan; it's responsible for closing it.
    2306           2 :                 if err != nil {
    2307           1 :                         if errors.Is(err, sstable.ErrEmptySpan) {
    2308           1 :                                 // The virtual table was empty. Just remove the backing file.
    2309           1 :                                 // Note that deleteOnExit is true so we will delete the created object.
    2310           1 :                                 c.metrics = map[int]*LevelMetrics{
    2311           1 :                                         c.outputLevel.level: {
    2312           1 :                                                 BytesIn: inputMeta.Size,
    2313           1 :                                         },
    2314           1 :                                 }
    2315           1 :                                 return ve, compact.Stats{}, nil
    2316           1 :                         }
    2317           0 :                         return nil, compact.Stats{}, err
    2318             :                 }
    2319           2 :                 newMeta.FileBacking.Size = wrote
    2320           2 :                 newMeta.Size = wrote
    2321           2 :         } else {
    2322           2 :                 _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
    2323           2 :                         d.objProvider.Path(objMeta), fileTypeTable, newMeta.FileBacking.DiskFileNum,
    2324           2 :                         objstorage.CreateOptions{PreferSharedStorage: true})
    2325           2 :                 if err != nil {
    2326           0 :                         return nil, compact.Stats{}, err
    2327           0 :                 }
    2328           2 :                 deleteOnExit = true
    2329             :         }
    2330           2 :         ve.NewFiles = []newFileEntry{{
    2331           2 :                 Level: c.outputLevel.level,
    2332           2 :                 Meta:  newMeta,
    2333           2 :         }}
    2334           2 :         if newMeta.Virtual {
    2335           2 :                 ve.CreatedBackingTables = []*fileBacking{newMeta.FileBacking}
    2336           2 :         }
    2337           2 :         c.metrics = map[int]*LevelMetrics{
    2338           2 :                 c.outputLevel.level: {
    2339           2 :                         BytesIn:         inputMeta.Size,
    2340           2 :                         BytesCompacted:  newMeta.Size,
    2341           2 :                         TablesCompacted: 1,
    2342           2 :                 },
    2343           2 :         }
    2344           2 : 
    2345           2 :         if err := d.objProvider.Sync(); err != nil {
    2346           0 :                 return nil, compact.Stats{}, err
    2347           0 :         }
    2348           2 :         deleteOnExit = false
    2349           2 :         return ve, compact.Stats{}, nil
    2350             : }
    2351             : 
    2352             : func (d *DB) runDeleteOnlyCompaction(
    2353             :         jobID JobID, c *compaction,
    2354           2 : ) (ve *versionEdit, stats compact.Stats, retErr error) {
    2355           2 :         c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
    2356           2 :         ve = &versionEdit{
    2357           2 :                 DeletedFiles: map[deletedFileEntry]*fileMetadata{},
    2358           2 :         }
    2359           2 :         for _, cl := range c.inputs {
    2360           2 :                 levelMetrics := &LevelMetrics{}
    2361           2 :                 iter := cl.files.Iter()
    2362           2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    2363           2 :                         ve.DeletedFiles[deletedFileEntry{
    2364           2 :                                 Level:   cl.level,
    2365           2 :                                 FileNum: f.FileNum,
    2366           2 :                         }] = f
    2367           2 :                 }
    2368           2 :                 c.metrics[cl.level] = levelMetrics
    2369             :         }
    2370           2 :         return ve, stats, nil
    2371             : }
    2372             : 
    2373             : func (d *DB) runMoveCompaction(
    2374             :         jobID JobID, c *compaction,
    2375           2 : ) (ve *versionEdit, stats compact.Stats, _ error) {
    2376           2 :         iter := c.startLevel.files.Iter()
    2377           2 :         meta := iter.First()
    2378           2 :         if iter.Next() != nil {
    2379           0 :                 return nil, stats, base.AssertionFailedf("got more than one file for a move compaction")
    2380           0 :         }
    2381           2 :         if c.cancel.Load() {
    2382           1 :                 return ve, stats, ErrCancelledCompaction
    2383           1 :         }
    2384           2 :         c.metrics = map[int]*LevelMetrics{
    2385           2 :                 c.outputLevel.level: {
    2386           2 :                         BytesMoved:  meta.Size,
    2387           2 :                         TablesMoved: 1,
    2388           2 :                 },
    2389           2 :         }
    2390           2 :         ve = &versionEdit{
    2391           2 :                 DeletedFiles: map[deletedFileEntry]*fileMetadata{
    2392           2 :                         {Level: c.startLevel.level, FileNum: meta.FileNum}: meta,
    2393           2 :                 },
    2394           2 :                 NewFiles: []newFileEntry{
    2395           2 :                         {Level: c.outputLevel.level, Meta: meta},
    2396           2 :                 },
    2397           2 :         }
    2398           2 : 
    2399           2 :         return ve, stats, nil
    2400             : }
    2401             : 
    2402             : // runCompaction runs a compaction that produces new on-disk tables from
    2403             : // memtables or old on-disk tables.
    2404             : //
    2405             : // runCompaction cannot be used for compactionKindIngestedFlushable.
    2406             : //
    2407             : // d.mu must be held when calling this, but the mutex may be dropped and
    2408             : // re-acquired during the course of this method.
    2409             : func (d *DB) runCompaction(
    2410             :         jobID JobID, c *compaction,
    2411           2 : ) (ve *versionEdit, stats compact.Stats, retErr error) {
    2412           2 :         switch c.kind {
    2413           2 :         case compactionKindDeleteOnly:
    2414           2 :                 return d.runDeleteOnlyCompaction(jobID, c)
    2415           2 :         case compactionKindMove:
    2416           2 :                 return d.runMoveCompaction(jobID, c)
    2417           2 :         case compactionKindCopy:
    2418           2 :                 return d.runCopyCompaction(jobID, c)
    2419           0 :         case compactionKindIngestedFlushable:
    2420           0 :                 panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
    2421             :         }
    2422             : 
    2423           2 :         snapshots := d.mu.snapshots.toSlice()
    2424           2 : 
    2425           2 :         if c.flushing == nil {
    2426           2 :                 // Before dropping the db mutex, grab a ref to the current version. This
    2427           2 :                 // prevents any concurrent excises from deleting files that this compaction
    2428           2 :                 // needs to read/maintain a reference to.
    2429           2 :                 //
    2430           2 :                 // Note that unlike user iterators, compactionIter does not maintain a ref
    2431           2 :                 // of the version or read state.
    2432           2 :                 vers := d.mu.versions.currentVersion()
    2433           2 :                 vers.Ref()
    2434           2 :                 defer vers.UnrefLocked()
    2435           2 :         }
    2436             : 
    2437           2 :         if c.cancel.Load() {
    2438           2 :                 return ve, stats, ErrCancelledCompaction
    2439           2 :         }
    2440             : 
    2441             :         // The table is typically written at the maximum allowable format implied by
    2442             :         // the current format major version of the DB.
    2443           2 :         tableFormat := d.FormatMajorVersion().MaxTableFormat()
    2444           2 :         // In format major versions with maximum table formats of Pebblev3, value
    2445           2 :         // blocks were conditional on an experimental setting. In format major
    2446           2 :         // versions with maximum table formats of Pebblev4 and higher, value blocks
    2447           2 :         // are always enabled.
    2448           2 :         if tableFormat == sstable.TableFormatPebblev3 &&
    2449           2 :                 (d.opts.Experimental.EnableValueBlocks == nil || !d.opts.Experimental.EnableValueBlocks()) {
    2450           2 :                 tableFormat = sstable.TableFormatPebblev2
    2451           2 :         }
    2452             : 
    2453             :         // Release the d.mu lock while doing I/O.
    2454             :         // Note the unusual order: Unlock and then Lock.
    2455           2 :         d.mu.Unlock()
    2456           2 :         defer d.mu.Lock()
    2457           2 : 
    2458           2 :         result := d.compactAndWrite(jobID, c, snapshots, tableFormat)
    2459           2 :         if result.Err == nil {
    2460           2 :                 ve, result.Err = c.makeVersionEdit(result)
    2461           2 :         }
    2462           2 :         if result.Err != nil {
    2463           2 :                 // Delete any created tables.
    2464           2 :                 for i := range result.Tables {
    2465           2 :                         _ = d.objProvider.Remove(fileTypeTable, result.Tables[i].ObjMeta.DiskFileNum)
    2466           2 :                 }
    2467             :         }
    2468             :         // Refresh the disk available statistic whenever a compaction/flush
    2469             :         // completes, before re-acquiring the mutex.
    2470           2 :         d.calculateDiskAvailableBytes()
    2471           2 :         return ve, result.Stats, result.Err
    2472             : }
    2473             : 
    2474             : // compactAndWrite runs the data part of a compaction, where we set up a
    2475             : // compaction iterator and use it to write output tables.
    2476             : func (d *DB) compactAndWrite(
    2477             :         jobID JobID, c *compaction, snapshots compact.Snapshots, tableFormat sstable.TableFormat,
    2478           2 : ) (result compact.Result) {
    2479           2 :         // Compactions use a pool of buffers to read blocks, avoiding polluting the
    2480           2 :         // block cache with blocks that will not be read again. We initialize the
    2481           2 :         // buffer pool with a size 12. This initial size does not need to be
    2482           2 :         // accurate, because the pool will grow to accommodate the maximum number of
    2483           2 :         // blocks allocated at a given time over the course of the compaction. But
    2484           2 :         // choosing a size larger than that working set avoids any additional
    2485           2 :         // allocations to grow the size of the pool over the course of iteration.
    2486           2 :         //
    2487           2 :         // Justification for initial size 12: In a two-level compaction, at any
    2488           2 :         // given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
    2489           2 :         // Additionally, when decoding a compressed block, we'll temporarily
    2490           2 :         // allocate 1 additional block to hold the compressed buffer. In the worst
    2491           2 :         // case that all input sstables have two-level index blocks (+2), value
    2492           2 :         // blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
    2493           2 :         // additionally require 2n+4 blocks where n is the number of input sstables.
    2494           2 :         // Range deletion and range key blocks are relatively rare, and the cost of
    2495           2 :         // an additional allocation or two over the course of the compaction is
    2496           2 :         // considered to be okay. A larger initial size would cause the pool to hold
    2497           2 :         // on to more memory, even when it's not in-use because the pool will
    2498           2 :         // recycle buffers up to the current capacity of the pool. The memory use of
    2499           2 :         // a 12-buffer pool is expected to be within reason, even if all the buffers
    2500           2 :         // grow to the typical size of an index block (256 KiB) which would
    2501           2 :         // translate to 3 MiB per compaction.
    2502           2 :         c.bufferPool.Init(12)
    2503           2 :         defer c.bufferPool.Release()
    2504           2 : 
    2505           2 :         pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter)
    2506           2 :         defer func() {
    2507           2 :                 for _, closer := range c.closers {
    2508           2 :                         closer.FragmentIterator.Close()
    2509           2 :                 }
    2510             :         }()
    2511           2 :         if err != nil {
    2512           0 :                 return compact.Result{Err: err}
    2513           0 :         }
    2514           2 :         c.allowedZeroSeqNum = c.allowZeroSeqNum()
    2515           2 :         cfg := compact.IterConfig{
    2516           2 :                 Comparer:                               c.comparer,
    2517           2 :                 Merge:                                  d.merge,
    2518           2 :                 TombstoneElision:                       c.delElision,
    2519           2 :                 RangeKeyElision:                        c.rangeKeyElision,
    2520           2 :                 Snapshots:                              snapshots,
    2521           2 :                 AllowZeroSeqNum:                        c.allowedZeroSeqNum,
    2522           2 :                 IneffectualSingleDeleteCallback:        d.opts.Experimental.IneffectualSingleDeleteCallback,
    2523           2 :                 SingleDeleteInvariantViolationCallback: d.opts.Experimental.SingleDeleteInvariantViolationCallback,
    2524           2 :         }
    2525           2 :         iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter)
    2526           2 : 
    2527           2 :         runnerCfg := compact.RunnerConfig{
    2528           2 :                 CompactionBounds:           base.UserKeyBoundsFromInternal(c.smallest, c.largest),
    2529           2 :                 L0SplitKeys:                c.l0Limits,
    2530           2 :                 Grandparents:               c.grandparents,
    2531           2 :                 MaxGrandparentOverlapBytes: c.maxOverlapBytes,
    2532           2 :                 TargetOutputFileSize:       c.maxOutputFileSize,
    2533           2 :         }
    2534           2 :         runner := compact.NewRunner(runnerCfg, iter)
    2535           2 :         for runner.MoreDataToWrite() {
    2536           2 :                 if c.cancel.Load() {
    2537           2 :                         return runner.Finish().WithError(ErrCancelledCompaction)
    2538           2 :                 }
    2539             :                 // Create a new table.
    2540           2 :                 writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
    2541           2 :                 objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts)
    2542           2 :                 if err != nil {
    2543           1 :                         return runner.Finish().WithError(err)
    2544           1 :                 }
    2545           2 :                 runner.WriteTable(objMeta, tw)
    2546           2 :                 d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
    2547             :         }
    2548           2 :         result = runner.Finish()
    2549           2 :         if result.Err == nil {
    2550           2 :                 result.Err = d.objProvider.Sync()
    2551           2 :         }
    2552           2 :         return result
    2553             : }
    2554             : 
    2555             : // makeVersionEdit creates the version edit for a compaction, based on the
    2556             : // tables in compact.Result.
    2557           2 : func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error) {
    2558           2 :         ve := &versionEdit{
    2559           2 :                 DeletedFiles: map[deletedFileEntry]*fileMetadata{},
    2560           2 :         }
    2561           2 :         for _, cl := range c.inputs {
    2562           2 :                 iter := cl.files.Iter()
    2563           2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    2564           2 :                         ve.DeletedFiles[deletedFileEntry{
    2565           2 :                                 Level:   cl.level,
    2566           2 :                                 FileNum: f.FileNum,
    2567           2 :                         }] = f
    2568           2 :                 }
    2569             :         }
    2570             : 
    2571           2 :         startLevelBytes := c.startLevel.files.SizeSum()
    2572           2 :         outputMetrics := &LevelMetrics{
    2573           2 :                 BytesIn:   startLevelBytes,
    2574           2 :                 BytesRead: c.outputLevel.files.SizeSum(),
    2575           2 :         }
    2576           2 :         if len(c.extraLevels) > 0 {
    2577           2 :                 outputMetrics.BytesIn += c.extraLevels[0].files.SizeSum()
    2578           2 :         }
    2579           2 :         outputMetrics.BytesRead += outputMetrics.BytesIn
    2580           2 : 
    2581           2 :         c.metrics = map[int]*LevelMetrics{
    2582           2 :                 c.outputLevel.level: outputMetrics,
    2583           2 :         }
    2584           2 :         if len(c.flushing) == 0 && c.metrics[c.startLevel.level] == nil {
    2585           2 :                 c.metrics[c.startLevel.level] = &LevelMetrics{}
    2586           2 :         }
    2587           2 :         if len(c.extraLevels) > 0 {
    2588           2 :                 c.metrics[c.extraLevels[0].level] = &LevelMetrics{}
    2589           2 :                 outputMetrics.MultiLevel.BytesInTop = startLevelBytes
    2590           2 :                 outputMetrics.MultiLevel.BytesIn = outputMetrics.BytesIn
    2591           2 :                 outputMetrics.MultiLevel.BytesRead = outputMetrics.BytesRead
    2592           2 :         }
    2593             : 
    2594           2 :         inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute()
    2595           2 :         ve.NewFiles = make([]newFileEntry, len(result.Tables))
    2596           2 :         for i := range result.Tables {
    2597           2 :                 t := &result.Tables[i]
    2598           2 : 
    2599           2 :                 fileMeta := &fileMetadata{
    2600           2 :                         FileNum:        base.PhysicalTableFileNum(t.ObjMeta.DiskFileNum),
    2601           2 :                         CreationTime:   t.CreationTime.Unix(),
    2602           2 :                         Size:           t.WriterMeta.Size,
    2603           2 :                         SmallestSeqNum: t.WriterMeta.SmallestSeqNum,
    2604           2 :                         LargestSeqNum:  t.WriterMeta.LargestSeqNum,
    2605           2 :                 }
    2606           2 :                 if c.flushing == nil {
    2607           2 :                         // Set the file's LargestSeqNumAbsolute to be the maximum value of any
    2608           2 :                         // of the compaction's input sstables.
    2609           2 :                         // TODO(jackson): This could be narrowed to be the maximum of input
    2610           2 :                         // sstables that overlap the output sstable's key range.
    2611           2 :                         fileMeta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute
    2612           2 :                 } else {
    2613           2 :                         fileMeta.LargestSeqNumAbsolute = t.WriterMeta.LargestSeqNum
    2614           2 :                 }
    2615           2 :                 fileMeta.InitPhysicalBacking()
    2616           2 : 
    2617           2 :                 // If the file didn't contain any range deletions, we can fill its
    2618           2 :                 // table stats now, avoiding unnecessarily loading the table later.
    2619           2 :                 maybeSetStatsFromProperties(
    2620           2 :                         fileMeta.PhysicalMeta(), &t.WriterMeta.Properties,
    2621           2 :                 )
    2622           2 : 
    2623           2 :                 if t.WriterMeta.HasPointKeys {
    2624           2 :                         fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestPoint, t.WriterMeta.LargestPoint)
    2625           2 :                 }
    2626           2 :                 if t.WriterMeta.HasRangeDelKeys {
    2627           2 :                         fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestRangeDel, t.WriterMeta.LargestRangeDel)
    2628           2 :                 }
    2629           2 :                 if t.WriterMeta.HasRangeKeys {
    2630           2 :                         fileMeta.ExtendRangeKeyBounds(c.cmp, t.WriterMeta.SmallestRangeKey, t.WriterMeta.LargestRangeKey)
    2631           2 :                 }
    2632             : 
    2633           2 :                 ve.NewFiles[i] = newFileEntry{
    2634           2 :                         Level: c.outputLevel.level,
    2635           2 :                         Meta:  fileMeta,
    2636           2 :                 }
    2637           2 : 
    2638           2 :                 // Update metrics.
    2639           2 :                 if c.flushing == nil {
    2640           2 :                         outputMetrics.TablesCompacted++
    2641           2 :                         outputMetrics.BytesCompacted += fileMeta.Size
    2642           2 :                 } else {
    2643           2 :                         outputMetrics.TablesFlushed++
    2644           2 :                         outputMetrics.BytesFlushed += fileMeta.Size
    2645           2 :                 }
    2646           2 :                 outputMetrics.Size += int64(fileMeta.Size)
    2647           2 :                 outputMetrics.NumFiles++
    2648           2 :                 outputMetrics.Additional.BytesWrittenDataBlocks += t.WriterMeta.Properties.DataSize
    2649           2 :                 outputMetrics.Additional.BytesWrittenValueBlocks += t.WriterMeta.Properties.ValueBlocksSize
    2650             :         }
    2651             : 
    2652             :         // Sanity check that the tables are ordered and don't overlap.
    2653           2 :         for i := 1; i < len(ve.NewFiles); i++ {
    2654           2 :                 if ve.NewFiles[i-1].Meta.UserKeyBounds().End.IsUpperBoundFor(c.cmp, ve.NewFiles[i].Meta.Smallest.UserKey) {
    2655           0 :                         return nil, base.AssertionFailedf("pebble: compaction output tables overlap: %s and %s",
    2656           0 :                                 ve.NewFiles[i-1].Meta.DebugString(c.formatKey, true),
    2657           0 :                                 ve.NewFiles[i].Meta.DebugString(c.formatKey, true),
    2658           0 :                         )
    2659           0 :                 }
    2660             :         }
    2661             : 
    2662           2 :         return ve, nil
    2663             : }
    2664             : 
    2665             : // newCompactionOutput creates an object for a new table produced by a
    2666             : // compaction or flush.
    2667             : func (d *DB) newCompactionOutput(
    2668             :         jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
    2669           2 : ) (objstorage.ObjectMetadata, sstable.RawWriter, CPUWorkHandle, error) {
    2670           2 :         diskFileNum := d.mu.versions.getNextDiskFileNum()
    2671           2 : 
    2672           2 :         var writeCategory vfs.DiskWriteCategory
    2673           2 :         if d.opts.EnableSQLRowSpillMetrics {
    2674           0 :                 // In the scenario that the Pebble engine is used for SQL row spills the
    2675           0 :                 // data written to the memtable will correspond to spills to disk and
    2676           0 :                 // should be categorized as such.
    2677           0 :                 writeCategory = "sql-row-spill"
    2678           2 :         } else if c.kind == compactionKindFlush {
    2679           2 :                 writeCategory = "pebble-memtable-flush"
    2680           2 :         } else {
    2681           2 :                 writeCategory = "pebble-compaction"
    2682           2 :         }
    2683             : 
    2684           2 :         var reason string
    2685           2 :         if c.kind == compactionKindFlush {
    2686           2 :                 reason = "flushing"
    2687           2 :         } else {
    2688           2 :                 reason = "compacting"
    2689           2 :         }
    2690             : 
    2691           2 :         ctx := context.TODO()
    2692           2 :         if objiotracing.Enabled {
    2693           0 :                 ctx = objiotracing.WithLevel(ctx, c.outputLevel.level)
    2694           0 :                 if c.kind == compactionKindFlush {
    2695           0 :                         ctx = objiotracing.WithReason(ctx, objiotracing.ForFlush)
    2696           0 :                 } else {
    2697           0 :                         ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
    2698           0 :                 }
    2699             :         }
    2700             : 
    2701             :         // Prefer shared storage if present.
    2702           2 :         createOpts := objstorage.CreateOptions{
    2703           2 :                 PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
    2704           2 :                 WriteCategory:       writeCategory,
    2705           2 :         }
    2706           2 :         writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts)
    2707           2 :         if err != nil {
    2708           1 :                 return objstorage.ObjectMetadata{}, nil, nil, err
    2709           1 :         }
    2710             : 
    2711           2 :         if c.kind != compactionKindFlush {
    2712           2 :                 writable = &compactionWritable{
    2713           2 :                         Writable: writable,
    2714           2 :                         versions: d.mu.versions,
    2715           2 :                         written:  &c.bytesWritten,
    2716           2 :                 }
    2717           2 :         }
    2718           2 :         d.opts.EventListener.TableCreated(TableCreateInfo{
    2719           2 :                 JobID:   int(jobID),
    2720           2 :                 Reason:  reason,
    2721           2 :                 Path:    d.objProvider.Path(objMeta),
    2722           2 :                 FileNum: diskFileNum,
    2723           2 :         })
    2724           2 : 
    2725           2 :         writerOpts.SetInternal(sstableinternal.WriterOptions{
    2726           2 :                 CacheOpts: sstableinternal.CacheOptions{
    2727           2 :                         Cache:   d.opts.Cache,
    2728           2 :                         CacheID: d.cacheID,
    2729           2 :                         FileNum: diskFileNum,
    2730           2 :                 },
    2731           2 :         })
    2732           2 : 
    2733           2 :         const MaxFileWriteAdditionalCPUTime = time.Millisecond * 100
    2734           2 :         cpuWorkHandle := d.opts.Experimental.CPUWorkPermissionGranter.GetPermission(
    2735           2 :                 MaxFileWriteAdditionalCPUTime,
    2736           2 :         )
    2737           2 :         writerOpts.Parallelism =
    2738           2 :                 d.opts.Experimental.MaxWriterConcurrency > 0 &&
    2739           2 :                         (cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)
    2740           2 : 
    2741           2 :         // TODO(jackson): Make the compaction body generic over the RawWriter type,
    2742           2 :         // so that we don't need to pay the cost of dynamic dispatch. For now, we
    2743           2 :         // type assert into the only concrete RawWriter type we see (until colblk is
    2744           2 :         // integrated).
    2745           2 :         tw := sstable.NewRawWriter(writable, writerOpts).(*sstable.RawRowWriter)
    2746           2 :         return objMeta, tw, cpuWorkHandle, nil
    2747             : }
    2748             : 
    2749             : // validateVersionEdit runs validateFn on the smallest and largest user keys of every file in ve.NewFiles and ve.DeletedFiles.
    2750             : // Each key that validateFn rejects is reported through logger.Fatalf, with the key rendered by format alongside the file and the error.
    2751             : func validateVersionEdit(
    2752             :         ve *versionEdit, validateFn func([]byte) error, format base.FormatKey, logger Logger,
    2753           2 : ) {
    2754           2 :         validateKey := func(f *manifest.FileMetadata, key []byte) {
    2755           2 :                 if err := validateFn(key); err != nil {
    2756           1 :                         logger.Fatalf("pebble: version edit validation failed (key=%s file=%s): %v", format(key), f, err)
    2757           1 :                 }
    2758             :         }
    2759             : 
    2760             :         // Check both bounds (smallest and largest user key) of every new file, then of every deleted file.
    2761           2 :         for _, f := range ve.NewFiles {
    2762           2 :                 validateKey(f.Meta, f.Meta.Smallest.UserKey)
    2763           2 :                 validateKey(f.Meta, f.Meta.Largest.UserKey)
    2764           2 :         }
    2765           2 :         for _, m := range ve.DeletedFiles {
    2766           2 :                 validateKey(m, m.Smallest.UserKey)
    2767           2 :                 validateKey(m, m.Largest.UserKey)
    2768           2 :         }
    2769             : }

Generated by: LCOV version 1.14