LCOV - code coverage report
Current view: top level - pebble/internal/compact - run.go (source / functions) Hit Total Coverage
Test: 2024-06-07 08:15Z 47da75f0 - tests only.lcov Lines: 159 190 83.7 %
Date: 2024-06-07 08:16:16 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package compact
       6             : 
       7             : import (
       8             :         "sort"
       9             :         "time"
      10             : 
      11             :         "github.com/cockroachdb/errors"
      12             :         "github.com/cockroachdb/pebble/internal/base"
      13             :         "github.com/cockroachdb/pebble/internal/keyspan"
      14             :         "github.com/cockroachdb/pebble/internal/manifest"
      15             :         "github.com/cockroachdb/pebble/internal/private"
      16             :         "github.com/cockroachdb/pebble/objstorage"
      17             :         "github.com/cockroachdb/pebble/sstable"
      18             : )
      19             : 
// Result stores the result of a compaction - more specifically, the "data" part
// where we use the compaction iterator to write output tables.
//
// A Result is produced by Runner.Finish.
type Result struct {
	// Err is the result of the compaction. On success, Err is nil and Tables
	// stores the output tables. On failure, Err is set and Tables stores the
	// tables created so far (and which need to be cleaned up).
	Err    error
	Tables []OutputTable // One entry per Runner.WriteTable call, in call order.
	Stats  Stats         // Stats accumulated by the Runner across all tables.
}
      30             : 
      31             : // WithError returns a modified Result which has the Err field set.
      32           1 : func (r Result) WithError(err error) Result {
      33           1 :         return Result{
      34           1 :                 Err:    errors.CombineErrors(r.Err, err),
      35           1 :                 Tables: r.Tables,
      36           1 :                 Stats:  r.Stats,
      37           1 :         }
      38           1 : }
      39             : 
// OutputTable contains metadata about a table that was created during a compaction.
type OutputTable struct {
	// CreationTime is recorded by Runner.WriteTable (via time.Now) before any
	// keys are written to the table.
	CreationTime time.Time
	// ObjMeta is metadata for the object backing the table.
	ObjMeta objstorage.ObjectMetadata
	// WriterMeta is populated once the table is fully written. On compaction
	// failure (see Result), WriterMeta might not be set.
	WriterMeta sstable.WriterMetadata
}
      49             : 
// Stats describes stats collected during the compaction.
//
//   - CumulativePinnedKeys is the number of point keys, summed over all output
//     tables, that were written while the compaction iterator reported them
//     as snapshot-pinned.
//   - CumulativePinnedSize is the summed size of those keys (user key length
//     plus the internal trailer) and their values.
//   - CountMissizedDels is copied from the compaction iterator's stats when
//     the Runner finishes; it counts DELSIZED keys that encoded an incorrect
//     size.
type Stats struct {
	CumulativePinnedKeys uint64
	CumulativePinnedSize uint64
	CountMissizedDels    uint64
}
      56             : 
// RunnerConfig contains the parameters needed for the Runner.
type RunnerConfig struct {
	// CompactionBounds are the bounds containing all the input tables. All output
	// tables must fall within these bounds as well; the Runner reports an
	// assertion error for any finished table whose bounds are not contained in
	// them.
	CompactionBounds base.UserKeyBounds

	// L0SplitKeys is only set for flushes and it contains the flush split keys
	// (see L0Sublevels.FlushSplitKeys). These are split points enforced for the
	// output tables.
	//
	// TableSplitLimit binary-searches this slice, so the keys must be in
	// increasing order according to the compaction's comparer.
	L0SplitKeys [][]byte

	// Grandparents are the tables in level+2 that overlap with the files being
	// compacted. Used to determine output table boundaries. Do not assume that
	// the actual files in the grandparent when this compaction finishes will be
	// the same.
	Grandparents manifest.LevelSlice

	// MaxGrandparentOverlapBytes is the maximum number of bytes of overlap
	// allowed for a single output table with the tables in the grandparent level.
	// See TableSplitLimit for how the limit is applied.
	MaxGrandparentOverlapBytes uint64

	// TargetOutputFileSize is the desired size of an individual table created
	// during compaction. In practice, the sizes can vary between 50%-200% of this
	// value.
	TargetOutputFileSize uint64
}
      83             : 
