LCOV - code coverage report
Current view: top level - pebble - compaction.go (source / functions)
Test: 2023-11-29 08:16Z ce7560a8 - tests + meta.lcov
Date: 2023-11-29 08:17:02
Lines:     2534 hit / 2717 total (93.3 %)
Functions: 0 hit / 0 total (-)

          Line data    Source code
       1             : // Copyright 2013 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "bytes"
       9             :         "cmp"
      10             :         "context"
      11             :         "fmt"
      12             :         "io"
      13             :         "math"
      14             :         "runtime/pprof"
      15             :         "slices"
      16             :         "sort"
      17             :         "sync/atomic"
      18             :         "time"
      19             : 
      20             :         "github.com/cockroachdb/errors"
      21             :         "github.com/cockroachdb/pebble/internal/base"
      22             :         "github.com/cockroachdb/pebble/internal/invalidating"
      23             :         "github.com/cockroachdb/pebble/internal/invariants"
      24             :         "github.com/cockroachdb/pebble/internal/keyspan"
      25             :         "github.com/cockroachdb/pebble/internal/manifest"
      26             :         "github.com/cockroachdb/pebble/internal/private"
      27             :         "github.com/cockroachdb/pebble/internal/rangedel"
      28             :         "github.com/cockroachdb/pebble/internal/rangekey"
      29             :         "github.com/cockroachdb/pebble/objstorage"
      30             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      31             :         "github.com/cockroachdb/pebble/objstorage/remote"
      32             :         "github.com/cockroachdb/pebble/sstable"
      33             :         "github.com/cockroachdb/pebble/vfs"
      34             : )
      35             : 
      36             : var errEmptyTable = errors.New("pebble: empty table")
      37             : 
      38             : // ErrCancelledCompaction is returned if a compaction is cancelled by a
      39             : // concurrent excise or ingest-split operation.
      40             : var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
      41             : 
      42             : var compactLabels = pprof.Labels("pebble", "compact")
      43             : var flushLabels = pprof.Labels("pebble", "flush")
      44             : var gcLabels = pprof.Labels("pebble", "gc")
      45             : 
      46             : // getInternalWriterProperties accesses a private variable (in the
      47             : // internal/private package) initialized by the sstable Writer. This indirection
      48             : // is necessary to ensure non-Pebble users constructing sstables for ingestion
      49             : // are unable to set internal-only properties.
      50             : var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties)
      51             : 
      52             : // expandedCompactionByteSizeLimit is the maximum number of bytes in all
      53             : // compacted files. We avoid expanding the lower level file set of a compaction
      54             : // if it would make the total compaction cover more than this many bytes.
      55           2 : func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64) uint64 {
      56           2 :         v := uint64(25 * opts.Level(level).TargetFileSize)
      57           2 : 
      58           2 :         // Never expand a compaction beyond half the available capacity, divided
      59           2 :         // by the maximum number of concurrent compactions. Each of the concurrent
      60           2 :         // compactions may expand up to this limit, so this attempts to limit
      61           2 :         // compactions to half of available disk space. Note that this will not
      62           2 :         // prevent compaction picking from pursuing compactions that are larger
      63           2 :         // than this threshold before expansion.
      64           2 :         diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions())
      65           2 :         if v > diskMax {
      66           1 :                 v = diskMax
      67           1 :         }
      68           2 :         return v
      69             : }
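
For concreteness, here is a standalone sketch of the same arithmetic with invented inputs (a 2 MiB per-level target, 3 concurrent compactions, 100 GiB available); the constants are hypothetical, chosen only to show which bound binds:

        package main

        import "fmt"

        func main() {
                const targetFileSize = 2 << 20  // hypothetical TargetFileSize: 2 MiB
                const maxConcurrent = 3         // hypothetical MaxConcurrentCompactions()
                availBytes := uint64(100 << 30) // hypothetical available capacity: 100 GiB

                v := uint64(25 * targetFileSize)            // 25x expansion limit: 50 MiB
                diskMax := (availBytes / 2) / maxConcurrent // ~16.7 GiB
                if v > diskMax {
                        v = diskMax
                }
                fmt.Println(v) // 52428800: the 25x term binds, not the disk cap
        }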
      70             : 
      71             : // maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
      72             : // before we stop building a single file in a level-1 to level compaction.
      73           2 : func maxGrandparentOverlapBytes(opts *Options, level int) uint64 {
      74           2 :         return uint64(10 * opts.Level(level).TargetFileSize)
      75           2 : }
      76             : 
      77             : // maxReadCompactionBytes is used to prevent read compactions which
      78             : // are too wide.
      79           2 : func maxReadCompactionBytes(opts *Options, level int) uint64 {
      80           2 :         return uint64(10 * opts.Level(level).TargetFileSize)
      81           2 : }
      82             : 
      83             : // noCloseIter wraps around a FragmentIterator, intercepting and eliding
      84             : // calls to Close. It is used during compaction to ensure that rangeDelIters
      85             : // are not closed prematurely.
      86             : type noCloseIter struct {
      87             :         keyspan.FragmentIterator
      88             : }
      89             : 
      90           2 : func (i noCloseIter) Close() error {
      91           2 :         return nil
      92           2 : }
      93             : 
      94             : type compactionLevel struct {
      95             :         level int
      96             :         files manifest.LevelSlice
      97             :         // l0SublevelInfo contains information about L0 sublevels being compacted.
      98             :         // It's only set for the start level of a compaction starting out of L0 and
      99             :         // is nil for all other compactions.
     100             :         l0SublevelInfo []sublevelInfo
     101             : }
     102             : 
     103           2 : func (cl compactionLevel) Clone() compactionLevel {
     104           2 :         newCL := compactionLevel{
     105           2 :                 level: cl.level,
     106           2 :                 files: cl.files.Reslice(func(start, end *manifest.LevelIterator) {}),
     107             :         }
     108           2 :         return newCL
     109             : }
     110           1 : func (cl compactionLevel) String() string {
     111           1 :         return fmt.Sprintf(`Level %d, Files %s`, cl.level, cl.files)
     112           1 : }
     113             : 
      114             : // maybeSplit is the return value of compactionOutputSplitters. See the comment
      115             : // on compactionOutputSplitter.shouldSplitBefore() for how this value is used.
     116             : type maybeSplit int
     117             : 
     118             : const (
     119             :         noSplit maybeSplit = iota
     120             :         splitNow
     121             : )
     122             : 
     123             : // String implements the Stringer interface.
     124           1 : func (c maybeSplit) String() string {
     125           1 :         if c == noSplit {
     126           1 :                 return "no-split"
     127           1 :         }
     128           1 :         return "split-now"
     129             : }
     130             : 
     131             : // compactionOutputSplitter is an interface for encapsulating logic around
     132             : // switching the output of a compaction to a new output file. Additional
     133             : // constraints around switching compaction outputs that are specific to that
     134             : // compaction type (eg. flush splits) are implemented in
     135             : // compactionOutputSplitters that compose other child compactionOutputSplitters.
     136             : type compactionOutputSplitter interface {
     137             :         // shouldSplitBefore returns whether we should split outputs before the
     138             :         // specified "current key". The return value is splitNow or noSplit.
     139             :         // splitNow means a split is advised before the specified key, and noSplit
     140             :         // means no split is advised. If shouldSplitBefore(a) advises a split then
     141             :         // shouldSplitBefore(b) should also advise a split given b >= a, until
     142             :         // onNewOutput is called.
     143             :         shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit
     144             :         // onNewOutput updates internal splitter state when the compaction switches
     145             :         // to a new sstable, and returns the next limit for the new output which
     146             :         // would get used to truncate range tombstones if the compaction iterator
     147             :         // runs out of keys. The limit returned MUST be > key according to the
     148             :         // compaction's comparator. The specified key is the first key in the new
     149             :         // output, or nil if this sstable will only contain range tombstones already
     150             :         // in the fragmenter.
     151             :         onNewOutput(key []byte) []byte
     152             : }
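
The stickiness requirement above (keep advising a split until onNewOutput is called) is the subtle part of this contract. As a purely hypothetical illustration, not a splitter from this file, a minimal implementation could look like:

        // byteBudgetSplitter is a hypothetical compactionOutputSplitter that splits
        // once the writer's estimated size reaches a fixed budget. Once it advises
        // splitNow it keeps doing so until onNewOutput resets it, per the contract.
        type byteBudgetSplitter struct {
                budget  uint64
                latched bool
        }

        func (s *byteBudgetSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
                if s.latched {
                        return splitNow // stay sticky until onNewOutput
                }
                if tw != nil && tw.EstimatedSize() >= s.budget {
                        s.latched = true
                        return splitNow
                }
                return noSplit
        }

        func (s *byteBudgetSplitter) onNewOutput(key []byte) []byte {
                s.latched = false
                return nil // no limit; range tombstones are not truncated early
        }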
     153             : 
     154             : // fileSizeSplitter is a compactionOutputSplitter that enforces target file
     155             : // sizes. This splitter splits to a new output file when the estimated file size
     156             : // is 0.5x-2x the target file size. If there are overlapping grandparent files,
     157             : // this splitter will attempt to split at a grandparent boundary. For example,
     158             : // consider the example where a compaction wrote 'd' to the current output file,
     159             : // and the next key has a user key 'g':
     160             : //
     161             : //                                    previous key   next key
     162             : //                                               |           |
     163             : //                                               |           |
     164             : //                               +---------------|----+   +--|----------+
     165             : //                grandparents:  |       000006  |    |   |  | 000007   |
     166             : //                               +---------------|----+   +--|----------+
     167             : //                               a    b          d    e   f  g       i
     168             : //
     169             : // Splitting the output file F before 'g' will ensure that the current output
     170             : // file F does not overlap the grandparent file 000007. Aligning sstable
     171             : // boundaries like this can significantly reduce write amplification, since a
     172             : // subsequent compaction of F into the grandparent level will avoid needlessly
     173             : // rewriting any keys within 000007 that do not overlap F's bounds. Consider the
     174             : // following compaction:
     175             : //
     176             : //                             +----------------------+
      177             : //                input        |                      |
      178             : //                level        +----------------------+
     179             : //                                            \/
     180             : //                         +---------------+       +---------------+
     181             : //                output   |XXXXXXX|       |       |      |XXXXXXXX|
     182             : //                level    +---------------+       +---------------+
     183             : //
     184             : // The input-level file overlaps two files in the output level, but only
     185             : // partially. The beginning of the first output-level file and the end of the
     186             : // second output-level file will be rewritten verbatim. This write I/O is
     187             : // "wasted" in the sense that no merging is being performed.
     188             : //
     189             : // To prevent the above waste, this splitter attempts to split output files
     190             : // before the start key of grandparent files. It still strives to write output
     191             : // files of approximately the target file size, by constraining this splitting
     192             : // at grandparent points to apply only if the current output's file size is
     193             : // about the right order of magnitude.
     194             : //
     195             : // Note that, unlike most other splitters, this splitter does not guarantee that
     196             : // it will advise splits only at user key change boundaries.
     197             : type fileSizeSplitter struct {
     198             :         frontier              frontier
     199             :         targetFileSize        uint64
     200             :         atGrandparentBoundary bool
     201             :         boundariesObserved    uint64
     202             :         nextGrandparent       *fileMetadata
     203             :         grandparents          manifest.LevelIterator
     204             : }
     205             : 
     206             : func newFileSizeSplitter(
     207             :         f *frontiers, targetFileSize uint64, grandparents manifest.LevelIterator,
     208           2 : ) *fileSizeSplitter {
     209           2 :         s := &fileSizeSplitter{targetFileSize: targetFileSize}
     210           2 :         s.nextGrandparent = grandparents.First()
     211           2 :         s.grandparents = grandparents
     212           2 :         if s.nextGrandparent != nil {
     213           2 :                 s.frontier.Init(f, s.nextGrandparent.Smallest.UserKey, s.reached)
     214           2 :         }
     215           2 :         return s
     216             : }
     217             : 
     218           2 : func (f *fileSizeSplitter) reached(nextKey []byte) []byte {
     219           2 :         f.atGrandparentBoundary = true
     220           2 :         f.boundariesObserved++
     221           2 :         // NB: f.grandparents is a bounded iterator, constrained to the compaction
     222           2 :         // key range.
     223           2 :         f.nextGrandparent = f.grandparents.Next()
     224           2 :         if f.nextGrandparent == nil {
     225           2 :                 return nil
     226           2 :         }
     227             :         // TODO(jackson): Should we also split before or immediately after
     228             :         // grandparents' largest keys? Splitting before the start boundary prevents
     229             :         // overlap with the grandparent. Also splitting after the end boundary may
     230             :         // increase the probability of move compactions.
     231           2 :         return f.nextGrandparent.Smallest.UserKey
     232             : }
     233             : 
     234           2 : func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
     235           2 :         atGrandparentBoundary := f.atGrandparentBoundary
     236           2 : 
     237           2 :         // Clear f.atGrandparentBoundary unconditionally.
     238           2 :         //
      239           2 :         // This is a bit subtle. Even if we do decide to split, it's possible that a
     240           2 :         // higher-level splitter will ignore our request (eg, because we're between
     241           2 :         // two internal keys with the same user key). In this case, the next call to
     242           2 :         // shouldSplitBefore will find atGrandparentBoundary=false. This is
     243           2 :         // desirable, because in this case we would've already written the earlier
     244           2 :         // key with the same user key to the output file. The current output file is
     245           2 :         // already doomed to overlap the grandparent whose bound triggered
     246           2 :         // atGrandparentBoundary=true. We should continue on, waiting for the next
     247           2 :         // grandparent boundary.
     248           2 :         f.atGrandparentBoundary = false
     249           2 : 
     250           2 :         // If the key is a range tombstone, the EstimatedSize may not grow right
     251           2 :         // away when a range tombstone is added to the fragmenter: It's dependent on
      252           2 :         // whether or not this new range deletion will start a new fragment.
     253           2 :         // Range deletions are rare, so we choose to simply not split yet.
     254           2 :         // TODO(jackson): Reconsider this, and consider range keys too as a part of
     255           2 :         // #2321.
     256           2 :         if key.Kind() == InternalKeyKindRangeDelete || tw == nil {
     257           2 :                 return noSplit
     258           2 :         }
     259             : 
     260           2 :         estSize := tw.EstimatedSize()
     261           2 :         switch {
     262           2 :         case estSize < f.targetFileSize/2:
     263           2 :                 // The estimated file size is less than half the target file size. Don't
     264           2 :                 // split it, even if currently aligned with a grandparent file because
     265           2 :                 // it's too small.
     266           2 :                 return noSplit
     267           2 :         case estSize >= 2*f.targetFileSize:
     268           2 :                 // The estimated file size is double the target file size. Split it even
     269           2 :                 // if we were not aligned with a grandparent file boundary to avoid
     270           2 :                 // excessively exceeding the target file size.
     271           2 :                 return splitNow
     272           2 :         case !atGrandparentBoundary:
     273           2 :                 // Don't split if we're not at a grandparent, except if we've exhausted
     274           2 :                 // all the grandparents overlapping this compaction's key range. Then we
     275           2 :                 // may want to split purely based on file size.
     276           2 :                 if f.nextGrandparent == nil {
     277           2 :                         // There are no more grandparents. Optimize for the target file size
     278           2 :                         // and split as soon as we hit the target file size.
     279           2 :                         if estSize >= f.targetFileSize {
     280           2 :                                 return splitNow
     281           2 :                         }
     282             :                 }
     283           2 :                 return noSplit
     284           2 :         default:
     285           2 :                 // INVARIANT: atGrandparentBoundary
     286           2 :                 // INVARIANT: targetSize/2 < estSize < 2*targetSize
     287           2 :                 //
     288           2 :                 // The estimated file size is close enough to the target file size that
     289           2 :                 // we should consider splitting.
     290           2 :                 //
     291           2 :                 // Determine whether to split now based on how many grandparent
     292           2 :                 // boundaries we have already observed while building this output file.
     293           2 :                 // The intuition here is that if the grandparent level is dense in this
     294           2 :                 // part of the keyspace, we're likely to continue to have more
     295           2 :                 // opportunities to split this file aligned with a grandparent. If this
     296           2 :                 // is the first grandparent boundary observed, we split immediately
     297           2 :                 // (we're already at ≥50% the target file size). Otherwise, each
     298           2 :                 // overlapping grandparent we've observed increases the minimum file
     299           2 :                 // size by 5% of the target file size, up to at most 90% of the target
     300           2 :                 // file size.
     301           2 :                 //
     302           2 :                 // TODO(jackson): The particular thresholds are somewhat unprincipled.
      303           2 :         // This is the same heuristic as RocksDB implements. Is there a more
      304           2 :         // principled formulation that can further reduce w-amp, produce files
      305           2 :         // closer to the target file size, or be more understandable?
     306           2 : 
     307           2 :                 // NB: Subtract 1 from `boundariesObserved` to account for the current
     308           2 :                 // boundary we're considering splitting at. `reached` will have
     309           2 :                 // incremented it at the same time it set `atGrandparentBoundary`.
     310           2 :                 minimumPctOfTargetSize := 50 + 5*min(f.boundariesObserved-1, 8)
     311           2 :                 if estSize < (minimumPctOfTargetSize*f.targetFileSize)/100 {
     312           2 :                         return noSplit
     313           2 :                 }
     314           2 :                 return splitNow
     315             :         }
     316             : }
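
Worked through with an invented 2 MiB target: the first boundary observed (boundariesObserved=1) permits a split at >=50% of the target (1 MiB), the fifth boundary raises the floor to 70% (~1.4 MiB), and from the ninth boundary onward it is capped at 90% (~1.8 MiB). A hypothetical standalone copy of the threshold, for illustration only:

        // minSplitSizePct mirrors the default branch above. boundariesObserved is
        // assumed to be >= 1, since reached increments it before we get here.
        func minSplitSizePct(boundariesObserved uint64) uint64 {
                return 50 + 5*min(boundariesObserved-1, 8) // 50%..90% of target size
        }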
     317             : 
     318           2 : func (f *fileSizeSplitter) onNewOutput(key []byte) []byte {
     319           2 :         f.boundariesObserved = 0
     320           2 :         return nil
     321           2 : }
     322             : 
     323           2 : func newLimitFuncSplitter(f *frontiers, limitFunc func(userKey []byte) []byte) *limitFuncSplitter {
     324           2 :         s := &limitFuncSplitter{limitFunc: limitFunc}
     325           2 :         s.frontier.Init(f, nil, s.reached)
     326           2 :         return s
     327           2 : }
     328             : 
     329             : type limitFuncSplitter struct {
     330             :         frontier  frontier
     331             :         limitFunc func(userKey []byte) []byte
     332             :         split     maybeSplit
     333             : }
     334             : 
     335           2 : func (lf *limitFuncSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
     336           2 :         return lf.split
     337           2 : }
     338             : 
     339           2 : func (lf *limitFuncSplitter) reached(nextKey []byte) []byte {
     340           2 :         lf.split = splitNow
     341           2 :         return nil
     342           2 : }
     343             : 
     344           2 : func (lf *limitFuncSplitter) onNewOutput(key []byte) []byte {
     345           2 :         lf.split = noSplit
     346           2 :         if key != nil {
     347           2 :                 // TODO(jackson): For some users, like L0 flush splits, there's no need
     348           2 :                 // to binary search over all the flush splits every time. The next split
     349           2 :                 // point must be ahead of the previous flush split point.
     350           2 :                 limit := lf.limitFunc(key)
     351           2 :                 lf.frontier.Update(limit)
     352           2 :                 return limit
     353           2 :         }
     354           2 :         lf.frontier.Update(nil)
     355           2 :         return nil
     356             : }
     357             : 
     358             : // splitterGroup is a compactionOutputSplitter that splits whenever one of its
     359             : // child splitters advises a compaction split.
     360             : type splitterGroup struct {
     361             :         cmp       Compare
     362             :         splitters []compactionOutputSplitter
     363             : }
     364             : 
     365             : func (a *splitterGroup) shouldSplitBefore(
     366             :         key *InternalKey, tw *sstable.Writer,
     367           2 : ) (suggestion maybeSplit) {
     368           2 :         for _, splitter := range a.splitters {
     369           2 :                 if splitter.shouldSplitBefore(key, tw) == splitNow {
     370           2 :                         return splitNow
     371           2 :                 }
     372             :         }
     373           2 :         return noSplit
     374             : }
     375             : 
     376           2 : func (a *splitterGroup) onNewOutput(key []byte) []byte {
     377           2 :         var earliestLimit []byte
     378           2 :         for _, splitter := range a.splitters {
     379           2 :                 limit := splitter.onNewOutput(key)
     380           2 :                 if limit == nil {
     381           2 :                         continue
     382             :                 }
     383           2 :                 if earliestLimit == nil || a.cmp(limit, earliestLimit) < 0 {
     384           2 :                         earliestLimit = limit
     385           2 :                 }
     386             :         }
     387           2 :         return earliestLimit
     388             : }
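
For orientation, a hedged sketch of how these splitters compose during a compaction; the real wiring lives elsewhere in this file (runCompaction) and differs in detail, and frontierSet, prevUserKey, and flushSplitLimit are assumed placeholders:

        splitter := &splitterGroup{
                cmp: c.cmp,
                splitters: []compactionOutputSplitter{
                        // The size-based splitter makes its decision without comparing
                        // against a limit key, so it is wrapped in a userKeyChangeSplitter
                        // to guarantee splits land on user key boundaries.
                        &userKeyChangeSplitter{
                                cmp:               c.cmp,
                                splitter:          newFileSizeSplitter(frontierSet, c.maxOutputFileSize, c.grandparents.Iter()),
                                unsafePrevUserKey: prevUserKey, // assumed: returns the last user key written
                        },
                        // An assumed limit function, e.g. L0 flush-split boundaries.
                        newLimitFuncSplitter(frontierSet, flushSplitLimit),
                },
        }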
     389             : 
     390             : // userKeyChangeSplitter is a compactionOutputSplitter that takes in a child
     391             : // splitter, and splits when 1) that child splitter has advised a split, and 2)
     392             : // the compaction output is at the boundary between two user keys (also
     393             : // the boundary between atomic compaction units). Use this splitter to wrap
     394             : // any splitters that don't guarantee user key splits (i.e. splitters that make
     395             : // their determination in ways other than comparing the current key against a
     396             : // limit key.) If a wrapped splitter advises a split, it must continue
     397             : // to advise a split until a new output.
     398             : type userKeyChangeSplitter struct {
     399             :         cmp               Compare
     400             :         splitter          compactionOutputSplitter
     401             :         unsafePrevUserKey func() []byte
     402             : }
     403             : 
     404           2 : func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
     405           2 :         // NB: The userKeyChangeSplitter only needs to suffer a key comparison if
     406           2 :         // the wrapped splitter requests a split.
     407           2 :         //
     408           2 :         // We could implement this splitter using frontiers: When the inner splitter
     409           2 :         // requests a split before key `k`, we'd update a frontier to be
     410           2 :         // ImmediateSuccessor(k). Then on the next key greater than >k, the
     411           2 :         // frontier's `reached` func would be called and we'd return splitNow.
     412           2 :         // This doesn't really save work since duplicate user keys are rare, and it
     413           2 :         // requires us to materialize the ImmediateSuccessor key. It also prevents
     414           2 :         // us from splitting on the same key that the inner splitter requested a
     415           2 :         // split for—instead we need to wait until the next key. The current
     416           2 :         // implementation uses `unsafePrevUserKey` to gain access to the previous
     417           2 :         // key which allows it to immediately respect the inner splitter if
     418           2 :         // possible.
     419           2 :         if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow {
     420           2 :                 return split
     421           2 :         }
     422           2 :         if u.cmp(key.UserKey, u.unsafePrevUserKey()) > 0 {
     423           2 :                 return splitNow
     424           2 :         }
     425           2 :         return noSplit
     426             : }
     427             : 
     428           2 : func (u *userKeyChangeSplitter) onNewOutput(key []byte) []byte {
     429           2 :         return u.splitter.onNewOutput(key)
     430           2 : }
     431             : 
      432             : // compactionWritable is an objstorage.Writable wrapper that, on every write,
     433             : // updates a metric in `versions` on bytes written by in-progress compactions so
     434             : // far. It also increments a per-compaction `written` int.
     435             : type compactionWritable struct {
     436             :         objstorage.Writable
     437             : 
     438             :         versions *versionSet
     439             :         written  *int64
     440             : }
     441             : 
     442             : // Write is part of the objstorage.Writable interface.
     443           2 : func (c *compactionWritable) Write(p []byte) error {
     444           2 :         if err := c.Writable.Write(p); err != nil {
     445           0 :                 return err
     446           0 :         }
     447             : 
     448           2 :         *c.written += int64(len(p))
     449           2 :         c.versions.incrementCompactionBytes(int64(len(p)))
     450           2 :         return nil
     451             : }
     452             : 
     453             : type compactionKind int
     454             : 
     455             : const (
     456             :         compactionKindDefault compactionKind = iota
     457             :         compactionKindFlush
     458             :         // compactionKindMove denotes a move compaction where the input file is
     459             :         // retained and linked in a new level without being obsoleted.
     460             :         compactionKindMove
     461             :         // compactionKindCopy denotes a copy compaction where the input file is
     462             :         // copied byte-by-byte into a new file with a new FileNum in the output level.
     463             :         compactionKindCopy
     464             :         compactionKindDeleteOnly
     465             :         compactionKindElisionOnly
     466             :         compactionKindRead
     467             :         compactionKindRewrite
     468             :         compactionKindIngestedFlushable
     469             : )
     470             : 
     471           2 : func (k compactionKind) String() string {
     472           2 :         switch k {
     473           2 :         case compactionKindDefault:
     474           2 :                 return "default"
     475           0 :         case compactionKindFlush:
     476           0 :                 return "flush"
     477           2 :         case compactionKindMove:
     478           2 :                 return "move"
     479           2 :         case compactionKindDeleteOnly:
     480           2 :                 return "delete-only"
     481           2 :         case compactionKindElisionOnly:
     482           2 :                 return "elision-only"
     483           1 :         case compactionKindRead:
     484           1 :                 return "read"
     485           1 :         case compactionKindRewrite:
     486           1 :                 return "rewrite"
     487           0 :         case compactionKindIngestedFlushable:
     488           0 :                 return "ingested-flushable"
     489           2 :         case compactionKindCopy:
     490           2 :                 return "copy"
     491             :         }
     492           0 :         return "?"
     493             : }
     494             : 
     495             : // rangeKeyCompactionTransform is used to transform range key spans as part of the
     496             : // keyspan.MergingIter. As part of this transformation step, we can elide range
     497             : // keys in the last snapshot stripe, as well as coalesce range keys within
     498             : // snapshot stripes.
     499             : func rangeKeyCompactionTransform(
     500             :         eq base.Equal, snapshots []uint64, elideRangeKey func(start, end []byte) bool,
     501           2 : ) keyspan.Transformer {
     502           2 :         return keyspan.TransformerFunc(func(cmp base.Compare, s keyspan.Span, dst *keyspan.Span) error {
     503           2 :                 elideInLastStripe := func(keys []keyspan.Key) []keyspan.Key {
     504           2 :                         // Unsets and deletes in the last snapshot stripe can be elided.
     505           2 :                         k := 0
     506           2 :                         for j := range keys {
     507           2 :                                 if elideRangeKey(s.Start, s.End) &&
     508           2 :                                         (keys[j].Kind() == InternalKeyKindRangeKeyUnset || keys[j].Kind() == InternalKeyKindRangeKeyDelete) {
     509           2 :                                         continue
     510             :                                 }
     511           2 :                                 keys[k] = keys[j]
     512           2 :                                 k++
     513             :                         }
     514           2 :                         keys = keys[:k]
     515           2 :                         return keys
     516             :                 }
      517             :                 // snapshots are in ascending order, while s.Keys are in descending seqnum
      518             :                 // order. Partition s.Keys by snapshot stripes, and call rangekey.Coalesce
     519             :                 // on each partition.
     520           2 :                 dst.Start = s.Start
     521           2 :                 dst.End = s.End
     522           2 :                 dst.Keys = dst.Keys[:0]
     523           2 :                 i, j := len(snapshots)-1, 0
     524           2 :                 usedLen := 0
     525           2 :                 for i >= 0 {
     526           2 :                         start := j
     527           2 :                         for j < len(s.Keys) && !base.Visible(s.Keys[j].SeqNum(), snapshots[i], base.InternalKeySeqNumMax) {
     528           2 :                                 // Include j in current partition.
     529           2 :                                 j++
     530           2 :                         }
     531           2 :                         if j > start {
     532           2 :                                 keysDst := dst.Keys[usedLen:cap(dst.Keys)]
     533           2 :                                 if err := rangekey.Coalesce(cmp, eq, s.Keys[start:j], &keysDst); err != nil {
     534           0 :                                         return err
     535           0 :                                 }
     536           2 :                                 if j == len(s.Keys) {
     537           2 :                                         // This is the last snapshot stripe. Unsets and deletes can be elided.
     538           2 :                                         keysDst = elideInLastStripe(keysDst)
     539           2 :                                 }
     540           2 :                                 usedLen += len(keysDst)
     541           2 :                                 dst.Keys = append(dst.Keys, keysDst...)
     542             :                         }
     543           2 :                         i--
     544             :                 }
     545           2 :                 if j < len(s.Keys) {
     546           2 :                         keysDst := dst.Keys[usedLen:cap(dst.Keys)]
     547           2 :                         if err := rangekey.Coalesce(cmp, eq, s.Keys[j:], &keysDst); err != nil {
     548           0 :                                 return err
     549           0 :                         }
     550           2 :                         keysDst = elideInLastStripe(keysDst)
     551           2 :                         usedLen += len(keysDst)
     552           2 :                         dst.Keys = append(dst.Keys, keysDst...)
     553             :                 }
     554           2 :                 return nil
     555             :         })
     556             : }
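
For intuition, a worked example with invented numbers: given ascending snapshots [10, 20] and a span whose keys carry descending seqnums [25, 15, 12, 5], the outer loop first partitions off the keys not visible at snapshot 20 (only seqnum 25, since visibility requires seqnum < snapshot) and coalesces them, then the keys not visible at snapshot 10 (seqnums 15 and 12). The trailing keys visible to every snapshot (seqnum 5) form the last stripe, handled by the block after the loop, where unsets and deletes may additionally be elided.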
     557             : 
     558             : // compaction is a table compaction from one level to the next, starting from a
     559             : // given version.
     560             : type compaction struct {
     561             :         // cancel is a bool that can be used by other goroutines to signal a compaction
     562             :         // to cancel, such as if a conflicting excise operation raced it to manifest
     563             :         // application. Only holders of the manifest lock will write to this atomic.
     564             :         cancel atomic.Bool
     565             : 
     566             :         kind      compactionKind
     567             :         cmp       Compare
     568             :         equal     Equal
     569             :         comparer  *base.Comparer
     570             :         formatKey base.FormatKey
     571             :         logger    Logger
     572             :         version   *version
     573             :         stats     base.InternalIteratorStats
     574             :         beganAt   time.Time
     575             :         // versionEditApplied is set to true when a compaction has completed and the
     576             :         // resulting version has been installed (if successful), but the compaction
     577             :         // goroutine is still cleaning up (eg, deleting obsolete files).
     578             :         versionEditApplied bool
     579             :         bufferPool         sstable.BufferPool
     580             : 
     581             :         // startLevel is the level that is being compacted. Inputs from startLevel
     582             :         // and outputLevel will be merged to produce a set of outputLevel files.
     583             :         startLevel *compactionLevel
     584             : 
     585             :         // outputLevel is the level that files are being produced in. outputLevel is
     586             :         // equal to startLevel+1 except when:
     587             :         //    - if startLevel is 0, the output level equals compactionPicker.baseLevel().
     588             :         //    - in multilevel compaction, the output level is the lowest level involved in
     589             :         //      the compaction
     590             :         // A compaction's outputLevel is nil for delete-only compactions.
     591             :         outputLevel *compactionLevel
     592             : 
      593             :         // extraLevels points to additional levels in between the input and output
      594             :         // levels that get compacted in multilevel compactions.
     595             :         extraLevels []*compactionLevel
     596             : 
     597             :         inputs []compactionLevel
     598             : 
     599             :         // maxOutputFileSize is the maximum size of an individual table created
     600             :         // during compaction.
     601             :         maxOutputFileSize uint64
     602             :         // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
     603             :         // single output table with the tables in the grandparent level.
     604             :         maxOverlapBytes uint64
     605             :         // disableSpanElision disables elision of range tombstones and range keys. Used
     606             :         // by tests to allow range tombstones or range keys to be added to tables where
     607             :         // they would otherwise be elided.
     608             :         disableSpanElision bool
     609             : 
     610             :         // flushing contains the flushables (aka memtables) that are being flushed.
     611             :         flushing flushableList
     612             :         // bytesIterated contains the number of bytes that have been flushed/compacted.
     613             :         bytesIterated uint64
     614             :         // bytesWritten contains the number of bytes that have been written to outputs.
     615             :         bytesWritten int64
     616             : 
     617             :         // The boundaries of the input data.
     618             :         smallest InternalKey
     619             :         largest  InternalKey
     620             : 
     621             :         // The range deletion tombstone fragmenter. Adds range tombstones as they are
     622             :         // returned from `compactionIter` and fragments them for output to files.
     623             :         // Referenced by `compactionIter` which uses it to check whether keys are deleted.
     624             :         rangeDelFrag keyspan.Fragmenter
     625             :         // The range key fragmenter. Similar to rangeDelFrag in that it gets range
     626             :         // keys from the compaction iter and fragments them for output to files.
     627             :         rangeKeyFrag keyspan.Fragmenter
     628             :         // The range deletion tombstone iterator, that merges and fragments
     629             :         // tombstones across levels. This iterator is included within the compaction
     630             :         // input iterator as a single level.
     631             :         // TODO(jackson): Remove this when the refactor of FragmentIterator,
     632             :         // InterleavingIterator, etc is complete.
     633             :         rangeDelIter keyspan.InternalIteratorShim
     634             :         // rangeKeyInterleaving is the interleaving iter for range keys.
     635             :         rangeKeyInterleaving keyspan.InterleavingIter
     636             : 
     637             :         // A list of objects to close when the compaction finishes. Used by input
     638             :         // iteration to keep rangeDelIters open for the lifetime of the compaction,
     639             :         // and only close them when the compaction finishes.
     640             :         closers []io.Closer
     641             : 
     642             :         // grandparents are the tables in level+2 that overlap with the files being
      643             :         // compacted. Used to determine output table boundaries. Do not assume that
      644             :         // the actual files in the grandparent level will be the same when this compaction finishes.
     645             :         grandparents manifest.LevelSlice
     646             : 
     647             :         // Boundaries at which flushes to L0 should be split. Determined by
     648             :         // L0Sublevels. If nil, flushes aren't split.
     649             :         l0Limits [][]byte
     650             : 
     651             :         // List of disjoint inuse key ranges the compaction overlaps with in
     652             :         // grandparent and lower levels. See setupInuseKeyRanges() for the
     653             :         // construction. Used by elideTombstone() and elideRangeTombstone() to
     654             :         // determine if keys affected by a tombstone possibly exist at a lower level.
     655             :         inuseKeyRanges []manifest.UserKeyRange
     656             :         // inuseEntireRange is set if the above inuse key ranges wholly contain the
     657             :         // compaction's key range. This allows compactions in higher levels to often
     658             :         // elide key comparisons.
     659             :         inuseEntireRange    bool
     660             :         elideTombstoneIndex int
     661             : 
     662             :         // allowedZeroSeqNum is true if seqnums can be zeroed if there are no
     663             :         // snapshots requiring them to be kept. This determination is made by
     664             :         // looking for an sstable which overlaps the bounds of the compaction at a
     665             :         // lower level in the LSM during runCompaction.
     666             :         allowedZeroSeqNum bool
     667             : 
     668             :         metrics map[int]*LevelMetrics
     669             : 
     670             :         pickerMetrics compactionPickerMetrics
     671             : }
     672             : 
     673           2 : func (c *compaction) makeInfo(jobID int) CompactionInfo {
     674           2 :         info := CompactionInfo{
     675           2 :                 JobID:       jobID,
     676           2 :                 Reason:      c.kind.String(),
     677           2 :                 Input:       make([]LevelInfo, 0, len(c.inputs)),
     678           2 :                 Annotations: []string{},
     679           2 :         }
     680           2 :         for _, cl := range c.inputs {
     681           2 :                 inputInfo := LevelInfo{Level: cl.level, Tables: nil}
     682           2 :                 iter := cl.files.Iter()
     683           2 :                 for m := iter.First(); m != nil; m = iter.Next() {
     684           2 :                         inputInfo.Tables = append(inputInfo.Tables, m.TableInfo())
     685           2 :                 }
     686           2 :                 info.Input = append(info.Input, inputInfo)
     687             :         }
     688           2 :         if c.outputLevel != nil {
     689           2 :                 info.Output.Level = c.outputLevel.level
     690           2 : 
     691           2 :                 // If there are no inputs from the output level (eg, a move
     692           2 :                 // compaction), add an empty LevelInfo to info.Input.
     693           2 :                 if len(c.inputs) > 0 && c.inputs[len(c.inputs)-1].level != c.outputLevel.level {
     694           0 :                         info.Input = append(info.Input, LevelInfo{Level: c.outputLevel.level})
     695           0 :                 }
     696           2 :         } else {
     697           2 :                 // For a delete-only compaction, set the output level to L6. The
     698           2 :                 // output level is not meaningful here, but complicating the
     699           2 :                 // info.Output interface with a pointer doesn't seem worth the
     700           2 :                 // semantic distinction.
     701           2 :                 info.Output.Level = numLevels - 1
     702           2 :         }
     703             : 
     704           2 :         for i, score := range c.pickerMetrics.scores {
     705           2 :                 info.Input[i].Score = score
     706           2 :         }
     707           2 :         info.SingleLevelOverlappingRatio = c.pickerMetrics.singleLevelOverlappingRatio
     708           2 :         info.MultiLevelOverlappingRatio = c.pickerMetrics.multiLevelOverlappingRatio
     709           2 :         if len(info.Input) > 2 {
     710           2 :                 info.Annotations = append(info.Annotations, "multilevel")
     711           2 :         }
     712           2 :         return info
     713             : }
     714             : 
     715             : func newCompaction(
     716             :         pc *pickedCompaction, opts *Options, beganAt time.Time, provider objstorage.Provider,
     717           2 : ) *compaction {
     718           2 :         c := &compaction{
     719           2 :                 kind:              compactionKindDefault,
     720           2 :                 cmp:               pc.cmp,
     721           2 :                 equal:             opts.equal(),
     722           2 :                 comparer:          opts.Comparer,
     723           2 :                 formatKey:         opts.Comparer.FormatKey,
     724           2 :                 inputs:            pc.inputs,
     725           2 :                 smallest:          pc.smallest,
     726           2 :                 largest:           pc.largest,
     727           2 :                 logger:            opts.Logger,
     728           2 :                 version:           pc.version,
     729           2 :                 beganAt:           beganAt,
     730           2 :                 maxOutputFileSize: pc.maxOutputFileSize,
     731           2 :                 maxOverlapBytes:   pc.maxOverlapBytes,
     732           2 :                 pickerMetrics:     pc.pickerMetrics,
     733           2 :         }
     734           2 :         c.startLevel = &c.inputs[0]
     735           2 :         if pc.startLevel.l0SublevelInfo != nil {
     736           2 :                 c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo
     737           2 :         }
     738           2 :         c.outputLevel = &c.inputs[1]
     739           2 : 
     740           2 :         if len(pc.extraLevels) > 0 {
     741           2 :                 c.extraLevels = pc.extraLevels
     742           2 :                 c.outputLevel = &c.inputs[len(c.inputs)-1]
     743           2 :         }
     744             :         // Compute the set of outputLevel+1 files that overlap this compaction (these
     745             :         // are the grandparent sstables).
     746           2 :         if c.outputLevel.level+1 < numLevels {
     747           2 :                 c.grandparents = c.version.Overlaps(c.outputLevel.level+1, c.cmp,
     748           2 :                         c.smallest.UserKey, c.largest.UserKey, c.largest.IsExclusiveSentinel())
     749           2 :         }
     750           2 :         c.setupInuseKeyRanges()
     751           2 :         c.kind = pc.kind
     752           2 : 
     753           2 :         if c.kind == compactionKindDefault && c.outputLevel.files.Empty() && !c.hasExtraLevelData() &&
     754           2 :                 c.startLevel.files.Len() == 1 && c.grandparents.SizeSum() <= c.maxOverlapBytes {
     755           2 :                 // This compaction can be converted into a move or copy from one level
     756           2 :                 // to the next. We avoid such a move if there is lots of overlapping
     757           2 :                 // grandparent data. Otherwise, the move could create a parent file
     758           2 :                 // that will require a very expensive merge later on.
     759           2 :                 iter := c.startLevel.files.Iter()
     760           2 :                 meta := iter.First()
     761           2 :                 isRemote := false
     762           2 :                 // We should always be passed a provider, except in some unit tests.
     763           2 :                 if provider != nil {
     764           2 :                         objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
     765           2 :                         if err != nil {
     766           0 :                                 panic(errors.Wrapf(err, "cannot lookup table %s in provider", meta.FileBacking.DiskFileNum))
     767             :                         }
     768           2 :                         isRemote = objMeta.IsRemote()
     769             :                 }
     770             :                 // Avoid a trivial move or copy if all of these are true, as rewriting a
     771             :                 // new file is better:
     772             :                 //
     773             :                 // 1) The source file is a virtual sstable
     774             :                 // 2) The existing file `meta` is on non-remote storage
     775             :                 // 3) The output level prefers shared storage
     776           2 :                 mustCopy := !isRemote && remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
     777           2 :                 if mustCopy {
     778           2 :                         // If the source is virtual, it's best to just rewrite the file as all
     779           2 :                         // conditions in the above comment are met.
     780           2 :                         if !meta.Virtual {
     781           2 :                                 c.kind = compactionKindCopy
     782           2 :                         }
     783           2 :                 } else {
     784           2 :                         c.kind = compactionKindMove
     785           2 :                 }
     786             :         }
     787           2 :         return c
     788             : }
     789             : 
     790             : func newDeleteOnlyCompaction(
     791             :         opts *Options, cur *version, inputs []compactionLevel, beganAt time.Time,
     792           2 : ) *compaction {
     793           2 :         c := &compaction{
     794           2 :                 kind:      compactionKindDeleteOnly,
     795           2 :                 cmp:       opts.Comparer.Compare,
     796           2 :                 equal:     opts.equal(),
     797           2 :                 comparer:  opts.Comparer,
     798           2 :                 formatKey: opts.Comparer.FormatKey,
     799           2 :                 logger:    opts.Logger,
     800           2 :                 version:   cur,
     801           2 :                 beganAt:   beganAt,
     802           2 :                 inputs:    inputs,
     803           2 :         }
     804           2 : 
     805           2 :         // Set c.smallest, c.largest.
     806           2 :         files := make([]manifest.LevelIterator, 0, len(inputs))
     807           2 :         for _, in := range inputs {
     808           2 :                 files = append(files, in.files.Iter())
     809           2 :         }
     810           2 :         c.smallest, c.largest = manifest.KeyRange(opts.Comparer.Compare, files...)
     811           2 :         return c
     812             : }
     813             : 
     814           2 : func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64) {
     815           2 :         // Heuristic to place a lower bound on compaction output file size
     816           2 :         // caused by Lbase. Prior to this heuristic we have observed an L0 in
     817           2 :         // production with 310K files of which 290K files were < 10KB in size.
     818           2 :         // Our hypothesis is that it was caused by L1 having 2600 files and
     819           2 :         // ~10GB, such that each flush got split into many tiny files due to
     820           2 :         // overlapping with most of the files in Lbase.
     821           2 :         //
     822           2 :         // The computation below is general in that it accounts
     823           2 :         // for flushing different volumes of data (e.g. we may be flushing
     824           2 :         // many memtables). For illustration, we consider the typical
     825           2 :         // example of flushing a 64MB memtable. So 12.8MB output,
     826           2 :         // based on the compression guess below. If the compressed bytes
     827           2 :         // guess is an over-estimate we will end up with smaller files,
     828           2 :         // and if an under-estimate we will end up with larger files.
     829           2 :         // With a 2MB target file size, 7 files. We are willing to accept
     830           2 :         // 4x the number of files, if it results in better write amplification
     831           2 :         // when later compacting to Lbase, i.e., ~450KB files (target file
     832           2 :         // size / 4).
     833           2 :         //
     834           2 :         // Note that this is a pessimistic heuristic in that
     835           2 :         // fileCountUpperBoundDueToGrandparents could be far from the actual
     836           2 :         // number of files produced due to the grandparent limits. For
     837           2 :         // example, in the extreme, consider a flush that overlaps with 1000
     838           2 :         // files in Lbase f0...f999, and the initially calculated value of
     839           2 :         // maxOverlapBytes will cause splits at f10, f20,..., f990, which
     840           2 :         // means an upper bound file count of 100 files. Say the input bytes
     841           2 :         // in the flush are such that acceptableFileCount=10. We will fatten
     842           2 :         // up maxOverlapBytes by 10x to ensure that the upper bound file count
     843           2 :         // drops to 10. However, it is possible that in practice, even without
     844           2 :         // this change, we would have produced no more than 10 files, and that
     845           2 :         // this change makes the files unnecessarily wide. Say the input bytes
     846           2 :         // are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
     847           2 :         // 10% in f80...f89 and 10% in f990...f999. The original value of
     848           2 :         // maxOverlapBytes would have actually produced only 10 sstables. But
     849           2 :         // by increasing maxOverlapBytes by 10x, we may produce 1 sstable that
     850           2 :         // spans f0...f89, i.e., a much wider sstable than necessary.
     851           2 :         //
     852           2 :         // We could produce a tighter estimate of
     853           2 :         // fileCountUpperBoundDueToGrandparents if we had knowledge of the key
     854           2 :         // distribution of the flush. The 4x multiplier mentioned earlier is
     855           2 :         // a way to try to compensate for this pessimism.
     856           2 :         //
     857           2 :         // TODO(sumeer): we don't have compression info for the data being
     858           2 :         // flushed, but it is likely that existing files that overlap with
     859           2 :         // this flush in Lbase are representative wrt compression ratio. We
     860           2 :         // could store the uncompressed size in FileMetadata and estimate
     861           2 :         // the compression ratio.
     862           2 :         const approxCompressionRatio = 0.2
     863           2 :         approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
     864           2 :         approxNumFilesBasedOnTargetSize :=
     865           2 :                 int(math.Ceil(approxOutputBytes / float64(c.maxOutputFileSize)))
     866           2 :         acceptableFileCount := float64(4 * approxNumFilesBasedOnTargetSize)
     867           2 :         // The byte calculation is linear in numGrandparentFiles, but we will
     868           2 :         // incur this linear cost in findGrandparentLimit too, so we are also
     869           2 :         // willing to pay it now. We could approximate this cheaply by using
     870           2 :         // the mean file size of Lbase.
     871           2 :         grandparentFileBytes := c.grandparents.SizeSum()
     872           2 :         fileCountUpperBoundDueToGrandparents :=
     873           2 :                 float64(grandparentFileBytes) / float64(c.maxOverlapBytes)
     874           2 :         if fileCountUpperBoundDueToGrandparents > acceptableFileCount {
     875           2 :                 c.maxOverlapBytes = uint64(
     876           2 :                         float64(c.maxOverlapBytes) *
     877           2 :                                 (fileCountUpperBoundDueToGrandparents / acceptableFileCount))
     878           2 :         }
     879             : }
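
To make the arithmetic in the comment concrete, here is a self-contained
sketch of the same heuristic (function and parameter names are illustrative):
flushing a 64MB memtable with the 0.2 compression guess and a 2MB target file
size yields ~12.8MB of output, 7 files, and an acceptable count of 28;
maxOverlapBytes is inflated only when the grandparent-implied file count
exceeds that.

    package main

    import (
    	"fmt"
    	"math"
    )

    // adjustOverlap widens maxOverlapBytes whenever the grandparent level
    // could force more output files than 4x the target-size-based estimate,
    // so that the upper-bound file count drops back to the acceptable count.
    func adjustOverlap(flushingBytes, targetFileSize, grandparentBytes, maxOverlapBytes uint64) uint64 {
    	const approxCompressionRatio = 0.2
    	approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
    	approxNumFiles := int(math.Ceil(approxOutputBytes / float64(targetFileSize)))
    	acceptableFileCount := float64(4 * approxNumFiles)
    	upperBound := float64(grandparentBytes) / float64(maxOverlapBytes)
    	if upperBound > acceptableFileCount {
    		return uint64(float64(maxOverlapBytes) * (upperBound / acceptableFileCount))
    	}
    	return maxOverlapBytes
    }

    func main() {
    	// 64MB flush, 2MB target, 10GB of grandparent data, 20MB overlap budget:
    	// the 512-file upper bound exceeds the acceptable 28, so the budget is
    	// inflated by 512/28 to roughly 366MB.
    	fmt.Println(adjustOverlap(64<<20, 2<<20, 10<<30, 20<<20))
    }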
     880             : 
     881             : func newFlush(
     882             :         opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time,
     883           2 : ) *compaction {
     884           2 :         c := &compaction{
     885           2 :                 kind:              compactionKindFlush,
     886           2 :                 cmp:               opts.Comparer.Compare,
     887           2 :                 equal:             opts.equal(),
     888           2 :                 comparer:          opts.Comparer,
     889           2 :                 formatKey:         opts.Comparer.FormatKey,
     890           2 :                 logger:            opts.Logger,
     891           2 :                 version:           cur,
     892           2 :                 beganAt:           beganAt,
     893           2 :                 inputs:            []compactionLevel{{level: -1}, {level: 0}},
     894           2 :                 maxOutputFileSize: math.MaxUint64,
     895           2 :                 maxOverlapBytes:   math.MaxUint64,
     896           2 :                 flushing:          flushing,
     897           2 :         }
     898           2 :         c.startLevel = &c.inputs[0]
     899           2 :         c.outputLevel = &c.inputs[1]
     900           2 : 
     901           2 :         if len(flushing) > 0 {
     902           2 :                 if _, ok := flushing[0].flushable.(*ingestedFlushable); ok {
     903           2 :                         if len(flushing) != 1 {
     904           0 :                                 panic("pebble: ingestedFlushable must be flushed one at a time.")
     905             :                         }
     906           2 :                         c.kind = compactionKindIngestedFlushable
     907           2 :                         return c
     908             :                 }
     909             :         }
     910             : 
     911             :         // Make sure there's no ingestedFlushable after the first flushable in the
     912             :         // list.
     913           2 :         for _, f := range flushing {
     914           2 :                 if _, ok := f.flushable.(*ingestedFlushable); ok {
     915           0 :                         panic("pebble: flushing shouldn't contain ingestedFlushable flushable")
     916             :                 }
     917             :         }
     918             : 
     919           2 :         if cur.L0Sublevels != nil {
     920           2 :                 c.l0Limits = cur.L0Sublevels.FlushSplitKeys()
     921           2 :         }
     922             : 
     923           2 :         smallestSet, largestSet := false, false
     924           2 :         updatePointBounds := func(iter internalIterator) {
     925           2 :                 if key, _ := iter.First(); key != nil {
     926           2 :                         if !smallestSet ||
     927           2 :                                 base.InternalCompare(c.cmp, c.smallest, *key) > 0 {
     928           2 :                                 smallestSet = true
     929           2 :                                 c.smallest = key.Clone()
     930           2 :                         }
     931             :                 }
     932           2 :                 if key, _ := iter.Last(); key != nil {
     933           2 :                         if !largestSet ||
     934           2 :                                 base.InternalCompare(c.cmp, c.largest, *key) < 0 {
     935           2 :                                 largestSet = true
     936           2 :                                 c.largest = key.Clone()
     937           2 :                         }
     938             :                 }
     939             :         }
     940             : 
     941           2 :         updateRangeBounds := func(iter keyspan.FragmentIterator) {
     942           2 :                 // File bounds require s != nil && !s.Empty(). We only need to check for
     943           2 :                 // s != nil here, as the memtable's FragmentIterator would never surface
     944           2 :                 // empty spans.
     945           2 :                 if s := iter.First(); s != nil {
     946           2 :                         if key := s.SmallestKey(); !smallestSet ||
     947           2 :                                 base.InternalCompare(c.cmp, c.smallest, key) > 0 {
     948           2 :                                 smallestSet = true
     949           2 :                                 c.smallest = key.Clone()
     950           2 :                         }
     951             :                 }
     952           2 :                 if s := iter.Last(); s != nil {
     953           2 :                         if key := s.LargestKey(); !largestSet ||
     954           2 :                                 base.InternalCompare(c.cmp, c.largest, key) < 0 {
     955           2 :                                 largestSet = true
     956           2 :                                 c.largest = key.Clone()
     957           2 :                         }
     958             :                 }
     959             :         }
     960             : 
     961           2 :         var flushingBytes uint64
     962           2 :         for i := range flushing {
     963           2 :                 f := flushing[i]
     964           2 :                 updatePointBounds(f.newIter(nil))
     965           2 :                 if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
     966           2 :                         updateRangeBounds(rangeDelIter)
     967           2 :                 }
     968           2 :                 if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
     969           2 :                         updateRangeBounds(rangeKeyIter)
     970           2 :                 }
     971           2 :                 flushingBytes += f.inuseBytes()
     972             :         }
     973             : 
     974           2 :         if opts.FlushSplitBytes > 0 {
     975           2 :                 c.maxOutputFileSize = uint64(opts.Level(0).TargetFileSize)
     976           2 :                 c.maxOverlapBytes = maxGrandparentOverlapBytes(opts, 0)
     977           2 :                 c.grandparents = c.version.Overlaps(baseLevel, c.cmp, c.smallest.UserKey,
     978           2 :                         c.largest.UserKey, c.largest.IsExclusiveSentinel())
     979           2 :                 adjustGrandparentOverlapBytesForFlush(c, flushingBytes)
     980           2 :         }
     981             : 
     982           2 :         c.setupInuseKeyRanges()
     983           2 :         return c
     984             : }
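
The bounds bookkeeping above reduces to a running min/max over the point and
span bounds of every flushable; a minimal sketch with plain byte-slice keys
(InternalKey trailers and the iterator plumbing elided; names hypothetical):

    package main

    import (
    	"bytes"
    	"fmt"
    )

    // boundsAccum folds per-iterator key bounds into one compaction-wide
    // [smallest, largest] range, as newFlush does across all flushables.
    type boundsAccum struct {
    	smallest, largest []byte
    	set               bool
    }

    func (b *boundsAccum) update(lo, hi []byte) {
    	if !b.set || bytes.Compare(lo, b.smallest) < 0 {
    		b.smallest = append([]byte(nil), lo...) // clone: the source buffer may be reused
    	}
    	if !b.set || bytes.Compare(hi, b.largest) > 0 {
    		b.largest = append([]byte(nil), hi...)
    	}
    	b.set = true
    }

    func main() {
    	var b boundsAccum
    	b.update([]byte("d"), []byte("m")) // point bounds of one memtable
    	b.update([]byte("a"), []byte("f")) // range-del bounds of another
    	fmt.Printf("%s-%s\n", b.smallest, b.largest) // a-m
    }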
     985             : 
     986           2 : func (c *compaction) hasExtraLevelData() bool {
     987           2 :         if len(c.extraLevels) == 0 {
     988           2 :                 // not a multi level compaction
     989           2 :                 return false
     990           2 :         } else if c.extraLevels[0].files.Empty() {
     991           2 :                 // a multi level compaction without data in the intermediate input level;
      992           2 :                 // e.g. for a multi level compaction with levels 4, 5, and 6, this could
      993           2 :                 // occur if there are no files to compact in 5, or in 5 and 6 (i.e. a move).
     994           2 :                 return false
     995           2 :         }
     996           2 :         return true
     997             : }
     998             : 
     999           2 : func (c *compaction) setupInuseKeyRanges() {
    1000           2 :         level := c.outputLevel.level + 1
    1001           2 :         if c.outputLevel.level == 0 {
    1002           2 :                 level = 0
    1003           2 :         }
    1004             :         // calculateInuseKeyRanges will return a series of sorted spans. Overlapping
    1005             :         // or abutting spans have already been merged.
    1006           2 :         c.inuseKeyRanges = calculateInuseKeyRanges(
    1007           2 :                 c.version, c.cmp, level, numLevels-1, c.smallest.UserKey, c.largest.UserKey,
    1008           2 :         )
    1009           2 :         // Check if there's a single in-use span that encompasses the entire key
    1010           2 :         // range of the compaction. This is an optimization to avoid key comparisons
    1011           2 :         // against inuseKeyRanges during the compaction when every key within the
    1012           2 :         // compaction overlaps with an in-use span.
    1013           2 :         if len(c.inuseKeyRanges) > 0 {
    1014           2 :                 c.inuseEntireRange = c.cmp(c.inuseKeyRanges[0].Start, c.smallest.UserKey) <= 0 &&
    1015           2 :                         c.cmp(c.inuseKeyRanges[0].End, c.largest.UserKey) >= 0
    1016           2 :         }
    1017             : }
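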
    1018             : 
    1019             : func calculateInuseKeyRanges(
    1020             :         v *version, cmp base.Compare, level, maxLevel int, smallest, largest []byte,
    1021           2 : ) []manifest.UserKeyRange {
    1022           2 :         // Use two slices, alternating which one is input and which one is output
    1023           2 :         // as we descend the LSM.
    1024           2 :         var input, output []manifest.UserKeyRange
    1025           2 : 
    1026           2 :         // L0 requires special treatment, since sstables within L0 may overlap.
    1027           2 :         // We use the L0 Sublevels structure to efficiently calculate the merged
    1028           2 :         // in-use key ranges.
    1029           2 :         if level == 0 {
    1030           2 :                 output = v.L0Sublevels.InUseKeyRanges(smallest, largest)
    1031           2 :                 level++
    1032           2 :         }
    1033             : 
    1034           2 :         for ; level <= maxLevel; level++ {
    1035           2 :                 // NB: We always treat `largest` as inclusive for simplicity, because
    1036           2 :                 // there's little consequence to calculating slightly broader in-use key
    1037           2 :                 // ranges.
    1038           2 :                 overlaps := v.Overlaps(level, cmp, smallest, largest, false /* exclusiveEnd */)
    1039           2 :                 iter := overlaps.Iter()
    1040           2 : 
    1041           2 :                 // We may already have in-use key ranges from higher levels. Iterate
    1042           2 :                 // through both our accumulated in-use key ranges and this level's
    1043           2 :                 // files, merging the two.
    1044           2 :                 //
    1045           2 :                 // Tables higher within the LSM have broader key spaces. We use this
    1046           2 :                 // when possible to seek past a level's files that are contained by
    1047           2 :                 // our current accumulated in-use key ranges. This helps avoid
    1048           2 :                 // per-sstable work during flushes or compactions in high levels which
    1049           2 :                 // overlap the majority of the LSM's sstables.
    1050           2 :                 input, output = output, input
    1051           2 :                 output = output[:0]
    1052           2 : 
    1053           2 :                 var currFile *fileMetadata
    1054           2 :                 var currAccum *manifest.UserKeyRange
    1055           2 :                 if len(input) > 0 {
    1056           2 :                         currAccum, input = &input[0], input[1:]
    1057           2 :                 }
    1058             : 
    1059             :                 // If we have an accumulated key range and its start is ≤ smallest,
    1060             :                 // we can seek to the accumulated range's end. Otherwise, we need to
    1061             :                 // start at the first overlapping file within the level.
    1062           2 :                 if currAccum != nil && cmp(currAccum.Start, smallest) <= 0 {
    1063           2 :                         currFile = seekGT(&iter, cmp, currAccum.End)
    1064           2 :                 } else {
    1065           2 :                         currFile = iter.First()
    1066           2 :                 }
    1067             : 
    1068           2 :                 for currFile != nil || currAccum != nil {
    1069           2 :                         // If we've exhausted either the files in the level or the
    1070           2 :                         // accumulated key ranges, we just need to append the one we have.
    1071           2 :                         // If we have both a currFile and a currAccum, they either overlap
    1072           2 :                         // or they're disjoint. If they're disjoint, we append whichever
    1073           2 :                         // one sorts first and move on to the next file or range. If they
    1074           2 :                         // overlap, we merge them into currAccum and proceed to the next
    1075           2 :                         // file.
    1076           2 :                         switch {
    1077           2 :                         case currAccum == nil || (currFile != nil && cmp(currFile.Largest.UserKey, currAccum.Start) < 0):
    1078           2 :                                 // This file is strictly before the current accumulated range,
    1079           2 :                                 // or there are no more accumulated ranges.
    1080           2 :                                 output = append(output, manifest.UserKeyRange{
    1081           2 :                                         Start: currFile.Smallest.UserKey,
    1082           2 :                                         End:   currFile.Largest.UserKey,
    1083           2 :                                 })
    1084           2 :                                 currFile = iter.Next()
    1085           2 :                         case currFile == nil || (currAccum != nil && cmp(currAccum.End, currFile.Smallest.UserKey) < 0):
    1086           2 :                                 // The current accumulated key range is strictly before the
    1087           2 :                                 // current file, or there are no more files.
    1088           2 :                                 output = append(output, *currAccum)
    1089           2 :                                 currAccum = nil
    1090           2 :                                 if len(input) > 0 {
    1091           2 :                                         currAccum, input = &input[0], input[1:]
    1092           2 :                                 }
    1093           2 :                         default:
    1094           2 :                                 // The current accumulated range and the current file overlap.
    1095           2 :                                 // Adjust the accumulated range to be the union.
    1096           2 :                                 if cmp(currFile.Smallest.UserKey, currAccum.Start) < 0 {
    1097           2 :                                         currAccum.Start = currFile.Smallest.UserKey
    1098           2 :                                 }
    1099           2 :                                 if cmp(currFile.Largest.UserKey, currAccum.End) > 0 {
    1100           2 :                                         currAccum.End = currFile.Largest.UserKey
    1101           2 :                                 }
    1102             : 
    1103             :                                 // Extending `currAccum`'s end boundary may have caused it to
    1104             :                                 // overlap with `input` key ranges that we haven't processed
    1105             :                                 // yet. Merge any such key ranges.
    1106           2 :                                 for len(input) > 0 && cmp(input[0].Start, currAccum.End) <= 0 {
    1107           2 :                                         if cmp(input[0].End, currAccum.End) > 0 {
    1108           2 :                                                 currAccum.End = input[0].End
    1109           2 :                                         }
    1110           2 :                                         input = input[1:]
    1111             :                                 }
    1112             :                                 // Seek the level iterator past our current accumulated end.
    1113           2 :                                 currFile = seekGT(&iter, cmp, currAccum.End)
    1114             :                         }
    1115             :                 }
    1116             :         }
    1117           2 :         return output
    1118             : }
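
The per-level loop above is an incremental merge of two sorted interval
lists, with seeking to skip files already contained in the accumulated
ranges. A compact sort-and-sweep equivalent over plain string ranges shows
what the output must be (types and names are illustrative, and this version
skips the seeking optimization):

    package main

    import (
    	"fmt"
    	"sort"
    )

    type keyRange struct{ start, end string }

    // mergeRanges unions two sorted, internally non-overlapping interval
    // lists by sorting all ranges by start key and sweeping, merging any
    // range that overlaps the accumulated output.
    func mergeRanges(a, b []keyRange) []keyRange {
    	all := append(append([]keyRange(nil), a...), b...)
    	sort.Slice(all, func(i, j int) bool { return all[i].start < all[j].start })
    	var out []keyRange
    	for _, r := range all {
    		if n := len(out); n > 0 && r.start <= out[n-1].end {
    			if r.end > out[n-1].end {
    				out[n-1].end = r.end // extend the previous range to the union
    			}
    			continue
    		}
    		out = append(out, r)
    	}
    	return out
    }

    func main() {
    	fmt.Println(mergeRanges(
    		[]keyRange{{"a", "c"}, {"j", "m"}}, // accumulated in-use ranges
    		[]keyRange{{"b", "e"}, {"x", "z"}}, // this level's file bounds
    	)) // [{a e} {j m} {x z}]
    }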
    1119             : 
    1120           2 : func seekGT(iter *manifest.LevelIterator, cmp base.Compare, key []byte) *manifest.FileMetadata {
    1121           2 :         f := iter.SeekGE(cmp, key)
    1122           2 :         for f != nil && cmp(f.Largest.UserKey, key) == 0 {
    1123           2 :                 f = iter.Next()
    1124           2 :         }
    1125           2 :         return f
    1126             : }
    1127             : 
    1128             : // findGrandparentLimit takes the start user key for a table and returns the
    1129             : // user key to which that table can extend without excessively overlapping
    1130             : // the grandparent level. If no limit is needed considering the grandparent
    1131             : // files, this function returns nil. This is done in order to prevent a table
    1132             : // at level N from overlapping too much data at level N+1. We want to avoid
    1133             : // such large overlaps because they translate into large compactions. The
    1134             : // current heuristic stops output of a table if the addition of another key
    1135             : // would cause the table to overlap more than 10x the target file size at
    1136             : // level N. See maxGrandparentOverlapBytes.
    1137           2 : func (c *compaction) findGrandparentLimit(start []byte) []byte {
    1138           2 :         iter := c.grandparents.Iter()
    1139           2 :         var overlappedBytes uint64
    1140           2 :         var greater bool
    1141           2 :         for f := iter.SeekGE(c.cmp, start); f != nil; f = iter.Next() {
    1142           2 :                 overlappedBytes += f.Size
    1143           2 :                 // To ensure forward progress we always return a larger user
    1144           2 :                 // key than where we started. See comments above clients of
    1145           2 :                 // this function for how this is used.
    1146           2 :                 greater = greater || c.cmp(f.Smallest.UserKey, start) > 0
    1147           2 :                 if !greater {
    1148           2 :                         continue
    1149             :                 }
    1150             : 
     1151             :                 // We return the smallest bound of an sstable rather than the
     1152             :                 // largest because the smallest is always inclusive, and limits
     1153             :                 // are used exclusively when truncating range tombstones. If we
    1154             :                 // truncated an output to the largest key while there's a
    1155             :                 // pending tombstone, the next output file would also overlap
    1156             :                 // the same grandparent f.
    1157           2 :                 if overlappedBytes > c.maxOverlapBytes {
    1158           2 :                         return f.Smallest.UserKey
    1159           2 :                 }
    1160             :         }
    1161           2 :         return nil
    1162             : }
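
A toy version of the limit scan above, over a pre-sorted slice standing in
for the grandparent level (gpFile and findLimit are hypothetical, and the
real code positions a level iterator with SeekGE rather than scanning from
the front): accumulate sizes, and return the first file's smallest key once
the budget is exceeded, never returning a key <= start.

    package main

    import "fmt"

    type gpFile struct {
    	smallest string
    	size     uint64
    }

    // findLimit stops an output table before it overlaps more than
    // maxOverlapBytes of grandparent data, returning "" if no limit is needed.
    func findLimit(files []gpFile, start string, maxOverlapBytes uint64) string {
    	var overlapped uint64
    	greater := false
    	for _, f := range files {
    		overlapped += f.size
    		// Only split at keys strictly greater than start, to guarantee
    		// forward progress.
    		greater = greater || f.smallest > start
    		if !greater {
    			continue
    		}
    		if overlapped > maxOverlapBytes {
    			return f.smallest
    		}
    	}
    	return ""
    }

    func main() {
    	files := []gpFile{{"b", 6 << 20}, {"f", 6 << 20}, {"k", 6 << 20}}
    	fmt.Println(findLimit(files, "a", 10<<20)) // "f": 12MB overlapped by then
    }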
    1163             : 
    1164             : // findL0Limit takes the start key for a table and returns the user key to which
    1165             : // that table can be extended without hitting the next l0Limit. Having flushed
    1166             : // sstables "bridging across" an l0Limit could lead to increased L0 -> LBase
    1167             : // compaction sizes as well as elevated read amplification.
    1168           2 : func (c *compaction) findL0Limit(start []byte) []byte {
    1169           2 :         if c.startLevel.level > -1 || c.outputLevel.level != 0 || len(c.l0Limits) == 0 {
    1170           2 :                 return nil
    1171           2 :         }
    1172           2 :         index := sort.Search(len(c.l0Limits), func(i int) bool {
    1173           2 :                 return c.cmp(c.l0Limits[i], start) > 0
    1174           2 :         })
    1175           2 :         if index < len(c.l0Limits) {
    1176           2 :                 return c.l0Limits[index]
    1177           2 :         }
    1178           2 :         return nil
    1179             : }
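
The flush-split lookup is a plain binary search for the first split key
strictly greater than the start key; a standalone equivalent over string
keys (nextFlushSplit is a hypothetical name):

    package main

    import (
    	"fmt"
    	"sort"
    )

    // nextFlushSplit returns the first flush split key > start, or "" when
    // the table may extend to the end of the keyspace, as findL0Limit does.
    func nextFlushSplit(l0Limits []string, start string) string {
    	i := sort.Search(len(l0Limits), func(i int) bool { return l0Limits[i] > start })
    	if i < len(l0Limits) {
    		return l0Limits[i]
    	}
    	return ""
    }

    func main() {
    	limits := []string{"c", "g", "p"}
    	fmt.Println(nextFlushSplit(limits, "e")) // "g"
    	fmt.Println(nextFlushSplit(limits, "q")) // "" (no limit)
    }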
    1180             : 
    1181             : // errorOnUserKeyOverlap returns an error if the last two written sstables in
    1182             : // this compaction have revisions of the same user key present in both sstables,
     1183             : // when it shouldn't (e.g., when splitting flushes).
    1184           2 : func (c *compaction) errorOnUserKeyOverlap(ve *versionEdit) error {
    1185           2 :         if n := len(ve.NewFiles); n > 1 {
    1186           2 :                 meta := ve.NewFiles[n-1].Meta
    1187           2 :                 prevMeta := ve.NewFiles[n-2].Meta
    1188           2 :                 if !prevMeta.Largest.IsExclusiveSentinel() &&
    1189           2 :                         c.cmp(prevMeta.Largest.UserKey, meta.Smallest.UserKey) >= 0 {
    1190           1 :                         return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
    1191           1 :                                 prevMeta.Largest.Pretty(c.formatKey),
    1192           1 :                                 prevMeta.FileNum,
    1193           1 :                                 meta.FileNum)
    1194           1 :                 }
    1195             :         }
    1196           2 :         return nil
    1197             : }
    1198             : 
     1199             : // allowZeroSeqNum returns true if seqnums can be zeroed if there are no
    1200             : // snapshots requiring them to be kept. It performs this determination by
    1201             : // looking for an sstable which overlaps the bounds of the compaction at a
    1202             : // lower level in the LSM.
    1203           2 : func (c *compaction) allowZeroSeqNum() bool {
    1204           2 :         return c.elideRangeTombstone(c.smallest.UserKey, c.largest.UserKey)
    1205           2 : }
    1206             : 
    1207             : // elideTombstone returns true if it is ok to elide a tombstone for the
    1208             : // specified key. A return value of true guarantees that there are no key/value
    1209             : // pairs at c.level+2 or higher that possibly contain the specified user
    1210             : // key. The keys in multiple invocations to elideTombstone must be supplied in
    1211             : // order.
    1212           2 : func (c *compaction) elideTombstone(key []byte) bool {
    1213           2 :         if c.inuseEntireRange || len(c.flushing) != 0 {
    1214           2 :                 return false
    1215           2 :         }
    1216             : 
    1217           2 :         for ; c.elideTombstoneIndex < len(c.inuseKeyRanges); c.elideTombstoneIndex++ {
    1218           2 :                 r := &c.inuseKeyRanges[c.elideTombstoneIndex]
    1219           2 :                 if c.cmp(key, r.End) <= 0 {
    1220           2 :                         if c.cmp(key, r.Start) >= 0 {
    1221           2 :                                 return false
    1222           2 :                         }
    1223           2 :                         break
    1224             :                 }
    1225             :         }
    1226           2 :         return true
    1227             : }
    1228             : 
    1229             : // elideRangeTombstone returns true if it is ok to elide the specified range
    1230             : // tombstone. A return value of true guarantees that there are no key/value
    1231             : // pairs at c.outputLevel.level+1 or higher that possibly overlap the specified
    1232             : // tombstone.
    1233           2 : func (c *compaction) elideRangeTombstone(start, end []byte) bool {
    1234           2 :         // Disable range tombstone elision if the testing knob for that is enabled,
    1235           2 :         // or if we are flushing memtables. The latter requirement is due to
    1236           2 :         // inuseKeyRanges not accounting for key ranges in other memtables that are
    1237           2 :         // being flushed in the same compaction. It's possible for a range tombstone
    1238           2 :         // in one memtable to overlap keys in a preceding memtable in c.flushing.
    1239           2 :         //
    1240           2 :         // This function is also used in setting allowZeroSeqNum, so disabling
    1241           2 :         // elision of range tombstones also disables zeroing of SeqNums.
    1242           2 :         //
    1243           2 :         // TODO(peter): we disable zeroing of seqnums during flushing to match
    1244           2 :         // RocksDB behavior and to avoid generating overlapping sstables during
    1245           2 :         // DB.replayWAL. When replaying WAL files at startup, we flush after each
    1246           2 :         // WAL is replayed building up a single version edit that is
    1247           2 :         // applied. Because we don't apply the version edit after each flush, this
    1248           2 :         // code doesn't know that L0 contains files and zeroing of seqnums should
    1249           2 :         // be disabled. That is fixable, but it seems safer to just match the
    1250           2 :         // RocksDB behavior for now.
    1251           2 :         if c.disableSpanElision || len(c.flushing) != 0 {
    1252           2 :                 return false
    1253           2 :         }
    1254             : 
    1255           2 :         lower := sort.Search(len(c.inuseKeyRanges), func(i int) bool {
    1256           2 :                 return c.cmp(c.inuseKeyRanges[i].End, start) >= 0
    1257           2 :         })
    1258           2 :         upper := sort.Search(len(c.inuseKeyRanges), func(i int) bool {
    1259           2 :                 return c.cmp(c.inuseKeyRanges[i].Start, end) > 0
    1260           2 :         })
    1261           2 :         return lower >= upper
    1262             : }
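
The elision test reduces to an emptiness check on the set of in-use ranges
intersecting [start, end]: two binary searches bound that set, and elision is
safe exactly when the window is empty. A standalone sketch over string keys
(span and canElide are hypothetical names):

    package main

    import (
    	"fmt"
    	"sort"
    )

    type span struct{ start, end string }

    // canElide reports whether [start, end] misses every in-use range: lower
    // is the first range whose end >= start, upper the first whose start >
    // end, so the ranges in [lower, upper) are exactly those overlapping the
    // tombstone.
    func canElide(inuse []span, start, end string) bool {
    	lower := sort.Search(len(inuse), func(i int) bool { return inuse[i].end >= start })
    	upper := sort.Search(len(inuse), func(i int) bool { return inuse[i].start > end })
    	return lower >= upper
    }

    func main() {
    	inuse := []span{{"a", "c"}, {"h", "k"}}
    	fmt.Println(canElide(inuse, "d", "f")) // true: nothing underneath
    	fmt.Println(canElide(inuse, "b", "e")) // false: overlaps [a, c]
    }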
    1263             : 
    1264             : // elideRangeKey returns true if it is ok to elide the specified range key. A
    1265             : // return value of true guarantees that there are no key/value pairs at
    1266             : // c.outputLevel.level+1 or higher that possibly overlap the specified range key.
    1267           2 : func (c *compaction) elideRangeKey(start, end []byte) bool {
    1268           2 :         // TODO(bilal): Track inuseKeyRanges separately for the range keyspace as
    1269           2 :         // opposed to the point keyspace. Once that is done, elideRangeTombstone
    1270           2 :         // can just check in the point keyspace, and this function can check for
    1271           2 :         // inuseKeyRanges in the range keyspace.
    1272           2 :         return c.elideRangeTombstone(start, end)
    1273           2 : }
    1274             : 
    1275             : // newInputIter returns an iterator over all the input tables in a compaction.
    1276             : func (c *compaction) newInputIter(
    1277             :         newIters tableNewIters, newRangeKeyIter keyspan.TableNewSpanIter, snapshots []uint64,
    1278           2 : ) (_ internalIterator, retErr error) {
    1279           2 :         // Validate the ordering of compaction input files for defense in depth.
    1280           2 :         // TODO(jackson): Some of the CheckOrdering calls may be adapted to pass
    1281           2 :         // ProhibitSplitUserKeys if we thread the active format major version in. Or
    1282           2 :         // if we remove support for earlier FMVs, we can remove the parameter
    1283           2 :         // altogether.
    1284           2 :         if len(c.flushing) == 0 {
    1285           2 :                 if c.startLevel.level >= 0 {
    1286           2 :                         err := manifest.CheckOrdering(c.cmp, c.formatKey,
    1287           2 :                                 manifest.Level(c.startLevel.level), c.startLevel.files.Iter(),
    1288           2 :                                 manifest.AllowSplitUserKeys)
    1289           2 :                         if err != nil {
    1290           1 :                                 return nil, err
    1291           1 :                         }
    1292             :                 }
    1293           2 :                 err := manifest.CheckOrdering(c.cmp, c.formatKey,
    1294           2 :                         manifest.Level(c.outputLevel.level), c.outputLevel.files.Iter(),
    1295           2 :                         manifest.AllowSplitUserKeys)
    1296           2 :                 if err != nil {
    1297           1 :                         return nil, err
    1298           1 :                 }
    1299           2 :                 if c.startLevel.level == 0 {
    1300           2 :                         if c.startLevel.l0SublevelInfo == nil {
    1301           0 :                                 panic("l0SublevelInfo not created for compaction out of L0")
    1302             :                         }
    1303           2 :                         for _, info := range c.startLevel.l0SublevelInfo {
    1304           2 :                                 err := manifest.CheckOrdering(c.cmp, c.formatKey,
    1305           2 :                                         info.sublevel, info.Iter(),
    1306           2 :                                         // NB: L0 sublevels have never allowed split user keys.
    1307           2 :                                         manifest.ProhibitSplitUserKeys)
    1308           2 :                                 if err != nil {
    1309           1 :                                         return nil, err
    1310           1 :                                 }
    1311             :                         }
    1312             :                 }
    1313           2 :                 if len(c.extraLevels) > 0 {
    1314           2 :                         if len(c.extraLevels) > 1 {
    1315           0 :                                 panic("n>2 multi level compaction not implemented yet")
    1316             :                         }
    1317           2 :                         interLevel := c.extraLevels[0]
    1318           2 :                         err := manifest.CheckOrdering(c.cmp, c.formatKey,
    1319           2 :                                 manifest.Level(interLevel.level), interLevel.files.Iter(),
    1320           2 :                                 manifest.AllowSplitUserKeys)
    1321           2 :                         if err != nil {
    1322           0 :                                 return nil, err
    1323           0 :                         }
    1324             :                 }
    1325             :         }
    1326             : 
    1327             :         // There are three classes of keys that a compaction needs to process: point
    1328             :         // keys, range deletion tombstones and range keys. Collect all iterators for
    1329             :         // all these classes of keys from all the levels. We'll aggregate them
    1330             :         // together farther below.
    1331             :         //
    1332             :         // numInputLevels is an approximation of the number of iterator levels. Due
    1333             :         // to idiosyncrasies in iterator construction, we may (rarely) exceed this
    1334             :         // initial capacity.
    1335           2 :         numInputLevels := max(len(c.flushing), len(c.inputs))
    1336           2 :         iters := make([]internalIterator, 0, numInputLevels)
    1337           2 :         rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
    1338           2 :         rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
    1339           2 : 
    1340           2 :         // If construction of the iterator inputs fails, ensure that we close all
     1341           2 :         // the constituent iterators.
    1342           2 :         defer func() {
    1343           2 :                 if retErr != nil {
    1344           0 :                         for _, iter := range iters {
    1345           0 :                                 if iter != nil {
    1346           0 :                                         iter.Close()
    1347           0 :                                 }
    1348             :                         }
    1349           0 :                         for _, rangeDelIter := range rangeDelIters {
    1350           0 :                                 rangeDelIter.Close()
    1351           0 :                         }
    1352             :                 }
    1353             :         }()
    1354           2 :         iterOpts := IterOptions{
    1355           2 :                 CategoryAndQoS: sstable.CategoryAndQoS{
    1356           2 :                         Category: "pebble-compaction",
    1357           2 :                         QoSLevel: sstable.NonLatencySensitiveQoSLevel,
    1358           2 :                 },
    1359           2 :                 logger: c.logger,
    1360           2 :         }
    1361           2 : 
    1362           2 :         // Populate iters, rangeDelIters and rangeKeyIters with the appropriate
    1363           2 :         // constituent iterators. This depends on whether this is a flush or a
    1364           2 :         // compaction.
    1365           2 :         if len(c.flushing) != 0 {
    1366           2 :                 // If flushing, we need to build the input iterators over the memtables
    1367           2 :                 // stored in c.flushing.
    1368           2 :                 for i := range c.flushing {
    1369           2 :                         f := c.flushing[i]
    1370           2 :                         iters = append(iters, f.newFlushIter(nil, &c.bytesIterated))
    1371           2 :                         rangeDelIter := f.newRangeDelIter(nil)
    1372           2 :                         if rangeDelIter != nil {
    1373           2 :                                 rangeDelIters = append(rangeDelIters, rangeDelIter)
    1374           2 :                         }
    1375           2 :                         if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
    1376           2 :                                 rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
    1377           2 :                         }
    1378             :                 }
    1379           2 :         } else {
    1380           2 :                 addItersForLevel := func(level *compactionLevel, l manifest.Level) error {
    1381           2 :                         // Add a *levelIter for point iterators. Because we don't call
    1382           2 :                         // initRangeDel, the levelIter will close and forget the range
    1383           2 :                         // deletion iterator when it steps on to a new file. Surfacing range
     1384           2 :                         // deletions to compactions is handled below.
    1385           2 :                         iters = append(iters, newLevelIter(context.Background(),
    1386           2 :                                 iterOpts, c.comparer, newIters, level.files.Iter(), l, internalIterOpts{
    1387           2 :                                         bytesIterated: &c.bytesIterated,
    1388           2 :                                         bufferPool:    &c.bufferPool,
    1389           2 :                                 }))
    1390           2 :                         // TODO(jackson): Use keyspan.LevelIter to avoid loading all the range
    1391           2 :                         // deletions into memory upfront. (See #2015, which reverted this.)
    1392           2 :                         // There will be no user keys that are split between sstables
    1393           2 :                         // within a level in Cockroach 23.1, which unblocks this optimization.
    1394           2 : 
    1395           2 :                         // Add the range deletion iterator for each file as an independent level
    1396           2 :                         // in mergingIter, as opposed to making a levelIter out of those. This
    1397           2 :                         // is safer as levelIter expects all keys coming from underlying
    1398           2 :                         // iterators to be in order. Due to compaction / tombstone writing
    1399           2 :                         // logic in finishOutput(), it is possible for range tombstones to not
    1400           2 :                         // be strictly ordered across all files in one level.
    1401           2 :                         //
    1402           2 :                         // Consider this example from the metamorphic tests (also repeated in
    1403           2 :                         // finishOutput()), consisting of three L3 files with their bounds
    1404           2 :                         // specified in square brackets next to the file name:
    1405           2 :                         //
    1406           2 :                         // ./000240.sst   [tmgc#391,MERGE-tmgc#391,MERGE]
    1407           2 :                         // tmgc#391,MERGE [786e627a]
    1408           2 :                         // tmgc-udkatvs#331,RANGEDEL
    1409           2 :                         //
    1410           2 :                         // ./000241.sst   [tmgc#384,MERGE-tmgc#384,MERGE]
    1411           2 :                         // tmgc#384,MERGE [666c7070]
    1412           2 :                         // tmgc-tvsalezade#383,RANGEDEL
    1413           2 :                         // tmgc-tvsalezade#331,RANGEDEL
    1414           2 :                         //
    1415           2 :                         // ./000242.sst   [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
    1416           2 :                         // tmgc-tvsalezade#383,RANGEDEL
    1417           2 :                         // tmgc#375,SET [72646c78766965616c72776865676e79]
    1418           2 :                         // tmgc-tvsalezade#356,RANGEDEL
    1419           2 :                         //
    1420           2 :                         // Here, the range tombstone in 000240.sst falls "after" one in
    1421           2 :                         // 000241.sst, despite 000240.sst being ordered "before" 000241.sst for
    1422           2 :                         // levelIter's purposes. While each file is still consistent before its
    1423           2 :                         // bounds, it's safer to have all rangedel iterators be visible to
    1424           2 :                         // mergingIter.
    1425           2 :                         iter := level.files.Iter()
    1426           2 :                         for f := iter.First(); f != nil; f = iter.Next() {
    1427           2 :                                 rangeDelIter, closer, err := c.newRangeDelIter(
    1428           2 :                                         newIters, iter.Take(), iterOpts, l, &c.bytesIterated)
    1429           2 :                                 if err != nil {
    1430           0 :                                         // The error will already be annotated with the BackingFileNum, so
    1431           0 :                                         // we annotate it with the FileNum.
    1432           0 :                                         return errors.Wrapf(err, "pebble: could not open table %s", errors.Safe(f.FileNum))
    1433           0 :                                 }
    1434           2 :                                 if rangeDelIter == nil {
    1435           2 :                                         continue
    1436             :                                 }
    1437           2 :                                 rangeDelIters = append(rangeDelIters, rangeDelIter)
    1438           2 :                                 c.closers = append(c.closers, closer)
    1439             :                         }
    1440             : 
    1441             :                         // Check if this level has any range keys.
    1442           2 :                         hasRangeKeys := false
    1443           2 :                         for f := iter.First(); f != nil; f = iter.Next() {
    1444           2 :                                 if f.HasRangeKeys {
    1445           2 :                                         hasRangeKeys = true
    1446           2 :                                         break
    1447             :                                 }
    1448             :                         }
    1449           2 :                         if hasRangeKeys {
    1450           2 :                                 li := &keyspan.LevelIter{}
    1451           2 :                                 newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
    1452           2 :                                         iter, err := newRangeKeyIter(file, iterOptions)
    1453           2 :                                         if err != nil {
    1454           0 :                                                 return nil, err
    1455           2 :                                         } else if iter == nil {
    1456           0 :                                                 return emptyKeyspanIter, nil
    1457           0 :                                         }
    1458             :                                         // Ensure that the range key iter is not closed until the compaction is
    1459             :                                         // finished. This is necessary because range key processing
    1460             :                                         // requires the range keys to be held in memory for up to the
    1461             :                                         // lifetime of the compaction.
    1462           2 :                                         c.closers = append(c.closers, iter)
    1463           2 :                                         iter = noCloseIter{iter}
    1464           2 : 
    1465           2 :                                         // We do not need to truncate range keys to sstable boundaries, or
    1466           2 :                                         // only read within the file's atomic compaction units, unlike with
    1467           2 :                                         // range tombstones. This is because range keys were added after we
    1468           2 :                                         // stopped splitting user keys across sstables, so all the range keys
    1469           2 :                                         // in this sstable must wholly lie within the file's bounds.
    1470           2 :                                         return iter, err
    1471             :                                 }
    1472           2 :                                 li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
    1473           2 :                                 rangeKeyIters = append(rangeKeyIters, li)
    1474             :                         }
    1475           2 :                         return nil
    1476             :                 }
    1477             : 
    1478           2 :                 for i := range c.inputs {
    1479           2 :                         // If the level is annotated with l0SublevelInfo, expand it into one
    1480           2 :                         // level per sublevel.
    1481           2 :                         // TODO(jackson): Perform this expansion even earlier when we pick the
    1482           2 :                         // compaction?
    1483           2 :                         if len(c.inputs[i].l0SublevelInfo) > 0 {
    1484           2 :                                 for _, info := range c.startLevel.l0SublevelInfo {
    1485           2 :                                         sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
    1486           2 :                                         if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
    1487           0 :                                                 return nil, err
    1488           0 :                                         }
    1489             :                                 }
    1490           2 :                                 continue
    1491             :                         }
    1492           2 :                         if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
    1493           0 :                                 return nil, err
    1494           0 :                         }
    1495             :                 }
    1496             :         }
    1497             : 
    1498             :         // In normal operation, levelIter iterates over the point operations in a
    1499             :         // level, and initializes a rangeDelIter pointer for the range deletions in
    1500             :         // each table. During compaction, we want to iterate over the merged view of
    1501             :         // point operations and range deletions. In order to do this we create one
    1502             :         // levelIter per level to iterate over the point operations, and collect up
    1503             :         // all the range deletion files.
    1504             :         //
    1505             :         // The range deletion levels are first combined with a keyspan.MergingIter
    1506             :         // (currently wrapped by a keyspan.InternalIteratorShim to satisfy the
    1507             :         // internal iterator interface). The resulting merged rangedel iterator is
    1508             :         // then included with the point levels in a single mergingIter.
    1509             :         //
     1510             : // Combine all the rangedel iterators using a keyspan.MergingIter and an
     1511             : // InternalIteratorShim so that the range deletions may be interleaved in
    1512             :         // the compaction input.
    1513             :         // TODO(jackson): Replace the InternalIteratorShim with an interleaving
    1514             :         // iterator.
    1515           2 :         if len(rangeDelIters) > 0 {
    1516           2 :                 c.rangeDelIter.Init(c.cmp, rangeDelIters...)
    1517           2 :                 iters = append(iters, &c.rangeDelIter)
    1518           2 :         }
    1519             : 
    1520             :         // If there's only one constituent point iterator, we can avoid the overhead
    1521             :         // of a *mergingIter. This is possible, for example, when performing a flush
    1522             :         // of a single memtable. Otherwise, combine all the iterators into a merging
    1523             :         // iter.
    1524           2 :         iter := iters[0]
     1525           2 :         if len(iters) > 1 {
    1526           2 :                 iter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
    1527           2 :         }
    1528             :         // If there are range key iterators, we need to combine them using
    1529             :         // keyspan.MergingIter, and then interleave them among the points.
    1530           2 :         if len(rangeKeyIters) > 0 {
    1531           2 :                 mi := &keyspan.MergingIter{}
    1532           2 :                 mi.Init(c.cmp, rangeKeyCompactionTransform(c.equal, snapshots, c.elideRangeKey), new(keyspan.MergingBuffers), rangeKeyIters...)
    1533           2 :                 di := &keyspan.DefragmentingIter{}
    1534           2 :                 di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
    1535           2 :                 c.rangeKeyInterleaving.Init(c.comparer, iter, di, keyspan.InterleavingIterOpts{})
    1536           2 :                 iter = &c.rangeKeyInterleaving
    1537           2 :         }
    1538           2 :         return iter, nil
    1539             : }
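
Stripped of Pebble's types, the final stack built above is "merge all point
levels, then interleave the merged spans". The core step is a k-way merge of
sorted streams; a toy heap-based version (stream and mergeHeap are
illustrative, standing in for levelIter and mergingIter):

    package main

    import (
    	"container/heap"
    	"fmt"
    )

    // stream is one sorted input level.
    type stream struct {
    	keys []string
    	pos  int
    }

    // mergeHeap orders streams by their current key, like mergingIter's heap.
    type mergeHeap []*stream

    func (h mergeHeap) Len() int           { return len(h) }
    func (h mergeHeap) Less(i, j int) bool { return h[i].keys[h[i].pos] < h[j].keys[h[j].pos] }
    func (h mergeHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
    func (h *mergeHeap) Push(x any)        { *h = append(*h, x.(*stream)) }
    func (h *mergeHeap) Pop() any {
    	old := *h
    	s := old[len(old)-1]
    	*h = old[:len(old)-1]
    	return s
    }

    func main() {
    	h := &mergeHeap{
    		{keys: []string{"a", "d", "g"}},
    		{keys: []string{"b", "e"}},
    	}
    	heap.Init(h)
    	for h.Len() > 0 {
    		s := (*h)[0]
    		fmt.Print(s.keys[s.pos], " ") // a b d e g
    		s.pos++
    		if s.pos == len(s.keys) {
    			heap.Pop(h)
    		} else {
    			heap.Fix(h, 0) // the root's key advanced; restore heap order
    		}
    	}
    	fmt.Println()
    }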
    1540             : 
    1541             : func (c *compaction) newRangeDelIter(
    1542             :         newIters tableNewIters,
    1543             :         f manifest.LevelFile,
    1544             :         opts IterOptions,
    1545             :         l manifest.Level,
    1546             :         bytesIterated *uint64,
    1547           2 : ) (keyspan.FragmentIterator, io.Closer, error) {
    1548           2 :         opts.level = l
    1549           2 :         iter, rangeDelIter, err := newIters(context.Background(), f.FileMetadata,
    1550           2 :                 &opts, internalIterOpts{
    1551           2 :                         bytesIterated: &c.bytesIterated,
    1552           2 :                         bufferPool:    &c.bufferPool,
    1553           2 :                 })
    1554           2 :         if err != nil {
    1555           0 :                 return nil, nil, err
    1556           0 :         }
    1557             :         // TODO(peter): It is mildly wasteful to open the point iterator only to
    1558             :         // immediately close it. One way to solve this would be to add new
    1559             :         // methods to tableCache for creating point and range-deletion iterators
    1560             :         // independently. We'd only want to use those methods here,
    1561             :         // though. Doesn't seem worth the hassle in the near term.
    1562           2 :         if err = iter.Close(); err != nil {
    1563           0 :                 if rangeDelIter != nil {
    1564           0 :                         err = errors.CombineErrors(err, rangeDelIter.Close())
    1565           0 :                 }
    1566           0 :                 return nil, nil, err
    1567             :         }
    1568           2 :         if rangeDelIter == nil {
    1569           2 :                 // The file doesn't contain any range deletions.
    1570           2 :                 return nil, nil, nil
    1571           2 :         }
    1572             : 
    1573             :         // Ensure that rangeDelIter is not closed until the compaction is
    1574             :         // finished. This is necessary because range tombstone processing
    1575             :         // requires the range tombstones to be held in memory for up to the
    1576             :         // lifetime of the compaction.
    1577           2 :         closer := rangeDelIter
    1578           2 :         rangeDelIter = noCloseIter{rangeDelIter}
    1579           2 : 
    1580           2 :         // Truncate the range tombstones returned by the iterator to the
    1581           2 :         // upper bound of the atomic compaction unit of the file. We want to
    1582           2 :         // truncate the range tombstone to the bounds of the file, but files
    1583           2 :         // with split user keys pose an obstacle: The file's largest bound
    1584           2 :         // is inclusive whereas the range tombstone's end is exclusive.
    1585           2 :         //
    1586           2 :         // Consider the example:
    1587           2 :         //
    1588           2 :         //   000001:[b-f#200]         range del [c,k)
    1589           2 :         //   000002:[f#190-g#inf]     range del [c,k)
    1590           2 :         //   000003:[g#500-i#3]
    1591           2 :         //
    1592           2 :         // Files 000001 and 000002 contain the untruncated range tombstones
    1593           2 :         // [c,k). While the keyspace covered by 000003 was at one point
    1594           2 :         // deleted by the tombstone [c,k), the tombstone may have already
    1595           2 :         // been compacted away and the file does not contain an untruncated
    1596           2 :         // range tombstone. We want to bound 000001's tombstone to the file
    1597           2 :         // bounds, but it's not possible to encode a range tombstone with an
    1598           2 :         // end boundary within a user key (eg, between sequence numbers
    1599           2 :         // f#200 and f#190). Instead, we expand 000001 to its atomic
    1600           2 :         // compaction unit (000001 and 000002) and truncate the tombstone to
    1601           2 :         // g#inf.
    1602           2 :         //
    1603           2 :         // NB: We must not use the atomic compaction unit of the entire
    1604           2 :         // compaction: the [c,k) tombstone in 000001 would then apply to
    1605           2 :         // keys ≥ g. If 000001, 000002 and 000003 are all included in the
    1606           2 :         // same compaction, the compaction's atomic compaction unit includes
    1607           2 :         // 000003. However 000003's keys must not be covered by 000001's
    1608           2 :         // untruncated range tombstone.
    1609           2 :         //
    1610           2 :         // Note that we need to do this truncation at read time in order to
    1611           2 :         // handle sstables generated by RocksDB and earlier versions of
    1612           2 :         // Pebble which do not truncate range tombstones to atomic
    1613           2 :         // compaction unit boundaries at write time.
    1614           2 :         //
    1615           2 :         // The current Pebble compaction logic DOES truncate tombstones to
    1616           2 :         // atomic unit boundaries at compaction time too.
    1617           2 :         atomicUnit, _ := expandToAtomicUnit(c.cmp, f.Slice(), true /* disableIsCompacting */)
    1618           2 :         lowerBound, upperBound := manifest.KeyRange(c.cmp, atomicUnit.Iter())
    1619           2 :         // Range deletion tombstones are often written to sstables
    1620           2 :         // untruncated on the end key side. However, they are still only
    1621           2 :         // valid within a given file's bounds. The logic for writing range
    1622           2 :         // tombstones to an output file sometimes has an incomplete view
    1623           2 :         // of range tombstones outside the file's internal key bounds. Skip
    1624           2 :         // any range tombstones completely outside file bounds.
    1625           2 :         rangeDelIter = keyspan.Truncate(
    1626           2 :                 c.cmp, rangeDelIter, lowerBound.UserKey, upperBound.UserKey,
    1627           2 :                 &f.Smallest, &f.Largest, false, /* panicOnUpperTruncate */
    1628           2 :         )
    1629           2 :         return rangeDelIter, closer, nil
    1630             : }
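
The inclusive-largest versus exclusive-end mismatch above is easiest to see in a toy model. The sketch below is not the keyspan API; it clamps a tombstone with an exclusive end to user-key bounds and reports whether anything survives. Truncating the example tombstone [c,k) to the atomic compaction unit's bounds [b,g) yields [c,g), which never leaks over 000003's keys at or beyond g:

// span is an illustrative range tombstone; end is exclusive.
type span struct {
        start, end string
}

// truncate clamps s to [lower, upper) and reports whether any part of
// the tombstone remains.
func truncate(s span, lower, upper string) (span, bool) {
        if s.start < lower {
                s.start = lower
        }
        if s.end > upper {
                s.end = upper
        }
        return s, s.start < s.end
}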
    1631             : 
    1632           1 : func (c *compaction) String() string {
    1633           1 :         if len(c.flushing) != 0 {
    1634           0 :                 return "flush\n"
    1635           0 :         }
    1636             : 
    1637           1 :         var buf bytes.Buffer
    1638           1 :         for level := c.startLevel.level; level <= c.outputLevel.level; level++ {
    1639           1 :                 i := level - c.startLevel.level
    1640           1 :                 fmt.Fprintf(&buf, "%d:", level)
    1641           1 :                 iter := c.inputs[i].files.Iter()
    1642           1 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1643           1 :                         fmt.Fprintf(&buf, " %s:%s-%s", f.FileNum, f.Smallest, f.Largest)
    1644           1 :                 }
    1645           1 :                 fmt.Fprintf(&buf, "\n")
    1646             :         }
    1647           1 :         return buf.String()
    1648             : }
    1649             : 
    1650             : type manualCompaction struct {
    1651             :         // retries counts the retries due either to too many concurrent
    1652             :         // compactions or to a concurrent compaction over overlapping levels.
    1653             :         retries     int
    1654             :         level       int
    1655             :         outputLevel int
    1656             :         done        chan error
    1657             :         start       []byte
    1658             :         end         []byte
    1659             :         split       bool
    1660             : }
    1661             : 
    1662             : type readCompaction struct {
    1663             :         level int
    1664             :         // [start, end] key ranges are used for de-duping.
    1665             :         start []byte
    1666             :         end   []byte
    1667             : 
    1668             :         // The file associated with the compaction.
    1669             :         // If the file no longer belongs in the same
    1670             :         // level, then we skip the compaction.
    1671             :         fileNum base.FileNum
    1672             : }
    1673             : 
    1674           2 : func (d *DB) addInProgressCompaction(c *compaction) {
    1675           2 :         d.mu.compact.inProgress[c] = struct{}{}
    1676           2 :         var isBase, isIntraL0 bool
    1677           2 :         for _, cl := range c.inputs {
    1678           2 :                 iter := cl.files.Iter()
    1679           2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1680           2 :                         if f.IsCompacting() {
    1681           0 :                                 d.opts.Logger.Fatalf("L%d->L%d: %s already being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
    1682           0 :                         }
    1683           2 :                         f.SetCompactionState(manifest.CompactionStateCompacting)
    1684           2 :                         if c.startLevel != nil && c.outputLevel != nil && c.startLevel.level == 0 {
    1685           2 :                                 if c.outputLevel.level == 0 {
    1686           2 :                                         f.IsIntraL0Compacting = true
    1687           2 :                                         isIntraL0 = true
    1688           2 :                                 } else {
    1689           2 :                                         isBase = true
    1690           2 :                                 }
    1691             :                         }
    1692             :                 }
    1693             :         }
    1694             : 
    1695           2 :         if (isIntraL0 || isBase) && c.version.L0Sublevels != nil {
    1696           2 :                 l0Inputs := []manifest.LevelSlice{c.startLevel.files}
    1697           2 :                 if isIntraL0 {
    1698           2 :                         l0Inputs = append(l0Inputs, c.outputLevel.files)
    1699           2 :                 }
    1700           2 :                 if err := c.version.L0Sublevels.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
    1701           0 :                         d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
    1702           0 :                 }
    1703             :         }
    1704             : }
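
The L0 classification in the loop above can be restated compactly; plain ints stand in for the level structs:

// classifyL0 reports how a compaction rooted at startLevel interacts
// with L0: L0->L0 is an intra-L0 compaction, L0->Lbase is a base
// compaction, and compactions rooted below L0 are neither.
func classifyL0(startLevel, outputLevel int) (isBase, isIntraL0 bool) {
        if startLevel != 0 {
                return false, false
        }
        return outputLevel != 0, outputLevel == 0
}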
    1705             : 
    1706             : // clearCompactingState removes compaction markers from the files in a
    1707             : // compaction. The rollback parameter indicates whether the compaction state
    1708             : // should be rolled back to its original state for an unsuccessful compaction.
    1709             : //
    1710             : // DB.mu must be held when calling this method; however, it can drop and
    1711             : // re-acquire that mutex. All writes to the manifest for this compaction
    1712             : // should have completed by this point.
    1713           2 : func (d *DB) clearCompactingState(c *compaction, rollback bool) {
    1714           2 :         c.versionEditApplied = true
    1715           2 :         for _, cl := range c.inputs {
    1716           2 :                 iter := cl.files.Iter()
    1717           2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    1718           2 :                         if !f.IsCompacting() {
    1719           0 :                                 d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
    1720           0 :                         }
    1721           2 :                         if !rollback {
    1722           2 :                                 // On success all compactions other than move-compactions transition the
    1723           2 :                                 // file into the Compacted state. Move-compacted files become eligible
    1724           2 :                                 // for compaction again and transition back to NotCompacting.
    1725           2 :                                 if c.kind != compactionKindMove {
    1726           2 :                                         f.SetCompactionState(manifest.CompactionStateCompacted)
    1727           2 :                                 } else {
    1728           2 :                                         f.SetCompactionState(manifest.CompactionStateNotCompacting)
    1729           2 :                                 }
    1730           2 :                         } else {
    1731           2 :                                 // Else, on rollback, all input files unconditionally transition back to
    1732           2 :                                 // NotCompacting.
    1733           2 :                                 f.SetCompactionState(manifest.CompactionStateNotCompacting)
    1734           2 :                         }
    1735           2 :                         f.IsIntraL0Compacting = false
    1736             :                 }
    1737             :         }
    1738           2 :         l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c))
    1739           2 :         func() {
    1740           2 :                 // InitCompactingFileInfo requires that no other manifest writes be
    1741           2 :                 // happening in parallel with it, i.e. we're not in the midst of installing
    1742           2 :                 // another version. Otherwise, it's possible that we've created another
    1743           2 :                 // L0Sublevels instance, but not added it to the versions list, causing
    1744           2 :                 // all the indices in FileMetadata to be inaccurate. To ensure this,
    1745           2 :                 // grab the manifest lock.
    1746           2 :                 d.mu.versions.logLock()
    1747           2 :                 defer d.mu.versions.logUnlock()
    1748           2 :                 d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress)
    1749           2 :         }()
    1750             : }
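
The state transitions applied above reduce to one rule. This restatement assumes the surrounding package's compactionKind and manifest.CompactionState identifiers; it is a summary of the branches above, not code from the package:

// nextCompactionState returns the state an input file transitions to
// once the compaction's manifest writes are done. Rollbacks and
// move-compactions (whose inputs live on in the output level) return
// the file to NotCompacting; every other success marks it Compacted.
func nextCompactionState(rollback bool, kind compactionKind) manifest.CompactionState {
        if rollback || kind == compactionKindMove {
                return manifest.CompactionStateNotCompacting
        }
        return manifest.CompactionStateCompacted
}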
    1751             : 
    1752           2 : func (d *DB) calculateDiskAvailableBytes() uint64 {
    1753           2 :         if space, err := d.opts.FS.GetDiskUsage(d.dirname); err == nil {
    1754           2 :                 d.diskAvailBytes.Store(space.AvailBytes)
    1755           2 :                 return space.AvailBytes
    1756           2 :         } else if !errors.Is(err, vfs.ErrUnsupported) {
    1757           1 :                 d.opts.EventListener.BackgroundError(err)
    1758           1 :         }
    1759           2 :         return d.diskAvailBytes.Load()
    1760             : }
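
calculateDiskAvailableBytes is a cache-with-fallback: refresh an atomic when the filesystem can answer, serve the last known value when it cannot. A generic sketch of the pattern using sync/atomic, with a hypothetical query callback:

import "sync/atomic"

// cachedGauge caches the last successfully observed value of a metric
// whose source may be unsupported or transiently unavailable.
type cachedGauge struct {
        last atomic.Uint64
}

// get refreshes the cache when query succeeds; otherwise it serves the
// stale value rather than failing.
func (g *cachedGauge) get(query func() (uint64, error)) uint64 {
        if v, err := query(); err == nil {
                g.last.Store(v)
                return v
        }
        return g.last.Load()
}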
    1761             : 
    1762           1 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
    1763           1 :         var pacerInfo deletionPacerInfo
    1764           1 :         // Call GetDiskUsage after every file deletion. This may seem inefficient,
    1765           1 :         // but in practice this was observed to take constant time, regardless of
    1766           1 :         // volume size used, at least on linux with ext4 and zfs. All invocations
    1767           1 :         // take 10 microseconds or less.
    1768           1 :         pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
    1769           1 :         d.mu.Lock()
    1770           1 :         pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
    1771           1 :         pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
    1772           1 :         d.mu.Unlock()
    1773           1 :         return pacerInfo
    1774           1 : }
    1775             : 
    1776             : // onObsoleteTableDelete is called to update metrics when an sstable is deleted.
    1777           2 : func (d *DB) onObsoleteTableDelete(fileSize uint64) {
    1778           2 :         d.mu.Lock()
    1779           2 :         d.mu.versions.metrics.Table.ObsoleteCount--
    1780           2 :         d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
    1781           2 :         d.mu.Unlock()
    1782           2 : }
    1783             : 
    1784             : // maybeScheduleFlush schedules a flush if necessary.
    1785             : //
    1786             : // d.mu must be held when calling this.
    1787           2 : func (d *DB) maybeScheduleFlush() {
    1788           2 :         if d.mu.compact.flushing || d.closed.Load() != nil || d.opts.ReadOnly {
    1789           2 :                 return
    1790           2 :         }
    1791           2 :         if len(d.mu.mem.queue) <= 1 {
    1792           2 :                 return
    1793           2 :         }
    1794             : 
    1795           2 :         if !d.passedFlushThreshold() {
    1796           2 :                 return
    1797           2 :         }
    1798             : 
    1799           2 :         d.mu.compact.flushing = true
    1800           2 :         go d.flush()
    1801             : }
    1802             : 
    1803           2 : func (d *DB) passedFlushThreshold() bool {
    1804           2 :         var n int
    1805           2 :         var size uint64
    1806           2 :         for ; n < len(d.mu.mem.queue)-1; n++ {
    1807           2 :                 if !d.mu.mem.queue[n].readyForFlush() {
    1808           2 :                         break
    1809             :                 }
    1810           2 :                 if d.mu.mem.queue[n].flushForced {
    1811           2 :                         // A flush was forced. Pretend the memtable size is the configured
    1812           2 :                         // size. See minFlushSize below.
    1813           2 :                         size += d.opts.MemTableSize
    1814           2 :                 } else {
    1815           2 :                         size += d.mu.mem.queue[n].totalBytes()
    1816           2 :                 }
    1817             :         }
    1818           2 :         if n == 0 {
    1819           2 :                 // None of the immutable memtables are ready for flushing.
    1820           2 :                 return false
    1821           2 :         }
    1822             : 
    1823             :         // Only flush once the sum of the queued memtable sizes exceeds half the
    1824             :         // configured memtable size. This prevents flushing of memtables at startup
    1825             :         // while we're undergoing the ramp period on the memtable size. See
    1826             :         // DB.newMemTable().
    1827           2 :         minFlushSize := d.opts.MemTableSize / 2
    1828           2 :         return size >= minFlushSize
    1829             : }
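
As a worked example with hypothetical numbers: with MemTableSize = 64 MiB and two ready memtables holding 20 MiB and 8 MiB, size is 28 MiB, below the 32 MiB threshold, so no flush is scheduled; had either memtable been flush-forced, it would count as the full 64 MiB and the flush would proceed. The accounting, restated:

// queued describes one ready-to-flush memtable: its in-use size and
// whether a flush was explicitly forced for it.
type queued struct {
        size   uint64
        forced bool
}

// passedThreshold mirrors the rule above: forced memtables count as a
// full configured memtable, and a flush starts once the total reaches
// half the configured size.
func passedThreshold(entries []queued, memTableSize uint64) bool {
        if len(entries) == 0 {
                return false
        }
        var total uint64
        for _, e := range entries {
                if e.forced {
                        total += memTableSize
                } else {
                        total += e.size
                }
        }
        return total >= memTableSize/2
}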
    1830             : 
    1831           2 : func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
    1832           2 :         var mem *flushableEntry
    1833           2 :         for _, m := range d.mu.mem.queue {
    1834           2 :                 if m.flushable == tbl {
    1835           2 :                         mem = m
    1836           2 :                         break
    1837             :                 }
    1838             :         }
    1839           2 :         if mem == nil || mem.flushForced {
    1840           2 :                 return
    1841           2 :         }
    1842           2 :         deadline := d.timeNow().Add(dur)
    1843           2 :         if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
    1844           2 :                 // Already scheduled to flush sooner than within `dur`.
    1845           2 :                 return
    1846           2 :         }
    1847           2 :         mem.delayedFlushForcedAt = deadline
    1848           2 :         go func() {
    1849           2 :                 timer := time.NewTimer(dur)
    1850           2 :                 defer timer.Stop()
    1851           2 : 
    1852           2 :                 select {
    1853           2 :                 case <-d.closedCh:
    1854           2 :                         return
    1855           2 :                 case <-mem.flushed:
    1856           2 :                         return
    1857           2 :                 case <-timer.C:
    1858           2 :                         d.commit.mu.Lock()
    1859           2 :                         defer d.commit.mu.Unlock()
    1860           2 :                         d.mu.Lock()
    1861           2 :                         defer d.mu.Unlock()
    1862           2 : 
    1863           2 :                         // NB: The timer may fire concurrently with a call to Close.  If a
    1864           2 :                         // Close call beat us to acquiring d.mu, d.closed holds ErrClosed,
    1865           2 :                         // and it's too late to flush anything. Otherwise, the Close call
    1866           2 :                         // will block on locking d.mu until we've finished scheduling the
    1867           2 :                         // flush and set `d.mu.compact.flushing` to true. Close will wait
    1868           2 :                         // for the current flush to complete.
    1869           2 :                         if d.closed.Load() != nil {
    1870           2 :                                 return
    1871           2 :                         }
    1872             : 
    1873           2 :                         if d.mu.mem.mutable == tbl {
    1874           2 :                                 d.makeRoomForWrite(nil)
    1875           2 :                         } else {
    1876           2 :                                 mem.flushForced = true
    1877           2 :                         }
    1878           2 :                         d.maybeScheduleFlush()
    1879             :                 }
    1880             :         }()
    1881             : }
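
The goroutine above is an instance of a reusable shape: arm a timer, but let shutdown or completion of the awaited work win the race. A stripped-down sketch using the standard time package; the channel names are illustrative:

import "time"

// fireAfterUnless runs fn after dur unless closed or done is signalled
// first. The timer is always stopped so its resources are released.
func fireAfterUnless(dur time.Duration, closed, done <-chan struct{}, fn func()) {
        go func() {
                timer := time.NewTimer(dur)
                defer timer.Stop()
                select {
                case <-closed: // e.g. the DB closed before the deadline
                case <-done: // e.g. the memtable was flushed anyway
                case <-timer.C:
                        fn()
                }
        }()
}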
    1882             : 
    1883           2 : func (d *DB) flush() {
    1884           2 :         pprof.Do(context.Background(), flushLabels, func(context.Context) {
    1885           2 :                 flushingWorkStart := time.Now()
    1886           2 :                 d.mu.Lock()
    1887           2 :                 defer d.mu.Unlock()
    1888           2 :                 idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime)
    1889           2 :                 var bytesFlushed uint64
    1890           2 :                 var err error
    1891           2 :                 if bytesFlushed, err = d.flush1(); err != nil {
    1892           1 :                         // TODO(peter): count consecutive flush errors and backoff.
    1893           1 :                         d.opts.EventListener.BackgroundError(err)
    1894           1 :                 }
    1895           2 :                 d.mu.compact.flushing = false
    1896           2 :                 d.mu.compact.noOngoingFlushStartTime = time.Now()
    1897           2 :                 workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
    1898           2 :                 d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
    1899           2 :                 d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
    1900           2 :                 d.mu.compact.flushWriteThroughput.IdleDuration += idleDuration
    1901           2 :                 // More flush work may have arrived while we were flushing, so schedule
    1902           2 :                 // another flush if needed.
    1903           2 :                 d.maybeScheduleFlush()
    1904           2 :                 // The flush may have produced too many files in a level, so schedule a
    1905           2 :                 // compaction if needed.
    1906           2 :                 d.maybeScheduleCompaction()
    1907           2 :                 d.mu.compact.cond.Broadcast()
    1908             :         })
    1909             : }
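
The bookkeeping above splits wall time into idle (end of the previous flush to the start of this one) and work (start to finish). A compact restatement with an illustrative type:

import "time"

// throughput accumulates flush write throughput the same way the
// flushWriteThroughput fields are updated above.
type throughput struct {
        bytes int64
        work  time.Duration
        idle  time.Duration
}

// record notes one flush that began at start (after the previous flush
// ended at prevEnd), finished at end, and wrote n bytes.
func (t *throughput) record(prevEnd, start, end time.Time, n int64) {
        t.idle += start.Sub(prevEnd)
        t.work += end.Sub(start)
        t.bytes += n
}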
    1910             : 
    1911             : // runIngestFlush is used to generate a flush version edit for sstables which
    1912             : // were ingested as flushables. Both DB.mu and the manifest lock must be held
    1913             : // while runIngestFlush is called.
    1914           2 : func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
    1915           2 :         if len(c.flushing) != 1 {
    1916           0 :                 panic("pebble: ingestedFlushable must be flushed one at a time.")
    1917             :         }
    1918             : 
    1919             :         // Construct the VersionEdit, levelMetrics etc.
    1920           2 :         c.metrics = make(map[int]*LevelMetrics, numLevels)
    1921           2 :         // Finding the target level for ingestion must use the latest version
    1922           2 :         // after the logLock has been acquired.
    1923           2 :         c.version = d.mu.versions.currentVersion()
    1924           2 : 
    1925           2 :         baseLevel := d.mu.versions.picker.getBaseLevel()
    1926           2 :         iterOpts := IterOptions{logger: d.opts.Logger}
    1927           2 :         ve := &versionEdit{}
    1928           2 :         var level int
    1929           2 :         var err error
    1930           2 :         var fileToSplit *fileMetadata
    1931           2 :         var ingestSplitFiles []ingestSplitFile
    1932           2 :         for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
    1933           2 :                 suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
    1934           2 :                         d.FormatMajorVersion() >= FormatVirtualSSTables
    1935           2 :                 level, fileToSplit, err = ingestTargetLevel(
    1936           2 :                         d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer,
    1937           2 :                         c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
    1938           2 :                         suggestSplit,
    1939           2 :                 )
    1940           2 :                 if err != nil {
    1941           0 :                         return nil, err
    1942           0 :                 }
    1943           2 :                 ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
    1944           2 :                 if fileToSplit != nil {
    1945           1 :                         ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
    1946           1 :                                 ingestFile: file.FileMetadata,
    1947           1 :                                 splitFile:  fileToSplit,
    1948           1 :                                 level:      level,
    1949           1 :                         })
    1950           1 :                 }
    1951           2 :                 levelMetrics := c.metrics[level]
    1952           2 :                 if levelMetrics == nil {
    1953           2 :                         levelMetrics = &LevelMetrics{}
    1954           2 :                         c.metrics[level] = levelMetrics
    1955           2 :                 }
    1956           2 :                 levelMetrics.BytesIngested += file.Size
    1957           2 :                 levelMetrics.TablesIngested++
    1958             :         }
    1959             : 
    1960           2 :         updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
    1961           1 :                 levelMetrics := c.metrics[level]
    1962           1 :                 if levelMetrics == nil {
    1963           0 :                         levelMetrics = &LevelMetrics{}
    1964           0 :                         c.metrics[level] = levelMetrics
    1965           0 :                 }
    1966           1 :                 levelMetrics.NumFiles--
    1967           1 :                 levelMetrics.Size -= int64(m.Size)
    1968           1 :                 for i := range added {
    1969           1 :                         levelMetrics.NumFiles++
    1970           1 :                         levelMetrics.Size += int64(added[i].Meta.Size)
    1971           1 :                 }
    1972             :         }
    1973             : 
    1974           2 :         if len(ingestSplitFiles) > 0 {
    1975           1 :                 ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
    1976           1 :                 replacedFiles := make(map[base.FileNum][]newFileEntry)
    1977           1 :                 if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
    1978           0 :                         return nil, err
    1979           0 :                 }
    1980             :         }
    1981             : 
    1982           2 :         return ve, nil
    1983             : }
    1984             : 
    1985             : // flush1 runs a compaction that copies the immutable memtables from memory to
    1986             : // disk.
    1987             : //
    1988             : // d.mu must be held when calling this, but the mutex may be dropped and
    1989             : // re-acquired during the course of this method.
    1990           2 : func (d *DB) flush1() (bytesFlushed uint64, err error) {
    1991           2 :         // NB: The flushable queue can contain flushables of type ingestedFlushable.
    1992           2 :         // The sstables in ingestedFlushable.files must be placed into the appropriate
    1993           2 :         // level in the lsm. Let's say the flushable queue contains a prefix of
    1994           2 :         // regular immutable memtables, then an ingestedFlushable, and then the
    1995           2 :         // mutable memtable. When the flush of the ingestedFlushable is performed,
    1996           2 :         // it needs an updated view of the lsm. That is, the prefix of immutable
    1997           2 :         // memtables must have already been flushed. Similarly, if there are two
    1998           2 :         // contiguous ingestedFlushables in the queue, then the first flushable must
    1999           2 :         // be flushed, so that the second flushable can see an updated view of the
    2000           2 :         // lsm.
    2001           2 :         //
    2002           2 :         // Given the above, we restrict flushes to either some prefix of regular
    2003           2 :         // memtables, or a single flushable of type ingestedFlushable. The DB.flush
    2004           2 :         // function will call DB.maybeScheduleFlush again, so a new flush to finish
    2005           2 :         // the remaining flush work should be scheduled right away.
    2006           2 :         //
    2007           2 :         // NB: Large batches placed in the flushable queue share the WAL with the
    2008           2 :         // previous memtable in the queue. We must ensure the property that both the
    2009           2 :         // large batch and the memtable with which it shares a WAL are flushed
    2010           2 :         // together. The property ensures that the minimum unflushed log number
    2011           2 :         // isn't incremented incorrectly. Since flushableBatch.readyForFlush always
    2012           2 :         // returns true, and since the large batch will always be placed right after
    2013           2 :         // the memtable with which it shares a WAL, the property is naturally
    2014           2 :         // ensured. The large batch will always be placed after the memtable with
    2015           2 :         // which it shares a WAL because we ensure it in DB.commitWrite by holding
    2016           2 :         // the commitPipeline.mu and then holding DB.mu. As an extra defensive
    2017           2 :         // measure, if we try to flush the memtable without also flushing the
    2018           2 :         // flushable batch in the same flush, since the memtable and flushableBatch
    2019           2 :         // have the same logNum, the logNum invariant check below will trigger.
    2020           2 :         var n, inputs int
    2021           2 :         var inputBytes uint64
    2022           2 :         var ingest bool
    2023           2 :         for ; n < len(d.mu.mem.queue)-1; n++ {
    2024           2 :                 if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
    2025           2 :                         if n == 0 {
    2026           2 :                                 // The first flushable is of type ingestedFlushable. Since these
    2027           2 :                                 // must be flushed individually, we perform a flush for just
    2028           2 :                                 // this.
    2029           2 :                                 if !f.readyForFlush() {
    2030           0 :                                         // This check is almost unnecessary, but we guard against it
    2031           0 :                                         // This check is almost unnecessary, but we keep it in case
    2032           0 :                                         // this invariant changes in the future.
    2033             :                                 }
    2034             :                                 // By setting n = 1, we ensure that the first flushable (n == 0)
    2035             :                                 // is scheduled for a flush. The number of tables added is equal to the
    2036             :                                 // number of files in the ingest operation.
    2037           2 :                                 n = 1
    2038           2 :                                 inputs = len(f.files)
    2039           2 :                                 ingest = true
    2040           2 :                                 break
    2041           2 :                         } else {
    2042           2 :                                 // There was some prefix of flushables which weren't of type
    2043           2 :                                 // ingestedFlushable. So, perform a flush for those.
    2044           2 :                                 break
    2045             :                         }
    2046             :                 }
    2047           2 :                 if !d.mu.mem.queue[n].readyForFlush() {
    2048           2 :                         break
    2049             :                 }
    2050           2 :                 inputBytes += d.mu.mem.queue[n].inuseBytes()
    2051             :         }
    2052           2 :         if n == 0 {
    2053           0 :                 // None of the immutable memtables are ready for flushing.
    2054           0 :                 return 0, nil
    2055           0 :         }
    2056           2 :         if !ingest {
    2057           2 :                 // Flushes of memtables add the prefix of n memtables from the flushable
    2058           2 :                 // queue.
    2059           2 :                 inputs = n
    2060           2 :         }
    2061             : 
    2062             :         // Require that every memtable being flushed has a log number less than the
    2063             :         // new minimum unflushed log number.
    2064           2 :         minUnflushedLogNum := d.mu.mem.queue[n].logNum
    2065           2 :         if !d.opts.DisableWAL {
    2066           2 :                 for i := 0; i < n; i++ {
    2067           2 :                         if logNum := d.mu.mem.queue[i].logNum; logNum >= minUnflushedLogNum {
    2068           0 :                                 panic(errors.AssertionFailedf("logNum invariant violated: flushing %d items; %d:type=%T,logNum=%d; %d:type=%T,logNum=%d",
    2069           0 :                                         n,
    2070           0 :                                         i, d.mu.mem.queue[i].flushable, logNum,
    2071           0 :                                         n, d.mu.mem.queue[n].flushable, minUnflushedLogNum))
    2072             :                         }
    2073             :                 }
    2074             :         }
    2075             : 
    2076           2 :         c := newFlush(d.opts, d.mu.versions.currentVersion(),
    2077           2 :                 d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow())
    2078           2 :         d.addInProgressCompaction(c)
    2079           2 : 
    2080           2 :         jobID := d.mu.nextJobID
    2081           2 :         d.mu.nextJobID++
    2082           2 :         d.opts.EventListener.FlushBegin(FlushInfo{
    2083           2 :                 JobID:      jobID,
    2084           2 :                 Input:      inputs,
    2085           2 :                 InputBytes: inputBytes,
    2086           2 :                 Ingest:     ingest,
    2087           2 :         })
    2088           2 :         startTime := d.timeNow()
    2089           2 : 
    2090           2 :         var ve *manifest.VersionEdit
    2091           2 :         var pendingOutputs []physicalMeta
    2092           2 :         var stats compactStats
    2093           2 :         // To determine the target level of the files in the ingestedFlushable, we
    2094           2 :         // need to acquire the logLock, and not release it for that duration. Since
    2095           2 :         // we need to acquire the logLock below to perform the logAndApply step
    2096           2 :         // anyway, we create the VersionEdit for ingestedFlushable outside of
    2097           2 :         // runCompaction. For all other flush cases, we construct the VersionEdit
    2098           2 :         // inside runCompaction.
    2099           2 :         if c.kind != compactionKindIngestedFlushable {
    2100           2 :                 ve, pendingOutputs, stats, err = d.runCompaction(jobID, c)
    2101           2 :         }
    2102             : 
    2103             :         // Acquire logLock. This will be released either on an error, by way of
    2104             :         // logUnlock, or through a call to logAndApply if there is no error.
    2105           2 :         d.mu.versions.logLock()
    2106           2 : 
    2107           2 :         if c.kind == compactionKindIngestedFlushable {
    2108           2 :                 ve, err = d.runIngestFlush(c)
    2109           2 :         }
    2110             : 
    2111           2 :         info := FlushInfo{
    2112           2 :                 JobID:      jobID,
    2113           2 :                 Input:      inputs,
    2114           2 :                 InputBytes: inputBytes,
    2115           2 :                 Duration:   d.timeNow().Sub(startTime),
    2116           2 :                 Done:       true,
    2117           2 :                 Ingest:     ingest,
    2118           2 :                 Err:        err,
    2119           2 :         }
    2120           2 :         if err == nil {
    2121           2 :                 for i := range ve.NewFiles {
    2122           2 :                         e := &ve.NewFiles[i]
    2123           2 :                         info.Output = append(info.Output, e.Meta.TableInfo())
    2124           2 :                         // Ingested tables are not necessarily flushed to L0. Record the level of
    2125           2 :                         // each ingested file explicitly.
    2126           2 :                         if ingest {
    2127           2 :                                 info.IngestLevels = append(info.IngestLevels, e.Level)
    2128           2 :                         }
    2129             :                 }
    2130           2 :                 if len(ve.NewFiles) == 0 {
    2131           2 :                         info.Err = errEmptyTable
    2132           2 :                 }
    2133             : 
    2134             :                 // The flush succeeded or it produced an empty sstable. In either case we
    2135             :                 // want to bump the minimum unflushed log number to the log number of the
    2136             :                 // oldest unflushed memtable.
    2137           2 :                 ve.MinUnflushedLogNum = minUnflushedLogNum
    2138           2 :                 if c.kind != compactionKindIngestedFlushable {
    2139           2 :                         metrics := c.metrics[0]
    2140           2 :                         if d.opts.DisableWAL {
    2141           2 :                                 // If the WAL is disabled, every flushable has a zero [logSize],
    2142           2 :                                 // resulting in zero bytes in. Instead, use the number of bytes we
    2143           2 :                                 // flushed as the BytesIn. This ensures we get a reasonable w-amp
    2144           2 :                                 // calculation even when the WAL is disabled.
    2145           2 :                                 metrics.BytesIn = metrics.BytesFlushed
    2146           2 :                 } else {
    2147           2 :                         // Sum the WAL bytes consumed by each flushed memtable.
    2148           2 :                                 for i := 0; i < n; i++ {
    2149           2 :                                         metrics.BytesIn += d.mu.mem.queue[i].logSize
    2150           2 :                                 }
    2151             :                         }
    2152           2 :                 } else if len(ve.DeletedFiles) > 0 {
    2153           1 :                         // c.kind == compactionKindIngestedFlushable && we have deleted files due
    2154           1 :                         // to ingest-time splits.
    2155           1 :                         //
    2156           1 :                         // Iterate through all other compactions, and check if their inputs have
    2157           1 :                         // been replaced due to an ingest-time split. In that case, cancel the
    2158           1 :                         // compaction.
    2159           1 :                         for c2 := range d.mu.compact.inProgress {
    2160           1 :                                 for i := range c2.inputs {
    2161           1 :                                         iter := c2.inputs[i].files.Iter()
    2162           1 :                                         for f := iter.First(); f != nil; f = iter.Next() {
    2163           0 :                                                 if _, ok := ve.DeletedFiles[deletedFileEntry{FileNum: f.FileNum, Level: c2.inputs[i].level}]; ok {
    2164           0 :                                                         c2.cancel.Store(true)
    2165           0 :                                                         break
    2166             :                                                 }
    2167             :                                         }
    2168             :                                 }
    2169             :                         }
    2170             :                 }
    2171           2 :                 err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */
    2172           2 :                         func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
    2173           2 :                 if err != nil {
    2174           1 :                         info.Err = err
    2175           1 :                         // TODO(peter): untested.
    2176           1 :                         for _, f := range pendingOutputs {
    2177           1 :                                 // Note that the FileBacking for the file metadata might not have
    2178           1 :                                 // been set yet. So, we directly use the FileNum. Since these
    2179           1 :                                 // files were generated as compaction outputs, these must be
    2180           1 :                                 // physical files on disk. This property might not hold once
    2181           1 :                                 // https://github.com/cockroachdb/pebble/issues/389 is
    2182           1 :                                 // implemented if #389 creates virtual sstables as output files.
    2183           1 :                                 d.mu.versions.obsoleteTables = append(
    2184           1 :                                         d.mu.versions.obsoleteTables,
    2185           1 :                                         fileInfo{f.FileNum.DiskFileNum(), f.Size},
    2186           1 :                                 )
    2187           1 :                         }
    2188           1 :                         d.mu.versions.updateObsoleteTableMetricsLocked()
    2189             :                 }
    2190           1 :         } else {
    2191           1 :                 // We won't be performing the logAndApply step because of the error,
    2192           1 :                 // so logUnlock.
    2193           1 :                 d.mu.versions.logUnlock()
    2194           1 :         }
    2195             : 
    2196           2 :         bytesFlushed = c.bytesIterated
    2197           2 : 
    2198           2 :         // If err != nil, then the flush will be retried, and we will recalculate
    2199           2 :         // these metrics.
    2200           2 :         if err == nil {
    2201           2 :                 d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
    2202           2 :                 d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
    2203           2 :                 d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
    2204           2 :                 d.maybeUpdateDeleteCompactionHints(c)
    2205           2 :         }
    2206             : 
    2207           2 :         d.clearCompactingState(c, err != nil)
    2208           2 :         delete(d.mu.compact.inProgress, c)
    2209           2 :         d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
    2210           2 : 
    2211           2 :         var flushed flushableList
    2212           2 :         if err == nil {
    2213           2 :                 flushed = d.mu.mem.queue[:n]
    2214           2 :                 d.mu.mem.queue = d.mu.mem.queue[n:]
    2215           2 :                 d.updateReadStateLocked(d.opts.DebugCheck)
    2216           2 :                 d.updateTableStatsLocked(ve.NewFiles)
    2217           2 :                 if ingest {
    2218           2 :                         d.mu.versions.metrics.Flush.AsIngestCount++
    2219           2 :                         for _, l := range c.metrics {
    2220           2 :                                 d.mu.versions.metrics.Flush.AsIngestBytes += l.BytesIngested
    2221           2 :                                 d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
    2222           2 :                         }
    2223             :                 }
    2224             : 
    2225             :                 // Update if any eventually file-only snapshots have now transitioned to
    2226             :                 // being file-only.
    2227           2 :                 earliestUnflushedSeqNum := d.getEarliestUnflushedSeqNumLocked()
    2228           2 :                 currentVersion := d.mu.versions.currentVersion()
    2229           2 :                 for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; {
    2230           2 :                         if s.efos == nil {
    2231           2 :                                 s = s.next
    2232           2 :                                 continue
    2233             :                         }
    2234           2 :                         if base.Visible(earliestUnflushedSeqNum, s.efos.seqNum, InternalKeySeqNumMax) {
    2235           1 :                                 s = s.next
    2236           1 :                                 continue
    2237             :                         }
    2238           2 :                         if s.efos.excised.Load() {
    2239           1 :                                 // If a concurrent excise has happened that overlaps with one of the key
    2240           1 :                                 // ranges this snapshot is interested in, this EFOS cannot transition to
    2241           1 :                                 // a file-only snapshot as keys in that range could now be deleted. Move
    2242           1 :                                 // onto the next snapshot.
    2243           1 :                                 s = s.next
    2244           1 :                                 continue
    2245             :                         }
    2246           2 :                         currentVersion.Ref()
    2247           2 : 
    2248           2 :                         // NB: s.efos.transitionToFileOnlySnapshot could close s, in which
    2249           2 :                         // case s.next would be nil. Save it before calling it.
    2250           2 :                         next := s.next
    2251           2 :                         _ = s.efos.transitionToFileOnlySnapshot(currentVersion)
    2252           2 :                         s = next
    2253             :                 }
    2254             :         }
    2255             :         // Signal FlushEnd after installing the new readState. This helps for unit
    2256             :         // tests that use the callback to trigger a read using an iterator with
    2257             :         // IterOptions.OnlyReadGuaranteedDurable.
    2258           2 :         info.TotalDuration = d.timeNow().Sub(startTime)
    2259           2 :         d.opts.EventListener.FlushEnd(info)
    2260           2 : 
    2261           2 :         // The order of these operations matters here for ease of testing.
    2262           2 :         // Removing the reader reference first allows tests to be guaranteed that
    2263           2 :         // the memtable reservation has been released by the time a synchronous
    2264           2 :         // flush returns. readerUnrefLocked may also produce obsolete files so the
    2265           2 :         // call to deleteObsoleteFiles must happen after it.
    2266           2 :         for i := range flushed {
    2267           2 :                 flushed[i].readerUnrefLocked(true)
    2268           2 :         }
    2269             : 
    2270           2 :         d.deleteObsoleteFiles(jobID)
    2271           2 : 
    2272           2 :         // Mark all the memtables we flushed as flushed.
    2273           2 :         for i := range flushed {
    2274           2 :                 close(flushed[i].flushed)
    2275           2 :         }
    2276             : 
    2277           2 :         return bytesFlushed, err
    2278             : }
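
The logNum invariant checked near the top of flush1 is worth isolating: every memtable in the flushed prefix must carry a WAL log number strictly below the new minimum unflushed log number, or the WAL could be released while it still holds unpersisted records. A standalone restatement:

import "fmt"

// checkMinUnflushed verifies that flushing the first n queue entries is
// safe: each of their log numbers must be below minUnflushed, the log
// number of the first entry left in the queue.
func checkMinUnflushed(logNums []uint64, n int, minUnflushed uint64) error {
        for i := 0; i < n; i++ {
                if logNums[i] >= minUnflushed {
                        return fmt.Errorf("entry %d: logNum %d >= minUnflushedLogNum %d",
                                i, logNums[i], minUnflushed)
                }
        }
        return nil
}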
    2279             : 
    2280             : // maybeScheduleCompactionAsync should be used when we want to possibly
    2281             : // schedule a compaction, but don't want to eat the cost of running
    2282             : // maybeScheduleCompaction. This method should be launched in a
    2283             : // separate goroutine.
    2284             : // d.mu must not be held when this is called.
    2285           0 : func (d *DB) maybeScheduleCompactionAsync() {
    2286           0 :         defer d.compactionSchedulers.Done()
    2287           0 : 
    2288           0 :         d.mu.Lock()
    2289           0 :         d.maybeScheduleCompaction()
    2290           0 :         d.mu.Unlock()
    2291           0 : }
    2292             : 
    2293             : // maybeScheduleCompaction schedules a compaction if necessary.
    2294             : //
    2295             : // d.mu must be held when calling this.
    2296           2 : func (d *DB) maybeScheduleCompaction() {
    2297           2 :         d.maybeScheduleCompactionPicker(pickAuto)
    2298           2 : }
    2299             : 
    2300           2 : func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction {
    2301           2 :         return picker.pickAuto(env)
    2302           2 : }
    2303             : 
    2304           2 : func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction {
    2305           2 :         return picker.pickElisionOnlyCompaction(env)
    2306           2 : }
    2307             : 
    2308             : // maybeScheduleCompactionPicker schedules a compaction if necessary,
    2309             : // calling `pickFunc` to pick automatic compactions.
    2310             : //
    2311             : // d.mu must be held when calling this.
    2312             : func (d *DB) maybeScheduleCompactionPicker(
    2313             :         pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
    2314           2 : ) {
    2315           2 :         if d.closed.Load() != nil || d.opts.ReadOnly {
    2316           2 :                 return
    2317           2 :         }
    2318           2 :         maxConcurrentCompactions := d.opts.MaxConcurrentCompactions()
    2319           2 :         if d.mu.compact.compactingCount >= maxConcurrentCompactions {
    2320           2 :                 if len(d.mu.compact.manual) > 0 {
    2321           2 :                         // Inability to run head blocks later manual compactions.
    2322           2 :                         d.mu.compact.manual[0].retries++
    2323           2 :                 }
    2324           2 :                 return
    2325             :         }
    2326             : 
    2327             :         // Compaction picking needs a coherent view of a Version. In particular, we
    2328             :         // need to exclude concurrent ingestions from making a decision on which level
    2329             :         // to ingest into that conflicts with our compaction
    2330             :         // decision. versionSet.logLock provides the necessary mutual exclusion.
    2331           2 :         d.mu.versions.logLock()
    2332           2 :         defer d.mu.versions.logUnlock()
    2333           2 : 
    2334           2 :         // Check for the closed flag again, in case the DB was closed while we were
    2335           2 :         // waiting for logLock().
    2336           2 :         if d.closed.Load() != nil {
    2337           2 :                 return
    2338           2 :         }
    2339             : 
    2340           2 :         env := compactionEnv{
    2341           2 :                 diskAvailBytes:          d.diskAvailBytes.Load(),
    2342           2 :                 earliestSnapshotSeqNum:  d.mu.snapshots.earliest(),
    2343           2 :                 earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
    2344           2 :         }
    2345           2 : 
    2346           2 :         // Check for delete-only compactions first, because they're expected to be
    2347           2 :         // cheap and reduce future compaction work.
    2348           2 :         if !d.opts.private.disableDeleteOnlyCompactions &&
    2349           2 :                 len(d.mu.compact.deletionHints) > 0 &&
    2350           2 :                 !d.opts.DisableAutomaticCompactions {
    2351           2 :                 v := d.mu.versions.currentVersion()
    2352           2 :                 snapshots := d.mu.snapshots.toSlice()
    2353           2 :                 inputs, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots)
    2354           2 :                 d.mu.compact.deletionHints = unresolvedHints
    2355           2 : 
    2356           2 :                 if len(inputs) > 0 {
    2357           2 :                         c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow())
    2358           2 :                         d.mu.compact.compactingCount++
    2359           2 :                         d.addInProgressCompaction(c)
    2360           2 :                         go d.compact(c, nil)
    2361           2 :                 }
    2362             :         }
    2363             : 
    2364           2 :         for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxConcurrentCompactions {
    2365           2 :                 v := d.mu.versions.currentVersion()
    2366           2 :                 manual := d.mu.compact.manual[0]
    2367           2 :                 env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
    2368           2 :                 pc, retryLater := pickManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
    2369           2 :                 if pc != nil {
    2370           2 :                         c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
    2371           2 :                         d.mu.compact.manual = d.mu.compact.manual[1:]
    2372           2 :                         d.mu.compact.compactingCount++
    2373           2 :                         d.addInProgressCompaction(c)
    2374           2 :                         go d.compact(c, manual.done)
    2375           2 :                 } else if !retryLater {
    2376           2 :                         // No-op: nothing to compact for this manual request; signal completion.
    2377           2 :                         d.mu.compact.manual = d.mu.compact.manual[1:]
    2378           2 :                         manual.done <- nil
    2379           2 :                 } else {
    2380           2 :                         // The head of the queue cannot run yet; it blocks all later manual compactions.
    2381           2 :                         manual.retries++
    2382           2 :                         break
    2383             :                 }
    2384             :         }
    2385             : 
    2386           2 :         for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxConcurrentCompactions {
    2387           2 :                 env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
    2388           2 :                 env.readCompactionEnv = readCompactionEnv{
    2389           2 :                         readCompactions:          &d.mu.compact.readCompactions,
    2390           2 :                         flushing:                 d.mu.compact.flushing || d.passedFlushThreshold(),
    2391           2 :                         rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
    2392           2 :                 }
    2393           2 :                 pc := pickFunc(d.mu.versions.picker, env)
    2394           2 :                 if pc == nil {
    2395           2 :                         break
    2396             :                 }
    2397           2 :                 c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
    2398           2 :                 d.mu.compact.compactingCount++
    2399           2 :                 d.addInProgressCompaction(c)
    2400           2 :                 go d.compact(c, nil)
    2401             :         }
    2402             : }
    2403             : 
    2404             : // deleteCompactionHintType indicates whether the deleteCompactionHint was
    2405             : // generated from a span containing a range del (point key only), a range key
    2406             : // delete (range key only), or both a point and range key.
    2407             : type deleteCompactionHintType uint8
    2408             : 
    2409             : const (
    2410             :         // NOTE: While these are primarily used as enumeration values, they are also
    2411             :         // used in some bitwise operations. Care should be taken when updating.
    2412             :         deleteCompactionHintTypeUnknown deleteCompactionHintType = iota
    2413             :         deleteCompactionHintTypePointKeyOnly
    2414             :         deleteCompactionHintTypeRangeKeyOnly
    2415             :         deleteCompactionHintTypePointAndRangeKey
    2416             : )
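// The composition noted above can be seen with a minimal standalone sketch
// (illustrative only, not part of this package): because the constants are
// iota-assigned, OR-ing the two single-key types yields the combined type,
// which compactionHintFromKeys below relies on.
//
//	package main
//
//	import "fmt"
//
//	type hintType uint8
//
//	const (
//		unknown hintType = iota // 0
//		pointKeyOnly            // 1
//		rangeKeyOnly            // 2
//		pointAndRangeKey        // 3 == pointKeyOnly | rangeKeyOnly
//	)
//
//	func main() {
//		var h hintType
//		h |= pointKeyOnly // saw a RANGEDEL
//		h |= rangeKeyOnly // saw a RANGEKEYDEL
//		fmt.Println(h == pointAndRangeKey) // true
//	}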
    2417             : 
    2418             : // String implements fmt.Stringer.
    2419           1 : func (h deleteCompactionHintType) String() string {
    2420           1 :         switch h {
    2421           0 :         case deleteCompactionHintTypeUnknown:
    2422           0 :                 return "unknown"
    2423           1 :         case deleteCompactionHintTypePointKeyOnly:
    2424           1 :                 return "point-key-only"
    2425           1 :         case deleteCompactionHintTypeRangeKeyOnly:
    2426           1 :                 return "range-key-only"
    2427           1 :         case deleteCompactionHintTypePointAndRangeKey:
    2428           1 :                 return "point-and-range-key"
    2429           0 :         default:
    2430           0 :                 panic(fmt.Sprintf("unknown hint type: %d", h))
    2431             :         }
    2432             : }
    2433             : 
    2434             : // compactionHintFromKeys returns a deleteCompactionHintType given a slice of
    2435             : // keyspan.Keys.
    2436           2 : func compactionHintFromKeys(keys []keyspan.Key) deleteCompactionHintType {
    2437           2 :         var hintType deleteCompactionHintType
    2438           2 :         for _, k := range keys {
    2439           2 :                 switch k.Kind() {
    2440           2 :                 case base.InternalKeyKindRangeDelete:
    2441           2 :                         hintType |= deleteCompactionHintTypePointKeyOnly
    2442           2 :                 case base.InternalKeyKindRangeKeyDelete:
    2443           2 :                         hintType |= deleteCompactionHintTypeRangeKeyOnly
    2444           0 :                 default:
    2445           0 :                         panic(fmt.Sprintf("unsupported key kind: %s", k.Kind()))
    2446             :                 }
    2447             :         }
    2448           2 :         return hintType
    2449             : }
    2450             : 
    2451             : // A deleteCompactionHint records a user key and sequence number span that has been
    2452             : // deleted by a range tombstone. A hint is recorded if at least one sstable
    2453             : // falls completely within both the user key and sequence number spans.
    2454             : // Once the tombstones and the observed completely-contained sstables fall
    2455             : // into the same snapshot stripe, a delete-only compaction may delete any
    2456             : // sstables within the range.
    2457             : type deleteCompactionHint struct {
    2458             :         // The type of key span that generated this hint (point key, range key, or
    2459             :         // both).
    2460             :         hintType deleteCompactionHintType
    2461             :         // start and end are user keys specifying a key range [start, end) of
    2462             :         // deleted keys.
    2463             :         start []byte
    2464             :         end   []byte
    2465             :         // The level of the file containing the range tombstone(s) when the hint
    2466             :         // was created. Only lower levels need to be searched for files that may
    2467             :         // be deleted.
    2468             :         tombstoneLevel int
    2469             :         // The file containing the range tombstone(s) that created the hint.
    2470             :         tombstoneFile *fileMetadata
    2471             :         // The smallest and largest sequence numbers of the abutting tombstones
    2472             :         // merged to form this hint. All of a table's keys must be less than the
    2473             :         // tombstone smallest sequence number to be deleted. All of a table's
    2474             :         // sequence numbers must fall into the same snapshot stripe as the
    2475             :         // tombstone largest sequence number to be deleted.
    2476             :         tombstoneLargestSeqNum  uint64
    2477             :         tombstoneSmallestSeqNum uint64
    2478             :         // The smallest sequence number of a sstable that was found to be covered
    2479             :         // by this hint. The hint cannot be resolved until this sequence number is
    2480             :         // in the same snapshot stripe as the largest tombstone sequence number.
    2481             :         // This is set when a hint is created, so the LSM may look different and
    2482             :         // notably no longer contain the sstable that contained the key at this
    2483             :         // sequence number.
    2484             :         fileSmallestSeqNum uint64
    2485             : }
    2486             : 
    2487           1 : func (h deleteCompactionHint) String() string {
    2488           1 :         return fmt.Sprintf(
    2489           1 :                 "L%d.%s %s-%s seqnums(tombstone=%d-%d, file-smallest=%d, type=%s)",
    2490           1 :                 h.tombstoneLevel, h.tombstoneFile.FileNum, h.start, h.end,
    2491           1 :                 h.tombstoneSmallestSeqNum, h.tombstoneLargestSeqNum, h.fileSmallestSeqNum,
    2492           1 :                 h.hintType,
    2493           1 :         )
    2494           1 : }
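// With illustrative values (hypothetical file number, keys and seqnums), a
// hint renders as:
//
//	L5.000123 b-r seqnums(tombstone=90-230, file-smallest=70, type=point-key-only)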
    2495             : 
    2496           2 : func (h *deleteCompactionHint) canDelete(cmp Compare, m *fileMetadata, snapshots []uint64) bool {
    2497           2 :         // The file can only be deleted if all of its keys are older than the
    2498           2 :         // earliest tombstone aggregated into the hint.
    2499           2 :         if m.LargestSeqNum >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
    2500           2 :                 return false
    2501           2 :         }
    2502             : 
    2503             :         // The file's oldest key must be in the same snapshot stripe as the
    2504             :         // newest tombstone. NB: We already checked the hint's sequence numbers,
    2505             :         // but this file's oldest sequence number might be lower than the hint's
    2506             :         // smallest sequence number despite the file falling within the key range
    2507             :         // if this file was constructed after the hint by a compaction.
    2508           2 :         ti, _ := snapshotIndex(h.tombstoneLargestSeqNum, snapshots)
    2509           2 :         fi, _ := snapshotIndex(m.SmallestSeqNum, snapshots)
    2510           2 :         if ti != fi {
    2511           0 :                 return false
    2512           0 :         }
    2513             : 
    2514           2 :         switch h.hintType {
    2515           2 :         case deleteCompactionHintTypePointKeyOnly:
    2516           2 :                 // A hint generated by a range del span cannot delete tables that contain
    2517           2 :                 // range keys.
    2518           2 :                 if m.HasRangeKeys {
    2519           1 :                         return false
    2520           1 :                 }
    2521           2 :         case deleteCompactionHintTypeRangeKeyOnly:
    2522           2 :                 // A hint generated by a range key del span cannot delete tables that
    2523           2 :                 // contain point keys.
    2524           2 :                 if m.HasPointKeys {
    2525           2 :                         return false
    2526           2 :                 }
    2527           2 :         case deleteCompactionHintTypePointAndRangeKey:
    2528             :                 // A hint from a span that contains both range dels *and* range keys may
    2529             :                 // delete tables holding any mix of point and range keys, provided the
    2530             :                 // table's bounds fall within the hint. The next check takes care of this.
    2531           0 :         default:
    2532           0 :                 panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
    2533             :         }
    2534             : 
    2535             :         // The file's keys must be completely contained within the hint range.
    2536           2 :         return cmp(h.start, m.Smallest.UserKey) <= 0 && cmp(m.Largest.UserKey, h.end) < 0
    2537             : }
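// The "same snapshot stripe" test above can be sketched without the rest of
// the package (assumed semantics, mirroring snapshotIndex): given a sorted
// slice of open snapshot sequence numbers, two seqnums share a stripe iff no
// open snapshot separates them.
//
//	func sameSnapshotStripe(a, b uint64, snapshots []uint64) bool {
//		idx := func(seq uint64) int {
//			return sort.Search(len(snapshots), func(i int) bool {
//				return snapshots[i] > seq
//			})
//		}
//		return idx(a) == idx(b)
//	}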
    2538             : 
    2539           2 : func (d *DB) maybeUpdateDeleteCompactionHints(c *compaction) {
    2540           2 :         // Compactions that zero sequence numbers can interfere with compaction
    2541           2 :         // deletion hints. Deletion hints apply to tables containing keys older
    2542           2 :         // than a threshold. If a key more recent than the threshold is zeroed in
    2543           2 :         // a compaction, a delete-only compaction may mistakenly treat it as meeting the
    2544           2 :         // threshold and drop a table containing live data.
    2545           2 :         //
    2546           2 :         // To avoid this scenario, compactions that zero sequence numbers remove
    2547           2 :         // any conflicting deletion hints. A deletion hint is conflicting if both
    2548           2 :         // of the following conditions apply:
    2549           2 :         // * its key space overlaps with the compaction
    2550           2 :         // * at least one of its inputs contains a key as recent as one of the
    2551           2 :         //   hint's tombstones.
    2552           2 :         //
    2553           2 :         if !c.allowedZeroSeqNum {
    2554           2 :                 return
    2555           2 :         }
    2556             : 
    2557           2 :         updatedHints := d.mu.compact.deletionHints[:0]
    2558           2 :         for _, h := range d.mu.compact.deletionHints {
    2559           2 :                 // If the compaction's key space is disjoint from the hint's key
    2560           2 :                 // space, the zeroing of sequence numbers won't affect the hint. Keep
    2561           2 :                 // the hint.
    2562           2 :                 keysDisjoint := d.cmp(h.end, c.smallest.UserKey) < 0 || d.cmp(h.start, c.largest.UserKey) > 0
    2563           2 :                 if keysDisjoint {
    2564           2 :                         updatedHints = append(updatedHints, h)
    2565           2 :                         continue
    2566             :                 }
    2567             : 
    2568             :                 // All of the compaction's inputs must be older than the hint's
    2569             :                 // tombstones.
    2570           2 :                 inputsOlder := true
    2571           2 :                 for _, in := range c.inputs {
    2572           2 :                         iter := in.files.Iter()
    2573           2 :                         for f := iter.First(); f != nil; f = iter.Next() {
    2574           2 :                                 inputsOlder = inputsOlder && f.LargestSeqNum < h.tombstoneSmallestSeqNum
    2575           2 :                         }
    2576             :                 }
    2577           2 :                 if inputsOlder {
    2578           1 :                         updatedHints = append(updatedHints, h)
    2579           1 :                         continue
    2580             :                 }
    2581             : 
    2582             :                 // Drop h, because the compaction c may have zeroed sequence numbers
    2583             :                 // of keys more recent than some of h's tombstones.
    2584             :         }
    2585           2 :         d.mu.compact.deletionHints = updatedHints
    2586             : }
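// The keysDisjoint expression above is the usual interval-disjointness test.
// A self-contained sketch, assuming bytewise key comparison (the default
// Comparer): the hint span [hStart, hEnd) and the compaction span
// [cSmallest, cLargest] are disjoint exactly when one ends before the other
// begins.
//
//	func disjoint(hStart, hEnd, cSmallest, cLargest []byte) bool {
//		return bytes.Compare(hEnd, cSmallest) < 0 ||
//			bytes.Compare(hStart, cLargest) > 0
//	}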
    2587             : 
    2588             : func checkDeleteCompactionHints(
    2589             :         cmp Compare, v *version, hints []deleteCompactionHint, snapshots []uint64,
    2590           2 : ) ([]compactionLevel, []deleteCompactionHint) {
    2591           2 :         var files map[*fileMetadata]bool
    2592           2 :         var byLevel [numLevels][]*fileMetadata
    2593           2 : 
    2594           2 :         unresolvedHints := hints[:0]
    2595           2 :         for _, h := range hints {
    2596           2 :                 // Check each compaction hint to see if it's resolvable. Resolvable
    2597           2 :                 // hints are removed and trigger a delete-only compaction if any files
    2598           2 :                 // in the current LSM still meet their criteria. Unresolvable hints
    2599           2 :                 // are saved and don't trigger a delete-only compaction.
    2600           2 :                 //
    2601           2 :                 // When a compaction hint is created, the sequence numbers of the
    2602           2 :                 // range tombstones and the covered file with the oldest key are
    2603           2 :                 // recorded. The largest tombstone sequence number and the smallest
    2604           2 :                 // file sequence number must be in the same snapshot stripe for the
    2605           2 :                 // hint to be resolved. The below graphic models a compaction hint
    2606           2 :                 // covering the keyspace [b, r). The hint completely contains two
    2607           2 :                 // files, 000002 and 000003. The file 000003 contains the lowest
    2608           2 :                 // covered sequence number at #90. The tombstone b.RANGEDEL.230:h has
    2609           2 :                 // the highest tombstone sequence number incorporated into the hint.
    2610           2 :                 // The hint may be resolved only once the snapshots at #100, #180 and
    2611           2 :                 // #210 are all closed. File 000001 is not included within the hint
    2612           2 :                 // because it extends beyond the range tombstones in user key space.
    2613           2 :                 //
    2614           2 :                 // 250
    2615           2 :                 //
    2616           2 :                 //       |-b...230:h-|
    2617           2 :                 // _____________________________________________________ snapshot #210
    2618           2 :                 // 200               |--h.RANGEDEL.200:r--|
    2619           2 :                 //
    2620           2 :                 // _____________________________________________________ snapshot #180
    2621           2 :                 //
    2622           2 :                 // 150                     +--------+
    2623           2 :                 //           +---------+   | 000003 |
    2624           2 :                 //           | 000002  |   |        |
    2625           2 :                 //           +_________+   |        |
    2626           2 :                 // 100_____________________|________|___________________ snapshot #100
    2627           2 :                 //                         +--------+
    2628           2 :                 // _____________________________________________________ snapshot #70
    2629           2 :                 //                             +---------------+
    2630           2 :                 //  50                         | 000001        |
    2631           2 :                 //                             |               |
    2632           2 :                 //                             +---------------+
    2633           2 :                 // ______________________________________________________________
    2634           2 :                 //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    2635           2 : 
    2636           2 :                 ti, _ := snapshotIndex(h.tombstoneLargestSeqNum, snapshots)
    2637           2 :                 fi, _ := snapshotIndex(h.fileSmallestSeqNum, snapshots)
    2638           2 :                 if ti != fi {
    2639           2 :                         // Cannot resolve yet.
    2640           2 :                         unresolvedHints = append(unresolvedHints, h)
    2641           2 :                         continue
    2642             :                 }
    2643             : 
    2644             :                 // The hint h will be resolved and dropped, regardless of whether
    2645             :                 // there are any tables that can be deleted.
    2646           2 :                 for l := h.tombstoneLevel + 1; l < numLevels; l++ {
    2647           2 :                         overlaps := v.Overlaps(l, cmp, h.start, h.end, true /* exclusiveEnd */)
    2648           2 :                         iter := overlaps.Iter()
    2649           2 :                         for m := iter.First(); m != nil; m = iter.Next() {
    2650           2 :                                 if m.IsCompacting() || !h.canDelete(cmp, m, snapshots) || files[m] {
    2651           2 :                                         continue
    2652             :                                 }
    2653           2 :                                 if files == nil {
    2654           2 :                                         // Construct files lazily, assuming most calls will not
    2655           2 :                                         // produce delete-only compactions.
    2656           2 :                                         files = make(map[*fileMetadata]bool)
    2657           2 :                                 }
    2658           2 :                                 files[m] = true
    2659           2 :                                 byLevel[l] = append(byLevel[l], m)
    2660             :                         }
    2661             :                 }
    2662             :         }
    2663             : 
    2664           2 :         var compactLevels []compactionLevel
    2665           2 :         for l, files := range byLevel {
    2666           2 :                 if len(files) == 0 {
    2667           2 :                         continue
    2668             :                 }
    2669           2 :                 compactLevels = append(compactLevels, compactionLevel{
    2670           2 :                         level: l,
    2671           2 :                         files: manifest.NewLevelSliceKeySorted(cmp, files),
    2672           2 :                 })
    2673             :         }
    2674           2 :         return compactLevels, unresolvedHints
    2675             : }
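// Both checkDeleteCompactionHints and maybeUpdateDeleteCompactionHints use
// the hints[:0] idiom: filtering a slice in place by reusing its backing
// array, so the common no-op case allocates nothing. A generic sketch of the
// idiom:
//
//	func filterInPlace[T any](xs []T, keep func(T) bool) []T {
//		out := xs[:0]
//		for _, x := range xs {
//			if keep(x) {
//				out = append(out, x)
//			}
//		}
//		return out
//	}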
    2676             : 
    2677             : // compact runs one compaction and maybe schedules another call to compact.
    2678           2 : func (d *DB) compact(c *compaction, errChannel chan error) {
    2679           2 :         pprof.Do(context.Background(), compactLabels, func(context.Context) {
    2680           2 :                 d.mu.Lock()
    2681           2 :                 defer d.mu.Unlock()
    2682           2 :                 if err := d.compact1(c, errChannel); err != nil {
    2683           2 :                         // TODO(peter): count consecutive compaction errors and backoff.
    2684           2 :                         d.opts.EventListener.BackgroundError(err)
    2685           2 :                 }
    2686           2 :                 d.mu.compact.compactingCount--
    2687           2 :                 delete(d.mu.compact.inProgress, c)
    2688           2 :                 // Add this compaction's duration to the cumulative duration. NB: This
    2689           2 :                 // must be atomic with the above removal of c from
    2690           2 :                 // d.mu.compact.inProgress to ensure Metrics.Compact.Duration does not
    2691           2 :                 // miss or double count a completing compaction's duration.
    2692           2 :                 d.mu.compact.duration += d.timeNow().Sub(c.beganAt)
    2693           2 : 
    2694           2 :                 // The previous compaction may have produced too many files in a
    2695           2 :                 // level, so reschedule another compaction if needed.
    2696           2 :                 d.maybeScheduleCompaction()
    2697           2 :                 d.mu.compact.cond.Broadcast()
    2698             :         })
    2699             : }
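// An aside on the pprof.Do wrapper: the labels attached here travel with the
// goroutine into CPU profiles and goroutine dumps, which is how compaction
// samples can be told apart from flush and GC work when profiling. The
// labels are also readable from the context (sketch with the label values
// used above):
//
//	pprof.Do(context.Background(), pprof.Labels("pebble", "compact"),
//		func(ctx context.Context) {
//			pprof.ForLabels(ctx, func(key, value string) bool {
//				fmt.Printf("%s=%s\n", key, value) // pebble=compact
//				return true
//			})
//		})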
    2700             : 
    2701             : // compact1 runs one compaction.
    2702             : //
    2703             : // d.mu must be held when calling this, but the mutex may be dropped and
    2704             : // re-acquired during the course of this method.
    2705           2 : func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
    2706           2 :         if errChannel != nil {
    2707           2 :                 defer func() {
    2708           2 :                         errChannel <- err
    2709           2 :                 }()
    2710             :         }
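        // Because err is a named return value, the deferred send above delivers
        // whatever error compact1 ultimately returns, including errors assigned
        // after this point. A minimal sketch of the idiom:
        //
        //      func f(ch chan error) (err error) {
        //              defer func() { ch <- err }()
        //              err = errors.New("late error") // still observed by the defer
        //              return err
        //      }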
    2711             : 
    2712           2 :         jobID := d.mu.nextJobID
    2713           2 :         d.mu.nextJobID++
    2714           2 :         info := c.makeInfo(jobID)
    2715           2 :         d.opts.EventListener.CompactionBegin(info)
    2716           2 :         startTime := d.timeNow()
    2717           2 : 
    2718           2 :         ve, pendingOutputs, stats, err := d.runCompaction(jobID, c)
    2719           2 : 
    2720           2 :         info.Duration = d.timeNow().Sub(startTime)
    2721           2 :         if err == nil {
    2722           2 :                 err = func() error {
    2723           2 :                         var err error
    2724           2 :                         d.mu.versions.logLock()
    2725           2 :                         // Check if this compaction had a conflicting operation (e.g. a d.excise())
    2726           2 :                         // that necessitates it restarting from scratch. Note that since we hold
    2727           2 :                         // the manifest lock, we don't expect this bool to change its value
    2728           2 :                         // as only the holder of the manifest lock will ever write to it.
    2729           2 :                         if c.cancel.Load() {
    2730           2 :                                 err = firstError(err, ErrCancelledCompaction)
    2731           2 :                         }
    2732           2 :                         if err != nil {
    2733           2 :                                 // logAndApply calls logUnlock. If we didn't call it, we need to call
    2734           2 :                                 // logUnlock ourselves.
    2735           2 :                                 d.mu.versions.logUnlock()
    2736           2 :                                 return err
    2737           2 :                         }
    2738           2 :                         return d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
    2739           2 :                                 return d.getInProgressCompactionInfoLocked(c)
    2740           2 :                         })
    2741             :                 }()
    2742           2 :                 if err != nil {
    2743           2 :                         // TODO(peter): untested.
    2744           2 :                         for _, f := range pendingOutputs {
    2745           2 :                                 // Note that the FileBacking for the file metadata might not have
    2746           2 :                                 // been set yet. So, we directly use the FileNum. Since these
    2747           2 :                                 // files were generated as compaction outputs, these must be
    2748           2 :                                 // physical files on disk. This property might not hold once
    2749           2 :                                 // https://github.com/cockroachdb/pebble/issues/389 is
    2750           2 :                                 // implemented if #389 creates virtual sstables as output files.
    2751           2 :                                 d.mu.versions.obsoleteTables = append(
    2752           2 :                                         d.mu.versions.obsoleteTables,
    2753           2 :                                         fileInfo{f.FileNum.DiskFileNum(), f.Size},
    2754           2 :                                 )
    2755           2 :                         }
    2756           2 :                         d.mu.versions.updateObsoleteTableMetricsLocked()
    2757             :                 }
    2758             :         }
    2759             : 
    2760           2 :         info.Done = true
    2761           2 :         info.Err = err
    2762           2 :         if err == nil {
    2763           2 :                 for i := range ve.NewFiles {
    2764           2 :                         e := &ve.NewFiles[i]
    2765           2 :                         info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
    2766           2 :                 }
    2767           2 :                 d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
    2768           2 :                 d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
    2769           2 :                 d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
    2770           2 :                 d.maybeUpdateDeleteCompactionHints(c)
    2771             :         }
    2772             : 
    2773             :         // NB: clearing compacting state must occur before updating the read state;
    2774             :         // L0Sublevels initialization depends on it.
    2775           2 :         d.clearCompactingState(c, err != nil)
    2776           2 :         d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
    2777           2 :         d.mu.versions.incrementCompactionBytes(-c.bytesWritten)
    2778           2 : 
    2779           2 :         info.TotalDuration = d.timeNow().Sub(c.beganAt)
    2780           2 :         d.opts.EventListener.CompactionEnd(info)
    2781           2 : 
    2782           2 :         // Update the read state before deleting obsolete files because the
    2783           2 :         // read-state update will cause the previous version to be unref'd and if
    2784           2 :         // there are no references obsolete tables will be added to the obsolete
    2785           2 :         // table list.
    2786           2 :         if err == nil {
    2787           2 :                 d.updateReadStateLocked(d.opts.DebugCheck)
    2788           2 :                 d.updateTableStatsLocked(ve.NewFiles)
    2789           2 :         }
    2790           2 :         d.deleteObsoleteFiles(jobID)
    2791           2 : 
    2792           2 :         return err
    2793             : }
    2794             : 
    2795             : type compactStats struct {
    2796             :         cumulativePinnedKeys uint64
    2797             :         cumulativePinnedSize uint64
    2798             :         countMissizedDels    uint64
    2799             : }
    2800             : 
    2801             : // runCopyCompaction runs a copy compaction where a new FileNum is created that
    2802             : // is a byte-for-byte copy of the input file. This is used in lieu of a move
    2803             : // compaction when a file is being moved across the local/remote storage
    2804             : // boundary.
    2805             : //
    2806             : // d.mu must be held when calling this method.
    2807             : func (d *DB) runCopyCompaction(
    2808             :         jobID int,
    2809             :         c *compaction,
    2810             :         meta *fileMetadata,
    2811             :         objMeta objstorage.ObjectMetadata,
    2812             :         versionEdit *versionEdit,
    2813           2 : ) (ve *versionEdit, pendingOutputs []physicalMeta, retErr error) {
    2814           2 :         ve = versionEdit
    2815           2 :         if objMeta.IsRemote() || !remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level) {
    2816           0 :                 panic("pebble: scheduled a copy compaction that is not actually moving files to shared storage")
    2817             :         }
    2818             :         // Note that based on logic in the compaction picker, we're guaranteed
    2819             :         // meta.Virtual is false.
    2820           2 :         if meta.Virtual {
    2821           0 :                 panic(errors.AssertionFailedf("cannot do a copy compaction of a virtual sstable across local/remote storage"))
    2822             :         }
    2823             :         // We are in the relatively more complex case where we need to copy this
    2824             :         // file to remote/shared storage. Drop the db mutex while we do the
    2825             :         // copy.
    2826             :         //
    2827           2 :         // To ease cleanup of the local file and tracking of refs, we create
    2828             :         // a new FileNum. This has the potential of making the block cache less
    2829             :         // effective, however.
    2830           2 :         metaCopy := new(fileMetadata)
    2831           2 :         *metaCopy = fileMetadata{
    2832           2 :                 Size:           meta.Size,
    2833           2 :                 CreationTime:   meta.CreationTime,
    2834           2 :                 SmallestSeqNum: meta.SmallestSeqNum,
    2835           2 :                 LargestSeqNum:  meta.LargestSeqNum,
    2836           2 :                 Stats:          meta.Stats,
    2837           2 :                 Virtual:        meta.Virtual,
    2838           2 :         }
    2839           2 :         if meta.HasPointKeys {
    2840           2 :                 metaCopy.ExtendPointKeyBounds(c.cmp, meta.SmallestPointKey, meta.LargestPointKey)
    2841           2 :         }
    2842           2 :         if meta.HasRangeKeys {
    2843           2 :                 metaCopy.ExtendRangeKeyBounds(c.cmp, meta.SmallestRangeKey, meta.LargestRangeKey)
    2844           2 :         }
    2845           2 :         metaCopy.FileNum = d.mu.versions.getNextFileNum()
    2846           2 :         metaCopy.InitPhysicalBacking()
    2847           2 :         c.metrics = map[int]*LevelMetrics{
    2848           2 :                 c.outputLevel.level: {
    2849           2 :                         BytesIn:         meta.Size,
    2850           2 :                         BytesCompacted:  meta.Size,
    2851           2 :                         TablesCompacted: 1,
    2852           2 :                 },
    2853           2 :         }
    2854           2 :         pendingOutputs = append(pendingOutputs, metaCopy.PhysicalMeta())
    2855           2 :         // Before dropping the db mutex, grab a ref to the current version. This
    2856           2 :         // prevents any concurrent excises from deleting files that this compaction
    2857           2 :         // needs to read/maintain a reference to.
    2858           2 :         vers := d.mu.versions.currentVersion()
    2859           2 :         vers.Ref()
    2860           2 :         defer vers.UnrefLocked()
    2861           2 : 
    2862           2 :         d.mu.Unlock()
    2863           2 :         defer d.mu.Lock()
    2864           2 :         _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
    2865           2 :                 d.objProvider.Path(objMeta), fileTypeTable, metaCopy.FileBacking.DiskFileNum,
    2866           2 :                 objstorage.CreateOptions{PreferSharedStorage: true})
    2867           2 :         if err != nil {
    2868           0 :                 return ve, pendingOutputs, err
    2869           0 :         }
    2870           2 :         ve.NewFiles[0].Meta = metaCopy
    2871           2 : 
    2872           2 :         if err := d.objProvider.Sync(); err != nil {
    2873           0 :                 return nil, pendingOutputs, err
    2874           0 :         }
    2875           2 :         return ve, pendingOutputs, nil
    2876             : }
    2877             : 
    2878             : // runCompaction runs a compaction that produces new on-disk tables from
    2879             : // memtables or old on-disk tables.
    2880             : //
    2881             : // d.mu must be held when calling this, but the mutex may be dropped and
    2882             : // re-acquired during the course of this method.
    2883             : func (d *DB) runCompaction(
    2884             :         jobID int, c *compaction,
    2885           2 : ) (ve *versionEdit, pendingOutputs []physicalMeta, stats compactStats, retErr error) {
    2886           2 :         // As a sanity check, confirm that the smallest / largest keys for new and
    2887           2 :         // deleted files in the new versionEdit pass a validation function before
    2888           2 :         // returning the edit.
    2889           2 :         defer func() {
    2890           2 :                 // If we're handling a panic, don't expect the version edit to validate.
    2891           2 :                 if r := recover(); r != nil {
    2892           0 :                         panic(r)
    2893           2 :                 } else if ve != nil {
    2894           2 :                         err := validateVersionEdit(ve, d.opts.Experimental.KeyValidationFunc, d.opts.Comparer.FormatKey)
    2895           2 :                         if err != nil {
    2896           0 :                                 d.opts.Logger.Fatalf("pebble: version edit validation failed: %s", err)
    2897           0 :                         }
    2898             :                 }
    2899             :         }()
    2900             : 
    2901             :         // Check for a delete-only compaction. This can occur when wide range
    2902             :         // tombstones completely contain sstables.
    2903           2 :         if c.kind == compactionKindDeleteOnly {
    2904           2 :                 c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
    2905           2 :                 ve := &versionEdit{
    2906           2 :                         DeletedFiles: map[deletedFileEntry]*fileMetadata{},
    2907           2 :                 }
    2908           2 :                 for _, cl := range c.inputs {
    2909           2 :                         levelMetrics := &LevelMetrics{}
    2910           2 :                         iter := cl.files.Iter()
    2911           2 :                         for f := iter.First(); f != nil; f = iter.Next() {
    2912           2 :                                 ve.DeletedFiles[deletedFileEntry{
    2913           2 :                                         Level:   cl.level,
    2914           2 :                                         FileNum: f.FileNum,
    2915           2 :                                 }] = f
    2916           2 :                         }
    2917           2 :                         c.metrics[cl.level] = levelMetrics
    2918             :                 }
    2919           2 :                 return ve, nil, stats, nil
    2920             :         }
    2921             : 
    2922           2 :         if c.kind == compactionKindIngestedFlushable {
    2923           0 :                 panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
    2924             :         }
    2925             : 
    2926             :         // Check for a move or copy of one table from one level to the next. We avoid
    2927             :         // such a move if there is lots of overlapping grandparent data. Otherwise,
    2928             :         // the move could create a parent file that will require a very expensive
    2929             :         // merge later on.
    2930           2 :         if c.kind == compactionKindMove || c.kind == compactionKindCopy {
    2931           2 :                 iter := c.startLevel.files.Iter()
    2932           2 :                 meta := iter.First()
    2933           2 :                 if invariants.Enabled {
    2934           2 :                         if iter.Next() != nil {
    2935           0 :                                 panic("got more than one file for a move or copy compaction")
    2936             :                         }
    2937             :                 }
    2938           2 :                 if c.cancel.Load() {
    2939           0 :                         return ve, nil, stats, ErrCancelledCompaction
    2940           0 :                 }
    2941           2 :                 objMeta, err := d.objProvider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
    2942           2 :                 if err != nil {
    2943           0 :                         return ve, pendingOutputs, stats, err
    2944           0 :                 }
    2945           2 :                 c.metrics = map[int]*LevelMetrics{
    2946           2 :                         c.outputLevel.level: {
    2947           2 :                                 BytesMoved:  meta.Size,
    2948           2 :                                 TablesMoved: 1,
    2949           2 :                         },
    2950           2 :                 }
    2951           2 :                 ve := &versionEdit{
    2952           2 :                         DeletedFiles: map[deletedFileEntry]*fileMetadata{
    2953           2 :                                 {Level: c.startLevel.level, FileNum: meta.FileNum}: meta,
    2954           2 :                         },
    2955           2 :                         NewFiles: []newFileEntry{
    2956           2 :                                 {Level: c.outputLevel.level, Meta: meta},
    2957           2 :                         },
    2958           2 :                 }
    2959           2 :                 if c.kind == compactionKindCopy {
    2960           2 :                         ve, pendingOutputs, retErr = d.runCopyCompaction(jobID, c, meta, objMeta, ve)
    2961           2 :                         if retErr != nil {
    2962           0 :                                 return ve, pendingOutputs, stats, retErr
    2963           0 :                         }
    2964             :                 }
    2965           2 :                 return ve, nil, stats, nil
    2966             :         }
    2967             : 
    2968           2 :         defer func() {
    2969           2 :                 if retErr != nil {
    2970           2 :                         pendingOutputs = nil
    2971           2 :                 }
    2972             :         }()
    2973             : 
    2974           2 :         snapshots := d.mu.snapshots.toSlice()
    2975           2 :         formatVers := d.FormatMajorVersion()
    2976           2 : 
    2977           2 :         if c.flushing == nil {
    2978           2 :                 // Before dropping the db mutex, grab a ref to the current version. This
    2979           2 :                 // prevents any concurrent excises from deleting files that this compaction
    2980           2 :                 // needs to read/maintain a reference to.
    2981           2 :                 //
    2982           2 :                 // Note that unlike user iterators, compactionIter does not maintain a ref
    2983           2 :                 // of the version or read state.
    2984           2 :                 vers := d.mu.versions.currentVersion()
    2985           2 :                 vers.Ref()
    2986           2 :                 defer vers.UnrefLocked()
    2987           2 :         }
    2988             : 
    2989           2 :         if c.cancel.Load() {
    2990           0 :                 return ve, nil, stats, ErrCancelledCompaction
    2991           0 :         }
    2992             : 
    2993             :         // Release the d.mu lock while doing I/O.
    2994             :         // Note the unusual order: Unlock and then Lock.
    2995           2 :         d.mu.Unlock()
    2996           2 :         defer d.mu.Lock()
    2997           2 : 
    2998           2 :         // Compactions use a pool of buffers to read blocks, avoiding polluting the
    2999           2 :         // block cache with blocks that will not be read again. We initialize the
    3000           2 :         // buffer pool with a size 12. This initial size does not need to be
    3001           2 :         // buffer pool with a size of 12. This initial size does not need to be
    3002           2 :         // blocks allocated at a given time over the course of the compaction. But
    3003           2 :         // choosing a size larger than that working set avoids any additional
    3004           2 :         // allocations to grow the size of the pool over the course of iteration.
    3005           2 :         //
    3006           2 :         // Justification for initial size 12: In a two-level compaction, at any
    3007           2 :         // given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
    3008           2 :         // Additionally, when decoding a compressed block, we'll temporarily
    3009           2 :         // allocate 1 additional block to hold the compressed buffer. In the worst
    3010           2 :         // case that all input sstables have two-level index blocks (+2), value
    3011           2 :         // blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
    3012           2 :         // additionally require 2n+4 blocks where n is the number of input sstables.
    3013           2 :         // Range deletion and range key blocks are relatively rare, and the cost of
    3014           2 :         // an additional allocation or two over the course of the compaction is
    3015           2 :         // considered to be okay. A larger initial size would cause the pool to hold
    3016           2 :         // on to more memory, even when it's not in-use because the pool will
    3017           2 :         // recycle buffers up to the current capacity of the pool. The memory use of
    3018           2 :         // a 12-buffer pool is expected to be within reason, even if all the buffers
    3019           2 :         // grow to the typical size of an index block (256 KiB) which would
    3020           2 :         // translate to 3 MiB per compaction.
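        //
        // As a worked instance of the arithmetic above (illustrative): with
        // n = 4 input sstables, the worst case is 2 index + 2 data + 1
        // decompression + (2n+4) = 17 blocks live at once, so the pool can
        // still grow past its initial 12; the initial size only avoids
        // reallocation in the common case.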
    3021           2 :         c.bufferPool.Init(12)
    3022           2 :         defer c.bufferPool.Release()
    3023           2 : 
    3024           2 :         iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots)
    3025           2 :         if err != nil {
    3026           0 :                 return nil, pendingOutputs, stats, err
    3027           0 :         }
    3028           2 :         c.allowedZeroSeqNum = c.allowZeroSeqNum()
    3029           2 :         iiter = invalidating.MaybeWrapIfInvariants(iiter)
    3030           2 :         iter := newCompactionIter(c.cmp, c.equal, c.formatKey, d.merge, iiter, snapshots,
    3031           2 :                 &c.rangeDelFrag, &c.rangeKeyFrag, c.allowedZeroSeqNum, c.elideTombstone,
    3032           2 :                 c.elideRangeTombstone, d.FormatMajorVersion())
    3033           2 : 
    3034           2 :         var (
    3035           2 :                 createdFiles    []base.DiskFileNum
    3036           2 :                 tw              *sstable.Writer
    3037           2 :                 pinnedKeySize   uint64
    3038           2 :                 pinnedValueSize uint64
    3039           2 :                 pinnedCount     uint64
    3040           2 :         )
    3041           2 :         defer func() {
    3042           2 :                 if iter != nil {
    3043           2 :                         retErr = firstError(retErr, iter.Close())
    3044           2 :                 }
    3045           2 :                 if tw != nil {
    3046           0 :                         retErr = firstError(retErr, tw.Close())
    3047           0 :                 }
    3048           2 :                 if retErr != nil {
    3049           2 :                         for _, fileNum := range createdFiles {
    3050           1 :                                 _ = d.objProvider.Remove(fileTypeTable, fileNum)
    3051           1 :                         }
    3052             :                 }
    3053           2 :                 for _, closer := range c.closers {
    3054           2 :                         retErr = firstError(retErr, closer.Close())
    3055           2 :                 }
    3056             :         }()
    3057             : 
    3058           2 :         ve = &versionEdit{
    3059           2 :                 DeletedFiles: map[deletedFileEntry]*fileMetadata{},
    3060           2 :         }
    3061           2 : 
    3062           2 :         startLevelBytes := c.startLevel.files.SizeSum()
    3063           2 :         outputMetrics := &LevelMetrics{
    3064           2 :                 BytesIn:   startLevelBytes,
    3065           2 :                 BytesRead: c.outputLevel.files.SizeSum(),
    3066           2 :         }
    3067           2 :         if len(c.extraLevels) > 0 {
    3068           2 :                 outputMetrics.BytesIn += c.extraLevels[0].files.SizeSum()
    3069           2 :         }
    3070           2 :         outputMetrics.BytesRead += outputMetrics.BytesIn
    3071           2 : 
    3072           2 :         c.metrics = map[int]*LevelMetrics{
    3073           2 :                 c.outputLevel.level: outputMetrics,
    3074           2 :         }
    3075           2 :         if len(c.flushing) == 0 && c.metrics[c.startLevel.level] == nil {
    3076           2 :                 c.metrics[c.startLevel.level] = &LevelMetrics{}
    3077           2 :         }
    3078           2 :         if len(c.extraLevels) > 0 {
    3079           2 :                 c.metrics[c.extraLevels[0].level] = &LevelMetrics{}
    3080           2 :                 outputMetrics.MultiLevel.BytesInTop = startLevelBytes
    3081           2 :                 outputMetrics.MultiLevel.BytesIn = outputMetrics.BytesIn
    3082           2 :                 outputMetrics.MultiLevel.BytesRead = outputMetrics.BytesRead
    3083           2 :         }
    3084             : 
    3085             :         // The table is typically written at the maximum allowable format implied by
    3086             :         // the current format major version of the DB.
    3087           2 :         tableFormat := formatVers.MaxTableFormat()
    3088           2 : 
    3089           2 :         // In format major versions with maximum table formats of Pebblev3, value
    3090           2 :         // blocks were conditional on an experimental setting. In format major
    3091           2 :         // versions with maximum table formats of Pebblev4 and higher, value blocks
    3092           2 :         // are always enabled.
    3093           2 :         if tableFormat == sstable.TableFormatPebblev3 &&
    3094           2 :                 (d.opts.Experimental.EnableValueBlocks == nil || !d.opts.Experimental.EnableValueBlocks()) {
    3095           2 :                 tableFormat = sstable.TableFormatPebblev2
    3096           2 :         }
    3097             : 
    3098           2 :         writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
    3099           2 :         if formatVers < FormatBlockPropertyCollector {
    3100           1 :                 // Cannot yet write block properties.
    3101           1 :                 writerOpts.BlockPropertyCollectors = nil
    3102           1 :         }
    3103             : 
    3104             :         // prevPointKey is a sstable.WriterOption that provides access to
    3105             :         // the last point key written to a writer's sstable. When a new
    3106             :         // output begins in newOutput, prevPointKey is updated to point to
    3107             :         // the new output's sstable.Writer. This allows the compaction loop
    3108             :         // to access the last written point key without requiring the
    3109             :         // compaction loop to make a copy of each key ahead of time. Users
    3110             :         // must be careful, because the byte slice returned by UnsafeKey
    3111             :         // points directly into the Writer's block buffer.
    3112           2 :         var prevPointKey sstable.PreviousPointKeyOpt
    3113           2 :         var cpuWorkHandle CPUWorkHandle
    3114           2 :         defer func() {
    3115           2 :                 if cpuWorkHandle != nil {
    3116           1 :                         d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
    3117           1 :                 }
    3118             :         }()
    3119             : 
    3120           2 :         newOutput := func() error {
    3121           2 :                 // Check if we've been cancelled by a concurrent operation.
    3122           2 :                 if c.cancel.Load() {
    3123           1 :                         return ErrCancelledCompaction
    3124           1 :                 }
    3125           2 :                 fileMeta := &fileMetadata{}
    3126           2 :                 d.mu.Lock()
    3127           2 :                 fileNum := d.mu.versions.getNextFileNum()
    3128           2 :                 fileMeta.FileNum = fileNum
    3129           2 :                 pendingOutputs = append(pendingOutputs, fileMeta.PhysicalMeta())
    3130           2 :                 d.mu.Unlock()
    3131           2 : 
    3132           2 :                 ctx := context.TODO()
    3133           2 :                 if objiotracing.Enabled {
    3134           0 :                         ctx = objiotracing.WithLevel(ctx, c.outputLevel.level)
    3135           0 :                         switch c.kind {
    3136           0 :                         case compactionKindFlush:
    3137           0 :                                 ctx = objiotracing.WithReason(ctx, objiotracing.ForFlush)
    3138           0 :                         case compactionKindIngestedFlushable:
    3139           0 :                                 ctx = objiotracing.WithReason(ctx, objiotracing.ForIngestion)
    3140           0 :                         default:
    3141           0 :                                 ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
    3142             :                         }
    3143             :                 }
    3144             :                 // Prefer shared storage if present.
    3145           2 :                 createOpts := objstorage.CreateOptions{
    3146           2 :                         PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
    3147           2 :                 }
    3148           2 :                 writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, fileNum.DiskFileNum(), createOpts)
    3149           2 :                 if err != nil {
    3150           1 :                         return err
    3151           1 :                 }
    3152             : 
    3153           2 :                 reason := "flushing"
    3154           2 :                 if c.flushing == nil {
    3155           2 :                         reason = "compacting"
    3156           2 :                 }
    3157           2 :                 d.opts.EventListener.TableCreated(TableCreateInfo{
    3158           2 :                         JobID:   jobID,
    3159           2 :                         Reason:  reason,
    3160           2 :                         Path:    d.objProvider.Path(objMeta),
    3161           2 :                         FileNum: fileNum,
    3162           2 :                 })
    3163           2 :                 if c.kind != compactionKindFlush {
    3164           2 :                         writable = &compactionWritable{
    3165           2 :                                 Writable: writable,
    3166           2 :                                 versions: d.mu.versions,
    3167           2 :                                 written:  &c.bytesWritten,
    3168           2 :                         }
    3169           2 :                 }
    3170           2 :                 createdFiles = append(createdFiles, fileNum.DiskFileNum())
    3171           2 :                 cacheOpts := private.SSTableCacheOpts(d.cacheID, fileNum.DiskFileNum()).(sstable.WriterOption)
    3172           2 : 
    3173           2 :                 const MaxFileWriteAdditionalCPUTime = time.Millisecond * 100
    3174           2 :                 cpuWorkHandle = d.opts.Experimental.CPUWorkPermissionGranter.GetPermission(
    3175           2 :                         MaxFileWriteAdditionalCPUTime,
    3176           2 :                 )
    3177           2 :                 writerOpts.Parallelism =
    3178           2 :                         d.opts.Experimental.MaxWriterConcurrency > 0 &&
    3179           2 :                                 (cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)
    3180           2 : 
    3181           2 :                 tw = sstable.NewWriter(writable, writerOpts, cacheOpts, &prevPointKey)
    3182           2 : 
    3183           2 :                 fileMeta.CreationTime = time.Now().Unix()
    3184           2 :                 ve.NewFiles = append(ve.NewFiles, newFileEntry{
    3185           2 :                         Level: c.outputLevel.level,
    3186           2 :                         Meta:  fileMeta,
    3187           2 :                 })
    3188           2 :                 return nil
    3189             :         }
    3190             : 
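For non-flush compactions, the Writable created above is wrapped in a compactionWritable that accumulates bytes written into c.bytesWritten. The wrapping pattern generalizes to any writer; the sketch below (countingWriter is a hypothetical name, not a Pebble type) shows the same idea in isolation:

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync/atomic"
)

// countingWriter wraps an io.Writer and atomically accumulates the number
// of bytes written, mirroring how compactionWritable above tracks progress.
type countingWriter struct {
	w       io.Writer
	written *atomic.Int64
}

func (cw countingWriter) Write(p []byte) (int, error) {
	n, err := cw.w.Write(p)
	cw.written.Add(int64(n)) // safe for concurrent readers, e.g. progress reporting
	return n, err
}

func main() {
	var total atomic.Int64
	var buf bytes.Buffer
	cw := countingWriter{w: &buf, written: &total}
	_, _ = io.WriteString(cw, "hello")
	fmt.Println(total.Load()) // prints 5
}
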
    3191             :         // splitL0Outputs is true during flushes and intra-L0 compactions with flush
    3192             :         // splits enabled.
    3193           2 :         splitL0Outputs := c.outputLevel.level == 0 && d.opts.FlushSplitBytes > 0
    3194           2 : 
    3195           2 :         // finishOutput is called with a user key up to which all tombstones
    3196           2 :         // should be flushed. Typically, this is the first key of the next
    3197           2 :         // sstable or an empty key if this output is the final sstable.
    3198           2 :         finishOutput := func(splitKey []byte) error {
    3199           2 :                 // If we haven't output any point records to the sstable (tw == nil) then the
    3200           2 :                 // sstable will only contain range tombstones and/or range keys. The smallest
    3201           2 :                 // key in the sstable will be the start key of the first range tombstone or
    3202           2 :                 // range key added. We need to ensure that this start key is distinct from
    3203           2 :                 // the splitKey passed to finishOutput (if set), otherwise we would generate
    3204           2 :                 // an sstable where the largest key is smaller than the smallest key due to
    3205           2 :                 // how the largest key boundary is set below. NB: It is permissible for the
    3206           2 :                 // range tombstone / range key start key to be the empty string.
    3207           2 :                 //
    3208           2 :                 // TODO: It is unfortunate that we have to do this check here rather than
    3209           2 :                 // when we decide to finish the sstable in the runCompaction loop. A better
    3210           2 :                 // structure currently eludes us.
    3211           2 :                 if tw == nil {
    3212           2 :                         startKey := c.rangeDelFrag.Start()
    3213           2 :                         if len(iter.tombstones) > 0 {
    3214           2 :                                 startKey = iter.tombstones[0].Start
    3215           2 :                         }
    3216           2 :                         if startKey == nil {
    3217           2 :                                 startKey = c.rangeKeyFrag.Start()
    3218           2 :                                 if len(iter.rangeKeys) > 0 {
    3219           2 :                                         startKey = iter.rangeKeys[0].Start
    3220           2 :                                 }
    3221             :                         }
    3222           2 :                         if splitKey != nil && d.cmp(startKey, splitKey) == 0 {
    3223           0 :                                 return nil
    3224           0 :                         }
    3225             :                 }
    3226             : 
    3227             :                 // NB: clone the key because the data can be held on to by the call to
    3228             :                 // compactionIter.Tombstones via keyspan.Fragmenter.FlushTo, and by the
    3229             :                 // WriterMetadata.LargestRangeDel.UserKey.
    3230           2 :                 splitKey = append([]byte(nil), splitKey...)
    3231           2 :                 for _, v := range iter.Tombstones(splitKey) {
    3232           2 :                         if tw == nil {
    3233           2 :                                 if err := newOutput(); err != nil {
    3234           0 :                                         return err
    3235           0 :                                 }
    3236             :                         }
    3237             :                         // The tombstone being added could be completely outside the
    3238             :                         // eventual bounds of the sstable. Consider this example (bounds
    3239             :                         // in square brackets next to table filename):
    3240             :                         //
    3241             :                         // ./000240.sst   [tmgc#391,MERGE-tmgc#391,MERGE]
    3242             :                         // tmgc#391,MERGE [786e627a]
    3243             :                         // tmgc-udkatvs#331,RANGEDEL
    3244             :                         //
    3245             :                         // ./000241.sst   [tmgc#384,MERGE-tmgc#384,MERGE]
    3246             :                         // tmgc#384,MERGE [666c7070]
    3247             :                         // tmgc-tvsalezade#383,RANGEDEL
    3248             :                         // tmgc-tvsalezade#331,RANGEDEL
    3249             :                         //
    3250             :                         // ./000242.sst   [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
    3251             :                         // tmgc-tvsalezade#383,RANGEDEL
    3252             :                         // tmgc#375,SET [72646c78766965616c72776865676e79]
    3253             :                         // tmgc-tvsalezade#356,RANGEDEL
    3254             :                         //
    3255             :                         // Note that both of the top two SSTables have range tombstones
    3256             :                         // that start after the file's end keys. Since the file bound
    3257             :                         // computation happens well after all range tombstones have been
    3258             :                         // added to the writer, eliding out-of-file range tombstones based
    3259             :                         // on sequence number at this stage is difficult, and necessitates
    3260             :                         // read-time logic to ignore range tombstones outside file bounds.
    3261           2 :                         if err := rangedel.Encode(&v, tw.Add); err != nil {
    3262           0 :                                 return err
    3263           0 :                         }
    3264             :                 }
    3265           2 :                 for _, v := range iter.RangeKeys(splitKey) {
    3266           2 :                         // Same logic as for range tombstones, except added using tw.AddRangeKey.
    3267           2 :                         if tw == nil {
    3268           2 :                                 if err := newOutput(); err != nil {
    3269           0 :                                         return err
    3270           0 :                                 }
    3271             :                         }
    3272           2 :                         if err := rangekey.Encode(&v, tw.AddRangeKey); err != nil {
    3273           0 :                                 return err
    3274           0 :                         }
    3275             :                 }
    3276             : 
    3277           2 :                 if tw == nil {
    3278           2 :                         return nil
    3279           2 :                 }
    3280           2 :                 {
    3281           2 :                         // Set internal sstable properties.
    3282           2 :                         p := getInternalWriterProperties(tw)
    3283           2 :                         // Set the external sst version to 0. This is what RocksDB expects for
    3284           2 :                         // db-internal sstables; otherwise, it could apply a global sequence number.
    3285           2 :                         p.ExternalFormatVersion = 0
    3286           2 :                         // Set the snapshot pinned totals.
    3287           2 :                         p.SnapshotPinnedKeys = pinnedCount
    3288           2 :                         p.SnapshotPinnedKeySize = pinnedKeySize
    3289           2 :                         p.SnapshotPinnedValueSize = pinnedValueSize
    3290           2 :                         stats.cumulativePinnedKeys += pinnedCount
    3291           2 :                         stats.cumulativePinnedSize += pinnedKeySize + pinnedValueSize
    3292           2 :                         pinnedCount = 0
    3293           2 :                         pinnedKeySize = 0
    3294           2 :                         pinnedValueSize = 0
    3295           2 :                 }
    3296           2 :                 if err := tw.Close(); err != nil {
    3297           1 :                         tw = nil
    3298           1 :                         return err
    3299           1 :                 }
    3300           2 :                 d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
    3301           2 :                 cpuWorkHandle = nil
    3302           2 :                 writerMeta, err := tw.Metadata()
    3303           2 :                 if err != nil {
    3304           0 :                         tw = nil
    3305           0 :                         return err
    3306           0 :                 }
    3307           2 :                 tw = nil
    3308           2 :                 meta := ve.NewFiles[len(ve.NewFiles)-1].Meta
    3309           2 :                 meta.Size = writerMeta.Size
    3310           2 :                 meta.SmallestSeqNum = writerMeta.SmallestSeqNum
    3311           2 :                 meta.LargestSeqNum = writerMeta.LargestSeqNum
    3312           2 :                 meta.InitPhysicalBacking()
    3313           2 : 
    3314           2 :                 // If the file didn't contain any range deletions, we can fill its
    3315           2 :                 // table stats now, avoiding unnecessarily loading the table later.
    3316           2 :                 maybeSetStatsFromProperties(
    3317           2 :                         meta.PhysicalMeta(), &writerMeta.Properties,
    3318           2 :                 )
    3319           2 : 
    3320           2 :                 if c.flushing == nil {
    3321           2 :                         outputMetrics.TablesCompacted++
    3322           2 :                         outputMetrics.BytesCompacted += meta.Size
    3323           2 :                 } else {
    3324           2 :                         outputMetrics.TablesFlushed++
    3325           2 :                         outputMetrics.BytesFlushed += meta.Size
    3326           2 :                 }
    3327           2 :                 outputMetrics.Size += int64(meta.Size)
    3328           2 :                 outputMetrics.NumFiles++
    3329           2 :                 outputMetrics.Additional.BytesWrittenDataBlocks += writerMeta.Properties.DataSize
    3330           2 :                 outputMetrics.Additional.BytesWrittenValueBlocks += writerMeta.Properties.ValueBlocksSize
    3331           2 : 
    3332           2 :                 if n := len(ve.NewFiles); n > 1 {
    3333           2 :                         // This is not the first output file. Ensure the sstable boundaries
    3334           2 :                         // are nonoverlapping.
    3335           2 :                         prevMeta := ve.NewFiles[n-2].Meta
    3336           2 :                         if writerMeta.SmallestRangeDel.UserKey != nil {
    3337           2 :                                 c := d.cmp(writerMeta.SmallestRangeDel.UserKey, prevMeta.Largest.UserKey)
    3338           2 :                                 if c < 0 {
    3339           0 :                                         return errors.Errorf(
    3340           0 :                                                 "pebble: smallest range tombstone start key is less than previous sstable largest key: %s < %s",
    3341           0 :                                                 writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey),
    3342           0 :                                                 prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey))
    3343           2 :                                 } else if c == 0 && !prevMeta.Largest.IsExclusiveSentinel() {
    3344           0 :                                         // The user key portion of the range boundary start key is
    3345           0 :                                         // equal to the previous table's largest key user key, and
    3346           0 :                                         // the previous table's largest key is not exclusive. This
    3347           0 :                                         // violates the invariant that tables are key-space
    3348           0 :                                         // partitioned.
    3349           0 :                                         return errors.Errorf(
    3350           0 :                                                 "pebble: invariant violation: previous sstable largest key %s, current sstable smallest rangedel: %s",
    3351           0 :                                                 prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey),
    3352           0 :                                                 writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey),
    3353           0 :                                         )
    3354           0 :                                 }
    3355             :                         }
    3356             :                 }
    3357             : 
    3358             :         // Verify that all range deletions written to the sstable are
    3359             :         // truncated to the split key.
    3360           2 :                 if splitKey != nil && writerMeta.LargestRangeDel.UserKey != nil &&
    3361           2 :                         d.cmp(writerMeta.LargestRangeDel.UserKey, splitKey) > 0 {
    3362           0 :                         return errors.Errorf(
    3363           0 :                                 "pebble: invariant violation: rangedel largest key %q extends beyond split key %q",
    3364           0 :                                 writerMeta.LargestRangeDel.Pretty(d.opts.Comparer.FormatKey),
    3365           0 :                                 d.opts.Comparer.FormatKey(splitKey),
    3366           0 :                         )
    3367           0 :                 }
    3368             : 
    3369           2 :                 if writerMeta.HasPointKeys {
    3370           2 :                         meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestPoint, writerMeta.LargestPoint)
    3371           2 :                 }
    3372           2 :                 if writerMeta.HasRangeDelKeys {
    3373           2 :                         meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestRangeDel, writerMeta.LargestRangeDel)
    3374           2 :                 }
    3375           2 :                 if writerMeta.HasRangeKeys {
    3376           2 :                         meta.ExtendRangeKeyBounds(d.cmp, writerMeta.SmallestRangeKey, writerMeta.LargestRangeKey)
    3377           2 :                 }
    3378             : 
    3379             :                 // Verify that the sstable bounds fall within the compaction input
    3380             :                 // bounds. This is a sanity check that we don't have a logic error
    3381             :         // elsewhere that causes the sstable bounds to accidentally expand past
    3382             :         // the compaction input bounds, as doing so could lead to various badness,
    3383             :         // such as keys being incorrectly deleted by a range tombstone.
    3384           2 :                 if c.smallest.UserKey != nil {
    3385           2 :                         switch v := d.cmp(meta.Smallest.UserKey, c.smallest.UserKey); {
    3386           2 :                         case v >= 0:
    3387             :                                 // Nothing to do.
    3388           0 :                         case v < 0:
    3389           0 :                                 return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s < %s",
    3390           0 :                                         meta.Smallest.Pretty(d.opts.Comparer.FormatKey),
    3391           0 :                                         c.smallest.Pretty(d.opts.Comparer.FormatKey))
    3392             :                         }
    3393             :                 }
    3394           2 :                 if c.largest.UserKey != nil {
    3395           2 :                         switch v := d.cmp(meta.Largest.UserKey, c.largest.UserKey); {
    3396           2 :                         case v <= 0:
    3397             :                                 // Nothing to do.
    3398           0 :                         case v > 0:
    3399           0 :                                 return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s > %s",
    3400           0 :                                         meta.Largest.Pretty(d.opts.Comparer.FormatKey),
    3401           0 :                                         c.largest.Pretty(d.opts.Comparer.FormatKey))
    3402             :                         }
    3403             :                 }
    3404             :                 // Verify that we never split different revisions of the same user key
    3405             :                 // across two different sstables.
    3406           2 :                 if err := c.errorOnUserKeyOverlap(ve); err != nil {
    3407           0 :                         return err
    3408           0 :                 }
    3409           2 :                 if err := meta.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
    3410           0 :                         return err
    3411           0 :                 }
    3412           2 :                 return nil
    3413             :         }
    3414             : 
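The append([]byte(nil), splitKey...) in finishOutput above is Go's standard deep-copy idiom for byte slices; the clone is needed because the fragmenters and WriterMetadata.LargestRangeDel may retain the key after the caller's buffer is reused. A minimal sketch of the idiom:

package main

import "fmt"

func main() {
	// Appending to a nil slice copies the contents into fresh storage, so
	// the retained clone cannot alias the caller's buffer; this is the
	// idiom used for splitKey in finishOutput above.
	orig := []byte("split-key")
	clone := append([]byte(nil), orig...)
	orig[0] = 'X'
	fmt.Printf("%s %s\n", orig, clone) // prints: Xplit-key split-key
}
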
    3415             :         // Build a compactionOutputSplitter that contains all logic to determine
    3416             :         // whether the compaction loop should stop writing to one output sstable and
    3417             :         // switch to a new one. Some splitters can wrap other splitters, and the
    3418             :         // splitterGroup can be composed of multiple splitters. In this case, we
    3419             :         // start off with splitters for file sizes, grandparent limits, and (for L0
    3420           2 :         // splits) L0 limits, before wrapping them in a splitterGroup.
    3421           2 :         sizeSplitter := newFileSizeSplitter(&iter.frontiers, c.maxOutputFileSize, c.grandparents.Iter())
    3422           2 :         unsafePrevUserKey := func() []byte {
    3423           2 :                 // Return the largest point key written to tw or the start of
    3424           2 :                 // the current range deletion in the fragmenter, whichever is
    3425           2 :                 // greater.
    3426           2 :                 prevPoint := prevPointKey.UnsafeKey()
    3427           2 :                 if c.cmp(prevPoint.UserKey, c.rangeDelFrag.Start()) > 0 {
    3428           2 :                         return prevPoint.UserKey
    3429           2 :                 }
    3430           2 :                 return c.rangeDelFrag.Start()
    3431             :         }
    3432           2 :         outputSplitters := []compactionOutputSplitter{
    3433           2 :                 // We do not split the same user key across different sstables within
    3434           2 :                 // one flush or compaction. The fileSizeSplitter may request a split in
    3435           2 :                 // the middle of a user key, so the userKeyChangeSplitter ensures we are
    3436           2 :                 // at a user key change boundary when doing a split.
    3437           2 :                 &userKeyChangeSplitter{
    3438           2 :                         cmp:               c.cmp,
    3439           2 :                         splitter:          sizeSplitter,
    3440           2 :                         unsafePrevUserKey: unsafePrevUserKey,
    3441           2 :                 },
    3442           2 :                 newLimitFuncSplitter(&iter.frontiers, c.findGrandparentLimit),
    3443           2 :         }
    3444           2 :         if splitL0Outputs {
    3445           2 :                 outputSplitters = append(outputSplitters, newLimitFuncSplitter(&iter.frontiers, c.findL0Limit))
    3446           2 :         }
    3447           2 :         splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters}
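
The composition above — leaf splitters wrapped by a userKeyChangeSplitter, all gathered into a splitterGroup — can be sketched generically. The following illustrative Go (hypothetical names and simplified semantics, not Pebble's actual compactionOutputSplitter interface) shows a group that splits when any member does, and a wrapper that defers a member's request until the user key changes:

package main

import (
	"bytes"
	"fmt"
)

type splitter interface {
	shouldSplitBefore(key []byte) bool
}

// splitterGroup splits as soon as any member splitter wants to.
type splitterGroup []splitter

func (g splitterGroup) shouldSplitBefore(key []byte) bool {
	for _, s := range g {
		if s.shouldSplitBefore(key) {
			return true
		}
	}
	return false
}

// keyChangeOnly defers the wrapped splitter's request until the user key
// changes, so revisions of one user key never straddle two outputs.
type keyChangeOnly struct {
	wrapped splitter
	prevKey []byte
}

func (k *keyChangeOnly) shouldSplitBefore(key []byte) bool {
	split := k.wrapped.shouldSplitBefore(key) && !bytes.Equal(key, k.prevKey)
	k.prevKey = append(k.prevKey[:0], key...)
	return split
}

// everyN asks to split before every key past a budget, a toy stand-in for
// the file-size splitter.
type everyN struct{ n, seen int }

func (e *everyN) shouldSplitBefore(key []byte) bool {
	e.seen++
	return e.seen > e.n
}

func main() {
	g := splitterGroup{&keyChangeOnly{wrapped: &everyN{n: 2}}}
	for _, k := range [][]byte{[]byte("a"), []byte("a"), []byte("a"), []byte("b")} {
		fmt.Printf("%s split=%v\n", k, g.shouldSplitBefore(k))
	}
	// The size budget is exceeded at the third "a", but the split is
	// deferred until the key changes to "b".
}
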
    3448           2 : 
    3449           2 :         // Each outer loop iteration produces one output file. An iteration that
    3450           2 :         // produces a file containing point keys (and optionally range tombstones)
    3451           2 :         // guarantees that the input iterator advanced. An iteration that produces
    3452           2 :         // a file containing only range tombstones guarantees the limit passed to
    3453           2 :         // `finishOutput()` advanced to a strictly greater user key corresponding
    3454           2 :         // to a grandparent file largest key, or nil. Taken together, these
    3455           2 :         // progress guarantees ensure that eventually the input iterator will be
    3456           2 :         // exhausted and the range tombstone fragments will all be flushed.
    3457           2 :         for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty() || !c.rangeKeyFrag.Empty(); {
    3458           2 :                 var firstKey []byte
    3459           2 :                 if key != nil {
    3460           2 :                         firstKey = key.UserKey
    3461           2 :                 } else if startKey := c.rangeDelFrag.Start(); startKey != nil {
    3462           2 :                         // Pass the start key of the first pending tombstone to find the
    3463           2 :                         // next limit. All pending tombstones have the same start key. We
    3464           2 :                         // use this as opposed to the end key of the last written sstable to
    3465           2 :                         // effectively handle cases like these:
    3466           2 :                         //
    3467           2 :                         // a.SET.3
    3468           2 :                         // (lf.limit at b)
    3469           2 :                         // d.RANGEDEL.4:f
    3470           2 :                         //
    3471           2 :                         // In this case, the partition after b has only range deletions, so
    3472           2 :                         // if we were to find the limit after the last written key at the
    3473           2 :                         // split point (key a), we'd get the limit b again, and
    3474           2 :                         // finishOutput() would not advance any further because the next
    3475           2 :                         // range tombstone to write does not start until after the L0 split
    3476           2 :                         // point.
    3477           2 :                         firstKey = startKey
    3478           2 :                 }
    3479           2 :                 splitterSuggestion := splitter.onNewOutput(firstKey)
    3480           2 : 
    3481           2 :                 // Each inner loop iteration processes one key from the input iterator.
    3482           2 :                 for ; key != nil; key, val = iter.Next() {
    3483           2 :                         if split := splitter.shouldSplitBefore(key, tw); split == splitNow {
    3484           2 :                                 break
    3485             :                         }
    3486             : 
    3487           2 :                         switch key.Kind() {
    3488           2 :                         case InternalKeyKindRangeDelete:
    3489           2 :                                 // Range tombstones are handled specially. They are fragmented,
    3490           2 :                                 // and they're not written until later during `finishOutput()`.
    3491           2 :                                 // We add them to the `Fragmenter` now to make them visible to
    3492           2 :                                 // `compactionIter` so covered keys in the same snapshot stripe
    3493           2 :                                 // can be elided.
    3494           2 : 
    3495           2 :                                 // The interleaved range deletion might only be one of many with
    3496           2 :                                 // these bounds. Some fragmenting is performed ahead of time by
    3497           2 :                                 // keyspan.MergingIter.
    3498           2 :                                 if s := c.rangeDelIter.Span(); !s.Empty() {
    3499           2 :                                         // The memory management here is subtle. Range deletion
    3500           2 :                                         // blocks do NOT use prefix compression, which ensures that
    3501           2 :                                         // range deletion spans' memory is available as long as we keep
    3502           2 :                                         // the iterator open. However, the keyspan.MergingIter that
    3503           2 :                                         // merges spans across levels only guarantees the lifetime
    3504           2 :                                         // of the [start, end) bounds until the next positioning
    3505           2 :                                         // method is called.
    3506           2 :                                         //
    3507           2 :                                         // Additionally, the Span.Keys slice is owned by the
    3508           2 :                                         // range deletion iterator stack, and it may be overwritten
    3509           2 :                                         // when we advance.
    3510           2 :                                         //
    3511           2 :                                         // Clone the Keys slice and the start and end keys.
    3512           2 :                                         //
    3513           2 :                                         // TODO(jackson): Avoid the clone by removing c.rangeDelFrag
    3514           2 :                                         // and performing explicit truncation of the pending
    3515           2 :                                         // rangedel span as necessary.
    3516           2 :                                         clone := keyspan.Span{
    3517           2 :                                                 Start: iter.cloneKey(s.Start),
    3518           2 :                                                 End:   iter.cloneKey(s.End),
    3519           2 :                                                 Keys:  make([]keyspan.Key, len(s.Keys)),
    3520           2 :                                         }
    3521           2 :                                         copy(clone.Keys, s.Keys)
    3522           2 :                                         c.rangeDelFrag.Add(clone)
    3523           2 :                                 }
    3524           2 :                                 continue
    3525           2 :                         case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
    3526           2 :                                 // Range keys are handled in the same way as range tombstones, except
    3527           2 :                                 // with a dedicated fragmenter.
    3528           2 :                                 if s := c.rangeKeyInterleaving.Span(); !s.Empty() {
    3529           2 :                                         clone := keyspan.Span{
    3530           2 :                                                 Start: iter.cloneKey(s.Start),
    3531           2 :                                                 End:   iter.cloneKey(s.End),
    3532           2 :                                                 Keys:  make([]keyspan.Key, len(s.Keys)),
    3533           2 :                                         }
    3534           2 :                                         // Since the keys' Suffix and Value fields are not deep cloned, the
    3535           2 :                                         // underlying blockIter must be kept open for the lifetime of the
    3536           2 :                                         // compaction.
    3537           2 :                                         copy(clone.Keys, s.Keys)
    3538           2 :                                         c.rangeKeyFrag.Add(clone)
    3539           2 :                                 }
    3540           2 :                                 continue
    3541             :                         }
    3542           2 :                         if tw == nil {
    3543           2 :                                 if err := newOutput(); err != nil {
    3544           2 :                                         return nil, pendingOutputs, stats, err
    3545           2 :                                 }
    3546             :                         }
    3547           2 :                         if err := tw.AddWithForceObsolete(*key, val, iter.forceObsoleteDueToRangeDel); err != nil {
    3548           0 :                                 return nil, pendingOutputs, stats, err
    3549           0 :                         }
    3550           2 :                         if iter.snapshotPinned {
    3551           2 :                                 // The kv pair we just added to the sstable was only surfaced by
    3552           2 :                                 // the compaction iterator because an open snapshot prevented
    3553           2 :                                 // its elision. Increment the stats.
    3554           2 :                                 pinnedCount++
    3555           2 :                                 pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
    3556           2 :                                 pinnedValueSize += uint64(len(val))
    3557           2 :                         }
    3558             :                 }
    3559             : 
    3560             :                 // A splitter requested a split, and we're ready to finish the output.
    3561             :                 // We need to choose the key at which to split any pending range
    3562             :                 // tombstones. There are two options:
    3563             :                 // 1. splitterSuggestion — The key suggested by the splitter. This key
    3564             :                 //    is guaranteed to be greater than the last key written to the
    3565             :                 //    current output.
    3566             :                 // 2. key.UserKey — the first key of the next sstable output. This user
    3567             :                 //    key is also guaranteed to be greater than the last user key
    3568             :                 //    written to the current output (see userKeyChangeSplitter).
    3569             :                 //
    3570             :                 // Use whichever is smaller; the smaller of the two limits the
    3571             :                 // overlap with grandparents. Consider the case where the
    3572             :                 // grandparent limit is calculated to be 'b', key is 'x', and
    3573             :                 // there exist many sstables between 'b' and 'x'. If the range
    3574             :                 // deletion fragmenter has a pending tombstone [a,x), splitting
    3575             :                 // at 'x' would cause the output table to overlap many
    3576             :                 // grandparents well beyond the calculated grandparent limit
    3577             :                 // 'b'. Splitting at the smaller `splitterSuggestion` avoids
    3578             :                 // this unbounded overlap with grandparent tables.
    3579           2 :                 splitKey := splitterSuggestion
    3580           2 :                 if key != nil && (splitKey == nil || c.cmp(splitKey, key.UserKey) > 0) {
    3581           2 :                         splitKey = key.UserKey
    3582           2 :                 }
    3583           2 :                 if err := finishOutput(splitKey); err != nil {
    3584           1 :                         return nil, pendingOutputs, stats, err
    3585           1 :                 }
    3586             :         }
    3587             : 
    3588           2 :         for _, cl := range c.inputs {
    3589           2 :                 iter := cl.files.Iter()
    3590           2 :                 for f := iter.First(); f != nil; f = iter.Next() {
    3591           2 :                         ve.DeletedFiles[deletedFileEntry{
    3592           2 :                                 Level:   cl.level,
    3593           2 :                                 FileNum: f.FileNum,
    3594           2 :                         }] = f
    3595           2 :                 }
    3596             :         }
    3597             : 
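The loop above records every compaction input in ve.DeletedFiles, a map keyed by a small comparable struct. A self-contained sketch of the struct-keyed-map pattern (the type below is an illustrative copy, not the real manifest type):

package main

import "fmt"

// deletedFileEntry mirrors the shape used above: a comparable struct makes
// the (level, file number) pair the map key's identity.
type deletedFileEntry struct {
	Level   int
	FileNum uint64
}

func main() {
	deleted := map[deletedFileEntry]bool{}
	deleted[deletedFileEntry{Level: 3, FileNum: 17}] = true
	// The same (level, fileNum) pair addresses the same entry.
	fmt.Println(deleted[deletedFileEntry{Level: 3, FileNum: 17}]) // true
}
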
    3598             :         // The compaction iterator keeps a count of the DELSIZED
    3599             :         // keys that encoded an incorrect size. Propagate it up as a part of
    3600             :         // compactStats.
    3601           2 :         stats.countMissizedDels = iter.stats.countMissizedDels
    3602           2 : 
    3603           2 :         if err := d.objProvider.Sync(); err != nil {
    3604           0 :                 return nil, pendingOutputs, stats, err
    3605           0 :         }
    3606             : 
    3607             :         // Refresh the disk available statistic whenever a compaction/flush
    3608             :         // completes, before re-acquiring the mutex.
    3609           2 :         _ = d.calculateDiskAvailableBytes()
    3610           2 : 
    3611           2 :         return ve, pendingOutputs, stats, nil
    3612             : }
    3613             : 
    3614             : // validateVersionEdit validates that start and end keys across new and deleted
    3615             : // files in a versionEdit pass the given validation function.
    3616             : func validateVersionEdit(
    3617             :         ve *versionEdit, validateFn func([]byte) error, format base.FormatKey,
    3618           2 : ) error {
    3619           2 :         validateMetaFn := func(f *manifest.FileMetadata) error {
    3620           2 :                 for _, key := range []InternalKey{f.Smallest, f.Largest} {
    3621           2 :                         if err := validateFn(key.UserKey); err != nil {
    3622           1 :                                 return errors.Wrapf(err, "key=%q; file=%s", format(key.UserKey), f)
    3623           1 :                         }
    3624             :                 }
    3625           2 :                 return nil
    3626             :         }
    3627             : 
    3628             :         // Validate both new and deleted files.
    3629           2 :         for _, f := range ve.NewFiles {
    3630           2 :                 if err := validateMetaFn(f.Meta); err != nil {
    3631           1 :                         return err
    3632           1 :                 }
    3633             :         }
    3634           2 :         for _, m := range ve.DeletedFiles {
    3635           2 :                 if err := validateMetaFn(m); err != nil {
    3636           1 :                         return err
    3637           1 :                 }
    3638             :         }
    3639             : 
    3640           2 :         return nil
    3641             : }
    3642             : 
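validateVersionEdit accepts an arbitrary per-key validation function. A hypothetical validateFn of the expected shape (the specific checks are illustrative, not what Pebble's callers actually use):

package main

import (
	"errors"
	"fmt"
	"unicode/utf8"
)

// validateKey returns a non-nil error for keys it rejects, matching the
// func([]byte) error shape that validateVersionEdit expects.
func validateKey(k []byte) error {
	if len(k) == 0 {
		return errors.New("empty user key")
	}
	if !utf8.Valid(k) {
		return errors.New("user key is not valid UTF-8")
	}
	return nil
}

func main() {
	fmt.Println(validateKey([]byte("a")))  // <nil>
	fmt.Println(validateKey([]byte{0xff})) // user key is not valid UTF-8
}
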
    3643             : // scanObsoleteFiles scans the filesystem for files that are no longer needed
    3644             : // and adds those to the internal lists of obsolete files. Note that the files
    3645             : // are not actually deleted by this method. A subsequent call to
    3646             : // deleteObsoleteFiles must be performed. It must not be called concurrently
    3647             : // with compactions and flushes. db.mu must be held when calling this function.
    3648           2 : func (d *DB) scanObsoleteFiles(list []string) {
    3649           2 :         // Temporarily disable automatic compactions to prevent concurrent compactions /
    3650           2 :         // flushes from interfering. The original value is restored on completion.
    3651           2 :         disabledPrev := d.opts.DisableAutomaticCompactions
    3652           2 :         defer func() {
    3653           2 :                 d.opts.DisableAutomaticCompactions = disabledPrev
    3654           2 :         }()
    3655           2 :         d.opts.DisableAutomaticCompactions = true
    3656           2 : 
    3657           2 :         // Wait for any ongoing compaction to complete before continuing.
    3658           2 :         for d.mu.compact.compactingCount > 0 || d.mu.compact.flushing {
    3659           0 :                 d.mu.compact.cond.Wait()
    3660           0 :         }
    3661             : 
    3662           2 :         liveFileNums := make(map[base.DiskFileNum]struct{})
    3663           2 :         d.mu.versions.addLiveFileNums(liveFileNums)
    3664           2 :         // Protect files which are only referred to by the ingestedFlushable
    3665           2 :         // from being deleted. These are added to the flushable queue on WAL replay
    3666           2 :         // during read only mode and aren't part of the Version. Note that if
    3667           2 :         // !d.opts.ReadOnly, then all flushables of type ingestedFlushable have
    3668           2 :         // already been flushed.
    3669           2 :         for _, fEntry := range d.mu.mem.queue {
    3670           2 :                 if f, ok := fEntry.flushable.(*ingestedFlushable); ok {
    3671           0 :                         for _, file := range f.files {
    3672           0 :                                 liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
    3673           0 :                         }
    3674             :                 }
    3675             :         }
    3676             : 
    3677           2 :         minUnflushedLogNum := d.mu.versions.minUnflushedLogNum
    3678           2 :         manifestFileNum := d.mu.versions.manifestFileNum
    3679           2 : 
    3680           2 :         var obsoleteLogs []fileInfo
    3681           2 :         var obsoleteTables []fileInfo
    3682           2 :         var obsoleteManifests []fileInfo
    3683           2 :         var obsoleteOptions []fileInfo
    3684           2 : 
    3685           2 :         for _, filename := range list {
    3686           2 :                 fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
    3687           2 :                 if !ok {
    3688           2 :                         continue
    3689             :                 }
    3690           2 :                 switch fileType {
    3691           2 :                 case fileTypeLog:
    3692           2 :                         if diskFileNum >= minUnflushedLogNum {
    3693           0 :                                 continue
    3694             :                         }
    3695           2 :                         fi := fileInfo{fileNum: diskFileNum}
    3696           2 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
    3697           1 :                                 fi.fileSize = uint64(stat.Size())
    3698           1 :                         }
    3699           2 :                         obsoleteLogs = append(obsoleteLogs, fi)
    3700           2 :                 case fileTypeManifest:
    3701           2 :                         if diskFileNum >= manifestFileNum {
    3702           2 :                                 continue
    3703             :                         }
    3704           2 :                         fi := fileInfo{fileNum: diskFileNum}
    3705           2 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
    3706           1 :                                 fi.fileSize = uint64(stat.Size())
    3707           1 :                         }
    3708           2 :                         obsoleteManifests = append(obsoleteManifests, fi)
    3709           2 :                 case fileTypeOptions:
    3710           2 :                         if diskFileNum.FileNum() >= d.optionsFileNum.FileNum() {
    3711           0 :                                 continue
    3712             :                         }
    3713           2 :                         fi := fileInfo{fileNum: diskFileNum}
    3714           2 :                         if stat, err := d.opts.FS.Stat(filename); err == nil {
    3715           1 :                                 fi.fileSize = uint64(stat.Size())
    3716           1 :                         }
    3717           2 :                         obsoleteOptions = append(obsoleteOptions, fi)
    3718           2 :                 case fileTypeTable:
    3719             :                         // Objects are handled through the objstorage provider below.
    3720           2 :                 default:
    3721             :                         // Don't delete files we don't know about.
    3722             :                 }
    3723             :         }
    3724             : 
    3725           2 :         objects := d.objProvider.List()
    3726           2 :         for _, obj := range objects {
    3727           2 :                 switch obj.FileType {
    3728           2 :                 case fileTypeTable:
    3729           2 :                         if _, ok := liveFileNums[obj.DiskFileNum]; ok {
    3730           2 :                                 continue
    3731             :                         }
    3732           2 :                         fileInfo := fileInfo{
    3733           2 :                                 fileNum: obj.DiskFileNum,
    3734           2 :                         }
    3735           2 :                         if size, err := d.objProvider.Size(obj); err == nil {
    3736           1 :                                 fileInfo.fileSize = uint64(size)
    3737           1 :                         }
    3738           2 :                         obsoleteTables = append(obsoleteTables, fileInfo)
    3739             : 
    3740           0 :                 default:
    3741             :                         // Ignore object types we don't know about.
    3742             :                 }
    3743             :         }
    3744             : 
    3745           2 :         d.mu.log.queue = merge(d.mu.log.queue, obsoleteLogs)
    3746           2 :         d.mu.versions.metrics.WAL.Files = int64(len(d.mu.log.queue))
    3747           2 :         d.mu.versions.obsoleteTables = merge(d.mu.versions.obsoleteTables, obsoleteTables)
    3748           2 :         d.mu.versions.updateObsoleteTableMetricsLocked()
    3749           2 :         d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
    3750           2 :         d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
    3751             : }
    3752             : 
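Throughout the scan above, file sizes are collected on a best-effort basis: a failed Stat (or objProvider.Size) simply leaves the size at zero rather than aborting the scan. A minimal sketch of that pattern, with os.Stat standing in for the vfs-backed FS.Stat:

package main

import (
	"fmt"
	"os"
)

type fileInfo struct {
	name string
	size uint64
}

// bestEffortInfo records the size if Stat succeeds and otherwise leaves it
// zero, carrying on either way.
func bestEffortInfo(path string) fileInfo {
	fi := fileInfo{name: path}
	if st, err := os.Stat(path); err == nil {
		fi.size = uint64(st.Size())
	}
	return fi
}

func main() {
	fmt.Println(bestEffortInfo("/does/not/exist")) // {/does/not/exist 0}
}
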
    3753             : // disableFileDeletions disables file deletions and then waits for any
    3754             : // in-progress deletion to finish. The caller is required to call
    3755             : // enableFileDeletions in order to enable file deletions again. It is ok for
    3756             : // multiple callers to disable file deletions simultaneously, though they must
    3757             : // all invoke enableFileDeletions in order for file deletions to be re-enabled
    3758             : // (there is an internal reference count on file deletion disablement).
    3759             : //
    3760             : // d.mu must be held when calling this method.
    3761           2 : func (d *DB) disableFileDeletions() {
    3762           2 :         d.mu.disableFileDeletions++
    3763           2 :         d.mu.Unlock()
    3764           2 :         defer d.mu.Lock()
    3765           2 :         d.cleanupManager.Wait()
    3766           2 : }
    3767             : 
    3768             : // enableFileDeletions enables previously disabled file deletions. A cleanup job
    3769             : // is queued if necessary.
    3770             : //
    3771             : // d.mu must be held when calling this method.
    3772           2 : func (d *DB) enableFileDeletions() {
    3773           2 :         if d.mu.disableFileDeletions <= 0 {
    3774           1 :                 panic("pebble: file deletion disablement invariant violated")
    3775             :         }
    3776           2 :         d.mu.disableFileDeletions--
    3777           2 :         if d.mu.disableFileDeletions > 0 {
    3778           0 :                 return
    3779           0 :         }
    3780           2 :         jobID := d.mu.nextJobID
    3781           2 :         d.mu.nextJobID++
    3782           2 :         d.deleteObsoleteFiles(jobID)
    3783             : }
    3784             : 
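disableFileDeletions and enableFileDeletions implement the reference-counted disablement described above: deletions resume only once every disabler has re-enabled. A self-contained sketch of the same pattern under illustrative names:

package main

import (
	"fmt"
	"sync"
)

type deletionGate struct {
	mu       sync.Mutex
	disabled int
}

func (g *deletionGate) disable() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.disabled++
}

// enable reports whether deletions may resume; the last disabler to call
// it is responsible for scheduling any pending cleanup.
func (g *deletionGate) enable() (resume bool) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.disabled <= 0 {
		panic("deletion disablement invariant violated")
	}
	g.disabled--
	return g.disabled == 0
}

func main() {
	var g deletionGate
	g.disable()
	g.disable()
	fmt.Println(g.enable()) // false: one disabler remains
	fmt.Println(g.enable()) // true: deletions may resume
}
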
    3785             : type fileInfo struct {
    3786             :         fileNum  base.DiskFileNum
    3787             :         fileSize uint64
    3788             : }
    3789             : 
    3790             : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
    3791             : //
    3792             : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
    3793             : //
    3794             : // Does nothing if file deletions are disabled (see disableFileDeletions). A
    3795             : // cleanup job will be scheduled when file deletions are re-enabled.
    3796           2 : func (d *DB) deleteObsoleteFiles(jobID int) {
    3797           2 :         if d.mu.disableFileDeletions > 0 {
    3798           2 :                 return
    3799           2 :         }
    3800             : 
    3801           2 :         var obsoleteLogs []fileInfo
    3802           2 :         for i := range d.mu.log.queue {
    3803           2 :                 // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
    3804           2 :                 // log that has not had its contents flushed to an sstable. We can recycle
    3805           2 :                 // the prefix of d.mu.log.queue with log numbers less than
    3806           2 :                 // minUnflushedLogNum.
    3807           2 :                 if d.mu.log.queue[i].fileNum >= d.mu.versions.minUnflushedLogNum {
    3808           2 :                         obsoleteLogs = d.mu.log.queue[:i]
    3809           2 :                         d.mu.log.queue = d.mu.log.queue[i:]
    3810           2 :                         d.mu.versions.metrics.WAL.Files -= int64(len(obsoleteLogs))
    3811           2 :                         break
    3812             :                 }
    3813             :         }
    3814             : 
    3815           2 :         obsoleteTables := append([]fileInfo(nil), d.mu.versions.obsoleteTables...)
    3816           2 :         d.mu.versions.obsoleteTables = nil
    3817           2 : 
    3818           2 :         for _, tbl := range obsoleteTables {
    3819           2 :                 delete(d.mu.versions.zombieTables, tbl.fileNum)
    3820           2 :         }
    3821             : 
    3822             :         // Sort the manifests because we want to delete some contiguous prefix
    3823             :         // of the older manifests.
    3824           2 :         slices.SortFunc(d.mu.versions.obsoleteManifests, func(a, b fileInfo) int {
    3825           2 :                 return cmp.Compare(a.fileNum, b.fileNum)
    3826           2 :         })
    3827             : 
    3828           2 :         var obsoleteManifests []fileInfo
    3829           2 :         manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
    3830           2 :         if manifestsToDelete > 0 {
    3831           2 :                 obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
    3832           2 :                 d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
    3833           2 :                 if len(d.mu.versions.obsoleteManifests) == 0 {
    3834           0 :                         d.mu.versions.obsoleteManifests = nil
    3835           0 :                 }
    3836             :         }
    3837             : 
    3838           2 :         obsoleteOptions := d.mu.versions.obsoleteOptions
    3839           2 :         d.mu.versions.obsoleteOptions = nil
    3840           2 : 
    3841           2 :         // Release d.mu while preparing the cleanup job and possibly waiting.
    3842           2 :         // Note the unusual order: Unlock and then Lock.
    3843           2 :         d.mu.Unlock()
    3844           2 :         defer d.mu.Lock()
    3845           2 : 
    3846           2 :         files := [4]struct {
    3847           2 :                 fileType fileType
    3848           2 :                 obsolete []fileInfo
    3849           2 :         }{
    3850           2 :                 {fileTypeLog, obsoleteLogs},
    3851           2 :                 {fileTypeTable, obsoleteTables},
    3852           2 :                 {fileTypeManifest, obsoleteManifests},
    3853           2 :                 {fileTypeOptions, obsoleteOptions},
    3854           2 :         }
    3855           2 :         _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
    3856           2 :         filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
    3857           2 :         for _, f := range files {
    3858           2 :                 // We sort to make the order of deletions deterministic, which is nice for
    3859           2 :                 // tests.
    3860           2 :                 slices.SortFunc(f.obsolete, func(a, b fileInfo) int {
    3861           2 :                         return cmp.Compare(a.fileNum, b.fileNum)
    3862           2 :                 })
    3863           2 :                 for _, fi := range f.obsolete {
    3864           2 :                         dir := d.dirname
    3865           2 :                         switch f.fileType {
    3866           2 :                         case fileTypeLog:
    3867           2 :                                 if !noRecycle && d.logRecycler.add(fi) {
    3868           1 :                                         continue
    3869             :                                 }
    3870           2 :                                 dir = d.walDirname
    3871           2 :                         case fileTypeTable:
    3872           2 :                                 d.tableCache.evict(fi.fileNum)
    3873             :                         }
    3874             : 
    3875           2 :                         filesToDelete = append(filesToDelete, obsoleteFile{
    3876           2 :                                 dir:      dir,
    3877           2 :                                 fileNum:  fi.fileNum,
    3878           2 :                                 fileType: f.fileType,
    3879           2 :                                 fileSize: fi.fileSize,
    3880           2 :                         })
    3881             :                 }
    3882             :         }
    3883           2 :         if len(filesToDelete) > 0 {
    3884           2 :                 d.cleanupManager.EnqueueJob(jobID, filesToDelete)
    3885           2 :         }
    3886           2 :         if d.opts.private.testingAlwaysWaitForCleanup {
    3887           1 :                 d.cleanupManager.Wait()
    3888           1 :         }
    3889             : }
    3890             : 
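Note the inverted locking in deleteObsoleteFiles: the method is entered with d.mu held, releases it while preparing the cleanup job (and possibly waiting), and re-acquires it via defer so the caller still holds the mutex on return. A minimal sketch of the Unlock-then-deferred-Lock idiom:

package main

import (
	"fmt"
	"sync"
)

type db struct{ mu sync.Mutex }

// slowWorkWithMuDropped must be entered with d.mu held; the mutex is not
// held while the slow work runs, but is held again on return.
func (d *db) slowWorkWithMuDropped() {
	d.mu.Unlock()
	defer d.mu.Lock()
	fmt.Println("doing slow work without holding d.mu")
}

func main() {
	d := &db{}
	d.mu.Lock()
	d.slowWorkWithMuDropped()
	d.mu.Unlock()
}
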
    3891           2 : func (d *DB) maybeScheduleObsoleteTableDeletion() {
    3892           2 :         d.mu.Lock()
    3893           2 :         defer d.mu.Unlock()
    3894           2 :         d.maybeScheduleObsoleteTableDeletionLocked()
    3895           2 : }
    3896             : 
    3897           2 : func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
    3898           2 :         if len(d.mu.versions.obsoleteTables) > 0 {
    3899           2 :                 jobID := d.mu.nextJobID
    3900           2 :                 d.mu.nextJobID++
    3901           2 :                 d.deleteObsoleteFiles(jobID)
    3902           2 :         }
    3903             : }
    3904             : 
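The pair of methods above follows the common Go convention of a *Locked variant: the wrapper acquires the mutex and delegates, while the Locked method assumes the caller already holds it, so other mutex-holding code can call it directly. A generic sketch:

package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu    sync.Mutex
	count int
}

// bump acquires the lock and delegates to the Locked variant.
func (s *store) bump() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.bumpLocked()
}

// bumpLocked requires s.mu to be held by the caller.
func (s *store) bumpLocked() {
	s.count++
}

func main() {
	s := &store{}
	s.bump()
	fmt.Println(s.count) // 1
}
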
    3905           2 : func merge(a, b []fileInfo) []fileInfo {
    3906           2 :         if len(b) == 0 {
    3907           2 :                 return a
    3908           2 :         }
    3909             : 
    3910           2 :         a = append(a, b...)
    3911           2 :         slices.SortFunc(a, func(a, b fileInfo) int {
    3912           2 :                 return cmp.Compare(a.fileNum, b.fileNum)
    3913           2 :         })
    3914           2 :         return slices.CompactFunc(a, func(a, b fileInfo) bool {
    3915           2 :                 return a.fileNum == b.fileNum
    3916           2 :         })
    3917             : }
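merge relies on sorting to make duplicates adjacent, so that slices.CompactFunc — which collapses only adjacent equal elements — deduplicates by file number. The same shape on plain ints:

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// mergeNums sorts the concatenation and then drops adjacent duplicates,
// mirroring merge above.
func mergeNums(a, b []int) []int {
	if len(b) == 0 {
		return a
	}
	a = append(a, b...)
	slices.SortFunc(a, func(x, y int) int { return cmp.Compare(x, y) })
	return slices.CompactFunc(a, func(x, y int) bool { return x == y })
}

func main() {
	fmt.Println(mergeNums([]int{3, 1}, []int{2, 3})) // [1 2 3]
}
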

Generated by: LCOV version 1.14