LCOV - code coverage report
Current view: top level - pebble - value_separation.go (source / functions) Coverage Total Hit
Test: 2025-04-15 08:17Z 56cdd34e - tests only.lcov Lines: 86.7 % 180 156
Test Date: 2025-04-15 08:18:39 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "cmp"
       9              :         "slices"
      10              :         "time"
      11              : 
      12              :         "github.com/cockroachdb/errors"
      13              :         "github.com/cockroachdb/pebble/internal/base"
      14              :         "github.com/cockroachdb/pebble/internal/compact"
      15              :         "github.com/cockroachdb/pebble/internal/invariants"
      16              :         "github.com/cockroachdb/pebble/internal/manifest"
      17              :         "github.com/cockroachdb/pebble/objstorage"
      18              :         "github.com/cockroachdb/pebble/sstable"
      19              :         "github.com/cockroachdb/pebble/sstable/blob"
      20              : )
      21              : 
       // writeNewBlobFiles implements the strategy and mechanics for separating values
// into external blob files. It writes every separated value into a new blob
// file that it creates itself. At most one blob file is open at a time, and
// each finished output sstable references exactly that one file.
type writeNewBlobFiles struct {
	// comparer is used to split user keys into prefixes (for the short
	// attribute extractor and the in-place bound check) and to compare those
	// prefixes against requiredInPlaceValueBound.
	comparer *base.Comparer
	// newBlobObject constructs a new blob object for use in the compaction.
	// It is invoked lazily, the first time Add separates a value while no blob
	// writer is open.
	newBlobObject func() (objstorage.Writable, objstorage.ObjectMetadata, error)
	// shortAttrExtractor, when non-nil, is called on each separated value to
	// compute the short attribute stored in the sstable next to the blob
	// handle.
	shortAttrExtractor ShortAttributeExtractor
	// writerOpts is used to configure all constructed blob writers.
	writerOpts blob.FileWriterOptions
	// minimumSize imposes a lower bound on the size of values that can be
	// separated into a blob file. Values shorter than this are always written
	// to the sstable (but may still be written to a value block within the
	// sstable).
	minimumSize int
	// requiredInPlaceValueBound configures a range of key prefixes whose
	// values must be written to the sstable in place and are therefore not
	// eligible for value separation. Add treats Lower as inclusive and Upper
	// as exclusive. It clears the bound once it sees a key prefix at or beyond
	// Upper, because later keys cannot fall inside it again.
	requiredInPlaceValueBound UserKeyPrefixBound

	// Current blob writer state. writer is nil while no blob file is open. Add
	// creates it and FinishOutput closes it and resets it to nil. objMeta
	// describes the object backing writer.
	writer  *blob.FileWriter
	objMeta objstorage.ObjectMetadata

	// buf is a scratch buffer handed to kv.Value. Add keeps it when the
	// returned value is caller-owned, so its capacity is reused across calls.
	buf []byte
}

