LCOV - code coverage report
Current view: top level - pebble - value_separation.go (source / functions) Coverage Total Hit
Test: 2025-07-05 08:18Z 6f57a213 - meta test only.lcov Lines: 82.9 % 286 237
Test Date: 2025-07-05 08:20:30 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "slices"
       9              :         "time"
      10              : 
      11              :         "github.com/cockroachdb/errors"
      12              :         "github.com/cockroachdb/pebble/internal/base"
      13              :         "github.com/cockroachdb/pebble/internal/compact"
      14              :         "github.com/cockroachdb/pebble/internal/invariants"
      15              :         "github.com/cockroachdb/pebble/internal/manifest"
      16              :         "github.com/cockroachdb/pebble/objstorage"
      17              :         "github.com/cockroachdb/pebble/sstable"
      18              :         "github.com/cockroachdb/pebble/sstable/blob"
      19              :         "github.com/cockroachdb/redact"
      20              : )
      21              : 
      22              : // latencyTolerantMinimumSize is the minimum size, in bytes, of a value that
      23              : // will be separated into a blob file when the value storage policy is
      24              : // ValueStorageLatencyTolerant.
      25              : const latencyTolerantMinimumSize = 10
      26              : 
      27            1 : var neverSeparateValues getValueSeparation = func(JobID, *tableCompaction, sstable.TableFormat) compact.ValueSeparation {
      28            1 :         return compact.NeverSeparateValues{}
      29            1 : }
      30              : 
      31              : // determineCompactionValueSeparation determines whether a compaction should
      32              : // separate values into blob files. It returns a compact.ValueSeparation
      33              : // implementation that should be used for the compaction.
      34              : func (d *DB) determineCompactionValueSeparation(
      35              :         jobID JobID, c *tableCompaction, tableFormat sstable.TableFormat,
      36            1 : ) compact.ValueSeparation {
      37            1 :         if tableFormat < sstable.TableFormatPebblev7 || d.FormatMajorVersion() < FormatValueSeparation ||
      38            1 :                 d.opts.Experimental.ValueSeparationPolicy == nil {
      39            1 :                 return compact.NeverSeparateValues{}
      40            1 :         }
      41            1 :         policy := d.opts.Experimental.ValueSeparationPolicy()
      42            1 :         if !policy.Enabled {
      43            1 :                 return compact.NeverSeparateValues{}
      44            1 :         }
      45              : 
      46              :         // We're allowed to write blob references. Determine whether we should carry
      47              :         // forward existing blob references, or write new ones.
      48            1 :         if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy); !writeBlobs {
      49            1 :                 // This compaction should preserve existing blob references.
      50            1 :                 return &preserveBlobReferences{
      51            1 :                         inputBlobPhysicalFiles:   uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs),
      52            1 :                         outputBlobReferenceDepth: outputBlobReferenceDepth,
      53            1 :                 }
      54            1 :         }
      55              : 
      56              :         // This compaction should write values to new blob files.
      57            1 :         return &writeNewBlobFiles{
      58            1 :                 comparer: d.opts.Comparer,
      59            1 :                 newBlobObject: func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
      60            1 :                         return d.newCompactionOutputBlob(
      61            1 :                                 jobID, c.kind, c.outputLevel.level, &c.metrics.bytesWritten, c.objCreateOpts)
      62            1 :                 },
      63              :                 shortAttrExtractor: d.opts.Experimental.ShortAttributeExtractor,
      64              :                 writerOpts:         d.opts.MakeBlobWriterOptions(c.outputLevel.level),
      65              :                 minimumSize:        policy.MinimumSize,
      66              :                 globalMinimumSize:  policy.MinimumSize,
      67            0 :                 invalidValueCallback: func(userKey []byte, value []byte, err error) {
      68            0 :                         // The value may not be safe, so it will be redacted when redaction
      69            0 :                         // is enabled.
      70            0 :                         d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
      71            0 :                                 Kind:    InvalidValue,
      72            0 :                                 UserKey: userKey,
      73            0 :                                 ExtraInfo: redact.Sprintf("callback=ShortAttributeExtractor,value=%x,err=%q",
      74            0 :                                         value, err),
      75            0 :                         })
      76            0 :                 },
      77              :         }
      78              : }
      79              : 
      80              : // shouldWriteBlobFiles returns true if the compaction should write new blob
      81              : // files. If it returns false, the referenceDepth return value contains the
      82              : // maximum blob reference depth to assign to output sstables (the actual value
      83              : // may be lower iff the output table references fewer distinct blob files).
      84              : func shouldWriteBlobFiles(
      85              :         c *tableCompaction, policy ValueSeparationPolicy,
      86            1 : ) (writeBlobs bool, referenceDepth manifest.BlobReferenceDepth) {
      87            1 :         // Flushes will have no existing references to blob files and should write
      88            1 :         // their values to new blob files.
      89            1 :         if c.kind == compactionKindFlush {
      90            1 :                 return true, 0
      91            1 :         }
      92            1 :         inputReferenceDepth := compactionBlobReferenceDepth(c.inputs)
      93            1 :         if inputReferenceDepth == 0 {
      94            1 :                 // None of the input sstables reference blob files. It may be the case
      95            1 :                 // that these sstables were created before value separation was enabled.
      96            1 :                 // We should try to write to new blob files.
      97            1 :                 return true, 0
      98            1 :         }
      99              :         // If the compaction's output blob reference depth would be greater than the
     100              :         // configured max, we should rewrite the values into new blob files to
     101              :         // restore locality.
     102            1 :         if inputReferenceDepth > manifest.BlobReferenceDepth(policy.MaxBlobReferenceDepth) {
     103            1 :                 return true, 0
     104            1 :         }
     105              :         // Otherwise, we won't write any new blob files but will carry forward
     106              :         // existing references.
     107            1 :         return false, inputReferenceDepth
     108              : }
     109              : 
     110              : // compactionBlobReferenceDepth computes the blob reference depth for a
     111              : // compaction. It's computed by finding the maximum blob reference depth of
     112              : // input sstables in each level. These per-level depths are then summed to
     113              : // produce a worst-case approximation of the blob reference locality of the
     114              : // compaction's output sstables.
     115              : //
     116              : // The intuition is that as compactions combine files referencing distinct blob
     117              : // files, outputted sstables begin to reference more and more distinct blob
     118              : // files. In the worst case, these references are evenly distributed across the
     119              : // keyspace and there is very little locality.
     120            1 : func compactionBlobReferenceDepth(levels []compactionLevel) manifest.BlobReferenceDepth {
     121            1 :         // TODO(jackson): Consider using a range tree to precisely compute the
     122            1 :         // depth. This would require maintaining minimum and maximum keys.
     123            1 :         var depth manifest.BlobReferenceDepth
     124            1 :         for _, level := range levels {
     125            1 :                 // L0 allows files to overlap one another, so it's not sufficient to
     126            1 :                 // just take the maximum within the level. Instead, we need to sum the
     127            1 :                 // max of each sublevel.
     128            1 :                 //
     129            1 :                 // TODO(jackson): This and other compaction logic would likely be
     130            1 :                 // cleaner if we modeled each sublevel as its own `compactionLevel`.
     131            1 :                 if level.level == 0 {
     132            1 :                         for _, sublevel := range level.l0SublevelInfo {
     133            1 :                                 var sublevelDepth int
     134            1 :                                 for t := range sublevel.LevelSlice.All() {
     135            1 :                                         sublevelDepth = max(sublevelDepth, int(t.BlobReferenceDepth))
     136            1 :                                 }
     137            1 :                                 depth += manifest.BlobReferenceDepth(sublevelDepth)
     138              :                         }
     139            1 :                         continue
     140              :                 }
     141              : 
     142            1 :                 var levelDepth manifest.BlobReferenceDepth
     143            1 :                 for t := range level.files.All() {
     144            1 :                         levelDepth = max(levelDepth, t.BlobReferenceDepth)
     145            1 :                 }
     146            1 :                 depth += levelDepth
     147              :         }
     148            1 :         return depth
     149              : }
     150              : 
     151              : // uniqueInputBlobMetadatas returns a slice of all unique blob file metadata
     152              : // objects referenced by tables in levels.
     153              : func uniqueInputBlobMetadatas(
     154              :         blobFileSet *manifest.BlobFileSet, levels []compactionLevel,
     155            1 : ) map[base.BlobFileID]*manifest.PhysicalBlobFile {
     156            1 :         m := make(map[base.BlobFileID]*manifest.PhysicalBlobFile)
     157            1 :         for _, level := range levels {
     158            1 :                 for t := range level.files.All() {
     159            1 :                         for _, ref := range t.BlobReferences {
     160            1 :                                 if _, ok := m[ref.FileID]; ok {
     161            1 :                                         continue
     162              :                                 }
     163            1 :                                 phys, ok := blobFileSet.LookupPhysical(ref.FileID)
     164            1 :                                 if !ok {
     165            0 :                                         panic(errors.AssertionFailedf("pebble: blob file %s not found", ref.FileID))
     166              :                                 }
     167            1 :                                 m[ref.FileID] = phys
     168              :                         }
     169              :                 }
     170              :         }
     171            1 :         return m
     172              : }
     173              : 
     174              : // writeNewBlobFiles implements the strategy and mechanics for separating values
     175              : // into external blob files.
     176              : type writeNewBlobFiles struct {
     177              :         comparer *base.Comparer
     178              :         // newBlobObject constructs a new blob object for use in the compaction.
     179              :         newBlobObject      func() (objstorage.Writable, objstorage.ObjectMetadata, error)
     180              :         shortAttrExtractor ShortAttributeExtractor
     181              :         // writerOpts is used to configure all constructed blob writers.
     182              :         writerOpts blob.FileWriterOptions
     183              :         // minimumSize imposes a lower bound on the size of values that can be
     184              :         // separated into a blob file. Values smaller than this are always written
     185              :         // to the sstable (but may still be written to a value block within the
     186              :         // sstable).
     187              :         //
     188              :         // minimumSize is set to globalMinimumSize by default and on every call to
     189              :         // FinishOutput. It may be overridden by SetNextOutputConfig (i.e., if a
     190              :         // SpanPolicy dictates a different minimum size for a span of the keyspace).
     191              :         minimumSize int
     192              :         // globalMinimumSize is the size threshold for separating values into blob
     193              :         // files globally across the keyspace. It may be overridden per-output by
     194              :         // SetNextOutputConfig.
     195              :         globalMinimumSize int
     196              :         // invalidValueCallback is called when a value is encountered for which the
     197              :         // short attribute extractor returns an error.
     198              :         invalidValueCallback func(userKey []byte, value []byte, err error)
     199              : 
     200              :         // Current blob writer state
     201              :         writer  *blob.FileWriter
     202              :         objMeta objstorage.ObjectMetadata
     203              : 
     204              :         buf []byte
     205              : }
     206              : 
     207              : // Assert that *writeNewBlobFiles implements the compact.ValueSeparation interface.
     208              : var _ compact.ValueSeparation = (*writeNewBlobFiles)(nil)
     209              : 
     210              : // SetNextOutputConfig implements the ValueSeparation interface.
     211            0 : func (vs *writeNewBlobFiles) SetNextOutputConfig(config compact.ValueSeparationOutputConfig) {
     212            0 :         vs.minimumSize = config.MinimumSize
     213            0 : }
     214              : 
     215              : // EstimatedFileSize returns an estimate of the disk space consumed by the current
     216              : // blob file if it were closed now.
     217            1 : func (vs *writeNewBlobFiles) EstimatedFileSize() uint64 {
     218            1 :         if vs.writer == nil {
     219            1 :                 return 0
     220            1 :         }
     221            1 :         return vs.writer.EstimatedSize()
     222              : }
     223              : 
     224              : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
     225              : // current output sstable's blob references so far.
     226            1 : func (vs *writeNewBlobFiles) EstimatedReferenceSize() uint64 {
     227            1 :         // When we're writing to new blob files, the size of the blob file itself is
     228            1 :         // a better estimate of the disk space consumed than the uncompressed value
     229            1 :         // sizes.
     230            1 :         return vs.EstimatedFileSize()
     231            1 : }
     232              : 
     233              : // Add adds the provided key-value pair to the sstable, possibly separating the
     234              : // value into a blob file.
     235              : func (vs *writeNewBlobFiles) Add(
     236              :         tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
     237            1 : ) error {
     238            1 :         // We always fetch the value if we're rewriting blob files. We want to
     239            1 :         // replace any references to existing blob files with references to new blob
     240            1 :         // files that we write during the compaction.
     241            1 :         v, callerOwned, err := kv.Value(vs.buf)
     242            1 :         if err != nil {
     243            0 :                 return err
     244            0 :         }
     245            1 :         if callerOwned {
     246            0 :                 vs.buf = v[:0]
     247            0 :         }
     248              : 
     249              :         // Values that are too small are never separated.
     250            1 :         if len(v) < vs.minimumSize {
     251            1 :                 return tw.Add(kv.K, v, forceObsolete)
     252            1 :         }
     253              :         // Merge and DeleteSized keys are never separated.
     254            1 :         switch kv.K.Kind() {
     255            1 :         case base.InternalKeyKindMerge, base.InternalKeyKindDeleteSized:
     256            1 :                 return tw.Add(kv.K, v, forceObsolete)
     257              :         }
     258              : 
     259              :         // This KV met all the criteria and its value will be separated.
     260              :         // If there's a configured short attribute extractor, extract the value's
     261              :         // short attribute.
     262            1 :         var shortAttr base.ShortAttribute
     263            1 :         if vs.shortAttrExtractor != nil {
     264            0 :                 keyPrefixLen := vs.comparer.Split(kv.K.UserKey)
     265            0 :                 shortAttr, err = vs.shortAttrExtractor(kv.K.UserKey, keyPrefixLen, v)
     266            0 :                 if err != nil {
     267            0 :                         // Report that there was a value for which the short attribute
     268            0 :                         // extractor was unable to extract a short attribute.
     269            0 :                         vs.invalidValueCallback(kv.K.UserKey, v, err)
     270            0 : 
     271            0 :                         // Rather than erroring out and aborting the flush or compaction, we
     272            0 :                         // fallback to writing the value verbatim to the sstable. Otherwise
     273            0 :                         // a flush could busy loop, repeatedly attempting to write the same
     274            0 :                         // memtable and repeatedly unable to extract a key's short attribute.
     275            0 :                         return tw.Add(kv.K, v, forceObsolete)
     276            0 :                 }
     277              :         }
     278              : 
     279              :         // If we don't have an open blob writer, create one. We create blob objects
     280              :         // lazily so that we don't create them unless a compaction will actually
     281              :         // write to a blob file. This avoids creating and deleting empty blob files
     282              :         // on every compaction in parts of the keyspace that a) are required to be
     283              :         // in-place or b) have small values.
     284            1 :         if vs.writer == nil {
     285            1 :                 writable, objMeta, err := vs.newBlobObject()
     286            1 :                 if err != nil {
     287            0 :                         return err
     288            0 :                 }
     289            1 :                 vs.objMeta = objMeta
     290            1 :                 vs.writer = blob.NewFileWriter(objMeta.DiskFileNum, writable, vs.writerOpts)
     291              :         }
     292              : 
     293              :         // Append the value to the blob file.
     294            1 :         handle := vs.writer.AddValue(v)
     295            1 : 
     296            1 :         // Write the key and the handle to the sstable. We need to map the
     297            1 :         // blob.Handle into a blob.InlineHandle. Everything is copied verbatim,
     298            1 :         // except the FileNum is translated into a reference index.
     299            1 :         inlineHandle := blob.InlineHandle{
     300            1 :                 InlineHandlePreface: blob.InlineHandlePreface{
     301            1 :                         // Since we're writing blob files and maintaining a 1:1 mapping
     302            1 :                         // between sstables and blob files, the reference index is always 0
     303            1 :                         // here. Only compactions that don't rewrite blob files will produce
     304            1 :                         // handles with nonzero reference indices.
     305            1 :                         ReferenceID: 0,
     306            1 :                         ValueLen:    handle.ValueLen,
     307            1 :                 },
     308            1 :                 HandleSuffix: blob.HandleSuffix{
     309            1 :                         BlockID: handle.BlockID,
     310            1 :                         ValueID: handle.ValueID,
     311            1 :                 },
     312            1 :         }
     313            1 :         return tw.AddWithBlobHandle(kv.K, inlineHandle, shortAttr, forceObsolete)
     314              : }
     315              : 
     316              : // FinishOutput closes the current blob file (if any). It returns the stats
     317              : // and metadata of the now completed blob file.
     318            1 : func (vs *writeNewBlobFiles) FinishOutput() (compact.ValueSeparationMetadata, error) {
     319            1 :         if vs.writer == nil {
     320            1 :                 return compact.ValueSeparationMetadata{}, nil
     321            1 :         }
     322            1 :         stats, err := vs.writer.Close()
     323            1 :         if err != nil {
     324            0 :                 return compact.ValueSeparationMetadata{}, err
     325            0 :         }
     326            1 :         vs.writer = nil
     327            1 :         meta := &manifest.PhysicalBlobFile{
     328            1 :                 FileNum:      vs.objMeta.DiskFileNum,
     329            1 :                 Size:         stats.FileLen,
     330            1 :                 ValueSize:    stats.UncompressedValueBytes,
     331            1 :                 CreationTime: uint64(time.Now().Unix()),
     332            1 :         }
     333            1 :         // Reset the minimum size for the next output.
     334            1 :         vs.minimumSize = vs.globalMinimumSize
     335            1 : 
     336            1 :         return compact.ValueSeparationMetadata{
     337            1 :                 BlobReferences: manifest.BlobReferences{manifest.MakeBlobReference(
     338            1 :                         base.BlobFileID(vs.objMeta.DiskFileNum),
     339            1 :                         stats.UncompressedValueBytes,
     340            1 :                         meta,
     341            1 :                 )},
     342            1 :                 BlobReferenceSize:  stats.UncompressedValueBytes,
     343            1 :                 BlobReferenceDepth: 1,
     344            1 :                 BlobFileStats:      stats,
     345            1 :                 BlobFileObject:     vs.objMeta,
     346            1 :                 BlobFileMetadata:   meta,
     347            1 :         }, nil
     348              : }
     349              : 
     350              : // preserveBlobReferences implements the compact.ValueSeparation interface. When
     351              : // a compaction is configured with preserveBlobReferences, the compaction will
     352              : // not create any new blob files. However, input references to existing blob
     353              : // files will be preserved and metadata about the table's blob references
     354              : // will be collected.
     355              : type preserveBlobReferences struct {
     356              :         // inputBlobPhysicalFiles holds the *PhysicalBlobFile for every unique blob
     357              :         // file referenced by input sstables.
     358              :         inputBlobPhysicalFiles   map[base.BlobFileID]*manifest.PhysicalBlobFile
     359              :         outputBlobReferenceDepth manifest.BlobReferenceDepth
     360              : 
     361              :         // state
     362              :         buf []byte
     363              :         // currReferences holds the pending references that have been referenced by
     364              :         // the current output sstable. The index of a reference with a given blob
     365              :         // file ID is the value of the blob.ReferenceID used by its value handles
     366              :         // within the output sstable.
     367              :         currReferences []pendingReference
     368              :         // totalValueSize is the sum of the valueSize fields of currReferences.
     369              :         //
     370              :         // INVARIANT: totalValueSize == sum(currReferences[i].valueSize)
     371              :         totalValueSize uint64
     372              : }
     373              : 
     374              : type pendingReference struct {
     375              :         blobFileID base.BlobFileID
     376              :         valueSize  uint64
     377              : }
     378              : 
     379              : // Assert that *preserveBlobReferences implements the compact.ValueSeparation
     380              : // interface.
     381              : var _ compact.ValueSeparation = (*preserveBlobReferences)(nil)
     382              : 
     383              : // SetNextOutputConfig implements the ValueSeparation interface.
     384            0 : func (vs *preserveBlobReferences) SetNextOutputConfig(config compact.ValueSeparationOutputConfig) {}
     385              : 
     386              : // EstimatedFileSize returns an estimate of the disk space consumed by the current
     387              : // blob file if it were closed now. This implementation always returns 0.
     388            1 : func (vs *preserveBlobReferences) EstimatedFileSize() uint64 {
     389            1 :         return 0
     390            1 : }
     391              : 
     392              : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
     393              : // current output sstable's blob references so far.
     394            1 : func (vs *preserveBlobReferences) EstimatedReferenceSize() uint64 {
     395            1 :         // TODO(jackson): The totalValueSize is the uncompressed value sizes. With
     396            1 :         // compressible data, it overestimates the disk space consumed by the blob
     397            1 :         // references. It also does not include the blob file's index block or
     398            1 :         // footer, so it can underestimate if values are completely incompressible.
     399            1 :         //
     400            1 :         // Should we compute a compression ratio per blob file and scale the
     401            1 :         // references appropriately?
     402            1 :         return vs.totalValueSize
     403            1 : }
     404              : 
     405              : // Add implements compact.ValueSeparation. This implementation will write
     406              : // existing blob references to the output table.
     407              : func (vs *preserveBlobReferences) Add(
     408              :         tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
     409            1 : ) error {
     410            1 :         if !kv.V.IsBlobValueHandle() {
     411            1 :                 // If the value is not already a blob handle (either it's in-place or in
     412            1 :                 // a value block), we retrieve the value and write it through Add. The
     413            1 :                 // sstable writer may still decide to put the value in a value block,
     414            1 :                 // but regardless the value will be written to the sstable itself and
     415            1 :                 // not a blob file.
     416            1 :                 v, callerOwned, err := kv.Value(vs.buf)
     417            1 :                 if err != nil {
     418            0 :                         return err
     419            0 :                 }
     420            1 :                 if callerOwned {
     421            0 :                         vs.buf = v[:0]
     422            0 :                 }
     423            1 :                 return tw.Add(kv.K, v, forceObsolete)
     424              :         }
     425              : 
     426              :         // The value is an existing blob handle. We can copy it into the output
     427              :         // sstable, taking note of the reference for the table metadata.
     428            1 :         lv := kv.V.LazyValue()
     429            1 :         fileID := lv.Fetcher.BlobFileID
     430            1 : 
     431            1 :         var refID blob.ReferenceID
     432            1 :         if refIdx := slices.IndexFunc(vs.currReferences, func(ref pendingReference) bool {
     433            1 :                 return ref.blobFileID == fileID
     434            1 :         }); refIdx != -1 {
     435            1 :                 refID = blob.ReferenceID(refIdx)
     436            1 :         } else {
     437            1 :                 refID = blob.ReferenceID(len(vs.currReferences))
     438            1 :                 vs.currReferences = append(vs.currReferences, pendingReference{
     439            1 :                         blobFileID: fileID,
     440            1 :                         valueSize:  0,
     441            1 :                 })
     442            1 :         }
     443              : 
     444            1 :         if invariants.Enabled && vs.currReferences[refID].blobFileID != fileID {
     445            0 :                 panic("wrong reference index")
     446              :         }
     447              : 
     448            1 :         handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
     449            1 :         inlineHandle := blob.InlineHandle{
     450            1 :                 InlineHandlePreface: blob.InlineHandlePreface{
     451            1 :                         ReferenceID: refID,
     452            1 :                         ValueLen:    lv.Fetcher.Attribute.ValueLen,
     453            1 :                 },
     454            1 :                 HandleSuffix: handleSuffix,
     455            1 :         }
     456            1 :         err := tw.AddWithBlobHandle(kv.K, inlineHandle, lv.Fetcher.Attribute.ShortAttribute, forceObsolete)
     457            1 :         if err != nil {
     458            0 :                 return err
     459            0 :         }
     460            1 :         vs.currReferences[refID].valueSize += uint64(lv.Fetcher.Attribute.ValueLen)
     461            1 :         vs.totalValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
     462            1 :         return nil
     463              : }
     464              : 
     465              : // FinishOutput implements compact.ValueSeparation.
     466            1 : func (vs *preserveBlobReferences) FinishOutput() (compact.ValueSeparationMetadata, error) {
     467            1 :         if invariants.Enabled {
     468            1 :                 // Assert that the incrementally-maintained totalValueSize matches the
     469            1 :                 // sum of all the reference value sizes.
     470            1 :                 totalValueSize := uint64(0)
     471            1 :                 for _, ref := range vs.currReferences {
     472            1 :                         totalValueSize += ref.valueSize
     473            1 :                 }
     474            1 :                 if totalValueSize != vs.totalValueSize {
     475            0 :                         return compact.ValueSeparationMetadata{},
     476            0 :                                 errors.AssertionFailedf("totalValueSize mismatch: %d != %d", totalValueSize, vs.totalValueSize)
     477            0 :                 }
     478              :         }
     479              : 
     480            1 :         references := make(manifest.BlobReferences, len(vs.currReferences))
     481            1 :         for i := range vs.currReferences {
     482            1 :                 ref := vs.currReferences[i]
     483            1 :                 phys, ok := vs.inputBlobPhysicalFiles[ref.blobFileID]
     484            1 :                 if !ok {
     485            0 :                         return compact.ValueSeparationMetadata{},
     486            0 :                                 errors.AssertionFailedf("pebble: blob file %s not found among input sstables", ref.blobFileID)
     487            0 :                 }
     488            1 :                 references[i] = manifest.MakeBlobReference(ref.blobFileID, ref.valueSize, phys)
     489              :         }
     490            1 :         referenceSize := vs.totalValueSize
     491            1 :         vs.currReferences = vs.currReferences[:0]
     492            1 :         vs.totalValueSize = 0
     493            1 :         return compact.ValueSeparationMetadata{
     494            1 :                 BlobReferences:    references,
     495            1 :                 BlobReferenceSize: referenceSize,
     496            1 :                 // The outputBlobReferenceDepth is computed from the input sstables,
     497            1 :                 // reflecting the worst-case overlap of referenced blob files. If this
     498            1 :                 // sstable references fewer unique blob files, reduce its depth to the
     499            1 :                 // count of unique files.
     500            1 :                 BlobReferenceDepth: min(vs.outputBlobReferenceDepth, manifest.BlobReferenceDepth(len(references))),
     501            1 :         }, nil
     502              : }
        

Generated by: LCOV version 2.0-1