LCOV - code coverage report
Current view: top level - pebble - compaction.go (source / functions) Hit Total Coverage
Test: 2024-04-18 08:16Z e0c6d173 - meta test only.lcov Lines: 2273 2559 88.8 %
Date: 2024-04-18 08:17:43 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2013 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "context"
      11             :         "fmt"
      12             :         "io"
      13             :         "math"
      14             :         "runtime/pprof"
      15             :         "slices"
      16             :         "sort"
      17             :         "sync/atomic"
      18             :         "time"
      19             : 
      20             :         "github.com/cockroachdb/errors"
      21             :         "github.com/cockroachdb/pebble/internal/base"
      22             :         "github.com/cockroachdb/pebble/internal/compact"
      23             :         "github.com/cockroachdb/pebble/internal/invalidating"
      24             :         "github.com/cockroachdb/pebble/internal/invariants"
      25             :         "github.com/cockroachdb/pebble/internal/keyspan"
      26             :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      27             :         "github.com/cockroachdb/pebble/internal/manifest"
      28             :         "github.com/cockroachdb/pebble/internal/private"
      29             :         "github.com/cockroachdb/pebble/internal/rangedel"
      30             :         "github.com/cockroachdb/pebble/internal/rangekey"
      31             :         "github.com/cockroachdb/pebble/objstorage"
      32             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      33             :         "github.com/cockroachdb/pebble/objstorage/remote"
      34             :         "github.com/cockroachdb/pebble/sstable"
      35             :         "github.com/cockroachdb/pebble/vfs"
      36             :         "github.com/cockroachdb/pebble/wal"
      37             : )
      38             : 
      39             : var errEmptyTable = errors.New("pebble: empty table")
      40             : 
      41             : // ErrCancelledCompaction is returned if a compaction is cancelled by a
      42             : // concurrent excise or ingest-split operation.
      43             : var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
      44             : 
      45             : var compactLabels = pprof.Labels("pebble", "compact")
      46             : var flushLabels = pprof.Labels("pebble", "flush")
      47             : var gcLabels = pprof.Labels("pebble", "gc")
      48             : 
      49             : // getInternalWriterProperties accesses a private variable (in the
      50             : // internal/private package) initialized by the sstable Writer. This indirection
      51             : // is necessary to ensure non-Pebble users constructing sstables for ingestion
      52             : // are unable to set internal-only properties.
      53             : var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties)
      54             : 
      55             : // expandedCompactionByteSizeLimit is the maximum number of bytes in all
      56             : // compacted files. We avoid expanding the lower level file set of a compaction
      57             : // if it would make the total compaction cover more than this many bytes.
      58           1 : func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64) uint64 {
      59           1 :         v := uint64(25 * opts.Level(level).TargetFileSize)
      60           1 : 
      61           1 :         // Never expand a compaction beyond half the available capacity, divided
      62           1 :         // by the maximum number of concurrent compactions. Each of the concurrent
      63           1 :         // compactions may expand up to this limit, so this attempts to limit
      64           1 :         // compactions to half of available disk space. Note that this will not
      65           1 :         // prevent compaction picking from pursuing compactions that are larger
      66           1 :         // than this threshold before expansion.
      67           1 :         diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions())
      68           1 :         if v > diskMax {
      69           0 :                 v = diskMax
      70           0 :         }
      71           1 :         return v
      72             : }
      73             : 
      74             : // maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
      75             : // before we stop building a single file in a level-1 to level compaction.
      76           1 : func maxGrandparentOverlapBytes(opts *Options, level int) uint64 {
      77           1 :         return uint64(10 * opts.Level(level).TargetFileSize)
      78           1 : }
      79             : 
      80             : // maxReadCompactionBytes is used to prevent read compactions which
      81             : // are too wide.
      82           1 : func maxReadCompactionBytes(opts *Options, level int) uint64 {
      83           1 :         return uint64(10 * opts.Level(level).TargetFileSize)
      84           1 : }
      85             : 
      86             : // noCloseIter wraps around a FragmentIterator, intercepting and eliding
      87             : // calls to Close. It is used during compaction to ensure that rangeDelIters
      88             : // are not closed prematurely.
      89             : type noCloseIter struct {
      90             :         keyspan.FragmentIterator
      91             : }
      92             : 
      93           1 : func (i noCloseIter) Close() error {
      94           1 :         return nil
      95           1 : }
      96             : 
// compactionLevel pairs a level number with a slice of that level's files
// and, optionally, L0 sublevel information.
type compactionLevel struct {
	// level is the level number.
	level int
	// files is the slice of files associated with this level.
	files manifest.LevelSlice
	// l0SublevelInfo contains information about L0 sublevels being compacted.
	// It's only set for the start level of a compaction starting out of L0 and
	// is nil for all other compactions.
	l0SublevelInfo []sublevelInfo
}
     105             : 
     106           1 : func (cl compactionLevel) Clone() compactionLevel {
     107           1 :         newCL := compactionLevel{
     108           1 :                 level: cl.level,
     109           1 :                 files: cl.files.Reslice(func(start, end *manifest.LevelIterator) {}),
     110             :         }
     111           1 :         return newCL
     112             : }
     113           0 : func (cl compactionLevel) String() string {
     114           0 :         return fmt.Sprintf(`Level %d, Files %s`, cl.level, cl.files)
     115           0 : }
     116             : 
// compactionWritable is an objstorage.Writable wrapper that, on every write,
// updates a metric in `versions` on bytes written by in-progress compactions so
// far. It also increments a per-compaction `written` int.
type compactionWritable struct {
	// Writable is the wrapped writable; Write forwards to it first.
	objstorage.Writable

	// versions receives an incrementCompactionBytes call after each
	// successful Write.
	versions *versionSet
	// written points at the counter that Write increments by the number of
	// bytes written.
	written *int64
}
     126             : 
     127             : // Write is part of the objstorage.Writable interface.
     128           1 : func (c *compactionWritable) Write(p []byte) error {
     129           1 :         if err := c.Writable.Write(p); err != nil {
     130           0 :                 return err
     131           0 :         }
     132             : 
     133           1 :         *c.written += int64(len(p))
     134           1 :         c.versions.incrementCompactionBytes(int64(len(p)))
     135           1 :         return nil
     136             : }
     137             : 
     138             : type compactionKind int
     139             : 
     140             : const (
     141             :         compactionKindDefault compactionKind = iota
     142             :         compactionKindFlush
     143             :         // compactionKindMove denotes a move compaction where the input file is
     144             :         // retained and linked in a new level without being obsoleted.
     145             :         compactionKindMove
     146             :         // compactionKindCopy denotes a copy compaction where the input file is
     147             :         // copied byte-by-byte into a new file with a new FileNum in the output level.
     148             :         compactionKindCopy
     149             :         compactionKindDeleteOnly
     150             :         compactionKindElisionOnly
     151             :         compactionKindRead
     152             :         compactionKindRewrite
     153             :         compactionKindIngestedFlushable
     154             : )
     155             : 
     156           1 : func (k compactionKind) String() string {
     157           1 :         switch k {
     158           1 :         case compactionKindDefault:
     159           1 :                 return "default"
     160           0 :         case compactionKindFlush:
     161           0 :                 return "flush"
     162           1 :         case compactionKindMove:
     163           1 :                 return "move"
     164           1 :         case compactionKindDeleteOnly:
     165           1 :                 return "delete-only"
     166           1 :         case compactionKindElisionOnly:
     167           1 :                 return "elision-only"
     168           0 :         case compactionKindRead:
     169           0 :                 return "read"
     170           1 :         case compactionKindRewrite:
     171           1 :                 return "rewrite"
     172           0 :         case compactionKindIngestedFlushable:
     173           0 :                 return "ingested-flushable"
     174           1 :         case compactionKindCopy:
     175           1 :                 return "copy"
     176             :         }
     177           0 :         return "?"
     178             : }
     179             : 
// rangeKeyCompactionTransform is used to transform range key spans as part of the
// keyspanimpl.MergingIter. As part of this transformation step, we can elide range
// keys in the last snapshot stripe, as well as coalesce range keys within
// snapshot stripes.
//
// Parameters:
//   - eq: equality function forwarded to rangekey.Coalesce.
//   - snapshots: snapshot sequence numbers, expected in ascending order.
//   - elideRangeKey: reports whether range keys over [start, end) may be
//     elided; it is consulted only for keys in the last snapshot stripe.
//
// The returned Transformer writes its result into dst (Start, End and Keys)
// and always returns a nil error.
func rangeKeyCompactionTransform(
	eq base.Equal, snapshots []uint64, elideRangeKey func(start, end []byte) bool,
) keyspan.Transformer {
	return keyspan.TransformerFunc(func(cmp base.Compare, s keyspan.Span, dst *keyspan.Span) error {
		// elideInLastStripe filters keys in place, dropping RangeKeyUnset and
		// RangeKeyDelete keys when elideRangeKey permits it for the span's
		// bounds, and returns the shortened slice.
		// NOTE(review): elideRangeKey is evaluated once per key even though
		// its arguments do not change within the loop.
		elideInLastStripe := func(keys []keyspan.Key) []keyspan.Key {
			// Unsets and deletes in the last snapshot stripe can be elided.
			k := 0
			for j := range keys {
				if elideRangeKey(s.Start, s.End) &&
					(keys[j].Kind() == InternalKeyKindRangeKeyUnset || keys[j].Kind() == InternalKeyKindRangeKeyDelete) {
					continue
				}
				keys[k] = keys[j]
				k++
			}
			keys = keys[:k]
			return keys
		}
		// snapshots are in ascending order, while s.keys are in descending seqnum
		// order. Partition s.keys by snapshot stripes, and call rangekey.Coalesce
		// on each partition.
		dst.Start = s.Start
		dst.End = s.End
		dst.Keys = dst.Keys[:0]
		// i walks snapshots from the last (largest) to the first; j is the
		// index of the next unconsumed key in s.Keys.
		i, j := len(snapshots)-1, 0
		// usedLen counts the keys already appended to dst.Keys.
		usedLen := 0
		for i >= 0 {
			start := j
			// Advance j over every key that base.Visible reports as not
			// visible at snapshots[i]; those keys form the current stripe.
			for j < len(s.Keys) && !base.Visible(s.Keys[j].SeqNum(), snapshots[i], base.InternalKeySeqNumMax) {
				// Include j in current partition.
				j++
			}
			if j > start {
				// keysDst aliases the unused tail of dst.Keys' backing array
				// so the coalesced output can reuse that storage.
				keysDst := dst.Keys[usedLen:cap(dst.Keys)]
				rangekey.Coalesce(cmp, eq, s.Keys[start:j], &keysDst)
				if j == len(s.Keys) {
					// This is the last snapshot stripe. Unsets and deletes can be elided.
					keysDst = elideInLastStripe(keysDst)
				}
				usedLen += len(keysDst)
				dst.Keys = append(dst.Keys, keysDst...)
			}
			i--
		}
		// Any keys left over after every snapshot has been considered form
		// the final (last) stripe. This is also the only path taken when
		// snapshots is empty.
		if j < len(s.Keys) {
			keysDst := dst.Keys[usedLen:cap(dst.Keys)]
			rangekey.Coalesce(cmp, eq, s.Keys[j:], &keysDst)
			keysDst = elideInLastStripe(keysDst)
			usedLen += len(keysDst)
			dst.Keys = append(dst.Keys, keysDst...)
		}
		return nil
	})
}
     238             : 