// Assert that *writeNewBlobFiles implements the compact.ValueSeparation interface.
var _ compact.ValueSeparation = (*writeNewBlobFiles)(nil)
      50              : 
      51              : // EstimatedFileSize returns an estimate of the disk space consumed by the current
      52              : // blob file if it were closed now.
      53            1 : func (vs *writeNewBlobFiles) EstimatedFileSize() uint64 {
      54            1 :         if vs.writer == nil {
      55            1 :                 return 0
      56            1 :         }
      57            1 :         return vs.writer.EstimatedSize()
      58              : }
      59              : 
      60              : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
      61              : // current output sstable's blob references so far.
      62            1 : func (vs *writeNewBlobFiles) EstimatedReferenceSize() uint64 {
      63            1 :         // When we're writing to new blob files, the size of the blob file itself is
      64            1 :         // a better estimate of the disk space consumed than the uncompressed value
      65            1 :         // sizes.
      66            1 :         return vs.EstimatedFileSize()
      67            1 : }
      68              : 
      69              : // Add adds the provided key-value pair to the sstable, possibly separating the
      70              : // value into a blob file.
      71              : func (vs *writeNewBlobFiles) Add(
      72              :         tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
      73            1 : ) error {
      74            1 :         // We always fetch the value if we're rewriting blob files. We want to
      75            1 :         // replace any references to existing blob files with references to new blob
      76            1 :         // files that we write during the compaction.
      77            1 :         v, callerOwned, err := kv.Value(vs.buf)
      78            1 :         if err != nil {
      79            0 :                 return err
      80            0 :         }
      81            1 :         if callerOwned {
      82            0 :                 vs.buf = v[:0]
      83            0 :         }
      84              : 
      85              :         // Values that are too small are never separated.
      86            1 :         if len(v) < vs.minimumSize {
      87            1 :                 return tw.Add(kv.K, v, forceObsolete)
      88            1 :         }
      89              :         // Merge keys are never separated.
      90            1 :         if kv.K.Kind() == base.InternalKeyKindMerge {
      91            0 :                 return tw.Add(kv.K, v, forceObsolete)
      92            0 :         }
      93              :         // If the user configured bounds requiring some keys' values to be in-place,
      94              :         // compare the user key's prefix against the bounds.
      95            1 :         if !vs.requiredInPlaceValueBound.IsEmpty() {
      96            1 :                 kPrefix := vs.comparer.Split.Prefix(kv.K.UserKey)
      97            1 :                 if vs.comparer.Compare(vs.requiredInPlaceValueBound.Upper, kPrefix) <= 0 {
      98            1 :                         // Common case for CockroachDB. Clear it since all future keys will
      99            1 :                         // be >= this key.
     100            1 :                         vs.requiredInPlaceValueBound = UserKeyPrefixBound{}
     101            1 :                 } else if vs.comparer.Compare(kPrefix, vs.requiredInPlaceValueBound.Lower) >= 0 {
     102            1 :                         // Don't separate the value if the key is within the bounds.
     103            1 :                         return tw.Add(kv.K, v, forceObsolete)
     104            1 :                 }
     105              :         }
     106              : 
     107              :         // This KV met all the criteria and its value will be separated.
     108              : 
     109              :         // If there's a configured short attribute extractor, extract the value's
     110              :         // short attribute.
     111            1 :         var shortAttr base.ShortAttribute
     112            1 :         if vs.shortAttrExtractor != nil {
     113            0 :                 keyPrefixLen := vs.comparer.Split(kv.K.UserKey)
     114            0 :                 shortAttr, err = vs.shortAttrExtractor(kv.K.UserKey, keyPrefixLen, v)
     115            0 :                 if err != nil {
     116            0 :                         return err
     117            0 :                 }
     118              :         }
     119              : 
     120              :         // If we don't have an open blob writer, create one. We create blob objects
     121              :         // lazily so that we don't create them unless a compaction will actually
     122              :         // write to a blob file. This avoids creating and deleting empty blob files
     123              :         // on every compaction in parts of the keyspace that a) are required to be
     124              :         // in-place or b) have small values.
     125            1 :         if vs.writer == nil {
     126            1 :                 writable, objMeta, err := vs.newBlobObject()
     127            1 :                 if err != nil {
     128            0 :                         return err
     129            0 :                 }
     130            1 :                 vs.objMeta = objMeta
     131            1 :                 vs.writer = blob.NewFileWriter(objMeta.DiskFileNum, writable, vs.writerOpts)
     132              :         }
     133              : 
     134              :         // Append the value to the blob file.
     135            1 :         handle := vs.writer.AddValue(v)
     136            1 : 
     137            1 :         // Write the key and the handle to the sstable. We need to map the
     138            1 :         // blob.Handle into a blob.InlineHandle. Everything is copied verbatim,
     139            1 :         // except the FileNum is translated into a reference index.
     140            1 :         inlineHandle := blob.InlineHandle{
     141            1 :                 InlineHandlePreface: blob.InlineHandlePreface{
     142            1 :                         // Since we're writing blob files and maintaining a 1:1 mapping
     143            1 :                         // between sstables and blob files, the reference index is always 0
     144            1 :                         // here. Only compactions that don't rewrite blob files will produce
     145            1 :                         // handles with nonzero reference indices.
     146            1 :                         ReferenceID: 0,
     147            1 :                         ValueLen:    handle.ValueLen,
     148            1 :                 },
     149            1 :                 HandleSuffix: blob.HandleSuffix{
     150            1 :                         BlockNum:      handle.BlockNum,
     151            1 :                         OffsetInBlock: handle.OffsetInBlock,
     152            1 :                 },
     153            1 :         }
     154            1 :         return tw.AddWithBlobHandle(kv.K, inlineHandle, shortAttr, forceObsolete)
     155              : }
     156              : 
     157              : // FinishOutput closes the current blob file (if any). It returns the stats
     158              : // and metadata of the now completed blob file.
     159            1 : func (vs *writeNewBlobFiles) FinishOutput() (compact.ValueSeparationMetadata, error) {
     160            1 :         if vs.writer == nil {
     161            1 :                 return compact.ValueSeparationMetadata{}, nil
     162            1 :         }
     163            1 :         stats, err := vs.writer.Close()
     164            1 :         if err != nil {
     165            0 :                 return compact.ValueSeparationMetadata{}, err
     166            0 :         }
     167            1 :         vs.writer = nil
     168            1 :         return compact.ValueSeparationMetadata{
     169            1 :                 BlobReferences: manifest.BlobReferences{{
     170            1 :                         FileNum:   vs.objMeta.DiskFileNum,
     171            1 :                         ValueSize: stats.UncompressedValueBytes,
     172            1 :                 }},
     173            1 :                 BlobReferenceSize:  stats.UncompressedValueBytes,
     174            1 :                 BlobReferenceDepth: 1,
     175            1 :                 BlobFileStats:      stats,
     176            1 :                 BlobFileObject:     vs.objMeta,
     177            1 :                 BlobFileMetadata: &manifest.BlobFileMetadata{
     178            1 :                         FileNum:      vs.objMeta.DiskFileNum,
     179            1 :                         Size:         stats.FileLen,
     180            1 :                         ValueSize:    stats.UncompressedValueBytes,
     181            1 :                         CreationTime: uint64(time.Now().Unix()),
     182            1 :                 },
     183            1 :         }, nil
     184              : }
     185              : 
      // preserveBlobReferences implements the compact.ValueSeparation interface. When
