LCOV - code coverage report
Current view: top level - pebble - compaction.go (source / functions) Coverage Total Hit
Test: 2025-09-20 08:18Z 489b133b - tests only.lcov Lines: 92.2 % 2554 2354
Test Date: 2025-09-20 08:19:46 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2013 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "bytes"
       9              :         stdcmp "cmp"
      10              :         "context"
      11              :         "fmt"
      12              :         "iter"
      13              :         "maps"
      14              :         "math"
      15              :         "runtime/pprof"
      16              :         "slices"
      17              :         "sort"
      18              :         "sync/atomic"
      19              :         "time"
      20              :         "unsafe"
      21              : 
      22              :         "github.com/cockroachdb/crlib/crtime"
      23              :         "github.com/cockroachdb/errors"
      24              :         "github.com/cockroachdb/pebble/internal/base"
      25              :         "github.com/cockroachdb/pebble/internal/compact"
      26              :         "github.com/cockroachdb/pebble/internal/keyspan"
      27              :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      28              :         "github.com/cockroachdb/pebble/internal/manifest"
      29              :         "github.com/cockroachdb/pebble/internal/problemspans"
      30              :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      31              :         "github.com/cockroachdb/pebble/objstorage"
      32              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      33              :         "github.com/cockroachdb/pebble/objstorage/remote"
      34              :         "github.com/cockroachdb/pebble/sstable"
      35              :         "github.com/cockroachdb/pebble/sstable/blob"
      36              :         "github.com/cockroachdb/pebble/sstable/block"
      37              :         "github.com/cockroachdb/pebble/sstable/block/blockkind"
      38              :         "github.com/cockroachdb/pebble/vfs"
      39              :         "github.com/cockroachdb/redact"
      40              : )
      41              : 
      42              : var errEmptyTable = errors.New("pebble: empty table")
      43              : 
      44              : // ErrCancelledCompaction is returned if a compaction is cancelled by a
      45              : // concurrent excise or ingest-split operation.
      46              : var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
      47              : 
      48              : var flushLabels = pprof.Labels("pebble", "flush", "output-level", "L0")
      49              : var gcLabels = pprof.Labels("pebble", "gc")
      50              : 
      51              : // expandedCompactionByteSizeLimit is the maximum number of bytes in all
      52              : // compacted files. We avoid expanding the lower level file set of a compaction
      53              : // if it would make the total compaction cover more than this many bytes.
      54              : func expandedCompactionByteSizeLimit(
      55              :         opts *Options, targetFileSize int64, availBytes uint64,
      56            1 : ) uint64 {
      57            1 :         v := uint64(25 * targetFileSize)
      58            1 : 
      59            1 :         // Never expand a compaction beyond half the available capacity, divided
      60            1 :         // by the maximum number of concurrent compactions. Each of the concurrent
      61            1 :         // compactions may expand up to this limit, so this attempts to limit
      62            1 :         // compactions to half of available disk space. Note that this will not
      63            1 :         // prevent compaction picking from pursuing compactions that are larger
      64            1 :         // than this threshold before expansion.
      65            1 :         //
      66            1 :         // NB: this heuristic is an approximation since we may run more compactions
      67            1 :         // than the upper concurrency limit.
      68            1 :         _, maxConcurrency := opts.CompactionConcurrencyRange()
      69            1 :         diskMax := (availBytes / 2) / uint64(maxConcurrency)
      70            1 :         if v > diskMax {
      71            1 :                 v = diskMax
      72            1 :         }
      73            1 :         return v
      74              : }
      75              : 
      76              : // maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
      77              : // before we stop building a single file in a level-1 to level compaction.
      78            1 : func maxGrandparentOverlapBytes(targetFileSize int64) uint64 {
      79            1 :         return uint64(10 * targetFileSize)
      80            1 : }
      81              : 
      82              : // maxReadCompactionBytes is used to prevent read compactions which
      83              : // are too wide.
      84            1 : func maxReadCompactionBytes(targetFileSize int64) uint64 {
      85            1 :         return uint64(10 * targetFileSize)
      86            1 : }
      87              : 
      88              : // noCloseIter wraps around a FragmentIterator, intercepting and eliding
      89              : // calls to Close. It is used during compaction to ensure that rangeDelIters
      90              : // are not closed prematurely.
      91              : type noCloseIter struct {
      92              :         keyspan.FragmentIterator
      93              : }
      94              : 
      95            1 : func (i *noCloseIter) Close() {}
      96              : 
      97              : type compactionLevel struct {
      98              :         level int
      99              :         files manifest.LevelSlice
     100              :         // l0SublevelInfo contains information about L0 sublevels being compacted.
     101              :         // It's only set for the start level of a compaction starting out of L0 and
     102              :         // is nil for all other compactions.
     103              :         l0SublevelInfo []sublevelInfo
     104              : }
     105              : 
     106            1 : func (cl compactionLevel) Clone() compactionLevel {
     107            1 :         newCL := compactionLevel{
     108            1 :                 level: cl.level,
     109            1 :                 files: cl.files,
     110            1 :         }
     111            1 :         return newCL
     112            1 : }
     113            1 : func (cl compactionLevel) String() string {
     114            1 :         return fmt.Sprintf(`Level %d, Files %s`, cl.level, cl.files)
     115            1 : }
     116              : 
     117              : // compactionWritable is a objstorage.Writable wrapper that, on every write,
     118              : // updates a metric in `versions` on bytes written by in-progress compactions so
     119              : // far. It also increments a per-compaction `written` atomic int.
     120              : type compactionWritable struct {
     121              :         objstorage.Writable
     122              : 
     123              :         versions *versionSet
     124              :         written  *atomic.Int64
     125              : }
     126              : 
     127              : // Write is part of the objstorage.Writable interface.
     128            1 : func (c *compactionWritable) Write(p []byte) error {
     129            1 :         if err := c.Writable.Write(p); err != nil {
     130            0 :                 return err
     131            0 :         }
     132              : 
     133            1 :         c.written.Add(int64(len(p)))
     134            1 :         c.versions.incrementCompactionBytes(int64(len(p)))
     135            1 :         return nil
     136              : }
     137              : 
     138              : type compactionKind int
     139              : 
     140              : const (
     141              :         compactionKindDefault compactionKind = iota
     142              :         compactionKindFlush
     143              :         // compactionKindMove denotes a move compaction where the input file is
     144              :         // retained and linked in a new level without being obsoleted.
     145              :         compactionKindMove
     146              :         // compactionKindCopy denotes a copy compaction where the input file is
     147              :         // copied byte-by-byte into a new file with a new TableNum in the output level.
     148              :         compactionKindCopy
     149              :         // compactionKindDeleteOnly denotes a compaction that only deletes input
     150              :         // files. It can occur when wide range tombstones completely contain sstables.
     151              :         compactionKindDeleteOnly
     152              :         compactionKindElisionOnly
     153              :         compactionKindRead
     154              :         compactionKindTombstoneDensity
     155              :         compactionKindRewrite
     156              :         compactionKindIngestedFlushable
     157              :         compactionKindBlobFileRewrite
     158              :         compactionKindVirtualRewrite
     159              : )
     160              : 
     161            1 : func (k compactionKind) String() string {
     162            1 :         switch k {
     163            1 :         case compactionKindDefault:
     164            1 :                 return "default"
     165            0 :         case compactionKindFlush:
     166            0 :                 return "flush"
     167            1 :         case compactionKindMove:
     168            1 :                 return "move"
     169            1 :         case compactionKindDeleteOnly:
     170            1 :                 return "delete-only"
     171            1 :         case compactionKindElisionOnly:
     172            1 :                 return "elision-only"
     173            1 :         case compactionKindRead:
     174            1 :                 return "read"
     175            1 :         case compactionKindTombstoneDensity:
     176            1 :                 return "tombstone-density"
     177            1 :         case compactionKindRewrite:
     178            1 :                 return "rewrite"
     179            0 :         case compactionKindIngestedFlushable:
     180            0 :                 return "ingested-flushable"
     181            1 :         case compactionKindCopy:
     182            1 :                 return "copy"
     183            0 :         case compactionKindBlobFileRewrite:
     184            0 :                 return "blob-file-rewrite"
     185            1 :         case compactionKindVirtualRewrite:
     186            1 :                 return "virtual-sst-rewrite"
     187              :         }
     188            0 :         return "?"
     189              : }
     190              : 
     191              : // compactingOrFlushing returns "flushing" if the compaction kind is a flush,
     192              : // otherwise it returns "compacting".
     193            1 : func (k compactionKind) compactingOrFlushing() string {
     194            1 :         if k == compactionKindFlush {
     195            1 :                 return "flushing"
     196            1 :         }
     197            1 :         return "compacting"
     198              : }
     199              : 
     200              : type compaction interface {
     201              :         AddInProgressLocked(*DB)
     202              :         BeganAt() time.Time
     203              :         Bounds() *base.UserKeyBounds
     204              :         Cancel()
     205              :         Execute(JobID, *DB) error
     206              :         GrantHandle() CompactionGrantHandle
     207              :         Info() compactionInfo
     208              :         IsDownload() bool
     209              :         IsFlush() bool
     210              :         PprofLabels(UserKeyCategories) pprof.LabelSet
     211              :         RecordError(*problemspans.ByLevel, error)
     212              :         Tables() iter.Seq2[int, *manifest.TableMetadata]
     213              :         UsesBurstConcurrency() bool
     214              :         VersionEditApplied() bool
     215              : }
     216              : 
     217              : // tableCompaction is a table compaction from one level to the next, starting
     218              : // from a given version. It implements the compaction interface.
     219              : type tableCompaction struct {
     220              :         // cancel is a bool that can be used by other goroutines to signal a compaction
     221              :         // to cancel, such as if a conflicting excise operation raced it to manifest
     222              :         // application. Only holders of the manifest lock will write to this atomic.
     223              :         cancel atomic.Bool
     224              :         // kind indicates the kind of compaction. Different compaction kinds have
     225              :         // different semantics and mechanics. Some may have additional fields.
     226              :         kind compactionKind
     227              :         // isDownload is true if this compaction was started as part of a Download
     228              :         // operation. In this case kind is compactionKindCopy or
     229              :         // compactionKindRewrite.
     230              :         isDownload bool
     231              : 
     232              :         comparer *base.Comparer
     233              :         logger   Logger
     234              :         version  *manifest.Version
     235              :         // versionEditApplied is set to true when a compaction has completed and the
     236              :         // resulting version has been installed (if successful), but the compaction
     237              :         // goroutine is still cleaning up (eg, deleting obsolete files).
     238              :         versionEditApplied bool
     239              :         // getValueSeparation constructs a compact.ValueSeparation for use in a
     240              :         // compaction. It implements heuristics around choosing whether a compaction
     241              :         // should:
     242              :         //
     243              :         // a) preserve existing blob references: The compaction does not write any
     244              :         // new blob files, but propagates existing references to blob files.This
     245              :         // conserves write bandwidth by avoiding rewriting the referenced values. It
     246              :         // also reduces the locality of the referenced values which can reduce scan
     247              :         // performance because a scan must load values from more unique blob files.
     248              :         // It can also delay reclamation of disk space if some of the references to
     249              :         // blob values are elided by the compaction, increasing space amplification.
     250              :         //
     251              :         // b) rewrite blob files: The compaction will write eligible values to new
     252              :         // blob files. This consumes more write bandwidth because all values are
     253              :         // rewritten. However it restores locality.
     254              :         getValueSeparation func(JobID, *tableCompaction, sstable.TableFormat) compact.ValueSeparation
     255              : 
     256              :         // startLevel is the level that is being compacted. Inputs from startLevel
     257              :         // and outputLevel will be merged to produce a set of outputLevel files.
     258              :         startLevel *compactionLevel
     259              : 
     260              :         // outputLevel is the level that files are being produced in. outputLevel is
     261              :         // equal to startLevel+1 except when:
     262              :         //    - if startLevel is 0, the output level equals compactionPicker.baseLevel().
     263              :         //    - in multilevel compaction, the output level is the lowest level involved in
     264              :         //      the compaction
     265              :         // A compaction's outputLevel is nil for delete-only compactions.
     266              :         outputLevel *compactionLevel
     267              : 
     268              :         // extraLevels point to additional levels in between the input and output
     269              :         // levels that get compacted in multilevel compactions
     270              :         extraLevels []*compactionLevel
     271              : 
     272              :         inputs []compactionLevel
     273              : 
     274              :         // maxOutputFileSize is the maximum size of an individual table created
     275              :         // during compaction.
     276              :         maxOutputFileSize uint64
     277              :         // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
     278              :         // single output table with the tables in the grandparent level.
     279              :         maxOverlapBytes uint64
     280              : 
     281              :         // The boundaries of the input data.
     282              :         bounds base.UserKeyBounds
     283              : 
     284              :         // grandparents are the tables in level+2 that overlap with the files being
     285              :         // compacted. Used to determine output table boundaries. Do not assume that the actual files
     286              :         // in the grandparent when this compaction finishes will be the same.
     287              :         grandparents manifest.LevelSlice
     288              : 
     289              :         delElision      compact.TombstoneElision
     290              :         rangeKeyElision compact.TombstoneElision
     291              : 
     292              :         // deleteOnly contains information specific to compactions with kind
     293              :         // compactionKindDeleteOnly. A delete-only compaction is a special
     294              :         // compaction that does not merge or write sstables. Instead, it only
     295              :         // performs deletions either through removing whole sstables from the LSM or
     296              :         // virtualizing them into virtual sstables.
     297              :         deleteOnly struct {
     298              :                 // hints are collected by the table stats collector and describe range
     299              :                 // deletions and the files containing keys deleted by them.
     300              :                 hints []deleteCompactionHint
     301              :                 // exciseEnabled is set to true if this compaction is allowed to excise
     302              :                 // files. If false, the compaction will only remove whole sstables that
     303              :                 // are wholly contained within the bounds of range deletions.
     304              :                 exciseEnabled bool
     305              :         }
     306              :         // flush contains information specific to flushes (compactionKindFlush and
     307              :         // compactionKindIngestedFlushable). A flush is modeled by a compaction
     308              :         // because it has similar mechanics to a default compaction.
     309              :         flush struct {
     310              :                 // flushables contains the flushables (aka memtables, large batches,
     311              :                 // flushable ingestions, etc) that are being flushed.
     312              :                 flushables flushableList
     313              :                 // Boundaries at which sstables flushed to L0 should be split.
     314              :                 // Determined by L0Sublevels. If nil, ignored.
     315              :                 l0Limits [][]byte
     316              :         }
     317              :         // iterationState contains state used during compaction iteration.
     318              :         iterationState struct {
     319              :                 // bufferPool is a pool of buffers used when reading blocks. Compactions
     320              :                 // do not populate the block cache under the assumption that the blocks
     321              :                 // we read will soon be irrelevant when their containing sstables are
     322              :                 // removed from the LSM.
     323              :                 bufferPool sstable.BufferPool
     324              :                 // keyspanIterClosers is a list of fragment iterators to close when the
     325              :                 // compaction finishes. As iteration opens new keyspan iterators,
     326              :                 // elements are appended. Keyspan iterators must remain open for the
     327              :                 // lifetime of the compaction, so they're accumulated here. When the
     328              :                 // compaction finishes, all the underlying keyspan iterators are closed.
     329              :                 keyspanIterClosers []*noCloseIter
     330              :                 // valueFetcher is used to fetch values from blob files. It's propagated
     331              :                 // down the iterator tree through the internal iterator options.
     332              :                 valueFetcher blob.ValueFetcher
     333              :         }
     334              :         // metrics encapsulates various metrics collected during a compaction.
     335              :         metrics compactionMetrics
     336              : 
     337              :         grantHandle CompactionGrantHandle
     338              : 
     339              :         tableFormat   sstable.TableFormat
     340              :         objCreateOpts objstorage.CreateOptions
     341              : 
     342              :         annotations []string
     343              : }
     344              : 
     345              : // Assert that tableCompaction implements the compaction interface.
     346              : var _ compaction = (*tableCompaction)(nil)
     347              : 
     348            1 : func (c *tableCompaction) AddInProgressLocked(d *DB) {
     349            1 :         d.mu.compact.inProgress[c] = struct{}{}
     350            1 :         if c.UsesBurstConcurrency() {
     351            0 :                 d.mu.compact.burstConcurrency.Add(1)
     352            0 :         }
     353            1 :         var isBase, isIntraL0 bool
     354            1 :         for _, cl := range c.inputs {
     355            1 :                 for f := range cl.files.All() {
     356            1 :                         if f.IsCompacting() {
     357            0 :                                 d.opts.Logger.Fatalf("L%d->L%d: %s already being compacted", c.startLevel.level, c.outputLevel.level, f.TableNum)
     358            0 :                         }
     359            1 :                         f.SetCompactionState(manifest.CompactionStateCompacting)
     360            1 :                         if c.startLevel != nil && c.outputLevel != nil && c.startLevel.level == 0 {
     361            1 :                                 if c.outputLevel.level == 0 {
     362            1 :                                         f.IsIntraL0Compacting = true
     363            1 :                                         isIntraL0 = true
     364            1 :                                 } else {
     365            1 :                                         isBase = true
     366            1 :                                 }
     367              :                         }
     368              :                 }
     369              :         }
     370              : 
     371            1 :         if isIntraL0 || isBase {
     372            1 :                 l0Inputs := []manifest.LevelSlice{c.startLevel.files}
     373            1 :                 if isIntraL0 {
     374            1 :                         l0Inputs = append(l0Inputs, c.outputLevel.files)
     375            1 :                 }
     376            1 :                 if err := d.mu.versions.latest.l0Organizer.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
     377            0 :                         d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
     378            0 :                 }
     379              :         }
     380              : }
     381              : 
     382            1 : func (c *tableCompaction) BeganAt() time.Time          { return c.metrics.beganAt }
     383            1 : func (c *tableCompaction) Bounds() *base.UserKeyBounds { return &c.bounds }
     384            1 : func (c *tableCompaction) Cancel()                     { c.cancel.Store(true) }
     385              : 
     386            1 : func (c *tableCompaction) Execute(jobID JobID, d *DB) error {
     387            1 :         c.grantHandle.Started()
     388            1 :         err := d.compact1(jobID, c)
     389            1 :         // The version stored in the compaction is ref'd when the compaction is
     390            1 :         // created. We're responsible for un-refing it when the compaction is
     391            1 :         // complete.
     392            1 :         if c.version != nil {
     393            1 :                 c.version.UnrefLocked()
     394            1 :         }
     395            1 :         return err
     396              : }
     397              : 
     398            1 : func (c *tableCompaction) RecordError(problemSpans *problemspans.ByLevel, err error) {
     399            1 :         // Record problem spans for a short duration, unless the error is a
     400            1 :         // corruption.
     401            1 :         expiration := 30 * time.Second
     402            1 :         if IsCorruptionError(err) {
     403            1 :                 // TODO(radu): ideally, we should be using the corruption reporting
     404            1 :                 // mechanism which has a tighter span for the corruption. We would need to
     405            1 :                 // somehow plumb the level of the file.
     406            1 :                 expiration = 5 * time.Minute
     407            1 :         }
     408              : 
     409            1 :         for i := range c.inputs {
     410            1 :                 level := c.inputs[i].level
     411            1 :                 if level == 0 {
     412            1 :                         // We do not set problem spans on L0, as they could block flushes.
     413            1 :                         continue
     414              :                 }
     415            1 :                 it := c.inputs[i].files.Iter()
     416            1 :                 for f := it.First(); f != nil; f = it.Next() {
     417            1 :                         problemSpans.Add(level, f.UserKeyBounds(), expiration)
     418            1 :                 }
     419              :         }
     420              : }
     421              : 
     422            1 : func (c *tableCompaction) GrantHandle() CompactionGrantHandle { return c.grantHandle }
     423            1 : func (c *tableCompaction) IsDownload() bool                   { return c.isDownload }
     424            1 : func (c *tableCompaction) IsFlush() bool                      { return len(c.flush.flushables) > 0 }
     425            1 : func (c *tableCompaction) Info() compactionInfo {
     426            1 :         info := compactionInfo{
     427            1 :                 versionEditApplied: c.versionEditApplied,
     428            1 :                 kind:               c.kind,
     429            1 :                 inputs:             c.inputs,
     430            1 :                 bounds:             &c.bounds,
     431            1 :                 outputLevel:        -1,
     432            1 :         }
     433            1 :         if c.outputLevel != nil {
     434            1 :                 info.outputLevel = c.outputLevel.level
     435            1 :         }
     436            1 :         return info
     437              : }
     438            1 : func (c *tableCompaction) PprofLabels(kc UserKeyCategories) pprof.LabelSet {
     439            1 :         activity := "compact"
     440            1 :         if len(c.flush.flushables) != 0 {
     441            0 :                 activity = "flush"
     442            0 :         }
     443            1 :         level := "L?"
     444            1 :         // Delete-only compactions don't have an output level.
     445            1 :         if c.outputLevel != nil {
     446            1 :                 level = fmt.Sprintf("L%d", c.outputLevel.level)
     447            1 :         }
     448            1 :         if kc.Len() > 0 {
     449            0 :                 cat := kc.CategorizeKeyRange(c.bounds.Start, c.bounds.End.Key)
     450            0 :                 return pprof.Labels("pebble", activity, "output-level", level, "key-type", cat)
     451            0 :         }
     452            1 :         return pprof.Labels("pebble", activity, "output-level", level)
     453              : }
     454              : 
     455            1 : func (c *tableCompaction) Tables() iter.Seq2[int, *manifest.TableMetadata] {
     456            1 :         return func(yield func(int, *manifest.TableMetadata) bool) {
     457            1 :                 for _, cl := range c.inputs {
     458            1 :                         for f := range cl.files.All() {
     459            1 :                                 if !yield(cl.level, f) {
     460            1 :                                         return
     461            1 :                                 }
     462              :                         }
     463              :                 }
     464              :         }
     465              : }
     466              : 
     467            1 : func (c *tableCompaction) UsesBurstConcurrency() bool { return false }
     468              : 
     469            1 : func (c *tableCompaction) VersionEditApplied() bool { return c.versionEditApplied }
     470              : 
     471              : // compactionMetrics contians metrics surrounding a compaction.
     472              : type compactionMetrics struct {
     473              :         // beganAt is the time when the compaction began.
     474              :         beganAt time.Time
     475              :         // bytesWritten contains the number of bytes that have been written to
     476              :         // outputs. It's updated whenever the compaction outputs'
     477              :         // objstorage.Writables receive new writes. See newCompactionOutputObj.
     478              :         bytesWritten atomic.Int64
     479              :         // internalIterStats contains statistics from the internal iterators used by
     480              :         // the compaction.
     481              :         //
     482              :         // TODO(jackson): Use these to power the compaction BytesRead metric.
     483              :         internalIterStats base.InternalIteratorStats
     484              :         // perLevel contains metrics for each level involved in the compaction.
     485              :         perLevel levelMetricsDelta
     486              :         // picker contains metrics from the compaction picker when the compaction
     487              :         // was picked.
     488              :         picker pickedCompactionMetrics
     489              : }
     490              : 
     491              : // inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any
     492              : // input sstables.
     493            1 : func (c *tableCompaction) inputLargestSeqNumAbsolute() base.SeqNum {
     494            1 :         var seqNum base.SeqNum
     495            1 :         for _, cl := range c.inputs {
     496            1 :                 for m := range cl.files.All() {
     497            1 :                         seqNum = max(seqNum, m.LargestSeqNumAbsolute)
     498            1 :                 }
     499              :         }
     500            1 :         return seqNum
     501              : }
     502              : 
     503            1 : func (c *tableCompaction) makeInfo(jobID JobID) CompactionInfo {
     504            1 :         info := CompactionInfo{
     505            1 :                 JobID:       int(jobID),
     506            1 :                 Reason:      c.kind.String(),
     507            1 :                 Input:       make([]LevelInfo, 0, len(c.inputs)),
     508            1 :                 Annotations: []string{},
     509            1 :         }
     510            1 :         if c.isDownload {
     511            1 :                 info.Reason = "download," + info.Reason
     512            1 :         }
     513            1 :         for _, cl := range c.inputs {
     514            1 :                 inputInfo := LevelInfo{Level: cl.level, Tables: nil}
     515            1 :                 for m := range cl.files.All() {
     516            1 :                         inputInfo.Tables = append(inputInfo.Tables, m.TableInfo())
     517            1 :                 }
     518            1 :                 info.Input = append(info.Input, inputInfo)
     519              :         }
     520            1 :         if c.outputLevel != nil {
     521            1 :                 info.Output.Level = c.outputLevel.level
     522            1 : 
     523            1 :                 // If there are no inputs from the output level (eg, a move
     524            1 :                 // compaction), add an empty LevelInfo to info.Input.
     525            1 :                 if len(c.inputs) > 0 && c.inputs[len(c.inputs)-1].level != c.outputLevel.level {
     526            0 :                         info.Input = append(info.Input, LevelInfo{Level: c.outputLevel.level})
     527            0 :                 }
     528            1 :         } else {
     529            1 :                 // For a delete-only compaction, set the output level to L6. The
     530            1 :                 // output level is not meaningful here, but complicating the
     531            1 :                 // info.Output interface with a pointer doesn't seem worth the
     532            1 :                 // semantic distinction.
     533            1 :                 info.Output.Level = numLevels - 1
     534            1 :         }
     535              : 
     536            1 :         for i, score := range c.metrics.picker.scores {
     537            1 :                 info.Input[i].Score = score
     538            1 :         }
     539            1 :         info.SingleLevelOverlappingRatio = c.metrics.picker.singleLevelOverlappingRatio
     540            1 :         info.MultiLevelOverlappingRatio = c.metrics.picker.multiLevelOverlappingRatio
     541            1 :         if len(info.Input) > 2 {
     542            1 :                 info.Annotations = append(info.Annotations, "multilevel")
     543            1 :         }
     544            1 :         return info
     545              : }
     546              : 
     547              : type getValueSeparation func(JobID, *tableCompaction, sstable.TableFormat) compact.ValueSeparation
     548              : 
     549              : // newCompaction constructs a compaction from the provided picked compaction.
     550              : //
     551              : // The compaction is created with a reference to its version that must be
     552              : // released when the compaction is complete.
     553              : func newCompaction(
     554              :         pc *pickedTableCompaction,
     555              :         opts *Options,
     556              :         beganAt time.Time,
     557              :         provider objstorage.Provider,
     558              :         grantHandle CompactionGrantHandle,
     559              :         tableFormat sstable.TableFormat,
     560              :         getValueSeparation getValueSeparation,
     561            1 : ) *tableCompaction {
     562            1 :         c := &tableCompaction{
     563            1 :                 kind:               compactionKindDefault,
     564            1 :                 comparer:           opts.Comparer,
     565            1 :                 inputs:             pc.inputs,
     566            1 :                 bounds:             pc.bounds,
     567            1 :                 logger:             opts.Logger,
     568            1 :                 version:            pc.version,
     569            1 :                 getValueSeparation: getValueSeparation,
     570            1 :                 maxOutputFileSize:  pc.maxOutputFileSize,
     571            1 :                 maxOverlapBytes:    pc.maxOverlapBytes,
     572            1 :                 metrics: compactionMetrics{
     573            1 :                         beganAt: beganAt,
     574            1 :                         picker:  pc.pickerMetrics,
     575            1 :                 },
     576            1 :                 grantHandle: grantHandle,
     577            1 :                 tableFormat: tableFormat,
     578            1 :         }
     579            1 :         // Acquire a reference to the version to ensure that files and in-memory
     580            1 :         // version state necessary for reading files remain available. Ignoring
     581            1 :         // excises, this isn't strictly necessary for reading the sstables that are
     582            1 :         // inputs to the compaction because those files are 'marked as compacting'
     583            1 :         // and shouldn't be subject to any competing compactions. However with
     584            1 :         // excises, a concurrent excise may remove a compaction's file from the
     585            1 :         // Version and then cancel the compaction. The file shouldn't be physically
     586            1 :         // removed until the cancelled compaction stops reading it.
     587            1 :         //
     588            1 :         // Additionally, we need any blob files referenced by input sstables to
     589            1 :         // remain available, even if the blob file is rewritten. Maintaining a
     590            1 :         // reference ensures that all these files remain available for the
     591            1 :         // compaction's reads.
     592            1 :         c.version.Ref()
     593            1 : 
     594            1 :         c.startLevel = &c.inputs[0]
     595            1 :         if pc.startLevel.l0SublevelInfo != nil {
     596            1 :                 c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo
     597            1 :         }
     598              : 
     599            1 :         c.outputLevel = &c.inputs[len(c.inputs)-1]
     600            1 : 
     601            1 :         if len(pc.inputs) > 2 {
     602            1 :                 // TODO(xinhaoz): Look into removing extraLevels on the compaction struct.
     603            1 :                 c.extraLevels = make([]*compactionLevel, 0, len(pc.inputs)-2)
     604            1 :                 for i := 1; i < len(pc.inputs)-1; i++ {
     605            1 :                         c.extraLevels = append(c.extraLevels, &c.inputs[i])
     606            1 :                 }
     607              :         }
     608              :         // Compute the set of outputLevel+1 files that overlap this compaction (these
     609              :         // are the grandparent sstables).
     610            1 :         if c.outputLevel.level+1 < numLevels {
     611            1 :                 c.grandparents = c.version.Overlaps(max(c.outputLevel.level+1, pc.baseLevel), c.bounds)
     612            1 :         }
     613            1 :         c.delElision, c.rangeKeyElision = compact.SetupTombstoneElision(
     614            1 :                 c.comparer.Compare, c.version, pc.l0Organizer, c.outputLevel.level, c.bounds,
     615            1 :         )
     616            1 :         c.kind = pc.kind
     617            1 : 
     618            1 :         preferSharedStorage := tableFormat >= FormatMinForSharedObjects.MaxTableFormat() &&
     619            1 :                 remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
     620            1 :         c.maybeSwitchToMoveOrCopy(preferSharedStorage, provider)
     621            1 :         c.objCreateOpts = objstorage.CreateOptions{
     622            1 :                 PreferSharedStorage: preferSharedStorage,
     623            1 :                 WriteCategory:       getDiskWriteCategoryForCompaction(opts, c.kind),
     624            1 :         }
     625            1 :         if preferSharedStorage {
     626            1 :                 c.getValueSeparation = neverSeparateValues
     627            1 :         }
     628              : 
     629            1 :         return c
     630              : }
     631              : 
     632              : // maybeSwitchToMoveOrCopy decides if the compaction can be changed into a move
     633              : // or copy compaction, in which case c.kind is updated.
     634              : func (c *tableCompaction) maybeSwitchToMoveOrCopy(
     635              :         preferSharedStorage bool, provider objstorage.Provider,
     636            1 : ) {
     637            1 :         // Only non-multi-level compactions with a single input file can be
     638            1 :         // considered.
     639            1 :         if c.startLevel.files.Len() != 1 || !c.outputLevel.files.Empty() || c.hasExtraLevelData() {
     640            1 :                 return
     641            1 :         }
     642              : 
     643              :         // In addition to the default compaction, we also check whether a tombstone
     644              :         // density compaction can be optimized into a move compaction. However, we
     645              :         // want to avoid performing a move compaction into the lowest level, since the
     646              :         // goal there is to actually remove the tombstones.
     647              :         //
     648              :         // Tombstone density compaction is meant to address cases where tombstones
     649              :         // don't reclaim much space but are still expensive to scan over. We can only
     650              :         // remove the tombstones once there's nothing at all underneath them.
     651            1 :         switch c.kind {
     652            1 :         case compactionKindDefault:
     653              :                 // Proceed.
     654            1 :         case compactionKindTombstoneDensity:
     655            1 :                 // Tombstone density compaction can be optimized into a move compaction.
     656            1 :                 // However, we want to avoid performing a move compaction into the lowest
     657            1 :                 // level, since the goal there is to actually remove the tombstones; even if
     658            1 :                 // they don't prevent a lot of space from being reclaimed, tombstones can
     659            1 :                 // still be expensive to scan over.
     660            1 :                 if c.outputLevel.level == numLevels-1 {
     661            1 :                         return
     662            1 :                 }
     663            1 :         default:
     664            1 :                 // Other compaction kinds not supported.
     665            1 :                 return
     666              :         }
     667              : 
     668              :         // We avoid a move or copy if there is lots of overlapping grandparent data.
     669              :         // Otherwise, the move could create a parent file that will require a very
     670              :         // expensive merge later on.
     671            1 :         if c.grandparents.AggregateSizeSum() > c.maxOverlapBytes {
     672            1 :                 return
     673            1 :         }
     674              : 
     675            1 :         iter := c.startLevel.files.Iter()
     676            1 :         meta := iter.First()
     677            1 : 
     678            1 :         // We should always be passed a provider, except in some unit tests.
     679            1 :         isRemote := provider != nil && !objstorage.IsLocalTable(provider, meta.TableBacking.DiskFileNum)
     680            1 : 
     681            1 :         // Shared and external tables can always be moved. We can also move a local
     682            1 :         // table unless we need the result to be on shared storage.
     683            1 :         if isRemote || !preferSharedStorage {
     684            1 :                 c.kind = compactionKindMove
     685            1 :                 return
     686            1 :         }
     687              : 
     688              :         // We can rewrite the table (regular compaction) or we can use a copy compaction.
     689            1 :         switch {
     690            1 :         case meta.Virtual:
     691              :                 // We want to avoid a copy compaction if the table is virtual, as we may end
     692              :                 // up copying a lot more data than necessary.
     693            1 :         case meta.BlobReferenceDepth != 0:
     694              :                 // We also want to avoid copy compactions for tables with blob references,
     695              :                 // as we currently lack a mechanism to propagate blob references along with
     696              :                 // the sstable.
     697            1 :         default:
     698            1 :                 c.kind = compactionKindCopy
     699              :         }
     700              : }
     701              : 
     702              : // newDeleteOnlyCompaction constructs a delete-only compaction from the provided
     703              : // inputs.
     704              : //
     705              : // The compaction is created with a reference to its version that must be
     706              : // released when the compaction is complete.
     707              : func newDeleteOnlyCompaction(
     708              :         opts *Options,
     709              :         cur *manifest.Version,
     710              :         inputs []compactionLevel,
     711              :         beganAt time.Time,
     712              :         hints []deleteCompactionHint,
     713              :         exciseEnabled bool,
     714            1 : ) *tableCompaction {
     715            1 :         c := &tableCompaction{
     716            1 :                 kind:        compactionKindDeleteOnly,
     717            1 :                 comparer:    opts.Comparer,
     718            1 :                 logger:      opts.Logger,
     719            1 :                 version:     cur,
     720            1 :                 inputs:      inputs,
     721            1 :                 grantHandle: noopGrantHandle{},
     722            1 :                 metrics: compactionMetrics{
     723            1 :                         beganAt: beganAt,
     724            1 :                 },
     725            1 :         }
     726            1 :         c.deleteOnly.hints = hints
     727            1 :         c.deleteOnly.exciseEnabled = exciseEnabled
     728            1 :         // Acquire a reference to the version to ensure that files and in-memory
     729            1 :         // version state necessary for reading files remain available. Ignoring
     730            1 :         // excises, this isn't strictly necessary for reading the sstables that are
     731            1 :         // inputs to the compaction because those files are 'marked as compacting'
     732            1 :         // and shouldn't be subject to any competing compactions. However with
     733            1 :         // excises, a concurrent excise may remove a compaction's file from the
     734            1 :         // Version and then cancel the compaction. The file shouldn't be physically
     735            1 :         // removed until the cancelled compaction stops reading it.
     736            1 :         //
     737            1 :         // Additionally, we need any blob files referenced by input sstables to
     738            1 :         // remain available, even if the blob file is rewritten. Maintaining a
     739            1 :         // reference ensures that all these files remain available for the
     740            1 :         // compaction's reads.
     741            1 :         c.version.Ref()
     742            1 : 
     743            1 :         // Set c.smallest, c.largest.
     744            1 :         cmp := opts.Comparer.Compare
     745            1 :         for _, in := range inputs {
     746            1 :                 c.bounds = manifest.ExtendKeyRange(cmp, c.bounds, in.files.All())
     747            1 :         }
     748            1 :         return c
     749              : }
     750              : 
     751            1 : func adjustGrandparentOverlapBytesForFlush(c *tableCompaction, flushingBytes uint64) {
     752            1 :         // Heuristic to place a lower bound on compaction output file size
     753            1 :         // caused by Lbase. Prior to this heuristic we have observed an L0 in
     754            1 :         // production with 310K files of which 290K files were < 10KB in size.
     755            1 :         // Our hypothesis is that it was caused by L1 having 2600 files and
     756            1 :         // ~10GB, such that each flush got split into many tiny files due to
     757            1 :         // overlapping with most of the files in Lbase.
     758            1 :         //
     759            1 :         // The computation below is general in that it accounts
     760            1 :         // for flushing different volumes of data (e.g. we may be flushing
     761            1 :         // many memtables). For illustration, we consider the typical
     762            1 :         // example of flushing a 64MB memtable. So 12.8MB output,
     763            1 :         // based on the compression guess below. If the compressed bytes
     764            1 :         // guess is an over-estimate we will end up with smaller files,
     765            1 :         // and if an under-estimate we will end up with larger files.
     766            1 :         // With a 2MB target file size, 7 files. We are willing to accept
     767            1 :         // 4x the number of files, if it results in better write amplification
     768            1 :         // when later compacting to Lbase, i.e., ~450KB files (target file
     769            1 :         // size / 4).
     770            1 :         //
     771            1 :         // Note that this is a pessimistic heuristic in that
     772            1 :         // fileCountUpperBoundDueToGrandparents could be far from the actual
     773            1 :         // number of files produced due to the grandparent limits. For
     774            1 :         // example, in the extreme, consider a flush that overlaps with 1000
     775            1 :         // files in Lbase f0...f999, and the initially calculated value of
     776            1 :         // maxOverlapBytes will cause splits at f10, f20,..., f990, which
     777            1 :         // means an upper bound file count of 100 files. Say the input bytes
     778            1 :         // in the flush are such that acceptableFileCount=10. We will fatten
     779            1 :         // up maxOverlapBytes by 10x to ensure that the upper bound file count
     780            1 :         // drops to 10. However, it is possible that in practice, even without
     781            1 :         // this change, we would have produced no more than 10 files, and that
     782            1 :         // this change makes the files unnecessarily wide. Say the input bytes
     783            1 :         // are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
     784            1 :         // 10% in f80...f89 and 10% in f990...f999. The original value of
     785            1 :         // maxOverlapBytes would have actually produced only 10 sstables. But
     786            1 :         // by increasing maxOverlapBytes by 10x, we may produce 1 sstable that
     787            1 :         // spans f0...f89, i.e., a much wider sstable than necessary.
     788            1 :         //
     789            1 :         // We could produce a tighter estimate of
     790            1 :         // fileCountUpperBoundDueToGrandparents if we had knowledge of the key
     791            1 :         // distribution of the flush. The 4x multiplier mentioned earlier is
     792            1 :         // a way to try to compensate for this pessimism.
     793            1 :         //
     794            1 :         // TODO(sumeer): we don't have compression info for the data being
     795            1 :         // flushed, but it is likely that existing files that overlap with
     796            1 :         // this flush in Lbase are representative wrt compression ratio. We
     797            1 :         // could store the uncompressed size in TableMetadata and estimate
     798            1 :         // the compression ratio.
     799            1 :         const approxCompressionRatio = 0.2
     800            1 :         approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
     801            1 :         approxNumFilesBasedOnTargetSize :=
     802            1 :                 int(math.Ceil(approxOutputBytes / float64(c.maxOutputFileSize)))
     803            1 :         acceptableFileCount := float64(4 * approxNumFilesBasedOnTargetSize)
     804            1 :         // The byte calculation is linear in numGrandparentFiles, but we will
     805            1 :         // incur this linear cost in compact.Runner.TableSplitLimit() too, so we are
     806            1 :         // also willing to pay it now. We could approximate this cheaply by using the
     807            1 :         // mean file size of Lbase.
     808            1 :         grandparentFileBytes := c.grandparents.AggregateSizeSum()
     809            1 :         fileCountUpperBoundDueToGrandparents :=
     810            1 :                 float64(grandparentFileBytes) / float64(c.maxOverlapBytes)
     811            1 :         if fileCountUpperBoundDueToGrandparents > acceptableFileCount {
     812            1 :                 c.maxOverlapBytes = uint64(
     813            1 :                         float64(c.maxOverlapBytes) *
     814            1 :                                 (fileCountUpperBoundDueToGrandparents / acceptableFileCount))
     815            1 :         }
     816              : }
     817              : 
     818              : // newFlush creates the state necessary for a flush (modeled with the compaction
     819              : // struct).
     820              : //
     821              : // newFlush takes the current Version in order to populate grandparent flushing
     822              : // limits, but it does not reference the version.
     823              : //
     824              : // TODO(jackson): Consider maintaining a reference to the version anyways since
     825              : // in the future in-memory Version state may only be available while a Version
     826              : // is referenced (eg, if we start recycling B-Tree nodes once they're no longer
     827              : // referenced). There's subtlety around unref'ing the version at the right
     828              : // moment, so we defer it for now.
     829              : func newFlush(
     830              :         opts *Options,
     831              :         cur *manifest.Version,
     832              :         l0Organizer *manifest.L0Organizer,
     833              :         baseLevel int,
     834              :         flushing flushableList,
     835              :         beganAt time.Time,
     836              :         tableFormat sstable.TableFormat,
     837              :         getValueSeparation getValueSeparation,
     838            1 : ) (*tableCompaction, error) {
     839            1 :         c := &tableCompaction{
     840            1 :                 kind:               compactionKindFlush,
     841            1 :                 comparer:           opts.Comparer,
     842            1 :                 logger:             opts.Logger,
     843            1 :                 inputs:             []compactionLevel{{level: -1}, {level: 0}},
     844            1 :                 getValueSeparation: getValueSeparation,
     845            1 :                 maxOutputFileSize:  math.MaxUint64,
     846            1 :                 maxOverlapBytes:    math.MaxUint64,
     847            1 :                 grantHandle:        noopGrantHandle{},
     848            1 :                 tableFormat:        tableFormat,
     849            1 :                 metrics: compactionMetrics{
     850            1 :                         beganAt: beganAt,
     851            1 :                 },
     852            1 :         }
     853            1 :         c.flush.flushables = flushing
     854            1 :         c.flush.l0Limits = l0Organizer.FlushSplitKeys()
     855            1 :         c.startLevel = &c.inputs[0]
     856            1 :         c.outputLevel = &c.inputs[1]
     857            1 :         if len(flushing) > 0 {
     858            1 :                 if _, ok := flushing[0].flushable.(*ingestedFlushable); ok {
     859            1 :                         if len(flushing) != 1 {
     860            0 :                                 panic("pebble: ingestedFlushable must be flushed one at a time.")
     861              :                         }
     862            1 :                         c.kind = compactionKindIngestedFlushable
     863            1 :                         return c, nil
     864            1 :                 } else {
     865            1 :                         // Make sure there's no ingestedFlushable after the first flushable
     866            1 :                         // in the list.
     867            1 :                         for _, f := range c.flush.flushables[1:] {
     868            1 :                                 if _, ok := f.flushable.(*ingestedFlushable); ok {
     869            0 :                                         panic("pebble: flushables shouldn't contain ingestedFlushable")
     870              :                                 }
     871              :                         }
     872              :                 }
     873              :         }
     874              : 
     875            1 :         preferSharedStorage := tableFormat >= FormatMinForSharedObjects.MaxTableFormat() &&
     876            1 :                 remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
     877            1 :         c.objCreateOpts = objstorage.CreateOptions{
     878            1 :                 PreferSharedStorage: preferSharedStorage,
     879            1 :                 WriteCategory:       getDiskWriteCategoryForCompaction(opts, c.kind),
     880            1 :         }
     881            1 :         if preferSharedStorage {
     882            1 :                 c.getValueSeparation = neverSeparateValues
     883            1 :         }
     884              : 
     885            1 :         cmp := c.comparer.Compare
     886            1 :         updatePointBounds := func(iter internalIterator) {
     887            1 :                 if kv := iter.First(); kv != nil {
     888            1 :                         if c.bounds.Start == nil || cmp(c.bounds.Start, kv.K.UserKey) > 0 {
     889            1 :                                 c.bounds.Start = slices.Clone(kv.K.UserKey)
     890            1 :                         }
     891              :                 }
     892            1 :                 if kv := iter.Last(); kv != nil {
     893            1 :                         if c.bounds.End.Key == nil || !c.bounds.End.IsUpperBoundForInternalKey(cmp, kv.K) {
     894            1 :                                 c.bounds.End = base.UserKeyExclusiveIf(slices.Clone(kv.K.UserKey), kv.K.IsExclusiveSentinel())
     895            1 :                         }
     896              :                 }
     897              :         }
     898              : 
     899            1 :         updateRangeBounds := func(iter keyspan.FragmentIterator) error {
     900            1 :                 // File bounds require s != nil && !s.Empty(). We only need to check for
     901            1 :                 // s != nil here, as the memtable's FragmentIterator would never surface
     902            1 :                 // empty spans.
     903            1 :                 if s, err := iter.First(); err != nil {
     904            0 :                         return err
     905            1 :                 } else if s != nil {
     906            1 :                         c.bounds = c.bounds.Union(cmp, s.Bounds().Clone())
     907            1 :                 }
     908            1 :                 if s, err := iter.Last(); err != nil {
     909            0 :                         return err
     910            1 :                 } else if s != nil {
     911            1 :                         c.bounds = c.bounds.Union(cmp, s.Bounds().Clone())
     912            1 :                 }
     913            1 :                 return nil
     914              :         }
     915              : 
     916            1 :         var flushingBytes uint64
     917            1 :         for i := range flushing {
     918            1 :                 f := flushing[i]
     919            1 :                 updatePointBounds(f.newIter(nil))
     920            1 :                 if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
     921            1 :                         if err := updateRangeBounds(rangeDelIter); err != nil {
     922            0 :                                 return nil, err
     923            0 :                         }
     924              :                 }
     925            1 :                 if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
     926            1 :                         if err := updateRangeBounds(rangeKeyIter); err != nil {
     927            0 :                                 return nil, err
     928            0 :                         }
     929              :                 }
     930            1 :                 flushingBytes += f.inuseBytes()
     931              :         }
     932              : 
     933            1 :         if opts.FlushSplitBytes > 0 {
     934            1 :                 c.maxOutputFileSize = uint64(opts.TargetFileSizes[0])
     935            1 :                 c.maxOverlapBytes = maxGrandparentOverlapBytes(opts.TargetFileSizes[0])
     936            1 :                 c.grandparents = cur.Overlaps(baseLevel, c.bounds)
     937            1 :                 adjustGrandparentOverlapBytesForFlush(c, flushingBytes)
     938            1 :         }
     939              : 
     940              :         // We don't elide tombstones for flushes.
     941            1 :         c.delElision, c.rangeKeyElision = compact.NoTombstoneElision(), compact.NoTombstoneElision()
     942            1 :         return c, nil
     943              : }
     944              : 
     945            1 : func (c *tableCompaction) hasExtraLevelData() bool {
     946            1 :         if len(c.extraLevels) == 0 {
     947            1 :                 // not a multi level compaction
     948            1 :                 return false
     949            1 :         } else if c.extraLevels[0].files.Empty() {
     950            1 :                 // a multi level compaction without data in the intermediate input level;
     951            1 :                 // e.g. for a multi level compaction with levels 4,5, and 6, this could
     952            1 :                 // occur if there is no files to compact in 5, or in 5 and 6 (i.e. a move).
     953            1 :                 return false
     954            1 :         }
     955            1 :         return true
     956              : }
     957              : 
     958              : // errorOnUserKeyOverlap returns an error if the last two written sstables in
     959              : // this compaction have revisions of the same user key present in both sstables,
     960              : // when it shouldn't (eg. when splitting flushes).
     961            1 : func (c *tableCompaction) errorOnUserKeyOverlap(ve *manifest.VersionEdit) error {
     962            1 :         if n := len(ve.NewTables); n > 1 {
     963            1 :                 meta := ve.NewTables[n-1].Meta
     964            1 :                 prevMeta := ve.NewTables[n-2].Meta
     965            1 :                 if !prevMeta.Largest().IsExclusiveSentinel() &&
     966            1 :                         c.comparer.Compare(prevMeta.Largest().UserKey, meta.Smallest().UserKey) >= 0 {
     967            1 :                         return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
     968            1 :                                 prevMeta.Largest().Pretty(c.comparer.FormatKey),
     969            1 :                                 prevMeta.TableNum,
     970            1 :                                 meta.TableNum)
     971            1 :                 }
     972              :         }
     973            1 :         return nil
     974              : }
     975              : 
     976              : // isBottommostDataLayer returns true if the compaction's inputs are known to be
     977              : // the bottommost layer of data for the compaction's key range. If true, this
     978              : // allows the compaction iterator to perform transformations to keys such as
     979              : // setting a key's sequence number to zero.
     980              : //
     981              : // This function performs this determination by looking at the TombstoneElision
     982              : // values which are set up based on sstables which overlap the bounds of the
     983              : // compaction at a lower level in the LSM. This function always returns false
     984              : // for flushes.
     985            1 : func (c *tableCompaction) isBottommostDataLayer() bool {
     986            1 :         // TODO(peter): we disable zeroing of seqnums during flushing to match
     987            1 :         // RocksDB behavior and to avoid generating overlapping sstables during
     988            1 :         // DB.replayWAL. When replaying WAL files at startup, we flush after each
     989            1 :         // WAL is replayed building up a single version edit that is
     990            1 :         // applied. Because we don't apply the version edit after each flush, this
     991            1 :         // code doesn't know that L0 contains files and zeroing of seqnums should
     992            1 :         // be disabled. That is fixable, but it seems safer to just match the
     993            1 :         // RocksDB behavior for now.
     994            1 :         return len(c.flush.flushables) == 0 && c.delElision.ElidesEverything() && c.rangeKeyElision.ElidesEverything()
     995            1 : }
     996              : 
     997              : // newInputIters returns an iterator over all the input tables in a compaction.
     998              : func (c *tableCompaction) newInputIters(
     999              :         newIters tableNewIters, iiopts internalIterOpts,
    1000              : ) (
    1001              :         pointIter internalIterator,
    1002              :         rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
    1003              :         retErr error,
    1004            1 : ) {
    1005            1 :         ctx := context.TODO()
    1006            1 :         cmp := c.comparer.Compare
    1007            1 : 
    1008            1 :         // Validate the ordering of compaction input files for defense in depth.
    1009            1 :         if len(c.flush.flushables) == 0 {
    1010            1 :                 if c.startLevel.level >= 0 {
    1011            1 :                         err := manifest.CheckOrdering(c.comparer, manifest.Level(c.startLevel.level),
    1012            1 :                                 c.startLevel.files.Iter())
    1013            1 :                         if err != nil {
    1014            1 :                                 return nil, nil, nil, err
    1015            1 :                         }
    1016              :                 }
    1017            1 :                 err := manifest.CheckOrdering(c.comparer, manifest.Level(c.outputLevel.level),
    1018            1 :                         c.outputLevel.files.Iter())
    1019            1 :                 if err != nil {
    1020            1 :                         return nil, nil, nil, err
    1021            1 :                 }
    1022            1 :                 if c.startLevel.level == 0 {
    1023            1 :                         if c.startLevel.l0SublevelInfo == nil {
    1024            0 :                                 panic("l0SublevelInfo not created for compaction out of L0")
    1025              :                         }
    1026            1 :                         for _, info := range c.startLevel.l0SublevelInfo {
    1027            1 :                                 err := manifest.CheckOrdering(c.comparer, info.sublevel, info.Iter())
    1028            1 :                                 if err != nil {
    1029            1 :                                         return nil, nil, nil, err
    1030            1 :                                 }
    1031              :                         }
    1032              :                 }
    1033            1 :                 if len(c.extraLevels) > 0 {
    1034            1 :                         if len(c.extraLevels) > 1 {
    1035            0 :                                 panic("n>2 multi level compaction not implemented yet")
    1036              :                         }
    1037            1 :                         interLevel := c.extraLevels[0]
    1038            1 :                         err := manifest.CheckOrdering(c.comparer, manifest.Level(interLevel.level),
    1039            1 :                                 interLevel.files.Iter())
    1040            1 :                         if err != nil {
    1041            0 :                                 return nil, nil, nil, err
    1042            0 :                         }
    1043              :                 }
    1044              :         }
    1045              : 
    1046              :         // There are three classes of keys that a compaction needs to process: point
    1047              :         // keys, range deletion tombstones and range keys. Collect all iterators for
    1048              :         // all these classes of keys from all the levels. We'll aggregate them
    1049              :         // together farther below.
    1050              :         //
    1051              :         // numInputLevels is an approximation of the number of iterator levels. Due
    1052              :         // to idiosyncrasies in iterator construction, we may (rarely) exceed this
    1053              :         // initial capacity.
    1054            1 :         numInputLevels := max(len(c.flush.flushables), len(c.inputs))
    1055            1 :         iters := make([]internalIterator, 0, numInputLevels)
    1056            1 :         rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
    1057            1 :         rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
    1058            1 : 
    1059            1 :         // If construction of the iterator inputs fails, ensure that we close all
    1060            1 :         // the consitutent iterators.
    1061            1 :         defer func() {
    1062            1 :                 if retErr != nil {
    1063            1 :                         for _, iter := range iters {
    1064            1 :                                 if iter != nil {
    1065            1 :                                         _ = iter.Close()
    1066            1 :                                 }
    1067              :                         }
    1068            1 :                         for _, rangeDelIter := range rangeDelIters {
    1069            0 :                                 rangeDelIter.Close()
    1070            0 :                         }
    1071              :                 }
    1072              :         }()
    1073            1 :         iterOpts := IterOptions{
    1074            1 :                 Category: categoryCompaction,
    1075            1 :                 logger:   c.logger,
    1076            1 :         }
    1077            1 : 
    1078            1 :         // Populate iters, rangeDelIters and rangeKeyIters with the appropriate
    1079            1 :         // constituent iterators. This depends on whether this is a flush or a
    1080            1 :         // compaction.
    1081            1 :         if len(c.flush.flushables) != 0 {
    1082            1 :                 // If flushing, we need to build the input iterators over the memtables
    1083            1 :                 // stored in c.flush.flushables.
    1084            1 :                 for _, f := range c.flush.flushables {
    1085            1 :                         iters = append(iters, f.newFlushIter(nil))
    1086            1 :                         rangeDelIter := f.newRangeDelIter(nil)
    1087            1 :                         if rangeDelIter != nil {
    1088            1 :                                 rangeDelIters = append(rangeDelIters, rangeDelIter)
    1089            1 :                         }
    1090            1 :                         if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
    1091            1 :                                 rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
    1092            1 :                         }
    1093              :                 }
    1094            1 :         } else {
    1095            1 :                 addItersForLevel := func(level *compactionLevel, l manifest.Layer) error {
    1096            1 :                         // Add a *levelIter for point iterators. Because we don't call
    1097            1 :                         // initRangeDel, the levelIter will close and forget the range
    1098            1 :                         // deletion iterator when it steps on to a new file. Surfacing range
    1099            1 :                         // deletions to compactions are handled below.
    1100            1 :                         iters = append(iters, newLevelIter(ctx, iterOpts, c.comparer,
    1101            1 :                                 newIters, level.files.Iter(), l, iiopts))
    1102            1 :                         // TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
    1103            1 :                         // deletions into memory upfront. (See #2015, which reverted this.) There
    1104            1 :                         // will be no user keys that are split between sstables within a level in
    1105            1 :                         // Cockroach 23.1, which unblocks this optimization.
    1106            1 : 
    1107            1 :                         // Add the range deletion iterator for each file as an independent level
    1108            1 :                         // in mergingIter, as opposed to making a levelIter out of those. This
    1109            1 :                         // is safer as levelIter expects all keys coming from underlying
    1110            1 :                         // iterators to be in order. Due to compaction / tombstone writing
    1111            1 :                         // logic in finishOutput(), it is possible for range tombstones to not
    1112            1 :                         // be strictly ordered across all files in one level.
    1113            1 :                         //
    1114            1 :                         // Consider this example from the metamorphic tests (also repeated in
    1115            1 :                         // finishOutput()), consisting of three L3 files with their bounds
    1116            1 :                         // specified in square brackets next to the file name:
    1117            1 :                         //
    1118            1 :                         // ./000240.sst   [tmgc#391,MERGE-tmgc#391,MERGE]
    1119            1 :                         // tmgc#391,MERGE [786e627a]
    1120            1 :                         // tmgc-udkatvs#331,RANGEDEL
    1121            1 :                         //
    1122            1 :                         // ./000241.sst   [tmgc#384,MERGE-tmgc#384,MERGE]
    1123            1 :                         // tmgc#384,MERGE [666c7070]
    1124            1 :                         // tmgc-tvsalezade#383,RANGEDEL
    1125            1 :                         // tmgc-tvsalezade#331,RANGEDEL
    1126            1 :                         //
    1127            1 :                         // ./000242.sst   [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
    1128            1 :                         // tmgc-tvsalezade#383,RANGEDEL
    1129            1 :                         // tmgc#375,SET [72646c78766965616c72776865676e79]
    1130            1 :                         // tmgc-tvsalezade#356,RANGEDEL
    1131            1 :                         //
    1132            1 :                         // Here, the range tombstone in 000240.sst falls "after" one in
    1133            1 :                         // 000241.sst, despite 000240.sst being ordered "before" 000241.sst for
    1134            1 :                         // levelIter's purposes. While each file is still consistent before its
    1135            1 :                         // bounds, it's safer to have all rangedel iterators be visible to
    1136            1 :                         // mergingIter.
    1137            1 :                         iter := level.files.Iter()
    1138            1 :                         for f := iter.First(); f != nil; f = iter.Next() {
    1139            1 :                                 rangeDelIter, err := c.newRangeDelIter(ctx, newIters, iter.Take(), iterOpts, iiopts, l)
    1140            1 :                                 if err != nil {
    1141            1 :                                         // The error will already be annotated with the BackingFileNum, so
    1142            1 :                                         // we annotate it with the FileNum.
    1143            1 :                                         return errors.Wrapf(err, "pebble: could not open table %s", errors.Safe(f.TableNum))
    1144            1 :                                 }
    1145            1 :                                 if rangeDelIter == nil {
    1146            1 :                                         continue
    1147              :                                 }
    1148            1 :                                 rangeDelIters = append(rangeDelIters, rangeDelIter)
    1149            1 :                                 c.iterationState.keyspanIterClosers = append(c.iterationState.keyspanIterClosers, rangeDelIter)
    1150              :                         }
    1151              : 
    1152              :                         // Check if this level has any range keys.
    1153            1 :                         hasRangeKeys := false
    1154            1 :                         for f := iter.First(); f != nil; f = iter.Next() {
    1155            1 :                                 if f.HasRangeKeys {
    1156            1 :                                         hasRangeKeys = true
    1157            1 :                                         break
    1158              :                                 }
    1159              :                         }
    1160            1 :                         if hasRangeKeys {
    1161            1 :                                 newRangeKeyIterWrapper := func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
    1162            1 :                                         iters, err := newIters(ctx, file, &iterOpts, iiopts, iterRangeKeys)
    1163            1 :                                         if err != nil {
    1164            0 :                                                 return nil, err
    1165            1 :                                         } else if iters.rangeKey == nil {
    1166            0 :                                                 return emptyKeyspanIter, nil
    1167            0 :                                         }
    1168              :                                         // Ensure that the range key iter is not closed until the compaction is
    1169              :                                         // finished. This is necessary because range key processing
    1170              :                                         // requires the range keys to be held in memory for up to the
    1171              :                                         // lifetime of the compaction.
    1172            1 :                                         noCloseIter := &noCloseIter{iters.rangeKey}
    1173            1 :                                         c.iterationState.keyspanIterClosers = append(c.iterationState.keyspanIterClosers, noCloseIter)
    1174            1 : 
    1175            1 :                                         // We do not need to truncate range keys to sstable boundaries, or
    1176            1 :                                         // only read within the file's atomic compaction units, unlike with
    1177            1 :                                         // range tombstones. This is because range keys were added after we
    1178            1 :                                         // stopped splitting user keys across sstables, so all the range keys
    1179            1 :                                         // in this sstable must wholly lie within the file's bounds.
    1180            1 :                                         return noCloseIter, err
    1181              :                                 }
    1182            1 :                                 li := keyspanimpl.NewLevelIter(ctx, keyspan.SpanIterOptions{}, cmp,
    1183            1 :                                         newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
    1184            1 :                                 rangeKeyIters = append(rangeKeyIters, li)
    1185              :                         }
    1186            1 :                         return nil
    1187              :                 }
    1188              : 
    1189            1 :                 for i := range c.inputs {
    1190            1 :                         // If the level is annotated with l0SublevelInfo, expand it into one
    1191            1 :                         // level per sublevel.
    1192            1 :                         // TODO(jackson): Perform this expansion even earlier when we pick the
    1193            1 :                         // compaction?
    1194            1 :                         if len(c.inputs[i].l0SublevelInfo) > 0 {
    1195            1 :                                 for _, info := range c.startLevel.l0SublevelInfo {
    1196            1 :                                         sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
    1197            1 :                                         if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
    1198            1 :                                                 return nil, nil, nil, err
    1199            1 :                                         }
    1200              :                                 }
    1201            1 :                                 continue
    1202              :                         }
    1203            1 :                         if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
    1204            1 :                                 return nil, nil, nil, err
    1205            1 :                         }
    1206              :                 }
    1207              :         }
    1208              : 
    1209              :         // If there's only one constituent point iterator, we can avoid the overhead
    1210              :         // of a *mergingIter. This is possible, for example, when performing a flush
    1211              :         // of a single memtable. Otherwise, combine all the iterators into a merging
    1212              :         // iter.
    1213            1 :         pointIter = iters[0]
    1214            1 :         if len(iters) > 1 {
    1215            1 :                 pointIter = newMergingIter(c.logger, &c.metrics.internalIterStats, cmp, nil, iters...)
    1216            1 :         }
    1217              : 
    1218              :         // In normal operation, levelIter iterates over the point operations in a
    1219              :         // level, and initializes a rangeDelIter pointer for the range deletions in
    1220              :         // each table. During compaction, we want to iterate over the merged view of
    1221              :         // point operations and range deletions. In order to do this we create one
    1222              :         // levelIter per level to iterate over the point operations, and collect up
    1223              :         // all the range deletion files.
    1224              :         //
    1225              :         // The range deletion levels are combined with a keyspanimpl.MergingIter. The
    1226              :         // resulting merged rangedel iterator is then included using an
    1227              :         // InterleavingIter.
    1228              :         // TODO(jackson): Consider using a defragmenting iterator to stitch together
    1229              :         // logical range deletions that were fragmented due to previous file
    1230              :         // boundaries.
    1231            1 :         if len(rangeDelIters) > 0 {
    1232            1 :                 mi := &keyspanimpl.MergingIter{}
    1233            1 :                 mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
    1234            1 :                 rangeDelIter = mi
    1235            1 :         }
    1236              : 
    1237              :         // If there are range key iterators, we need to combine them using
    1238              :         // keyspanimpl.MergingIter, and then interleave them among the points.
    1239            1 :         if len(rangeKeyIters) > 0 {
    1240            1 :                 mi := &keyspanimpl.MergingIter{}
    1241            1 :                 mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeKeyIters...)
    1242            1 :                 // TODO(radu): why do we have a defragmenter here but not above?
    1243            1 :                 di := &keyspan.DefragmentingIter{}
    1244            1 :                 di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
    1245            1 :                 rangeKeyIter = di
    1246            1 :         }
    1247            1 :         return pointIter, rangeDelIter, rangeKeyIter, nil
    1248              : }
    1249              : 
    1250              : func (c *tableCompaction) newRangeDelIter(
    1251              :         ctx context.Context,
    1252              :         newIters tableNewIters,
    1253              :         f manifest.LevelFile,
    1254              :         opts IterOptions,
    1255              :         iiopts internalIterOpts,
    1256              :         l manifest.Layer,
    1257            1 : ) (*noCloseIter, error) {
    1258            1 :         opts.layer = l
    1259            1 :         iterSet, err := newIters(ctx, f.TableMetadata, &opts, iiopts, iterRangeDeletions)
    1260            1 :         if err != nil {
    1261            1 :                 return nil, err
    1262            1 :         } else if iterSet.rangeDeletion == nil {
    1263            1 :                 // The file doesn't contain any range deletions.
    1264            1 :                 return nil, nil
    1265            1 :         }
    1266              :         // Ensure that rangeDelIter is not closed until the compaction is
    1267              :         // finished. This is necessary because range tombstone processing
    1268              :         // requires the range tombstones to be held in memory for up to the
    1269              :         // lifetime of the compaction.
    1270            1 :         return &noCloseIter{iterSet.rangeDeletion}, nil
    1271              : }
    1272              : 
    1273            1 : func (c *tableCompaction) String() string {
    1274            1 :         if len(c.flush.flushables) != 0 {
    1275            0 :                 return "flush\n"
    1276            0 :         }
    1277              : 
    1278            1 :         var buf bytes.Buffer
    1279            1 :         for _, l := range c.inputs {
    1280            1 :                 fmt.Fprintf(&buf, "%d:", l.level)
    1281            1 :                 for f := range l.files.All() {
    1282            1 :                         fmt.Fprintf(&buf, " %s:%s-%s", f.TableNum, f.Smallest(), f.Largest())
    1283            1 :                 }
    1284            1 :                 fmt.Fprintf(&buf, "\n")
    1285              :         }
    1286            1 :         return buf.String()
    1287              : }
    1288              : 
    1289              : type manualCompaction struct {
    1290              :         // id is for internal bookkeeping.
    1291              :         id uint64
    1292              :         // Count of the retries due to concurrent compaction to overlapping levels.
    1293              :         retries     int
    1294              :         level       int
    1295              :         outputLevel int
    1296              :         done        chan error
    1297              :         start       []byte
    1298              :         end         []byte
    1299              :         split       bool
    1300              : }
    1301              : 
    1302              : type readCompaction struct {
    1303              :         level int
    1304              :         // [start, end] key ranges are used for de-duping.
    1305              :         start []byte
    1306              :         end   []byte
    1307              : 
    1308              :         // The file associated with the compaction.
    1309              :         // If the file no longer belongs in the same
    1310              :         // level, then we skip the compaction.
    1311              :         tableNum base.TableNum
    1312              : }
    1313              : 
    1314              : // Removes compaction markers from files in a compaction. The rollback parameter
    1315              : // indicates whether the compaction state should be rolled back to its original
    1316              : // state in the case of an unsuccessful compaction.
    1317              : //
    1318              : // DB.mu must be held when calling this method, however this method can drop and
    1319              : // re-acquire that mutex. All writes to the manifest for this compaction should
    1320              : // have completed by this point.
    1321            1 : func (d *DB) clearCompactingState(c *tableCompaction, rollback bool) {
    1322            1 :         c.versionEditApplied = true
    1323            1 :         for _, cl := range c.inputs {
    1324            1 :                 for f := range cl.files.All() {
    1325            1 :                         if !f.IsCompacting() {
    1326            0 :                                 d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.TableNum)
    1327            0 :                         }
    1328            1 :                         if !rollback {
    1329            1 :                                 // On success all compactions other than move and delete-only compactions
    1330            1 :                                 // transition the file into the Compacted state. Move-compacted files
    1331            1 :                                 // become eligible for compaction again and transition back to NotCompacting.
    1332            1 :                                 // Delete-only compactions could, on rare occasion, leave files untouched
    1333            1 :                                 // (eg. if files have a loose bound), so we revert them all to NotCompacting
    1334            1 :                                 // just in case they need to be compacted again.
    1335            1 :                                 if c.kind != compactionKindMove && c.kind != compactionKindDeleteOnly {
    1336            1 :                                         f.SetCompactionState(manifest.CompactionStateCompacted)
    1337            1 :                                 } else {
    1338            1 :                                         f.SetCompactionState(manifest.CompactionStateNotCompacting)
    1339            1 :                                 }
    1340            1 :                         } else {
    1341            1 :                                 // Else, on rollback, all input files unconditionally transition back to
    1342            1 :                                 // NotCompacting.
    1343            1 :                                 f.SetCompactionState(manifest.CompactionStateNotCompacting)
    1344            1 :                         }
    1345            1 :                         f.IsIntraL0Compacting = false
    1346              :                 }
    1347              :         }
    1348            1 :         l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c))
    1349            1 :         func() {
    1350            1 :                 // InitCompactingFileInfo requires that no other manifest writes be
    1351            1 :                 // happening in parallel with it, i.e. we're not in the midst of installing
    1352            1 :                 // another version. Otherwise, it's possible that we've created another
    1353            1 :                 // L0Sublevels instance, but not added it to the versions list, causing
    1354            1 :                 // all the indices in TableMetadata to be inaccurate. To ensure this,
    1355            1 :                 // grab the manifest lock.
    1356            1 :                 d.mu.versions.logLock()
    1357            1 :                 // It is a bit peculiar that we are fiddling with th current version state
    1358            1 :                 // in a separate critical section from when this version was installed.
    1359            1 :                 // But this fiddling is necessary if the compaction failed. When the
    1360            1 :                 // compaction succeeded, we've already done this in UpdateVersionLocked, so
    1361            1 :                 // this seems redundant. Anyway, we clear the pickedCompactionCache since we
    1362            1 :                 // may be able to pick a better compaction (though when this compaction
    1363            1 :                 // succeeded we've also cleared the cache in UpdateVersionLocked).
    1364            1 :                 defer d.mu.versions.logUnlockAndInvalidatePickedCompactionCache()
    1365            1 :                 d.mu.versions.latest.l0Organizer.InitCompactingFileInfo(l0InProgress)
    1366            1 :         }()
    1367              : }
    1368              : 
    1369            1 : func (d *DB) calculateDiskAvailableBytes() uint64 {
    1370            1 :         space, err := d.opts.FS.GetDiskUsage(d.dirname)
    1371            1 :         if err != nil {
    1372            1 :                 if !errors.Is(err, vfs.ErrUnsupported) {
    1373            1 :                         d.opts.EventListener.BackgroundError(err)
    1374            1 :                 }
    1375              :                 // Return the last value we managed to obtain.
    1376            1 :                 return d.diskAvailBytes.Load()
    1377              :         }
    1378              : 
    1379            1 :         d.lowDiskSpaceReporter.Report(space.AvailBytes, space.TotalBytes, d.opts.EventListener)
    1380            1 :         d.diskAvailBytes.Store(space.AvailBytes)
    1381            1 :         return space.AvailBytes
    1382              : }
    1383              : 
    1384              : // maybeScheduleFlush schedules a flush if necessary.
    1385              : //
    1386              : // d.mu must be held when calling this.
    1387            1 : func (d *DB) maybeScheduleFlush() {
    1388            1 :         if d.mu.compact.flushing || d.closed.Load() != nil || d.opts.ReadOnly {
    1389            1 :                 return
    1390            1 :         }
    1391            1 :         if len(d.mu.mem.queue) <= 1 {
    1392            1 :                 return
    1393            1 :         }
    1394              : 
    1395            1 :         if !d.passedFlushThreshold() {
    1396            1 :                 return
    1397            1 :         }
    1398              : 
    1399            1 :         d.mu.compact.flushing = true
    1400            1 :         go d.flush()
    1401              : }
    1402              : 
    1403            1 : func (d *DB) passedFlushThreshold() bool {
    1404            1 :         var n int
    1405            1 :         var size uint64
    1406            1 :         for ; n < len(d.mu.mem.queue)-1; n++ {
    1407            1 :                 if !d.mu.mem.queue[n].readyForFlush() {
    1408            1 :                         break
    1409              :                 }
    1410            1 :                 if d.mu.mem.queue[n].flushForced {
    1411            1 :                         // A flush was forced. Pretend the memtable size is the configured
    1412            1 :                         // size. See minFlushSize below.
    1413            1 :                         size += d.opts.MemTableSize
    1414            1 :                 } else {
    1415            1 :                         size += d.mu.mem.queue[n].totalBytes()
    1416            1 :                 }
    1417              :         }
    1418            1 :         if n == 0 {
    1419            1 :                 // None of the immutable memtables are ready for flushing.
    1420            1 :                 return false
    1421            1 :         }
    1422              : 
    1423              :         // Only flush once the sum of the queued memtable sizes exceeds half the
    1424              :         // configured memtable size. This prevents flushing of memtables at startup
    1425              :         // while we're undergoing the ramp period on the memtable size. See
    1426              :         // DB.newMemTable().
    1427            1 :         minFlushSize := d.opts.MemTableSize / 2
    1428            1 :         return size >= minFlushSize
    1429              : }
    1430              : 
    1431            1 : func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
    1432            1 :         var mem *flushableEntry
    1433            1 :         for _, m := range d.mu.mem.queue {
    1434            1 :                 if m.flushable == tbl {
    1435            1 :                         mem = m
    1436            1 :                         break
    1437              :                 }
    1438              :         }
    1439            1 :         if mem == nil || mem.flushForced {
    1440            1 :                 return
    1441            1 :         }
    1442            1 :         deadline := d.timeNow().Add(dur)
    1443            1 :         if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
    1444            1 :                 // Already scheduled to flush sooner than within `dur`.
    1445            1 :                 return
    1446            1 :         }
    1447            1 :         mem.delayedFlushForcedAt = deadline
    1448            1 :         go func() {
    1449            1 :                 timer := time.NewTimer(dur)
    1450            1 :                 defer timer.Stop()
    1451            1 : 
    1452            1 :                 select {
    1453            1 :                 case <-d.closedCh:
    1454            1 :                         return
    1455            1 :                 case <-mem.flushed:
    1456            1 :                         return
    1457            1 :                 case <-timer.C:
    1458            1 :                         d.commit.mu.Lock()
    1459            1 :                         defer d.commit.mu.Unlock()
    1460            1 :                         d.mu.Lock()
    1461            1 :                         defer d.mu.Unlock()
    1462            1 : 
    1463            1 :                         // NB: The timer may fire concurrently with a call to Close.  If a
    1464            1 :                         // Close call beat us to acquiring d.mu, d.closed holds ErrClosed,
    1465            1 :                         // and it's too late to flush anything. Otherwise, the Close call
    1466            1 :                         // will block on locking d.mu until we've finished scheduling the
    1467            1 :                         // flush and set `d.mu.compact.flushing` to true. Close will wait
    1468            1 :                         // for the current flush to complete.
    1469            1 :                         if d.closed.Load() != nil {
    1470            1 :                                 return
    1471            1 :                         }
    1472              : 
    1473            1 :                         if d.mu.mem.mutable == tbl {
    1474            1 :                                 _ = d.makeRoomForWrite(nil)
    1475            1 :                         } else {
    1476            1 :                                 mem.flushForced = true
    1477            1 :                         }
    1478            1 :                         d.maybeScheduleFlush()
    1479              :                 }
    1480              :         }()
    1481              : }
    1482              : 
    1483            1 : func (d *DB) flush() {
    1484            1 :         pprof.Do(context.Background(), flushLabels, func(context.Context) {
    1485            1 :                 flushingWorkStart := crtime.NowMono()
    1486            1 :                 d.mu.Lock()
    1487            1 :                 defer d.mu.Unlock()
    1488            1 :                 idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime)
    1489            1 :                 var bytesFlushed uint64
    1490            1 :                 var err error
    1491            1 :                 if bytesFlushed, err = d.flush1(); err != nil {
    1492            1 :                         // TODO(peter): count consecutive flush errors and backoff.
    1493            1 :                         d.opts.EventListener.BackgroundError(err)
    1494            1 :                 }
    1495            1 :                 d.mu.compact.flushing = false
    1496            1 :                 d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
    1497            1 :                 workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
    1498            1 :                 d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
    1499            1 :                 d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
    1500            1 :                 d.mu.compact.flushWriteThroughput.IdleDuration += idleDuration
    1501            1 :                 // More flush work may have arrived while we were flushing, so schedule
    1502            1 :                 // another flush if needed.
    1503            1 :                 d.maybeScheduleFlush()
    1504            1 :                 // Let the CompactionScheduler know, so that it can react immediately to
    1505            1 :                 // an increase in DB.GetAllowedWithoutPermission.
    1506            1 :                 d.compactionScheduler.UpdateGetAllowedWithoutPermission()
    1507            1 :                 // The flush may have produced too many files in a level, so schedule a
    1508            1 :                 // compaction if needed.
    1509            1 :                 d.maybeScheduleCompaction()
    1510            1 :                 d.mu.compact.cond.Broadcast()
    1511              :         })
    1512              : }
    1513              : 
    1514              : // runIngestFlush is used to generate a flush version edit for sstables which
    1515              : // were ingested as flushables. Both DB.mu and the manifest lock must be held
    1516              : // while runIngestFlush is called.
    1517            1 : func (d *DB) runIngestFlush(c *tableCompaction) (*manifest.VersionEdit, error) {
    1518            1 :         if len(c.flush.flushables) != 1 {
    1519            0 :                 panic("pebble: ingestedFlushable must be flushed one at a time.")
    1520              :         }
    1521              : 
    1522              :         // Finding the target level for ingestion must use the latest version
    1523              :         // after the logLock has been acquired.
    1524            1 :         version := d.mu.versions.currentVersion()
    1525            1 : 
    1526            1 :         baseLevel := d.mu.versions.picker.getBaseLevel()
    1527            1 :         ve := &manifest.VersionEdit{}
    1528            1 :         var ingestSplitFiles []ingestSplitFile
    1529            1 :         ingestFlushable := c.flush.flushables[0].flushable.(*ingestedFlushable)
    1530            1 : 
    1531            1 :         updateLevelMetricsOnExcise := func(m *manifest.TableMetadata, level int, added []manifest.NewTableEntry) {
    1532            1 :                 levelMetrics := c.metrics.perLevel[level]
    1533            1 :                 if levelMetrics == nil {
    1534            1 :                         levelMetrics = &LevelMetrics{}
    1535            1 :                         c.metrics.perLevel[level] = levelMetrics
    1536            1 :                 }
    1537            1 :                 levelMetrics.TablesCount--
    1538            1 :                 levelMetrics.TablesSize -= int64(m.Size)
    1539            1 :                 levelMetrics.EstimatedReferencesSize -= m.EstimatedReferenceSize()
    1540            1 :                 for i := range added {
    1541            1 :                         levelMetrics.TablesCount++
    1542            1 :                         levelMetrics.TablesSize += int64(added[i].Meta.Size)
    1543            1 :                         levelMetrics.EstimatedReferencesSize += added[i].Meta.EstimatedReferenceSize()
    1544            1 :                 }
    1545              :         }
    1546              : 
    1547            1 :         suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
    1548            1 :                 d.FormatMajorVersion() >= FormatVirtualSSTables
    1549            1 : 
    1550            1 :         if suggestSplit || ingestFlushable.exciseSpan.Valid() {
    1551            1 :                 // We could add deleted files to ve.
    1552            1 :                 ve.DeletedTables = make(map[manifest.DeletedTableEntry]*manifest.TableMetadata)
    1553            1 :         }
    1554              : 
    1555            1 :         ctx := context.Background()
    1556            1 :         overlapChecker := &overlapChecker{
    1557            1 :                 comparer: d.opts.Comparer,
    1558            1 :                 newIters: d.newIters,
    1559            1 :                 opts: IterOptions{
    1560            1 :                         logger:   d.opts.Logger,
    1561            1 :                         Category: categoryIngest,
    1562            1 :                 },
    1563            1 :                 v: version,
    1564            1 :         }
    1565            1 :         replacedTables := make(map[base.TableNum][]manifest.NewTableEntry)
    1566            1 :         for _, file := range ingestFlushable.files {
    1567            1 :                 var fileToSplit *manifest.TableMetadata
    1568            1 :                 var level int
    1569            1 : 
    1570            1 :                 // This file fits perfectly within the excise span, so we can slot it at L6.
    1571            1 :                 if ingestFlushable.exciseSpan.Valid() &&
    1572            1 :                         ingestFlushable.exciseSpan.Contains(d.cmp, file.Smallest()) &&
    1573            1 :                         ingestFlushable.exciseSpan.Contains(d.cmp, file.Largest()) {
    1574            1 :                         level = 6
    1575            1 :                 } else {
    1576            1 :                         // TODO(radu): this can perform I/O; we should not do this while holding DB.mu.
    1577            1 :                         lsmOverlap, err := overlapChecker.DetermineLSMOverlap(ctx, file.UserKeyBounds())
    1578            1 :                         if err != nil {
    1579            0 :                                 return nil, err
    1580            0 :                         }
    1581            1 :                         level, fileToSplit, err = ingestTargetLevel(
    1582            1 :                                 ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, file, suggestSplit,
    1583            1 :                         )
    1584            1 :                         if err != nil {
    1585            0 :                                 return nil, err
    1586            0 :                         }
    1587              :                 }
    1588              : 
    1589              :                 // Add the current flushableIngest file to the version.
    1590            1 :                 ve.NewTables = append(ve.NewTables, manifest.NewTableEntry{Level: level, Meta: file})
    1591            1 :                 if fileToSplit != nil {
    1592            0 :                         ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
    1593            0 :                                 ingestFile: file,
    1594            0 :                                 splitFile:  fileToSplit,
    1595            0 :                                 level:      level,
    1596            0 :                         })
    1597            0 :                 }
    1598            1 :                 levelMetrics := c.metrics.perLevel.level(level)
    1599            1 :                 levelMetrics.TableBytesIngested += file.Size
    1600            1 :                 levelMetrics.TablesIngested++
    1601              :         }
    1602            1 :         if ingestFlushable.exciseSpan.Valid() {
    1603            1 :                 exciseBounds := ingestFlushable.exciseSpan.UserKeyBounds()
    1604            1 :                 if d.FormatMajorVersion() >= FormatExciseBoundsRecord {
    1605            1 :                         ve.ExciseBoundsRecord = append(ve.ExciseBoundsRecord, manifest.ExciseOpEntry{
    1606            1 :                                 Bounds: exciseBounds,
    1607            1 :                                 SeqNum: ingestFlushable.exciseSeqNum,
    1608            1 :                         })
    1609            1 :                         d.mu.versions.metrics.Ingest.ExciseIngestCount++
    1610            1 :                 }
    1611              :                 // Iterate through all levels and find files that intersect with exciseSpan.
    1612            1 :                 for layer, ls := range version.AllLevelsAndSublevels() {
    1613            1 :                         for m := range ls.Overlaps(d.cmp, ingestFlushable.exciseSpan.UserKeyBounds()).All() {
    1614            1 :                                 leftTable, rightTable, err := d.exciseTable(context.TODO(), exciseBounds, m, layer.Level(), tightExciseBounds)
    1615            1 :                                 if err != nil {
    1616            0 :                                         return nil, err
    1617            0 :                                 }
    1618            1 :                                 newFiles := applyExciseToVersionEdit(ve, m, leftTable, rightTable, layer.Level())
    1619            1 :                                 replacedTables[m.TableNum] = newFiles
    1620            1 :                                 updateLevelMetricsOnExcise(m, layer.Level(), newFiles)
    1621              :                         }
    1622              :                 }
    1623              :         }
    1624              : 
    1625            1 :         if len(ingestSplitFiles) > 0 {
    1626            0 :                 if err := d.ingestSplit(context.TODO(), ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedTables); err != nil {
    1627            0 :                         return nil, err
    1628            0 :                 }
    1629              :         }
    1630              : 
    1631            1 :         return ve, nil
    1632              : }
    1633              : 
    1634              : // flush runs a compaction that copies the immutable memtables from memory to
    1635              : // disk.
    1636              : //
    1637              : // d.mu must be held when calling this, but the mutex may be dropped and
    1638              : // re-acquired during the course of this method.
    1639            1 : func (d *DB) flush1() (bytesFlushed uint64, err error) {
    1640            1 :         // NB: The flushable queue can contain flushables of type ingestedFlushable.
    1641            1 :         // The sstables in ingestedFlushable.files must be placed into the appropriate
    1642            1 :         // level in the lsm. Let's say the flushable queue contains a prefix of
    1643            1 :         // regular immutable memtables, then an ingestedFlushable, and then the
    1644            1 :         // mutable memtable. When the flush of the ingestedFlushable is performed,
    1645            1 :         // it needs an updated view of the lsm. That is, the prefix of immutable
    1646            1 :         // memtables must have already been flushed. Similarly, if there are two
    1647            1 :         // contiguous ingestedFlushables in the queue, then the first flushable must
    1648            1 :         // be flushed, so that the second flushable can see an updated view of the
    1649            1 :         // lsm.
    1650            1 :         //
    1651            1 :         // Given the above, we restrict flushes to either some prefix of regular
    1652            1 :         // memtables, or a single flushable of type ingestedFlushable. The DB.flush
    1653            1 :         // function will call DB.maybeScheduleFlush again, so a new flush to finish
    1654            1 :         // the remaining flush work should be scheduled right away.
    1655            1 :         //
    1656            1 :         // NB: Large batches placed in the flushable queue share the WAL with the
    1657            1 :         // previous memtable in the queue. We must ensure the property that both the
    1658            1 :         // large batch and the memtable with which it shares a WAL are flushed
    1659            1 :         // together. The property ensures that the minimum unflushed log number
    1660            1 :         // isn't incremented incorrectly. Since a flushableBatch.readyToFlush always
    1661            1 :         // returns true, and since the large batch will always be placed right after
    1662            1 :         // the memtable with which it shares a WAL, the property is naturally
    1663            1 :         // ensured. The large batch will always be placed after the memtable with
    1664            1 :         // which it shares a WAL because we ensure it in DB.commitWrite by holding
    1665            1 :         // the commitPipeline.mu and then holding DB.mu. As an extra defensive
    1666            1 :         // measure, if we try to flush the memtable without also flushing the
    1667            1 :         // flushable batch in the same flush, since the memtable and flushableBatch
    1668            1 :         // have the same logNum, the logNum invariant check below will trigger.
    1669            1 :         var n, inputs int
    1670            1 :         var inputBytes uint64
    1671            1 :         var ingest bool
    1672            1 :         for ; n < len(d.mu.mem.queue)-1; n++ {
    1673            1 :                 if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
    1674            1 :                         if n == 0 {
    1675            1 :                                 // The first flushable is of type ingestedFlushable. Since these
    1676            1 :                                 // must be flushed individually, we perform a flush for just
    1677            1 :                                 // this.
    1678            1 :                                 if !f.readyForFlush() {
    1679            0 :                                         // This check is almost unnecessary, but we guard against it
    1680            0 :                                         // just in case this invariant changes in the future.
    1681            0 :                                         panic("pebble: ingestedFlushable should always be ready to flush.")
    1682              :                                 }
    1683              :                                 // By setting n = 1, we ensure that the first flushable(n == 0)
    1684              :                                 // is scheduled for a flush. The number of tables added is equal to the
    1685              :                                 // number of files in the ingest operation.
    1686            1 :                                 n = 1
    1687            1 :                                 inputs = len(f.files)
    1688            1 :                                 ingest = true
    1689            1 :                                 break
    1690            1 :                         } else {
    1691            1 :                                 // There was some prefix of flushables which weren't of type
    1692            1 :                                 // ingestedFlushable. So, perform a flush for those.
    1693            1 :                                 break
    1694              :                         }
    1695              :                 }
    1696            1 :                 if !d.mu.mem.queue[n].readyForFlush() {
    1697            1 :                         break
    1698              :                 }
    1699            1 :                 inputBytes += d.mu.mem.queue[n].inuseBytes()
    1700              :         }
    1701            1 :         if n == 0 {
    1702            0 :                 // None of the immutable memtables are ready for flushing.
    1703            0 :                 return 0, nil
    1704            0 :         }
    1705            1 :         if !ingest {
    1706            1 :                 // Flushes of memtables add the prefix of n memtables from the flushable
    1707            1 :                 // queue.
    1708            1 :                 inputs = n
    1709            1 :         }
    1710              : 
    1711              :         // Require that every memtable being flushed has a log number less than the
    1712              :         // new minimum unflushed log number.
    1713            1 :         minUnflushedLogNum := d.mu.mem.queue[n].logNum
    1714            1 :         if !d.opts.DisableWAL {
    1715            1 :                 for i := 0; i < n; i++ {
    1716            1 :                         if logNum := d.mu.mem.queue[i].logNum; logNum >= minUnflushedLogNum {
    1717            0 :                                 panic(errors.AssertionFailedf("logNum invariant violated: flushing %d items; %d:type=%T,logNum=%d; %d:type=%T,logNum=%d",
    1718            0 :                                         n,
    1719            0 :                                         i, d.mu.mem.queue[i].flushable, logNum,
    1720            0 :                                         n, d.mu.mem.queue[n].flushable, minUnflushedLogNum))
    1721              :                         }
    1722              :                 }
    1723              :         }
    1724              : 
    1725            1 :         c, err := newFlush(d.opts, d.mu.versions.currentVersion(), d.mu.versions.latest.l0Organizer,
    1726            1 :                 d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow(), d.TableFormat(), d.determineCompactionValueSeparation)
    1727            1 :         if err != nil {
    1728            0 :                 return 0, err
    1729            0 :         }
    1730            1 :         c.AddInProgressLocked(d)
    1731            1 : 
    1732            1 :         jobID := d.newJobIDLocked()
    1733            1 :         info := FlushInfo{
    1734            1 :                 JobID:      int(jobID),
    1735            1 :                 Input:      inputs,
    1736            1 :                 InputBytes: inputBytes,
    1737            1 :                 Ingest:     ingest,
    1738            1 :         }
    1739            1 :         d.opts.EventListener.FlushBegin(info)
    1740            1 : 
    1741            1 :         startTime := d.timeNow()
    1742            1 : 
    1743            1 :         var ve *manifest.VersionEdit
    1744            1 :         var stats compact.Stats
    1745            1 :         var outputBlobs []compact.OutputBlob
    1746            1 :         // To determine the target level of the files in the ingestedFlushable, we
    1747            1 :         // need to acquire the logLock, and not release it for that duration. Since
    1748            1 :         // UpdateVersionLocked acquires it anyway, we create the VersionEdit for
    1749            1 :         // ingestedFlushable outside runCompaction. For all other flush cases, we
    1750            1 :         // construct the VersionEdit inside runCompaction.
    1751            1 :         var compactionErr error
    1752            1 :         if c.kind != compactionKindIngestedFlushable {
    1753            1 :                 ve, stats, outputBlobs, compactionErr = d.runCompaction(jobID, c)
    1754            1 :         }
    1755              : 
    1756            1 :         _, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
    1757            1 :                 err := compactionErr
    1758            1 :                 if c.kind == compactionKindIngestedFlushable {
    1759            1 :                         ve, err = d.runIngestFlush(c)
    1760            1 :                 }
    1761            1 :                 info.Duration = d.timeNow().Sub(startTime)
    1762            1 :                 if err != nil {
    1763            1 :                         return versionUpdate{}, err
    1764            1 :                 }
    1765              : 
    1766            1 :                 validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
    1767            1 :                 for i := range ve.NewTables {
    1768            1 :                         e := &ve.NewTables[i]
    1769            1 :                         info.OutputTables = append(info.OutputTables, e.Meta.TableInfo())
    1770            1 :                         // Ingested tables are not necessarily flushed to L0. Record the level of
    1771            1 :                         // each ingested file explicitly.
    1772            1 :                         if ingest {
    1773            1 :                                 info.IngestLevels = append(info.IngestLevels, e.Level)
    1774            1 :                         }
    1775              :                 }
    1776            1 :                 for i := range outputBlobs {
    1777            1 :                         b := &outputBlobs[i]
    1778            1 :                         info.OutputBlobs = append(info.OutputBlobs, BlobFileInfo{
    1779            1 :                                 BlobFileID:      base.BlobFileID(b.Metadata.FileNum),
    1780            1 :                                 DiskFileNum:     b.ObjMeta.DiskFileNum,
    1781            1 :                                 Size:            b.Metadata.Size,
    1782            1 :                                 ValueSize:       b.Stats.UncompressedValueBytes,
    1783            1 :                                 MVCCGarbageSize: b.Stats.MVCCGarbageBytes,
    1784            1 :                         })
    1785            1 : 
    1786            1 :                 }
    1787              : 
    1788              :                 // The flush succeeded or it produced an empty sstable. In either case we
    1789              :                 // want to bump the minimum unflushed log number to the log number of the
    1790              :                 // oldest unflushed memtable.
    1791            1 :                 ve.MinUnflushedLogNum = minUnflushedLogNum
    1792            1 :                 if c.kind != compactionKindIngestedFlushable {
    1793            1 :                         l0Metrics := c.metrics.perLevel.level(0)
    1794            1 :                         if d.opts.DisableWAL {
    1795            1 :                                 // If the WAL is disabled, every flushable has a zero [logSize],
    1796            1 :                                 // resulting in zero bytes in. Instead, use the number of bytes we
    1797            1 :                                 // flushed as the BytesIn. This ensures we get a reasonable w-amp
    1798            1 :                                 // calculation even when the WAL is disabled.
    1799            1 :                                 l0Metrics.TableBytesIn = l0Metrics.TableBytesFlushed + l0Metrics.BlobBytesFlushed
    1800            1 :                         } else {
    1801            1 :                                 for i := 0; i < n; i++ {
    1802            1 :                                         l0Metrics.TableBytesIn += d.mu.mem.queue[i].logSize
    1803            1 :                                 }
    1804              :                         }
    1805            1 :                 } else {
    1806            1 :                         // c.kind == compactionKindIngestedFlushable && we could have deleted files due
    1807            1 :                         // to ingest-time splits or excises.
    1808            1 :                         ingestFlushable := c.flush.flushables[0].flushable.(*ingestedFlushable)
    1809            1 :                         exciseBounds := ingestFlushable.exciseSpan.UserKeyBounds()
    1810            1 :                         for c2 := range d.mu.compact.inProgress {
    1811            1 :                                 // Check if this compaction overlaps with the excise span. Note that just
    1812            1 :                                 // checking if the inputs individually overlap with the excise span
    1813            1 :                                 // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
    1814            1 :                                 // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
    1815            1 :                                 // doing a [c,d) excise at the same time as this compaction, we will have
    1816            1 :                                 // to error out the whole compaction as we can't guarantee it hasn't/won't
    1817            1 :                                 // write a file overlapping with the excise span.
    1818            1 :                                 bounds := c2.Bounds()
    1819            1 :                                 if bounds != nil && bounds.Overlaps(d.cmp, &exciseBounds) {
    1820            1 :                                         c2.Cancel()
    1821            1 :                                 }
    1822              :                         }
    1823              : 
    1824            1 :                         if len(ve.DeletedTables) > 0 {
    1825            1 :                                 // Iterate through all other compactions, and check if their inputs have
    1826            1 :                                 // been replaced due to an ingest-time split or excise. In that case,
    1827            1 :                                 // cancel the compaction.
    1828            1 :                                 for c2 := range d.mu.compact.inProgress {
    1829            1 :                                         for level, table := range c2.Tables() {
    1830            1 :                                                 if _, ok := ve.DeletedTables[manifest.DeletedTableEntry{FileNum: table.TableNum, Level: level}]; ok {
    1831            1 :                                                         c2.Cancel()
    1832            1 :                                                         break
    1833              :                                                 }
    1834              :                                         }
    1835              :                                 }
    1836              :                         }
    1837              :                 }
    1838            1 :                 return versionUpdate{
    1839            1 :                         VE:                      ve,
    1840            1 :                         JobID:                   jobID,
    1841            1 :                         Metrics:                 c.metrics.perLevel,
    1842            1 :                         InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) },
    1843              :                 }, nil
    1844              :         })
    1845              : 
    1846              :         // If err != nil, then the flush will be retried, and we will recalculate
    1847              :         // these metrics.
    1848            1 :         if err == nil {
    1849            1 :                 d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
    1850            1 :                 d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
    1851            1 :                 d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
    1852            1 :         }
    1853              : 
    1854            1 :         d.clearCompactingState(c, err != nil)
    1855            1 :         if c.UsesBurstConcurrency() {
    1856            0 :                 if v := d.mu.compact.burstConcurrency.Add(-1); v < 0 {
    1857            0 :                         panic(errors.AssertionFailedf("burst concurrency underflow: %d", v))
    1858              :                 }
    1859              :         }
    1860            1 :         delete(d.mu.compact.inProgress, c)
    1861            1 :         d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.metrics.bytesWritten.Load(), err)
    1862            1 : 
    1863            1 :         var flushed flushableList
    1864            1 :         if err == nil {
    1865            1 :                 flushed = d.mu.mem.queue[:n]
    1866            1 :                 d.mu.mem.queue = d.mu.mem.queue[n:]
    1867            1 :                 d.updateReadStateLocked(d.opts.DebugCheck)
    1868            1 :                 d.updateTableStatsLocked(ve.NewTables)
    1869            1 :                 if ingest {
    1870            1 :                         d.mu.versions.metrics.Flush.AsIngestCount++
    1871            1 :                         for _, l := range c.metrics.perLevel {
    1872            1 :                                 if l != nil {
    1873            1 :                                         d.mu.versions.metrics.Flush.AsIngestBytes += l.TableBytesIngested
    1874            1 :                                         d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
    1875            1 :                                 }
    1876              :                         }
    1877              :                 }
    1878            1 :                 d.maybeTransitionSnapshotsToFileOnlyLocked()
    1879              :         }
    1880              :         // Signal FlushEnd after installing the new readState. This helps for unit
    1881              :         // tests that use the callback to trigger a read using an iterator with
    1882              :         // IterOptions.OnlyReadGuaranteedDurable.
    1883            1 :         info.Err = err
    1884            1 :         if info.Err == nil && len(ve.NewTables) == 0 {
    1885            1 :                 info.Err = errEmptyTable
    1886            1 :         }
    1887            1 :         info.Done = true
    1888            1 :         info.TotalDuration = d.timeNow().Sub(startTime)
    1889            1 :         d.opts.EventListener.FlushEnd(info)
    1890            1 : 
    1891            1 :         // The order of these operations matters here for ease of testing.
    1892            1 :         // Removing the reader reference first allows tests to be guaranteed that
    1893            1 :         // the memtable reservation has been released by the time a synchronous
    1894            1 :         // flush returns. readerUnrefLocked may also produce obsolete files so the
    1895            1 :         // call to deleteObsoleteFiles must happen after it.
    1896            1 :         for i := range flushed {
    1897            1 :                 flushed[i].readerUnrefLocked(true)
    1898            1 :         }
    1899              : 
    1900            1 :         d.deleteObsoleteFiles(jobID)
    1901            1 : 
    1902            1 :         // Mark all the memtables we flushed as flushed.
    1903            1 :         for i := range flushed {
    1904            1 :                 close(flushed[i].flushed)
    1905            1 :         }
    1906              : 
    1907            1 :         return inputBytes, err
    1908              : }
    1909              : 
    1910              : // maybeTransitionSnapshotsToFileOnlyLocked transitions any "eventually
    1911              : // file-only" snapshots to be file-only if all their visible state has been
    1912              : // flushed to sstables.
    1913              : //
    1914              : // REQUIRES: d.mu.
    1915            1 : func (d *DB) maybeTransitionSnapshotsToFileOnlyLocked() {
    1916            1 :         earliestUnflushedSeqNum := d.getEarliestUnflushedSeqNumLocked()
    1917            1 :         currentVersion := d.mu.versions.currentVersion()
    1918            1 :         for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; {
    1919            1 :                 if s.efos == nil {
    1920            1 :                         s = s.next
    1921            1 :                         continue
    1922              :                 }
    1923            1 :                 overlapsFlushable := false
    1924            1 :                 if base.Visible(earliestUnflushedSeqNum, s.efos.seqNum, base.SeqNumMax) {
    1925            1 :                         // There are some unflushed keys that are still visible to the EFOS.
    1926            1 :                         // Check if any memtables older than the EFOS contain keys within a
    1927            1 :                         // protected range of the EFOS. If no, we can transition.
    1928            1 :                         protectedRanges := make([]bounded, len(s.efos.protectedRanges))
    1929            1 :                         for i := range s.efos.protectedRanges {
    1930            1 :                                 protectedRanges[i] = s.efos.protectedRanges[i]
    1931            1 :                         }
    1932            1 :                         for i := range d.mu.mem.queue {
    1933            1 :                                 if !base.Visible(d.mu.mem.queue[i].logSeqNum, s.efos.seqNum, base.SeqNumMax) {
    1934            1 :                                         // All keys in this memtable are newer than the EFOS. Skip this
    1935            1 :                                         // memtable.
    1936            1 :                                         continue
    1937              :                                 }
    1938              :                                 // NB: computePossibleOverlaps could have false positives, such as if
    1939              :                                 // the flushable is a flushable ingest and not a memtable. In that
    1940              :                                 // case we don't open the sstables to check; we just pessimistically
    1941              :                                 // assume an overlap.
    1942            1 :                                 d.mu.mem.queue[i].computePossibleOverlaps(func(b bounded) shouldContinue {
    1943            1 :                                         overlapsFlushable = true
    1944            1 :                                         return stopIteration
    1945            1 :                                 }, protectedRanges...)
    1946            1 :                                 if overlapsFlushable {
    1947            1 :                                         break
    1948              :                                 }
    1949              :                         }
    1950              :                 }
    1951            1 :                 if overlapsFlushable {
    1952            1 :                         s = s.next
    1953            1 :                         continue
    1954              :                 }
    1955            1 :                 currentVersion.Ref()
    1956            1 : 
    1957            1 :                 // NB: s.efos.transitionToFileOnlySnapshot could close s, in which
    1958            1 :                 // case s.next would be nil. Save it before calling it.
    1959            1 :                 next := s.next
    1960            1 :                 _ = s.efos.transitionToFileOnlySnapshot(currentVersion)
    1961            1 :                 s = next
    1962              :         }
    1963              : }
    1964              : 
    1965              : // maybeScheduleCompactionAsync should be used when
    1966              : // we want to possibly schedule a compaction, but don't
    1967              : // want to eat the cost of running maybeScheduleCompaction.
    1968              : // This method should be launched in a separate goroutine.
    1969              : // d.mu must not be held when this is called.
    1970            0 : func (d *DB) maybeScheduleCompactionAsync() {
    1971            0 :         defer d.compactionSchedulers.Done()
    1972            0 : 
    1973            0 :         d.mu.Lock()
    1974            0 :         d.maybeScheduleCompaction()
    1975            0 :         d.mu.Unlock()
    1976            0 : }
    1977              : 
    1978              : // maybeScheduleCompaction schedules a compaction if necessary.
    1979              : //
    1980              : // WARNING: maybeScheduleCompaction and Schedule must be the only ways that
    1981              : // any compactions are run. These ensure that the pickedCompactionCache is
    1982              : // used and not stale (by ensuring invalidation is done).
    1983              : //
    1984              : // Even compactions that are not scheduled by the CompactionScheduler must be
    1985              : // run using maybeScheduleCompaction, since starting those compactions needs
    1986              : // to invalidate the pickedCompactionCache.
    1987              : //
    1988              : // Requires d.mu to be held.
    1989            1 : func (d *DB) maybeScheduleCompaction() {
    1990            1 :         d.mu.versions.logLock()
    1991            1 :         defer d.mu.versions.logUnlock()
    1992            1 :         env := d.makeCompactionEnvLocked()
    1993            1 :         if env == nil {
    1994            1 :                 return
    1995            1 :         }
    1996              :         // env.inProgressCompactions will become stale once we pick a compaction, so
    1997              :         // it needs to be kept fresh. Also, the pickedCompaction in the
    1998              :         // pickedCompactionCache is not valid if we pick a compaction before using
    1999              :         // it, since those earlier compactions can mark the same file as compacting.
    2000              : 
    2001              :         // Delete-only compactions are expected to be cheap and reduce future
    2002              :         // compaction work, so schedule them directly instead of using the
    2003              :         // CompactionScheduler.
    2004            1 :         if d.tryScheduleDeleteOnlyCompaction() {
    2005            1 :                 env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
    2006            1 :                 d.mu.versions.pickedCompactionCache.invalidate()
    2007            1 :         }
    2008              :         // Download compactions have their own concurrency and do not currently
    2009              :         // interact with CompactionScheduler.
    2010              :         //
    2011              :         // TODO(sumeer): integrate with CompactionScheduler, since these consume
    2012              :         // disk write bandwidth.
    2013            1 :         if d.tryScheduleDownloadCompactions(*env, d.opts.MaxConcurrentDownloads()) {
    2014            1 :                 env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
    2015            1 :                 d.mu.versions.pickedCompactionCache.invalidate()
    2016            1 :         }
    2017              :         // The remaining compactions are scheduled by the CompactionScheduler.
    2018            1 :         if d.mu.versions.pickedCompactionCache.isWaiting() {
    2019            1 :                 // CompactionScheduler already knows that the DB is waiting to run a
    2020            1 :                 // compaction.
    2021            1 :                 return
    2022            1 :         }
    2023              :         // INVARIANT: !pickedCompactionCache.isWaiting. The following loop will
    2024              :         // either exit after successfully starting all the compactions it can pick,
    2025              :         // or will exit with one pickedCompaction in the cache, and isWaiting=true.
    2026            1 :         for {
    2027            1 :                 // Do not have a pickedCompaction in the cache.
    2028            1 :                 pc := d.pickAnyCompaction(*env)
    2029            1 :                 if pc == nil {
    2030            1 :                         return
    2031            1 :                 }
    2032            1 :                 success, grantHandle := d.compactionScheduler.TrySchedule()
    2033            1 :                 if !success {
    2034            1 :                         // Can't run now, but remember this pickedCompaction in the cache.
    2035            1 :                         d.mu.versions.pickedCompactionCache.add(pc)
    2036            1 :                         return
    2037            1 :                 }
    2038            1 :                 d.runPickedCompaction(pc, grantHandle)
    2039            1 :                 env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
    2040              :         }
    2041              : }
    2042              : 
    2043              : // makeCompactionEnv attempts to create a compactionEnv necessary during
    2044              : // compaction picking. If the DB is closed or marked as read-only,
    2045              : // makeCompactionEnv returns nil to indicate that compactions may not be
    2046              : // performed. Else, a new compactionEnv is constructed using the current DB
    2047              : // state.
    2048              : //
    2049              : // Compaction picking needs a coherent view of a Version. For example, we need
    2050              : // to exclude concurrent ingestions from making a decision on which level to
    2051              : // ingest into that conflicts with our compaction decision.
    2052              : //
    2053              : // A pickedCompaction constructed using a compactionEnv must only be used if
    2054              : // the latest Version has not changed.
    2055              : //
    2056              : // REQUIRES: d.mu and d.mu.versions.logLock are held.
    2057            1 : func (d *DB) makeCompactionEnvLocked() *compactionEnv {
    2058            1 :         if d.closed.Load() != nil || d.opts.ReadOnly {
    2059            1 :                 return nil
    2060            1 :         }
    2061            1 :         env := &compactionEnv{
    2062            1 :                 diskAvailBytes:          d.diskAvailBytes.Load(),
    2063            1 :                 earliestSnapshotSeqNum:  d.mu.snapshots.earliest(),
    2064            1 :                 earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
    2065            1 :                 inProgressCompactions:   d.getInProgressCompactionInfoLocked(nil),
    2066            1 :                 readCompactionEnv: readCompactionEnv{
    2067            1 :                         readCompactions:          &d.mu.compact.readCompactions,
    2068            1 :                         flushing:                 d.mu.compact.flushing || d.passedFlushThreshold(),
    2069            1 :                         rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
    2070            1 :                 },
    2071            1 :         }
    2072            1 :         if !d.problemSpans.IsEmpty() {
    2073            1 :                 env.problemSpans = &d.problemSpans
    2074            1 :         }
    2075            1 :         return env
    2076              : }
    2077              : 
    2078              : // pickAnyCompaction tries to pick a manual or automatic compaction.
    2079            1 : func (d *DB) pickAnyCompaction(env compactionEnv) (pc pickedCompaction) {
    2080            1 :         if !d.opts.DisableAutomaticCompactions {
    2081            1 :                 // Pick a score-based compaction first, since a misshapen LSM is bad.
    2082            1 :                 // We allow an exception for a high-priority disk-space reclamation
    2083            1 :                 // compaction. Future work will explore balancing the various competing
    2084            1 :                 // compaction priorities more judiciously. For now, we're relying on the
    2085            1 :                 // configured heuristic to be set carefully so that we don't starve
    2086            1 :                 // score-based compactions.
    2087            1 :                 if pc := d.mu.versions.picker.pickHighPrioritySpaceCompaction(env); pc != nil {
    2088            1 :                         return pc
    2089            1 :                 }
    2090            1 :                 if pc = d.mu.versions.picker.pickAutoScore(env); pc != nil {
    2091            1 :                         return pc
    2092            1 :                 }
    2093              :         }
    2094              :         // Pick a manual compaction, if any.
    2095            1 :         if pc = d.pickManualCompaction(env); pc != nil {
    2096            1 :                 return pc
    2097            1 :         }
    2098            1 :         if !d.opts.DisableAutomaticCompactions {
    2099            1 :                 return d.mu.versions.picker.pickAutoNonScore(env)
    2100            1 :         }
    2101            1 :         return nil
    2102              : }
    2103              : 
    2104              : // runPickedCompaction kicks off the provided pickedCompaction. In case the
    2105              : // pickedCompaction is a manual compaction, the corresponding manualCompaction
    2106              : // is removed from d.mu.compact.manual.
    2107              : //
    2108              : // REQUIRES: d.mu and d.mu.versions.logLock is held.
    2109            1 : func (d *DB) runPickedCompaction(pc pickedCompaction, grantHandle CompactionGrantHandle) {
    2110            1 :         var doneChannel chan error
    2111            1 :         if pc.ManualID() > 0 {
    2112            1 :                 for i := range d.mu.compact.manual {
    2113            1 :                         if d.mu.compact.manual[i].id == pc.ManualID() {
    2114            1 :                                 doneChannel = d.mu.compact.manual[i].done
    2115            1 :                                 d.mu.compact.manual = slices.Delete(d.mu.compact.manual, i, i+1)
    2116            1 :                                 d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
    2117            1 :                                 break
    2118              :                         }
    2119              :                 }
    2120            1 :                 if doneChannel == nil {
    2121            0 :                         panic(errors.AssertionFailedf("did not find manual compaction with id %d", pc.ManualID()))
    2122              :                 }
    2123              :         }
    2124              : 
    2125            1 :         d.mu.compact.compactingCount++
    2126            1 :         c := pc.ConstructCompaction(d, grantHandle)
    2127            1 :         c.AddInProgressLocked(d)
    2128            1 :         go func() {
    2129            1 :                 d.compact(c, doneChannel)
    2130            1 :         }()
    2131              : }
    2132              : 
    2133              : // Schedule implements DBForCompaction (it is called by the
    2134              : // CompactionScheduler).
    2135            1 : func (d *DB) Schedule(grantHandle CompactionGrantHandle) bool {
    2136            1 :         d.mu.Lock()
    2137            1 :         defer d.mu.Unlock()
    2138            1 :         d.mu.versions.logLock()
    2139            1 :         defer d.mu.versions.logUnlock()
    2140            1 :         isWaiting := d.mu.versions.pickedCompactionCache.isWaiting()
    2141            1 :         if !isWaiting {
    2142            0 :                 return false
    2143            0 :         }
    2144            1 :         pc := d.mu.versions.pickedCompactionCache.getForRunning()
    2145            1 :         if pc == nil {
    2146            1 :                 env := d.makeCompactionEnvLocked()
    2147            1 :                 if env != nil {
    2148            1 :                         pc = d.pickAnyCompaction(*env)
    2149            1 :                 }
    2150            1 :                 if pc == nil {
    2151            1 :                         d.mu.versions.pickedCompactionCache.setNotWaiting()
    2152            1 :                         return false
    2153            1 :                 }
    2154              :         }
    2155              :         // INVARIANT: pc != nil and is not in the cache. isWaiting is true, since
    2156              :         // there may be more compactions to run.
    2157            1 :         d.runPickedCompaction(pc, grantHandle)
    2158            1 :         return true
    2159              : }
    2160              : 
    2161              : // GetWaitingCompaction implements DBForCompaction (it is called by the
    2162              : // CompactionScheduler).
    2163            1 : func (d *DB) GetWaitingCompaction() (bool, WaitingCompaction) {
    2164            1 :         d.mu.Lock()
    2165            1 :         defer d.mu.Unlock()
    2166            1 :         d.mu.versions.logLock()
    2167            1 :         defer d.mu.versions.logUnlock()
    2168            1 :         isWaiting := d.mu.versions.pickedCompactionCache.isWaiting()
    2169            1 :         if !isWaiting {
    2170            1 :                 return false, WaitingCompaction{}
    2171            1 :         }
    2172            1 :         pc := d.mu.versions.pickedCompactionCache.peek()
    2173            1 :         if pc == nil {
    2174            1 :                 // Need to pick a compaction.
    2175            1 :                 env := d.makeCompactionEnvLocked()
    2176            1 :                 if env != nil {
    2177            1 :                         pc = d.pickAnyCompaction(*env)
    2178            1 :                 }
    2179            1 :                 if pc == nil {
    2180            1 :                         // Call setNotWaiting so that next call to GetWaitingCompaction can
    2181            1 :                         // return early.
    2182            1 :                         d.mu.versions.pickedCompactionCache.setNotWaiting()
    2183            1 :                         return false, WaitingCompaction{}
    2184            1 :                 } else {
    2185            1 :                         d.mu.versions.pickedCompactionCache.add(pc)
    2186            1 :                 }
    2187              :         }
    2188              :         // INVARIANT: pc != nil and is in the cache.
    2189            1 :         return true, pc.WaitingCompaction()
    2190              : }
    2191              : 
    2192              : // GetAllowedWithoutPermission implements DBForCompaction (it is called by the
    2193              : // CompactionScheduler).
    2194            1 : func (d *DB) GetAllowedWithoutPermission() int {
    2195            1 :         allowedBasedOnBacklog := int(d.mu.versions.curCompactionConcurrency.Load())
    2196            1 :         allowedBasedOnManual := int(d.mu.compact.manualLen.Load())
    2197            1 :         allowedBasedOnSpaceHeuristic := int(d.mu.compact.burstConcurrency.Load())
    2198            1 : 
    2199            1 :         v := allowedBasedOnBacklog + allowedBasedOnManual + allowedBasedOnSpaceHeuristic
    2200            1 :         if v == allowedBasedOnBacklog {
    2201            1 :                 return v
    2202            1 :         }
    2203            1 :         _, maxAllowed := d.opts.CompactionConcurrencyRange()
    2204            1 :         return min(v, maxAllowed)
    2205              : }
    2206              : 
    2207              : // tryScheduleDownloadCompactions tries to start download compactions.
    2208              : //
    2209              : // Requires d.mu to be held. Updates d.mu.compact.downloads.
    2210              : //
    2211              : // Returns true iff at least one compaction was started.
    2212            1 : func (d *DB) tryScheduleDownloadCompactions(env compactionEnv, maxConcurrentDownloads int) bool {
    2213            1 :         started := false
    2214            1 :         vers := d.mu.versions.currentVersion()
    2215            1 :         for i := 0; i < len(d.mu.compact.downloads); {
    2216            1 :                 if d.mu.compact.downloadingCount >= maxConcurrentDownloads {
    2217            1 :                         break
    2218              :                 }
    2219            1 :                 download := d.mu.compact.downloads[i]
    2220            1 :                 switch d.tryLaunchDownloadCompaction(download, vers, d.mu.versions.latest.l0Organizer, env, maxConcurrentDownloads) {
    2221            1 :                 case launchedCompaction:
    2222            1 :                         started = true
    2223            1 :                         continue
    2224            0 :                 case didNotLaunchCompaction:
    2225            0 :                         // See if we can launch a compaction for another download task.
    2226            0 :                         i++
    2227            1 :                 case downloadTaskCompleted:
    2228            1 :                         // Task is completed and must be removed.
    2229            1 :                         d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1)
    2230              :                 }
    2231              :         }
    2232            1 :         return started
    2233              : }
    2234              : 
    2235            1 : func (d *DB) pickManualCompaction(env compactionEnv) (pc pickedCompaction) {
    2236            1 :         v := d.mu.versions.currentVersion()
    2237            1 :         for len(d.mu.compact.manual) > 0 {
    2238            1 :                 manual := d.mu.compact.manual[0]
    2239            1 :                 pc, retryLater := newPickedManualCompaction(v, d.mu.versions.latest.l0Organizer,
    2240            1 :                         d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
    2241            1 :                 if pc != nil {
    2242            1 :                         return pc
    2243            1 :                 }
    2244            1 :                 if retryLater {
    2245            1 :                         // We are not able to run this manual compaction at this time.
    2246            1 :                         // Inability to run the head blocks later manual compactions.
    2247            1 :                         manual.retries++
    2248            1 :                         return nil
    2249            1 :                 }
    2250              :                 // Manual compaction is a no-op. Signal that it's complete.
    2251            1 :                 manual.done <- nil
    2252            1 :                 d.mu.compact.manual = d.mu.compact.manual[1:]
    2253            1 :                 d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
    2254              :         }
    2255            1 :         return nil
    2256              : }
    2257              : 
    2258              : // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction
    2259              : // for all files that can be deleted as suggested by deletionHints.
    2260              : //
    2261              : // Requires d.mu to be held. Updates d.mu.compact.deletionHints.
    2262              : //
    2263              : // Returns true iff a compaction was started.
    2264            1 : func (d *DB) tryScheduleDeleteOnlyCompaction() bool {
    2265            1 :         if d.opts.private.disableDeleteOnlyCompactions || d.opts.DisableAutomaticCompactions ||
    2266            1 :                 len(d.mu.compact.deletionHints) == 0 {
    2267            1 :                 return false
    2268            1 :         }
    2269            1 :         if _, maxConcurrency := d.opts.CompactionConcurrencyRange(); d.mu.compact.compactingCount >= maxConcurrency {
    2270            1 :                 return false
    2271            1 :         }
    2272            1 :         v := d.mu.versions.currentVersion()
    2273            1 :         snapshots := d.mu.snapshots.toSlice()
    2274            1 :         // We need to save the value of exciseEnabled in the compaction itself, as
    2275            1 :         // it can change dynamically between now and when the compaction runs.
    2276            1 :         exciseEnabled := d.FormatMajorVersion() >= FormatVirtualSSTables &&
    2277            1 :                 d.opts.Experimental.EnableDeleteOnlyCompactionExcises != nil && d.opts.Experimental.EnableDeleteOnlyCompactionExcises()
    2278            1 :         inputs, resolvedHints, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots, exciseEnabled)
    2279            1 :         d.mu.compact.deletionHints = unresolvedHints
    2280            1 : 
    2281            1 :         if len(inputs) > 0 {
    2282            1 :                 c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow(), resolvedHints, exciseEnabled)
    2283            1 :                 d.mu.compact.compactingCount++
    2284            1 :                 c.AddInProgressLocked(d)
    2285            1 :                 go d.compact(c, nil)
    2286            1 :                 return true
    2287            1 :         }
    2288            1 :         return false
    2289              : }
    2290              : 
    2291              : // deleteCompactionHintType indicates whether the deleteCompactionHint was
    2292              : // generated from a span containing a range del (point key only), a range key
    2293              : // delete (range key only), or both a point and range key.
    2294              : type deleteCompactionHintType uint8
    2295              : 
    2296              : const (
    2297              :         // NOTE: While these are primarily used as enumeration types, they are also
    2298              :         // used for some bitwise operations. Care should be taken when updating.
    2299              :         deleteCompactionHintTypeUnknown deleteCompactionHintType = iota
    2300              :         deleteCompactionHintTypePointKeyOnly
    2301              :         deleteCompactionHintTypeRangeKeyOnly
    2302              :         deleteCompactionHintTypePointAndRangeKey
    2303              : )
    2304              : 
    2305              : // String implements fmt.Stringer.
    2306            1 : func (h deleteCompactionHintType) String() string {
    2307            1 :         switch h {
    2308            0 :         case deleteCompactionHintTypeUnknown:
    2309            0 :                 return "unknown"
    2310            1 :         case deleteCompactionHintTypePointKeyOnly:
    2311            1 :                 return "point-key-only"
    2312            1 :         case deleteCompactionHintTypeRangeKeyOnly:
    2313            1 :                 return "range-key-only"
    2314            1 :         case deleteCompactionHintTypePointAndRangeKey:
    2315            1 :                 return "point-and-range-key"
    2316            0 :         default:
    2317            0 :                 panic(fmt.Sprintf("unknown hint type: %d", h))
    2318              :         }
    2319              : }
    2320              : 
    2321              : // compactionHintFromKeys returns a deleteCompactionHintType given a slice of
    2322              : // keyspan.Keys.
    2323            1 : func compactionHintFromKeys(keys []keyspan.Key) deleteCompactionHintType {
    2324            1 :         var hintType deleteCompactionHintType
    2325            1 :         for _, k := range keys {
    2326            1 :                 switch k.Kind() {
    2327            1 :                 case base.InternalKeyKindRangeDelete:
    2328            1 :                         hintType |= deleteCompactionHintTypePointKeyOnly
    2329            1 :                 case base.InternalKeyKindRangeKeyDelete:
    2330            1 :                         hintType |= deleteCompactionHintTypeRangeKeyOnly
    2331            0 :                 default:
    2332            0 :                         panic(fmt.Sprintf("unsupported key kind: %s", k.Kind()))
    2333              :                 }
    2334              :         }
    2335            1 :         return hintType
    2336              : }
    2337              : 
    2338              : // A deleteCompactionHint records a user key and sequence number span that has been
    2339              : // deleted by a range tombstone. A hint is recorded if at least one sstable
    2340              : // falls completely within both the user key and sequence number spans.
    2341              : // Once the tombstones and the observed completely-contained sstables fall
    2342              : // into the same snapshot stripe, a delete-only compaction may delete any
    2343              : // sstables within the range.
    2344              : type deleteCompactionHint struct {
    2345              :         // The type of key span that generated this hint (point key, range key, or
    2346              :         // both).
    2347              :         hintType deleteCompactionHintType
    2348              :         // start and end are user keys specifying a key range [start, end) of
    2349              :         // deleted keys.
    2350              :         start []byte
    2351              :         end   []byte
    2352              :         // The level of the file containing the range tombstone(s) when the hint
    2353              :         // was created. Only lower levels need to be searched for files that may
    2354              :         // be deleted.
    2355              :         tombstoneLevel int
    2356              :         // The file containing the range tombstone(s) that created the hint.
    2357              :         tombstoneFile *manifest.TableMetadata
    2358              :         // The smallest and largest sequence numbers of the abutting tombstones
    2359              :         // merged to form this hint. All of a tables' keys must be less than the
    2360              :         // tombstone smallest sequence number to be deleted. All of a tables'
    2361              :         // sequence numbers must fall into the same snapshot stripe as the
    2362              :         // tombstone largest sequence number to be deleted.
    2363              :         tombstoneLargestSeqNum  base.SeqNum
    2364              :         tombstoneSmallestSeqNum base.SeqNum
    2365              :         // The smallest sequence number of a sstable that was found to be covered
    2366              :         // by this hint. The hint cannot be resolved until this sequence number is
    2367              :         // in the same snapshot stripe as the largest tombstone sequence number.
    2368              :         // This is set when a hint is created, so the LSM may look different and
    2369              :         // notably no longer contain the sstable that contained the key at this
    2370              :         // sequence number.
    2371              :         fileSmallestSeqNum base.SeqNum
    2372              : }
    2373              : 
    2374              : type deletionHintOverlap int8
    2375              : 
    2376              : const (
    2377              :         // hintDoesNotApply indicates that the hint does not apply to the file.
    2378              :         hintDoesNotApply deletionHintOverlap = iota
    2379              :         // hintExcisesFile indicates that the hint excises a portion of the file,
    2380              :         // and the format major version of the DB supports excises.
    2381              :         hintExcisesFile
    2382              :         // hintDeletesFile indicates that the hint deletes the entirety of the file.
    2383              :         hintDeletesFile
    2384              : )
    2385              : 
    2386            1 : func (h deleteCompactionHint) String() string {
    2387            1 :         return fmt.Sprintf(
    2388            1 :                 "L%d.%s %s-%s seqnums(tombstone=%d-%d, file-smallest=%d, type=%s)",
    2389            1 :                 h.tombstoneLevel, h.tombstoneFile.TableNum, h.start, h.end,
    2390            1 :                 h.tombstoneSmallestSeqNum, h.tombstoneLargestSeqNum, h.fileSmallestSeqNum,
    2391            1 :                 h.hintType,
    2392            1 :         )
    2393            1 : }
    2394              : 
    2395              : func (h *deleteCompactionHint) canDeleteOrExcise(
    2396              :         cmp Compare, m *manifest.TableMetadata, snapshots compact.Snapshots, exciseEnabled bool,
    2397            1 : ) deletionHintOverlap {
    2398            1 :         // The file can only be deleted if all of its keys are older than the
    2399            1 :         // earliest tombstone aggregated into the hint. Note that we use
    2400            1 :         // m.LargestSeqNumAbsolute, not m.LargestSeqNum. Consider a compaction that
    2401            1 :         // zeroes sequence numbers. A compaction may zero the sequence number of a
    2402            1 :         // key with a sequence number > h.tombstoneSmallestSeqNum and set it to
    2403            1 :         // zero. If we looked at m.LargestSeqNum, the resulting output file would
    2404            1 :         // appear to not contain any keys more recent than the oldest tombstone. To
    2405            1 :         // avoid this error, the largest pre-zeroing sequence number is maintained
    2406            1 :         // in LargestSeqNumAbsolute and used here to make the determination whether
    2407            1 :         // the file's keys are older than all of the hint's tombstones.
    2408            1 :         if m.LargestSeqNumAbsolute >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
    2409            1 :                 return hintDoesNotApply
    2410            1 :         }
    2411              : 
    2412              :         // The file's oldest key must  be in the same snapshot stripe as the
    2413              :         // newest tombstone. NB: We already checked the hint's sequence numbers,
    2414              :         // but this file's oldest sequence number might be lower than the hint's
    2415              :         // smallest sequence number despite the file falling within the key range
    2416              :         // if this file was constructed after the hint by a compaction.
    2417            1 :         if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(m.SmallestSeqNum) {
    2418            0 :                 return hintDoesNotApply
    2419            0 :         }
    2420              : 
    2421            1 :         switch h.hintType {
    2422            1 :         case deleteCompactionHintTypePointKeyOnly:
    2423            1 :                 // A hint generated by a range del span cannot delete tables that contain
    2424            1 :                 // range keys.
    2425            1 :                 if m.HasRangeKeys {
    2426            1 :                         return hintDoesNotApply
    2427            1 :                 }
    2428            1 :         case deleteCompactionHintTypeRangeKeyOnly:
    2429            1 :                 // A hint generated by a range key del span cannot delete tables that
    2430            1 :                 // contain point keys.
    2431            1 :                 if m.HasPointKeys {
    2432            1 :                         return hintDoesNotApply
    2433            1 :                 }
    2434            1 :         case deleteCompactionHintTypePointAndRangeKey:
    2435              :                 // A hint from a span that contains both range dels *and* range keys can
    2436              :                 // only be deleted if both bounds fall within the hint. The next check takes
    2437              :                 // care of this.
    2438            0 :         default:
    2439            0 :                 panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
    2440              :         }
    2441            1 :         if cmp(h.start, m.Smallest().UserKey) <= 0 &&
    2442            1 :                 base.UserKeyExclusive(h.end).CompareUpperBounds(cmp, m.UserKeyBounds().End) >= 0 {
    2443            1 :                 return hintDeletesFile
    2444            1 :         }
    2445            1 :         if !exciseEnabled {
    2446            1 :                 // The file's keys must be completely contained within the hint range; excises
    2447            1 :                 // aren't allowed.
    2448            1 :                 return hintDoesNotApply
    2449            1 :         }
    2450              :         // Check for any overlap. In cases of partial overlap, we can excise the part of the file
    2451              :         // that overlaps with the deletion hint.
    2452            1 :         if cmp(h.end, m.Smallest().UserKey) > 0 &&
    2453            1 :                 (m.UserKeyBounds().End.CompareUpperBounds(cmp, base.UserKeyInclusive(h.start)) >= 0) {
    2454            1 :                 return hintExcisesFile
    2455            1 :         }
    2456            0 :         return hintDoesNotApply
    2457              : }
    2458              : 
    2459              : // checkDeleteCompactionHints checks the passed-in deleteCompactionHints for those that
    2460              : // can be resolved and those that cannot. A hint is considered resolved when its largest
    2461              : // tombstone sequence number and the smallest sequence number of covered files fall in
    2462              : // the same snapshot stripe. No more than maxHintsPerDeleteOnlyCompaction will be resolved
    2463              : // per method call. Resolved and unresolved hints are returned in separate return values.
    2464              : // The files that the resolved hints apply to, are returned as compactionLevels.
    2465              : func checkDeleteCompactionHints(
    2466              :         cmp Compare,
    2467              :         v *manifest.Version,
    2468              :         hints []deleteCompactionHint,
    2469              :         snapshots compact.Snapshots,
    2470              :         exciseEnabled bool,
    2471            1 : ) (levels []compactionLevel, resolved, unresolved []deleteCompactionHint) {
    2472            1 :         var files map[*manifest.TableMetadata]bool
    2473            1 :         var byLevel [numLevels][]*manifest.TableMetadata
    2474            1 : 
    2475            1 :         // Delete-only compactions can be quadratic (O(mn)) in terms of runtime
    2476            1 :         // where m = number of files in the delete-only compaction and n = number
    2477            1 :         // of resolved hints. To prevent these from growing unbounded, we cap
    2478            1 :         // the number of hints we resolve for one delete-only compaction. This
    2479            1 :         // cap only applies if exciseEnabled == true.
    2480            1 :         const maxHintsPerDeleteOnlyCompaction = 10
    2481            1 : 
    2482            1 :         unresolvedHints := hints[:0]
    2483            1 :         // Lazily populate resolvedHints, similar to files above.
    2484            1 :         resolvedHints := make([]deleteCompactionHint, 0)
    2485            1 :         for _, h := range hints {
    2486            1 :                 // Check each compaction hint to see if it's resolvable. Resolvable
    2487            1 :                 // hints are removed and trigger a delete-only compaction if any files
    2488            1 :                 // in the current LSM still meet their criteria. Unresolvable hints
    2489            1 :                 // are saved and don't trigger a delete-only compaction.
    2490            1 :                 //
    2491            1 :                 // When a compaction hint is created, the sequence numbers of the
    2492            1 :                 // range tombstones and the covered file with the oldest key are
    2493            1 :                 // recorded. The largest tombstone sequence number and the smallest
    2494            1 :                 // file sequence number must be in the same snapshot stripe for the
    2495            1 :                 // hint to be resolved. The below graphic models a compaction hint
    2496            1 :                 // covering the keyspace [b, r). The hint completely contains two
    2497            1 :                 // files, 000002 and 000003. The file 000003 contains the lowest
    2498            1 :                 // covered sequence number at #90. The tombstone b.RANGEDEL.230:h has
    2499            1 :                 // the highest tombstone sequence number incorporated into the hint.
    2500            1 :                 // The hint may be resolved only once the snapshots at #100, #180 and
    2501            1 :                 // #210 are all closed. File 000001 is not included within the hint
    2502            1 :                 // because it extends beyond the range tombstones in user key space.
    2503            1 :                 //
    2504            1 :                 // 250
    2505            1 :                 //
    2506            1 :                 //       |-b...230:h-|
    2507            1 :                 // _____________________________________________________ snapshot #210
    2508            1 :                 // 200               |--h.RANGEDEL.200:r--|
    2509            1 :                 //
    2510            1 :                 // _____________________________________________________ snapshot #180
    2511            1 :                 //
    2512            1 :                 // 150                     +--------+
    2513            1 :                 //           +---------+   | 000003 |
    2514            1 :                 //           | 000002  |   |        |
    2515            1 :                 //           +_________+   |        |
    2516            1 :                 // 100_____________________|________|___________________ snapshot #100
    2517            1 :                 //                         +--------+
    2518            1 :                 // _____________________________________________________ snapshot #70
    2519            1 :                 //                             +---------------+
    2520            1 :                 //  50                         | 000001        |
    2521            1 :                 //                             |               |
    2522            1 :                 //                             +---------------+
    2523            1 :                 // ______________________________________________________________
    2524            1 :                 //     a b c d e f g h i j k l m n o p q r s t u v w x y z
    2525            1 : 
    2526            1 :                 if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(h.fileSmallestSeqNum) ||
    2527            1 :                         (len(resolvedHints) >= maxHintsPerDeleteOnlyCompaction && exciseEnabled) {
    2528            1 :                         // Cannot resolve yet.
    2529            1 :                         unresolvedHints = append(unresolvedHints, h)
    2530            1 :                         continue
    2531              :                 }
    2532              : 
    2533              :                 // The hint h will be resolved and dropped, if it either affects no files at all
    2534              :                 // or if the number of files it creates (eg. through excision) is less than or
    2535              :                 // equal to the number of files it deletes. First, determine how many files are
    2536              :                 // affected by this hint.
    2537            1 :                 filesDeletedByCurrentHint := 0
    2538            1 :                 var filesDeletedByLevel [7][]*manifest.TableMetadata
    2539            1 :                 for l := h.tombstoneLevel + 1; l < numLevels; l++ {
    2540            1 :                         for m := range v.Overlaps(l, base.UserKeyBoundsEndExclusive(h.start, h.end)).All() {
    2541            1 :                                 doesHintApply := h.canDeleteOrExcise(cmp, m, snapshots, exciseEnabled)
    2542            1 :                                 if m.IsCompacting() || doesHintApply == hintDoesNotApply || files[m] {
    2543            1 :                                         continue
    2544              :                                 }
    2545            1 :                                 switch doesHintApply {
    2546            1 :                                 case hintDeletesFile:
    2547            1 :                                         filesDeletedByCurrentHint++
    2548            1 :                                 case hintExcisesFile:
    2549            1 :                                         // Account for the original file being deleted.
    2550            1 :                                         filesDeletedByCurrentHint++
    2551            1 :                                         // An excise could produce up to 2 new files. If the hint
    2552            1 :                                         // leaves a fragment of the file on the left, decrement
    2553            1 :                                         // the counter once. If the hint leaves a fragment of the
    2554            1 :                                         // file on the right, decrement the counter once.
    2555            1 :                                         if cmp(h.start, m.Smallest().UserKey) > 0 {
    2556            1 :                                                 filesDeletedByCurrentHint--
    2557            1 :                                         }
    2558            1 :                                         if m.UserKeyBounds().End.IsUpperBoundFor(cmp, h.end) {
    2559            1 :                                                 filesDeletedByCurrentHint--
    2560            1 :                                         }
    2561              :                                 }
    2562            1 :                                 filesDeletedByLevel[l] = append(filesDeletedByLevel[l], m)
    2563              :                         }
    2564              :                 }
    2565            1 :                 if filesDeletedByCurrentHint < 0 {
    2566            1 :                         // This hint does not delete a sufficient number of files to warrant
    2567            1 :                         // a delete-only compaction at this stage. Drop it (ie. don't add it
    2568            1 :                         // to either resolved or unresolved hints) so it doesn't stick around
    2569            1 :                         // forever.
    2570            1 :                         continue
    2571              :                 }
    2572              :                 // This hint will be resolved and dropped.
    2573            1 :                 for l := h.tombstoneLevel + 1; l < numLevels; l++ {
    2574            1 :                         byLevel[l] = append(byLevel[l], filesDeletedByLevel[l]...)
    2575            1 :                         for _, m := range filesDeletedByLevel[l] {
    2576            1 :                                 if files == nil {
    2577            1 :                                         // Construct files lazily, assuming most calls will not
    2578            1 :                                         // produce delete-only compactions.
    2579            1 :                                         files = make(map[*manifest.TableMetadata]bool)
    2580            1 :                                 }
    2581            1 :                                 files[m] = true
    2582              :                         }
    2583              :                 }
    2584            1 :                 resolvedHints = append(resolvedHints, h)
    2585              :         }
    2586              : 
    2587            1 :         var compactLevels []compactionLevel
    2588            1 :         for l, files := range byLevel {
    2589            1 :                 if len(files) == 0 {
    2590            1 :                         continue
    2591              :                 }
    2592            1 :                 compactLevels = append(compactLevels, compactionLevel{
    2593            1 :                         level: l,
    2594            1 :                         files: manifest.NewLevelSliceKeySorted(cmp, files),
    2595            1 :                 })
    2596              :         }
    2597            1 :         return compactLevels, resolvedHints, unresolvedHints
    2598              : }
    2599              : 
    2600              : // compact runs one compaction and maybe schedules another call to compact.
    2601            1 : func (d *DB) compact(c compaction, errChannel chan error) {
    2602            1 :         pprof.Do(context.Background(), c.PprofLabels(d.opts.Experimental.UserKeyCategories), func(context.Context) {
    2603            1 :                 func() {
    2604            1 :                         d.mu.Lock()
    2605            1 :                         defer d.mu.Unlock()
    2606            1 :                         jobID := d.newJobIDLocked()
    2607            1 : 
    2608            1 :                         compactErr := c.Execute(jobID, d)
    2609            1 : 
    2610            1 :                         d.deleteObsoleteFiles(jobID)
    2611            1 :                         // We send on the error channel only after we've deleted
    2612            1 :                         // obsolete files so that tests performing manual compactions
    2613            1 :                         // block until the obsolete files are deleted, and the test
    2614            1 :                         // observes the deletion.
    2615            1 :                         if errChannel != nil {
    2616            1 :                                 errChannel <- compactErr
    2617            1 :                         }
    2618            1 :                         if compactErr != nil {
    2619            1 :                                 d.handleCompactFailure(c, compactErr)
    2620            1 :                         }
    2621            1 :                         if c.IsDownload() {
    2622            1 :                                 d.mu.compact.downloadingCount--
    2623            1 :                         } else {
    2624            1 :                                 d.mu.compact.compactingCount--
    2625            1 :                         }
    2626            1 :                         if c.UsesBurstConcurrency() {
    2627            1 :                                 if v := d.mu.compact.burstConcurrency.Add(-1); v < 0 {
    2628            0 :                                         panic(errors.AssertionFailedf("burst concurrency underflow: %d", v))
    2629              :                                 }
    2630              :                         }
    2631            1 :                         delete(d.mu.compact.inProgress, c)
    2632            1 :                         // Add this compaction's duration to the cumulative duration. NB: This
    2633            1 :                         // must be atomic with the above removal of c from
    2634            1 :                         // d.mu.compact.InProgress to ensure Metrics.Compact.Duration does not
    2635            1 :                         // miss or double count a completing compaction's duration.
    2636            1 :                         d.mu.compact.duration += d.timeNow().Sub(c.BeganAt())
    2637              :                 }()
    2638              :                 // Done must not be called while holding any lock that needs to be
    2639              :                 // acquired by Schedule. Also, it must be called after new Version has
    2640              :                 // been installed, and metadata related to compactingCount and inProgress
    2641              :                 // compactions has been updated. This is because when we are running at
    2642              :                 // the limit of permitted compactions, Done can cause the
    2643              :                 // CompactionScheduler to schedule another compaction. Note that the only
    2644              :                 // compactions that may be scheduled by Done are those integrated with the
    2645              :                 // CompactionScheduler.
    2646            1 :                 c.GrantHandle().Done()
    2647            1 :                 // The previous compaction may have produced too many files in a level, so
    2648            1 :                 // reschedule another compaction if needed.
    2649            1 :                 //
    2650            1 :                 // The preceding Done call will not necessarily cause a compaction to be
    2651            1 :                 // scheduled, so we also need to call maybeScheduleCompaction. And
    2652            1 :                 // maybeScheduleCompaction encompasses all compactions, and not only those
    2653            1 :                 // scheduled via the CompactionScheduler.
    2654            1 :                 func() {
    2655            1 :                         d.mu.Lock()
    2656            1 :                         defer d.mu.Unlock()
    2657            1 :                         d.maybeScheduleCompaction()
    2658            1 :                         d.mu.compact.cond.Broadcast()
    2659            1 :                 }()
    2660              :         })
    2661              : }
    2662              : 
    2663            1 : func (d *DB) handleCompactFailure(c compaction, err error) {
    2664            1 :         if errors.Is(err, ErrCancelledCompaction) {
    2665            1 :                 // ErrCancelledCompaction is expected during normal operation, so we don't
    2666            1 :                 // want to report it as a background error.
    2667            1 :                 d.opts.Logger.Infof("%v", err)
    2668            1 :                 return
    2669            1 :         }
    2670            1 :         c.RecordError(&d.problemSpans, err)
    2671            1 :         // TODO(peter): count consecutive compaction errors and backoff.
    2672            1 :         d.opts.EventListener.BackgroundError(err)
    2673              : }
    2674              : 
    2675              : // cleanupVersionEdit cleans up any on-disk artifacts that were created
    2676              : // for the application of a versionEdit that is no longer going to be applied.
    2677              : //
    2678              : // d.mu must be held when calling this method.
    2679            1 : func (d *DB) cleanupVersionEdit(ve *manifest.VersionEdit) {
    2680            1 :         obsoleteFiles := manifest.ObsoleteFiles{
    2681            1 :                 TableBackings: make([]*manifest.TableBacking, 0, len(ve.NewTables)),
    2682            1 :                 BlobFiles:     make([]*manifest.PhysicalBlobFile, 0, len(ve.NewBlobFiles)),
    2683            1 :         }
    2684            1 :         deletedTables := make(map[base.TableNum]struct{})
    2685            1 :         for key := range ve.DeletedTables {
    2686            1 :                 deletedTables[key.FileNum] = struct{}{}
    2687            1 :         }
    2688            1 :         for i := range ve.NewBlobFiles {
    2689            0 :                 obsoleteFiles.AddBlob(ve.NewBlobFiles[i].Physical)
    2690            0 :                 d.mu.versions.zombieBlobs.Add(objectInfo{
    2691            0 :                         fileInfo: fileInfo{
    2692            0 :                                 FileNum:  ve.NewBlobFiles[i].Physical.FileNum,
    2693            0 :                                 FileSize: ve.NewBlobFiles[i].Physical.Size,
    2694            0 :                         },
    2695            0 :                         isLocal: objstorage.IsLocalBlobFile(d.objProvider, ve.NewBlobFiles[i].Physical.FileNum),
    2696            0 :                 })
    2697            0 :         }
    2698            1 :         for i := range ve.NewTables {
    2699            1 :                 if ve.NewTables[i].Meta.Virtual {
    2700            0 :                         // We handle backing files separately.
    2701            0 :                         continue
    2702              :                 }
    2703            1 :                 if _, ok := deletedTables[ve.NewTables[i].Meta.TableNum]; ok {
    2704            0 :                         // This file is being moved in this ve to a different level.
    2705            0 :                         // Don't mark it as obsolete.
    2706            0 :                         continue
    2707              :                 }
    2708            1 :                 obsoleteFiles.AddBacking(ve.NewTables[i].Meta.PhysicalMeta().TableBacking)
    2709              :         }
    2710            1 :         for i := range ve.CreatedBackingTables {
    2711            0 :                 if ve.CreatedBackingTables[i].IsUnused() {
    2712            0 :                         obsoleteFiles.AddBacking(ve.CreatedBackingTables[i])
    2713            0 :                 }
    2714              :         }
    2715            1 :         for _, of := range obsoleteFiles.TableBackings {
    2716            1 :                 // Add this file to zombie tables as well, as the versionSet
    2717            1 :                 // asserts on whether every obsolete file was at one point
    2718            1 :                 // marked zombie.
    2719            1 :                 d.mu.versions.zombieTables.Add(objectInfo{
    2720            1 :                         fileInfo: fileInfo{
    2721            1 :                                 FileNum:  of.DiskFileNum,
    2722            1 :                                 FileSize: of.Size,
    2723            1 :                         },
    2724            1 :                         isLocal: objstorage.IsLocalTable(d.objProvider, of.DiskFileNum),
    2725            1 :                 })
    2726            1 :         }
    2727            1 :         d.mu.versions.addObsoleteLocked(obsoleteFiles)
    2728              : }
    2729              : 
    2730              : // compact1 runs one compaction.
    2731              : //
    2732              : // d.mu must be held when calling this, but the mutex may be dropped and
    2733              : // re-acquired during the course of this method.
    2734            1 : func (d *DB) compact1(jobID JobID, c *tableCompaction) (err error) {
    2735            1 :         info := c.makeInfo(jobID)
    2736            1 :         d.opts.EventListener.CompactionBegin(info)
    2737            1 :         startTime := d.timeNow()
    2738            1 : 
    2739            1 :         ve, stats, outputBlobs, err := d.runCompaction(jobID, c)
    2740            1 : 
    2741            1 :         info.Annotations = append(info.Annotations, c.annotations...)
    2742            1 :         info.Duration = d.timeNow().Sub(startTime)
    2743            1 :         if err == nil {
    2744            1 :                 validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
    2745            1 :                 _, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
    2746            1 :                         // Check if this compaction had a conflicting operation (eg. a d.excise())
    2747            1 :                         // that necessitates it restarting from scratch. Note that since we hold
    2748            1 :                         // the manifest lock, we don't expect this bool to change its value
    2749            1 :                         // as only the holder of the manifest lock will ever write to it.
    2750            1 :                         if c.cancel.Load() {
    2751            1 :                                 err = firstError(err, ErrCancelledCompaction)
    2752            1 :                                 // This is the first time we've seen a cancellation during the
    2753            1 :                                 // life of this compaction (or the original condition on err == nil
    2754            1 :                                 // would not have been true). We should delete any tables already
    2755            1 :                                 // created, as d.runCompaction did not do that.
    2756            1 :                                 d.cleanupVersionEdit(ve)
    2757            1 :                                 // Note that UpdateVersionLocked invalidates the pickedCompactionCache
    2758            1 :                                 // when we return, which is relevant because this failed compaction
    2759            1 :                                 // may be the highest priority to run next.
    2760            1 :                                 return versionUpdate{}, err
    2761            1 :                         }
    2762            1 :                         return versionUpdate{
    2763            1 :                                 VE:                      ve,
    2764            1 :                                 JobID:                   jobID,
    2765            1 :                                 Metrics:                 c.metrics.perLevel,
    2766            1 :                                 InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) },
    2767              :                         }, nil
    2768              :                 })
    2769              :         }
    2770              : 
    2771            1 :         info.Done = true
    2772            1 :         info.Err = err
    2773            1 :         if err == nil {
    2774            1 :                 for i := range ve.NewTables {
    2775            1 :                         e := &ve.NewTables[i]
    2776            1 :                         info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
    2777            1 :                 }
    2778            1 :                 for i := range outputBlobs {
    2779            1 :                         b := &outputBlobs[i]
    2780            1 :                         info.Output.Blobs = append(info.Output.Blobs, BlobFileInfo{
    2781            1 :                                 BlobFileID:      base.BlobFileID(b.Metadata.FileNum),
    2782            1 :                                 DiskFileNum:     b.ObjMeta.DiskFileNum,
    2783            1 :                                 Size:            b.Metadata.Size,
    2784            1 :                                 ValueSize:       b.Stats.UncompressedValueBytes,
    2785            1 :                                 MVCCGarbageSize: b.Stats.MVCCGarbageBytes,
    2786            1 :                         })
    2787            1 :                 }
    2788            1 :                 d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
    2789            1 :                 d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
    2790            1 :                 d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
    2791              :         }
    2792              : 
    2793              :         // NB: clearing compacting state must occur before updating the read state;
    2794              :         // L0Sublevels initialization depends on it.
    2795            1 :         d.clearCompactingState(c, err != nil)
    2796            1 :         d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.metrics.bytesWritten.Load(), err)
    2797            1 :         d.mu.versions.incrementCompactionBytes(-c.metrics.bytesWritten.Load())
    2798            1 : 
    2799            1 :         info.TotalDuration = d.timeNow().Sub(c.metrics.beganAt)
    2800            1 :         d.opts.EventListener.CompactionEnd(info)
    2801            1 : 
    2802            1 :         // Update the read state before deleting obsolete files because the
    2803            1 :         // read-state update will cause the previous version to be unref'd and if
    2804            1 :         // there are no references obsolete tables will be added to the obsolete
    2805            1 :         // table list.
    2806            1 :         if err == nil {
    2807            1 :                 d.updateReadStateLocked(d.opts.DebugCheck)
    2808            1 :                 d.updateTableStatsLocked(ve.NewTables)
    2809            1 :         }
    2810              : 
    2811            1 :         return err
    2812              : }
    2813              : 
    2814              : // runCopyCompaction runs a copy compaction where a new TableNum is created that
    2815              : // is a byte-for-byte copy of the input file or span thereof in some cases. This
    2816              : // is used in lieu of a move compaction when a file is being moved across the
    2817              : // local/remote storage boundary. It could also be used in lieu of a rewrite
    2818              : // compaction as part of a Download() call, which allows copying only a span of
    2819              : // the external file, provided the file does not contain range keys or value
    2820              : // blocks (see sstable.CopySpan).
    2821              : //
    2822              : // d.mu must be held when calling this method. The mutex will be released when
    2823              : // doing IO.
    2824              : func (d *DB) runCopyCompaction(
    2825              :         jobID JobID, c *tableCompaction,
    2826            1 : ) (ve *manifest.VersionEdit, stats compact.Stats, blobs []compact.OutputBlob, _ error) {
    2827            1 :         if c.cancel.Load() {
    2828            0 :                 return nil, compact.Stats{}, blobs, ErrCancelledCompaction
    2829            0 :         }
    2830            1 :         iter := c.startLevel.files.Iter()
    2831            1 :         inputMeta := iter.First()
    2832            1 :         if iter.Next() != nil {
    2833            0 :                 return nil, compact.Stats{}, []compact.OutputBlob{}, base.AssertionFailedf("got more than one file for a move compaction")
    2834            0 :         }
    2835            1 :         if inputMeta.BlobReferenceDepth > 0 || len(inputMeta.BlobReferences) > 0 {
    2836            0 :                 return nil, compact.Stats{}, []compact.OutputBlob{}, base.AssertionFailedf(
    2837            0 :                         "copy compaction for %s with blob references (depth=%d, refs=%d)",
    2838            0 :                         inputMeta.TableNum, inputMeta.BlobReferenceDepth, len(inputMeta.BlobReferences),
    2839            0 :                 )
    2840            0 :         }
    2841            1 :         ve = &manifest.VersionEdit{
    2842            1 :                 DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{
    2843            1 :                         {Level: c.startLevel.level, FileNum: inputMeta.TableNum}: inputMeta,
    2844            1 :                 },
    2845            1 :         }
    2846            1 : 
    2847            1 :         objMeta, err := d.objProvider.Lookup(base.FileTypeTable, inputMeta.TableBacking.DiskFileNum)
    2848            1 :         if err != nil {
    2849            0 :                 return nil, compact.Stats{}, []compact.OutputBlob{}, err
    2850            0 :         }
    2851              :         // This code does not support copying a shared table (which should never be necessary).
    2852            1 :         if objMeta.IsShared() {
    2853            0 :                 return nil, compact.Stats{}, []compact.OutputBlob{}, base.AssertionFailedf("copy compaction of shared table")
    2854            0 :         }
    2855              : 
    2856              :         // We are in the relatively more complex case where we need to copy this
    2857              :         // file to remote storage. Drop the db mutex while we do the copy
    2858              :         //
    2859              :         // To ease up cleanup of the local file and tracking of refs, we create
    2860              :         // a new FileNum. This has the potential of making the block cache less
    2861              :         // effective, however.
    2862            1 :         newMeta := &manifest.TableMetadata{
    2863            1 :                 Size:                     inputMeta.Size,
    2864            1 :                 CreationTime:             inputMeta.CreationTime,
    2865            1 :                 SmallestSeqNum:           inputMeta.SmallestSeqNum,
    2866            1 :                 LargestSeqNum:            inputMeta.LargestSeqNum,
    2867            1 :                 LargestSeqNumAbsolute:    inputMeta.LargestSeqNumAbsolute,
    2868            1 :                 Virtual:                  inputMeta.Virtual,
    2869            1 :                 SyntheticPrefixAndSuffix: inputMeta.SyntheticPrefixAndSuffix,
    2870            1 :         }
    2871            1 :         if inputStats, ok := inputMeta.Stats(); ok {
    2872            1 :                 newMeta.PopulateStats(inputStats)
    2873            1 :         }
    2874            1 :         if inputMeta.HasPointKeys {
    2875            1 :                 newMeta.ExtendPointKeyBounds(c.comparer.Compare,
    2876            1 :                         inputMeta.PointKeyBounds.Smallest(),
    2877            1 :                         inputMeta.PointKeyBounds.Largest())
    2878            1 :         }
    2879            1 :         if inputMeta.HasRangeKeys {
    2880            1 :                 newMeta.ExtendRangeKeyBounds(c.comparer.Compare,
    2881            1 :                         inputMeta.RangeKeyBounds.Smallest(),
    2882            1 :                         inputMeta.RangeKeyBounds.Largest())
    2883            1 :         }
    2884            1 :         newMeta.TableNum = d.mu.versions.getNextTableNum()
    2885            1 :         if objMeta.IsExternal() {
    2886            1 :                 // external -> local/shared copy. File must be virtual.
    2887            1 :                 // We will update this size later after we produce the new backing file.
    2888            1 :                 newMeta.InitVirtualBacking(base.DiskFileNum(newMeta.TableNum), inputMeta.TableBacking.Size)
    2889            1 :         } else {
    2890            1 :                 // local -> shared copy. New file is guaranteed to not be virtual.
    2891            1 :                 newMeta.InitPhysicalBacking()
    2892            1 :         }
    2893              : 
    2894              :         // NB: The order here is reversed, lock after unlock. This is similar to
    2895              :         // runCompaction.
    2896            1 :         d.mu.Unlock()
    2897            1 :         defer d.mu.Lock()
    2898            1 : 
    2899            1 :         deleteOnExit := false
    2900            1 :         defer func() {
    2901            1 :                 if deleteOnExit {
    2902            1 :                         _ = d.objProvider.Remove(base.FileTypeTable, newMeta.TableBacking.DiskFileNum)
    2903            1 :                 }
    2904              :         }()
    2905              : 
    2906              :         // If the src obj is external, we're doing an external to local/shared copy.
    2907            1 :         if objMeta.IsExternal() {
    2908            1 :                 ctx := context.TODO()
    2909            1 :                 src, err := d.objProvider.OpenForReading(
    2910            1 :                         ctx, base.FileTypeTable, inputMeta.TableBacking.DiskFileNum, objstorage.OpenOptions{},
    2911            1 :                 )
    2912            1 :                 if err != nil {
    2913            0 :                         return nil, compact.Stats{}, []compact.OutputBlob{}, err
    2914            0 :                 }
    2915            1 :                 defer func() {
    2916            1 :                         if src != nil {
    2917            0 :                                 _ = src.Close()
    2918            0 :                         }
    2919              :                 }()
    2920              : 
    2921            1 :                 w, _, err := d.objProvider.Create(
    2922            1 :                         ctx, base.FileTypeTable, newMeta.TableBacking.DiskFileNum,
    2923            1 :                         objstorage.CreateOptions{
    2924            1 :                                 PreferSharedStorage: d.shouldCreateShared(c.outputLevel.level),
    2925            1 :                         },
    2926            1 :                 )
    2927            1 :                 if err != nil {
    2928            0 :                         return nil, compact.Stats{}, []compact.OutputBlob{}, err
    2929            0 :                 }
    2930            1 :                 deleteOnExit = true
    2931            1 : 
    2932            1 :                 start, end := newMeta.Smallest(), newMeta.Largest()
    2933            1 :                 if newMeta.SyntheticPrefixAndSuffix.HasPrefix() {
    2934            1 :                         syntheticPrefix := newMeta.SyntheticPrefixAndSuffix.Prefix()
    2935            1 :                         start.UserKey = syntheticPrefix.Invert(start.UserKey)
    2936            1 :                         end.UserKey = syntheticPrefix.Invert(end.UserKey)
    2937            1 :                 }
    2938            1 :                 if newMeta.SyntheticPrefixAndSuffix.HasSuffix() {
    2939            0 :                         // Extend the bounds as necessary so that the keys don't include suffixes.
    2940            0 :                         start.UserKey = start.UserKey[:c.comparer.Split(start.UserKey)]
    2941            0 :                         if n := c.comparer.Split(end.UserKey); n < len(end.UserKey) {
    2942            0 :                                 end = base.MakeRangeDeleteSentinelKey(c.comparer.ImmediateSuccessor(nil, end.UserKey[:n]))
    2943            0 :                         }
    2944              :                 }
    2945              : 
    2946              :                 // NB: external files are always virtual.
    2947            1 :                 var wrote uint64
    2948            1 :                 err = d.fileCache.withReader(ctx, block.NoReadEnv, inputMeta.VirtualMeta(), func(r *sstable.Reader, env sstable.ReadEnv) error {
    2949            1 :                         var err error
    2950            1 :                         // TODO(radu): plumb a ReadEnv to CopySpan (it could use the buffer pool
    2951            1 :                         // or update category stats).
    2952            1 :                         wrote, err = sstable.CopySpan(ctx,
    2953            1 :                                 src, r, d.opts.MakeReaderOptions(),
    2954            1 :                                 w, d.opts.MakeWriterOptions(c.outputLevel.level, d.TableFormat()),
    2955            1 :                                 start, end,
    2956            1 :                         )
    2957            1 :                         return err
    2958            1 :                 })
    2959              : 
    2960            1 :                 src = nil // We passed src to CopySpan; it's responsible for closing it.
    2961            1 :                 if err != nil {
    2962            1 :                         if errors.Is(err, sstable.ErrEmptySpan) {
    2963            1 :                                 // The virtual table was empty. Just remove the backing file.
    2964            1 :                                 // Note that deleteOnExit is true so we will delete the created object.
    2965            1 :                                 outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
    2966            1 :                                 outputMetrics.TableBytesIn = inputMeta.Size
    2967            1 : 
    2968            1 :                                 return ve, compact.Stats{}, []compact.OutputBlob{}, nil
    2969            1 :                         }
    2970            0 :                         return nil, compact.Stats{}, []compact.OutputBlob{}, err
    2971              :                 }
    2972            1 :                 newMeta.TableBacking.Size = wrote
    2973            1 :                 newMeta.Size = wrote
    2974            1 :         } else {
    2975            1 :                 _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
    2976            1 :                         d.objProvider.Path(objMeta), base.FileTypeTable, newMeta.TableBacking.DiskFileNum,
    2977            1 :                         objstorage.CreateOptions{PreferSharedStorage: true})
    2978            1 :                 if err != nil {
    2979            0 :                         return nil, compact.Stats{}, []compact.OutputBlob{}, err
    2980            0 :                 }
    2981            1 :                 deleteOnExit = true
    2982              :         }
    2983            1 :         ve.NewTables = []manifest.NewTableEntry{{
    2984            1 :                 Level: c.outputLevel.level,
    2985            1 :                 Meta:  newMeta,
    2986            1 :         }}
    2987            1 :         if newMeta.Virtual {
    2988            1 :                 ve.CreatedBackingTables = []*manifest.TableBacking{newMeta.TableBacking}
    2989            1 :         }
    2990            1 :         outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
    2991            1 :         outputMetrics.TableBytesIn = inputMeta.Size
    2992            1 :         outputMetrics.TableBytesCompacted = newMeta.Size
    2993            1 :         outputMetrics.TablesCompacted = 1
    2994            1 : 
    2995            1 :         if err := d.objProvider.Sync(); err != nil {
    2996            0 :                 return nil, compact.Stats{}, []compact.OutputBlob{}, err
    2997            0 :         }
    2998            1 :         deleteOnExit = false
    2999            1 :         return ve, compact.Stats{}, []compact.OutputBlob{}, nil
    3000              : }
    3001              : 
    3002              : // applyHintOnFile applies a deleteCompactionHint to a file, and updates the
    3003              : // versionEdit accordingly. It returns a list of new files that were created
    3004              : // if the hint was applied partially to a file (eg. through an exciseTable as opposed
    3005              : // to an outright deletion). levelMetrics is kept up-to-date with the number
    3006              : // of tables deleted or excised.
    3007              : func (d *DB) applyHintOnFile(
    3008              :         h deleteCompactionHint,
    3009              :         f *manifest.TableMetadata,
    3010              :         level int,
    3011              :         levelMetrics *LevelMetrics,
    3012              :         ve *manifest.VersionEdit,
    3013              :         hintOverlap deletionHintOverlap,
    3014            1 : ) (newFiles []manifest.NewTableEntry, err error) {
    3015            1 :         if hintOverlap == hintDoesNotApply {
    3016            0 :                 return nil, nil
    3017            0 :         }
    3018              : 
    3019              :         // The hint overlaps with at least part of the file.
    3020            1 :         if hintOverlap == hintDeletesFile {
    3021            1 :                 // The hint deletes the entirety of this file.
    3022            1 :                 ve.DeletedTables[manifest.DeletedTableEntry{
    3023            1 :                         Level:   level,
    3024            1 :                         FileNum: f.TableNum,
    3025            1 :                 }] = f
    3026            1 :                 levelMetrics.TablesDeleted++
    3027            1 :                 return nil, nil
    3028            1 :         }
    3029              :         // The hint overlaps with only a part of the file, not the entirety of it. We need
    3030              :         // to use d.exciseTable. (hintOverlap == hintExcisesFile)
    3031            1 :         if d.FormatMajorVersion() < FormatVirtualSSTables {
    3032            0 :                 panic("pebble: delete-only compaction hint excising a file is not supported in this version")
    3033              :         }
    3034              : 
    3035            1 :         levelMetrics.TablesExcised++
    3036            1 :         exciseBounds := base.UserKeyBoundsEndExclusive(h.start, h.end)
    3037            1 :         leftTable, rightTable, err := d.exciseTable(context.TODO(), exciseBounds, f, level, tightExciseBounds)
    3038            1 :         if err != nil {
    3039            0 :                 return nil, errors.Wrap(err, "error when running excise for delete-only compaction")
    3040            0 :         }
    3041            1 :         newFiles = applyExciseToVersionEdit(ve, f, leftTable, rightTable, level)
    3042            1 :         return newFiles, nil
    3043              : }
    3044              : 
    3045              : func (d *DB) runDeleteOnlyCompactionForLevel(
    3046              :         cl compactionLevel,
    3047              :         levelMetrics *LevelMetrics,
    3048              :         ve *manifest.VersionEdit,
    3049              :         snapshots compact.Snapshots,
    3050              :         fragments []deleteCompactionHintFragment,
    3051              :         exciseEnabled bool,
    3052            1 : ) error {
    3053            1 :         if cl.level == 0 {
    3054            0 :                 panic("cannot run delete-only compaction for L0")
    3055              :         }
    3056            1 :         curFragment := 0
    3057            1 : 
    3058            1 :         // Outer loop loops on files. Middle loop loops on fragments. Inner loop
    3059            1 :         // loops on raw fragments of hints. Number of fragments are bounded by
    3060            1 :         // the number of hints this compaction was created with, which is capped
    3061            1 :         // in the compaction picker to avoid very CPU-hot loops here.
    3062            1 :         for f := range cl.files.All() {
    3063            1 :                 // curFile usually matches f, except if f got excised in which case
    3064            1 :                 // it maps to a virtual file that replaces f, or nil if f got removed
    3065            1 :                 // in its entirety.
    3066            1 :                 curFile := f
    3067            1 :                 for curFragment < len(fragments) && d.cmp(fragments[curFragment].start, f.Smallest().UserKey) <= 0 {
    3068            1 :                         curFragment++
    3069            1 :                 }
    3070            1 :                 if curFragment > 0 {
    3071            1 :                         curFragment--
    3072            1 :                 }
    3073              : 
    3074            1 :                 for ; curFragment < len(fragments); curFragment++ {
    3075            1 :                         if f.UserKeyBounds().End.CompareUpperBounds(d.cmp, base.UserKeyInclusive(fragments[curFragment].start)) < 0 {
    3076            1 :                                 break
    3077              :                         }
    3078              :                         // Process all overlapping hints with this file. Note that applying
    3079              :                         // a hint twice is idempotent; curFile should have already been excised
    3080              :                         // the first time, resulting in no change the second time.
    3081            1 :                         for _, h := range fragments[curFragment].hints {
    3082            1 :                                 if h.tombstoneLevel >= cl.level {
    3083            1 :                                         // We cannot excise out the deletion tombstone itself, or anything
    3084            1 :                                         // above it.
    3085            1 :                                         continue
    3086              :                                 }
    3087            1 :                                 hintOverlap := h.canDeleteOrExcise(d.cmp, curFile, snapshots, exciseEnabled)
    3088            1 :                                 if hintOverlap == hintDoesNotApply {
    3089            0 :                                         continue
    3090              :                                 }
    3091            1 :                                 newFiles, err := d.applyHintOnFile(h, curFile, cl.level, levelMetrics, ve, hintOverlap)
    3092            1 :                                 if err != nil {
    3093            0 :                                         return err
    3094            0 :                                 }
    3095            1 :                                 if _, ok := ve.DeletedTables[manifest.DeletedTableEntry{Level: cl.level, FileNum: curFile.TableNum}]; ok {
    3096            1 :                                         curFile = nil
    3097            1 :                                 }
    3098            1 :                                 if len(newFiles) > 0 {
    3099            1 :                                         curFile = newFiles[len(newFiles)-1].Meta
    3100            1 :                                 } else if curFile == nil {
    3101            1 :                                         // Nothing remains of the file.
    3102            1 :                                         break
    3103              :                                 }
    3104              :                         }
    3105            1 :                         if curFile == nil {
    3106            1 :                                 // Nothing remains of the file.
    3107            1 :                                 break
    3108              :                         }
    3109              :                 }
    3110            1 :                 if _, ok := ve.DeletedTables[manifest.DeletedTableEntry{
    3111            1 :                         Level:   cl.level,
    3112            1 :                         FileNum: f.TableNum,
    3113            1 :                 }]; !ok {
    3114            0 :                         panic("pebble: delete-only compaction scheduled with hints that did not delete or excise a file")
    3115              :                 }
    3116              :         }
    3117            1 :         return nil
    3118              : }
    3119              : 
    3120              : // deleteCompactionHintFragment represents a fragment of the key space and
    3121              : // contains a set of deleteCompactionHints that apply to that fragment; a
    3122              : // fragment starts at the start field and ends where the next fragment starts.
    3123              : type deleteCompactionHintFragment struct {
    3124              :         start []byte
    3125              :         hints []deleteCompactionHint
    3126              : }
    3127              : 
    3128              : // Delete compaction hints can overlap with each other, and multiple fragments
    3129              : // can apply to a single file. This function takes a list of hints and fragments
    3130              : // them, to make it easier to apply them to non-overlapping files occupying a level;
    3131              : // that way, files and hint fragments can be iterated on in lockstep, while efficiently
    3132              : // being able to apply all hints overlapping with a given file.
    3133              : func fragmentDeleteCompactionHints(
    3134              :         cmp Compare, hints []deleteCompactionHint,
    3135            1 : ) []deleteCompactionHintFragment {
    3136            1 :         fragments := make([]deleteCompactionHintFragment, 0, len(hints)*2)
    3137            1 :         for i := range hints {
    3138            1 :                 fragments = append(fragments, deleteCompactionHintFragment{start: hints[i].start},
    3139            1 :                         deleteCompactionHintFragment{start: hints[i].end})
    3140            1 :         }
    3141            1 :         slices.SortFunc(fragments, func(i, j deleteCompactionHintFragment) int {
    3142            1 :                 return cmp(i.start, j.start)
    3143            1 :         })
    3144            1 :         fragments = slices.CompactFunc(fragments, func(i, j deleteCompactionHintFragment) bool {
    3145            1 :                 return bytes.Equal(i.start, j.start)
    3146            1 :         })
    3147            1 :         for _, h := range hints {
    3148            1 :                 startIdx := sort.Search(len(fragments), func(i int) bool {
    3149            1 :                         return cmp(fragments[i].start, h.start) >= 0
    3150            1 :                 })
    3151            1 :                 endIdx := sort.Search(len(fragments), func(i int) bool {
    3152            1 :                         return cmp(fragments[i].start, h.end) >= 0
    3153            1 :                 })
    3154            1 :                 for i := startIdx; i < endIdx; i++ {
    3155            1 :                         fragments[i].hints = append(fragments[i].hints, h)
    3156            1 :                 }
    3157              :         }
    3158            1 :         return fragments
    3159              : }
    3160              : 
    3161              : // Runs a delete-only compaction.
    3162              : //
    3163              : // d.mu must *not* be held when calling this.
    3164              : func (d *DB) runDeleteOnlyCompaction(
    3165              :         jobID JobID, c *tableCompaction, snapshots compact.Snapshots,
    3166            1 : ) (ve *manifest.VersionEdit, stats compact.Stats, blobs []compact.OutputBlob, retErr error) {
    3167            1 :         fragments := fragmentDeleteCompactionHints(d.cmp, c.deleteOnly.hints)
    3168            1 :         ve = &manifest.VersionEdit{
    3169            1 :                 DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{},
    3170            1 :         }
    3171            1 :         for _, cl := range c.inputs {
    3172            1 :                 levelMetrics := c.metrics.perLevel.level(cl.level)
    3173            1 :                 err := d.runDeleteOnlyCompactionForLevel(cl, levelMetrics, ve, snapshots, fragments, c.deleteOnly.exciseEnabled)
    3174            1 :                 if err != nil {
    3175            0 :                         return nil, stats, blobs, err
    3176            0 :                 }
    3177              :         }
    3178              :         // Remove any files that were added and deleted in the same versionEdit.
    3179            1 :         ve.NewTables = slices.DeleteFunc(ve.NewTables, func(e manifest.NewTableEntry) bool {
    3180            1 :                 entry := manifest.DeletedTableEntry{Level: e.Level, FileNum: e.Meta.TableNum}
    3181            1 :                 if _, deleted := ve.DeletedTables[entry]; deleted {
    3182            1 :                         delete(ve.DeletedTables, entry)
    3183            1 :                         return true
    3184            1 :                 }
    3185            1 :                 return false
    3186              :         })
    3187            1 :         sort.Slice(ve.NewTables, func(i, j int) bool {
    3188            1 :                 return ve.NewTables[i].Meta.TableNum < ve.NewTables[j].Meta.TableNum
    3189            1 :         })
    3190            1 :         deletedTableEntries := slices.Collect(maps.Keys(ve.DeletedTables))
    3191            1 :         slices.SortFunc(deletedTableEntries, func(a, b manifest.DeletedTableEntry) int {
    3192            1 :                 return stdcmp.Compare(a.FileNum, b.FileNum)
    3193            1 :         })
    3194              :         // Remove any entries from CreatedBackingTables that are not used in any
    3195              :         // NewFiles.
    3196            1 :         usedBackingFiles := make(map[base.DiskFileNum]struct{})
    3197            1 :         for _, e := range ve.NewTables {
    3198            1 :                 if e.Meta.Virtual {
    3199            1 :                         usedBackingFiles[e.Meta.TableBacking.DiskFileNum] = struct{}{}
    3200            1 :                 }
    3201              :         }
    3202            1 :         ve.CreatedBackingTables = slices.DeleteFunc(ve.CreatedBackingTables, func(b *manifest.TableBacking) bool {
    3203            1 :                 _, used := usedBackingFiles[b.DiskFileNum]
    3204            1 :                 return !used
    3205            1 :         })
    3206              : 
    3207              :         // Iterate through the deleted tables and new tables to annotate excised tables.
    3208              :         // If a new table is virtual and the base.DiskFileNum is the same as a deleted table, then
    3209              :         // our deleted table was excised.
    3210            1 :         for _, table := range deletedTableEntries {
    3211            1 :                 for _, newEntry := range ve.NewTables {
    3212            1 :                         if newEntry.Meta.Virtual &&
    3213            1 :                                 newEntry.Meta.TableBacking.DiskFileNum == ve.DeletedTables[table].TableBacking.DiskFileNum {
    3214            1 :                                 c.annotations = append(c.annotations,
    3215            1 :                                         fmt.Sprintf("(excised: %s)", ve.DeletedTables[table].TableNum))
    3216            1 :                                 break
    3217              :                         }
    3218              :                 }
    3219              : 
    3220              :         }
    3221              : 
    3222              :         // Refresh the disk available statistic whenever a compaction/flush
    3223              :         // completes, before re-acquiring the mutex.
    3224            1 :         d.calculateDiskAvailableBytes()
    3225            1 :         return ve, stats, blobs, nil
    3226              : }
    3227              : 
    3228              : func (d *DB) runMoveCompaction(
    3229              :         jobID JobID, c *tableCompaction,
    3230            1 : ) (ve *manifest.VersionEdit, stats compact.Stats, blobs []compact.OutputBlob, _ error) {
    3231            1 :         iter := c.startLevel.files.Iter()
    3232            1 :         meta := iter.First()
    3233            1 :         if iter.Next() != nil {
    3234            0 :                 return nil, stats, blobs, base.AssertionFailedf("got more than one file for a move compaction")
    3235            0 :         }
    3236            1 :         if c.cancel.Load() {
    3237            0 :                 return ve, stats, blobs, ErrCancelledCompaction
    3238            0 :         }
    3239            1 :         outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
    3240            1 :         outputMetrics.TableBytesMoved = meta.Size
    3241            1 :         outputMetrics.TablesMoved = 1
    3242            1 :         ve = &manifest.VersionEdit{
    3243            1 :                 DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{
    3244            1 :                         {Level: c.startLevel.level, FileNum: meta.TableNum}: meta,
    3245            1 :                 },
    3246            1 :                 NewTables: []manifest.NewTableEntry{
    3247            1 :                         {Level: c.outputLevel.level, Meta: meta},
    3248            1 :                 },
    3249            1 :         }
    3250            1 : 
    3251            1 :         return ve, stats, blobs, nil
    3252              : }
    3253              : 
    3254              : // runCompaction runs a compaction that produces new on-disk tables from
    3255              : // memtables or old on-disk tables.
    3256              : //
    3257              : // runCompaction cannot be used for compactionKindIngestedFlushable.
    3258              : //
    3259              : // d.mu must be held when calling this, but the mutex may be dropped and
    3260              : // re-acquired during the course of this method.
    3261              : func (d *DB) runCompaction(
    3262              :         jobID JobID, c *tableCompaction,
    3263              : ) (
    3264              :         ve *manifest.VersionEdit,
    3265              :         stats compact.Stats,
    3266              :         outputBlobs []compact.OutputBlob,
    3267              :         retErr error,
    3268            1 : ) {
    3269            1 :         if c.cancel.Load() {
    3270            0 :                 return ve, stats, outputBlobs, ErrCancelledCompaction
    3271            0 :         }
    3272            1 :         switch c.kind {
    3273            1 :         case compactionKindDeleteOnly:
    3274            1 :                 // Release the d.mu lock while doing I/O.
    3275            1 :                 // Note the unusual order: Unlock and then Lock.
    3276            1 :                 snapshots := d.mu.snapshots.toSlice()
    3277            1 :                 d.mu.Unlock()
    3278            1 :                 defer d.mu.Lock()
    3279            1 :                 return d.runDeleteOnlyCompaction(jobID, c, snapshots)
    3280            1 :         case compactionKindMove:
    3281            1 :                 return d.runMoveCompaction(jobID, c)
    3282            1 :         case compactionKindCopy:
    3283            1 :                 return d.runCopyCompaction(jobID, c)
    3284            0 :         case compactionKindIngestedFlushable:
    3285            0 :                 panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
    3286              :         }
    3287            1 :         return d.runDefaultTableCompaction(jobID, c)
    3288              : }
    3289              : 
    3290              : func (d *DB) runDefaultTableCompaction(
    3291              :         jobID JobID, c *tableCompaction,
    3292              : ) (
    3293              :         ve *manifest.VersionEdit,
    3294              :         stats compact.Stats,
    3295              :         outputBlobs []compact.OutputBlob,
    3296              :         retErr error,
    3297            1 : ) {
    3298            1 :         snapshots := d.mu.snapshots.toSlice()
    3299            1 : 
    3300            1 :         // Release the d.mu lock while doing I/O.
    3301            1 :         // Note the unusual order: Unlock and then Lock.
    3302            1 :         d.mu.Unlock()
    3303            1 :         defer d.mu.Lock()
    3304            1 : 
    3305            1 :         // Determine whether we should separate values into blob files.
    3306            1 :         valueSeparation := c.getValueSeparation(jobID, c, c.tableFormat)
    3307            1 : 
    3308            1 :         result := d.compactAndWrite(jobID, c, snapshots, c.tableFormat, valueSeparation)
    3309            1 :         if result.Err == nil {
    3310            1 :                 ve, result.Err = c.makeVersionEdit(result)
    3311            1 :         }
    3312            1 :         if result.Err != nil {
    3313            1 :                 // Delete any created tables or blob files.
    3314            1 :                 obsoleteFiles := manifest.ObsoleteFiles{
    3315            1 :                         TableBackings: make([]*manifest.TableBacking, 0, len(result.Tables)),
    3316            1 :                         BlobFiles:     make([]*manifest.PhysicalBlobFile, 0, len(result.Blobs)),
    3317            1 :                 }
    3318            1 :                 d.mu.Lock()
    3319            1 :                 for i := range result.Tables {
    3320            1 :                         backing := &manifest.TableBacking{
    3321            1 :                                 DiskFileNum: result.Tables[i].ObjMeta.DiskFileNum,
    3322            1 :                                 Size:        result.Tables[i].WriterMeta.Size,
    3323            1 :                         }
    3324            1 :                         obsoleteFiles.AddBacking(backing)
    3325            1 :                         // Add this file to zombie tables as well, as the versionSet
    3326            1 :                         // asserts on whether every obsolete file was at one point
    3327            1 :                         // marked zombie.
    3328            1 :                         d.mu.versions.zombieTables.AddMetadata(&result.Tables[i].ObjMeta, backing.Size)
    3329            1 :                 }
    3330            1 :                 for i := range result.Blobs {
    3331            0 :                         obsoleteFiles.AddBlob(result.Blobs[i].Metadata)
    3332            0 :                         // Add this file to zombie blobs as well, as the versionSet
    3333            0 :                         // asserts on whether every obsolete file was at one point
    3334            0 :                         // marked zombie.
    3335            0 :                         d.mu.versions.zombieBlobs.AddMetadata(&result.Blobs[i].ObjMeta, result.Blobs[i].Metadata.Size)
    3336            0 :                 }
    3337            1 :                 d.mu.versions.addObsoleteLocked(obsoleteFiles)
    3338            1 :                 d.mu.Unlock()
    3339              :         }
    3340              :         // Refresh the disk available statistic whenever a compaction/flush
    3341              :         // completes, before re-acquiring the mutex.
    3342            1 :         d.calculateDiskAvailableBytes()
    3343            1 :         return ve, result.Stats, result.Blobs, result.Err
    3344              : }
    3345              : 
    3346              : // compactAndWrite runs the data part of a compaction, where we set up a
    3347              : // compaction iterator and use it to write output tables.
    3348              : func (d *DB) compactAndWrite(
    3349              :         jobID JobID,
    3350              :         c *tableCompaction,
    3351              :         snapshots compact.Snapshots,
    3352              :         tableFormat sstable.TableFormat,
    3353              :         valueSeparation compact.ValueSeparation,
    3354            1 : ) (result compact.Result) {
    3355            1 :         suggestedCacheReaders := blob.SuggestedCachedReaders(len(c.inputs))
    3356            1 :         // Compactions use a pool of buffers to read blocks, avoiding polluting the
    3357            1 :         // block cache with blocks that will not be read again. We initialize the
    3358            1 :         // buffer pool with a size 12. This initial size does not need to be
    3359            1 :         // accurate, because the pool will grow to accommodate the maximum number of
    3360            1 :         // blocks allocated at a given time over the course of the compaction. But
    3361            1 :         // choosing a size larger than that working set avoids any additional
    3362            1 :         // allocations to grow the size of the pool over the course of iteration.
    3363            1 :         //
    3364            1 :         // Justification for initial size 18: In a compaction with up to 3 levels,
    3365            1 :         // at any given moment we'll have 3 index blocks in-use and 3 data blocks in-use.
    3366            1 :         // Additionally, when decoding a compressed block, we'll temporarily
    3367            1 :         // allocate 1 additional block to hold the compressed buffer. In the worst
    3368            1 :         // case that all input sstables have two-level index blocks (+3), value
    3369            1 :         // blocks (+3), range deletion blocks (+n) and range key blocks (+n), we'll
    3370            1 :         // additionally require 2n+6 blocks where n is the number of input sstables.
    3371            1 :         // Range deletion and range key blocks are relatively rare, and the cost of
    3372            1 :         // an additional allocation or two over the course of the compaction is
    3373            1 :         // considered to be okay. A larger initial size would cause the pool to hold
    3374            1 :         // on to more memory, even when it's not in-use because the pool will
    3375            1 :         // recycle buffers up to the current capacity of the pool. The memory use of
    3376            1 :         // a 18-buffer pool is expected to be within reason, even if all the buffers
    3377            1 :         // grow to the typical size of an index block (256 KiB) which would
    3378            1 :         // translate to 4.5 MiB per compaction.
    3379            1 :         c.iterationState.bufferPool.Init(18 + suggestedCacheReaders*2)
    3380            1 :         defer c.iterationState.bufferPool.Release()
    3381            1 :         blockReadEnv := block.ReadEnv{
    3382            1 :                 BufferPool: &c.iterationState.bufferPool,
    3383            1 :                 Stats:      &c.metrics.internalIterStats,
    3384            1 :                 IterStats: d.fileCache.SSTStatsCollector().Accumulator(
    3385            1 :                         uint64(uintptr(unsafe.Pointer(c))),
    3386            1 :                         categoryCompaction,
    3387            1 :                 ),
    3388            1 :         }
    3389            1 :         if c.version != nil {
    3390            1 :                 c.iterationState.valueFetcher.Init(&c.version.BlobFiles, d.fileCache, blockReadEnv, suggestedCacheReaders)
    3391            1 :         }
    3392            1 :         iiopts := internalIterOpts{
    3393            1 :                 compaction:       true,
    3394            1 :                 readEnv:          sstable.ReadEnv{Block: blockReadEnv},
    3395            1 :                 blobValueFetcher: &c.iterationState.valueFetcher,
    3396            1 :         }
    3397            1 :         defer func() { _ = c.iterationState.valueFetcher.Close() }()
    3398              : 
    3399            1 :         pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, iiopts)
    3400            1 :         defer func() {
    3401            1 :                 for _, closer := range c.iterationState.keyspanIterClosers {
    3402            1 :                         closer.FragmentIterator.Close()
    3403            1 :                 }
    3404              :         }()
    3405            1 :         if err != nil {
    3406            1 :                 return compact.Result{Err: err}
    3407            1 :         }
    3408            1 :         cfg := compact.IterConfig{
    3409            1 :                 Comparer:              c.comparer,
    3410            1 :                 Merge:                 d.merge,
    3411            1 :                 TombstoneElision:      c.delElision,
    3412            1 :                 RangeKeyElision:       c.rangeKeyElision,
    3413            1 :                 Snapshots:             snapshots,
    3414            1 :                 IsBottommostDataLayer: c.isBottommostDataLayer(),
    3415            1 :                 IneffectualSingleDeleteCallback: func(userKey []byte) {
    3416            1 :                         d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
    3417            1 :                                 Kind:    IneffectualSingleDelete,
    3418            1 :                                 UserKey: slices.Clone(userKey),
    3419            1 :                         })
    3420            1 :                 },
    3421            0 :                 NondeterministicSingleDeleteCallback: func(userKey []byte) {
    3422            0 :                         d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
    3423            0 :                                 Kind:    NondeterministicSingleDelete,
    3424            0 :                                 UserKey: slices.Clone(userKey),
    3425            0 :                         })
    3426            0 :                 },
    3427            1 :                 MissizedDeleteCallback: func(userKey []byte, elidedSize, expectedSize uint64) {
    3428            1 :                         d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
    3429            1 :                                 Kind:    MissizedDelete,
    3430            1 :                                 UserKey: slices.Clone(userKey),
    3431            1 :                                 ExtraInfo: redact.Sprintf("elidedSize=%d,expectedSize=%d",
    3432            1 :                                         redact.SafeUint(elidedSize), redact.SafeUint(expectedSize)),
    3433            1 :                         })
    3434            1 :                 },
    3435              :         }
    3436            1 :         iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter)
    3437            1 : 
    3438            1 :         runnerCfg := compact.RunnerConfig{
    3439            1 :                 CompactionBounds:           c.bounds,
    3440            1 :                 L0SplitKeys:                c.flush.l0Limits,
    3441            1 :                 Grandparents:               c.grandparents,
    3442            1 :                 MaxGrandparentOverlapBytes: c.maxOverlapBytes,
    3443            1 :                 TargetOutputFileSize:       c.maxOutputFileSize,
    3444            1 :                 GrantHandle:                c.grantHandle,
    3445            1 :         }
    3446            1 :         runner := compact.NewRunner(runnerCfg, iter)
    3447            1 : 
    3448            1 :         var spanPolicyValid bool
    3449            1 :         var spanPolicy SpanPolicy
    3450            1 :         // If spanPolicyValid is true and spanPolicyEndKey is empty, then spanPolicy
    3451            1 :         // applies for the rest of the keyspace.
    3452            1 :         var spanPolicyEndKey []byte
    3453            1 : 
    3454            1 :         for runner.MoreDataToWrite() {
    3455            1 :                 if c.cancel.Load() {
    3456            1 :                         return runner.Finish().WithError(ErrCancelledCompaction)
    3457            1 :                 }
    3458              :                 // Create a new table.
    3459            1 :                 firstKey := runner.FirstKey()
    3460            1 :                 if !spanPolicyValid || (len(spanPolicyEndKey) > 0 && d.cmp(firstKey, spanPolicyEndKey) >= 0) {
    3461            1 :                         var err error
    3462            1 :                         spanPolicy, spanPolicyEndKey, err = d.opts.Experimental.SpanPolicyFunc(firstKey)
    3463            1 :                         if err != nil {
    3464            0 :                                 return runner.Finish().WithError(err)
    3465            0 :                         }
    3466            1 :                         spanPolicyValid = true
    3467              :                 }
    3468              : 
    3469            1 :                 writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
    3470            1 :                 if spanPolicy.DisableValueSeparationBySuffix {
    3471            1 :                         writerOpts.DisableValueBlocks = true
    3472            1 :                 }
    3473            1 :                 if spanPolicy.PreferFastCompression && writerOpts.Compression != block.NoCompression {
    3474            0 :                         writerOpts.Compression = block.FastestCompression
    3475            0 :                 }
    3476            1 :                 vSep := valueSeparation
    3477            1 :                 switch spanPolicy.ValueStoragePolicy {
    3478            1 :                 case ValueStorageLowReadLatency:
    3479            1 :                         vSep = compact.NeverSeparateValues{}
    3480            1 :                 case ValueStorageLatencyTolerant:
    3481            1 :                         // This span of keyspace is more tolerant of latency, so set a more
    3482            1 :                         // aggressive value separation policy for this output.
    3483            1 :                         vSep.SetNextOutputConfig(compact.ValueSeparationOutputConfig{
    3484            1 :                                 MinimumSize: latencyTolerantMinimumSize,
    3485            1 :                         })
    3486              :                 }
    3487            1 :                 objMeta, tw, err := d.newCompactionOutputTable(jobID, c, writerOpts)
    3488            1 :                 if err != nil {
    3489            1 :                         return runner.Finish().WithError(err)
    3490            1 :                 }
    3491            1 :                 runner.WriteTable(objMeta, tw, spanPolicyEndKey, vSep)
    3492              :         }
    3493            1 :         result = runner.Finish()
    3494            1 :         if result.Err == nil {
    3495            1 :                 result.Err = d.objProvider.Sync()
    3496            1 :         }
    3497            1 :         return result
    3498              : }
    3499              : 
    3500              : // makeVersionEdit creates the version edit for a compaction, based on the
    3501              : // tables in compact.Result.
    3502            1 : func (c *tableCompaction) makeVersionEdit(result compact.Result) (*manifest.VersionEdit, error) {
    3503            1 :         ve := &manifest.VersionEdit{
    3504            1 :                 DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{},
    3505            1 :         }
    3506            1 :         for _, cl := range c.inputs {
    3507            1 :                 for f := range cl.files.All() {
    3508            1 :                         ve.DeletedTables[manifest.DeletedTableEntry{
    3509            1 :                                 Level:   cl.level,
    3510            1 :                                 FileNum: f.TableNum,
    3511            1 :                         }] = f
    3512            1 :                 }
    3513              :         }
    3514              :         // Add any newly constructed blob files to the version edit.
    3515            1 :         ve.NewBlobFiles = make([]manifest.BlobFileMetadata, len(result.Blobs))
    3516            1 :         for i := range result.Blobs {
    3517            1 :                 ve.NewBlobFiles[i] = manifest.BlobFileMetadata{
    3518            1 :                         FileID:   base.BlobFileID(result.Blobs[i].Metadata.FileNum),
    3519            1 :                         Physical: result.Blobs[i].Metadata,
    3520            1 :                 }
    3521            1 :         }
    3522              : 
    3523            1 :         startLevelBytes := c.startLevel.files.TableSizeSum()
    3524            1 : 
    3525            1 :         outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
    3526            1 :         outputMetrics.TableBytesIn = startLevelBytes
    3527            1 :         for i := range c.metrics.internalIterStats.BlockReads {
    3528            1 :                 switch blockkind.Kind(i) {
    3529            1 :                 case blockkind.BlobValue:
    3530            1 :                         outputMetrics.BlobBytesRead += c.metrics.internalIterStats.BlockReads[i].BlockBytes
    3531            1 :                 default:
    3532            1 :                         outputMetrics.TableBytesRead += c.metrics.internalIterStats.BlockReads[i].BlockBytes
    3533              :                 }
    3534              :         }
    3535            1 :         outputMetrics.BlobBytesCompacted = result.Stats.CumulativeBlobFileSize
    3536            1 :         if c.flush.flushables != nil {
    3537            1 :                 outputMetrics.BlobBytesFlushed = result.Stats.CumulativeBlobFileSize
    3538            1 :         }
    3539            1 :         if len(c.extraLevels) > 0 {
    3540            1 :                 outputMetrics.TableBytesIn += c.extraLevels[0].files.TableSizeSum()
    3541            1 :         }
    3542              : 
    3543            1 :         if len(c.flush.flushables) == 0 {
    3544            1 :                 c.metrics.perLevel.level(c.startLevel.level)
    3545            1 :         }
    3546            1 :         if len(c.extraLevels) > 0 {
    3547            1 :                 c.metrics.perLevel.level(c.extraLevels[0].level)
    3548            1 :                 outputMetrics.MultiLevel.TableBytesInTop = startLevelBytes
    3549            1 :                 outputMetrics.MultiLevel.TableBytesIn = outputMetrics.TableBytesIn
    3550            1 :                 outputMetrics.MultiLevel.TableBytesRead = outputMetrics.TableBytesRead
    3551            1 :         }
    3552              : 
    3553            1 :         inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute()
    3554            1 :         ve.NewTables = make([]manifest.NewTableEntry, len(result.Tables))
    3555            1 :         for i := range result.Tables {
    3556            1 :                 t := &result.Tables[i]
    3557            1 : 
    3558            1 :                 if t.WriterMeta.Properties.NumValuesInBlobFiles > 0 {
    3559            1 :                         if len(t.BlobReferences) == 0 {
    3560            0 :                                 return nil, base.AssertionFailedf("num values in blob files %d but no blob references",
    3561            0 :                                         t.WriterMeta.Properties.NumValuesInBlobFiles)
    3562            0 :                         }
    3563              :                 }
    3564              : 
    3565            1 :                 fileMeta := &manifest.TableMetadata{
    3566            1 :                         TableNum:           base.PhysicalTableFileNum(t.ObjMeta.DiskFileNum),
    3567            1 :                         CreationTime:       t.CreationTime.Unix(),
    3568            1 :                         Size:               t.WriterMeta.Size,
    3569            1 :                         SmallestSeqNum:     t.WriterMeta.SmallestSeqNum,
    3570            1 :                         LargestSeqNum:      t.WriterMeta.LargestSeqNum,
    3571            1 :                         BlobReferences:     t.BlobReferences,
    3572            1 :                         BlobReferenceDepth: t.BlobReferenceDepth,
    3573            1 :                 }
    3574            1 :                 if c.flush.flushables == nil {
    3575            1 :                         // Set the file's LargestSeqNumAbsolute to be the maximum value of any
    3576            1 :                         // of the compaction's input sstables.
    3577            1 :                         // TODO(jackson): This could be narrowed to be the maximum of input
    3578            1 :                         // sstables that overlap the output sstable's key range.
    3579            1 :                         fileMeta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute
    3580            1 :                 } else {
    3581            1 :                         fileMeta.LargestSeqNumAbsolute = t.WriterMeta.LargestSeqNum
    3582            1 :                 }
    3583            1 :                 fileMeta.InitPhysicalBacking()
    3584            1 : 
    3585            1 :                 // If the file didn't contain any range deletions, we can fill its
    3586            1 :                 // table stats now, avoiding unnecessarily loading the table later.
    3587            1 :                 maybeSetStatsFromProperties(fileMeta.PhysicalMeta(), &t.WriterMeta.Properties)
    3588            1 : 
    3589            1 :                 if t.WriterMeta.HasPointKeys {
    3590            1 :                         fileMeta.ExtendPointKeyBounds(c.comparer.Compare,
    3591            1 :                                 t.WriterMeta.SmallestPoint,
    3592            1 :                                 t.WriterMeta.LargestPoint)
    3593            1 :                 }
    3594            1 :                 if t.WriterMeta.HasRangeDelKeys {
    3595            1 :                         fileMeta.ExtendPointKeyBounds(c.comparer.Compare,
    3596            1 :                                 t.WriterMeta.SmallestRangeDel,
    3597            1 :                                 t.WriterMeta.LargestRangeDel)
    3598            1 :                 }
    3599            1 :                 if t.WriterMeta.HasRangeKeys {
    3600            1 :                         fileMeta.ExtendRangeKeyBounds(c.comparer.Compare,
    3601            1 :                                 t.WriterMeta.SmallestRangeKey,
    3602            1 :                                 t.WriterMeta.LargestRangeKey)
    3603            1 :                 }
    3604              : 
    3605            1 :                 ve.NewTables[i] = manifest.NewTableEntry{
    3606            1 :                         Level: c.outputLevel.level,
    3607            1 :                         Meta:  fileMeta,
    3608            1 :                 }
    3609            1 : 
    3610            1 :                 // Update metrics.
    3611            1 :                 if c.flush.flushables == nil {
    3612            1 :                         outputMetrics.TablesCompacted++
    3613            1 :                         outputMetrics.TableBytesCompacted += fileMeta.Size
    3614            1 :                 } else {
    3615            1 :                         outputMetrics.TablesFlushed++
    3616            1 :                         outputMetrics.TableBytesFlushed += fileMeta.Size
    3617            1 :                 }
    3618            1 :                 outputMetrics.EstimatedReferencesSize += fileMeta.EstimatedReferenceSize()
    3619            1 :                 outputMetrics.TablesSize += int64(fileMeta.Size)
    3620            1 :                 outputMetrics.TablesCount++
    3621            1 :                 outputMetrics.Additional.BytesWrittenDataBlocks += t.WriterMeta.Properties.DataSize
    3622            1 :                 outputMetrics.Additional.BytesWrittenValueBlocks += t.WriterMeta.Properties.ValueBlocksSize
    3623              :         }
    3624              : 
    3625              :         // Sanity check that the tables are ordered and don't overlap.
    3626            1 :         for i := 1; i < len(ve.NewTables); i++ {
    3627            1 :                 if ve.NewTables[i-1].Meta.Largest().IsUpperBoundFor(c.comparer.Compare, ve.NewTables[i].Meta.Smallest().UserKey) {
    3628            0 :                         return nil, base.AssertionFailedf("pebble: compaction output tables overlap: %s and %s",
    3629            0 :                                 ve.NewTables[i-1].Meta.DebugString(c.comparer.FormatKey, true),
    3630            0 :                                 ve.NewTables[i].Meta.DebugString(c.comparer.FormatKey, true),
    3631            0 :                         )
    3632            0 :                 }
    3633              :         }
    3634              : 
    3635            1 :         return ve, nil
    3636              : }
    3637              : 
    3638              : // newCompactionOutputTable creates an object for a new table produced by a
    3639              : // compaction or flush.
    3640              : func (d *DB) newCompactionOutputTable(
    3641              :         jobID JobID, c *tableCompaction, writerOpts sstable.WriterOptions,
    3642            1 : ) (objstorage.ObjectMetadata, sstable.RawWriter, error) {
    3643            1 :         writable, objMeta, err := d.newCompactionOutputObj(
    3644            1 :                 base.FileTypeTable, c.kind, c.outputLevel.level, &c.metrics.bytesWritten, c.objCreateOpts)
    3645            1 :         if err != nil {
    3646            1 :                 return objstorage.ObjectMetadata{}, nil, err
    3647            1 :         }
    3648            1 :         d.opts.EventListener.TableCreated(TableCreateInfo{
    3649            1 :                 JobID:   int(jobID),
    3650            1 :                 Reason:  c.kind.compactingOrFlushing(),
    3651            1 :                 Path:    d.objProvider.Path(objMeta),
    3652            1 :                 FileNum: objMeta.DiskFileNum,
    3653            1 :         })
    3654            1 :         writerOpts.SetInternal(sstableinternal.WriterOptions{
    3655            1 :                 CacheOpts: sstableinternal.CacheOptions{
    3656            1 :                         CacheHandle: d.cacheHandle,
    3657            1 :                         FileNum:     objMeta.DiskFileNum,
    3658            1 :                 },
    3659            1 :         })
    3660            1 :         tw := sstable.NewRawWriterWithCPUMeasurer(writable, writerOpts, c.grantHandle)
    3661            1 :         return objMeta, tw, nil
    3662              : }
    3663              : 
    3664              : // newCompactionOutputBlob creates an object for a new blob produced by a
    3665              : // compaction or flush.
    3666              : func (d *DB) newCompactionOutputBlob(
    3667              :         jobID JobID,
    3668              :         kind compactionKind,
    3669              :         outputLevel int,
    3670              :         bytesWritten *atomic.Int64,
    3671              :         opts objstorage.CreateOptions,
    3672            1 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
    3673            1 :         writable, objMeta, err := d.newCompactionOutputObj(base.FileTypeBlob, kind, outputLevel, bytesWritten, opts)
    3674            1 :         if err != nil {
    3675            0 :                 return nil, objstorage.ObjectMetadata{}, err
    3676            0 :         }
    3677            1 :         d.opts.EventListener.BlobFileCreated(BlobFileCreateInfo{
    3678            1 :                 JobID:   int(jobID),
    3679            1 :                 Reason:  kind.compactingOrFlushing(),
    3680            1 :                 Path:    d.objProvider.Path(objMeta),
    3681            1 :                 FileNum: objMeta.DiskFileNum,
    3682            1 :         })
    3683            1 :         return writable, objMeta, nil
    3684              : }
    3685              : 
    3686              : // newCompactionOutputObj creates an object produced by a compaction or flush.
    3687              : func (d *DB) newCompactionOutputObj(
    3688              :         typ base.FileType,
    3689              :         kind compactionKind,
    3690              :         outputLevel int,
    3691              :         bytesWritten *atomic.Int64,
    3692              :         opts objstorage.CreateOptions,
    3693            1 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
    3694            1 :         diskFileNum := d.mu.versions.getNextDiskFileNum()
    3695            1 :         ctx := context.TODO()
    3696            1 : 
    3697            1 :         if objiotracing.Enabled {
    3698            0 :                 ctx = objiotracing.WithLevel(ctx, outputLevel)
    3699            0 :                 if kind == compactionKindFlush {
    3700            0 :                         ctx = objiotracing.WithReason(ctx, objiotracing.ForFlush)
    3701            0 :                 } else {
    3702            0 :                         ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
    3703            0 :                 }
    3704              :         }
    3705              : 
    3706            1 :         writable, objMeta, err := d.objProvider.Create(ctx, typ, diskFileNum, opts)
    3707            1 :         if err != nil {
    3708            1 :                 return nil, objstorage.ObjectMetadata{}, err
    3709            1 :         }
    3710              : 
    3711            1 :         if kind != compactionKindFlush {
    3712            1 :                 writable = &compactionWritable{
    3713            1 :                         Writable: writable,
    3714            1 :                         versions: d.mu.versions,
    3715            1 :                         written:  bytesWritten,
    3716            1 :                 }
    3717            1 :         }
    3718            1 :         return writable, objMeta, nil
    3719              : }
    3720              : 
    3721              : // validateVersionEdit validates that start and end keys across new and deleted
    3722              : // files in a versionEdit pass the given validation function.
    3723              : func validateVersionEdit(
    3724              :         ve *manifest.VersionEdit, vk base.ValidateKey, format base.FormatKey, logger Logger,
    3725            1 : ) {
    3726            1 :         validateKey := func(f *manifest.TableMetadata, key []byte) {
    3727            1 :                 if err := vk.Validate(key); err != nil {
    3728            1 :                         logger.Fatalf("pebble: version edit validation failed (key=%s file=%s): %v", format(key), f, err)
    3729            1 :                 }
    3730              :         }
    3731              : 
    3732              :         // Validate both new and deleted files.
    3733            1 :         for _, f := range ve.NewTables {
    3734            1 :                 validateKey(f.Meta, f.Meta.Smallest().UserKey)
    3735            1 :                 validateKey(f.Meta, f.Meta.Largest().UserKey)
    3736            1 :         }
    3737            1 :         for _, m := range ve.DeletedTables {
    3738            1 :                 validateKey(m, m.Smallest().UserKey)
    3739            1 :                 validateKey(m, m.Largest().UserKey)
    3740            1 :         }
    3741              : }
    3742              : 
    3743            1 : func getDiskWriteCategoryForCompaction(opts *Options, kind compactionKind) vfs.DiskWriteCategory {
    3744            1 :         if opts.EnableSQLRowSpillMetrics {
    3745            0 :                 // In the scenario that the Pebble engine is used for SQL row spills the
    3746            0 :                 // data written to the memtable will correspond to spills to disk and
    3747            0 :                 // should be categorized as such.
    3748            0 :                 return "sql-row-spill"
    3749            1 :         } else if kind == compactionKindFlush {
    3750            1 :                 return "pebble-memtable-flush"
    3751            1 :         } else if kind == compactionKindBlobFileRewrite {
    3752            1 :                 return "pebble-blob-file-rewrite"
    3753            1 :         } else {
    3754            1 :                 return "pebble-compaction"
    3755            1 :         }
    3756              : }
        

Generated by: LCOV version 2.0-1