// Runner is a helper for running the "data" part of a compaction (where we use
// the compaction iterator to write output tables).
//
// Sample usage:
//
//	r := NewRunner(cfg, iter)
//	for r.MoreDataToWrite() {
//	  objMeta, tw := ... // Create object and table writer.
//	  r.WriteTable(objMeta, tw)
//	}
//	result := r.Finish()
//
// Once an error has been recorded, MoreDataToWrite returns false and
// WriteTable panics if called again; the error is surfaced through the Result
// returned by Finish.
type Runner struct {
	// cmp is the user key comparer, taken from the compaction iterator.
	cmp  base.Compare
	cfg  RunnerConfig
	iter *Iter

	// One entry per WriteTable call, in call order.
	tables []OutputTable
	// Stores any error encountered.
	err error
	// Last key/value returned by the compaction iterator.
	key   *base.InternalKey
	value []byte
	// Last RANGEDEL span (or portion of it) that was not yet written to a table.
	lastRangeDelSpan keyspan.Span
	// Last range key span (or portion of it) that was not yet written to a table.
	lastRangeKeySpan keyspan.Span
	stats            Stats // Accumulated across tables; returned by Finish.
}
     112             : 
     113             : // NewRunner creates a new Runner.
     114           1 : func NewRunner(cfg RunnerConfig, iter *Iter) *Runner {
     115           1 :         r := &Runner{
     116           1 :                 cmp:  iter.cmp,
     117           1 :                 cfg:  cfg,
     118           1 :                 iter: iter,
     119           1 :         }
     120           1 :         r.key, r.value = r.iter.First()
     121           1 :         return r
     122           1 : }
     123             : 
     124             : // MoreDataToWrite returns true if there is more data to be written.
     125           1 : func (r *Runner) MoreDataToWrite() bool {
     126           1 :         if r.err != nil {
     127           1 :                 return false
     128           1 :         }
     129           1 :         return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty()
     130             : }
     131             : 
     132             : // WriteTable writes a new output table. This table will be part of
     133             : // Result.Tables. Should only be called if MoreDataToWrite() returned true.
     134             : //
     135             : // WriteTable always closes the Writer.
     136           1 : func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.Writer) {
     137           1 :         if r.err != nil {
     138           0 :                 panic("error already encountered")
     139             :         }
     140           1 :         r.tables = append(r.tables, OutputTable{
     141           1 :                 CreationTime: time.Now(),
     142           1 :                 ObjMeta:      objMeta,
     143           1 :         })
     144           1 :         splitKey, err := r.writeKeysToTable(tw)
     145           1 :         err = errors.CombineErrors(err, tw.Close())
     146           1 :         if err != nil {
     147           1 :                 r.err = err
     148           1 :                 r.key, r.value = nil, nil
     149           1 :                 return
     150           1 :         }
     151           1 :         writerMeta, err := tw.Metadata()
     152           1 :         if err != nil {
     153           0 :                 r.err = err
     154           0 :                 return
     155           0 :         }
     156           1 :         if err := r.validateWriterMeta(writerMeta, splitKey); err != nil {
     157           0 :                 r.err = err
     158           0 :                 return
     159           0 :         }
     160           1 :         r.tables[len(r.tables)-1].WriterMeta = *writerMeta
     161             : }
     162             : 
     163           1 : func (r *Runner) writeKeysToTable(tw *sstable.Writer) (splitKey []byte, _ error) {
     164           1 :         firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
     165           1 :         if r.key != nil && firstKey == nil {
     166           1 :                 firstKey = r.key.UserKey
     167           1 :         }
     168           1 :         if firstKey == nil {
     169           0 :                 return nil, base.AssertionFailedf("no data to write")
     170           0 :         }
     171           1 :         splitter := NewOutputSplitter(
     172           1 :                 r.cmp, firstKey, r.TableSplitLimit(firstKey),
     173           1 :                 r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(),
     174           1 :         )
     175           1 :         lastUserKeyFn := func() []byte {
     176           1 :                 return tw.UnsafeLastPointUserKey()
     177           1 :         }
     178           1 :         var pinnedKeySize, pinnedValueSize, pinnedCount uint64
     179           1 :         key, value := r.key, r.value
     180           1 :         for ; key != nil; key, value = r.iter.Next() {
     181           1 :                 if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), lastUserKeyFn) {
     182           1 :                         break
     183             :                 }
     184             : 
     185           1 :                 switch key.Kind() {
     186           1 :                 case base.InternalKeyKindRangeDelete:
     187           1 :                         // The previous span (if any) must end at or before this key, since the
     188           1 :                         // spans we receive are non-overlapping.
     189           1 :                         if err := tw.EncodeSpan(&r.lastRangeDelSpan); r.err != nil {
     190           0 :                                 return nil, err
     191           0 :                         }
     192           1 :                         r.lastRangeDelSpan.CopyFrom(r.iter.Span())
     193           1 :                         continue
     194             : 
     195           1 :                 case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
     196           1 :                         // The previous span (if any) must end at or before this key, since the
     197           1 :                         // spans we receive are non-overlapping.
     198           1 :                         if err := tw.EncodeSpan(&r.lastRangeKeySpan); err != nil {
     199           0 :                                 return nil, err
     200           0 :                         }
     201           1 :                         r.lastRangeKeySpan.CopyFrom(r.iter.Span())
     202           1 :                         continue
     203             :                 }
     204           1 :                 if err := tw.AddWithForceObsolete(*key, value, r.iter.ForceObsoleteDueToRangeDel()); err != nil {
     205           0 :                         return nil, err
     206           0 :                 }
     207           1 :                 if r.iter.SnapshotPinned() {
     208           1 :                         // The kv pair we just added to the sstable was only surfaced by
     209           1 :                         // the compaction iterator because an open snapshot prevented
     210           1 :                         // its elision. Increment the stats.
     211           1 :                         pinnedCount++
     212           1 :                         pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
     213           1 :                         pinnedValueSize += uint64(len(value))
     214           1 :                 }
     215             :         }
     216           1 :         r.key, r.value = key, value
     217           1 :         splitKey = splitter.SplitKey()
     218           1 :         if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeDelSpan, splitKey, tw); err != nil {
     219           0 :                 return nil, err
     220           0 :         }
     221           1 :         if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeKeySpan, splitKey, tw); err != nil {
     222           0 :                 return nil, err
     223           0 :         }
     224             :         // Set internal sstable properties.
     225           1 :         p := getInternalWriterProperties(tw)
     226           1 :         // Set the snapshot pinned totals.
     227           1 :         p.SnapshotPinnedKeys = pinnedCount
     228           1 :         p.SnapshotPinnedKeySize = pinnedKeySize
     229           1 :         p.SnapshotPinnedValueSize = pinnedValueSize
     230           1 :         r.stats.CumulativePinnedKeys += pinnedCount
     231           1 :         r.stats.CumulativePinnedSize += pinnedKeySize + pinnedValueSize
     232           1 :         return splitKey, nil
     233             : }
     234             : 
     235             : // Finish closes the compaction iterator and returns the result of the
     236             : // compaction.
     237           1 : func (r *Runner) Finish() Result {
     238           1 :         r.err = errors.CombineErrors(r.err, r.iter.Close())
     239           1 :         // The compaction iterator keeps track of a count of the number of DELSIZED
     240           1 :         // keys that encoded an incorrect size.
     241           1 :         r.stats.CountMissizedDels = r.iter.Stats().CountMissizedDels
     242           1 :         return Result{
     243           1 :                 Err:    r.err,
     244           1 :                 Tables: r.tables,
     245           1 :                 Stats:  r.stats,
     246           1 :         }
     247           1 : }
     248             : 
     249             : // TableSplitLimit returns a hard split limit for an output table that starts at
     250             : // startKey (which must be strictly greater than startKey), or nil if there is
     251             : // no limit.
     252           1 : func (r *Runner) TableSplitLimit(startKey []byte) []byte {
     253           1 :         var limitKey []byte
     254           1 : 
     255           1 :         // Enforce the MaxGrandparentOverlapBytes limit: find the user key to which
     256           1 :         // that table can extend without excessively overlapping the grandparent
     257           1 :         // level. If no limit is needed considering the grandparent, limitKey stays
     258           1 :         // nil.
     259           1 :         //
     260           1 :         // This is done in order to prevent a table at level N from overlapping too
     261           1 :         // much data at level N+1. We want to avoid such large overlaps because they
     262           1 :         // translate into large compactions. The current heuristic stops output of a
     263           1 :         // table if the addition of another key would cause the table to overlap more
     264           1 :         // than 10x the target file size at level N. See
     265           1 :         // compaction.maxGrandparentOverlapBytes.
     266           1 :         iter := r.cfg.Grandparents.Iter()
     267           1 :         var overlappedBytes uint64
     268           1 :         f := iter.SeekGE(r.cmp, startKey)
     269           1 :         // Handle an overlapping table.
     270           1 :         if f != nil && r.cmp(f.Smallest.UserKey, startKey) <= 0 {
     271           1 :                 overlappedBytes += f.Size
     272           1 :                 f = iter.Next()
     273           1 :         }
     274           1 :         for ; f != nil; f = iter.Next() {
     275           1 :                 overlappedBytes += f.Size
     276           1 :                 if overlappedBytes > r.cfg.MaxGrandparentOverlapBytes {
     277           1 :                         limitKey = f.Smallest.UserKey
     278           1 :                         break
     279             :                 }
     280             :         }
     281             : 
     282           1 :         if len(r.cfg.L0SplitKeys) != 0 {
     283           1 :                 // Find the first split key that is greater than startKey.
     284           1 :                 index := sort.Search(len(r.cfg.L0SplitKeys), func(i int) bool {
     285           1 :                         return r.cmp(r.cfg.L0SplitKeys[i], startKey) > 0
     286           1 :                 })
     287           1 :                 if index < len(r.cfg.L0SplitKeys) {
     288           1 :                         limitKey = base.MinUserKey(r.cmp, limitKey, r.cfg.L0SplitKeys[index])
     289           1 :                 }
     290             :         }
     291             : 
     292           1 :         return limitKey
     293             : }
     294             : 
     295             : // validateWriterMeta runs some sanity cehcks on the WriterMetadata on an output
     296             : // table that was just finished. splitKey is the key where the table must have
     297             : // ended (or nil).
     298           1 : func (r *Runner) validateWriterMeta(meta *sstable.WriterMetadata, splitKey []byte) error {
     299           1 :         if !meta.HasPointKeys && !meta.HasRangeDelKeys && !meta.HasRangeKeys {
     300           0 :                 return base.AssertionFailedf("output table has no keys")
     301           0 :         }
     302             : 
     303           1 :         var err error
     304           1 :         checkBounds := func(smallest, largest base.InternalKey, description string) {
     305           1 :                 bounds := base.UserKeyBoundsFromInternal(smallest, largest)
     306           1 :                 if !r.cfg.CompactionBounds.ContainsBounds(r.cmp, &bounds) {
     307           0 :                         err = errors.CombineErrors(err, base.AssertionFailedf(
     308           0 :                                 "output table %s bounds %s extend beyond compaction bounds %s",
     309           0 :                                 description, bounds, r.cfg.CompactionBounds,
     310           0 :                         ))
     311           0 :                 }
     312           1 :                 if splitKey != nil && bounds.End.IsUpperBoundFor(r.cmp, splitKey) {
     313           0 :                         err = errors.CombineErrors(err, base.AssertionFailedf(
     314           0 :                                 "output table %s bounds %s extend beyond split key %s",
     315           0 :                                 description, bounds, splitKey,
     316           0 :                         ))
     317           0 :                 }
     318             :         }
     319             : 
     320           1 :         if meta.HasPointKeys {
     321           1 :                 checkBounds(meta.SmallestPoint, meta.LargestPoint, "point key")
     322           1 :         }
     323           1 :         if meta.HasRangeDelKeys {
     324           1 :                 checkBounds(meta.SmallestRangeDel, meta.LargestRangeDel, "range del")
     325           1 :         }
     326           1 :         if meta.HasRangeKeys {
     327           1 :                 checkBounds(meta.SmallestRangeKey, meta.LargestRangeKey, "range key")
     328           1 :         }
     329           1 :         return err
     330             : }
     331             : 
// getInternalWriterProperties accesses a private variable (in the
// internal/private package) initialized by the sstable Writer. This indirection
// is necessary to ensure non-Pebble users constructing sstables for ingestion
// are unable to set internal-only properties.
//
// The type assertion runs when this package's variables are initialized and
// panics if private.SSTableInternalProperties does not hold a function of
// exactly this type.
var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties)
     337             : 
     338           1 : func spanStartOrNil(s *keyspan.Span) []byte {
     339           1 :         if s.Empty() {
     340           1 :                 return nil
     341           1 :         }
     342           1 :         return s.Start
     343             : }

Generated by: LCOV version 1.14