// a compaction is configured with preserveBlobReferences, the compaction will
// not create any new blob files. Instead, references from input sstables to
// existing blob files are copied into the output sstables, and metadata about
// each output table's blob references is collected.
type preserveBlobReferences struct {
	// inputBlobMetadatas should be populated to include the *BlobFileMetadata
	// for every unique blob file referenced by input sstables.
	// inputBlobMetadatas must be sorted by FileNum, because
	// findInputBlobMetadata binary-searches it.
	inputBlobMetadatas []*manifest.BlobFileMetadata
	// outputBlobReferenceDepth is an upper bound on the reference depth
	// reported for each output. FinishOutput lowers it to the number of
	// distinct blob files the output actually references.
	outputBlobReferenceDepth manifest.BlobReferenceDepth

	// state
	//
	// buf is a scratch buffer handed to kv.Value for values that are not blob
	// handles. currReferences holds the blob references of the current output
	// sstable in order of first use. An entry's index is the ReferenceID
	// written into the sstable's inline handles.
	buf            []byte
	currReferences manifest.BlobReferences
	// totalValueSize is the sum of the sizes of all ValueSizes in currReferences.
	totalValueSize uint64
}

// Assert that *preserveBlobReferences implements the compact.ValueSeparation
// interface.
var _ compact.ValueSeparation = (*preserveBlobReferences)(nil)
     208              : 
     209              : // EstimatedFileSize returns an estimate of the disk space consumed by the current
     210              : // blob file if it were closed now.
     211            1 : func (vs *preserveBlobReferences) EstimatedFileSize() uint64 {
     212            1 :         return 0
     213            1 : }
     214              : 
     215              : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
     216              : // current output sstable's blob references so far.
     217            1 : func (vs *preserveBlobReferences) EstimatedReferenceSize() uint64 {
     218            1 :         // TODO(jackson): The totalValueSize is the uncompressed value sizes. With
     219            1 :         // compressible data, it overestimates the disk space consumed by the blob
     220            1 :         // references. It also does not include the blob file's index block or
     221            1 :         // footer, so it can underestimate if values are completely incompressible.
     222            1 :         //
     223            1 :         // Should we compute a compression ratio per blob file and scale the
     224            1 :         // references appropriately?
     225            1 :         return vs.totalValueSize
     226            1 : }
     227              : 
     228              : // Add implements compact.ValueSeparation. This implementation will write
     229              : // existing blob references to the output table.
     230              : func (vs *preserveBlobReferences) Add(
     231              :         tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
     232            1 : ) error {
     233            1 :         if !kv.V.IsBlobValueHandle() {
     234            1 :                 // If the value is not already a blob handle (either it's in-place or in
     235            1 :                 // a value block), we retrieve the value and write it through Add. The
     236            1 :                 // sstable writer may still decide to put the value in a value block,
     237            1 :                 // but regardless the value will be written to the sstable itself and
     238            1 :                 // not a blob file.
     239            1 :                 v, callerOwned, err := kv.Value(vs.buf)
     240            1 :                 if err != nil {
     241            0 :                         return err
     242            0 :                 }
     243            1 :                 if callerOwned {
     244            0 :                         vs.buf = v[:0]
     245            0 :                 }
     246            1 :                 return tw.Add(kv.K, v, forceObsolete)
     247              :         }
     248              : 
     249              :         // The value is an existing blob handle. We can copy it into the output
     250              :         // sstable, taking note of the reference for the table metadata.
     251            1 :         lv := kv.V.LazyValue()
     252            1 :         fn := lv.Fetcher.BlobFileNum
     253            1 : 
     254            1 :         refID, found := vs.currReferences.IDByFileNum(fn)
     255            1 :         if !found {
     256            1 :                 // This is the first time we're seeing this blob file for this sstable.
     257            1 :                 // Find the blob file metadata for this file among the input metadatas.
     258            1 :                 idx, found := vs.findInputBlobMetadata(fn)
     259            1 :                 if !found {
     260            0 :                         return errors.AssertionFailedf("pebble: blob file %s not found among input sstables", fn)
     261            0 :                 }
     262            1 :                 refID = blob.ReferenceID(len(vs.currReferences))
     263            1 :                 vs.currReferences = append(vs.currReferences, manifest.BlobReference{
     264            1 :                         FileNum:  fn,
     265            1 :                         Metadata: vs.inputBlobMetadatas[idx],
     266            1 :                 })
     267              :         }
     268              : 
     269            1 :         if invariants.Enabled && vs.currReferences[refID].Metadata.FileNum != fn {
     270            0 :                 panic("wrong reference index")
     271              :         }
     272              : 
     273            1 :         handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
     274            1 :         inlineHandle := blob.InlineHandle{
     275            1 :                 InlineHandlePreface: blob.InlineHandlePreface{
     276            1 :                         ReferenceID: refID,
     277            1 :                         ValueLen:    lv.Fetcher.Attribute.ValueLen,
     278            1 :                 },
     279            1 :                 HandleSuffix: handleSuffix,
     280            1 :         }
     281            1 :         err := tw.AddWithBlobHandle(kv.K, inlineHandle, lv.Fetcher.Attribute.ShortAttribute, forceObsolete)
     282            1 :         if err != nil {
     283            0 :                 return err
     284            0 :         }
     285            1 :         vs.currReferences[refID].ValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
     286            1 :         vs.totalValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
     287            1 :         return nil
     288              : }
     289              : 
     290              : // findInputBlobMetadata returns the index of the input blob metadata that
     291              : // corresponds to the provided file number. If the file number is not found,
     292              : // the function returns false in the second return value.
     293            1 : func (vs *preserveBlobReferences) findInputBlobMetadata(fn base.DiskFileNum) (int, bool) {
     294            1 :         return slices.BinarySearchFunc(vs.inputBlobMetadatas, fn,
     295            1 :                 func(bm *manifest.BlobFileMetadata, fn base.DiskFileNum) int {
     296            1 :                         return cmp.Compare(bm.FileNum, fn)
     297            1 :                 })
     298              : }
     299              : 
     300              : // FinishOutput implements compact.ValueSeparation.
     301            1 : func (vs *preserveBlobReferences) FinishOutput() (compact.ValueSeparationMetadata, error) {
     302            1 :         references := slices.Clone(vs.currReferences)
     303            1 :         vs.currReferences = vs.currReferences[:0]
     304            1 : 
     305            1 :         referenceSize := uint64(0)
     306            1 :         for _, ref := range references {
     307            1 :                 referenceSize += ref.ValueSize
     308            1 :         }
     309            1 :         return compact.ValueSeparationMetadata{
     310            1 :                 BlobReferences:    references,
     311            1 :                 BlobReferenceSize: referenceSize,
     312            1 :                 // The outputBlobReferenceDepth is computed from the input sstables,
     313            1 :                 // reflecting the worst-case overlap of referenced blob files. If this
     314            1 :                 // sstable references fewer unique blob files, reduce its depth to the
     315            1 :                 // count of unique files.
     316            1 :                 BlobReferenceDepth: min(vs.outputBlobReferenceDepth, manifest.BlobReferenceDepth(len(references))),
     317            1 :         }, nil
     318              : }
        

Generated by: LCOV version 2.0-1