// compaction is a table compaction from one level to the next, starting from a
// given version.
type compaction struct {
	// cancel is a bool that can be used by other goroutines to signal a compaction
	// to cancel, such as if a conflicting excise operation raced it to manifest
	// application. Only holders of the manifest lock will write to this atomic.
	cancel atomic.Bool

	// kind is the kind of compaction; see compactionKind.
	kind compactionKind
	// isDownload is true if this compaction was started as part of a Download
	// operation. In this case kind is compactionKindCopy or
	// compactionKindRewrite.
	isDownload bool

	// cmp, equal, comparer and formatKey are the key comparison and
	// formatting facilities used by the compaction; the constructors fill
	// them in from the picked compaction and/or opts.Comparer.
	cmp       Compare
	equal     Equal
	comparer  *base.Comparer
	formatKey base.FormatKey
	logger    Logger
	// version is the version the compaction starts from.
	version *version
	stats   base.InternalIteratorStats
	// beganAt is the start time supplied to the constructor.
	beganAt time.Time
	// versionEditApplied is set to true when a compaction has completed and the
	// resulting version has been installed (if successful), but the compaction
	// goroutine is still cleaning up (eg, deleting obsolete files).
	versionEditApplied bool
	bufferPool         sstable.BufferPool

	// startLevel is the level that is being compacted. Inputs from startLevel
	// and outputLevel will be merged to produce a set of outputLevel files.
	startLevel *compactionLevel

	// outputLevel is the level that files are being produced in. outputLevel is
	// equal to startLevel+1 except when:
	//    - if startLevel is 0, the output level equals compactionPicker.baseLevel().
	//    - in multilevel compaction, the output level is the lowest level involved in
	//      the compaction
	// A compaction's outputLevel is nil for delete-only compactions.
	outputLevel *compactionLevel

	// extraLevels point to additional levels in between the input and output
	// levels that get compacted in multilevel compactions
	extraLevels []*compactionLevel

	// inputs holds the levels participating in the compaction. In
	// newCompaction, startLevel and outputLevel point at elements of this
	// slice.
	inputs []compactionLevel

	// maxOutputFileSize is the maximum size of an individual table created
	// during compaction.
	maxOutputFileSize uint64
	// maxOverlapBytes is the maximum number of bytes of overlap allowed for a
	// single output table with the tables in the grandparent level.
	maxOverlapBytes uint64
	// disableSpanElision disables elision of range tombstones and range keys. Used
	// by tests to allow range tombstones or range keys to be added to tables where
	// they would otherwise be elided.
	disableSpanElision bool

	// flushing contains the flushables (aka memtables) that are being flushed.
	flushing flushableList
	// bytesWritten contains the number of bytes that have been written to outputs.
	bytesWritten int64

	// The boundaries of the input data.
	smallest InternalKey
	largest  InternalKey

	// rangeDelInterleaving is an interleaving iterator for range deletions, that
	// interleaves range tombstones among the point keys.
	rangeDelInterleaving keyspan.InterleavingIter
	// rangeKeyInterleaving is the interleaving iter for range keys.
	rangeKeyInterleaving keyspan.InterleavingIter

	// A list of objects to close when the compaction finishes. Used by input
	// iteration to keep rangeDelIters open for the lifetime of the compaction,
	// and only close them when the compaction finishes.
	closers []io.Closer

	// grandparents are the tables in level+2 that overlap with the files being
	// compacted. Used to determine output table boundaries. Do not assume that the actual files
	// in the grandparent when this compaction finishes will be the same.
	grandparents manifest.LevelSlice

	// Boundaries at which flushes to L0 should be split. Determined by
	// L0Sublevels. If nil, flushes aren't split.
	l0Limits [][]byte

	// List of disjoint inuse key ranges the compaction overlaps with in
	// grandparent and lower levels. See setupInuseKeyRanges() for the
	// construction. Used by elideTombstone() and elideRangeTombstone() to
	// determine if keys affected by a tombstone possibly exist at a lower level.
	inuseKeyRanges []manifest.UserKeyRange
	// inuseEntireRange is set if the above inuse key ranges wholly contain the
	// compaction's key range. This allows compactions in higher levels to often
	// elide key comparisons.
	inuseEntireRange bool
	// NOTE(review): elideTombstoneIndex is presumably a cursor into
	// inuseKeyRanges used by elideTombstone() — confirm against its users.
	elideTombstoneIndex int

	// allowedZeroSeqNum is true if seqnums can be zeroed if there are no
	// snapshots requiring them to be kept. This determination is made by
	// looking for an sstable which overlaps the bounds of the compaction at a
	// lower level in the LSM during runCompaction.
	allowedZeroSeqNum bool

	// metrics holds per-key LevelMetrics.
	// NOTE(review): the int key is presumably the level number — confirm.
	metrics map[int]*LevelMetrics

	// pickerMetrics carries the metrics recorded by the compaction picker;
	// makeInfo reads its scores and overlapping ratios.
	pickerMetrics compactionPickerMetrics
}
     346             : 
     347           1 : func (c *compaction) makeInfo(jobID JobID) CompactionInfo {
     348           1 :         info := CompactionInfo{
     349           1 :                 JobID:       int(jobID),
     350           1 :                 Reason:      c.kind.String(),
     351           1 :                 Input:       make([]LevelInfo, 0, len(c.inputs)),
     352           1 :                 Annotations: []string{},
     353           1 :         }
     354           1 :         if c.isDownload {
     355           1 :                 info.Reason = "download," + info.Reason
     356           1 :         }
     357           1 :         for _, cl := range c.inputs {
     358           1 :                 inputInfo := LevelInfo{Level: cl.level, Tables: nil}
     359           1 :                 iter := cl.files.Iter()
     360           1 :                 for m := iter.First(); m != nil; m = iter.Next() {
     361           1 :                         inputInfo.Tables = append(inputInfo.Tables, m.TableInfo())
     362           1 :                 }
     363           1 :                 info.Input = append(info.Input, inputInfo)
     364             :         }
     365           1 :         if c.outputLevel != nil {
     366           1 :                 info.Output.Level = c.outputLevel.level
     367           1 : 
     368           1 :                 // If there are no inputs from the output level (eg, a move
     369           1 :                 // compaction), add an empty LevelInfo to info.Input.
     370           1 :                 if len(c.inputs) > 0 && c.inputs[len(c.inputs)-1].level != c.outputLevel.level {
     371           0 :                         info.Input = append(info.Input, LevelInfo{Level: c.outputLevel.level})
     372           0 :                 }
     373           1 :         } else {
     374           1 :                 // For a delete-only compaction, set the output level to L6. The
     375           1 :                 // output level is not meaningful here, but complicating the
     376           1 :                 // info.Output interface with a pointer doesn't seem worth the
     377           1 :                 // semantic distinction.
     378           1 :                 info.Output.Level = numLevels - 1
     379           1 :         }
     380             : 
     381           1 :         for i, score := range c.pickerMetrics.scores {
     382           1 :                 info.Input[i].Score = score
     383           1 :         }
     384           1 :         info.SingleLevelOverlappingRatio = c.pickerMetrics.singleLevelOverlappingRatio
     385           1 :         info.MultiLevelOverlappingRatio = c.pickerMetrics.multiLevelOverlappingRatio
     386           1 :         if len(info.Input) > 2 {
     387           1 :                 info.Annotations = append(info.Annotations, "multilevel")
     388           1 :         }
     389           1 :         return info
     390             : }
     391             : 
     392           1 : func (c *compaction) userKeyBounds() base.UserKeyBounds {
     393           1 :         return base.UserKeyBoundsFromInternal(c.smallest, c.largest)
     394           1 : }
     395             : 
     396             : func newCompaction(
     397             :         pc *pickedCompaction, opts *Options, beganAt time.Time, provider objstorage.Provider,
     398           1 : ) *compaction {
     399           1 :         c := &compaction{
     400           1 :                 kind:              compactionKindDefault,
     401           1 :                 cmp:               pc.cmp,
     402           1 :                 equal:             opts.Comparer.Equal,
     403           1 :                 comparer:          opts.Comparer,
     404           1 :                 formatKey:         opts.Comparer.FormatKey,
     405           1 :                 inputs:            pc.inputs,
     406           1 :                 smallest:          pc.smallest,
     407           1 :                 largest:           pc.largest,
     408           1 :                 logger:            opts.Logger,
     409           1 :                 version:           pc.version,
     410           1 :                 beganAt:           beganAt,
     411           1 :                 maxOutputFileSize: pc.maxOutputFileSize,
     412           1 :                 maxOverlapBytes:   pc.maxOverlapBytes,
     413           1 :                 pickerMetrics:     pc.pickerMetrics,
     414           1 :         }
     415           1 :         c.startLevel = &c.inputs[0]
     416           1 :         if pc.startLevel.l0SublevelInfo != nil {
     417           1 :                 c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo
     418           1 :         }
     419           1 :         c.outputLevel = &c.inputs[1]
     420           1 : 
     421           1 :         if len(pc.extraLevels) > 0 {
     422           1 :                 c.extraLevels = pc.extraLevels
     423           1 :                 c.outputLevel = &c.inputs[len(c.inputs)-1]
     424           1 :         }
     425             :         // Compute the set of outputLevel+1 files that overlap this compaction (these
     426             :         // are the grandparent sstables).
     427           1 :         if c.outputLevel.level+1 < numLevels {
     428           1 :                 c.grandparents = c.version.Overlaps(c.outputLevel.level+1, c.userKeyBounds())
     429           1 :         }
     430           1 :         c.setupInuseKeyRanges()
     431           1 :         c.kind = pc.kind
     432           1 : 
     433           1 :         if c.kind == compactionKindDefault && c.outputLevel.files.Empty() && !c.hasExtraLevelData() &&
     434           1 :                 c.startLevel.files.Len() == 1 && c.grandparents.SizeSum() <= c.maxOverlapBytes {
     435           1 :                 // This compaction can be converted into a move or copy from one level
     436           1 :                 // to the next. We avoid such a move if there is lots of overlapping
     437           1 :                 // grandparent data. Otherwise, the move could create a parent file
     438           1 :                 // that will require a very expensive merge later on.
     439           1 :                 iter := c.startLevel.files.Iter()
     440           1 :                 meta := iter.First()
     441           1 :                 isRemote := false
     442           1 :                 // We should always be passed a provider, except in some unit tests.
     443           1 :                 if provider != nil {
     444           1 :                         isRemote = !objstorage.IsLocalTable(provider, meta.FileBacking.DiskFileNum)
     445           1 :                 }
     446             :                 // Avoid a trivial move or copy if all of these are true, as rewriting a
     447             :                 // new file is better:
     448             :                 //
     449             :                 // 1) The source file is a virtual sstable
     450             :                 // 2) The existing file `meta` is on non-remote storage
     451             :                 // 3) The output level prefers shared storage
     452           1 :                 mustCopy := !isRemote && remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
     453           1 :                 if mustCopy {
     454           1 :                         // If the source is virtual, it's best to just rewrite the file as all
     455           1 :                         // conditions in the above comment are met.
     456           1 :                         if !meta.Virtual {
     457           1 :                                 c.kind = compactionKindCopy
     458           1 :                         }
     459           1 :                 } else {
     460           1 :                         c.kind = compactionKindMove
     461           1 :                 }
     462             :         }
     463           1 :         return c
     464             : }
     465             : 
     466             : func newDeleteOnlyCompaction(
     467             :         opts *Options, cur *version, inputs []compactionLevel, beganAt time.Time,
     468           1 : ) *compaction {
     469           1 :         c := &compaction{
     470           1 :                 kind:      compactionKindDeleteOnly,
     471           1 :                 cmp:       opts.Comparer.Compare,
     472           1 :                 equal:     opts.Comparer.Equal,
     473           1 :                 comparer:  opts.Comparer,
     474           1 :                 formatKey: opts.Comparer.FormatKey,
     475           1 :                 logger:    opts.Logger,
     476           1 :                 version:   cur,
     477           1 :                 beganAt:   beganAt,
     478           1 :                 inputs:    inputs,
     479           1 :         }
     480           1 : 
     481           1 :         // Set c.smallest, c.largest.
     482           1 :         files := make([]manifest.LevelIterator, 0, len(inputs))
     483           1 :         for _, in := range inputs {
     484           1 :                 files = append(files, in.files.Iter())
     485           1 :         }
     486           1 :         c.smallest, c.largest = manifest.KeyRange(opts.Comparer.Compare, files...)
     487           1 :         return c
     488             : }
     489             : 
     490           1 : func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64) {
     491           1 :         // Heuristic to place a lower bound on compaction output file size
     492           1 :         // caused by Lbase. Prior to this heuristic we have observed an L0 in
     493           1 :         // production with 310K files of which 290K files were < 10KB in size.
     494           1 :         // Our hypothesis is that it was caused by L1 having 2600 files and
     495           1 :         // ~10GB, such that each flush got split into many tiny files due to
     496           1 :         // overlapping with most of the files in Lbase.
     497           1 :         //
     498           1 :         // The computation below is general in that it accounts
     499           1 :         // for flushing different volumes of data (e.g. we may be flushing
     500           1 :         // many memtables). For illustration, we consider the typical
     501           1 :         // example of flushing a 64MB memtable. So 12.8MB output,
     502           1 :         // based on the compression guess below. If the compressed bytes
     503           1 :         // guess is an over-estimate we will end up with smaller files,
     504           1 :         // and if an under-estimate we will end up with larger files.
     505           1 :         // With a 2MB target file size, 7 files. We are willing to accept
     506           1 :         // 4x the number of files, if it results in better write amplification
     507           1 :         // when later compacting to Lbase, i.e., ~450KB files (target file
     508           1 :         // size / 4).
     509           1 :         //
     510           1 :         // Note that this is a pessimistic heuristic in that
     511           1 :         // fileCountUpperBoundDueToGrandparents could be far from the actual
     512           1 :         // number of files produced due to the grandparent limits. For
     513           1 :         // example, in the extreme, consider a flush that overlaps with 1000
     514           1 :         // files in Lbase f0...f999, and the initially calculated value of
     515           1 :         // maxOverlapBytes will cause splits at f10, f20,..., f990, which
     516           1 :         // means an upper bound file count of 100 files. Say the input bytes
     517           1 :         // in the flush are such that acceptableFileCount=10. We will fatten
     518           1 :         // up maxOverlapBytes by 10x to ensure that the upper bound file count
     519           1 :         // drops to 10. However, it is possible that in practice, even without
     520           1 :         // this change, we would have produced no more than 10 files, and that
     521           1 :         // this change makes the files unnecessarily wide. Say the input bytes
     522           1 :         // are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
     523           1 :         // 10% in f80...f89 and 10% in f990...f999. The original value of
     524           1 :         // maxOverlapBytes would have actually produced only 10 sstables. But
     525           1 :         // by increasing maxOverlapBytes by 10x, we may produce 1 sstable that
     526           1 :         // spans f0...f89, i.e., a much wider sstable than necessary.
     527           1 :         //
     528           1 :         // We could produce a tighter estimate of
     529           1 :         // fileCountUpperBoundDueToGrandparents if we had knowledge of the key
     530           1 :         // distribution of the flush. The 4x multiplier mentioned earlier is
     531           1 :         // a way to try to compensate for this pessimism.
     532           1 :         //
     533           1 :         // TODO(sumeer): we don't have compression info for the data being
     534           1 :         // flushed, but it is likely that existing files that overlap with
     535           1 :         // this flush in Lbase are representative wrt compression ratio. We
     536           1 :         // could store the uncompressed size in FileMetadata and estimate
     537           1 :         // the compression ratio.
     538           1 :         const approxCompressionRatio = 0.2
     539           1 :         approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
     540           1 :         approxNumFilesBasedOnTargetSize :=
     541           1 :                 int(math.Ceil(approxOutputBytes / float64(c.maxOutputFileSize)))
     542           1 :         acceptableFileCount := float64(4 * approxNumFilesBasedOnTargetSize)
     543           1 :         // The byte calculation is linear in numGrandparentFiles, but we will
     544           1 :         // incur this linear cost in findGrandparentLimit too, so we are also
     545           1 :         // willing to pay it now. We could approximate this cheaply by using
     546           1 :         // the mean file size of Lbase.
     547           1 :         grandparentFileBytes := c.grandparents.SizeSum()
     548           1 :         fileCountUpperBoundDueToGrandparents :=
     549           1 :                 float64(grandparentFileBytes) / float64(c.maxOverlapBytes)
     550           1 :         if fileCountUpperBoundDueToGrandparents > acceptableFileCount {
     551           1 :                 c.maxOverlapBytes = uint64(
     552           1 :                         float64(c.maxOverlapBytes) *
     553           1 :                                 (fileCountUpperBoundDueToGrandparents / acceptableFileCount))
     554           1 :         }
     555             : }
     556             : 
     557             : func newFlush(
     558             :         opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time,
     559           1 : ) (*compaction, error) {
     560           1 :         c := &compaction{
     561           1 :                 kind:              compactionKindFlush,
     562           1 :                 cmp:               opts.Comparer.Compare,
     563           1 :                 equal:             opts.Comparer.Equal,
     564           1 :                 comparer:          opts.Comparer,
     565           1 :                 formatKey:         opts.Comparer.FormatKey,
     566           1 :                 logger:            opts.Logger,
     567           1 :                 version:           cur,
     568           1 :                 beganAt:           beganAt,
     569           1 :                 inputs:            []compactionLevel{{level: -1}, {level: 0}},
     570           1 :                 maxOutputFileSize: math.MaxUint64,
     571           1 :                 maxOverlapBytes:   math.MaxUint64,
     572           1 :                 flushing:          flushing,
     573           1 :         }
     574           1 :         c.startLevel = &c.inputs[0]
     575           1 :         c.outputLevel = &c.inputs[1]
     576           1 : 
     577           1 :         if len(flushing) > 0 {
     578           1 :                 if _, ok := flushing[0].flushable.(*ingestedFlushable); ok {
     579           1 :                         if len(flushing) != 1 {
     580           0 :                                 panic("pebble: ingestedFlushable must be flushed one at a time.")
     581             :                         }
     582           1 :                         c.kind = compactionKindIngestedFlushable
     583           1 :                         return c, nil
     584             :                 }
     585             :         }
     586             : 
     587             :         // Make sure there's no ingestedFlushable after the first flushable in the
     588             :         // list.
     589           1 :         for _, f := range flushing {
     590           1 :                 if _, ok := f.flushable.(*ingestedFlushable); ok {
     591           0 :                         panic("pebble: flushing shouldn't contain ingestedFlushable flushable")
     592             :                 }
     593             :         }
     594             : 
     595           1 :         if cur.L0Sublevels != nil {
     596           1 :                 c.l0Limits = cur.L0Sublevels.FlushSplitKeys()
     597           1 :         }
     598             : 
     599           1 :         smallestSet, largestSet := false, false
     600           1 :         updatePointBounds := func(iter internalIterator) {
     601           1 :                 if kv := iter.First(); kv != nil {
     602           1 :                         if !smallestSet ||
     603           1 :                                 base.InternalCompare(c.cmp, c.smallest, kv.K) > 0 {
     604           1 :                                 smallestSet = true
     605           1 :                                 c.smallest = kv.K.Clone()
     606           1 :                         }
     607             :                 }
     608           1 :                 if kv := iter.Last(); kv != nil {
     609           1 :                         if !largestSet ||
     610           1 :                                 base.InternalCompare(c.cmp, c.largest, kv.K) < 0 {
     611           1 :                                 largestSet = true
     612           1 :                                 c.largest = kv.K.Clone()
     613           1 :                         }
     614             :                 }
     615             :         }
     616             : 
     617           1 :         updateRangeBounds := func(iter keyspan.FragmentIterator) error {
     618           1 :                 // File bounds require s != nil && !s.Empty(). We only need to check for
     619           1 :                 // s != nil here, as the memtable's FragmentIterator would never surface
     620           1 :                 // empty spans.
     621           1 :                 if s, err := iter.First(); err != nil {
     622           0 :                         return err
     623           1 :                 } else if s != nil {
     624           1 :                         if key := s.SmallestKey(); !smallestSet ||
     625           1 :                                 base.InternalCompare(c.cmp, c.smallest, key) > 0 {
     626           1 :                                 smallestSet = true
     627           1 :                                 c.smallest = key.Clone()
     628           1 :                         }
     629             :                 }
     630           1 :                 if s, err := iter.Last(); err != nil {
     631           0 :                         return err
     632           1 :                 } else if s != nil {
     633           1 :                         if key := s.LargestKey(); !largestSet ||
     634           1 :                                 base.InternalCompare(c.cmp, c.largest, key) < 0 {
     635           1 :                                 largestSet = true
     636           1 :                                 c.largest = key.Clone()
     637           1 :                         }
     638             :                 }
     639           1 :                 return nil
     640             :         }
     641             : 
     642           1 :         var flushingBytes uint64
     643           1 :         for i := range flushing {
     644           1 :                 f := flushing[i]
     645           1 :                 updatePointBounds(f.newIter(nil))
     646           1 :                 if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
     647           1 :                         if err := updateRangeBounds(rangeDelIter); err != nil {
     648           0 :                                 return nil, err
     649           0 :                         }
     650             :                 }
     651           1 :                 if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
     652           1 :                         if err := updateRangeBounds(rangeKeyIter); err != nil {
     653           0 :                                 return nil, err
     654           0 :                         }
     655             :                 }
     656           1 :                 flushingBytes += f.inuseBytes()
     657             :         }
     658             : 
     659           1 :         if opts.FlushSplitBytes > 0 {
     660           1 :                 c.maxOutputFileSize = uint64(opts.Level(0).TargetFileSize)
     661           1 :                 c.maxOverlapBytes = maxGrandparentOverlapBytes(opts, 0)
     662           1 :                 c.grandparents = c.version.Overlaps(baseLevel, c.userKeyBounds())
     663           1 :                 adjustGrandparentOverlapBytesForFlush(c, flushingBytes)
     664           1 :         }
     665             : 
     666           1 :         c.setupInuseKeyRanges()
     667           1 :         return c, nil
     668             : }
     669             : 
     670           1 : func (c *compaction) hasExtraLevelData() bool {
     671           1 :         if len(c.extraLevels) == 0 {
     672           1 :                 // not a multi level compaction
     673           1 :                 return false
     674           1 :         } else if c.extraLevels[0].files.Empty() {
     675           1 :                 // a multi level compaction without data in the intermediate input level;
     676           1 :                 // e.g. for a multi level compaction with levels 4,5, and 6, this could
     677           1 :                 // occur if there are no files to compact in 5, or in 5 and 6 (i.e. a move).
     678           1 :                 return false
     679           1 :         }
     680           1 :         return true
     681             : }
     682             : 
     683           1 : func (c *compaction) setupInuseKeyRanges() {
     684           1 :         level := c.outputLevel.level + 1
     685           1 :         if c.outputLevel.level == 0 {
     686           1 :                 level = 0
     687           1 :         }
     688             :         // CalculateInuseKeyRanges will return a series of sorted spans. Overlapping
     689             :         // or abutting spans have already been merged.
     690           1 :         c.inuseKeyRanges = c.version.CalculateInuseKeyRanges(
     691           1 :                 level, numLevels-1, c.smallest.UserKey, c.largest.UserKey,
     692           1 :         )
     693           1 :         // Check if there's a single in-use span that encompasses the entire key
     694           1 :         // range of the compaction. This is an optimization to avoid key comparisons
     695           1 :         // against inuseKeyRanges during the compaction when every key within the
     696           1 :         // compaction overlaps with an in-use span.
     697           1 :         if len(c.inuseKeyRanges) > 0 {
     698           1 :                 c.inuseEntireRange = c.cmp(c.inuseKeyRanges[0].Start, c.smallest.UserKey) <= 0 &&
     699           1 :                         c.cmp(c.inuseKeyRanges[0].End, c.largest.UserKey) >= 0
     700           1 :         }
     701             : }
     702             : 
     703             : // findGrandparentLimit takes the start user key for a table and returns the
     704             : // user key to which that table can extend without excessively overlapping
     705             : // the grandparent level. If no limit is needed considering the grandparent
     706             : // files, this function returns nil. This is done in order to prevent a table
     707             : // at level N from overlapping too much data at level N+1. We want to avoid
     708             : // such large overlaps because they translate into large compactions. The
     709             : // current heuristic stops output of a table if the addition of another key
     710             : // would cause the table to overlap more than 10x the target file size at
     711             : // level N. See maxGrandparentOverlapBytes.
     712           1 : func (c *compaction) findGrandparentLimit(start []byte) []byte {
     713           1 :         iter := c.grandparents.Iter()
     714           1 :         var overlappedBytes uint64
     715           1 :         var greater bool
     716           1 :         for f := iter.SeekGE(c.cmp, start); f != nil; f = iter.Next() {
     717           1 :                 overlappedBytes += f.Size
     718           1 :                 // To ensure forward progress we always return a larger user
     719           1 :                 // key than where we started. See comments above clients of
     720           1 :                 // this function for how this is used.
     721           1 :                 greater = greater || c.cmp(f.Smallest.UserKey, start) > 0
     722           1 :                 if !greater {
     723           1 :                         continue
     724             :                 }
     725             : 
     726             :                 // We return the smallest bound of a sstable rather than the
     727             :                 // largest because the smallest is always inclusive, and limits
     728             :                 // are used exclusively when truncating range tombstones. If we
     729             :                 // truncated an output to the largest key while there's a
     730             :                 // pending tombstone, the next output file would also overlap
     731             :                 // the same grandparent f.
     732           1 :                 if overlappedBytes > c.maxOverlapBytes {
     733           1 :                         return f.Smallest.UserKey
     734           1 :                 }
     735             :         }
     736           1 :         return nil
     737             : }
     738             : 
     739             : // findL0Limit takes the start key for a table and returns the user key to which
     740             : // that table can be extended without hitting the next l0Limit. Having flushed
     741             : // sstables "bridging across" an l0Limit could lead to increased L0 -> LBase
     742             : // compaction sizes as well as elevated read amplification.
     743           1 : func (c *compaction) findL0Limit(start []byte) []byte {
     744           1 :         if c.startLevel.level > -1 || c.outputLevel.level != 0 || len(c.l0Limits) == 0 {
     745           1 :                 return nil
     746           1 :         }
     747           1 :         index := sort.Search(len(c.l0Limits), func(i int) bool {
     748           1 :                 return c.cmp(c.l0Limits[i], start) > 0
     749           1 :         })
     750           1 :         if index < len(c.l0Limits) {
     751           1 :                 return c.l0Limits[index]
     752           1 :         }
     753           1 :         return nil
     754             : }
     755             : 
     756             : // errorOnUserKeyOverlap returns an error if the last two written sstables in
     757             : // this compaction have revisions of the same user key present in both sstables,
     758             : // when they shouldn't (e.g. when splitting flushes).
     759           1 : func (c *compaction) errorOnUserKeyOverlap(ve *versionEdit) error {
     760           1 :         if n := len(ve.NewFiles); n > 1 {
     761           1 :                 meta := ve.NewFiles[n-1].Meta
     762           1 :                 prevMeta := ve.NewFiles[n-2].Meta
     763           1 :                 if !prevMeta.Largest.IsExclusiveSentinel() &&
     764           1 :                         c.cmp(prevMeta.Largest.UserKey, meta.Smallest.UserKey) >= 0 {
     765           0 :                         return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
     766           0 :                                 prevMeta.Largest.Pretty(c.formatKey),
     767           0 :                                 prevMeta.FileNum,
     768           0 :                                 meta.FileNum)
     769           0 :                 }
     770             :         }
     771           1 :         return nil
     772             : }
     773             : 
     774             : // allowZeroSeqNum returns true if seqnum's can be zeroed if there are no
     775             : // snapshots requiring them to be kept. It performs this determination by
     776             : // looking for an sstable which overlaps the bounds of the compaction at a
     777             : // lower level in the LSM.
     778           1 : func (c *compaction) allowZeroSeqNum() bool {
     779           1 :         return c.elideRangeTombstone(c.smallest.UserKey, c.largest.UserKey)
     780           1 : }
     781             : 
     782             : // elideTombstone returns true if it is ok to elide a tombstone for the
     783             : // specified key. A return value of true guarantees that there are no key/value
     784             : // pairs at c.outputLevel.level+1 or higher that possibly contain the specified user
     785             : // key. The keys in multiple invocations to elideTombstone must be supplied in
     786             : // order.
     787           1 : func (c *compaction) elideTombstone(key []byte) bool {
     788           1 :         if c.inuseEntireRange || len(c.flushing) != 0 {
     789           1 :                 return false
     790           1 :         }
     791             : 
     792           1 :         for ; c.elideTombstoneIndex < len(c.inuseKeyRanges); c.elideTombstoneIndex++ {
     793           1 :                 r := &c.inuseKeyRanges[c.elideTombstoneIndex]
     794           1 :                 if c.cmp(key, r.End) <= 0 {
     795           1 :                         if c.cmp(key, r.Start) >= 0 {
     796           1 :                                 return false
     797           1 :                         }
     798           1 :                         break
     799             :                 }
     800             :         }
     801           1 :         return true
     802             : }
     803             : 
     804             : // elideRangeTombstone returns true if it is ok to elide the specified range
     805             : // tombstone. A return value of true guarantees that there are no key/value
     806             : // pairs at c.outputLevel.level+1 or higher that possibly overlap the specified
     807             : // tombstone.
     808           1 : func (c *compaction) elideRangeTombstone(start, end []byte) bool {
     809           1 :         // Disable range tombstone elision if the testing knob for that is enabled,
     810           1 :         // or if we are flushing memtables. The latter requirement is due to
     811           1 :         // inuseKeyRanges not accounting for key ranges in other memtables that are
     812           1 :         // being flushed in the same compaction. It's possible for a range tombstone
     813           1 :         // in one memtable to overlap keys in a preceding memtable in c.flushing.
     814           1 :         //
     815           1 :         // This function is also used in setting allowZeroSeqNum, so disabling
     816           1 :         // elision of range tombstones also disables zeroing of SeqNums.
     817           1 :         //
     818           1 :         // TODO(peter): we disable zeroing of seqnums during flushing to match
     819           1 :         // RocksDB behavior and to avoid generating overlapping sstables during
     820           1 :         // DB.replayWAL. When replaying WAL files at startup, we flush after each
     821           1 :         // WAL is replayed building up a single version edit that is
     822           1 :         // applied. Because we don't apply the version edit after each flush, this
     823           1 :         // code doesn't know that L0 contains files and zeroing of seqnums should
     824           1 :         // be disabled. That is fixable, but it seems safer to just match the
     825           1 :         // RocksDB behavior for now.
     826           1 :         if c.disableSpanElision || len(c.flushing) != 0 {
     827           1 :                 return false
     828           1 :         }
     829             : 
     830           1 :         lower := sort.Search(len(c.inuseKeyRanges), func(i int) bool {
     831           1 :                 return c.cmp(c.inuseKeyRanges[i].End, start) >= 0
     832           1 :         })
     833           1 :         upper := sort.Search(len(c.inuseKeyRanges), func(i int) bool {
     834           1 :                 return c.cmp(c.inuseKeyRanges[i].Start, end) > 0
     835           1 :         })
     836           1 :         return lower >= upper
     837             : }
     838             : 
     839             : // elideRangeKey returns true if it is ok to elide the specified range key. A
     840             : // return value of true guarantees that there are no key/value pairs at
     841             : // c.outputLevel.level+1 or higher that possibly overlap the specified range key.
     842           1 : func (c *compaction) elideRangeKey(start, end []byte) bool {
     843           1 :         // TODO(bilal): Track inuseKeyRanges separately for the range keyspace as
     844           1 :         // opposed to the point keyspace. Once that is done, elideRangeTombstone
     845           1 :         // can just check in the point keyspace, and this function can check for
     846           1 :         // inuseKeyRanges in the range keyspace.
     847           1 :         return c.elideRangeTombstone(start, end)
     848           1 : }
     849             : 
     850             : // newInputIter returns an iterator over all the input tables in a compaction.
     851             : func (c *compaction) newInputIter(
     852             :         newIters tableNewIters, newRangeKeyIter keyspanimpl.TableNewSpanIter, snapshots []uint64,
     853           1 : ) (_ internalIterator, retErr error) {
     854           1 :         // Validate the ordering of compaction input files for defense in depth.
     855           1 :         if len(c.flushing) == 0 {
     856           1 :                 if c.startLevel.level >= 0 {
     857           1 :                         err := manifest.CheckOrdering(c.cmp, c.formatKey,
     858           1 :                                 manifest.Level(c.startLevel.level), c.startLevel.files.Iter())
     859           1 :                         if err != nil {
     860           0 :                                 return nil, err
     861           0 :                         }
     862             :                 }
     863           1 :                 err := manifest.CheckOrdering(c.cmp, c.formatKey,
     864           1 :                         manifest.Level(c.outputLevel.level), c.outputLevel.files.Iter())
     865           1 :                 if err != nil {
     866           0 :                         return nil, err
     867           0 :                 }
     868           1 :                 if c.startLevel.level == 0 {
     869           1 :                         if c.startLevel.l0SublevelInfo == nil {
     870           0 :                                 panic("l0SublevelInfo not created for compaction out of L0")
     871             :                         }
     872           1 :                         for _, info := range c.startLevel.l0SublevelInfo {
     873           1 :                                 err := manifest.CheckOrdering(c.cmp, c.formatKey,
     874           1 :                                         info.sublevel, info.Iter())
     875           1 :                                 if err != nil {
     876           0 :                                         return nil, err
     877           0 :                                 }
     878             :                         }
     879             :                 }
     880           1 :                 if len(c.extraLevels) > 0 {
     881           1 :                         if len(c.extraLevels) > 1 {
     882           0 :                                 panic("n>2 multi level compaction not implemented yet")
     883             :                         }
     884           1 :                         interLevel := c.extraLevels[0]
     885           1 :                         err := manifest.CheckOrdering(c.cmp, c.formatKey,
     886           1 :                                 manifest.Level(interLevel.level), interLevel.files.Iter())
     887           1 :                         if err != nil {
     888           0 :                                 return nil, err
     889           0 :                         }
     890             :                 }
     891             :         }
     892             : 
     893             :         // There are three classes of keys that a compaction needs to process: point
     894             :         // keys, range deletion tombstones and range keys. Collect all iterators for
     895             :         // all these classes of keys from all the levels. We'll aggregate them
     896             :         // together farther below.
     897             :         //
     898             :         // numInputLevels is an approximation of the number of iterator levels. Due
     899             :         // to idiosyncrasies in iterator construction, we may (rarely) exceed this
     900             :         // initial capacity.
     901           1 :         numInputLevels := max(len(c.flushing), len(c.inputs))
     902           1 :         iters := make([]internalIterator, 0, numInputLevels)
     903           1 :         rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
     904           1 :         rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
     905           1 : 
     906           1 :         // If construction of the iterator inputs fails, ensure that we close all
     907           1 :                 // the constituent iterators.
     908           1 :         defer func() {
     909           1 :                 if retErr != nil {
     910           0 :                         for _, iter := range iters {
     911           0 :                                 if iter != nil {
     912           0 :                                         iter.Close()
     913           0 :                                 }
     914             :                         }
     915           0 :                         for _, rangeDelIter := range rangeDelIters {
     916           0 :                                 rangeDelIter.Close()
     917           0 :                         }
     918             :                 }
     919             :         }()
     920           1 :         iterOpts := IterOptions{
     921           1 :                 CategoryAndQoS: sstable.CategoryAndQoS{
     922           1 :                         Category: "pebble-compaction",
     923           1 :                         QoSLevel: sstable.NonLatencySensitiveQoSLevel,
     924           1 :                 },
     925           1 :                 logger: c.logger,
     926           1 :         }
     927           1 : 
     928           1 :         // Populate iters, rangeDelIters and rangeKeyIters with the appropriate
     929           1 :         // constituent iterators. This depends on whether this is a flush or a
     930           1 :         // compaction.
     931           1 :         if len(c.flushing) != 0 {
     932           1 :                 // If flushing, we need to build the input iterators over the memtables
     933           1 :                 // stored in c.flushing.
     934           1 :                 for i := range c.flushing {
     935           1 :                         f := c.flushing[i]
     936           1 :                         iters = append(iters, f.newFlushIter(nil))
     937           1 :                         rangeDelIter := f.newRangeDelIter(nil)
     938           1 :                         if rangeDelIter != nil {
     939           1 :                                 rangeDelIters = append(rangeDelIters, rangeDelIter)
     940           1 :                         }
     941           1 :                         if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
     942           1 :                                 rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
     943           1 :                         }
     944             :                 }
     945           1 :         } else {
     946           1 :                 addItersForLevel := func(level *compactionLevel, l manifest.Level) error {
     947           1 :                         // Add a *levelIter for point iterators. Because we don't call
     948           1 :                         // initRangeDel, the levelIter will close and forget the range
     949           1 :                         // deletion iterator when it steps on to a new file. Surfacing range
     950           1 :                         // deletions to compactions are handled below.
     951           1 :                         iters = append(iters, newLevelIter(context.Background(),
     952           1 :                                 iterOpts, c.comparer, newIters, level.files.Iter(), l, internalIterOpts{
     953           1 :                                         compaction: true,
     954           1 :                                         bufferPool: &c.bufferPool,
     955           1 :                                 }))
     956           1 :                         // TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
     957           1 :                         // deletions into memory upfront. (See #2015, which reverted this.) There
     958           1 :                         // will be no user keys that are split between sstables within a level in
     959           1 :                         // Cockroach 23.1, which unblocks this optimization.
     960           1 : 
     961           1 :                         // Add the range deletion iterator for each file as an independent level
     962           1 :                         // in mergingIter, as opposed to making a levelIter out of those. This
     963           1 :                         // is safer as levelIter expects all keys coming from underlying
     964           1 :                         // iterators to be in order. Due to compaction / tombstone writing
     965           1 :                         // logic in finishOutput(), it is possible for range tombstones to not
     966           1 :                         // be strictly ordered across all files in one level.
     967           1 :                         //
     968           1 :                         // Consider this example from the metamorphic tests (also repeated in
     969           1 :                         // finishOutput()), consisting of three L3 files with their bounds
     970           1 :                         // specified in square brackets next to the file name:
     971           1 :                         //
     972           1 :                         // ./000240.sst   [tmgc#391,MERGE-tmgc#391,MERGE]
     973           1 :                         // tmgc#391,MERGE [786e627a]
     974           1 :                         // tmgc-udkatvs#331,RANGEDEL
     975           1 :                         //
     976           1 :                         // ./000241.sst   [tmgc#384,MERGE-tmgc#384,MERGE]
     977           1 :                         // tmgc#384,MERGE [666c7070]
     978           1 :                         // tmgc-tvsalezade#383,RANGEDEL
     979           1 :                         // tmgc-tvsalezade#331,RANGEDEL
     980           1 :                         //
     981           1 :                         // ./000242.sst   [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
     982           1 :                         // tmgc-tvsalezade#383,RANGEDEL
     983           1 :                         // tmgc#375,SET [72646c78766965616c72776865676e79]
     984           1 :                         // tmgc-tvsalezade#356,RANGEDEL
     985           1 :                         //
     986           1 :                         // Here, the range tombstone in 000240.sst falls "after" one in
     987           1 :                         // 000241.sst, despite 000240.sst being ordered "before" 000241.sst for
     988           1 :                         // levelIter's purposes. While each file is still consistent before its
     989           1 :                         // bounds, it's safer to have all rangedel iterators be visible to
     990           1 :                         // mergingIter.
     991           1 :                         iter := level.files.Iter()
     992           1 :                         for f := iter.First(); f != nil; f = iter.Next() {
     993           1 :                                 rangeDelIter, closer, err := c.newRangeDelIter(
     994           1 :                                         newIters, iter.Take(), iterOpts, l)
     995           1 :                                 if err != nil {
     996           0 :                                         // The error will already be annotated with the BackingFileNum, so
     997           0 :                                         // we annotate it with the FileNum.
     998           0 :                                         return errors.Wrapf(err, "pebble: could not open table %s", errors.Safe(f.FileNum))
     999           0 :                                 }
    1000           1 :                                 if rangeDelIter == nil {
    1001           1 :                                         continue
    1002             :                                 }
    1003           1 :                                 rangeDelIters = append(rangeDelIters, rangeDelIter)
    1004           1 :                                 c.closers = append(c.closers, closer)
    1005             :                         }
    1006             : 
    1007             :                         // Check if this level has any range keys.
    1008           1 :                         hasRangeKeys := false
    1009           1 :                         for f := iter.First(); f != nil; f = iter.Next() {
    1010           1 :                                 if f.HasRangeKeys {
    1011           1 :                                         hasRangeKeys = true
    1012           1 :                                         break
    1013             :                                 }
    1014             :                         }
    1015           1 :                         if hasRangeKeys {
    1016           1 :                                 li := &keyspanimpl.LevelIter{}
    1017           1 :                                 newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
    1018           1 :                                         iter, err := newRangeKeyIter(file, iterOptions)
    1019           1 :                                         if err != nil {
    1020           0 :                                                 return nil, err
    1021           1 :                                         } else if iter == nil {
    1022           0 :                                                 return emptyKeyspanIter, nil
    1023           0 :                                         }
    1024             :                                         // Ensure that the range key iter is not closed until the compaction is
    1025             :                                         // finished. This is necessary because range key processing
    1026             :                                         // requires the range keys to be held in memory for up to the
    1027             :                                         // lifetime of the compaction.
    1028           1 :                                         c.closers = append(c.closers, iter)
    1029           1 :                                         iter = noCloseIter{iter}
    1030           1 : 
    1031           1 :                                         // We do not need to truncate range keys to sstable boundaries, or
    1032           1 :                                         // only read within the file's atomic compaction units, unlike with
    1033           1 :                                         // range tombstones. This is because range keys were added after we
    1034           1 :                                         // stopped splitting user keys across sstables, so all the range keys
    1035           1 :                                         // in this sstable must wholly lie within the file's bounds.
    1036           1 :                                         return iter, err
    1037             :                                 }
    1038           1 :                                 li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
    1039           1 :                                 rangeKeyIters = append(rangeKeyIters, li)
    1040             :                         }
    1041           1 :                         return nil
    1042             :                 }
    1043             : 
    1044           1 :                 for i := range c.inputs {
    1045           1 :                         // If the level is annotated with l0SublevelInfo, expand it into one
    1046           1 :                         // level per sublevel.
    1047           1 :                         // TODO(jackson): Perform this expansion even earlier when we pick the
    1048           1 :                         // compaction?
    1049           1 :                         if len(c.inputs[i].l0SublevelInfo) > 0 {
    1050           1 :                                 for _, info := range c.startLevel.l0SublevelInfo {
    1051           1 :                                         sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
    1052           1 :                                         if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
    1053           0 :                                                 return nil, err
    1054           0 :                                         }
    1055             :                                 }
    1056           1 :                                 continue
    1057             :                         }
    1058           1 :                         if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
    1059           0 :                                 return nil, err
    1060           0 :                         }
    1061             :                 }
    1062             :         }
    1063             : 
    1064             :         // If there's only one constituent point iterator, we can avoid the overhead
    1065             :         // of a *mergingIter. This is possible, for example, when performing a flush
    1066             :         // of a single memtable. Otherwise, combine all the iterators into a merging
    1067             :         // iter.
    1068           1 :         iter := iters[0]
    1069           1 :         if len(iters) > 1 {
    1070           1 :                 iter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
    1071           1 :         }
    1072             : 
    1073             :         // In normal operation, levelIter iterates over the point operations in a
    1074             :         // level, and initializes a rangeDelIter pointer for the range deletions in
    1075             :         // each table. During compaction, we want to iterate over the merged view of
    1076             :         // point operations and range deletions. In order to do this we create one
    1077             :         // levelIter per level to iterate over the point operations, and collect up
    1078             :         // all the range deletion files.
    1079             :         //
    1080             :         // The range deletion levels are combined with a keyspanimpl.MergingIter. The
    1081             :         // resulting merged rangedel iterator is then included using an
    1082             :         // InterleavingIter.
    1083             :         // TODO(jackson): Consider using a defragmenting iterator to stitch together
    1084             :         // logical range deletions that were fragmented due to previous file
    1085             :         // boundaries.
    1086           1 :         if len(rangeDelIters) > 0 {
    1087           1 :                 mi := &keyspanimpl.MergingIter{}
    1088           1 :                 mi.Init(c.cmp, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
    1089           1 :                 c.rangeDelInterleaving.Init(c.comparer, iter, mi, keyspan.InterleavingIterOpts{})
    1090           1 :                 iter = &c.rangeDelInterleaving
    1091           1 :         }
    1092             : 
    1093             :         // If there are range key iterators, we need to combine them using
    1094             :         // keyspanimpl.MergingIter, and then interleave them among the points.
    1095           1 :         if len(rangeKeyIters) > 0 {
    1096           1 :                 mi := &keyspanimpl.MergingIter{}
    1097           1 :                 mi.Init(c.cmp, rangeKeyCompactionTransform(c.equal, snapshots, c.elideRangeKey), new(keyspanimpl.MergingBuffers), rangeKeyIters...)
    1098           1 :                 di := &keyspan.DefragmentingIter{}
    1099           1 :                 di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
    1100           1 :                 c.rangeKeyInterleaving.Init(c.comparer, iter, di, keyspan.InterleavingIterOpts{})
    1101           1 :                 iter = &c.rangeKeyInterleaving
    1102           1 :         }
    1103           1 :         return iter, nil
    1104             : }
    1105             : 
    1106             : func (c *compaction) newRangeDelIter(
    1107             :         newIters tableNewIters, f manifest.LevelFile, opts IterOptions, l manifest.Level,
    1108           1 : ) (keyspan.FragmentIterator, io.Closer, error) {
    1109           1 :         opts.level = l
    1110           1 :         iterSet, err := newIters(context.Background(), f.FileMetadata, &opts,
    1111           1 :                 internalIterOpts{
    1112           1 :                         compaction: true,
    1113           1 :                         bufferPool: &c.bufferPool,
    1114           1 :                 }, iterRangeDeletions)
    1115           1 :         if err != nil {
    1116           0 :                 return nil, nil, err
    1117           1 :         } else if iterSet.rangeDeletion == nil {
    1118           1 :                 // The file doesn't contain any range deletions.
    1119           1 :                 return nil, nil, nil
    1120           1 :         }
    1121             :         // Ensure that rangeDelIter is not closed until the compaction is
    1122             :         // finished. This is necessary because range tombstone processing
    1123             :         // requires the range tombstones to be held in memory for up to the
    1124             :         // lifetime of the compaction.
    1125           1 :         return noCloseIter{iterSet.rangeDeletion}, iterSet.rangeDeletion, nil
    1126             : }
    1127             : 
    1128           0 : func (c *compaction) String() string {
    1129           0 :         if len(c.flushing) != 0 {
    1130           0 :                 return "flush\n"
    1131           0 :         }
    1132             : 
    1133           0 :         var buf bytes.Buffer
    1134           0 :         for level := c.startLevel.level; level <= c.outputLevel.level; level++ {
    1135           0 :                 i := level - c.startLevel.level
    1136           0 :                 fmt.Fprintf(&buf, "%d:", level)
    1137           0 :                 iter := c.inputs[i].files.Iter()
    1138           0 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1139           0 :                         fmt.Fprintf(&buf, " %s:%s-%s", f.FileNum, f.Smallest, f.Largest)
    1140           0 :                 }
    1141           0 :                 fmt.Fprintf(&buf, "\n")
    1142             :         }
    1143           0 :         return buf.String()
    1144             : }
    1145             : 
    1146             : type manualCompaction struct {
    1147             :         // Count of the retries either due to too many concurrent compactions, or a
    1148             :         // concurrent compaction to overlapping levels.
    1149             :         retries     int
    1150             :         level       int
    1151             :         outputLevel int
    1152             :         done        chan error
    1153             :         start       []byte
    1154             :         end         []byte
    1155             :         split       bool
    1156             : }
    1157             : 
    1158             : type readCompaction struct {
    1159             :         level int
    1160             :         // [start, end] key ranges are used for de-duping.
    1161             :         start []byte
    1162             :         end   []byte
    1163             : 
    1164             :         // The file associated with the compaction.
    1165             :         // If the file no longer belongs in the same
    1166             :         // level, then we skip the compaction.
    1167             :         fileNum base.FileNum
    1168             : }
    1169             : 
    1170           1 : func (d *DB) addInProgressCompaction(c *compaction) {
    1171           1 :         d.mu.compact.inProgress[c] = struct{}{}
    1172           1 :         var isBase, isIntraL0 bool
    1173           1 :         for _, cl := range c.inputs {
    1174           1 :                 iter := cl.files.Iter()
    1175           1 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1176           1 :                         if f.IsCompacting() {
    1177           0 :                                 d.opts.Logger.Fatalf("L%d->L%d: %s already being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
    1178           0 :                         }
    1179           1 :                         f.SetCompactionState(manifest.CompactionStateCompacting)
    1180           1 :                         if c.startLevel != nil && c.outputLevel != nil && c.startLevel.level == 0 {
    1181           1 :                                 if c.outputLevel.level == 0 {
    1182           1 :                                         f.IsIntraL0Compacting = true
    1183           1 :                                         isIntraL0 = true
    1184           1 :                                 } else {
    1185           1 :                                         isBase = true
    1186           1 :                                 }
    1187             :                         }
    1188             :                 }
    1189             :         }
    1190             : 
    1191           1 :         if (isIntraL0 || isBase) && c.version.L0Sublevels != nil {
    1192           1 :                 l0Inputs := []manifest.LevelSlice{c.startLevel.files}
    1193           1 :                 if isIntraL0 {
    1194           1 :                         l0Inputs = append(l0Inputs, c.outputLevel.files)
    1195           1 :                 }
    1196           1 :                 if err := c.version.L0Sublevels.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
    1197           0 :                         d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
    1198           0 :                 }
    1199             :         }
    1200             : }
    1201             : 
    1202             : // Removes compaction markers from files in a compaction. The rollback parameter
    1203             : // indicates whether the compaction state should be rolled back to its original
    1204             : // state in the case of an unsuccessful compaction.
    1205             : //
    1206             : // DB.mu must be held when calling this method, however this method can drop and
    1207             : // re-acquire that mutex. All writes to the manifest for this compaction should
    1208             : // have completed by this point.
    1209           1 : func (d *DB) clearCompactingState(c *compaction, rollback bool) {
    1210           1 :         c.versionEditApplied = true
    1211           1 :         for _, cl := range c.inputs {
    1212           1 :                 iter := cl.files.Iter()
    1213           1 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1214           1 :                         if !f.IsCompacting() {
    1215           0 :                                 d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
    1216           0 :                         }
    1217           1 :                         if !rollback {
    1218           1 :                                 // On success all compactions other than move-compactions transition the
    1219           1 :                                 // file into the Compacted state. Move-compacted files become eligible
    1220           1 :                                 // for compaction again and transition back to NotCompacting.
    1221           1 :                                 if c.kind != compactionKindMove {
    1222           1 :                                         f.SetCompactionState(manifest.CompactionStateCompacted)
    1223           1 :                                 } else {
    1224           1 :                                         f.SetCompactionState(manifest.CompactionStateNotCompacting)
    1225           1 :                                 }
    1226           1 :                         } else {
    1227           1 :                                 // Else, on rollback, all input files unconditionally transition back to
    1228           1 :                                 // NotCompacting.
    1229           1 :                                 f.SetCompactionState(manifest.CompactionStateNotCompacting)
    1230           1 :                         }
    1231           1 :                         f.IsIntraL0Compacting = false
    1232             :                 }
    1233             :         }
    1234           1 :         l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c))
    1235           1 :         func() {
    1236           1 :                 // InitCompactingFileInfo requires that no other manifest writes be
    1237           1 :                 // happening in parallel with it, i.e. we're not in the midst of installing
    1238           1 :                 // another version. Otherwise, it's possible that we've created another
    1239           1 :                 // L0Sublevels instance, but not added it to the versions list, causing
    1240           1 :                 // all the indices in FileMetadata to be inaccurate. To ensure this,
    1241           1 :                 // grab the manifest lock.
    1242           1 :                 d.mu.versions.logLock()
    1243           1 :                 defer d.mu.versions.logUnlock()
    1244           1 :                 d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress)
    1245           1 :         }()
    1246             : }
    1247             : 
    1248           1 : func (d *DB) calculateDiskAvailableBytes() uint64 {
    1249           1 :         if space, err := d.opts.FS.GetDiskUsage(d.dirname); err == nil {
    1250           1 :                 d.diskAvailBytes.Store(space.AvailBytes)
    1251           1 :                 return space.AvailBytes
    1252           1 :         } else if !errors.Is(err, vfs.ErrUnsupported) {
    1253           0 :                 d.opts.EventListener.BackgroundError(err)
    1254           0 :         }
    1255           1 :         return d.diskAvailBytes.Load()
    1256             : }
    1257             : 
    1258           1 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
    1259           1 :         var pacerInfo deletionPacerInfo
    1260           1 :         // Call GetDiskUsage after every file deletion. This may seem inefficient,
    1261           1 :         // but in practice this was observed to take constant time, regardless of
    1262           1 :         // volume size used, at least on linux with ext4 and zfs. All invocations
    1263           1 :         // take 10 microseconds or less.
    1264           1 :         pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
    1265           1 :         d.mu.Lock()
    1266           1 :         pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
    1267           1 :         pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
    1268           1 :         d.mu.Unlock()
    1269           1 :         return pacerInfo
    1270           1 : }
    1271             : 
    1272             : // onObsoleteTableDelete is called to update metrics when an sstable is deleted.
    1273           1 : func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
    1274           1 :         d.mu.Lock()
    1275           1 :         d.mu.versions.metrics.Table.ObsoleteCount--
    1276           1 :         d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
    1277           1 :         if isLocal {
    1278           1 :                 d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
    1279           1 :         }
    1280           1 :         d.mu.Unlock()
    1281             : }
    1282             : 
    1283             : // maybeScheduleFlush schedules a flush if necessary.
    1284             : //
    1285             : // d.mu must be held when calling this.
    1286           1 : func (d *DB) maybeScheduleFlush() {
    1287           1 :         if d.mu.compact.flushing || d.closed.Load() != nil || d.opts.ReadOnly {
    1288           1 :                 return
    1289           1 :         }
    1290           1 :         if len(d.mu.mem.queue) <= 1 {
    1291           1 :                 return
    1292           1 :         }
    1293             : 
    1294           1 :         if !d.passedFlushThreshold() {
    1295           1 :                 return
    1296           1 :         }
    1297             : 
    1298           1 :         d.mu.compact.flushing = true
    1299           1 :         go d.flush()
    1300             : }
    1301             : 
    1302           1 : func (d *DB) passedFlushThreshold() bool {
    1303           1 :         var n int
    1304           1 :         var size uint64
    1305           1 :         for ; n < len(d.mu.mem.queue)-1; n++ {
    1306           1 :                 if !d.mu.mem.queue[n].readyForFlush() {
    1307           1 :                         break
    1308             :                 }
    1309           1 :                 if d.mu.mem.queue[n].flushForced {
    1310           1 :                         // A flush was forced. Pretend the memtable size is the configured
    1311           1 :                         // size. See minFlushSize below.
    1312           1 :                         size += d.opts.MemTableSize
    1313           1 :                 } else {
    1314           1 :                         size += d.mu.mem.queue[n].totalBytes()
    1315           1 :                 }
    1316             :         }
    1317           1 :         if n == 0 {
    1318           1 :                 // None of the immutable memtables are ready for flushing.
    1319           1 :                 return false
    1320           1 :         }
    1321             : 
    1322             :         // Only flush once the sum of the queued memtable sizes exceeds half the
    1323             :         // configured memtable size. This prevents flushing of memtables at startup
    1324             :         // while we're undergoing the ramp period on the memtable size. See
    1325             :         // DB.newMemTable().
    1326           1 :         minFlushSize := d.opts.MemTableSize / 2
    1327           1 :         return size >= minFlushSize
    1328             : }
    1329             : 
    1330           1 : func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
    1331           1 :         var mem *flushableEntry
    1332           1 :         for _, m := range d.mu.mem.queue {
    1333           1 :                 if m.flushable == tbl {
    1334           1 :                         mem = m
    1335           1 :                         break
    1336             :                 }
    1337             :         }
    1338           1 :         if mem == nil || mem.flushForced {
    1339           1 :                 return
    1340           1 :         }
    1341           1 :         deadline := d.timeNow().Add(dur)
    1342           1 :         if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
    1343           1 :                 // Already scheduled to flush sooner than within `dur`.
    1344           1 :                 return
    1345           1 :         }
    1346           1 :         mem.delayedFlushForcedAt = deadline
    1347           1 :         go func() {
    1348           1 :                 timer := time.NewTimer(dur)
    1349           1 :                 defer timer.Stop()
    1350           1 : 
    1351           1 :                 select {
    1352           1 :                 case <-d.closedCh:
    1353           1 :                         return
    1354           1 :                 case <-mem.flushed:
    1355           1 :                         return
    1356           1 :                 case <-timer.C:
    1357           1 :                         d.commit.mu.Lock()
    1358           1 :                         defer d.commit.mu.Unlock()
    1359           1 :                         d.mu.Lock()
    1360           1 :                         defer d.mu.Unlock()
    1361           1 : 
    1362           1 :                         // NB: The timer may fire concurrently with a call to Close.  If a
    1363           1 :                         // Close call beat us to acquiring d.mu, d.closed holds ErrClosed,
    1364           1 :                         // and it's too late to flush anything. Otherwise, the Close call
    1365           1 :                         // will block on locking d.mu until we've finished scheduling the
    1366           1 :                         // flush and set `d.mu.compact.flushing` to true. Close will wait
    1367           1 :                         // for the current flush to complete.
    1368           1 :                         if d.closed.Load() != nil {
    1369           0 :                                 return
    1370           0 :                         }
    1371             : 
    1372           1 :                         if d.mu.mem.mutable == tbl {
    1373           1 :                                 d.makeRoomForWrite(nil)
    1374           1 :                         } else {
    1375           1 :                                 mem.flushForced = true
    1376           1 :                         }
    1377           1 :                         d.maybeScheduleFlush()
    1378             :                 }
    1379             :         }()
    1380             : }
    1381             : 
    1382           1 : func (d *DB) flush() {
    1383           1 :         pprof.Do(context.Background(), flushLabels, func(context.Context) {
    1384           1 :                 flushingWorkStart := time.Now()
    1385           1 :                 d.mu.Lock()
    1386           1 :                 defer d.mu.Unlock()
    1387           1 :                 idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime)
    1388           1 :                 var bytesFlushed uint64
    1389           1 :                 var err error
    1390           1 :                 if bytesFlushed, err = d.flush1(); err != nil {
    1391           0 :                         // TODO(peter): count consecutive flush errors and backoff.
    1392           0 :                         d.opts.EventListener.BackgroundError(err)
    1393           0 :                 }
    1394           1 :                 d.mu.compact.flushing = false
    1395           1 :                 d.mu.compact.noOngoingFlushStartTime = time.Now()
    1396           1 :                 workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
    1397           1 :                 d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
    1398           1 :                 d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
    1399           1 :                 d.mu.compact.flushWriteThroughput.IdleDuration += idleDuration
    1400           1 :                 // More flush work may have arrived while we were flushing, so schedule
    1401           1 :                 // another flush if needed.
    1402           1 :                 d.maybeScheduleFlush()
    1403           1 :                 // The flush may have produced too many files in a level, so schedule a
    1404           1 :                 // compaction if needed.
    1405           1 :                 d.maybeScheduleCompaction()
    1406           1 :                 d.mu.compact.cond.Broadcast()
    1407             :         })
    1408             : }
    1409             : 
    1410             : // runIngestFlush is used to generate a flush version edit for sstables which
    1411             : // were ingested as flushables. Both DB.mu and the manifest lock must be held
    1412             : // while runIngestFlush is called.
    1413           1 : func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
    1414           1 :         if len(c.flushing) != 1 {
    1415           0 :                 panic("pebble: ingestedFlushable must be flushed one at a time.")
    1416             :         }
    1417             : 
    1418             :         // Construct the VersionEdit, levelMetrics etc.
    1419           1 :         c.metrics = make(map[int]*LevelMetrics, numLevels)
    1420           1 :         // Finding the target level for ingestion must use the latest version
    1421           1 :         // after the logLock has been acquired.
    1422           1 :         c.version = d.mu.versions.currentVersion()
    1423           1 : 
    1424           1 :         baseLevel := d.mu.versions.picker.getBaseLevel()
    1425           1 :         iterOpts := IterOptions{logger: d.opts.Logger}
    1426           1 :         ve := &versionEdit{}
    1427           1 :         var ingestSplitFiles []ingestSplitFile
    1428           1 :         ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
    1429           1 : 
    1430           1 :         updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
    1431           1 :                 levelMetrics := c.metrics[level]
    1432           1 :                 if levelMetrics == nil {
    1433           1 :                         levelMetrics = &LevelMetrics{}
    1434           1 :                         c.metrics[level] = levelMetrics
    1435           1 :                 }
    1436           1 :                 levelMetrics.NumFiles--
    1437           1 :                 levelMetrics.Size -= int64(m.Size)
    1438           1 :                 for i := range added {
    1439           1 :                         levelMetrics.NumFiles++
    1440           1 :                         levelMetrics.Size += int64(added[i].Meta.Size)
    1441           1 :                 }
    1442             :         }
    1443             : 
    1444           1 :         suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
    1445           1 :                 d.FormatMajorVersion() >= FormatVirtualSSTables
    1446           1 : 
    1447           1 :         if suggestSplit || ingestFlushable.exciseSpan.Valid() {
    1448           1 :                 // We could add deleted files to ve.
    1449           1 :                 ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
    1450           1 :         }
    1451             : 
    1452           1 :         replacedFiles := make(map[base.FileNum][]newFileEntry)
    1453           1 :         for _, file := range ingestFlushable.files {
    1454           1 :                 var fileToSplit *fileMetadata
    1455           1 :                 var level int
    1456           1 : 
    1457           1 :                 // This file fits perfectly within the excise span, so we can slot it at L6.
    1458           1 :                 if ingestFlushable.exciseSpan.Valid() &&
    1459           1 :                         ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) &&
    1460           1 :                         ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) {
    1461           1 :                         level = 6
    1462           1 :                 } else {
    1463           1 :                         var err error
    1464           1 :                         level, fileToSplit, err = ingestTargetLevel(
    1465           1 :                                 d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer,
    1466           1 :                                 c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
    1467           1 :                                 suggestSplit,
    1468           1 :                         )
    1469           1 :                         if err != nil {
    1470           0 :                                 return nil, err
    1471           0 :                         }
    1472             :                 }
    1473             : 
    1474             :                 // Add the current flushableIngest file to the version.
    1475           1 :                 ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
    1476           1 :                 if fileToSplit != nil {
    1477           1 :                         ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
    1478           1 :                                 ingestFile: file.FileMetadata,
    1479           1 :                                 splitFile:  fileToSplit,
    1480           1 :                                 level:      level,
    1481           1 :                         })
    1482           1 :                 }
    1483           1 :                 levelMetrics := c.metrics[level]
    1484           1 :                 if levelMetrics == nil {
    1485           1 :                         levelMetrics = &LevelMetrics{}
    1486           1 :                         c.metrics[level] = levelMetrics
    1487           1 :                 }
    1488           1 :                 levelMetrics.BytesIngested += file.Size
    1489           1 :                 levelMetrics.TablesIngested++
    1490             :         }
    1491           1 :         if ingestFlushable.exciseSpan.Valid() {
    1492           1 :                 // Iterate through all levels and find files that intersect with exciseSpan.
    1493           1 :                 for l := range c.version.Levels {
    1494           1 :                         overlaps := c.version.Overlaps(l, base.UserKeyBoundsEndExclusive(ingestFlushable.exciseSpan.Start, ingestFlushable.exciseSpan.End))
    1495           1 :                         iter := overlaps.Iter()
    1496           1 : 
    1497           1 :                         for m := iter.First(); m != nil; m = iter.Next() {
    1498           1 :                                 newFiles, err := d.excise(ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
    1499           1 :                                 if err != nil {
    1500           0 :                                         return nil, err
    1501           0 :                                 }
    1502             : 
    1503           1 :                                 if _, ok := ve.DeletedFiles[deletedFileEntry{
    1504           1 :                                         Level:   l,
    1505           1 :                                         FileNum: m.FileNum,
    1506           1 :                                 }]; !ok {
    1507           1 :                                         // We did not excise this file.
    1508           1 :                                         continue
    1509             :                                 }
    1510           1 :                                 replacedFiles[m.FileNum] = newFiles
    1511           1 :                                 updateLevelMetricsOnExcise(m, l, newFiles)
    1512             :                         }
    1513             :                 }
    1514             :         }
    1515             : 
    1516           1 :         if len(ingestSplitFiles) > 0 {
    1517           1 :                 if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
    1518           0 :                         return nil, err
    1519           0 :                 }
    1520             :         }
    1521             : 
    1522           1 :         return ve, nil
    1523             : }
    1524             : 
    1525             : // flush runs a compaction that copies the immutable memtables from memory to
    1526             : // disk.
    1527             : //
    1528             : // d.mu must be held when calling this, but the mutex may be dropped and
    1529             : // re-acquired during the course of this method.
    1530           1 : func (d *DB) flush1() (bytesFlushed uint64, err error) {
    1531           1 :         // NB: The flushable queue can contain flushables of type ingestedFlushable.
    1532           1 :         // The sstables in ingestedFlushable.files must be placed into the appropriate
    1533           1 :         // level in the lsm. Let's say the flushable queue contains a prefix of
    1534           1 :         // regular immutable memtables, then an ingestedFlushable, and then the
    1535           1 :         // mutable memtable. When the flush of the ingestedFlushable is performed,
    1536           1 :         // it needs an updated view of the lsm. That is, the prefix of immutable
    1537           1 :         // memtables must have already been flushed. Similarly, if there are two
    1538           1 :         // contiguous ingestedFlushables in the queue, then the first flushable must
    1539           1 :         // be flushed, so that the second flushable can see an updated view of the
    1540           1 :         // lsm.
    1541           1 :         //
    1542           1 :         // Given the above, we restrict flushes to either some prefix of regular
    1543           1 :         // memtables, or a single flushable of type ingestedFlushable. The DB.flush
    1544           1 :         // function will call DB.maybeScheduleFlush again, so a new flush to finish
    1545           1 :         // the remaining flush work should be scheduled right away.
    1546           1 :         //
    1547           1 :         // NB: Large batches placed in the flushable queue share the WAL with the
    1548           1 :         // previous memtable in the queue. We must ensure the property that both the
    1549           1 :         // large batch and the memtable with which it shares a WAL are flushed
    1550           1 :         // together. The property ensures that the minimum unflushed log number
    1551           1 :         // isn't incremented incorrectly. Since a flushableBatch.readyToFlush always
    1552           1 :         // returns true, and since the large batch will always be placed right after
    1553           1 :         // the memtable with which it shares a WAL, the property is naturally
    1554           1 :         // ensured. The large batch will always be placed after the memtable with
    1555           1 :         // which it shares a WAL because we ensure it in DB.commitWrite by holding
    1556           1 :         // the commitPipeline.mu and then holding DB.mu. As an extra defensive
    1557           1 :         // measure, if we try to flush the memtable without also flushing the
    1558           1 :         // flushable batch in the same flush, since the memtable and flushableBatch
    1559           1 :         // have the same logNum, the logNum invariant check below will trigger.
    1560           1 :         var n, inputs int
    1561           1 :         var inputBytes uint64
    1562           1 :         var ingest bool
    1563           1 :         for ; n < len(d.mu.mem.queue)-1; n++ {
    1564           1 :                 if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
    1565           1 :                         if n == 0 {
    1566           1 :                                 // The first flushable is of type ingestedFlushable. Since these
    1567           1 :                                 // must be flushed individually, we perform a flush for just
    1568           1 :                                 // this.
    1569           1 :                                 if !f.readyForFlush() {
    1570           0 :                                         // This check is almost unnecessary, but we guard against it
    1571           0 :                                         // just in case this invariant changes in the future.
    1572           0 :                                         panic("pebble: ingestedFlushable should always be ready to flush.")
    1573             :                                 }
    1574             :                                 // By setting n = 1, we ensure that the first flushable(n == 0)
    1575             :                                 // is scheduled for a flush. The number of tables added is equal to the
    1576             :                                 // number of files in the ingest operation.
    1577           1 :                                 n = 1
    1578           1 :                                 inputs = len(f.files)
    1579           1 :                                 ingest = true
    1580           1 :                                 break
    1581           1 :                         } else {
    1582           1 :                                 // There was some prefix of flushables which weren't of type
    1583           1 :                                 // ingestedFlushable. So, perform a flush for those.
    1584           1 :                                 break
    1585             :                         }
    1586             :                 }
    1587           1 :                 if !d.mu.mem.queue[n].readyForFlush() {
    1588           0 :                         break
    1589             :                 }
    1590           1 :                 inputBytes += d.mu.mem.queue[n].inuseBytes()
    1591             :         }
    1592           1 :         if n == 0 {
    1593           0 :                 // None of the immutable memtables are ready for flushing.
    1594           0 :                 return 0, nil
    1595           0 :         }
    1596           1 :         if !ingest {
    1597           1 :                 // Flushes of memtables add the prefix of n memtables from the flushable
    1598           1 :                 // queue.
    1599           1 :                 inputs = n
    1600           1 :         }
    1601             : 
    1602             :         // Require that every memtable being flushed has a log number less than the
    1603             :         // new minimum unflushed log number.
    1604           1 :         minUnflushedLogNum := d.mu.mem.queue[n].logNum
    1605           1 :         if !d.opts.DisableWAL {
    1606           1 :                 for i := 0; i < n; i++ {
    1607           1 :                         if logNum := d.mu.mem.queue[i].logNum; logNum >= minUnflushedLogNum {
    1608           0 :                                 panic(errors.AssertionFailedf("logNum invariant violated: flushing %d items; %d:type=%T,logNum=%d; %d:type=%T,logNum=%d",
    1609           0 :                                         n,
    1610           0 :                                         i, d.mu.mem.queue[i].flushable, logNum,
    1611           0 :                                         n, d.mu.mem.queue[n].flushable, minUnflushedLogNum))
    1612             :                         }
    1613             :                 }
    1614             :         }
    1615             : 
    1616           1 :         c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
    1617           1 :                 d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow())
    1618           1 :         if err != nil {
    1619           0 :                 return 0, err
    1620           0 :         }
    1621           1 :         d.addInProgressCompaction(c)
    1622           1 : 
    1623           1 :         jobID := d.newJobIDLocked()
    1624           1 :         d.opts.EventListener.FlushBegin(FlushInfo{
    1625           1 :                 JobID:      int(jobID),
    1626           1 :                 Input:      inputs,
    1627           1 :                 InputBytes: inputBytes,
    1628           1 :                 Ingest:     ingest,
    1629           1 :         })
    1630           1 :         startTime := d.timeNow()
    1631           1 : 
    1632           1 :         var ve *manifest.VersionEdit
    1633           1 :         var pendingOutputs []compactionOutput
    1634           1 :         var stats compactStats
    1635           1 :         // To determine the target level of the files in the ingestedFlushable, we
    1636           1 :         // need to acquire the logLock, and not release it for that duration. Since,
    1637           1 :         // we need to acquire the logLock below to perform the logAndApply step
    1638           1 :         // anyway, we create the VersionEdit for ingestedFlushable outside of
    1639           1 :         // runCompaction. For all other flush cases, we construct the VersionEdit
    1640           1 :         // inside runCompaction.
    1641           1 :         if c.kind != compactionKindIngestedFlushable {
    1642           1 :                 ve, pendingOutputs, stats, err = d.runCompaction(jobID, c)
    1643           1 :         }
    1644             : 
    1645             :         // Acquire logLock. This will be released either on an error, by way of
    1646             :         // logUnlock, or through a call to logAndApply if there is no error.
    1647           1 :         d.mu.versions.logLock()
    1648           1 : 
    1649           1 :         if c.kind == compactionKindIngestedFlushable {
    1650           1 :                 ve, err = d.runIngestFlush(c)
    1651           1 :         }
    1652             : 
    1653           1 :         info := FlushInfo{
    1654           1 :                 JobID:      int(jobID),
    1655           1 :                 Input:      inputs,
    1656           1 :                 InputBytes: inputBytes,
    1657           1 :                 Duration:   d.timeNow().Sub(startTime),
    1658           1 :                 Done:       true,
    1659           1 :                 Ingest:     ingest,
    1660           1 :                 Err:        err,
    1661           1 :         }
    1662           1 :         if err == nil {
    1663           1 :                 for i := range ve.NewFiles {
    1664           1 :                         e := &ve.NewFiles[i]
    1665           1 :                         info.Output = append(info.Output, e.Meta.TableInfo())
    1666           1 :                         // Ingested tables are not necessarily flushed to L0. Record the level of
    1667           1 :                         // each ingested file explicitly.
    1668           1 :                         if ingest {
    1669           1 :                                 info.IngestLevels = append(info.IngestLevels, e.Level)
    1670           1 :                         }
    1671             :                 }
    1672           1 :                 if len(ve.NewFiles) == 0 {
    1673           1 :                         info.Err = errEmptyTable
    1674           1 :                 }
    1675             : 
    1676             :                 // The flush succeeded or it produced an empty sstable. In either case we
    1677             :                 // want to bump the minimum unflushed log number to the log number of the
    1678             :                 // oldest unflushed memtable.
    1679           1 :                 ve.MinUnflushedLogNum = minUnflushedLogNum
    1680           1 :                 if c.kind != compactionKindIngestedFlushable {
    1681           1 :                         metrics := c.metrics[0]
    1682           1 :                         if d.opts.DisableWAL {
    1683           1 :                                 // If the WAL is disabled, every flushable has a zero [logSize],
    1684           1 :                                 // resulting in zero bytes in. Instead, use the number of bytes we
    1685           1 :                                 // flushed as the BytesIn. This ensures we get a reasonable w-amp
    1686           1 :                                 // calculation even when the WAL is disabled.
    1687           1 :                                 metrics.BytesIn = metrics.BytesFlushed
    1688           1 :                         } else {
    1689           1 :                                 metrics := c.metrics[0]
    1690           1 :                                 for i := 0; i < n; i++ {
    1691           1 :                                         metrics.BytesIn += d.mu.mem.queue[i].logSize
    1692           1 :                                 }
    1693             :                         }
    1694           1 :                 } else {
    1695           1 :                         // c.kind == compactionKindIngestedFlushable && we could have deleted files due
    1696           1 :                         // to ingest-time splits or excises.
    1697           1 :                         ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
    1698           1 :                         for c2 := range d.mu.compact.inProgress {
    1699           1 :                                 // Check if this compaction overlaps with the excise span. Note that just
    1700           1 :                                 // checking if the inputs individually overlap with the excise span
    1701           1 :                                 // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
    1702           1 :                                 // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
    1703           1 :                                 // doing a [c,d) excise at the same time as this compaction, we will have
    1704           1 :                                 // to error out the whole compaction as we can't guarantee it hasn't/won't
    1705           1 :                                 // write a file overlapping with the excise span.
    1706           1 :                                 if ingestFlushable.exciseSpan.OverlapsInternalKeyRange(d.cmp, c2.smallest, c2.largest) {
    1707           1 :                                         c2.cancel.Store(true)
    1708           1 :                                         continue
    1709             :                                 }
    1710             :                         }
    1711             : 
    1712           1 :                         if len(ve.DeletedFiles) > 0 {
    1713           1 :                                 // Iterate through all other compactions, and check if their inputs have
    1714           1 :                                 // been replaced due to an ingest-time split or excise. In that case,
    1715           1 :                                 // cancel the compaction.
    1716           1 :                                 for c2 := range d.mu.compact.inProgress {
    1717           1 :                                         for i := range c2.inputs {
    1718           1 :                                                 iter := c2.inputs[i].files.Iter()
    1719           1 :                                                 for f := iter.First(); f != nil; f = iter.Next() {
    1720           1 :                                                         if _, ok := ve.DeletedFiles[deletedFileEntry{FileNum: f.FileNum, Level: c2.inputs[i].level}]; ok {
    1721           1 :                                                                 c2.cancel.Store(true)
    1722           1 :                                                                 break
    1723             :                                                         }
    1724             :                                                 }
    1725             :                                         }
    1726             :                                 }
    1727             :                         }
    1728             :                 }
    1729           1 :                 err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */
    1730           1 :                         func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
    1731           1 :                 if err != nil {
    1732           0 :                         info.Err = err
    1733           0 :                         // TODO(peter): untested.
    1734           0 :                         for _, f := range pendingOutputs {
    1735           0 :                                 // Note that the FileBacking for the file metadata might not have
    1736           0 :                                 // been set yet. So, we directly use the FileNum. Since these
    1737           0 :                                 // files were generated as compaction outputs, these must be
    1738           0 :                                 // physical files on disk. This property might not hold once
    1739           0 :                                 // https://github.com/cockroachdb/pebble/issues/389 is
    1740           0 :                                 // implemented if #389 creates virtual sstables as output files.
    1741           0 :                                 d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{
    1742           0 :                                         fileInfo: fileInfo{
    1743           0 :                                                 FileNum:  base.PhysicalTableDiskFileNum(f.meta.FileNum),
    1744           0 :                                                 FileSize: f.meta.Size,
    1745           0 :                                         },
    1746           0 :                                         isLocal: f.isLocal,
    1747           0 :                                 })
    1748           0 :                         }
    1749           0 :                         d.mu.versions.updateObsoleteTableMetricsLocked()
    1750             :                 }
    1751           0 :         } else {
    1752           0 :                 // We won't be performing the logAndApply step because of the error,
    1753           0 :                 // so logUnlock.
    1754           0 :                 d.mu.versions.logUnlock()
    1755           0 :         }
    1756             : 
    1757             :         // If err != nil, then the flush will be retried, and we will recalculate
    1758             :         // these metrics.
    1759           1 :         if err == nil {
    1760           1 :                 d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
    1761           1 :                 d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
    1762           1 :                 d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
    1763           1 :                 d.maybeUpdateDeleteCompactionHints(c)
    1764           1 :         }
    1765             : 
    1766           1 :         d.clearCompactingState(c, err != nil)
    1767           1 :         delete(d.mu.compact.inProgress, c)
    1768           1 :         d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
    1769           1 : 
    1770           1 :         var flushed flushableList
    1771           1 :         if err == nil {
    1772           1 :                 flushed = d.mu.mem.queue[:n]
    1773           1 :                 d.mu.mem.queue = d.mu.mem.queue[n:]
    1774           1 :                 d.updateReadStateLocked(d.opts.DebugCheck)
    1775           1 :                 d.updateTableStatsLocked(ve.NewFiles)
    1776           1 :                 if ingest {
    1777           1 :                         d.mu.versions.metrics.Flush.AsIngestCount++
    1778           1 :                         for _, l := range c.metrics {
    1779           1 :                                 d.mu.versions.metrics.Flush.AsIngestBytes += l.BytesIngested
    1780           1 :                                 d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
    1781           1 :                         }
    1782             :                 }
    1783           1 :                 d.maybeTransitionSnapshotsToFileOnlyLocked()
    1784             : 
    1785             :         }
    1786             :         // Signal FlushEnd after installing the new readState. This helps for unit
    1787             :         // tests that use the callback to trigger a read using an iterator with
    1788             :         // IterOptions.OnlyReadGuaranteedDurable.
    1789           1 :         info.TotalDuration = d.timeNow().Sub(startTime)
    1790           1 :         d.opts.EventListener.FlushEnd(info)
    1791           1 : 
    1792           1 :         // The order of these operations matters here for ease of testing.
    1793           1 :         // Removing the reader reference first allows tests to be guaranteed that
    1794           1 :         // the memtable reservation has been released by the time a synchronous
    1795           1 :         // flush returns. readerUnrefLocked may also produce obsolete files so the
    1796           1 :         // call to deleteObsoleteFiles must happen after it.
    1797           1 :         for i := range flushed {
    1798           1 :                 flushed[i].readerUnrefLocked(true)
    1799           1 :         }
    1800             : 
    1801           1 :         d.deleteObsoleteFiles(jobID)
    1802           1 : 
    1803           1 :         // Mark all the memtables we flushed as flushed.
    1804           1 :         for i := range flushed {
    1805           1 :                 close(flushed[i].flushed)
    1806           1 :         }
    1807             : 
    1808           1 :         return inputBytes, err
    1809             : }
    1810             : 
    1811             : // maybeTransitionSnapshotsToFileOnlyLocked transitions any "eventually
    1812             : // file-only" snapshots to be file-only if all their visible state has been
    1813             : // flushed to sstables.
    1814             : //
    1815             : // REQUIRES: d.mu.
    1816           1 : func (d *DB) maybeTransitionSnapshotsToFileOnlyLocked() {
    1817           1 :         earliestUnflushedSeqNum := d.getEarliestUnflushedSeqNumLocked()
    1818           1 :         currentVersion := d.mu.versions.currentVersion()
    1819           1 :         for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; {
    1820           1 :                 if s.efos == nil {
    1821           1 :                         s = s.next
    1822           1 :                         continue
    1823             :                 }
    1824           1 :                 overlapsFlushable := false
    1825           1 :                 if base.Visible(earliestUnflushedSeqNum, s.efos.seqNum, InternalKeySeqNumMax) {
    1826           1 :                         // There are some unflushed keys that are still visible to the EFOS.
    1827           1 :                         // Check if any memtables older than the EFOS contain keys within a
    1828           1 :                         // protected range of the EFOS. If no, we can transition.
    1829           1 :                         protectedRanges := make([]bounded, len(s.efos.protectedRanges))
    1830           1 :                         for i := range s.efos.protectedRanges {
    1831           1 :                                 protectedRanges[i] = s.efos.protectedRanges[i]
    1832           1 :                         }
    1833           1 :                         for i := range d.mu.mem.queue {
    1834           1 :                                 if !base.Visible(d.mu.mem.queue[i].logSeqNum, s.efos.seqNum, InternalKeySeqNumMax) {
    1835           1 :                                         // All keys in this memtable are newer than the EFOS. Skip this
    1836           1 :                                         // memtable.
    1837           1 :                                         continue
    1838             :                                 }
    1839             :                                 // NB: computePossibleOverlaps could have false positives, such as if
    1840             :                                 // the flushable is a flushable ingest and not a memtable. In that
    1841             :                                 // case we don't open the sstables to check; we just pessimistically
    1842             :                                 // assume an overlap.
    1843           1 :                                 d.mu.mem.queue[i].computePossibleOverlaps(func(b bounded) shouldContinue {
    1844           1 :                                         overlapsFlushable = true
    1845           1 :                                         return stopIteration
    1846           1 :                                 }, protectedRanges...)
    1847           1 :                                 if overlapsFlushable {
    1848           1 :                                         break
    1849             :                                 }
    1850             :                         }
    1851             :                 }
    1852           1 :                 if overlapsFlushable {
    1853           1 :                         s = s.next
    1854           1 :                         continue
    1855             :                 }
    1856           1 :                 currentVersion.Ref()
    1857           1 : 
    1858           1 :                 // NB: s.efos.transitionToFileOnlySnapshot could close s, in which
    1859           1 :                 // case s.next would be nil. Save it before calling it.
    1860           1 :                 next := s.next
    1861           1 :                 _ = s.efos.transitionToFileOnlySnapshot(currentVersion)
    1862           1 :                 s = next
    1863             :         }
    1864             : }
    1865             : 
