LCOV - code coverage report
Current view: top level - pebble/internal/compact - run.go (source / functions) Hit Total Coverage
Test: 2024-12-21 08:16Z 78d53457 - tests + meta.lcov Lines: 166 199 83.4 %
Date: 2024-12-21 08:17:55 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package compact
       6             : 
       7             : import (
       8             :         "sort"
       9             :         "time"
      10             : 
      11             :         "github.com/cockroachdb/errors"
      12             :         "github.com/cockroachdb/pebble/internal/base"
      13             :         "github.com/cockroachdb/pebble/internal/keyspan"
      14             :         "github.com/cockroachdb/pebble/internal/manifest"
      15             :         "github.com/cockroachdb/pebble/objstorage"
      16             :         "github.com/cockroachdb/pebble/sstable"
      17             : )
      18             : 
      19             : // Result stores the outcome of the "data" part of a compaction, i.e. the part
      20             : // where we use the compaction iterator to write output tables.
      21             : type Result struct {
      22             :         // Err is the result of the compaction. On success, Err is nil and Tables
      23             :         // stores the output tables. On failure, Err is set and Tables stores the
      24             :         // tables created so far (and which need to be cleaned up).
      25             :         Err    error
      26             :         Tables []OutputTable // One entry per WriteTable call, in call order.
      27             :         Stats  Stats // Counters accumulated while the tables were written.
      28             : }
      29             : 
      30             : // WithError returns a copy of r whose Err combines r.Err with err; Tables and Stats are kept.
      31           2 : func (r Result) WithError(err error) Result {
      32           2 :         return Result{
      33           2 :                 Err:    errors.CombineErrors(r.Err, err),
      34           2 :                 Tables: r.Tables,
      35           2 :                 Stats:  r.Stats,
      36           2 :         }
      37           2 : }
      38             : 
      39             : // OutputTable contains metadata about a table that was created during a compaction.
      40             : type OutputTable struct {
      41             :         CreationTime time.Time // Wall-clock time at which WriteTable registered the table.
      42             :         // ObjMeta is metadata for the object backing the table, as passed to WriteTable.
      43             :         ObjMeta objstorage.ObjectMetadata
      44             :         // WriterMeta is populated once the table is fully written and validated. On
      45             :         // compaction failure (see Result), WriterMeta might not be set.
      46             :         WriterMeta sstable.WriterMetadata
      47             : }
      48             : 
      49             : // Stats describes stats collected during the compaction.
      50             : type Stats struct {
      51             :         CumulativePinnedKeys  uint64 // Point keys written only because a snapshot pinned them.
      52             :         CumulativePinnedSize  uint64 // Bytes of those keys: user key + trailer + value.
      53             :         CumulativeWrittenSize uint64 // Sum of WriterMeta.Size over the finished tables.
      54             :         CountMissizedDels     uint64 // Copied from the compaction iterator's stats in Finish.
      55             : }
      56             : 
      57             : // RunnerConfig contains the parameters needed for the Runner.
      58             : type RunnerConfig struct {
      59             :         // CompactionBounds are the bounds containing all the input tables. Every output
      60             :         // table is validated against these bounds once it has been written.
      61             :         CompactionBounds base.UserKeyBounds
      62             : 
      63             :         // L0SplitKeys is only set for flushes and it contains the flush split keys
      64             :         // (see L0Sublevels.FlushSplitKeys). These are split points enforced for the
      65             :         // output tables; they are binary-searched and so must be sorted ascending.
      66             :         L0SplitKeys [][]byte
      67             : 
      68             :         // Grandparents are the tables in level+2 that overlap with the files being
      69             :         // compacted. Used to determine output table boundaries. Do not assume that
      70             :         // the actual files in the grandparent when this compaction finishes will be
      71             :         // the same.
      72             :         Grandparents manifest.LevelSlice
      73             : 
      74             :         // MaxGrandparentOverlapBytes is the maximum number of bytes of overlap
      75             :         // allowed for a single output table with the tables in the grandparent level.
      76             :         MaxGrandparentOverlapBytes uint64
      77             : 
      78             :         // TargetOutputFileSize is the desired size of an individual table created
      79             :         // during compaction. In practice, the sizes can vary between 50%-200% of this
      80             :         // value.
      81             :         TargetOutputFileSize uint64
      82             : 
      83             :         // Slot is the compaction slot taken up by this compaction. Used to perform
      84             :         // pacing or account for concurrency limits; it is released by Finish.
      85             :         Slot base.CompactionSlot
      86             : 
      87             :         // IteratorStats is the stats collected by the compaction iterator. Must be non-nil.
      88             :         IteratorStats *base.InternalIteratorStats
      89             : }
      90             : 
      91             : // Runner is a helper for running the "data" part of a compaction (where we use
      92             : // the compaction iterator to write output tables).
      93             : //
      94             : // Sample usage:
      95             : //
      96             : //      r := NewRunner(cfg, iter)
      97             : //      for r.MoreDataToWrite() {
      98             : //        objMeta, tw := ... // Create object and table writer.
      99             : //        r.WriteTable(objMeta, tw)
     100             : //      }
     101             : //      result := r.Finish() // Also closes iter; check result.Err.
     102             : type Runner struct {
     103             :         cmp  base.Compare // Copied from iter.cmp by NewRunner.
     104             :         cfg  RunnerConfig
     105             :         iter *Iter
     106             : 
     107             :         tables []OutputTable // One entry per WriteTable call.
     108             :         // Stores any error encountered; once set, MoreDataToWrite returns false.
     109             :         err error
     110             :         // Last key/value returned by the compaction iterator (key is nil if none is pending).
     111             :         key   *base.InternalKey
     112             :         value base.LazyValue
     113             :         // Last RANGEDEL span (or portion of it) that was not yet written to a table.
     114             :         lastRangeDelSpan keyspan.Span
     115             :         // Last range key span (or portion of it) that was not yet written to a table.
     116             :         lastRangeKeySpan keyspan.Span
     117             :         stats            Stats // Returned as Result.Stats by Finish.
     118             : }
     119             : 
     120             : // NewRunner creates a new Runner over iter and positions iter at its first key.
     121           2 : func NewRunner(cfg RunnerConfig, iter *Iter) *Runner {
     122           2 :         r := &Runner{
     123           2 :                 cmp:  iter.cmp,
     124           2 :                 cfg:  cfg,
     125           2 :                 iter: iter,
     126           2 :         }
     127           2 :         r.key, r.value = r.iter.First()
     128           2 :         return r
     129           2 : }
     130             : 
     131             : // MoreDataToWrite returns true if a point key or a non-empty span is still pending and no error was recorded.
     132           2 : func (r *Runner) MoreDataToWrite() bool {
     133           2 :         if r.err != nil {
     134           1 :                 return false
     135           1 :         }
     136           2 :         return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty()
     137             : }
     138             : 
     139             : // WriteTable writes a new output table. This table will be part of
     140             : // Result.Tables. Should only be called if MoreDataToWrite() returned true;
     141             : // it panics if an error was already recorded. WriteTable always closes the
     142             : // Writer; any failure is recorded and surfaces as Result.Err from Finish.
     143           2 : func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw sstable.RawWriter) {
     144           2 :         if r.err != nil {
     145           0 :                 panic("error already encountered")
     146             :         }
     147           2 :         r.tables = append(r.tables, OutputTable{ // Registered before writing so a failed table is still reported.
     148           2 :                 CreationTime: time.Now(),
     149           2 :                 ObjMeta:      objMeta,
     150           2 :         })
     151           2 :         splitKey, err := r.writeKeysToTable(tw)
     152           2 :         err = errors.CombineErrors(err, tw.Close())
     153           2 :         if err != nil {
     154           1 :                 r.err = err
     155           1 :                 r.key, r.value = nil, base.LazyValue{}
     156           1 :                 return
     157           1 :         }
     158           2 :         writerMeta, err := tw.Metadata()
     159           2 :         if err != nil {
     160           0 :                 r.err = err
     161           0 :                 return
     162           0 :         }
     163           2 :         if err := r.validateWriterMeta(writerMeta, splitKey); err != nil {
     164           0 :                 r.err = err
     165           0 :                 return
     166           0 :         }
     167           2 :         r.tables[len(r.tables)-1].WriterMeta = *writerMeta
     168           2 :         r.stats.CumulativeWrittenSize += writerMeta.Size
     169             : }
     170             : 
     171           2 : func (r *Runner) writeKeysToTable(tw sstable.RawWriter) (splitKey []byte, _ error) { // Writes pending keys/spans to tw up to the next split point and returns the splitter's split key.
     172           2 :         const updateSlotEveryNKeys = 1024 // Slot metrics are refreshed once per this many iterated keys.
     173           2 :         firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
     174           2 :         if r.key != nil && firstKey == nil {
     175           2 :                 firstKey = r.key.UserKey
     176           2 :         }
     177           2 :         if firstKey == nil {
     178           0 :                 return nil, base.AssertionFailedf("no data to write")
     179           0 :         }
     180           2 :         splitter := NewOutputSplitter(
     181           2 :                 r.cmp, firstKey, r.TableSplitLimit(firstKey),
     182           2 :                 r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(),
     183           2 :         )
     184           2 :         equalPrev := func(k []byte) bool {
     185           2 :                 return tw.ComparePrev(k) == 0
     186           2 :         }
     187           2 :         var pinnedKeySize, pinnedValueSize, pinnedCount uint64
     188           2 :         var iteratedKeys uint64
     189           2 :         key, value := r.key, r.value
     190           2 :         for ; key != nil; key, value = r.iter.Next() {
     191           2 :                 iteratedKeys++
     192           2 :                 if iteratedKeys%updateSlotEveryNKeys == 0 {
     193           1 :                         r.cfg.Slot.UpdateMetrics(r.cfg.IteratorStats.BlockBytes, r.stats.CumulativeWrittenSize+tw.EstimatedSize())
     194           1 :                 }
     195           2 :                 if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), equalPrev) {
     196           2 :                         break
     197             :                 }
     198             : 
     199           2 :                 switch key.Kind() {
     200           2 :                 case base.InternalKeyKindRangeDelete:
     201           2 :                         // The previous span (if any) must end at or before this key, since the
     202           2 :                         // spans we receive are non-overlapping, so it is written out in full.
     203           2 :                         if err := tw.EncodeSpan(r.lastRangeDelSpan); err != nil {
     204           0 :                                 return nil, err
     205           0 :                         }
     206           2 :                         r.lastRangeDelSpan.CopyFrom(r.iter.Span())
     207           2 :                         continue
     208             : 
     209           2 :                 case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
     210           2 :                         // The previous span (if any) must end at or before this key, since the
     211           2 :                         // spans we receive are non-overlapping, so it is written out in full.
     212           2 :                         if err := tw.EncodeSpan(r.lastRangeKeySpan); err != nil {
     213           0 :                                 return nil, err
     214           0 :                         }
     215           2 :                         r.lastRangeKeySpan.CopyFrom(r.iter.Span())
     216           2 :                         continue
     217             :                 }
     218           2 :                 v, _, err := value.Value(nil)
     219           2 :                 if err != nil {
     220           0 :                         return nil, err
     221           0 :                 }
     222           2 :                 if err := tw.AddWithForceObsolete(*key, v, r.iter.ForceObsoleteDueToRangeDel()); err != nil {
     223           0 :                         return nil, err
     224           0 :                 }
     225           2 :                 if r.iter.SnapshotPinned() {
     226           2 :                         // The kv pair we just added to the sstable was only surfaced by
     227           2 :                         // the compaction iterator because an open snapshot prevented
     228           2 :                         // its elision. Increment the stats.
     229           2 :                         pinnedCount++
     230           2 :                         pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
     231           2 :                         pinnedValueSize += uint64(len(v))
     232           2 :                 }
     233             :         }
     234           2 :         r.key, r.value = key, value // Remember where the next table resumes.
     235           2 :         splitKey = splitter.SplitKey()
     236           2 :         if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeDelSpan, splitKey, tw); err != nil {
     237           0 :                 return nil, err
     238           0 :         }
     239           2 :         if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeKeySpan, splitKey, tw); err != nil {
     240           0 :                 return nil, err
     241           0 :         }
     242             :         // Set internal sstable properties and fold this table's pinned stats into the totals.
     243           2 :         tw.SetSnapshotPinnedProperties(pinnedCount, pinnedKeySize, pinnedValueSize)
     244           2 :         r.stats.CumulativePinnedKeys += pinnedCount
     245           2 :         r.stats.CumulativePinnedSize += pinnedKeySize + pinnedValueSize
     246           2 :         r.cfg.Slot.UpdateMetrics(r.cfg.IteratorStats.BlockBytes, r.stats.CumulativeWrittenSize+tw.EstimatedSize())
     247           2 :         return splitKey, nil
     248             : }
     249             : 
     250             : // Finish closes the compaction iterator, releases the compaction slot and
     251             : // returns the result of the compaction (including any recorded error).
     252           2 : func (r *Runner) Finish() Result {
     253           2 :         r.err = errors.CombineErrors(r.err, r.iter.Close())
     254           2 :         // The compaction iterator keeps track of a count of the number of DELSIZED
     255           2 :         // keys that encoded an incorrect size.
     256           2 :         r.stats.CountMissizedDels = r.iter.Stats().CountMissizedDels
     257           2 :         r.cfg.Slot.Release(r.stats.CumulativeWrittenSize)
     258           2 :         return Result{
     259           2 :                 Err:    r.err,
     260           2 :                 Tables: r.tables,
     261           2 :                 Stats:  r.stats,
     262           2 :         }
     263           2 : }
     264             : 
     265             : // TableSplitLimit returns a hard split limit for an output table that starts at
     266             : // startKey, or nil if there is no limit. A non-nil limit is expected to be
     267             : // strictly greater than startKey.
     268           2 : func (r *Runner) TableSplitLimit(startKey []byte) []byte {
     269           2 :         var limitKey []byte
     270           2 : 
     271           2 :         // Enforce the MaxGrandparentOverlapBytes limit: find the user key to which
     272           2 :         // that table can extend without excessively overlapping the grandparent
     273           2 :         // level. If no limit is needed considering the grandparent, limitKey stays
     274           2 :         // nil.
     275           2 :         //
     276           2 :         // This is done in order to prevent a table at level N from overlapping too
     277           2 :         // much data at level N+1. We want to avoid such large overlaps because they
     278           2 :         // translate into large compactions. The current heuristic stops output of a
     279           2 :         // table if the addition of another key would cause the table to overlap more
     280           2 :         // than 10x the target file size at level N. See
     281           2 :         // compaction.maxGrandparentOverlapBytes.
     282           2 :         iter := r.cfg.Grandparents.Iter()
     283           2 :         var overlappedBytes uint64
     284           2 :         f := iter.SeekGE(r.cmp, startKey)
     285           2 :         // Handle an overlapping table: it counts toward the overlap but cannot be the limit.
     286           2 :         if f != nil && r.cmp(f.Smallest.UserKey, startKey) <= 0 {
     287           2 :                 overlappedBytes += f.Size
     288           2 :                 f = iter.Next()
     289           2 :         }
     290           2 :         for ; f != nil; f = iter.Next() {
     291           2 :                 overlappedBytes += f.Size
     292           2 :                 if overlappedBytes > r.cfg.MaxGrandparentOverlapBytes {
     293           2 :                         limitKey = f.Smallest.UserKey
     294           2 :                         break
     295             :                 }
     296             :         }
     297             : 
     298           2 :         if len(r.cfg.L0SplitKeys) != 0 {
     299           2 :                 // Find the first split key that is greater than startKey (binary search; keys must be sorted).
     300           2 :                 index := sort.Search(len(r.cfg.L0SplitKeys), func(i int) bool {
     301           2 :                         return r.cmp(r.cfg.L0SplitKeys[i], startKey) > 0
     302           2 :                 })
     303           2 :                 if index < len(r.cfg.L0SplitKeys) {
     304           2 :                         limitKey = base.MinUserKey(r.cmp, limitKey, r.cfg.L0SplitKeys[index])
     305           2 :                 }
     306             :         }
     307             : 
     308           2 :         return limitKey
     309             : }
     310             : 
     311             : // validateWriterMeta runs some sanity checks on the WriterMetadata of an output
     312             : // table that was just finished. splitKey is the key where the table must have
     313             : // ended (or nil). All bound violations found are combined into the returned error.
     314           2 : func (r *Runner) validateWriterMeta(meta *sstable.WriterMetadata, splitKey []byte) error {
     315           2 :         if !meta.HasPointKeys && !meta.HasRangeDelKeys && !meta.HasRangeKeys {
     316           0 :                 return base.AssertionFailedf("output table has no keys")
     317           0 :         }
     318             : 
     319           2 :         var err error
     320           2 :         checkBounds := func(smallest, largest base.InternalKey, description string) {
     321           2 :                 bounds := base.UserKeyBoundsFromInternal(smallest, largest)
     322           2 :                 if !r.cfg.CompactionBounds.ContainsBounds(r.cmp, &bounds) {
     323           0 :                         err = errors.CombineErrors(err, base.AssertionFailedf(
     324           0 :                                 "output table %s bounds %s extend beyond compaction bounds %s",
     325           0 :                                 description, bounds, r.cfg.CompactionBounds,
     326           0 :                         ))
     327           0 :                 }
     328           2 :                 if splitKey != nil && bounds.End.IsUpperBoundFor(r.cmp, splitKey) {
     329           0 :                         err = errors.CombineErrors(err, base.AssertionFailedf(
     330           0 :                                 "output table %s bounds %s extend beyond split key %s",
     331           0 :                                 description, bounds, splitKey,
     332           0 :                         ))
     333           0 :                 }
     334             :         }
     335             : 
     336           2 :         if meta.HasPointKeys {
     337           2 :                 checkBounds(meta.SmallestPoint, meta.LargestPoint, "point key")
     338           2 :         }
     339           2 :         if meta.HasRangeDelKeys {
     340           2 :                 checkBounds(meta.SmallestRangeDel, meta.LargestRangeDel, "range del")
     341           2 :         }
     342           2 :         if meta.HasRangeKeys {
     343           2 :                 checkBounds(meta.SmallestRangeKey, meta.LargestRangeKey, "range key")
     344           2 :         }
     345           2 :         return err
     346             : }
     347             : 
     348           2 : func spanStartOrNil(s *keyspan.Span) []byte { // Returns s.Start, or nil when s is empty.
     349           2 :         if s.Empty() {
     350           2 :                 return nil
     351           2 :         }
     352           2 :         return s.Start
     353             : }

Generated by: LCOV version 1.14