LCOV - code coverage report
Current view: top level - pebble - compaction_value_separation.go (source / functions) Coverage Total Hit
Test: 2025-11-23 08:19Z d7ce913e - meta test only.lcov Lines: 82.2 % 135 111
Test Date: 2025-11-23 08:20:36 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "github.com/cockroachdb/errors"
       9              :         "github.com/cockroachdb/pebble/internal/base"
      10              :         "github.com/cockroachdb/pebble/internal/manifest"
      11              :         "github.com/cockroachdb/pebble/objstorage"
      12              :         "github.com/cockroachdb/pebble/valsep"
      13              :         "github.com/cockroachdb/redact"
      14              : )
      15              : 
      16            1 : var neverSeparateValues getValueSeparation = func(JobID, *tableCompaction) valsep.ValueSeparation {
      17            1 :         return valsep.NeverSeparateValues{} // both arguments are ignored; the result is always this implementation
      18            1 : }
      19              : 
      20              : // determineCompactionValueSeparation picks the valsep.ValueSeparation
      21              : // implementation for a flush or compaction: never separate values, preserve
      22              : // existing blob references, or write values to new blob files.
      23              : //
      24              : // It assumes that the compaction will write tables at d.TableFormat() or above.
      25              : func (d *DB) determineCompactionValueSeparation(
      26              :         jobID JobID, c *tableCompaction,
      27            1 : ) valsep.ValueSeparation {
      28            1 :         if d.FormatMajorVersion() < FormatValueSeparation ||
      29            1 :                 d.opts.Experimental.ValueSeparationPolicy == nil {
      30            1 :                 return valsep.NeverSeparateValues{} // format version below FormatValueSeparation, or no policy func configured
      31            1 :         }
      32            1 :         policy := d.opts.Experimental.ValueSeparationPolicy()
      33            1 :         if !policy.Enabled {
      34            1 :                 return valsep.NeverSeparateValues{} // a policy is configured but reports itself disabled
      35            1 :         }
      36              : 
      37              :         // Blob references may be written. Decide whether to carry forward the
      38              :         // inputs' existing blob references or to write values to new blob files.
      39              : 
      40            1 :         var blobFileSet map[base.BlobFileID]*manifest.PhysicalBlobFile
      41            1 :         if c.version != nil {
      42            1 :                 // c.version is nil for flushes, which leave blobFileSet nil.
      43            1 :                 blobFileSet = uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs)
      44            1 :         }
      45            1 :         if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy, d.opts.Experimental.SpanPolicyFunc, d.cmp); !writeBlobs {
      46            1 :                 // Not writing new blob files: preserve the existing blob references.
      47            1 :                 return valsep.NewPreserveAllHotBlobReferences(
      48            1 :                         blobFileSet,
      49            1 :                         outputBlobReferenceDepth,
      50            1 :                         policy.MinimumSize,
      51            1 :                         policy.MinimumMVCCGarbageSize,
      52            1 :                 )
      53            1 :         }
      54              : 
      55              :         // Otherwise, this compaction should write values to new blob files.
      56            1 :         return valsep.NewWriteNewBlobFiles(
      57            1 :                 d.opts.Comparer,
      58            1 :                 func() (objstorage.Writable, objstorage.ObjectMetadata, error) { // delegates to d.newCompactionOutputBlob
      59            1 :                         return d.newCompactionOutputBlob(
      60            1 :                                 jobID, c.kind, c.outputLevel.level, &c.metrics.bytesWritten, c.objCreateOpts)
      61            1 :                 },
      62              :                 d.makeBlobWriterOptions(c.outputLevel.level),
      63              :                 policy.MinimumSize,
      64              :                 policy.MinimumMVCCGarbageSize,
      65              :                 valsep.WriteNewBlobFilesOptions{
      66              :                         InputBlobPhysicalFiles: blobFileSet,
      67              :                         ShortAttrExtractor:     d.opts.Experimental.ShortAttributeExtractor,
      68            0 :                         InvalidValueCallback: func(userKey []byte, value []byte, err error) {
      69            0 :                                 // Report the invalid value through the event listener. The value may
      70            0 :                                 // not be safe, so it will be redacted when redaction is enabled.
      71            0 :                                 d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
      72            0 :                                         Kind:    InvalidValue,
      73            0 :                                         UserKey: userKey,
      74            0 :                                         ExtraInfo: redact.Sprintf("callback=ShortAttributeExtractor,value=%x,err=%q",
      75            0 :                                                 value, err),
      76            0 :                                 })
      77            0 :                         },
      78              :                 },
      79              :         )
      80              : }
      81              : 
      82              : // shouldWriteBlobFiles reports whether the compaction should write new blob
      83              : // files; whenever it returns true, referenceDepth is 0. If it returns false,
      84              : // referenceDepth contains the maximum blob reference depth to assign to output sstables
      85              : // (the actual value may be lower iff the output table references fewer distinct blob files).
      86              : func shouldWriteBlobFiles(
      87              :         c *tableCompaction, policy ValueSeparationPolicy, spanPolicyFunc SpanPolicyFunc, cmp Compare,
      88            1 : ) (writeBlobs bool, referenceDepth manifest.BlobReferenceDepth) {
      89            1 :         // Flushes will have no existing references to blob files and should write
      90            1 :         // their values to new blob files.
      91            1 :         if c.kind == compactionKindFlush {
      92            1 :                 return true, 0
      93            1 :         }
      94              : 
      95            1 :         inputReferenceDepth := compactionBlobReferenceDepth(c.inputs)
      96            1 : 
      97            1 :         if c.kind == compactionKindVirtualRewrite {
      98            1 :                 // A virtual rewrite is a compaction that just materializes a
      99            1 :                 // virtual table. No new blob files should be written, and the
     100            1 :                 // reference depth is unchanged.
     101            1 :                 return false, inputReferenceDepth
     102            1 :         }
     103              : 
     104            1 :         if inputReferenceDepth == 0 {
     105            1 :                 // None of the input sstables reference blob files. It may be the case
     106            1 :                 // that these sstables were created before value separation was enabled.
     107            1 :                 // We should try to write to new blob files.
     108            1 :                 return true, 0
     109            1 :         }
     110              : 
     111              :         // If the compaction's output blob reference depth would be greater than the
     112              :         // configured max (policy.MaxBlobReferenceDepth), we should rewrite the values
     113              :         // into new blob files to restore locality.
     114            1 :         if inputReferenceDepth > manifest.BlobReferenceDepth(policy.MaxBlobReferenceDepth) {
     115            1 :                 return true, 0
     116            1 :         }
     117              :         // Compare the value separation settings recorded in each input table's backing
     118              :         // properties with the settings currently expected for that table. If every
     119              :         // table matches, existing blob references can be preserved. This ensures that tables
     120              :         // that were not written with value separation enabled have their values written to new blob files.
     121            1 :         for _, level := range c.inputs {
     122            1 :                 for t := range level.files.All() {
     123            1 :                         backingProps, backingPropsValid := t.TableBacking.Properties()
     124            1 :                         if !backingPropsValid {
     125            1 :                                 continue // no valid backing properties; this table is not checked
     126              :                         }
     127              : 
     128              :                         // Start from the global policy's values, then apply any overrides
     129              :                         // from the span policy returned for the table's start key.
     130            1 :                         expectedMinSize := policy.MinimumSize
     131            1 :                         expectedValSepBySuffixDisabled := false
     132            1 :                         bounds := t.UserKeyBounds()
     133            1 :                         spanPolicy, spanPolicyEndKey, err := spanPolicyFunc(bounds.Start)
     134            1 :                         // For now, if we can't determine the span policy, we should just assume
     135            1 :                         // the default policy is in effect for this table.
     136            1 :                         if err == nil {
     137            1 :                                 if len(spanPolicyEndKey) > 0 && cmp(bounds.End.Key, spanPolicyEndKey) >= 0 {
     138            0 :                                         // The table's key range now uses multiple span policies. Rewrite to new
     139            0 :                                         // blob files so values are stored according to the current policy.
     140            0 :                                         return true, 0
     141            0 :                                 }
     142            1 :                                 if spanPolicy.ValueStoragePolicy.DisableBlobSeparation {
     143            0 :                                         expectedMinSize = 0 // separation disabled for this span: expect a recorded minimum size of 0
     144            1 :                                 } else if spanPolicy.ValueStoragePolicy.ContainsOverrides() {
     145            0 :                                         expectedMinSize = spanPolicy.ValueStoragePolicy.OverrideBlobSeparationMinimumSize
     146            0 :                                         if expectedMinSize == 0 {
     147            0 :                                                 // A 0 minimum value size on the span policy indicates the field
     148            0 :                                                 // was unset, but other parts of value separation are being
     149            0 :                                                 // overridden. Use the default min size.
     150            0 :                                                 expectedMinSize = policy.MinimumSize
     151            0 :                                         }
     152            0 :                                         expectedValSepBySuffixDisabled = spanPolicy.ValueStoragePolicy.DisableSeparationBySuffix
     153              :                                 }
     154              :                         }
     155              : 
     156            1 :                         if int(backingProps.ValueSeparationMinSize) != expectedMinSize ||
     157            1 :                                 (expectedMinSize > 0 && backingProps.ValueSeparationBySuffixDisabled != expectedValSepBySuffixDisabled) {
     158            1 :                                 return true, 0 // recorded settings differ from the expected ones; write new blob files
     159            1 :                         }
     160              :                 }
     161              :         }
     162              : 
     163              :         // Otherwise, we won't write any new blob files but will carry forward
     164              :         // existing references.
     165            1 :         return false, inputReferenceDepth
     166              : }
     167              : 
     168              : // compactionBlobReferenceDepth computes the blob reference depth for a
     169              : // compaction. It takes the maximum blob reference depth among the input
     170              : // sstables of each level (of each sublevel, for L0) and sums these maxima to
     171              : // produce a worst-case approximation of the blob reference locality of the
     172              : // compaction's output sstables.
     173              : //
     174              : // The intuition is that as compactions combine files referencing distinct blob
     175              : // files, output sstables begin to reference more and more distinct blob
     176              : // files. In the worst case, these references are evenly distributed across the
     177              : // keyspace and there is very little locality.
     178            1 : func compactionBlobReferenceDepth(levels []compactionLevel) manifest.BlobReferenceDepth {
     179            1 :         // TODO(jackson): Consider using a range tree to precisely compute the
     180            1 :         // depth. This would require maintaining minimum and maximum keys.
     181            1 :         var depth manifest.BlobReferenceDepth
     182            1 :         for _, level := range levels {
     183            1 :                 // L0 allows files to overlap one another, so it's not sufficient to
     184            1 :                 // just take the maximum within the level. Instead, we need to sum the
     185            1 :                 // max of each sublevel.
     186            1 :                 //
     187            1 :                 // TODO(jackson): This and other compaction logic would likely be
     188            1 :                 // cleaner if we modeled each sublevel as its own `compactionLevel`.
     189            1 :                 if level.level == 0 {
     190            1 :                         for _, sublevel := range level.l0SublevelInfo {
     191            1 :                                 var sublevelDepth int
     192            1 :                                 for t := range sublevel.LevelSlice.All() {
     193            1 :                                         sublevelDepth = max(sublevelDepth, int(t.BlobReferenceDepth))
     194            1 :                                 }
     195            1 :                                 depth += manifest.BlobReferenceDepth(sublevelDepth)
     196              :                         }
     197            1 :                         continue // L0 was fully accounted for through its sublevels
     198              :                 }
     199              : 
     200            1 :                 var levelDepth manifest.BlobReferenceDepth
     201            1 :                 for t := range level.files.All() {
     202            1 :                         levelDepth = max(levelDepth, t.BlobReferenceDepth)
     203            1 :                 }
     204            1 :                 depth += levelDepth
     205              :         }
     206            1 :         return depth
     207              : }
     208              : 
     209              : // uniqueInputBlobMetadatas returns a map, keyed by blob file ID, of the physical blob
     210              : // files referenced by tables in levels. It panics if blobFileSet lacks a referenced file.
     211              : func uniqueInputBlobMetadatas(
     212              :         blobFileSet *manifest.BlobFileSet, levels []compactionLevel,
     213            1 : ) map[base.BlobFileID]*manifest.PhysicalBlobFile {
     214            1 :         m := make(map[base.BlobFileID]*manifest.PhysicalBlobFile)
     215            1 :         for _, level := range levels {
     216            1 :                 for t := range level.files.All() {
     217            1 :                         for _, ref := range t.BlobReferences {
     218            1 :                                 if _, ok := m[ref.FileID]; ok {
     219            1 :                                         continue // this blob file ID is already in the map
     220              :                                 }
     221            1 :                                 phys, ok := blobFileSet.LookupPhysical(ref.FileID)
     222            1 :                                 if !ok {
     223            0 :                                         panic(errors.AssertionFailedf("pebble: blob file %s not found", ref.FileID))
     224              :                                 }
     225            1 :                                 m[ref.FileID] = phys
     226              :                         }
     227              :                 }
     228              :         }
     229            1 :         return m
     230              : }
        

Generated by: LCOV version 2.0-1