// maybeScheduleCompactionAsync should be used when we want to possibly
// schedule a compaction, but don't want to pay the cost of running
// maybeScheduleCompaction on the calling goroutine.
//
// This method should be launched in a separate goroutine. It acquires d.mu
// itself, so d.mu must not be held when this is called. On return it calls
// d.compactionSchedulers.Done().
func (d *DB) maybeScheduleCompactionAsync() {
	// Deferred so that Done runs after d.mu has been released below.
	defer d.compactionSchedulers.Done()

	d.mu.Lock()
	d.maybeScheduleCompaction()
	d.mu.Unlock()
}
    1878             : 
// maybeScheduleCompaction schedules a compaction if necessary. It delegates
// to maybeScheduleCompactionPicker, using pickAuto to choose automatic
// compactions.
//
// d.mu must be held when calling this.
func (d *DB) maybeScheduleCompaction() {
	d.maybeScheduleCompactionPicker(pickAuto)
}
    1885             : 
// pickAuto is a pick function for maybeScheduleCompactionPicker that returns
// the result of picker.pickAuto(env).
func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction {
	return picker.pickAuto(env)
}
    1889             : 
// pickElisionOnly has the same signature as pickAuto, so it can be supplied
// as a pick function; it returns the result of
// picker.pickElisionOnlyCompaction(env).
func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction {
	return picker.pickElisionOnlyCompaction(env)
}
    1893             : 
    1894             : // tryScheduleDownloadCompaction tries to start a download compaction.
    1895             : //
    1896             : // Returns true if we started a download compaction (or completed it
    1897             : // immediately because it is a no-op or we hit an error).
    1898             : //
    1899             : // Requires d.mu to be held. Updates d.mu.compact.downloads.
    1900           1 : func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownloads int) bool {
    1901           1 :         vers := d.mu.versions.currentVersion()
    1902           1 :         for i := 0; i < len(d.mu.compact.downloads); {
    1903           1 :                 download := d.mu.compact.downloads[i]
    1904           1 :                 switch d.tryLaunchDownloadCompaction(download, vers, env, maxConcurrentDownloads) {
    1905           1 :                 case launchedCompaction:
    1906           1 :                         return true
    1907           1 :                 case didNotLaunchCompaction:
    1908           1 :                         // See if we can launch a compaction for another download task.
    1909           1 :                         i++
    1910           1 :                 case downloadTaskCompleted:
    1911           1 :                         // Task is completed and must be removed.
    1912           1 :                         d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1)
    1913             :                 }
    1914             :         }
    1915           1 :         return false
    1916             : }
    1917             : 
    1918             : // maybeScheduleCompactionPicker schedules a compaction if necessary,
    1919             : // calling `pickFunc` to pick automatic compactions.
    1920             : //
    1921             : // Requires d.mu to be held.
    1922             : func (d *DB) maybeScheduleCompactionPicker(
    1923             :         pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
    1924           1 : ) {
    1925           1 :         if d.closed.Load() != nil || d.opts.ReadOnly {
    1926           1 :                 return
    1927           1 :         }
    1928           1 :         maxCompactions := d.opts.MaxConcurrentCompactions()
    1929           1 :         maxDownloads := d.opts.MaxConcurrentDownloads()
    1930           1 : 
    1931           1 :         if d.mu.compact.compactingCount >= maxCompactions &&
    1932           1 :                 (len(d.mu.compact.downloads) == 0 || d.mu.compact.downloadingCount >= maxDownloads) {
    1933           1 :                 if len(d.mu.compact.manual) > 0 {
    1934           1 :                         // Inability to run head blocks later manual compactions.
    1935           1 :                         d.mu.compact.manual[0].retries++
    1936           1 :                 }
    1937           1 :                 return
    1938             :         }
    1939             : 
    1940             :         // Compaction picking needs a coherent view of a Version. In particular, we
    1941             :         // need to exclude concurrent ingestions from making a decision on which level
    1942             :         // to ingest into that conflicts with our compaction
    1943             :         // decision. versionSet.logLock provides the necessary mutual exclusion.
    1944           1 :         d.mu.versions.logLock()
    1945           1 :         defer d.mu.versions.logUnlock()
    1946           1 : 
    1947           1 :         // Check for the closed flag again, in case the DB was closed while we were
    1948           1 :         // waiting for logLock().
    1949           1 :         if d.closed.Load() != nil {
    1950           1 :                 return
    1951           1 :         }
    1952             : 
    1953           1 :         env := compactionEnv{
    1954           1 :                 diskAvailBytes:          d.diskAvailBytes.Load(),
    1955           1 :                 earliestSnapshotSeqNum:  d.mu.snapshots.earliest(),
    1956           1 :                 earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
    1957           1 :         }
    1958           1 : 
    1959           1 :         if d.mu.compact.compactingCount < maxCompactions {
    1960           1 :                 // Check for delete-only compactions first, because they're expected to be
    1961           1 :                 // cheap and reduce future compaction work.
    1962           1 :                 if !d.opts.private.disableDeleteOnlyCompactions &&
    1963           1 :                         !d.opts.DisableAutomaticCompactions &&
    1964           1 :                         len(d.mu.compact.deletionHints) > 0 {
    1965           1 :                         d.tryScheduleDeleteOnlyCompaction()
    1966           1 :                 }
    1967             : 
    1968           1 :                 for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxCompactions {
    1969           1 :                         if manual := d.mu.compact.manual[0]; !d.tryScheduleManualCompaction(env, manual) {
    1970           1 :                                 // Inability to run head blocks later manual compactions.
    1971           1 :                                 manual.retries++
    1972           1 :                                 break
    1973             :                         }
    1974           1 :                         d.mu.compact.manual = d.mu.compact.manual[1:]
    1975             :                 }
    1976             : 
    1977           1 :                 for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxCompactions &&
    1978           1 :                         d.tryScheduleAutoCompaction(env, pickFunc) {
    1979           1 :                 }
    1980             :         }
    1981             : 
    1982           1 :         for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads &&
    1983           1 :                 d.tryScheduleDownloadCompaction(env, maxDownloads) {
    1984           1 :         }
    1985             : }
    1986             : 
    1987             : // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction
    1988             : // for all files that can be deleted as suggested by deletionHints.
    1989             : //
    1990             : // Requires d.mu to be held. Updates d.mu.compact.deletionHints.
    1991           1 : func (d *DB) tryScheduleDeleteOnlyCompaction() {
    1992           1 :         v := d.mu.versions.currentVersion()
    1993           1 :         snapshots := d.mu.snapshots.toSlice()
    1994           1 :         inputs, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots)
    1995           1 :         d.mu.compact.deletionHints = unresolvedHints
    1996           1 : 
    1997           1 :         if len(inputs) > 0 {
    1998           1 :                 c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow())
    1999           1 :                 d.mu.compact.compactingCount++
    2000           1 :                 d.addInProgressCompaction(c)
    2001           1 :                 go d.compact(c, nil)
    2002           1 :         }
    2003             : }
    2004             : 
    2005             : // tryScheduleManualCompaction tries to kick off the given manual compaction.
    2006             : //
    2007             : // Returns false if we are not able to run this compaction at this time.
    2008             : //
    2009             : // Requires d.mu to be held.
    2010           1 : func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompaction) bool {
    2011           1 :         v := d.mu.versions.currentVersion()
    2012           1 :         env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
    2013           1 :         pc, retryLater := pickManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
    2014           1 :         if pc == nil {
    2015           1 :                 if !retryLater {
    2016           1 :                         // Manual compaction is a no-op. Signal completion and exit.
    2017           1 :                         manual.done <- nil
    2018           1 :                         return true
    2019           1 :                 }
    2020             :                 // We are not able to run this manual compaction at this time.
    2021           1 :                 return false
    2022             :         }
    2023             : 
    2024           1 :         c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
    2025           1 :         d.mu.compact.compactingCount++
    2026           1 :         d.addInProgressCompaction(c)
    2027           1 :         go d.compact(c, manual.done)
    2028           1 :         return true
    2029             : }
    2030             : 
    2031             : // tryScheduleAutoCompaction tries to kick off an automatic compaction.
    2032             : //
    2033             : // Returns false if no automatic compactions are necessary or able to run at
    2034             : // this time.
    2035             : //
    2036             : // Requires d.mu to be held.
    2037             : func (d *DB) tryScheduleAutoCompaction(
    2038             :         env compactionEnv, pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
    2039           1 : ) bool {
    2040           1 :         env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
    2041           1 :         env.readCompactionEnv = readCompactionEnv{
    2042           1 :                 readCompactions:          &d.mu.compact.readCompactions,
    2043           1 :                 flushing:                 d.mu.compact.flushing || d.passedFlushThreshold(),
    2044           1 :                 rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
    2045           1 :         }
    2046           1 :         pc := pickFunc(d.mu.versions.picker, env)
    2047           1 :         if pc == nil {
    2048           1 :                 return false
    2049           1 :         }
    2050           1 :         c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
    2051           1 :         d.mu.compact.compactingCount++
    2052           1 :         d.addInProgressCompaction(c)
    2053           1 :         go d.compact(c, nil)
    2054           1 :         return true
    2055             : }
    2056             : 
    2057             : // deleteCompactionHintType indicates whether the deleteCompactionHint was
    2058             : // generated from a span containing a range del (point key only), a range key
    2059             : // delete (range key only), or both a point and range key.
    2060             : type deleteCompactionHintType uint8
    2061             : 
    2062             : const (
    2063             :         // NOTE: While these are primarily used as enumeration types, they are also
    2064             :         // used for some bitwise operations. Care should be taken when updating.
    2065             :         deleteCompactionHintTypeUnknown deleteCompactionHintType = iota
    2066             :         deleteCompactionHintTypePointKeyOnly
    2067             :         deleteCompactionHintTypeRangeKeyOnly
    2068             :         deleteCompactionHintTypePointAndRangeKey
    2069             : )
    2070             : 
    2071             : // String implements fmt.Stringer.
    2072           0 : func (h deleteCompactionHintType) String() string {
    2073           0 :         switch h {
    2074           0 :         case deleteCompactionHintTypeUnknown:
    2075           0 :                 return "unknown"
    2076           0 :         case deleteCompactionHintTypePointKeyOnly:
    2077           0 :                 return "point-key-only"
    2078           0 :         case deleteCompactionHintTypeRangeKeyOnly:
    2079           0 :                 return "range-key-only"
    2080           0 :         case deleteCompactionHintTypePointAndRangeKey:
    2081           0 :                 return "point-and-range-key"
    2082           0 :         default:
    2083           0 :                 panic(fmt.Sprintf("unknown hint type: %d", h))
    2084             :         }
    2085             : }
    2086             : 
    2087             : // compactionHintFromKeys returns a deleteCompactionHintType given a slice of
    2088             : // keyspan.Keys.
    2089           1 : func compactionHintFromKeys(keys []keyspan.Key) deleteCompactionHintType {
    2090           1 :         var hintType deleteCompactionHintType
    2091           1 :         for _, k := range keys {
    2092           1 :                 switch k.Kind() {
    2093           1 :                 case base.InternalKeyKindRangeDelete:
    2094           1 :                         hintType |= deleteCompactionHintTypePointKeyOnly
    2095           1 :                 case base.InternalKeyKindRangeKeyDelete:
    2096           1 :                         hintType |= deleteCompactionHintTypeRangeKeyOnly
    2097           0 :                 default:
    2098           0 :                         panic(fmt.Sprintf("unsupported key kind: %s", k.Kind()))
    2099             :                 }
    2100             :         }
    2101           1 :         return hintType
    2102             : }
    2103             : 
