LCOV - code coverage report
Current view: top level - pebble - value_separation.go (source / functions) Coverage Total Hit
Test: 2025-06-20 08:19Z 54bb06f3 - tests + meta.lcov Lines: 89.7 % 261 234
Test Date: 2025-06-20 08:20:18 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "slices"
       9              :         "time"
      10              : 
      11              :         "github.com/cockroachdb/errors"
      12              :         "github.com/cockroachdb/pebble/internal/base"
      13              :         "github.com/cockroachdb/pebble/internal/compact"
      14              :         "github.com/cockroachdb/pebble/internal/invariants"
      15              :         "github.com/cockroachdb/pebble/internal/manifest"
      16              :         "github.com/cockroachdb/pebble/objstorage"
      17              :         "github.com/cockroachdb/pebble/sstable"
      18              :         "github.com/cockroachdb/pebble/sstable/blob"
      19              : )
      20              : 
      21            2 : var neverSeparateValues getValueSeparation = func(JobID, *compaction, sstable.TableFormat) compact.ValueSeparation {
      22            2 :         return compact.NeverSeparateValues{}
      23            2 : }
      24              : 
      25              : // determineCompactionValueSeparation determines whether a compaction should
      26              : // separate values into blob files. It returns a compact.ValueSeparation
      27              : // implementation that should be used for the compaction.
      28              : func (d *DB) determineCompactionValueSeparation(
      29              :         jobID JobID, c *compaction, tableFormat sstable.TableFormat,
      30            2 : ) compact.ValueSeparation {
      31            2 :         if tableFormat < sstable.TableFormatPebblev7 || d.FormatMajorVersion() < FormatValueSeparation ||
      32            2 :                 d.opts.Experimental.ValueSeparationPolicy == nil {
      33            2 :                 return compact.NeverSeparateValues{}
      34            2 :         }
      35            2 :         policy := d.opts.Experimental.ValueSeparationPolicy()
      36            2 :         if !policy.Enabled {
      37            2 :                 return compact.NeverSeparateValues{}
      38            2 :         }
      39              : 
      40              :         // We're allowed to write blob references. Determine whether we should carry
      41              :         // forward existing blob references, or write new ones.
      42            2 :         if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy); !writeBlobs {
      43            2 :                 // This compaction should preserve existing blob references.
      44            2 :                 return &preserveBlobReferences{
      45            2 :                         inputBlobPhysicalFiles:   uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs),
      46            2 :                         outputBlobReferenceDepth: outputBlobReferenceDepth,
      47            2 :                 }
      48            2 :         }
      49              : 
      50              :         // This compaction should write values to new blob files.
      51            2 :         return &writeNewBlobFiles{
      52            2 :                 comparer: d.opts.Comparer,
      53            2 :                 newBlobObject: func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
      54            2 :                         return d.newCompactionOutputBlob(jobID, c)
      55            2 :                 },
      56              :                 shortAttrExtractor: d.opts.Experimental.ShortAttributeExtractor,
      57              :                 writerOpts:         d.opts.MakeBlobWriterOptions(c.outputLevel.level),
      58              :                 minimumSize:        policy.MinimumSize,
      59              :         }
      60              : }
      61              : 
      62              : // shouldWriteBlobFiles returns true if the compaction should write new blob
      63              : // files. If it returns false, the referenceDepth return value contains the
      64              : // maximum blob reference depth to assign to output sstables (the actual value
      65              : // may be lower iff the output table references fewer distinct blob files).
      66              : func shouldWriteBlobFiles(
      67              :         c *compaction, policy ValueSeparationPolicy,
      68            2 : ) (writeBlobs bool, referenceDepth manifest.BlobReferenceDepth) {
      69            2 :         // Flushes will have no existing references to blob files and should write
      70            2 :         // their values to new blob files.
      71            2 :         if c.kind == compactionKindFlush {
      72            2 :                 return true, 0
      73            2 :         }
      74            2 :         inputReferenceDepth := compactionBlobReferenceDepth(c.inputs)
      75            2 :         if inputReferenceDepth == 0 {
      76            2 :                 // None of the input sstables reference blob files. It may be the case
      77            2 :                 // that these sstables were created before value separation was enabled.
      78            2 :                 // We should try to write to new blob files.
      79            2 :                 return true, 0
      80            2 :         }
      81              :         // If the compaction's output blob reference depth would be greater than the
      82              :         // configured max, we should rewrite the values into new blob files to
      83              :         // restore locality.
      84            2 :         if inputReferenceDepth > manifest.BlobReferenceDepth(policy.MaxBlobReferenceDepth) {
      85            2 :                 return true, 0
      86            2 :         }
      87              :         // Otherwise, we won't write any new blob files but will carry forward
      88              :         // existing references.
      89            2 :         return false, inputReferenceDepth
      90              : }
      91              : 
      92              : // compactionBlobReferenceDepth computes the blob reference depth for a
      93              : // compaction. It's computed by finding the maximum blob reference depth of
      94              : // input sstables in each level. These per-level depths are then summed to
      95              : // produce a worst-case approximation of the blob reference locality of the
      96              : // compaction's output sstables.
      97              : //
      98              : // The intuition is that as compactions combine files referencing distinct blob
      99              : // files, outputted sstables begin to reference more and more distinct blob
     100              : // files. In the worst case, these references are evenly distributed across the
     101              : // keyspace and there is very little locality.
     102            2 : func compactionBlobReferenceDepth(levels []compactionLevel) manifest.BlobReferenceDepth {
     103            2 :         // TODO(jackson): Consider using a range tree to precisely compute the
     104            2 :         // depth. This would require maintaining minimum and maximum keys.
     105            2 :         var depth manifest.BlobReferenceDepth
     106            2 :         for _, level := range levels {
     107            2 :                 // L0 allows files to overlap one another, so it's not sufficient to
     108            2 :                 // just take the maximum within the level. Instead, we need to sum the
     109            2 :                 // max of each sublevel.
     110            2 :                 //
     111            2 :                 // TODO(jackson): This and other compaction logic would likely be
     112            2 :                 // cleaner if we modeled each sublevel as its own `compactionLevel`.
     113            2 :                 if level.level == 0 {
     114            2 :                         for _, sublevel := range level.l0SublevelInfo {
     115            2 :                                 var sublevelDepth int
     116            2 :                                 for t := range sublevel.LevelSlice.All() {
     117            2 :                                         sublevelDepth = max(sublevelDepth, int(t.BlobReferenceDepth))
     118            2 :                                 }
     119            2 :                                 depth += manifest.BlobReferenceDepth(sublevelDepth)
     120              :                         }
     121            2 :                         continue
     122              :                 }
     123              : 
     124            2 :                 var levelDepth manifest.BlobReferenceDepth
     125            2 :                 for t := range level.files.All() {
     126            2 :                         levelDepth = max(levelDepth, t.BlobReferenceDepth)
     127            2 :                 }
     128            2 :                 depth += levelDepth
     129              :         }
     130            2 :         return depth
     131              : }
     132              : 
     133              : // uniqueInputBlobMetadatas returns a slice of all unique blob file metadata
     134              : // objects referenced by tables in levels.
     135              : func uniqueInputBlobMetadatas(
     136              :         blobFileSet *manifest.BlobFileSet, levels []compactionLevel,
     137            2 : ) map[base.BlobFileID]*manifest.PhysicalBlobFile {
     138            2 :         m := make(map[base.BlobFileID]*manifest.PhysicalBlobFile)
     139            2 :         for _, level := range levels {
     140            2 :                 for t := range level.files.All() {
     141            2 :                         for _, ref := range t.BlobReferences {
     142            2 :                                 if _, ok := m[ref.FileID]; ok {
     143            1 :                                         continue
     144              :                                 }
     145            2 :                                 phys, ok := blobFileSet.LookupPhysical(ref.FileID)
     146            2 :                                 if !ok {
     147            0 :                                         panic(errors.AssertionFailedf("pebble: blob file %s not found", ref.FileID))
     148              :                                 }
     149            2 :                                 m[ref.FileID] = phys
     150              :                         }
     151              :                 }
     152              :         }
     153            2 :         return m
     154              : }
     155              : 
     156              : // writeNewBlobFiles implements the strategy and mechanics for separating values
     157              : // into external blob files.
     158              : type writeNewBlobFiles struct {
     159              :         comparer *base.Comparer
     160              :         // newBlobObject constructs a new blob object for use in the compaction.
     161              :         newBlobObject      func() (objstorage.Writable, objstorage.ObjectMetadata, error)
     162              :         shortAttrExtractor ShortAttributeExtractor
     163              :         // writerOpts is used to configure all constructed blob writers.
     164              :         writerOpts blob.FileWriterOptions
     165              :         // minimumSize imposes a lower bound on the size of values that can be
     166              :         // separated into a blob file. Values smaller than this are always written
     167              :         // to the sstable (but may still be written to a value block within the
     168              :         // sstable).
     169              :         minimumSize int
     170              : 
     171              :         // Current blob writer state
     172              :         writer  *blob.FileWriter
     173              :         objMeta objstorage.ObjectMetadata
     174              : 
     175              :         buf []byte
     176              : }
     177              : 
     178              : // Assert that *writeNewBlobFiles implements the compact.ValueSeparation interface.
     179              : var _ compact.ValueSeparation = (*writeNewBlobFiles)(nil)
     180              : 
     181              : // EstimatedFileSize returns an estimate of the disk space consumed by the current
     182              : // blob file if it were closed now.
     183            2 : func (vs *writeNewBlobFiles) EstimatedFileSize() uint64 {
     184            2 :         if vs.writer == nil {
     185            2 :                 return 0
     186            2 :         }
     187            2 :         return vs.writer.EstimatedSize()
     188              : }
     189              : 
     190              : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
     191              : // current output sstable's blob references so far.
     192            2 : func (vs *writeNewBlobFiles) EstimatedReferenceSize() uint64 {
     193            2 :         // When we're writing to new blob files, the size of the blob file itself is
     194            2 :         // a better estimate of the disk space consumed than the uncompressed value
     195            2 :         // sizes.
     196            2 :         return vs.EstimatedFileSize()
     197            2 : }
     198              : 
     199              : // Add adds the provided key-value pair to the sstable, possibly separating the
     200              : // value into a blob file.
     201              : func (vs *writeNewBlobFiles) Add(
     202              :         tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
     203            2 : ) error {
     204            2 :         // We always fetch the value if we're rewriting blob files. We want to
     205            2 :         // replace any references to existing blob files with references to new blob
     206            2 :         // files that we write during the compaction.
     207            2 :         v, callerOwned, err := kv.Value(vs.buf)
     208            2 :         if err != nil {
     209            0 :                 return err
     210            0 :         }
     211            2 :         if callerOwned {
     212            0 :                 vs.buf = v[:0]
     213            0 :         }
     214              : 
     215              :         // Values that are too small are never separated.
     216            2 :         if len(v) < vs.minimumSize {
     217            2 :                 return tw.Add(kv.K, v, forceObsolete)
     218            2 :         }
     219              :         // Merge and deletesized keys are never separated.
     220            2 :         switch kv.K.Kind() {
     221            2 :         case base.InternalKeyKindMerge, base.InternalKeyKindDeleteSized:
     222            2 :                 return tw.Add(kv.K, v, forceObsolete)
     223              :         }
     224              : 
     225              :         // This KV met all the criteria and its value will be separated.
     226              :         // If there's a configured short attribute extractor, extract the value's
     227              :         // short attribute.
     228            2 :         var shortAttr base.ShortAttribute
     229            2 :         if vs.shortAttrExtractor != nil {
     230            0 :                 keyPrefixLen := vs.comparer.Split(kv.K.UserKey)
     231            0 :                 shortAttr, err = vs.shortAttrExtractor(kv.K.UserKey, keyPrefixLen, v)
     232            0 :                 if err != nil {
     233            0 :                         return err
     234            0 :                 }
     235              :         }
     236              : 
     237              :         // If we don't have an open blob writer, create one. We create blob objects
     238              :         // lazily so that we don't create them unless a compaction will actually
     239              :         // write to a blob file. This avoids creating and deleting empty blob files
     240              :         // on every compaction in parts of the keyspace that a) are required to be
     241              :         // in-place or b) have small values.
     242            2 :         if vs.writer == nil {
     243            2 :                 writable, objMeta, err := vs.newBlobObject()
     244            2 :                 if err != nil {
     245            0 :                         return err
     246            0 :                 }
     247            2 :                 vs.objMeta = objMeta
     248            2 :                 vs.writer = blob.NewFileWriter(objMeta.DiskFileNum, writable, vs.writerOpts)
     249              :         }
     250              : 
     251              :         // Append the value to the blob file.
     252            2 :         handle := vs.writer.AddValue(v)
     253            2 : 
     254            2 :         // Write the key and the handle to the sstable. We need to map the
     255            2 :         // blob.Handle into a blob.InlineHandle. Everything is copied verbatim,
     256            2 :         // except the FileNum is translated into a reference index.
     257            2 :         inlineHandle := blob.InlineHandle{
     258            2 :                 InlineHandlePreface: blob.InlineHandlePreface{
     259            2 :                         // Since we're writing blob files and maintaining a 1:1 mapping
     260            2 :                         // between sstables and blob files, the reference index is always 0
     261            2 :                         // here. Only compactions that don't rewrite blob files will produce
     262            2 :                         // handles with nonzero reference indices.
     263            2 :                         ReferenceID: 0,
     264            2 :                         ValueLen:    handle.ValueLen,
     265            2 :                 },
     266            2 :                 HandleSuffix: blob.HandleSuffix{
     267            2 :                         BlockID: handle.BlockID,
     268            2 :                         ValueID: handle.ValueID,
     269            2 :                 },
     270            2 :         }
     271            2 :         return tw.AddWithBlobHandle(kv.K, inlineHandle, shortAttr, forceObsolete)
     272              : }
     273              : 
     274              : // FinishOutput closes the current blob file (if any). It returns the stats
     275              : // and metadata of the now completed blob file.
     276            2 : func (vs *writeNewBlobFiles) FinishOutput() (compact.ValueSeparationMetadata, error) {
     277            2 :         if vs.writer == nil {
     278            2 :                 return compact.ValueSeparationMetadata{}, nil
     279            2 :         }
     280            2 :         stats, err := vs.writer.Close()
     281            2 :         if err != nil {
     282            0 :                 return compact.ValueSeparationMetadata{}, err
     283            0 :         }
     284            2 :         vs.writer = nil
     285            2 :         meta := &manifest.PhysicalBlobFile{
     286            2 :                 FileNum:      vs.objMeta.DiskFileNum,
     287            2 :                 Size:         stats.FileLen,
     288            2 :                 ValueSize:    stats.UncompressedValueBytes,
     289            2 :                 CreationTime: uint64(time.Now().Unix()),
     290            2 :         }
     291            2 : 
     292            2 :         return compact.ValueSeparationMetadata{
     293            2 :                 BlobReferences: manifest.BlobReferences{manifest.MakeBlobReference(
     294            2 :                         base.BlobFileID(vs.objMeta.DiskFileNum),
     295            2 :                         stats.UncompressedValueBytes,
     296            2 :                         meta,
     297            2 :                 )},
     298            2 :                 BlobReferenceSize:  stats.UncompressedValueBytes,
     299            2 :                 BlobReferenceDepth: 1,
     300            2 :                 BlobFileStats:      stats,
     301            2 :                 BlobFileObject:     vs.objMeta,
     302            2 :                 BlobFileMetadata:   meta,
     303            2 :         }, nil
     304              : }
     305              : 
     306              : // preserveBlobReferences implements the compact.ValueSeparation interface. When
     307              : // a compaction is configured with preserveBlobReferences, the compaction will
     308              : // not create any new blob files. However, input references to existing blob
     309              : // references will be preserved and metadata about the table's blob references
     310              : // will be collected.
     311              : type preserveBlobReferences struct {
     312              :         // inputBlobPhysicalFiles holds the *PhysicalBlobFile for every unique blob
     313              :         // file referenced by input sstables.
     314              :         inputBlobPhysicalFiles   map[base.BlobFileID]*manifest.PhysicalBlobFile
     315              :         outputBlobReferenceDepth manifest.BlobReferenceDepth
     316              : 
     317              :         // state
     318              :         buf []byte
     319              :         // currReferences holds the pending references that have been referenced by
     320              :         // the current output sstable. The index of a reference with a given blob
     321              :         // file ID is the value of the blob.ReferenceID used by its value handles
     322              :         // within the output sstable.
     323              :         currReferences []pendingReference
     324              :         // totalValueSize is the sum of currReferenceValueSizes.
     325              :         //
     326              :         // INVARIANT: totalValueSize == sum(currReferenceValueSizes)
     327              :         totalValueSize uint64
     328              : }
     329              : 
     330              : type pendingReference struct {
     331              :         blobFileID base.BlobFileID
     332              :         valueSize  uint64
     333              : }
     334              : 
     335              : // Assert that *preserveBlobReferences implements the compact.ValueSeparation
     336              : // interface.
     337              : var _ compact.ValueSeparation = (*preserveBlobReferences)(nil)
     338              : 
     339              : // EstimatedFileSize returns an estimate of the disk space consumed by the current
     340              : // blob file if it were closed now.
     341            2 : func (vs *preserveBlobReferences) EstimatedFileSize() uint64 {
     342            2 :         return 0
     343            2 : }
     344              : 
     345              : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
     346              : // current output sstable's blob references so far.
     347            2 : func (vs *preserveBlobReferences) EstimatedReferenceSize() uint64 {
     348            2 :         // TODO(jackson): The totalValueSize is the uncompressed value sizes. With
     349            2 :         // compressible data, it overestimates the disk space consumed by the blob
     350            2 :         // references. It also does not include the blob file's index block or
     351            2 :         // footer, so it can underestimate if values are completely incompressible.
     352            2 :         //
     353            2 :         // Should we compute a compression ratio per blob file and scale the
     354            2 :         // references appropriately?
     355            2 :         return vs.totalValueSize
     356            2 : }
     357              : 
     358              : // Add implements compact.ValueSeparation. This implementation will write
     359              : // existing blob references to the output table.
     360              : func (vs *preserveBlobReferences) Add(
     361              :         tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
     362            2 : ) error {
     363            2 :         if !kv.V.IsBlobValueHandle() {
     364            2 :                 // If the value is not already a blob handle (either it's in-place or in
     365            2 :                 // a value block), we retrieve the value and write it through Add. The
     366            2 :                 // sstable writer may still decide to put the value in a value block,
     367            2 :                 // but regardless the value will be written to the sstable itself and
     368            2 :                 // not a blob file.
     369            2 :                 v, callerOwned, err := kv.Value(vs.buf)
     370            2 :                 if err != nil {
     371            0 :                         return err
     372            0 :                 }
     373            2 :                 if callerOwned {
     374            0 :                         vs.buf = v[:0]
     375            0 :                 }
     376            2 :                 return tw.Add(kv.K, v, forceObsolete)
     377              :         }
     378              : 
     379              :         // The value is an existing blob handle. We can copy it into the output
     380              :         // sstable, taking note of the reference for the table metadata.
     381            2 :         lv := kv.V.LazyValue()
     382            2 :         fileID := lv.Fetcher.BlobFileID
     383            2 : 
     384            2 :         var refID blob.ReferenceID
     385            2 :         if refIdx := slices.IndexFunc(vs.currReferences, func(ref pendingReference) bool {
     386            2 :                 return ref.blobFileID == fileID
     387            2 :         }); refIdx != -1 {
     388            2 :                 refID = blob.ReferenceID(refIdx)
     389            2 :         } else {
     390            2 :                 refID = blob.ReferenceID(len(vs.currReferences))
     391            2 :                 vs.currReferences = append(vs.currReferences, pendingReference{
     392            2 :                         blobFileID: fileID,
     393            2 :                         valueSize:  0,
     394            2 :                 })
     395            2 :         }
     396              : 
     397            2 :         if invariants.Enabled && vs.currReferences[refID].blobFileID != fileID {
     398            0 :                 panic("wrong reference index")
     399              :         }
     400              : 
     401            2 :         handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
     402            2 :         inlineHandle := blob.InlineHandle{
     403            2 :                 InlineHandlePreface: blob.InlineHandlePreface{
     404            2 :                         ReferenceID: refID,
     405            2 :                         ValueLen:    lv.Fetcher.Attribute.ValueLen,
     406            2 :                 },
     407            2 :                 HandleSuffix: handleSuffix,
     408            2 :         }
     409            2 :         err := tw.AddWithBlobHandle(kv.K, inlineHandle, lv.Fetcher.Attribute.ShortAttribute, forceObsolete)
     410            2 :         if err != nil {
     411            0 :                 return err
     412            0 :         }
     413            2 :         vs.currReferences[refID].valueSize += uint64(lv.Fetcher.Attribute.ValueLen)
     414            2 :         vs.totalValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
     415            2 :         return nil
     416              : }
     417              : 
     418              : // FinishOutput implements compact.ValueSeparation.
     419            2 : func (vs *preserveBlobReferences) FinishOutput() (compact.ValueSeparationMetadata, error) {
     420            2 :         if invariants.Enabled {
     421            2 :                 // Assert that the incrementally-maintained totalValueSize matches the
     422            2 :                 // sum of all the reference value sizes.
     423            2 :                 totalValueSize := uint64(0)
     424            2 :                 for _, ref := range vs.currReferences {
     425            2 :                         totalValueSize += ref.valueSize
     426            2 :                 }
     427            2 :                 if totalValueSize != vs.totalValueSize {
     428            0 :                         return compact.ValueSeparationMetadata{},
     429            0 :                                 errors.AssertionFailedf("totalValueSize mismatch: %d != %d", totalValueSize, vs.totalValueSize)
     430            0 :                 }
     431              :         }
     432              : 
     433            2 :         references := make(manifest.BlobReferences, len(vs.currReferences))
     434            2 :         for i := range vs.currReferences {
     435            2 :                 ref := vs.currReferences[i]
     436            2 :                 phys, ok := vs.inputBlobPhysicalFiles[ref.blobFileID]
     437            2 :                 if !ok {
     438            0 :                         return compact.ValueSeparationMetadata{},
     439            0 :                                 errors.AssertionFailedf("pebble: blob file %s not found among input sstables", ref.blobFileID)
     440            0 :                 }
     441            2 :                 references[i] = manifest.MakeBlobReference(ref.blobFileID, ref.valueSize, phys)
     442              :         }
     443            2 :         referenceSize := vs.totalValueSize
     444            2 :         vs.currReferences = vs.currReferences[:0]
     445            2 :         vs.totalValueSize = 0
     446            2 :         return compact.ValueSeparationMetadata{
     447            2 :                 BlobReferences:    references,
     448            2 :                 BlobReferenceSize: referenceSize,
     449            2 :                 // The outputBlobReferenceDepth is computed from the input sstables,
     450            2 :                 // reflecting the worst-case overlap of referenced blob files. If this
     451            2 :                 // sstable references fewer unique blob files, reduce its depth to the
     452            2 :                 // count of unique files.
     453            2 :                 BlobReferenceDepth: min(vs.outputBlobReferenceDepth, manifest.BlobReferenceDepth(len(references))),
     454            2 :         }, nil
     455              : }
        

Generated by: LCOV version 2.0-1