LCOV - code coverage report
Current view: top level - pebble/internal/compact - run.go (source / functions) Hit Total Coverage
Test: 2024-08-06 08:17Z 65445b8a - tests only.lcov Lines: 155 186 83.3 %
Date: 2024-08-06 08:17:32 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package compact
       6             : 
       7             : import (
       8             :         "sort"
       9             :         "time"
      10             : 
      11             :         "github.com/cockroachdb/errors"
      12             :         "github.com/cockroachdb/pebble/internal/base"
      13             :         "github.com/cockroachdb/pebble/internal/keyspan"
      14             :         "github.com/cockroachdb/pebble/internal/manifest"
      15             :         "github.com/cockroachdb/pebble/objstorage"
      16             :         "github.com/cockroachdb/pebble/sstable"
      17             : )
      18             : 
      19             : // Result stores the result of a compaction - more specifically, the "data" part
      20             : // where we use the compaction iterator to write output tables.
      21             : type Result struct {
      22             :         // Err is the result of the compaction. On success, Err is nil and Tables
      23             :         // stores the output tables. On failure, Err is set and Tables stores the
      24             :         // tables created so far (and which need to be cleaned up).
      25             :         Err    error
      26             :         Tables []OutputTable
      27             :         Stats  Stats // Counters accumulated by the Runner; see Stats.
      28             : }
      29             : 
      30             : // WithError returns a Result with the same Tables and Stats whose Err combines r.Err with err.
      31           1 : func (r Result) WithError(err error) Result {
      32           1 :         return Result{
      33           1 :                 Err:    errors.CombineErrors(r.Err, err), // keeps any error already recorded
      34           1 :                 Tables: r.Tables,
      35           1 :                 Stats:  r.Stats,
      36           1 :         }
      37           1 : }
      38             : 
      39             : // OutputTable contains metadata about a table that was created during a compaction.
      40             : type OutputTable struct {
      41             :         CreationTime time.Time // Set by Runner.WriteTable (time.Now()) before any keys are written.
      42             :         // ObjMeta is metadata for the object backing the table.
      43             :         ObjMeta objstorage.ObjectMetadata
      44             :         // WriterMeta is populated once the table is fully written. On compaction
      45             :         // failure (see Result), WriterMeta might not be set.
      46             :         WriterMeta sstable.WriterMetadata
      47             : }
      48             : 
      49             : // Stats describes stats collected during the compaction.
      50             : type Stats struct {
      51             :         CumulativePinnedKeys uint64 // Snapshot-pinned keys written, summed over all output tables.
      52             :         CumulativePinnedSize uint64 // Key bytes (user key + trailer) plus value bytes of those pinned keys.
      53             :         CountMissizedDels    uint64 // DELSIZED keys that encoded an incorrect size, as counted by the compaction iterator.
      54             : }
      55             : 
      56             : // RunnerConfig contains the parameters needed for the Runner.
      57             : type RunnerConfig struct {
      58             :         // CompactionBounds are the bounds containing all the input tables. All output
      59             :         // tables must fall within these bounds as well.
      60             :         CompactionBounds base.UserKeyBounds
      61             : 
      62             :         // L0SplitKeys is only set for flushes and it contains the flush split keys
      63             :         // (see L0Sublevels.FlushSplitKeys). These are split points enforced for the
      64             :         // output tables.
      65             :         L0SplitKeys [][]byte
      66             : 
      67             :         // Grandparents are the tables in level+2 that overlap with the files being
      68             :         // compacted. Used to determine output table boundaries. Do not assume that
      69             :         // the grandparent level will still contain these same files when this
      70             :         // compaction finishes.
      71             :         Grandparents manifest.LevelSlice
      72             : 
      73             :         // MaxGrandparentOverlapBytes is the maximum number of bytes of overlap
      74             :         // allowed for a single output table with the tables in the grandparent level.
      75             :         MaxGrandparentOverlapBytes uint64
      76             : 
      77             :         // TargetOutputFileSize is the desired size of an individual table created
      78             :         // during compaction. In practice, the sizes can vary between 50%-200% of this
      79             :         // value.
      80             :         TargetOutputFileSize uint64
      81             : }
      82             : 
      83             : // Runner is a helper for running the "data" part of a compaction (where we use
      84             : // the compaction iterator to write output tables).
      85             : //
      86             : // Sample usage:
      87             : //
      88             : //      r := NewRunner(cfg, iter)
      89             : //      for r.MoreDataToWrite() {
      90             : //        objMeta, tw := ... // Create object and table writer.
      91             : //        r.WriteTable(objMeta, tw)
      92             : //      }
      93             : //      result := r.Finish()
      94             : type Runner struct {
      95             :         cmp  base.Compare // User key comparator, copied from iter in NewRunner.
      96             :         cfg  RunnerConfig // Parameters supplied by the caller.
      97             :         iter *Iter // Compaction iterator that produces the keys and spans to write.
      98             : 
      99             :         tables []OutputTable // One entry per WriteTable call, in call order.
     100             :         // Stores any error encountered.
     101             :         err error
     102             :         // Last key/value returned by the compaction iterator.
     103             :         key   *base.InternalKey
     104             :         value []byte
     105             :         // Last RANGEDEL span (or portion of it) that was not yet written to a table.
     106             :         lastRangeDelSpan keyspan.Span
     107             :         // Last range key span (or portion of it) that was not yet written to a table.
     108             :         lastRangeKeySpan keyspan.Span
     109             :         stats            Stats // Accumulated across all tables; returned by Finish.
     110             : }
     111             : 
     112             : // NewRunner creates a new Runner and positions iter on its first key (via iter.First).
     113           1 : func NewRunner(cfg RunnerConfig, iter *Iter) *Runner {
     114           1 :         r := &Runner{
     115           1 :                 cmp:  iter.cmp,
     116           1 :                 cfg:  cfg,
     117           1 :                 iter: iter,
     118           1 :         }
     119           1 :         r.key, r.value = r.iter.First() // Prime r.key so MoreDataToWrite can be called right away.
     120           1 :         return r
     121           1 : }
     122             : 
     123             : // MoreDataToWrite returns true if there is more data to be written. It returns false once an error was recorded.
     124           1 : func (r *Runner) MoreDataToWrite() bool {
     125           1 :         if r.err != nil {
     126           1 :                 return false
     127           1 :         }
     128           1 :         return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty() // pending key or unwritten span
     129             : }
     130             : 
     131             : // WriteTable writes a new output table. This table will be part of
     132             : // Result.Tables. Should only be called if MoreDataToWrite() returned true.
     133             : //
     134             : // WriteTable always closes the Writer. Errors are recorded in the Runner and reported by Finish.
     135           1 : func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.RawWriter) {
     136           1 :         if r.err != nil {
     137           0 :                 panic("error already encountered")
     138             :         }
     139           1 :         r.tables = append(r.tables, OutputTable{ // Registered up front so a failed table still appears in Result.Tables.
     140           1 :                 CreationTime: time.Now(),
     141           1 :                 ObjMeta:      objMeta,
     142           1 :         })
     143           1 :         splitKey, err := r.writeKeysToTable(tw)
     144           1 :         err = errors.CombineErrors(err, tw.Close()) // Close even when writing failed.
     145           1 :         if err != nil {
     146           1 :                 r.err = err
     147           1 :                 r.key, r.value = nil, nil
     148           1 :                 return
     149           1 :         }
     150           1 :         writerMeta, err := tw.Metadata()
     151           1 :         if err != nil {
     152           0 :                 r.err = err
     153           0 :                 return
     154           0 :         }
     155           1 :         if err := r.validateWriterMeta(writerMeta, splitKey); err != nil {
     156           0 :                 r.err = err
     157           0 :                 return
     158           0 :         }
     159           1 :         r.tables[len(r.tables)-1].WriterMeta = *writerMeta // Only populated when every step above succeeded.
     160             : }
     161             : 
     162           1 : func (r *Runner) writeKeysToTable(tw *sstable.RawWriter) (splitKey []byte, _ error) { // Writes keys and spans to tw until the splitter ends the table; returns the splitter's split key.
     163           1 :         firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
     164           1 :         if r.key != nil && firstKey == nil {
     165           1 :                 firstKey = r.key.UserKey
     166           1 :         }
     167           1 :         if firstKey == nil {
     168           0 :                 return nil, base.AssertionFailedf("no data to write")
     169           0 :         }
     170           1 :         splitter := NewOutputSplitter(
     171           1 :                 r.cmp, firstKey, r.TableSplitLimit(firstKey),
     172           1 :                 r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(),
     173           1 :         )
     174           1 :         lastUserKeyFn := func() []byte {
     175           1 :                 return tw.UnsafeLastPointUserKey()
     176           1 :         }
     177           1 :         var pinnedKeySize, pinnedValueSize, pinnedCount uint64
     178           1 :         key, value := r.key, r.value
     179           1 :         for ; key != nil; key, value = r.iter.Next() {
     180           1 :                 if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), lastUserKeyFn) {
     181           1 :                         break
     182             :                 }
     183             : 
     184           1 :                 switch key.Kind() {
     185           1 :                 case base.InternalKeyKindRangeDelete:
     186           1 :                         // The previous span (if any) must end at or before this key, since the
     187           1 :                         // spans we receive are non-overlapping.
     188           1 :                         if err := tw.EncodeSpan(r.lastRangeDelSpan); err != nil {
     189           0 :                                 return nil, err
     190           0 :                         }
     191           1 :                         r.lastRangeDelSpan.CopyFrom(r.iter.Span())
     192           1 :                         continue
     193             : 
     194           1 :                 case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
     195           1 :                         // The previous span (if any) must end at or before this key, since the
     196           1 :                         // spans we receive are non-overlapping.
     197           1 :                         if err := tw.EncodeSpan(r.lastRangeKeySpan); err != nil {
     198           0 :                                 return nil, err
     199           0 :                         }
     200           1 :                         r.lastRangeKeySpan.CopyFrom(r.iter.Span())
     201           1 :                         continue
     202             :                 }
     203           1 :                 if err := tw.AddWithForceObsolete(*key, value, r.iter.ForceObsoleteDueToRangeDel()); err != nil {
     204           0 :                         return nil, err
     205           0 :                 }
     206           1 :                 if r.iter.SnapshotPinned() {
     207           1 :                         // The kv pair we just added to the sstable was only surfaced by
     208           1 :                         // the compaction iterator because an open snapshot prevented
     209           1 :                         // its elision. Increment the stats.
     210           1 :                         pinnedCount++
     211           1 :                         pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
     212           1 :                         pinnedValueSize += uint64(len(value))
     213           1 :                 }
     214             :         }
     215           1 :         r.key, r.value = key, value // Remember where the next table must resume (nil when the iterator is exhausted).
     216           1 :         splitKey = splitter.SplitKey()
     217           1 :         if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeDelSpan, splitKey, tw); err != nil {
     218           0 :                 return nil, err
     219           0 :         }
     220           1 :         if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeKeySpan, splitKey, tw); err != nil {
     221           0 :                 return nil, err
     222           0 :         }
     223             :         // Set internal sstable properties.
     224           1 :         tw.SetSnapshotPinnedProperties(pinnedCount, pinnedKeySize, pinnedValueSize)
     225           1 :         r.stats.CumulativePinnedKeys += pinnedCount
     226           1 :         r.stats.CumulativePinnedSize += pinnedKeySize + pinnedValueSize
     227           1 :         return splitKey, nil
     228             : }
     229             : 
     230             : // Finish closes the compaction iterator and returns the result of the
     231             : // compaction, including the tables created so far and any recorded error.
     232           1 : func (r *Runner) Finish() Result {
     233           1 :         r.err = errors.CombineErrors(r.err, r.iter.Close()) // A close error is reported alongside any earlier error.
     234           1 :         // The compaction iterator keeps track of a count of the number of DELSIZED
     235           1 :         // keys that encoded an incorrect size.
     236           1 :         r.stats.CountMissizedDels = r.iter.Stats().CountMissizedDels
     237           1 :         return Result{
     238           1 :                 Err:    r.err,
     239           1 :                 Tables: r.tables,
     240           1 :                 Stats:  r.stats,
     241           1 :         }
     242           1 : }
     243             : 
     244             : // TableSplitLimit returns a hard split limit for an output table that starts at
     245             : // startKey, or nil if there is no limit. A non-nil limit is expected to be
     246             : // strictly greater than startKey.
     247           1 : func (r *Runner) TableSplitLimit(startKey []byte) []byte {
     248           1 :         var limitKey []byte
     249           1 : 
     250           1 :         // Enforce the MaxGrandparentOverlapBytes limit: find the user key to which
     251           1 :         // that table can extend without excessively overlapping the grandparent
     252           1 :         // level. If no limit is needed considering the grandparent, limitKey stays
     253           1 :         // nil.
     254           1 :         //
     255           1 :         // This is done in order to prevent a table at level N from overlapping too
     256           1 :         // much data at level N+1. We want to avoid such large overlaps because they
     257           1 :         // translate into large compactions. The current heuristic stops output of a
     258           1 :         // table if the addition of another key would cause the table to overlap more
     259           1 :         // than 10x the target file size at level N. See
     260           1 :         // compaction.maxGrandparentOverlapBytes.
     261           1 :         iter := r.cfg.Grandparents.Iter()
     262           1 :         var overlappedBytes uint64
     263           1 :         f := iter.SeekGE(r.cmp, startKey)
     264           1 :         // Handle an overlapping table.
     265           1 :         if f != nil && r.cmp(f.Smallest.UserKey, startKey) <= 0 {
     266           1 :                 overlappedBytes += f.Size
     267           1 :                 f = iter.Next()
     268           1 :         }
     269           1 :         for ; f != nil; f = iter.Next() {
     270           1 :                 overlappedBytes += f.Size
     271           1 :                 if overlappedBytes > r.cfg.MaxGrandparentOverlapBytes {
     272           1 :                         limitKey = f.Smallest.UserKey // Stop before the table that pushes the overlap over the limit.
     273           1 :                         break
     274             :                 }
     275             :         }
     276             : 
     277           1 :         if len(r.cfg.L0SplitKeys) != 0 {
     278           1 :                 // Find the first split key that is greater than startKey.
     279           1 :                 index := sort.Search(len(r.cfg.L0SplitKeys), func(i int) bool {
     280           1 :                         return r.cmp(r.cfg.L0SplitKeys[i], startKey) > 0
     281           1 :                 })
     282           1 :                 if index < len(r.cfg.L0SplitKeys) {
     283           1 :                         limitKey = base.MinUserKey(r.cmp, limitKey, r.cfg.L0SplitKeys[index]) // Use whichever limit comes first.
     284           1 :                 }
     285             :         }
     286             : 
     287           1 :         return limitKey
     288             : }
     289             : 
     290             : // validateWriterMeta runs some sanity checks on the WriterMetadata of an output
     291             : // table that was just finished. splitKey is the key where the table must have
     292             : // ended (or nil).
     293           1 : func (r *Runner) validateWriterMeta(meta *sstable.WriterMetadata, splitKey []byte) error {
     294           1 :         if !meta.HasPointKeys && !meta.HasRangeDelKeys && !meta.HasRangeKeys {
     295           0 :                 return base.AssertionFailedf("output table has no keys")
     296           0 :         }
     297             : 
     298           1 :         var err error // Accumulates every violation reported by checkBounds.
     299           1 :         checkBounds := func(smallest, largest base.InternalKey, description string) {
     300           1 :                 bounds := base.UserKeyBoundsFromInternal(smallest, largest)
     301           1 :                 if !r.cfg.CompactionBounds.ContainsBounds(r.cmp, &bounds) {
     302           0 :                         err = errors.CombineErrors(err, base.AssertionFailedf(
     303           0 :                                 "output table %s bounds %s extend beyond compaction bounds %s",
     304           0 :                                 description, bounds, r.cfg.CompactionBounds,
     305           0 :                         ))
     306           0 :                 }
     307           1 :                 if splitKey != nil && bounds.End.IsUpperBoundFor(r.cmp, splitKey) {
     308           0 :                         err = errors.CombineErrors(err, base.AssertionFailedf(
     309           0 :                                 "output table %s bounds %s extend beyond split key %s",
     310           0 :                                 description, bounds, splitKey,
     311           0 :                         ))
     312           0 :                 }
     313             :         }
     314             : 
     315           1 :         if meta.HasPointKeys {
     316           1 :                 checkBounds(meta.SmallestPoint, meta.LargestPoint, "point key")
     317           1 :         }
     318           1 :         if meta.HasRangeDelKeys {
     319           1 :                 checkBounds(meta.SmallestRangeDel, meta.LargestRangeDel, "range del")
     320           1 :         }
     321           1 :         if meta.HasRangeKeys {
     322           1 :                 checkBounds(meta.SmallestRangeKey, meta.LargestRangeKey, "range key")
     323           1 :         }
     324           1 :         return err
     325             : }
     326             : 
     327           1 : func spanStartOrNil(s *keyspan.Span) []byte { // Returns s.Start, or nil when the span is empty.
     328           1 :         if s.Empty() {
     329           1 :                 return nil
     330           1 :         }
     331           1 :         return s.Start
     332             : }

Generated by: LCOV version 1.14