// A deleteCompactionHint records a user key and sequence number span that has
// been deleted by a range tombstone. A hint is recorded if at least one
// sstable falls completely within both the user key and sequence number
// spans. Once the tombstones and the observed completely-contained sstables
// fall into the same snapshot stripe, a delete-only compaction may delete any
// sstables within the range.
type deleteCompactionHint struct {
	// The type of key span that generated this hint (point key, range key, or
	// both).
	hintType deleteCompactionHintType
	// start and end are user keys specifying a key range [start, end) of
	// deleted keys.
	start []byte
	end   []byte
	// The level of the file containing the range tombstone(s) when the hint
	// was created. Only lower levels need to be searched for files that may
	// be deleted.
	tombstoneLevel int
	// The file containing the range tombstone(s) that created the hint.
	tombstoneFile *fileMetadata
	// The smallest and largest sequence numbers of the abutting tombstones
	// merged to form this hint. All of a table's keys must be less than the
	// tombstone smallest sequence number to be deleted. All of a table's
	// sequence numbers must fall into the same snapshot stripe as the
	// tombstone largest sequence number to be deleted.
	tombstoneLargestSeqNum  uint64
	tombstoneSmallestSeqNum uint64
	// The smallest sequence number of an sstable that was found to be covered
	// by this hint. The hint cannot be resolved until this sequence number is
	// in the same snapshot stripe as the largest tombstone sequence number.
	// This is set when a hint is created, so the LSM may look different and
	// notably no longer contain the sstable that contained the key at this
	// sequence number.
	fileSmallestSeqNum uint64
}

