LCOV - code coverage report
Current view: top level - pebble - value_separation.go (source / functions)
Test:         2025-05-13 08:18Z 3449cfbb - tests only.lcov
Test Date:    2025-05-13 08:19:17

              Coverage      Total    Hit
Lines:        87.3 %        251      219
Functions:    -             0        0

            Line data    Source code
       1              : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "cmp"
       9              :         "maps"
      10              :         "slices"
      11              :         "time"
      12              : 
      13              :         "github.com/cockroachdb/errors"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/compact"
      16              :         "github.com/cockroachdb/pebble/internal/invariants"
      17              :         "github.com/cockroachdb/pebble/internal/manifest"
      18              :         "github.com/cockroachdb/pebble/objstorage"
      19              :         "github.com/cockroachdb/pebble/sstable"
      20              :         "github.com/cockroachdb/pebble/sstable/blob"
      21              : )
      22              : 
      23            0 : var neverSeparateValues getValueSeparation = func(JobID, *compaction, sstable.TableFormat) compact.ValueSeparation {
      24            0 :         return compact.NeverSeparateValues{}
      25            0 : }
      26              : 
      27              : // determineCompactionValueSeparation determines whether a compaction should
      28              : // separate values into blob files. It returns a compact.ValueSeparation
      29              : // implementation that should be used for the compaction.
      30              : func (d *DB) determineCompactionValueSeparation(
      31              :         jobID JobID, c *compaction, tableFormat sstable.TableFormat,
      32            1 : ) compact.ValueSeparation {
      33            1 :         if tableFormat < sstable.TableFormatPebblev6 || d.FormatMajorVersion() < FormatExperimentalValueSeparation ||
      34            1 :                 d.opts.Experimental.ValueSeparationPolicy == nil {
      35            1 :                 return compact.NeverSeparateValues{}
      36            1 :         }
      37            1 :         policy := d.opts.Experimental.ValueSeparationPolicy()
      38            1 :         if !policy.Enabled {
      39            0 :                 return compact.NeverSeparateValues{}
      40            0 :         }
      41              : 
      42              :         // We're allowed to write blob references. Determine whether we should carry
      43              :         // forward existing blob references, or write new ones.
      44            1 :         if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy); !writeBlobs {
      45            1 :                 // This compaction should preserve existing blob references.
      46            1 :                 return &preserveBlobReferences{
      47            1 :                         inputBlobMetadatas:       uniqueInputBlobMetadatas(c.inputs),
      48            1 :                         outputBlobReferenceDepth: outputBlobReferenceDepth,
      49            1 :                 }
      50            1 :         }
      51              : 
      52              :         // This compaction should write values to new blob files.
      53            1 :         return &writeNewBlobFiles{
      54            1 :                 comparer: d.opts.Comparer,
      55            1 :                 newBlobObject: func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
      56            1 :                         return d.newCompactionOutputBlob(jobID, c)
      57            1 :                 },
      58              :                 shortAttrExtractor: d.opts.Experimental.ShortAttributeExtractor,
      59              :                 writerOpts:         d.opts.MakeBlobWriterOptions(c.outputLevel.level),
      60              :                 minimumSize:        policy.MinimumSize,
      61              :         }
      62              : }
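
A minimal sketch of a configuration that would let the function above choose
value separation, using only the option fields this file reads
(Experimental.ValueSeparationPolicy with Enabled, MinimumSize, and
MaxBlobReferenceDepth); the concrete values and the surrounding program are
assumptions, not taken from the Pebble source. The table format and
FormatExperimentalValueSeparation checks above must also pass before the
policy is consulted.

        package main

        import "github.com/cockroachdb/pebble"

        func main() {
                opts := &pebble.Options{}
                opts.Experimental.ValueSeparationPolicy = func() pebble.ValueSeparationPolicy {
                        return pebble.ValueSeparationPolicy{
                                Enabled:               true, // without this, NeverSeparateValues is used
                                MinimumSize:           1024, // assumed: values under 1 KiB stay in the sstable
                                MaxBlobReferenceDepth: 10,   // assumed: beyond this depth, values are rewritten
                        }
                }
                _ = opts // opts would then be passed to pebble.Open.
        }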
      63              : 
      64              : // shouldWriteBlobFiles returns true if the compaction should write new blob
      65              : // files. If it returns false, the referenceDepth return value contains the
      66              : // maximum blob reference depth to assign to output sstables (the actual value
      67              : // may be lower if the output table references fewer distinct blob files).
      68              : func shouldWriteBlobFiles(
      69              :         c *compaction, policy ValueSeparationPolicy,
      70            1 : ) (writeBlobs bool, referenceDepth manifest.BlobReferenceDepth) {
      71            1 :         // Flushes will have no existing references to blob files and should write
      72            1 :         // their values to new blob files.
      73            1 :         if c.kind == compactionKindFlush {
      74            1 :                 return true, 0
      75            1 :         }
      76            1 :         inputReferenceDepth := compactionBlobReferenceDepth(c.inputs)
      77            1 :         if inputReferenceDepth == 0 {
      78            0 :                 // None of the input sstables reference blob files. It may be the case
      79            0 :                 // that these sstables were created before value separation was enabled.
      80            0 :                 // We should try to write to new blob files.
      81            0 :                 return true, 0
      82            0 :         }
      83              :         // If the compaction's output blob reference depth would be greater than the
      84              :         // configured max, we should rewrite the values into new blob files to
      85              :         // restore locality.
      86            1 :         if inputReferenceDepth > manifest.BlobReferenceDepth(policy.MaxBlobReferenceDepth) {
      87            1 :                 return true, 0
      88            1 :         }
      89              :         // Otherwise, we won't write any new blob files but will carry forward
      90              :         // existing references.
      91            1 :         return false, inputReferenceDepth
      92              : }
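
As an illustrative walk through the cases above, assuming
policy.MaxBlobReferenceDepth = 3 (an arbitrary value):

        flush                                   -> (true, 0)   write new blob files
        input reference depth 0 (no blob refs)  -> (true, 0)   write new blob files
        input reference depth 5 (> 3)           -> (true, 0)   rewrite values to restore locality
        input reference depth 2 (<= 3)          -> (false, 2)  preserve existing references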
      93              : 
      94              : // compactionBlobReferenceDepth computes the blob reference depth for a
      95              : // compaction. It's computed by finding the maximum blob reference depth of
      96              : // input sstables in each level. These per-level depths are then summed to
      97              : // produce a worst-case approximation of the blob reference locality of the
      98              : // compaction's output sstables.
      99              : //
     100              : // The intuition is that as compactions combine files referencing distinct blob
     101              : // files, outputted sstables begin to reference more and more distinct blob
     102              : // files. In the worst case, these references are evenly distributed across the
     103              : // keyspace and there is very little locality.
     104            1 : func compactionBlobReferenceDepth(levels []compactionLevel) manifest.BlobReferenceDepth {
     105            1 :         // TODO(jackson): Consider using a range tree to precisely compute the
     106            1 :         // depth. This would require maintaining minimum and maximum keys.
     107            1 :         var depth manifest.BlobReferenceDepth
     108            1 :         for _, level := range levels {
     109            1 :                 // L0 allows files to overlap one another, so it's not sufficient to
     110            1 :                 // just take the maximum within the level. Instead, we need to sum the
     111            1 :                 // max of each sublevel.
     112            1 :                 //
     113            1 :                 // TODO(jackson): This and other compaction logic would likely be
     114            1 :                 // cleaner if we modeled each sublevel as its own `compactionLevel`.
     115            1 :                 if level.level == 0 {
     116            1 :                         for _, sublevel := range level.l0SublevelInfo {
     117            1 :                                 var sublevelDepth int
     118            1 :                                 for t := range sublevel.LevelSlice.All() {
     119            1 :                                         sublevelDepth = max(sublevelDepth, int(t.BlobReferenceDepth))
     120            1 :                                 }
     121            1 :                                 depth += manifest.BlobReferenceDepth(sublevelDepth)
     122              :                         }
     123            1 :                         continue
     124              :                 }
     125              : 
     126            1 :                 var levelDepth manifest.BlobReferenceDepth
     127            1 :                 for t := range level.files.All() {
     128            1 :                         levelDepth = max(levelDepth, t.BlobReferenceDepth)
     129            1 :                 }
     130            1 :                 depth += levelDepth
     131              :         }
     132            1 :         return depth
     133              : }
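
A worked example of the computation above, with illustrative file counts:
suppose a compaction's inputs are two L0 sublevels and a run of L3 tables
whose BlobReferenceDepth values are

        L0 sublevel 0:  2, 1      -> max 2
        L0 sublevel 1:  1         -> max 1
        L3:             2, 2, 1   -> max 2

Each sublevel and each non-L0 level contributes its maximum, and the maxima
are summed, so compactionBlobReferenceDepth returns 2 + 1 + 2 = 5.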
     134              : 
     135              : // uniqueInputBlobMetadatas returns a slice of all unique blob file metadata
     136              : // objects referenced by tables in levels.
     137            1 : func uniqueInputBlobMetadatas(levels []compactionLevel) []*manifest.BlobFileMetadata {
     138            1 :         m := make(map[*manifest.BlobFileMetadata]struct{})
     139            1 :         for _, level := range levels {
     140            1 :                 for t := range level.files.All() {
     141            1 :                         for _, ref := range t.BlobReferences {
     142            1 :                                 m[ref.Metadata] = struct{}{}
     143            1 :                         }
     144              :                 }
     145              :         }
     146            1 :         metadatas := slices.Collect(maps.Keys(m))
     147            1 :         slices.SortFunc(metadatas, func(a, b *manifest.BlobFileMetadata) int {
     148            1 :                 return cmp.Compare(a.FileNum, b.FileNum)
     149            1 :         })
     150            1 :         return metadatas
     151              : }
     152              : 
     153              : // writeNewBlobFiles implements the strategy and mechanics for separating values
     154              : // into external blob files.
     155              : type writeNewBlobFiles struct {
     156              :         comparer *base.Comparer
     157              :         // newBlobObject constructs a new blob object for use in the compaction.
     158              :         newBlobObject      func() (objstorage.Writable, objstorage.ObjectMetadata, error)
     159              :         shortAttrExtractor ShortAttributeExtractor
     160              :         // writerOpts is used to configure all constructed blob writers.
     161              :         writerOpts blob.FileWriterOptions
     162              :         // minimumSize imposes a lower bound on the size of values that can be
     163              :         // separated into a blob file. Values smaller than this are always written
     164              :         // to the sstable (but may still be written to a value block within the
     165              :         // sstable).
     166              :         minimumSize int
     167              : 
     168              :         // Current blob writer state
     169              :         writer  *blob.FileWriter
     170              :         objMeta objstorage.ObjectMetadata
     171              : 
     172              :         buf []byte
     173              : }
     174              : 
     175              : // Assert that *writeNewBlobFiles implements the compact.ValueSeparation interface.
     176              : var _ compact.ValueSeparation = (*writeNewBlobFiles)(nil)
     177              : 
     178              : // EstimatedFileSize returns an estimate of the disk space consumed by the current
     179              : // blob file if it were closed now.
     180            1 : func (vs *writeNewBlobFiles) EstimatedFileSize() uint64 {
     181            1 :         if vs.writer == nil {
     182            1 :                 return 0
     183            1 :         }
     184            1 :         return vs.writer.EstimatedSize()
     185              : }
     186              : 
     187              : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
     188              : // current output sstable's blob references so far.
     189            1 : func (vs *writeNewBlobFiles) EstimatedReferenceSize() uint64 {
     190            1 :         // When we're writing to new blob files, the size of the blob file itself is
     191            1 :         // a better estimate of the disk space consumed than the uncompressed value
     192            1 :         // sizes.
     193            1 :         return vs.EstimatedFileSize()
     194            1 : }
     195              : 
     196              : // Add adds the provided key-value pair to the sstable, possibly separating the
     197              : // value into a blob file.
     198              : func (vs *writeNewBlobFiles) Add(
     199              :         tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
     200            1 : ) error {
     201            1 :         // We always fetch the value if we're rewriting blob files. We want to
     202            1 :         // replace any references to existing blob files with references to new blob
     203            1 :         // files that we write during the compaction.
     204            1 :         v, callerOwned, err := kv.Value(vs.buf)
     205            1 :         if err != nil {
     206            0 :                 return err
     207            0 :         }
     208            1 :         if callerOwned {
     209            0 :                 vs.buf = v[:0]
     210            0 :         }
     211              : 
     212              :         // Values that are too small are never separated.
     213            1 :         if len(v) < vs.minimumSize {
     214            1 :                 return tw.Add(kv.K, v, forceObsolete)
     215            1 :         }
     216              :         // Merge and deletesized keys are never separated.
     217            1 :         switch kv.K.Kind() {
     218            1 :         case base.InternalKeyKindMerge, base.InternalKeyKindDeleteSized:
     219            1 :                 return tw.Add(kv.K, v, forceObsolete)
     220              :         }
     221              : 
     222              :         // Merge and DeleteSized keys are never separated.
     223              :         // If there's a configured short attribute extractor, extract the value's
     224              :         // short attribute.
     225            1 :         var shortAttr base.ShortAttribute
     226            1 :         if vs.shortAttrExtractor != nil {
     227            0 :                 keyPrefixLen := vs.comparer.Split(kv.K.UserKey)
     228            0 :                 shortAttr, err = vs.shortAttrExtractor(kv.K.UserKey, keyPrefixLen, v)
     229            0 :                 if err != nil {
     230            0 :                         return err
     231            0 :                 }
     232              :         }
     233              : 
     234              :         // If we don't have an open blob writer, create one. We create blob objects
     235              :         // lazily so that we don't create them unless a compaction will actually
     236              :         // write to a blob file. This avoids creating and deleting empty blob files
     237              :         // on every compaction in parts of the keyspace that a) are required to be
     238              :         // in-place or b) have small values.
     239            1 :         if vs.writer == nil {
     240            1 :                 writable, objMeta, err := vs.newBlobObject()
     241            1 :                 if err != nil {
     242            0 :                         return err
     243            0 :                 }
     244            1 :                 vs.objMeta = objMeta
     245            1 :                 vs.writer = blob.NewFileWriter(objMeta.DiskFileNum, writable, vs.writerOpts)
     246              :         }
     247              : 
     248              :         // Append the value to the blob file.
     249            1 :         handle := vs.writer.AddValue(v)
     250            1 : 
     251            1 :         // Write the key and the handle to the sstable. We need to map the
     252            1 :         // blob.Handle into a blob.InlineHandle. Everything is copied verbatim,
     253            1 :         // except the FileNum is translated into a reference index.
     254            1 :         inlineHandle := blob.InlineHandle{
     255            1 :                 InlineHandlePreface: blob.InlineHandlePreface{
     256            1 :                         // Since we're writing blob files and maintaining a 1:1 mapping
     257            1 :                         // between sstables and blob files, the reference index is always 0
     258            1 :                         // here. Only compactions that don't rewrite blob files will produce
     259            1 :                         // handles with nonzero reference indices.
     260            1 :                         ReferenceID: 0,
     261            1 :                         ValueLen:    handle.ValueLen,
     262            1 :                 },
     263            1 :                 HandleSuffix: blob.HandleSuffix{
     264            1 :                         BlockNum:      handle.BlockNum,
     265            1 :                         OffsetInBlock: handle.OffsetInBlock,
     266            1 :                 },
     267            1 :         }
     268            1 :         return tw.AddWithBlobHandle(kv.K, inlineHandle, shortAttr, forceObsolete)
     269              : }
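
To make the handle mapping above concrete: if AddValue returned a blob.Handle
with ValueLen=100, BlockNum=3, and OffsetInBlock=512 (illustrative numbers),
the handle written alongside the key would be

        blob.InlineHandle{
                InlineHandlePreface: blob.InlineHandlePreface{ReferenceID: 0, ValueLen: 100},
                HandleSuffix:        blob.HandleSuffix{BlockNum: 3, OffsetInBlock: 512},
        }

ReferenceID is always 0 on this path because each output sstable references
exactly one newly written blob file.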
     270              : 
     271              : // FinishOutput closes the current blob file (if any). It returns the stats
     272              : // and metadata of the now completed blob file.
     273            1 : func (vs *writeNewBlobFiles) FinishOutput() (compact.ValueSeparationMetadata, error) {
     274            1 :         if vs.writer == nil {
     275            1 :                 return compact.ValueSeparationMetadata{}, nil
     276            1 :         }
     277            1 :         stats, err := vs.writer.Close()
     278            1 :         if err != nil {
     279            0 :                 return compact.ValueSeparationMetadata{}, err
     280            0 :         }
     281            1 :         vs.writer = nil
     282            1 :         meta := &manifest.BlobFileMetadata{
     283            1 :                 FileNum:      vs.objMeta.DiskFileNum,
     284            1 :                 Size:         stats.FileLen,
     285            1 :                 ValueSize:    stats.UncompressedValueBytes,
     286            1 :                 CreationTime: uint64(time.Now().Unix()),
     287            1 :         }
     288            1 :         return compact.ValueSeparationMetadata{
     289            1 :                 BlobReferences: manifest.BlobReferences{{
     290            1 :                         FileNum:   vs.objMeta.DiskFileNum,
     291            1 :                         ValueSize: stats.UncompressedValueBytes,
     292            1 :                         Metadata:  meta,
     293            1 :                 }},
     294            1 :                 BlobReferenceSize:  stats.UncompressedValueBytes,
     295            1 :                 BlobReferenceDepth: 1,
     296            1 :                 BlobFileStats:      stats,
     297            1 :                 BlobFileObject:     vs.objMeta,
     298            1 :                 BlobFileMetadata:   meta,
     299            1 :         }, nil
     300              : }
     301              : 
     302              : // preserveBlobReferences implements the compact.ValueSeparation interface. When
     303              : // a compaction is configured with preserveBlobReferences, the compaction will
     304              : // not create any new blob files. However, input references to existing blob
     305              : // files will be preserved, and metadata about the table's blob references
     306              : // will be collected.
     307              : type preserveBlobReferences struct {
     308              :         // inputBlobMetadatas should be populated to include the *BlobFileMetadata
     309              :         // for every unique blob file referenced by input sstables.
     310              :         // inputBlobMetadatas must be sorted by FileNum.
     311              :         inputBlobMetadatas       []*manifest.BlobFileMetadata
     312              :         outputBlobReferenceDepth manifest.BlobReferenceDepth
     313              : 
     314              :         // state
     315              :         buf            []byte
     316              :         currReferences manifest.BlobReferences
     317              :         // totalValueSize is the sum of the ValueSize fields in currReferences.
     318              :         totalValueSize uint64
     319              : }
     320              : 
     321              : // Assert that *preserveBlobReferences implements the compact.ValueSeparation
     322              : // interface.
     323              : var _ compact.ValueSeparation = (*preserveBlobReferences)(nil)
     324              : 
     325              : // EstimatedFileSize returns an estimate of the disk space consumed by the current
     326              : // blob file if it were closed now.
     327            1 : func (vs *preserveBlobReferences) EstimatedFileSize() uint64 {
     328            1 :         return 0
     329            1 : }
     330              : 
     331              : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
     332              : // current output sstable's blob references so far.
     333            1 : func (vs *preserveBlobReferences) EstimatedReferenceSize() uint64 {
     334            1 :         // TODO(jackson): The totalValueSize is the uncompressed value sizes. With
     335            1 :         // compressible data, it overestimates the disk space consumed by the blob
     336            1 :         // references. It also does not include the blob file's index block or
     337            1 :         // footer, so it can underestimate if values are completely incompressible.
     338            1 :         //
     339            1 :         // Should we compute a compression ratio per blob file and scale the
     340            1 :         // references appropriately?
     341            1 :         return vs.totalValueSize
     342            1 : }
     343              : 
     344              : // Add implements compact.ValueSeparation. This implementation will write
     345              : // existing blob references to the output table.
     346              : func (vs *preserveBlobReferences) Add(
     347              :         tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
     348            1 : ) error {
     349            1 :         if !kv.V.IsBlobValueHandle() {
     350            1 :                 // If the value is not already a blob handle (either it's in-place or in
     351            1 :                 // a value block), we retrieve the value and write it through Add. The
     352            1 :                 // sstable writer may still decide to put the value in a value block,
     353            1 :                 // but regardless the value will be written to the sstable itself and
     354            1 :                 // not a blob file.
     355            1 :                 v, callerOwned, err := kv.Value(vs.buf)
     356            1 :                 if err != nil {
     357            0 :                         return err
     358            0 :                 }
     359            1 :                 if callerOwned {
     360            0 :                         vs.buf = v[:0]
     361            0 :                 }
     362            1 :                 return tw.Add(kv.K, v, forceObsolete)
     363              :         }
     364              : 
     365              :         // The value is an existing blob handle. We can copy it into the output
     366              :         // sstable, taking note of the reference for the table metadata.
     367            1 :         lv := kv.V.LazyValue()
     368            1 :         fn := lv.Fetcher.BlobFileNum
     369            1 : 
     370            1 :         refID, found := vs.currReferences.IDByFileNum(fn)
     371            1 :         if !found {
     372            1 :                 // This is the first time we're seeing this blob file for this sstable.
     373            1 :                 // Find the blob file metadata for this file among the input metadatas.
     374            1 :                 idx, found := vs.findInputBlobMetadata(fn)
     375            1 :                 if !found {
     376            0 :                         return errors.AssertionFailedf("pebble: blob file %s not found among input sstables", fn)
     377            0 :                 }
     378            1 :                 refID = blob.ReferenceID(len(vs.currReferences))
     379            1 :                 vs.currReferences = append(vs.currReferences, manifest.BlobReference{
     380            1 :                         FileNum:  fn,
     381            1 :                         Metadata: vs.inputBlobMetadatas[idx],
     382            1 :                 })
     383              :         }
     384              : 
     385            1 :         if invariants.Enabled && vs.currReferences[refID].Metadata.FileNum != fn {
     386            0 :                 panic("wrong reference index")
     387              :         }
     388              : 
     389            1 :         handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
     390            1 :         inlineHandle := blob.InlineHandle{
     391            1 :                 InlineHandlePreface: blob.InlineHandlePreface{
     392            1 :                         ReferenceID: refID,
     393            1 :                         ValueLen:    lv.Fetcher.Attribute.ValueLen,
     394            1 :                 },
     395            1 :                 HandleSuffix: handleSuffix,
     396            1 :         }
     397            1 :         err := tw.AddWithBlobHandle(kv.K, inlineHandle, lv.Fetcher.Attribute.ShortAttribute, forceObsolete)
     398            1 :         if err != nil {
     399            0 :                 return err
     400            0 :         }
     401            1 :         vs.currReferences[refID].ValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
     402            1 :         vs.totalValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
     403            1 :         return nil
     404              : }
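
An illustrative sequence for the reference bookkeeping above (file numbers are
made up): the first KV whose handle points at blob file 000012 appends a new
reference and is written with ReferenceID 0; the first KV pointing at blob
file 000017 appends a second reference and gets ReferenceID 1; later KVs
pointing at either file reuse the existing ReferenceID. Every such KV also
adds its ValueLen to that reference's ValueSize and to totalValueSize.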
     405              : 
     406              : // findInputBlobMetadata returns the index of the input blob metadata that
     407              : // corresponds to the provided file number. If the file number is not found,
     408              : // the function returns false in the second return value.
     409            1 : func (vs *preserveBlobReferences) findInputBlobMetadata(fn base.DiskFileNum) (int, bool) {
     410            1 :         return slices.BinarySearchFunc(vs.inputBlobMetadatas, fn,
     411            1 :                 func(bm *manifest.BlobFileMetadata, fn base.DiskFileNum) int {
     412            1 :                         return cmp.Compare(bm.FileNum, fn)
     413            1 :                 })
     414              : }
     415              : 
     416              : // FinishOutput implements compact.ValueSeparation.
     417            1 : func (vs *preserveBlobReferences) FinishOutput() (compact.ValueSeparationMetadata, error) {
     418            1 :         references := slices.Clone(vs.currReferences)
     419            1 :         vs.currReferences = vs.currReferences[:0]
     420            1 :         vs.totalValueSize = 0
     421            1 : 
     422            1 :         referenceSize := uint64(0)
     423            1 :         for _, ref := range references {
     424            1 :                 referenceSize += ref.ValueSize
     425            1 :         }
     426            1 :         return compact.ValueSeparationMetadata{
     427            1 :                 BlobReferences:    references,
     428            1 :                 BlobReferenceSize: referenceSize,
     429            1 :                 // The outputBlobReferenceDepth is computed from the input sstables,
     430            1 :                 // reflecting the worst-case overlap of referenced blob files. If this
     431            1 :                 // sstable references fewer unique blob files, reduce its depth to the
     432            1 :                 // count of unique files.
     433            1 :                 BlobReferenceDepth: min(vs.outputBlobReferenceDepth, manifest.BlobReferenceDepth(len(references))),
     434            1 :         }, nil
     435              : }
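
For example, with illustrative numbers: if the inputs implied an
outputBlobReferenceDepth of 4 but this particular output sstable referenced
only 2 distinct blob files, its recorded BlobReferenceDepth is min(4, 2) = 2.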
        

Generated by: LCOV version 2.0-1