// String implements fmt.Stringer. It renders the tombstone's level and file
// number, the hint's key range, its sequence numbers and its hint type on a
// single line.
func (h deleteCompactionHint) String() string {
	return fmt.Sprintf(
		"L%d.%s %s-%s seqnums(tombstone=%d-%d, file-smallest=%d, type=%s)",
		h.tombstoneLevel, h.tombstoneFile.FileNum, h.start, h.end,
		h.tombstoneSmallestSeqNum, h.tombstoneLargestSeqNum, h.fileSmallestSeqNum,
		h.hintType,
	)
}
    2148             : 
    2149           1 : func (h *deleteCompactionHint) canDelete(cmp Compare, m *fileMetadata, snapshots []uint64) bool {
    2150           1 :         // The file can only be deleted if all of its keys are older than the
    2151           1 :         // earliest tombstone aggregated into the hint.
    2152           1 :         if m.LargestSeqNum >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
    2153           1 :                 return false
    2154           1 :         }
    2155             : 
    2156             :         // The file's oldest key must  be in the same snapshot stripe as the
    2157             :         // newest tombstone. NB: We already checked the hint's sequence numbers,
    2158             :         // but this file's oldest sequence number might be lower than the hint's
    2159             :         // smallest sequence number despite the file falling within the key range
    2160             :         // if this file was constructed after the hint by a compaction.
    2161           1 :         ti, _ := snapshotIndex(h.tombstoneLargestSeqNum, snapshots)
    2162           1 :         fi, _ := snapshotIndex(m.SmallestSeqNum, snapshots)
    2163           1 :         if ti != fi {
    2164           0 :                 return false
    2165           0 :         }
    2166             : 
    2167           1 :         switch h.hintType {
    2168           1 :         case deleteCompactionHintTypePointKeyOnly:
    2169           1 :                 // A hint generated by a range del span cannot delete tables that contain
    2170           1 :                 // range keys.
    2171           1 :                 if m.HasRangeKeys {
    2172           1 :                         return false
    2173           1 :                 }
    2174           1 :         case deleteCompactionHintTypeRangeKeyOnly:
    2175           1 :                 // A hint generated by a range key del span cannot delete tables that
    2176           1 :                 // contain point keys.
    2177           1 :                 if m.HasPointKeys {
    2178           1 :                         return false
    2179           1 :                 }
    2180           1 :         case deleteCompactionHintTypePointAndRangeKey:
    2181             :                 // A hint from a span that contains both range dels *and* range keys can
    2182             :                 // only be deleted if both bounds fall within the hint. The next check takes
    2183             :                 // care of this.
    2184           0 :         default:
    2185           0 :                 panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
    2186             :         }
    2187             : 
    2188             :         // The file's keys must be completely contained within the hint range.
    2189           1 :         return cmp(h.start, m.Smallest.UserKey) <= 0 && cmp(m.Largest.UserKey, h.end) < 0
    2190             : }
    2191             : 
    2192           1 : func (d *DB) maybeUpdateDeleteCompactionHints(c *compaction) {
    2193           1 :         // Compactions that zero sequence numbers can interfere with compaction
    2194           1 :         // deletion hints. Deletion hints apply to tables containing keys older
    2195           1 :         // than a threshold. If a key more recent than the threshold is zeroed in
    2196           1 :         // a compaction, a delete-only compaction may mistake it as meeting the
    2197           1 :         // threshold and drop a table containing live data.
    2198           1 :         //
    2199           1 :         // To avoid this scenario, compactions that zero sequence numbers remove
    2200           1 :         // any conflicting deletion hints. A deletion hint is conflicting if both
    2201           1 :         // of the following conditions apply:
    2202           1 :         // * its key space overlaps with the compaction
    2203           1 :         // * at least one of its inputs contains a key as recent as one of the
    2204           1 :         //   hint's tombstones.
    2205           1 :         //
    2206           1 :         if !c.allowedZeroSeqNum {
    2207           1 :                 return
    2208           1 :         }
    2209             : 
    2210           1 :         updatedHints := d.mu.compact.deletionHints[:0]
    2211           1 :         for _, h := range d.mu.compact.deletionHints {
    2212           1 :                 // If the compaction's key space is disjoint from the hint's key
    2213           1 :                 // space, the zeroing of sequence numbers won't affect the hint. Keep
    2214           1 :                 // the hint.
    2215           1 :                 keysDisjoint := d.cmp(h.end, c.smallest.UserKey) < 0 || d.cmp(h.start, c.largest.UserKey) > 0
    2216           1 :                 if keysDisjoint {
    2217           1 :                         updatedHints = append(updatedHints, h)
    2218           1 :                         continue
    2219             :                 }
    2220             : 
    2221             :                 // All of the compaction's inputs must be older than the hint's
    2222             :                 // tombstones.
    2223           1 :                 inputsOlder := true
    2224           1 :                 for _, in := range c.inputs {
    2225           1 :                         iter := in.files.Iter()
    2226           1 :                         for f := iter.First(); f != nil; f = iter.Next() {
    2227           1 :                                 inputsOlder = inputsOlder && f.LargestSeqNum < h.tombstoneSmallestSeqNum
    2228           1 :                         }
    2229             :                 }
    2230           1 :                 if inputsOlder {
    2231           1 :                         updatedHints = append(updatedHints, h)
    2232           1 :                         continue
    2233             :                 }
    2234             : 
    2235             :                 // Drop h, because the compaction c may have zeroed sequence numbers
    2236             :                 // of keys more recent than some of h's tombstones.
    2237             :         }
    2238           1 :         d.mu.compact.deletionHints = updatedHints
    2239             : }
    2240             : 
    2241             : func checkDeleteCompactionHints(
    2242             :         cmp Compare, v *version, hints []deleteCompactionHint, snapshots []uint64,
    2243           1 : ) ([]compactionLevel, []deleteCompactionHint) { // returns (selected files grouped by level, hints left unresolved)
    2244           1 :         var files map[*fileMetadata]bool // files already selected; prevents selecting a file twice
    2245           1 :         var byLevel [numLevels][]*fileMetadata // selected files, indexed by level
    2246           1 : 
    2247           1 :         unresolvedHints := hints[:0] // filtered in place: reuses the backing array of hints
    2248           1 :         for _, h := range hints {
    2249           1 :                 // Check each compaction hint to see if it's resolvable. Resolvable
    2250           1 :                 // hints are removed and trigger a delete-only compaction if any files
    2251           1 :                 // in the current LSM still meet their criteria. Unresolvable hints
    2252           1 :                 // are saved and don't trigger a delete-only compaction.
    2253           1 :                 //
    2254           1 :                 // When a compaction hint is created, the sequence numbers of the
    2255           1 :                 // range tombstones and the covered file with the oldest key are
    2256           1 :                 // recorded. The largest tombstone sequence number and the smallest
    2257           1 :                 // file sequence number must be in the same snapshot stripe for the
    2258           1 :                 // hint to be resolved. The below graphic models a compaction hint
    2259           1 :                 // covering the keyspace [b, r). The hint completely contains two
    2260           1 :                 // files, 000002 and 000003. The file 000003 contains the lowest
    2261           1 :                 // covered sequence number at #90. The tombstone b.RANGEDEL.230:h has
    2262           1 :                 // the highest tombstone sequence number incorporated into the hint.
    2263           1 :                 // The hint may be resolved only once the snapshots at #100, #180 and
    2264           1 :                 // #210 are all closed. File 000001 is not included within the hint
    2265           1 :                 // because it extends beyond the range tombstones in user key space.
    2266           1 :                 //
    2267           1 :                 // 250
    2268           1 :                 //
    2269           1 :                 //       |-b...230:h-|
    2270           1 :                 // _____________________________________________________ snapshot #210
    2271           1 :                 // 200               |--h.RANGEDEL.200:r--|
    2272           1 :                 //
    2273           1 :                 // _____________________________________________________ snapshot #180
    2274           1 :                 //
    2275           1 :                 // 150                     +--------+
    2276           1 :                 //           +---------+   | 000003 |
    2277           1 :                 //           | 000002  |   |        |
    2278           1 :                 //           +_________+   |        |
    2279           1 :                 // 100_____________________|________|___________________ snapshot #100
    2280           1 :                 //                         +--------+
    2281           1 :                 // _____________________________________________________ snapshot #70
    2282           1 :                 //                             +---------------+
    2283           1 :                 //  50                         | 000001        |
    2284           1 :                 //                             |               |
    2285           1 :                 //                             +---------------+
    2286           1 :                 // ______________________________________________________________
    2287           1 :                 //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    2288           1 : 
    2289           1 :                 ti, _ := snapshotIndex(h.tombstoneLargestSeqNum, snapshots)
    2290           1 :                 fi, _ := snapshotIndex(h.fileSmallestSeqNum, snapshots)
    2291           1 :                 if ti != fi {
    2292           1 :                         // Cannot resolve yet.
    2293           1 :                         unresolvedHints = append(unresolvedHints, h)
    2294           1 :                         continue
    2295             :                 }
    2296             : 
    2297             :                 // The hint h will be resolved and dropped, regardless of whether
    2298             :                 // there are any tables that can be deleted.
    2299           1 :                 for l := h.tombstoneLevel + 1; l < numLevels; l++ { // only levels numbered above the tombstone's level
    2300           1 :                         overlaps := v.Overlaps(l, base.UserKeyBoundsEndExclusive(h.start, h.end))
    2301           1 :                         iter := overlaps.Iter()
    2302           1 :                         for m := iter.First(); m != nil; m = iter.Next() {
    2303           1 :                                 if m.IsCompacting() || !h.canDelete(cmp, m, snapshots) || files[m] {
    2304           1 :                                         continue
    2305             :                                 }
    2306           1 :                                 if files == nil {
    2307           1 :                                         // Construct files lazily (the nil-map read above is safe),
    2308           1 :                                         // assuming most calls will not produce delete-only compactions.
    2309           1 :                                         files = make(map[*fileMetadata]bool)
    2310           1 :                                 }
    2311           1 :                                 files[m] = true
    2312           1 :                                 byLevel[l] = append(byLevel[l], m)
    2313             :                         }
    2314             :                 }
    2315             :         }
    2316             : 
    2317           1 :         var compactLevels []compactionLevel
    2318           1 :         for l, files := range byLevel { // NB: files here shadows the map declared above
    2319           1 :                 if len(files) == 0 {
    2320           1 :                         continue
    2321             :                 }
    2322           1 :                 compactLevels = append(compactLevels, compactionLevel{
    2323           1 :                         level: l,
    2324           1 :                         files: manifest.NewLevelSliceKeySorted(cmp, files),
    2325           1 :                 })
    2326             :         }
    2327           1 :         return compactLevels, unresolvedHints
    2328             : }
    2329             : 
    2330             : // compact runs one compaction and maybe schedules another call to compact.
    2331           1 : func (d *DB) compact(c *compaction, errChannel chan error) {
    2332           1 :         pprof.Do(context.Background(), compactLabels, func(context.Context) { // run with compactLabels as pprof labels
    2333           1 :                 d.mu.Lock()
    2334           1 :                 defer d.mu.Unlock()
    2335           1 :                 if err := d.compact1(c, errChannel); err != nil {
    2336           1 :                         // TODO(peter): count consecutive compaction errors and backoff.
    2337           1 :                         d.opts.EventListener.BackgroundError(err)
    2338           1 :                 }
    2339           1 :                 if c.isDownload { // decrement the in-flight counter that matches this compaction
    2340           1 :                         d.mu.compact.downloadingCount--
    2341           1 :                 } else {
    2342           1 :                         d.mu.compact.compactingCount--
    2343           1 :                 }
    2344           1 :                 delete(d.mu.compact.inProgress, c)
    2345           1 :                 // Add this compaction's duration to the cumulative duration. NB: This
    2346           1 :                 // must be atomic with the above removal of c from
    2347           1 :                 // d.mu.compact.inProgress to ensure Metrics.Compact.Duration does not
    2348           1 :                 // miss or double count a completing compaction's duration.
    2349           1 :                 d.mu.compact.duration += d.timeNow().Sub(c.beganAt)
    2350           1 : 
    2351           1 :                 // The previous compaction may have produced too many files in a
    2352           1 :                 // level, so reschedule another compaction if needed.
    2353           1 :                 d.maybeScheduleCompaction()
    2354           1 :                 d.mu.compact.cond.Broadcast()
    2355             :         })
    2356             : }
    2357             : 
    2358             : // compact1 runs one compaction.
    2359             : //
    2360             : // d.mu must be held when calling this, but the mutex may be dropped and
    2361             : // re-acquired during the course of this method.
    2362           1 : func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
    2363           1 :         if errChannel != nil {
    2364           1 :                 defer func() {
    2365           1 :                         errChannel <- err // sends the final value of the named result err
    2366           1 :                 }()
    2367             :         }
    2368             : 
    2369           1 :         jobID := d.newJobIDLocked()
    2370           1 :         info := c.makeInfo(jobID)
    2371           1 :         d.opts.EventListener.CompactionBegin(info)
    2372           1 :         startTime := d.timeNow()
    2373           1 : 
    2374           1 :         ve, pendingOutputs, stats, err := d.runCompaction(jobID, c)
    2375           1 : 
    2376           1 :         info.Duration = d.timeNow().Sub(startTime)
    2377           1 :         if err == nil {
    2378           1 :                 err = func() error {
    2379           1 :                         var err error // shadows compact1's named result inside this closure
    2380           1 :                         d.mu.versions.logLock()
    2381           1 :                         // Check if this compaction had a conflicting operation (e.g. a d.excise())
    2382           1 :                         // that necessitates it restarting from scratch. Note that since we hold
    2383           1 :                         // the manifest lock, we don't expect this bool to change its value
    2384           1 :                         // as only the holder of the manifest lock will ever write to it.
    2385           1 :                         if c.cancel.Load() {
    2386           1 :                                 err = firstError(err, ErrCancelledCompaction)
    2387           1 :                         }
    2388           1 :                         if err != nil {
    2389           1 :                                 // logAndApply calls logUnlock. If we didn't call it, we need to call
    2390           1 :                                 // logUnlock ourselves.
    2391           1 :                                 d.mu.versions.logUnlock()
    2392           1 :                                 return err
    2393           1 :                         }
    2394           1 :                         return d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
    2395           1 :                                 return d.getInProgressCompactionInfoLocked(c)
    2396           1 :                         })
    2397             :                 }()
    2398           1 :                 if err != nil {
    2399           1 :                         // TODO(peter): untested.
    2400           1 :                         for _, f := range pendingOutputs {
    2401           1 :                                 // Note that the FileBacking for the file metadata might not have
    2402           1 :                                 // been set yet. So, we directly use the FileNum. Since these
    2403           1 :                                 // files were generated as compaction outputs, these must be
    2404           1 :                                 // physical files on disk. This property might not hold once
    2405           1 :                                 // https://github.com/cockroachdb/pebble/issues/389 is
    2406           1 :                                 // implemented if #389 creates virtual sstables as output files.
    2407           1 :                                 d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{
    2408           1 :                                         fileInfo: fileInfo{
    2409           1 :                                                 FileNum:  base.PhysicalTableDiskFileNum(f.meta.FileNum),
    2410           1 :                                                 FileSize: f.meta.Size,
    2411           1 :                                         },
    2412           1 :                                         isLocal: f.isLocal,
    2413           1 :                                 })
    2414           1 :                         }
    2415           1 :                         d.mu.versions.updateObsoleteTableMetricsLocked()
    2416             :                 }
    2417             :         }
    2418             : 
    2419           1 :         info.Done = true
    2420           1 :         info.Err = err
    2421           1 :         if err == nil {
    2422           1 :                 for i := range ve.NewFiles {
    2423           1 :                         e := &ve.NewFiles[i]
    2424           1 :                         info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
    2425           1 :                 }
    2426           1 :                 d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
    2427           1 :                 d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
    2428           1 :                 d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
    2429           1 :                 d.maybeUpdateDeleteCompactionHints(c)
    2430             :         }
    2431             : 
    2432             :         // NB: clearing compacting state must occur before updating the read state;
    2433             :         // L0Sublevels initialization depends on it.
    2434           1 :         d.clearCompactingState(c, err != nil)
    2435           1 :         d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
    2436           1 :         d.mu.versions.incrementCompactionBytes(-c.bytesWritten)
    2437           1 : 
    2438           1 :         info.TotalDuration = d.timeNow().Sub(c.beganAt)
    2439           1 :         d.opts.EventListener.CompactionEnd(info)
    2440           1 : 
    2441           1 :         // Update the read state before deleting obsolete files because the
    2442           1 :         // read-state update will cause the previous version to be unref'd and, if
    2443           1 :         // there are no references, obsolete tables will be added to the obsolete
    2444           1 :         // table list.
    2445           1 :         if err == nil {
    2446           1 :                 d.updateReadStateLocked(d.opts.DebugCheck)
    2447           1 :                 d.updateTableStatsLocked(ve.NewFiles)
    2448           1 :         }
    2449           1 :         d.deleteObsoleteFiles(jobID)
    2450           1 : 
    2451           1 :         return err
    2452             : }
    2453             : 
    2454             : type compactStats struct {
    2455             :         cumulativePinnedKeys uint64 // added to d.mu.snapshots.cumulativePinnedCount by compact1
    2456             :         cumulativePinnedSize uint64 // added to d.mu.snapshots.cumulativePinnedSize by compact1
    2457             :         countMissizedDels    uint64 // added to metrics.Keys.MissizedTombstonesCount by compact1
    2458             : }
    2459             : 
    2460             : // runCopyCompaction runs a copy compaction where a new FileNum is created that
    2461             : // is a byte-for-byte copy of the input file or span thereof in some cases. This
    2462             : // is used in lieu of a move compaction when a file is being moved across the
    2463             : // local/remote storage boundary. It could also be used in lieu of a rewrite
    2464             : // compaction as part of a Download() call, which allows copying only a span of
    2465             : // the external file, provided the file does not contain range keys or value
    2466             : // blocks (see sstable.CopySpan).
    2467             : //
    2468             : // d.mu must be held when calling this method. The mutex will be released when
    2469             : // doing IO.
    2470             : func (d *DB) runCopyCompaction(
    2471             :         jobID JobID,
    2472             :         c *compaction,
    2473             :         inputMeta *fileMetadata,
    2474             :         objMeta objstorage.ObjectMetadata,
    2475             :         versionEdit *versionEdit,
    2476           1 : ) (ve *versionEdit, pendingOutputs []compactionOutput, retErr error) {
    2477           1 :         ctx := context.TODO()
    2478           1 : 
    2479           1 :         ve = versionEdit // the caller's edit is modified in place and returned
    2480           1 :         if !objMeta.IsExternal() {
    2481           1 :                 if objMeta.IsRemote() || !remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level) {
    2482           0 :                         panic("pebble: scheduled a copy compaction that is not actually moving files to shared storage")
    2483             :                 }
    2484             :                 // Note that based on logic in the compaction picker, we're guaranteed
    2485             :                 // inputMeta.Virtual is false.
    2486           1 :                 if inputMeta.Virtual {
    2487           0 :                         panic(errors.AssertionFailedf("cannot do a copy compaction of a virtual sstable across local/remote storage"))
    2488             :                 }
    2489             :         }
    2490             : 
    2491             :         // We are in the relatively more complex case where we need to copy this
    2492             :         // file to remote storage. The db mutex is dropped while we do the copy.
    2493             :         //
    2494             :         // To ease cleanup of the local file and tracking of refs, we create
    2495             :         // a new FileNum. This has the potential of making the block cache less
    2496             :         // effective, however.
    2497           1 :         newMeta := &fileMetadata{
    2498           1 :                 Size:            inputMeta.Size,
    2499           1 :                 CreationTime:    inputMeta.CreationTime,
    2500           1 :                 SmallestSeqNum:  inputMeta.SmallestSeqNum,
    2501           1 :                 LargestSeqNum:   inputMeta.LargestSeqNum,
    2502           1 :                 Stats:           inputMeta.Stats,
    2503           1 :                 Virtual:         inputMeta.Virtual,
    2504           1 :                 SyntheticPrefix: inputMeta.SyntheticPrefix,
    2505           1 :                 SyntheticSuffix: inputMeta.SyntheticSuffix,
    2506           1 :         }
    2507           1 :         if inputMeta.HasPointKeys {
    2508           1 :                 newMeta.ExtendPointKeyBounds(c.cmp, inputMeta.SmallestPointKey, inputMeta.LargestPointKey)
    2509           1 :         }
    2510           1 :         if inputMeta.HasRangeKeys {
    2511           1 :                 newMeta.ExtendRangeKeyBounds(c.cmp, inputMeta.SmallestRangeKey, inputMeta.LargestRangeKey)
    2512           1 :         }
    2513           1 :         newMeta.FileNum = d.mu.versions.getNextFileNum()
    2514           1 :         if objMeta.IsExternal() {
    2515           1 :                 // external -> local/shared copy. File must be virtual.
    2516           1 :                 // We will update this size later after we produce the new backing file.
    2517           1 :                 newMeta.InitProviderBacking(base.DiskFileNum(newMeta.FileNum), inputMeta.FileBacking.Size)
    2518           1 :         } else {
    2519           1 :                 // local -> shared copy. New file is guaranteed to not be virtual.
    2520           1 :                 newMeta.InitPhysicalBacking()
    2521           1 :         }
    2522             : 
    2523           1 :         c.metrics = map[int]*LevelMetrics{
    2524           1 :                 c.outputLevel.level: {
    2525           1 :                         BytesIn:         inputMeta.Size,
    2526           1 :                         BytesCompacted:  inputMeta.Size,
    2527           1 :                         TablesCompacted: 1,
    2528           1 :                 },
    2529           1 :         }
    2530           1 : 
    2531           1 :         // Before dropping the db mutex, grab a ref to the current version. This
    2532           1 :         // prevents any concurrent excises from deleting files that this compaction
    2533           1 :         // needs to read/maintain a reference to.
    2534           1 :         vers := d.mu.versions.currentVersion()
    2535           1 :         vers.Ref()
    2536           1 :         defer vers.UnrefLocked() // runs after the deferred d.mu.Lock() below (defers are LIFO)
    2537           1 : 
    2538           1 :         // NB: The order here is reversed, lock after unlock. This is similar to
    2539           1 :         // runCompaction.
    2540           1 :         d.mu.Unlock()
    2541           1 :         defer d.mu.Lock()
    2542           1 : 
    2543           1 :         // If the src obj is external, we're doing an external to local/shared copy.
    2544           1 :         if objMeta.IsExternal() {
    2545           1 :                 src, err := d.objProvider.OpenForReading(
    2546           1 :                         ctx, fileTypeTable, inputMeta.FileBacking.DiskFileNum, objstorage.OpenOptions{},
    2547           1 :                 )
    2548           1 :                 if err != nil {
    2549           0 :                         return ve, pendingOutputs, err
    2550           0 :                 }
    2551           1 :                 defer func() {
    2552           1 :                         if src != nil {
    2553           0 :                                 src.Close()
    2554           0 :                         }
    2555             :                 }()
    2556             : 
    2557           1 :                 w, outObjMeta, err := d.objProvider.Create(
    2558           1 :                         ctx, fileTypeTable, base.PhysicalTableDiskFileNum(newMeta.FileNum),
    2559           1 :                         objstorage.CreateOptions{
    2560           1 :                                 PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
    2561           1 :                         },
    2562           1 :                 )
    2563           1 :                 if err != nil {
    2564           0 :                         return ve, pendingOutputs, err
    2565           0 :                 }
    2566           1 :                 pendingOutputs = append(pendingOutputs, compactionOutput{
    2567           1 :                         meta:    newMeta,
    2568           1 :                         isLocal: !outObjMeta.IsRemote(),
    2569           1 :                 })
    2570           1 : 
    2571           1 :                 start, end := newMeta.Smallest, newMeta.Largest
    2572           1 :                 if newMeta.SyntheticPrefix.IsSet() {
    2573           1 :                         start.UserKey = newMeta.SyntheticPrefix.Invert(start.UserKey)
    2574           1 :                         end.UserKey = newMeta.SyntheticPrefix.Invert(end.UserKey)
    2575           1 :                 }
    2576           1 :                 if newMeta.SyntheticSuffix.IsSet() {
    2577           1 :                         // Extend the bounds as necessary so that the keys don't include suffixes.
    2578           1 :                         start.UserKey = start.UserKey[:c.comparer.Split(start.UserKey)]
    2579           1 :                         if n := c.comparer.Split(end.UserKey); n < len(end.UserKey) {
    2580           0 :                                 end = base.MakeRangeDeleteSentinelKey(c.comparer.ImmediateSuccessor(nil, end.UserKey[:n]))
    2581           0 :                         }
    2582             :                 }
    2583             : 
    2584           1 :                 wrote, err := sstable.CopySpan(ctx,
    2585           1 :                         src, d.opts.MakeReaderOptions(),
    2586           1 :                         w, d.opts.MakeWriterOptions(c.outputLevel.level, d.FormatMajorVersion().MaxTableFormat()),
    2587           1 :                         start, end,
    2588           1 :                 )
    2589           1 :                 src = nil // We passed src to CopySpan; it's responsible for closing it.
    2590           1 :                 if err != nil {
    2591           0 :                         return ve, pendingOutputs, err
    2592           0 :                 }
    2593           1 :                 newMeta.FileBacking.Size = wrote // replaces the provisional size set before the copy
    2594           1 :                 newMeta.Size = wrote
    2595           1 :         } else {
    2596           1 :                 pendingOutputs = append(pendingOutputs, compactionOutput{
    2597           1 :                         meta:    newMeta.PhysicalMeta().FileMetadata,
    2598           1 :                         isLocal: true,
    2599           1 :                 })
    2600           1 :                 _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
    2601           1 :                         d.objProvider.Path(objMeta), fileTypeTable, newMeta.FileBacking.DiskFileNum,
    2602           1 :                         objstorage.CreateOptions{PreferSharedStorage: true})
    2603           1 : 
    2604           1 :                 if err != nil {
    2605           0 :                         return ve, pendingOutputs, err
    2606           0 :                 }
    2607             :         }
    2608           1 :         ve.NewFiles[0].Meta = newMeta // swap the caller-supplied entry over to the copied file's metadata
    2609           1 :         if newMeta.Virtual {
    2610           1 :                 ve.CreatedBackingTables = []*fileBacking{newMeta.FileBacking}
    2611           1 :         }
    2612             : 
    2613           1 :         if err := d.objProvider.Sync(); err != nil {
    2614           0 :                 return nil, pendingOutputs, err // NB: unlike the other error paths, ve is returned as nil here
    2615           0 :         }
    2616           1 :         return ve, pendingOutputs, nil
    2617             : }
    2618             : 
    2619             : type compactionOutput struct {
    2620             :         meta    *fileMetadata // metadata of a table written by the compaction
    2621             :         isLocal bool          // false when the created object is remote (set from !outObjMeta.IsRemote())
    2622             : }
    2623             : 
    2624             : // runCompaction runs a compaction that produces new on-disk tables from
    2625             : // memtables or old on-disk tables.
    2626             : //
    2627             : // d.mu must be held when calling this, but the mutex may be dropped and
    2628             : // re-acquired during the course of this method.
    2629             : func (d *DB) runCompaction(
    2630             :         jobID JobID, c *compaction,
    2631           1 : ) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, retErr error) {
    2632           1 :         // As a sanity check, confirm that the smallest / largest keys for new and
    2633           1 :         // deleted files in the new versionEdit pass a validation function before
    2634           1 :         // returning the edit.
    2635           1 :         defer func() {
    2636           1 :                 // If we're handling a panic, don't expect the version edit to validate.
    2637           1 :                 if r := recover(); r != nil {
    2638           0 :                         panic(r)
    2639           1 :                 } else if ve != nil {
    2640           1 :                         err := validateVersionEdit(ve, d.opts.Experimental.KeyValidationFunc, d.opts.Comparer.FormatKey)
    2641           1 :                         if err != nil {
    2642           0 :                                 d.opts.Logger.Fatalf("pebble: version edit validation failed: %s", err)
    2643           0 :                         }
    2644             :                 }
    2645             :         }()
    2646             : 
    2647             :         // Check for a delete-only compaction. This can occur when wide range
    2648             :         // tombstones completely contain sstables.
    2649           1 :         if c.kind == compactionKindDeleteOnly {
    2650           1 :                 c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
    2651           1 :                 ve := &versionEdit{
    2652           1 :                         DeletedFiles: map[deletedFileEntry]*fileMetadata{},
    2653           1 :                 }
    2654           1 :                 for _, cl := range c.inputs {
    2655           1 :                         levelMetrics := &LevelMetrics{}
    2656           1 :                         iter := cl.files.Iter()
    2657           1 :                         for f := iter.First(); f != nil; f = iter.Next() {
    2658           1 :                                 ve.DeletedFiles[deletedFileEntry{
    2659           1 :                                         Level:   cl.level,
    2660           1 :                                         FileNum: f.FileNum,
    2661           1 :                                 }] = f
    2662           1 :                         }
    2663           1 :                         c.metrics[cl.level] = levelMetrics
    2664             :                 }
    2665           1 :                 return ve, nil, stats, nil
    2666             :         }
    2667             : 
    2668           1 :         if c.kind == compactionKindIngestedFlushable {
    2669           0 :                 panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
    2670             :         }
    2671             : 
    2672             :         // Check for a move or copy of one table from one level to the next. We avoid
    2673             :         // such a move if there is lots of overlapping grandparent data. Otherwise,
    2674             :         // the move could create a parent file that will require a very expensive
    2675             :         // merge later on.
    2676           1 :         if c.kind == compactionKindMove || c.kind == compactionKindCopy {
    2677           1 :                 iter := c.startLevel.files.Iter()
    2678           1 :                 meta := iter.First()
    2679           1 :                 if invariants.Enabled {
    2680           1 :                         if iter.Next() != nil {
    2681           0 :                                 panic("got more than one file for a move or copy compaction")
    2682             :                         }
    2683             :                 }
    2684           1 :                 if c.cancel.Load() {
    2685           1 :                         return ve, nil, stats, ErrCancelledCompaction
    2686           1 :                 }
    2687           1 :                 objMeta, err := d.objProvider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
    2688           1 :                 if err != nil {
    2689           0 :                         return ve, pendingOutputs, stats, err
    2690           0 :                 }
    2691           1 :                 c.metrics = map[int]*LevelMetrics{
    2692           1 :                         c.outputLevel.level: {
    2693           1 :                                 BytesMoved:  meta.Size,
    2694           1 :                                 TablesMoved: 1,
    2695           1 :                         },
    2696           1 :                 }
    2697           1 :                 ve := &versionEdit{
    2698           1 :                         DeletedFiles: map[deletedFileEntry]*fileMetadata{
    2699           1 :                                 {Level: c.startLevel.level, FileNum: meta.FileNum}: meta,
    2700           1 :                         },
    2701           1 :                         NewFiles: []newFileEntry{
    2702           1 :                                 {Level: c.outputLevel.level, Meta: meta},
    2703           1 :                         },
    2704           1 :                 }
    2705           1 :                 if c.kind == compactionKindCopy {
    2706           1 :                         ve, pendingOutputs, retErr = d.runCopyCompaction(jobID, c, meta, objMeta, ve)
    2707           1 :                         if retErr != nil {
    2708           0 :                                 return ve, pendingOutputs, stats, retErr
    2709           0 :                         }
    2710             :                 }
    2711           1 :                 return ve, nil, stats, nil
    2712             :         }
    2713             : 
    2714           1 :         defer func() {
    2715           1 :                 if retErr != nil {
    2716           1 :                         pendingOutputs = nil
    2717           1 :                 }
    2718             :         }()
    2719             : 
    2720           1 :         snapshots := d.mu.snapshots.toSlice()
    2721           1 :         formatVers := d.FormatMajorVersion()
    2722           1 : 
    2723           1 :         if c.flushing == nil {
    2724           1 :                 // Before dropping the db mutex, grab a ref to the current version. This
    2725           1 :                 // prevents any concurrent excises from deleting files that this compaction
    2726           1 :                 // needs to read/maintain a reference to.
    2727           1 :                 //
    2728           1 :                 // Note that unlike user iterators, compactionIter does not maintain a ref
    2729           1 :                 // of the version or read state.
    2730           1 :                 vers := d.mu.versions.currentVersion()
    2731           1 :                 vers.Ref()
    2732           1 :                 defer vers.UnrefLocked()
    2733           1 :         }
    2734             : 
    2735           1 :         if c.cancel.Load() {
    2736           1 :                 return ve, nil, stats, ErrCancelledCompaction
    2737           1 :         }
    2738             : 
    2739             :         // Release the d.mu lock while doing I/O.
    2740             :         // Note the unusual order: Unlock and then Lock.
    2741           1 :         d.mu.Unlock()
    2742           1 :         defer d.mu.Lock()
    2743           1 : 
    2744           1 :         // Compactions use a pool of buffers to read blocks, avoiding polluting the
    2745           1 :         // block cache with blocks that will not be read again. We initialize the
    2746           1 :         // buffer pool with a size of 12. This initial size does not need to be
    2747           1 :         // accurate, because the pool will grow to accommodate the maximum number of
    2748           1 :         // blocks allocated at a given time over the course of the compaction. But
    2749           1 :         // choosing a size larger than that working set avoids any additional
    2750           1 :         // allocations to grow the size of the pool over the course of iteration.
    2751           1 :         //
    2752           1 :         // Justification for initial size 12: In a two-level compaction, at any
    2753           1 :         // given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
    2754           1 :         // Additionally, when decoding a compressed block, we'll temporarily
    2755           1 :         // allocate 1 additional block to hold the compressed buffer. In the worst
    2756           1 :         // case that all input sstables have two-level index blocks (+2), value
    2757           1 :         // blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
    2758           1 :         // additionally require 2n+4 blocks where n is the number of input sstables.
    2759           1 :         // Range deletion and range key blocks are relatively rare, and the cost of
    2760           1 :         // an additional allocation or two over the course of the compaction is
    2761           1 :         // considered to be okay. A larger initial size would cause the pool to hold
    2762           1 :         // on to more memory, even when it's not in-use because the pool will
    2763           1 :         // recycle buffers up to the current capacity of the pool. The memory use of
    2764           1 :         // a 12-buffer pool is expected to be within reason, even if all the buffers
    2765           1 :         // grow to the typical size of an index block (256 KiB) which would
    2766           1 :         // translate to 3 MiB per compaction.
    2767           1 :         c.bufferPool.Init(12)
    2768           1 :         defer c.bufferPool.Release()
    2769           1 : 
    2770           1 :         iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots)
    2771           1 :         if err != nil {
    2772           0 :                 return nil, pendingOutputs, stats, err
    2773           0 :         }
    2774           1 :         c.allowedZeroSeqNum = c.allowZeroSeqNum()
    2775           1 :         iiter = invalidating.MaybeWrapIfInvariants(iiter)
    2776           1 :         iter := newCompactionIter(c.cmp, c.equal, d.merge, iiter, snapshots,
    2777           1 :                 c.allowedZeroSeqNum, c.elideTombstone,
    2778           1 :                 c.elideRangeTombstone, d.opts.Experimental.IneffectualSingleDeleteCallback,
    2779           1 :                 d.opts.Experimental.SingleDeleteInvariantViolationCallback,
    2780           1 :                 d.FormatMajorVersion())
    2781           1 : 
    2782           1 :         var (
    2783           1 :                 createdFiles    []base.DiskFileNum
    2784           1 :                 tw              *sstable.Writer
    2785           1 :                 pinnedKeySize   uint64
    2786           1 :                 pinnedValueSize uint64
    2787           1 :                 pinnedCount     uint64
    2788           1 :         )
    2789           1 :         defer func() {
    2790           1 :                 if iter != nil {
    2791           1 :                         retErr = firstError(retErr, iter.Close())
    2792           1 :                 }
    2793           1 :                 if tw != nil {
    2794           0 :                         retErr = firstError(retErr, tw.Close())
    2795           0 :                 }
    2796           1 :                 if retErr != nil {
    2797           1 :                         for _, fileNum := range createdFiles {
    2798           1 :                                 _ = d.objProvider.Remove(fileTypeTable, fileNum)
    2799           1 :                         }
    2800             :                 }
    2801           1 :                 for _, closer := range c.closers {
    2802           1 :                         retErr = firstError(retErr, closer.Close())
    2803           1 :                 }
    2804             :         }()
    2805             : 
    2806           1 :         ve = &versionEdit{
    2807           1 :                 DeletedFiles: map[deletedFileEntry]*fileMetadata{},
    2808           1 :         }
    2809           1 : 
    2810           1 :         startLevelBytes := c.startLevel.files.SizeSum()
    2811           1 :         outputMetrics := &LevelMetrics{
    2812           1 :                 BytesIn:   startLevelBytes,
    2813           1 :                 BytesRead: c.outputLevel.files.SizeSum(),
    2814           1 :         }
    2815           1 :         if len(c.extraLevels) > 0 {
    2816           1 :                 outputMetrics.BytesIn += c.extraLevels[0].files.SizeSum()
    2817           1 :         }
    2818           1 :         outputMetrics.BytesRead += outputMetrics.BytesIn
    2819           1 : 
    2820           1 :         c.metrics = map[int]*LevelMetrics{
    2821           1 :                 c.outputLevel.level: outputMetrics,
    2822           1 :         }
    2823           1 :         if len(c.flushing) == 0 && c.metrics[c.startLevel.level] == nil {
    2824           1 :                 c.metrics[c.startLevel.level] = &LevelMetrics{}
    2825           1 :         }
    2826           1 :         if len(c.extraLevels) > 0 {
    2827           1 :                 c.metrics[c.extraLevels[0].level] = &LevelMetrics{}
    2828           1 :                 outputMetrics.MultiLevel.BytesInTop = startLevelBytes
    2829           1 :                 outputMetrics.MultiLevel.BytesIn = outputMetrics.BytesIn
    2830           1 :                 outputMetrics.MultiLevel.BytesRead = outputMetrics.BytesRead
    2831           1 :         }
    2832             : 
    2833             :         // The table is typically written at the maximum allowable format implied by
    2834             :         // the current format major version of the DB.
    2835           1 :         tableFormat := formatVers.MaxTableFormat()
    2836           1 : 
    2837           1 :         // In format major versions with maximum table formats of Pebblev3, value
    2838           1 :         // blocks were conditional on an experimental setting. In format major
    2839           1 :         // versions with maximum table formats of Pebblev4 and higher, value blocks
    2840           1 :         // are always enabled.
    2841           1 :         if tableFormat == sstable.TableFormatPebblev3 &&
    2842           1 :                 (d.opts.Experimental.EnableValueBlocks == nil || !d.opts.Experimental.EnableValueBlocks()) {
    2843           1 :                 tableFormat = sstable.TableFormatPebblev2
    2844           1 :         }
    2845             : 
    2846           1 :         writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
    2847           1 : 
    2848           1 :         // prevPointKey is a sstable.WriterOption that provides access to
    2849           1 :         // the last point key written to a writer's sstable. When a new
    2850           1 :         // output begins in newOutput, prevPointKey is updated to point to
    2851           1 :         // the new output's sstable.Writer. This allows the compaction loop
    2852           1 :         // to access the last written point key without requiring the
    2853           1 :         // compaction loop to make a copy of each key ahead of time. Users
    2854           1 :         // must be careful, because the byte slice returned by UnsafeKey
    2855           1 :         // points directly into the Writer's block buffer.
    2856           1 :         var prevPointKey sstable.PreviousPointKeyOpt
    2857           1 :         var cpuWorkHandle CPUWorkHandle
    2858           1 :         defer func() {
    2859           1 :                 if cpuWorkHandle != nil {
    2860           0 :                         d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
    2861           0 :                 }
    2862             :         }()
    2863             : 
    2864           1 :         newOutput := func() error {
    2865           1 :                 // Check if we've been cancelled by a concurrent operation.
    2866           1 :                 if c.cancel.Load() {
    2867           1 :                         return ErrCancelledCompaction
    2868           1 :                 }
    2869           1 :                 d.mu.Lock()
    2870           1 :                 fileNum := d.mu.versions.getNextFileNum()
    2871           1 :                 d.mu.Unlock()
    2872           1 : 
    2873           1 :                 ctx := context.TODO()
    2874           1 :                 if objiotracing.Enabled {
    2875           0 :                         ctx = objiotracing.WithLevel(ctx, c.outputLevel.level)
    2876           0 :                         switch c.kind {
    2877           0 :                         case compactionKindFlush:
    2878           0 :                                 ctx = objiotracing.WithReason(ctx, objiotracing.ForFlush)
    2879           0 :                         case compactionKindIngestedFlushable:
    2880           0 :                                 ctx = objiotracing.WithReason(ctx, objiotracing.ForIngestion)
    2881           0 :                         default:
    2882           0 :                                 ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
    2883             :                         }
    2884             :                 }
    2885           1 :                 var writeCategory vfs.DiskWriteCategory
    2886           1 :                 switch c.kind {
    2887           1 :                 case compactionKindFlush:
    2888           1 :                         if d.opts.EnableSQLRowSpillMetrics {
    2889           0 :                                 // In the scenario that the Pebble engine is used for SQL row spills the data written to
    2890           0 :                                 // the memtable will correspond to spills to disk and should be categorized as such.
    2891           0 :                                 writeCategory = "sql-row-spill"
    2892           1 :                         } else {
    2893           1 :                                 writeCategory = "pebble-memtable-flush"
    2894           1 :                         }
    2895           0 :                 case compactionKindIngestedFlushable:
    2896           0 :                         writeCategory = "pebble-ingestion"
    2897           1 :                 default:
    2898           1 :                         writeCategory = "pebble-compaction"
    2899             :                 }
    2900             :                 // Prefer shared storage if present.
    2901           1 :                 createOpts := objstorage.CreateOptions{
    2902           1 :                         PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
    2903           1 :                         WriteCategory:       writeCategory,
    2904           1 :                 }
    2905           1 :                 diskFileNum := base.PhysicalTableDiskFileNum(fileNum)
    2906           1 :                 writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts)
    2907           1 :                 if err != nil {
    2908           0 :                         return err
    2909           0 :                 }
    2910           1 :                 fileMeta := &fileMetadata{}
    2911           1 :                 fileMeta.FileNum = fileNum
    2912           1 :                 pendingOutputs = append(pendingOutputs, compactionOutput{
    2913           1 :                         meta:    fileMeta.PhysicalMeta().FileMetadata,
    2914           1 :                         isLocal: !objMeta.IsRemote(),
    2915           1 :                 })
    2916           1 : 
    2917           1 :                 reason := "flushing"
    2918           1 :                 if c.flushing == nil {
    2919           1 :                         reason = "compacting"
    2920           1 :                 }
    2921           1 :                 d.opts.EventListener.TableCreated(TableCreateInfo{
    2922           1 :                         JobID:   int(jobID),
    2923           1 :                         Reason:  reason,
    2924           1 :                         Path:    d.objProvider.Path(objMeta),
    2925           1 :                         FileNum: diskFileNum,
    2926           1 :                 })
    2927           1 :                 if c.kind != compactionKindFlush {
    2928           1 :                         writable = &compactionWritable{
    2929           1 :                                 Writable: writable,
    2930           1 :                                 versions: d.mu.versions,
    2931           1 :                                 written:  &c.bytesWritten,
    2932           1 :                         }
    2933           1 :                 }
    2934           1 :                 createdFiles = append(createdFiles, diskFileNum)
    2935           1 :                 cacheOpts := private.SSTableCacheOpts(d.cacheID, diskFileNum).(sstable.WriterOption)
    2936           1 : 
    2937           1 :                 const MaxFileWriteAdditionalCPUTime = time.Millisecond * 100
    2938           1 :                 cpuWorkHandle = d.opts.Experimental.CPUWorkPermissionGranter.GetPermission(
    2939           1 :                         MaxFileWriteAdditionalCPUTime,
    2940           1 :                 )
    2941           1 :                 writerOpts.Parallelism =
    2942           1 :                         d.opts.Experimental.MaxWriterConcurrency > 0 &&
    2943           1 :                                 (cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)
    2944           1 : 
    2945           1 :                 tw = sstable.NewWriter(writable, writerOpts, cacheOpts, &prevPointKey)
    2946           1 : 
    2947           1 :                 fileMeta.CreationTime = time.Now().Unix()
    2948           1 :                 ve.NewFiles = append(ve.NewFiles, newFileEntry{
    2949           1 :                         Level: c.outputLevel.level,
    2950           1 :                         Meta:  fileMeta,
    2951           1 :                 })
    2952           1 :                 return nil
    2953             :         }
    2954             : 
    2955             :         // splitL0Outputs is true during flushes and intra-L0 compactions with flush
    2956             :         // splits enabled.
    2957           1 :         splitL0Outputs := c.outputLevel.level == 0 && d.opts.FlushSplitBytes > 0
    2958           1 : 
    2959           1 :         // finishOutput is called with a user key up to which all tombstones
    2960           1 :         // should be flushed. Typically, this is the first key of the next
    2961           1 :         // sstable or an empty key if this output is the final sstable.
    2962           1 :         finishOutput := func(splitKey []byte) error {
    2963           1 :                 // If we haven't output any point records to the sstable (tw == nil) then the
    2964           1 :                 // sstable will only contain range tombstones and/or range keys. The smallest
    2965           1 :                 // key in the sstable will be the start key of the first range tombstone or
    2966           1 :                 // range key added. We need to ensure that this start key is distinct from
    2967           1 :                 // the splitKey passed to finishOutput (if set), otherwise we would generate
    2968           1 :                 // an sstable where the largest key is smaller than the smallest key due to
    2969           1 :                 // how the largest key boundary is set below. NB: It is permissible for the
    2970           1 :                 // range tombstone / range key start key to be nil.
    2971           1 :                 //
    2972           1 :                 // TODO: It is unfortunate that we have to do this check here rather than
    2973           1 :                 // when we decide to finish the sstable in the runCompaction loop. A better
    2974           1 :                 // structure currently eludes us.
    2975           1 :                 if tw == nil {
    2976           1 :                         startKey := iter.FirstTombstoneStart()
    2977           1 :                         if startKey == nil {
    2978           1 :                                 startKey = iter.FirstRangeKeyStart()
    2979           1 :                         }
    2980           1 :                         if splitKey != nil && d.cmp(startKey, splitKey) == 0 {
    2981           0 :                                 return nil
    2982           0 :                         }
    2983             :                 }
    2984             : 
    2985           1 :                 for _, v := range iter.TombstonesUpTo(splitKey) {
    2986           1 :                         if tw == nil {
    2987           1 :                                 if err := newOutput(); err != nil {
    2988           1 :                                         return err
    2989           1 :                                 }
    2990             :                         }
    2991             :                         // The tombstone being added could be completely outside the
    2992             :                         // eventual bounds of the sstable. Consider this example (bounds
    2993             :                         // in square brackets next to table filename):
    2994             :                         //
    2995             :                         // ./000240.sst   [tmgc#391,MERGE-tmgc#391,MERGE]
    2996             :                         // tmgc#391,MERGE [786e627a]
    2997             :                         // tmgc-udkatvs#331,RANGEDEL
    2998             :                         //
    2999             :                         // ./000241.sst   [tmgc#384,MERGE-tmgc#384,MERGE]
    3000             :                         // tmgc#384,MERGE [666c7070]
    3001             :                         // tmgc-tvsalezade#383,RANGEDEL
    3002             :                         // tmgc-tvsalezade#331,RANGEDEL
    3003             :                         //
    3004             :                         // ./000242.sst   [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
    3005             :                         // tmgc-tvsalezade#383,RANGEDEL
    3006             :                         // tmgc#375,SET [72646c78766965616c72776865676e79]
    3007             :                         // tmgc-tvsalezade#356,RANGEDEL
    3008             :                         //
    3009             :                         // Note that both of the top two SSTables have range tombstones
    3010             :                         // that start after the file's end keys. Since the file bound
    3011             :                         // computation happens well after all range tombstones have been
    3012             :                         // added to the writer, eliding out-of-file range tombstones based
    3013             :                         // on sequence number at this stage is difficult, and necessitates
    3014             :                         // read-time logic to ignore range tombstones outside file bounds.
    3015           1 :                         if err := rangedel.Encode(&v, tw.Add); err != nil {
    3016           0 :                                 return err
    3017           0 :                         }
    3018             :                 }
    3019           1 :                 for _, v := range iter.RangeKeysUpTo(splitKey) {
    3020           1 :                         // Same logic as for range tombstones, except added using tw.AddRangeKey.
    3021           1 :                         if tw == nil {
    3022           1 :                                 if err := newOutput(); err != nil {
    3023           1 :                                         return err
    3024           1 :                                 }
    3025             :                         }
    3026           1 :                         if err := rangekey.Encode(&v, tw.AddRangeKey); err != nil {
    3027           0 :                                 return err
    3028           0 :                         }
    3029             :                 }
    3030             : 
    3031           1 :                 if tw == nil {
    3032           1 :                         return nil
    3033           1 :                 }
    3034           1 :                 {
    3035           1 :                         // Set internal sstable properties.
    3036           1 :                         p := getInternalWriterProperties(tw)
    3037           1 :                         // Set the external sst version to 0. This is what RocksDB expects for
    3038           1 :                         // db-internal sstables; otherwise, it could apply a global sequence number.
    3039           1 :                         p.ExternalFormatVersion = 0
    3040           1 :                         // Set the snapshot pinned totals.
    3041           1 :                         p.SnapshotPinnedKeys = pinnedCount
    3042           1 :                         p.SnapshotPinnedKeySize = pinnedKeySize
    3043           1 :                         p.SnapshotPinnedValueSize = pinnedValueSize
    3044           1 :                         stats.cumulativePinnedKeys += pinnedCount
    3045           1 :                         stats.cumulativePinnedSize += pinnedKeySize + pinnedValueSize
    3046           1 :                         pinnedCount = 0
    3047           1 :                         pinnedKeySize = 0
    3048           1 :                         pinnedValueSize = 0
    3049           1 :                 }
    3050           1 :                 if err := tw.Close(); err != nil {
    3051           0 :                         tw = nil
    3052           0 :                         return err
    3053           0 :                 }
    3054           1 :                 d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
    3055           1 :                 cpuWorkHandle = nil
    3056           1 :                 writerMeta, err := tw.Metadata()
    3057           1 :                 if err != nil {
    3058           0 :                         tw = nil
    3059           0 :                         return err
    3060           0 :                 }
    3061           1 :                 tw = nil
    3062           1 :                 meta := ve.NewFiles[len(ve.NewFiles)-1].Meta
    3063           1 :                 meta.Size = writerMeta.Size
    3064           1 :                 meta.SmallestSeqNum = writerMeta.SmallestSeqNum
    3065           1 :                 meta.LargestSeqNum = writerMeta.LargestSeqNum
    3066           1 :                 meta.InitPhysicalBacking()
    3067           1 : 
    3068           1 :                 // If the file didn't contain any range deletions, we can fill its
    3069           1 :                 // table stats now, avoiding unnecessarily loading the table later.
    3070           1 :                 maybeSetStatsFromProperties(
    3071           1 :                         meta.PhysicalMeta(), &writerMeta.Properties,
    3072           1 :                 )
    3073           1 : 
    3074           1 :                 if c.flushing == nil {
    3075           1 :                         outputMetrics.TablesCompacted++
    3076           1 :                         outputMetrics.BytesCompacted += meta.Size
    3077           1 :                 } else {
    3078           1 :                         outputMetrics.TablesFlushed++
    3079           1 :                         outputMetrics.BytesFlushed += meta.Size
    3080           1 :                 }
    3081           1 :                 outputMetrics.Size += int64(meta.Size)
    3082           1 :                 outputMetrics.NumFiles++
    3083           1 :                 outputMetrics.Additional.BytesWrittenDataBlocks += writerMeta.Properties.DataSize
    3084           1 :                 outputMetrics.Additional.BytesWrittenValueBlocks += writerMeta.Properties.ValueBlocksSize
    3085           1 : 
    3086           1 :                 if n := len(ve.NewFiles); n > 1 {
    3087           1 :                         // This is not the first output file. Ensure the sstable boundaries
    3088           1 :                         // are nonoverlapping.
    3089           1 :                         prevMeta := ve.NewFiles[n-2].Meta
    3090           1 :                         if writerMeta.SmallestRangeDel.UserKey != nil {
    3091           1 :                                 c := d.cmp(writerMeta.SmallestRangeDel.UserKey, prevMeta.Largest.UserKey)
    3092           1 :                                 if c < 0 {
    3093           0 :                                         return errors.Errorf(
    3094           0 :                                                 "pebble: smallest range tombstone start key is less than previous sstable largest key: %s < %s",
    3095           0 :                                                 writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey),
    3096           0 :                                                 prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey))
    3097           1 :                                 } else if c == 0 && !prevMeta.Largest.IsExclusiveSentinel() {
    3098           0 :                                         // The user key portion of the range boundary start key is
    3099           0 :                                         // equal to the previous table's largest key user key, and
    3100           0 :                                         // the previous table's largest key is not exclusive. This
    3101           0 :                                         // violates the invariant that tables are key-space
    3102           0 :                                         // partitioned.
    3103           0 :                                         return errors.Errorf(
    3104           0 :                                                 "pebble: invariant violation: previous sstable largest key %s, current sstable smallest rangedel: %s",
    3105           0 :                                                 prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey),
    3106           0 :                                                 writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey),
    3107           0 :                                         )
    3108           0 :                                 }
    3109             :                         }
    3110             :                 }
    3111             : 
    3112             :                 // Verify that all range deletions outputted to the sstable are
    3113             :                 // truncated to split key.
    3114           1 :                 if splitKey != nil && writerMeta.LargestRangeDel.UserKey != nil &&
    3115           1 :                         d.cmp(writerMeta.LargestRangeDel.UserKey, splitKey) > 0 {
    3116           0 :                         return errors.Errorf(
    3117           0 :                                 "pebble: invariant violation: rangedel largest key %q extends beyond split key %q",
    3118           0 :                                 writerMeta.LargestRangeDel.Pretty(d.opts.Comparer.FormatKey),
    3119           0 :                                 d.opts.Comparer.FormatKey(splitKey),
    3120           0 :                         )
    3121           0 :                 }
    3122             : 
    3123           1 :                 if writerMeta.HasPointKeys {
    3124           1 :                         meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestPoint, writerMeta.LargestPoint)
    3125           1 :                 }
    3126           1 :                 if writerMeta.HasRangeDelKeys {
    3127           1 :                         meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestRangeDel, writerMeta.LargestRangeDel)
    3128           1 :                 }
    3129           1 :                 if writerMeta.HasRangeKeys {
    3130           1 :                         meta.ExtendRangeKeyBounds(d.cmp, writerMeta.SmallestRangeKey, writerMeta.LargestRangeKey)
    3131           1 :                 }
    3132             : 
    3133             :                 // Verify that the sstable bounds fall within the compaction input
    3134             :                 // bounds. This is a sanity check that we don't have a logic error
    3135             :                 // elsewhere that causes the sstable bounds to accidentally expand past the
    3136             :                 // compaction input bounds as doing so could lead to various badness such
    3137             :                 // as keys being deleted by a range tombstone incorrectly.
    3138           1 :                 if c.smallest.UserKey != nil {
    3139           1 :                         switch v := d.cmp(meta.Smallest.UserKey, c.smallest.UserKey); {
    3140           1 :                         case v >= 0:
    3141             :                                 // Nothing to do.
    3142           0 :                         case v < 0:
    3143           0 :                                 return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s < %s",
    3144           0 :                                         meta.Smallest.Pretty(d.opts.Comparer.FormatKey),
    3145           0 :                                         c.smallest.Pretty(d.opts.Comparer.FormatKey))
    3146             :                         }
    3147             :                 }
    3148           1 :                 if c.largest.UserKey != nil {
    3149           1 :                         switch v := d.cmp(meta.Largest.UserKey, c.largest.UserKey); {
    3150           1 :                         case v <= 0:
    3151             :                                 // Nothing to do.
    3152           0 :                         case v > 0:
    3153           0 :                                 return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s > %s",
    3154           0 :                                         meta.Largest.Pretty(d.opts.Comparer.FormatKey),
    3155           0 :                                         c.largest.Pretty(d.opts.Comparer.FormatKey))
    3156             :                         }
    3157             :                 }
    3158             :                 // Verify that we never split different revisions of the same user key
    3159             :                 // across two different sstables.
    3160           1 :                 if err := c.errorOnUserKeyOverlap(ve); err != nil {
    3161           0 :                         return err
    3162           0 :                 }
    3163           1 :                 if err := meta.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
    3164           0 :                         return err
    3165           0 :                 }
    3166           1 :                 return nil
    3167             :         }
    3168             : 
    3169             :         // Build a compactionOutputSplitter that contains all logic to determine
    3170             :         // whether the compaction loop should stop writing to one output sstable and
    3171             :         // switch to a new one. Some splitters can wrap other splitters, and the
    3172             :         // splitterGroup can be composed of multiple splitters. In this case, we
    3173             :         // start off with splitters for file sizes, grandparent limits, and (for L0
    3174             :         // splits) L0 limits, before wrapping them in a splitterGroup.
    3175           1 :         unsafePrevUserKey := func() []byte {
    3176           1 :                 // Return the largest point key written to tw or the start of
    3177           1 :                 // the current range deletion in the fragmenter, whichever is
    3178           1 :                 // greater.
    3179           1 :                 prevPoint := prevPointKey.UnsafeKey()
    3180           1 :                 if c.cmp(prevPoint.UserKey, iter.FirstTombstoneStart()) > 0 {
    3181           1 :                         return prevPoint.UserKey
    3182           1 :                 }
    3183           1 :                 return iter.FirstTombstoneStart()
    3184             :         }
    3185           1 :         outputSplitters := []compact.OutputSplitter{
    3186           1 :                 // We do not split the same user key across different sstables within
    3187           1 :                 // one flush or compaction. The FileSizeSplitter may request a split in
    3188           1 :                 // the middle of a user key, so PreventSplitUserKeys ensures we are at a
    3189           1 :                 // user key change boundary when doing a split.
    3190           1 :                 compact.PreventSplitUserKeys(
    3191           1 :                         c.cmp,
    3192           1 :                         compact.FileSizeSplitter(&iter.frontiers, c.maxOutputFileSize, c.grandparents.Iter()),
    3193           1 :                         unsafePrevUserKey,
    3194           1 :                 ),
    3195           1 :                 compact.LimitFuncSplitter(&iter.frontiers, c.findGrandparentLimit),
    3196           1 :         }
    3197           1 :         if splitL0Outputs {
    3198           1 :                 outputSplitters = append(outputSplitters, compact.LimitFuncSplitter(&iter.frontiers, c.findL0Limit))
    3199           1 :         }
    3200           1 :         splitter := compact.CombineSplitters(c.cmp, outputSplitters...)
    3201           1 : 
    3202           1 :         // Each outer loop iteration produces one output file. An iteration that
    3203           1 :         // produces a file containing point keys (and optionally range tombstones)
    3204           1 :         // guarantees that the input iterator advanced. An iteration that produces
    3205           1 :         // a file containing only range tombstones guarantees the limit passed to
    3206           1 :         // `finishOutput()` advanced to a strictly greater user key corresponding
    3207           1 :         // to a grandparent file largest key, or nil. Taken together, these
    3208           1 :         // progress guarantees ensure that eventually the input iterator will be
    3209           1 :         // exhausted and the range tombstone fragments will all be flushed.
    3210           1 :         for key, val := iter.First(); key != nil || iter.FirstTombstoneStart() != nil || iter.FirstRangeKeyStart() != nil; {
    3211           1 :                 var firstKey []byte
    3212           1 :                 if key != nil {
    3213           1 :                         firstKey = key.UserKey
    3214           1 :                 } else if startKey := iter.FirstTombstoneStart(); startKey != nil {
    3215           1 :                         // Pass the start key of the first pending tombstone to find the
    3216           1 :                         // next limit. All pending tombstones have the same start key. We
    3217           1 :                         // use this as opposed to the end key of the last written sstable to
    3218           1 :                         // effectively handle cases like these:
    3219           1 :                         //
    3220           1 :                         // a.SET.3
    3221           1 :                         // (lf.limit at b)
    3222           1 :                         // d.RANGEDEL.4:f
    3223           1 :                         //
    3224           1 :                         // In this case, the partition after b has only range deletions, so
    3225           1 :                         // if we were to find the limit after the last written key at the
    3226           1 :                         // split point (key a), we'd get the limit b again, and
    3227           1 :                         // finishOutput() would not advance any further because the next
    3228           1 :                         // range tombstone to write does not start until after the L0 split
    3229           1 :                         // point.
    3230           1 :                         firstKey = startKey
    3231           1 :                 }
    3232           1 :                 splitterSuggestion := splitter.OnNewOutput(firstKey)
    3233           1 : 
    3234           1 :                 // Each inner loop iteration processes one key from the input iterator.
    3235           1 :                 for ; key != nil; key, val = iter.Next() {
    3236           1 :                         if split := splitter.ShouldSplitBefore(key, tw); split == compact.SplitNow {
    3237           1 :                                 break
    3238             :                         }
    3239             : 
    3240           1 :                         switch key.Kind() {
    3241           1 :                         case InternalKeyKindRangeDelete:
    3242           1 :                                 // Range tombstones are handled specially. They are not written until
    3243           1 :                                 // later during `finishOutput()`. We add them to the compaction iterator
    3244           1 :                                 // so covered keys in the same snapshot stripe can be elided.
    3245           1 :                                 //
    3246           1 :                                 // Since the keys' Suffix and Value fields are not deep cloned, the
    3247           1 :                                 // underlying blockIter must be kept open for the lifetime of the
    3248           1 :                                 // compaction.
    3249           1 :                                 iter.AddTombstoneSpan(c.rangeDelInterleaving.Span())
    3250           1 :                                 continue
    3251           1 :                         case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
    3252           1 :                                 // Range keys are handled in the same way as range tombstones.
    3253           1 :                                 // Since the keys' Suffix and Value fields are not deep cloned, the
    3254           1 :                                 // underlying blockIter must be kept open for the lifetime of the
    3255           1 :                                 // compaction.
    3256           1 :                                 iter.AddRangeKeySpan(c.rangeKeyInterleaving.Span())
    3257           1 :                                 continue
    3258             :                         }
    3259           1 :                         if tw == nil {
    3260           1 :                                 if err := newOutput(); err != nil {
    3261           1 :                                         return nil, pendingOutputs, stats, err
    3262           1 :                                 }
    3263             :                         }
    3264           1 :                         if err := tw.AddWithForceObsolete(*key, val, iter.forceObsoleteDueToRangeDel); err != nil {
    3265           0 :                                 return nil, pendingOutputs, stats, err
    3266           0 :                         }
    3267           1 :                         if iter.snapshotPinned {
    3268           1 :                                 // The kv pair we just added to the sstable was only surfaced by
    3269           1 :                                 // the compaction iterator because an open snapshot prevented
    3270           1 :                                 // its elision. Increment the stats.
    3271           1 :                                 pinnedCount++
    3272           1 :                                 pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
    3273           1 :                                 pinnedValueSize += uint64(len(val))
    3274           1 :                         }
    3275             :                 }
    3276             : 
    3277             :                 // A splitter requested a split, and we're ready to finish the output.
    3278             :                 // We need to choose the key at which to split any pending range
    3279             :                 // tombstones. There are two options:
    3280             :                 // 1. splitterSuggestion — The key suggested by the splitter. This key
    3281             :                 //    is guaranteed to be greater than the last key written to the
    3282             :                 //    current output.
    3283             :                 // 2. key.UserKey — the first key of the next sstable output. This user
    3284             :                 //     key is also guaranteed to be greater than the last user key
    3285             :                 //     written to the current output (see userKeyChangeSplitter).
    3286             :                 //
    3287             :                 // Use whichever is smaller. Using the smaller of the two limits
    3288             :                 // overlap with grandparents. Consider the case where the
    3289             :                 // grandparent limit is calculated to be 'b', key is 'x', and
    3290             :                 // there exist many sstables between 'b' and 'x'. If the range
    3291             :                 // deletion fragmenter has a pending tombstone [a,x), splitting
    3292             :                 // at 'x' would cause the output table to overlap many
    3293             :                 // grandparents well beyond the calculated grandparent limit
    3294             :                 // 'b'. Splitting at the smaller `splitterSuggestion` avoids
    3295             :                 // this unbounded overlap with grandparent tables.
    3296           1 :                 splitKey := splitterSuggestion
    3297           1 :                 if key != nil && (splitKey == nil || c.cmp(splitKey, key.UserKey) > 0) {
    3298           1 :                         splitKey = key.UserKey
    3299           1 :                 }
    3300           1 :                 if err := finishOutput(splitKey); err != nil {
    3301           1 :                         return nil, pendingOutputs, stats, err
    3302           1 :                 }
    3303             :         }
    3304             : 
    3305           1 :         for _, cl := range c.inputs {
    3306           1 :                 iter := cl.files.Iter()
    3307           1 :                 for f := iter.First(); f != nil; f = iter.Next() {
    3308           1 :                         ve.DeletedFiles[deletedFileEntry{
    3309           1 :                                 Level:   cl.level,
    3310           1 :                                 FileNum: f.FileNum,
    3311           1 :                         }] = f
    3312           1 :                 }
    3313             :         }
    3314             : 
    3315             :         // The compaction iterator keeps track of a count of the number of DELSIZED
    3316             :         // keys that encoded an incorrect size. Propagate it up as a part of
    3317             :         // compactStats.
    3318           1 :         stats.countMissizedDels = iter.stats.countMissizedDels
    3319           1 : 
    3320           1 :         if err := d.objProvider.Sync(); err != nil {
    3321           0 :                 return nil, pendingOutputs, stats, err
    3322           0 :         }
    3323             : 
    3324             :         // Refresh the disk available statistic whenever a compaction/flush
    3325             :         // completes, before re-acquiring the mutex.
    3326           1 :         _ = d.calculateDiskAvailableBytes()
    3327           1 : 
    3328           1 :         return ve, pendingOutputs, stats, nil
    3329             : }
    3330             : 
    3331             : // validateVersionEdit validates that start and end keys across new and deleted
    3332             : // files in a versionEdit pass the given validation function.
    3333             : func validateVersionEdit(
    3334             :         ve *versionEdit, validateFn func([]byte) error, format base.FormatKey,
    3335           1 : ) error {
    3336           1 :         validateMetaFn := func(f *manifest.FileMetadata) error {
    3337           1 :                 for _, key := range []InternalKey{f.Smallest, f.Largest} {
    3338           1 :                         if err := validateFn(key.UserKey); err != nil {
    3339           0 :                                 return errors.Wrapf(err, "key=%q; file=%s", format(key.UserKey), f)
    3340           0 :                         }
    3341             :                 }
    3342           1 :                 return nil
    3343             :         }
    3344             : 
    3345             :         // Validate both new and deleted files.
    3346           1 :         for _, f := range ve.NewFiles {
    3347           1 :                 if err := validateMetaFn(f.Meta); err != nil {
    3348           0 :                         return err
    3349           0 :                 }
    3350             :         }
    3351           1 :         for _, m := range ve.DeletedFiles {
    3352           1 :                 if err := validateMetaFn(m); err != nil {
    3353           0 :                         return err
    3354           0 :                 }
    3355             :         }
    3356             : 
    3357           1 :         return nil
    3358             : }
    3359             : 
    3360             : // scanObsoleteFiles scans the filesystem for files that are no longer needed
    3361             : // and adds those to the internal lists of obsolete files. Note that the files
    3362             : // are not actually deleted by this method. A subsequent call to
    3363             : // deleteObsoleteFiles must be performed. Must be not be called concurrently
    3364             : // with compactions and flushes. db.mu must be held when calling this function.
    3365           1 : func (d *DB) scanObsoleteFiles(list []string) {
    3366           1 :         // Disable automatic compactions temporarily to avoid concurrent compactions /
    3367           1 :         // flushes from interfering. The original value is restored on completion.
    3368           1 :         disabledPrev := d.opts.DisableAutomaticCompactions
    3369           1 :         defer func() {
    3370           1 :                 d.opts.DisableAutomaticCompactions = disabledPrev
    3371           1 :         }()
    3372           1 :         d.opts.DisableAutomaticCompactions = true
    3373           1 : 
    3374           1 :         // Wait for any ongoing compaction to complete before continuing.
    3375           1 :         for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
    3376           0 :                 d.mu.compact.cond.Wait()
    3377           0 :         }
    3378             : 
    3379           1 :         liveFileNums := make(map[base.DiskFileNum]struct{})
    3380           1 :         d.mu.versions.addLiveFileNums(liveFileNums)
    3381           1 :         // Protect against files which are only referred to by the ingestedFlushable
    3382           1 :         // from being deleted. These are added to the flushable queue on WAL replay
    3383           1 :         // during read only mode and aren't part of the Version. Note that if
    3384           1 :         // !d.opts.ReadOnly, then all flushables of type ingestedFlushable have
    3385           1 :         // already been flushed.
    3386           1 :         for _, fEntry := range d.mu.mem.queue {
    3387           1 :                 if f, ok := fEntry.flushable.(*ingestedFlushable); ok {
    3388           0 :                         for _, file := range f.files {
    3389           0 :                                 liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
    3390           0 :                         }
    3391             :                 }
    3392             :         }
    3393             : 
    3394           1 :         manifestFileNum := d.mu.versions.manifestFileNum
    3395           1 : 
    3396           1 :         var obsoleteTables []tableInfo
    3397           1 :         var obsoleteManifests []fileInfo
    3398           1 :         var obsoleteOptions []fileInfo
    3399           1 : 
    3400           1 :         for _, filename := range list {
    3401           1 :                 fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
    3402           1 :                 if !ok {
    3403           1 :                         continue
    3404             :                 }
    3405           1 :                 switch fileType {
    3406           1 :                 case fileTypeManifest:
    3407           1 :                         if diskFileNum >= manifestFileNum {
    3408           0 :                                 continue
    3409             :                         }
    3410           1 :                         fi := fileInfo{FileNum: diskFileNum}
    3411           1 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
    3412           0 :                                 fi.FileSize = uint64(stat.Size())
    3413           0 :                         }
    3414           1 :                         obsoleteManifests = append(obsoleteManifests, fi)
    3415           1 :                 case fileTypeOptions:
    3416           1 :                         if diskFileNum >= d.optionsFileNum {
    3417           0 :                                 continue
    3418             :                         }
    3419           1 :                         fi := fileInfo{FileNum: diskFileNum}
    3420           1 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
    3421           0 :                                 fi.FileSize = uint64(stat.Size())
    3422           0 :                         }
    3423           1 :                         obsoleteOptions = append(obsoleteOptions, fi)
    3424           1 :                 case fileTypeTable:
    3425             :                         // Objects are handled through the objstorage provider below.
    3426           1 :                 default:
    3427             :                         // Don't delete files we don't know about.
    3428             :                 }
    3429             :         }
    3430             : 
    3431           1 :         objects := d.objProvider.List()
    3432           1 :         for _, obj := range objects {
    3433           1 :                 switch obj.FileType {
    3434           1 :                 case fileTypeTable:
    3435           1 :                         if _, ok := liveFileNums[obj.DiskFileNum]; ok {
    3436           1 :                                 continue
    3437             :                         }
    3438           1 :                         fileInfo := fileInfo{
    3439           1 :                                 FileNum: obj.DiskFileNum,
    3440           1 :                         }
    3441           1 :                         if size, err := d.objProvider.Size(obj); err == nil {
    3442           1 :                                 fileInfo.FileSize = uint64(size)
    3443           1 :                         }
    3444           1 :                         obsoleteTables = append(obsoleteTables, tableInfo{
    3445           1 :                                 fileInfo: fileInfo,
    3446           1 :                                 isLocal:  !obj.IsRemote(),
    3447           1 :                         })
    3448             : 
    3449           0 :                 default:
    3450             :                         // Ignore object types we don't know about.
    3451             :                 }
    3452             :         }
    3453             : 
    3454           1 :         d.mu.versions.obsoleteTables = mergeTableInfos(d.mu.versions.obsoleteTables, obsoleteTables)
    3455           1 :         d.mu.versions.updateObsoleteTableMetricsLocked()
    3456           1 :         d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
    3457           1 :         d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
    3458             : }
    3459             : 
    3460             : // disableFileDeletions disables file deletions and then waits for any
    3461             : // in-progress deletion to finish. The caller is required to call
    3462             : // enableFileDeletions in order to enable file deletions again. It is ok for
    3463             : // multiple callers to disable file deletions simultaneously, though they must
    3464             : // all invoke enableFileDeletions in order for file deletions to be re-enabled
    3465             : // (there is an internal reference count on file deletion disablement).
    3466             : //
    3467             : // d.mu must be held when calling this method.
    3468           1 : func (d *DB) disableFileDeletions() {
    3469           1 :         d.mu.disableFileDeletions++
    3470           1 :         d.mu.Unlock()
    3471           1 :         defer d.mu.Lock()
    3472           1 :         d.cleanupManager.Wait()
    3473           1 : }
    3474             : 
    3475             : // enableFileDeletions enables previously disabled file deletions. A cleanup job
    3476             : // is queued if necessary.
    3477             : //
    3478             : // d.mu must be held when calling this method.
    3479           1 : func (d *DB) enableFileDeletions() {
    3480           1 :         if d.mu.disableFileDeletions <= 0 {
    3481           0 :                 panic("pebble: file deletion disablement invariant violated")
    3482             :         }
    3483           1 :         d.mu.disableFileDeletions--
    3484           1 :         if d.mu.disableFileDeletions > 0 {
    3485           0 :                 return
    3486           0 :         }
    3487           1 :         d.deleteObsoleteFiles(d.newJobIDLocked())
    3488             : }
    3489             : 
    // fileInfo is a package-local alias for base.FileInfo. In this file its
    // FileNum and FileSize fields describe obsolete manifest, OPTIONS and table
    // files.
    3490             : type fileInfo = base.FileInfo
    3491             : 
    3492             : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
    3493             : //
    3494             : // d.mu must be held when calling this. The function will release and re-aquire the mutex.
    3495             : //
    3496             : // Does nothing if file deletions are disabled (see disableFileDeletions). A
    3497             : // cleanup job will be scheduled when file deletions are re-enabled.
    3498           1 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
    3499           1 :         if d.mu.disableFileDeletions > 0 {
    3500           1 :                 return
    3501           1 :         }
    3502           1 :         _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
    3503           1 : 
    3504           1 :         // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
    3505           1 :         // log that has not had its contents flushed to an sstable.
    3506           1 :         obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
    3507           1 :         if err != nil {
    3508           0 :                 panic(err)
    3509             :         }
    3510             : 
    3511           1 :         obsoleteTables := append([]tableInfo(nil), d.mu.versions.obsoleteTables...)
    3512           1 :         d.mu.versions.obsoleteTables = nil
    3513           1 : 
    3514           1 :         for _, tbl := range obsoleteTables {
    3515           1 :                 delete(d.mu.versions.zombieTables, tbl.FileNum)
    3516           1 :         }
    3517             : 
    3518             :         // Sort the manifests cause we want to delete some contiguous prefix
    3519             :         // of the older manifests.
    3520           1 :         slices.SortFunc(d.mu.versions.obsoleteManifests, func(a, b fileInfo) int {
    3521           1 :                 return cmp.Compare(a.FileNum, b.FileNum)
    3522           1 :         })
    3523             : 
    3524           1 :         var obsoleteManifests []fileInfo
    3525           1 :         manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
    3526           1 :         if manifestsToDelete > 0 {
    3527           1 :                 obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
    3528           1 :                 d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
    3529           1 :                 if len(d.mu.versions.obsoleteManifests) == 0 {
    3530           0 :                         d.mu.versions.obsoleteManifests = nil
    3531           0 :                 }
    3532             :         }
    3533             : 
    3534           1 :         obsoleteOptions := d.mu.versions.obsoleteOptions
    3535           1 :         d.mu.versions.obsoleteOptions = nil
    3536           1 : 
    3537           1 :         // Release d.mu while preparing the cleanup job and possibly waiting.
    3538           1 :         // Note the unusual order: Unlock and then Lock.
    3539           1 :         d.mu.Unlock()
    3540           1 :         defer d.mu.Lock()
    3541           1 : 
    3542           1 :         filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
    3543           1 :         for _, f := range obsoleteLogs {
    3544           1 :                 filesToDelete = append(filesToDelete, obsoleteFile{fileType: fileTypeLog, logFile: f})
    3545           1 :         }
    3546             :         // We sort to make the order of deletions deterministic, which is nice for
    3547             :         // tests.
    3548           1 :         slices.SortFunc(obsoleteTables, func(a, b tableInfo) int {
    3549           1 :                 return cmp.Compare(a.FileNum, b.FileNum)
    3550           1 :         })
    3551           1 :         for _, f := range obsoleteTables {
    3552           1 :                 d.tableCache.evict(f.FileNum)
    3553           1 :                 filesToDelete = append(filesToDelete, obsoleteFile{
    3554           1 :                         fileType: fileTypeTable,
    3555           1 :                         nonLogFile: deletableFile{
    3556           1 :                                 dir:      d.dirname,
    3557           1 :                                 fileNum:  f.FileNum,
    3558           1 :                                 fileSize: f.FileSize,
    3559           1 :                                 isLocal:  f.isLocal,
    3560           1 :                         },
    3561           1 :                 })
    3562           1 :         }
    3563           1 :         files := [2]struct {
    3564           1 :                 fileType fileType
    3565           1 :                 obsolete []fileInfo
    3566           1 :         }{
    3567           1 :                 {fileTypeManifest, obsoleteManifests},
    3568           1 :                 {fileTypeOptions, obsoleteOptions},
    3569           1 :         }
    3570           1 :         for _, f := range files {
    3571           1 :                 // We sort to make the order of deletions deterministic, which is nice for
    3572           1 :                 // tests.
    3573           1 :                 slices.SortFunc(f.obsolete, func(a, b fileInfo) int {
    3574           1 :                         return cmp.Compare(a.FileNum, b.FileNum)
    3575           1 :                 })
    3576           1 :                 for _, fi := range f.obsolete {
    3577           1 :                         dir := d.dirname
    3578           1 :                         filesToDelete = append(filesToDelete, obsoleteFile{
    3579           1 :                                 fileType: f.fileType,
    3580           1 :                                 nonLogFile: deletableFile{
    3581           1 :                                         dir:      dir,
    3582           1 :                                         fileNum:  fi.FileNum,
    3583           1 :                                         fileSize: fi.FileSize,
    3584           1 :                                         isLocal:  true,
    3585           1 :                                 },
    3586           1 :                         })
    3587           1 :                 }
    3588             :         }
    3589           1 :         if len(filesToDelete) > 0 {
    3590           1 :                 d.cleanupManager.EnqueueJob(jobID, filesToDelete)
    3591           1 :         }
    3592           1 :         if d.opts.private.testingAlwaysWaitForCleanup {
    3593           0 :                 d.cleanupManager.Wait()
    3594           0 :         }
    3595             : }
    3596             : 
    3597           1 : func (d *DB) maybeScheduleObsoleteTableDeletion() {
    3598           1 :         d.mu.Lock()
    3599           1 :         defer d.mu.Unlock()
    3600           1 :         d.maybeScheduleObsoleteTableDeletionLocked()
    3601           1 : }
    3602             : 
    3603           1 : func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
    3604           1 :         if len(d.mu.versions.obsoleteTables) > 0 {
    3605           1 :                 d.deleteObsoleteFiles(d.newJobIDLocked())
    3606           1 :         }
    3607             : }
    3608             : 
    3609           1 : func merge(a, b []fileInfo) []fileInfo {
    3610           1 :         if len(b) == 0 {
    3611           1 :                 return a
    3612           1 :         }
    3613             : 
    3614           1 :         a = append(a, b...)
    3615           1 :         slices.SortFunc(a, func(a, b fileInfo) int {
    3616           1 :                 return cmp.Compare(a.FileNum, b.FileNum)
    3617           1 :         })
    3618           1 :         return slices.CompactFunc(a, func(a, b fileInfo) bool {
    3619           1 :                 return a.FileNum == b.FileNum
    3620           1 :         })
    3621             : }
    3622             : 
    3623           1 : func mergeTableInfos(a, b []tableInfo) []tableInfo {
    3624           1 :         if len(b) == 0 {
    3625           1 :                 return a
    3626           1 :         }
    3627             : 
    3628           1 :         a = append(a, b...)
    3629           1 :         slices.SortFunc(a, func(a, b tableInfo) int {
    3630           0 :                 return cmp.Compare(a.FileNum, b.FileNum)
    3631           0 :         })
    3632           1 :         return slices.CompactFunc(a, func(a, b tableInfo) bool {
    3633           0 :                 return a.FileNum == b.FileNum
    3634           0 :         })
    3635             : }

Generated by: LCOV version 1.14