LCOV - code coverage report
Current view: top level - pebble - compaction_value_separation.go (source / functions)
Test:       2025-10-29 08:19Z 15d065ba - meta test only.lcov
Test Date:  2025-10-29 08:21:03

              Coverage      Total      Hit
Lines:        86.0 %        121        104
Functions:    -             0          0

            Line data    Source code
       1              : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "github.com/cockroachdb/errors"
       9              :         "github.com/cockroachdb/pebble/internal/base"
      10              :         "github.com/cockroachdb/pebble/internal/manifest"
      11              :         "github.com/cockroachdb/pebble/objstorage"
      12              :         "github.com/cockroachdb/pebble/sstable"
      13              :         "github.com/cockroachdb/pebble/valsep"
      14              :         "github.com/cockroachdb/redact"
      15              : )
      16              : 
      17              : // latencyTolerantMinimumSize is the minimum size, in bytes, of a value that
      18              : // will be separated into a blob file when the value storage policy is
      19              : // ValueStorageLatencyTolerant.
      20              : const latencyTolerantMinimumSize = 10
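For illustration, a minimal self-contained sketch (hypothetical effectiveMinimumSize helper, not part of the pebble API) of how this constant interacts with the configured policy minimum, mirroring the valueStorage switch in determineCompactionValueSeparation below:

package main

import "fmt"

const latencyTolerantMinimumSize = 10

// effectiveMinimumSize sketches the threshold selection: latency-tolerant
// spans lower the separation threshold to latencyTolerantMinimumSize, while
// other span policies keep the configured minimum. (Low-read-latency spans
// never separate values at all, so no threshold applies to them.)
func effectiveMinimumSize(policyMinimumSize int, latencyTolerant bool) int {
	if latencyTolerant {
		return latencyTolerantMinimumSize
	}
	return policyMinimumSize
}

func main() {
	fmt.Println(effectiveMinimumSize(1024, false)) // 1024: use the policy's MinimumSize
	fmt.Println(effectiveMinimumSize(1024, true))  // 10: separate even small values
}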
      21              : 
      22            1 : var neverSeparateValues getValueSeparation = func(JobID, *tableCompaction, ValueStoragePolicy) valsep.ValueSeparation {
      23            1 :         return valsep.NeverSeparateValues{}
      24            1 : }
      25              : 
      26              : // determineCompactionValueSeparation determines whether a compaction should
      27              : // separate values into blob files. It returns a valsep.ValueSeparation
      28              : // implementation that should be used for the compaction.
      29              : //
      30              : // It assumes that the compaction will write tables at d.TableFormat() or above.
      31              : func (d *DB) determineCompactionValueSeparation(
      32              :         jobID JobID, c *tableCompaction, valueStorage ValueStoragePolicy,
      33            1 : ) valsep.ValueSeparation {
      34            1 :         if d.FormatMajorVersion() < FormatValueSeparation ||
      35            1 :                 d.opts.Experimental.ValueSeparationPolicy == nil {
      36            1 :                 return valsep.NeverSeparateValues{}
      37            1 :         }
      38            1 :         policy := d.opts.Experimental.ValueSeparationPolicy()
      39            1 :         if !policy.Enabled {
      40            1 :                 return valsep.NeverSeparateValues{}
      41            1 :         }
      42              : 
      43              :         // We're allowed to write blob references. Determine whether we should carry
      44              :         // forward existing blob references, or write new ones.
      45              : 
      46            1 :         var blobFileSet map[base.BlobFileID]*manifest.PhysicalBlobFile
      47            1 :         if c.version != nil {
      48            1 :                 // For flushes, c.version is nil.
      49            1 :                 blobFileSet = uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs)
      50            1 :         }
      51            1 :         minSize := policy.MinimumSize
      52            1 :         switch valueStorage {
      53            0 :         case ValueStorageLowReadLatency:
      54            0 :                 return valsep.NeverSeparateValues{}
      55            0 :         case ValueStorageLatencyTolerant:
      56            0 :                 minSize = latencyTolerantMinimumSize
      57            1 :         default:
      58              :         }
      59            1 :         if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy, uint64(minSize)); !writeBlobs {
      60            1 :                 // This compaction should preserve existing blob references.
      61            1 :                 kind := sstable.ValueSeparationDefault
      62            1 :                 if valueStorage != ValueStorageDefault {
      63            0 :                         kind = sstable.ValueSeparationSpanPolicy
      64            0 :                 }
      65            1 :                 return valsep.NewPreserveAllHotBlobReferences(
      66            1 :                         blobFileSet,
      67            1 :                         outputBlobReferenceDepth,
      68            1 :                         kind,
      69            1 :                         minSize,
      70            1 :                 )
      71              :         }
      72              : 
      73              :         // This compaction should write values to new blob files.
      74            1 :         return valsep.NewWriteNewBlobFiles(
      75            1 :                 d.opts.Comparer,
      76            1 :                 func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
      77            1 :                         return d.newCompactionOutputBlob(
      78            1 :                                 jobID, c.kind, c.outputLevel.level, &c.metrics.bytesWritten, c.objCreateOpts)
      79            1 :                 },
      80              :                 d.makeBlobWriterOptions(c.outputLevel.level),
      81              :                 policy.MinimumSize,
      82              :                 valsep.WriteNewBlobFilesOptions{
      83              :                         InputBlobPhysicalFiles: blobFileSet,
      84              :                         ShortAttrExtractor:     d.opts.Experimental.ShortAttributeExtractor,
      85            0 :                         InvalidValueCallback: func(userKey []byte, value []byte, err error) {
      86            0 :                                 // The value may not be safe, so it will be redacted when redaction
      87            0 :                                 // is enabled.
      88            0 :                                 d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
      89            0 :                                         Kind:    InvalidValue,
      90            0 :                                         UserKey: userKey,
      91            0 :                                         ExtraInfo: redact.Sprintf("callback=ShortAttributeExtractor,value=%x,err=%q",
      92            0 :                                                 value, err),
      93            0 :                                 })
      94            0 :                         },
      95              :                 },
      96              :         )
      97              : }
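For reference, a minimal self-contained sketch (toy types and a hypothetical chooseStrategy helper, not the pebble API) of the three-way outcome the function above selects between:

package main

import "fmt"

type separationStrategy string

const (
	neverSeparate      separationStrategy = "never-separate"       // valsep.NeverSeparateValues
	preserveReferences separationStrategy = "preserve-references"  // valsep.NewPreserveAllHotBlobReferences
	writeNewBlobFiles  separationStrategy = "write-new-blob-files" // valsep.NewWriteNewBlobFiles
)

// chooseStrategy condenses the decision above: the format and policy must
// allow separation, low-read-latency spans never separate, and otherwise the
// choice hinges on whether the compaction should write new blob files.
func chooseStrategy(formatSupported, policyEnabled, lowReadLatency, shouldWriteBlobs bool) separationStrategy {
	if !formatSupported || !policyEnabled || lowReadLatency {
		return neverSeparate
	}
	if !shouldWriteBlobs {
		return preserveReferences
	}
	return writeNewBlobFiles
}

func main() {
	fmt.Println(chooseStrategy(true, true, false, false)) // preserve-references
	fmt.Println(chooseStrategy(true, true, false, true))  // write-new-blob-files
	fmt.Println(chooseStrategy(true, false, false, true)) // never-separate
}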
      98              : 
      99              : // shouldWriteBlobFiles returns true if the compaction should write new blob
     100              : // files. If it returns false, the referenceDepth return value contains the
     101              : // maximum blob reference depth to assign to output sstables (the actual value
      102              : // may be lower if the output table references fewer distinct blob files).
     103              : func shouldWriteBlobFiles(
     104              :         c *tableCompaction, policy ValueSeparationPolicy, minimumValueSizeForCompaction uint64,
     105            1 : ) (writeBlobs bool, referenceDepth manifest.BlobReferenceDepth) {
     106            1 :         // Flushes will have no existing references to blob files and should write
     107            1 :         // their values to new blob files.
     108            1 :         if c.kind == compactionKindFlush {
     109            1 :                 return true, 0
     110            1 :         }
     111              : 
     112            1 :         inputReferenceDepth := compactionBlobReferenceDepth(c.inputs)
     113            1 : 
     114            1 :         if c.kind == compactionKindVirtualRewrite {
     115            1 :                 // A virtual rewrite is a compaction that just materializes a
     116            1 :                 // virtual table. No new blob files should be written, and the
     117            1 :                 // reference depth is unchanged.
     118            1 :                 return false, inputReferenceDepth
     119            1 :         }
     120              : 
     121            1 :         if inputReferenceDepth == 0 {
     122            1 :                 // None of the input sstables reference blob files. It may be the case
     123            1 :                 // that these sstables were created before value separation was enabled.
     124            1 :                 // We should try to write to new blob files.
     125            1 :                 return true, 0
     126            1 :         }
     127              :         // If the compaction's output blob reference depth would be greater than the
     128              :         // configured max, we should rewrite the values into new blob files to
     129              :         // restore locality.
     130            1 :         if inputReferenceDepth > manifest.BlobReferenceDepth(policy.MaxBlobReferenceDepth) {
     131            1 :                 return true, 0
     132            1 :         }
     133              :         // Compare policies used by each input file. If all input files have the
     134              :         // same policy characteristics as the current one, then we can preserve
     135              :         // existing blob references. This ensures that tables that were not written
     136              :         // with value separation enabled will have their values written to new blob files.
     137            1 :         for _, level := range c.inputs {
     138            1 :                 for t := range level.files.All() {
     139            1 :                         backingProps, backingPropsValid := t.TableBacking.Properties()
     140            1 :                         if !backingPropsValid {
     141            1 :                                 continue
     142              :                         }
     143            1 :                         if backingProps.ValueSeparationMinSize != minimumValueSizeForCompaction {
     144            1 :                                 return true, 0
     145            1 :                         }
     146              :                 }
     147              :         }
     148              : 
     149              :         // Otherwise, we won't write any new blob files but will carry forward
     150              :         // existing references.
     151            1 :         return false, inputReferenceDepth
     152              : }
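A toy summary of the non-flush branch of the decision above (simplified inputs and a hypothetical shouldRewriteValues helper; the real function also special-cases flushes and virtual rewrites):

package main

import "fmt"

// shouldRewriteValues sketches when a non-flush, non-virtual-rewrite
// compaction writes new blob files rather than preserving references.
func shouldRewriteValues(inputDepth, maxDepth int, minSizesMatch bool) bool {
	if inputDepth == 0 {
		return true // inputs reference no blob files; start separating values
	}
	if inputDepth > maxDepth {
		return true // output locality would degrade too far; rewrite to restore it
	}
	if !minSizesMatch {
		return true // inputs were written under a different minimum-size policy
	}
	return false // carry existing blob references forward
}

func main() {
	fmt.Println(shouldRewriteValues(0, 3, true)) // true: no existing references
	fmt.Println(shouldRewriteValues(4, 3, true)) // true: depth exceeds the configured max
	fmt.Println(shouldRewriteValues(2, 3, true)) // false: preserve references
}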
     153              : 
     154              : // compactionBlobReferenceDepth computes the blob reference depth for a
     155              : // compaction. It's computed by finding the maximum blob reference depth of
     156              : // input sstables in each level. These per-level depths are then summed to
     157              : // produce a worst-case approximation of the blob reference locality of the
     158              : // compaction's output sstables.
     159              : //
     160              : // The intuition is that as compactions combine files referencing distinct blob
      161              : // files, output sstables begin to reference more and more distinct blob
     162              : // files. In the worst case, these references are evenly distributed across the
     163              : // keyspace and there is very little locality.
     164            1 : func compactionBlobReferenceDepth(levels []compactionLevel) manifest.BlobReferenceDepth {
     165            1 :         // TODO(jackson): Consider using a range tree to precisely compute the
     166            1 :         // depth. This would require maintaining minimum and maximum keys.
     167            1 :         var depth manifest.BlobReferenceDepth
     168            1 :         for _, level := range levels {
     169            1 :                 // L0 allows files to overlap one another, so it's not sufficient to
     170            1 :                 // just take the maximum within the level. Instead, we need to sum the
     171            1 :                 // max of each sublevel.
     172            1 :                 //
     173            1 :                 // TODO(jackson): This and other compaction logic would likely be
     174            1 :                 // cleaner if we modeled each sublevel as its own `compactionLevel`.
     175            1 :                 if level.level == 0 {
     176            1 :                         for _, sublevel := range level.l0SublevelInfo {
     177            1 :                                 var sublevelDepth int
     178            1 :                                 for t := range sublevel.LevelSlice.All() {
     179            1 :                                         sublevelDepth = max(sublevelDepth, int(t.BlobReferenceDepth))
     180            1 :                                 }
     181            1 :                                 depth += manifest.BlobReferenceDepth(sublevelDepth)
     182              :                         }
     183            1 :                         continue
     184              :                 }
     185              : 
     186            1 :                 var levelDepth manifest.BlobReferenceDepth
     187            1 :                 for t := range level.files.All() {
     188            1 :                         levelDepth = max(levelDepth, t.BlobReferenceDepth)
     189            1 :                 }
     190            1 :                 depth += levelDepth
     191              :         }
     192            1 :         return depth
     193              : }
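A small self-contained example (simplified types, one non-L0 input level for brevity, not the pebble API) of the worst-case arithmetic above: L0 contributes the sum of its per-sublevel maxima, while a lower level contributes only its own maximum:

package main

import "fmt"

// worstCaseDepth sums the maximum blob reference depth of each L0 sublevel
// (sublevels may overlap one another) and adds the maximum depth of a single
// non-L0 input level (whose files do not overlap).
func worstCaseDepth(l0SublevelDepths [][]int, lowerLevelDepths []int) int {
	depth := 0
	for _, sublevel := range l0SublevelDepths {
		m := 0
		for _, d := range sublevel {
			m = max(m, d)
		}
		depth += m
	}
	m := 0
	for _, d := range lowerLevelDepths {
		m = max(m, d)
	}
	return depth + m
}

func main() {
	// Two L0 sublevels with maximum depths 2 and 1, plus a lower input level
	// with maximum depth 3: the worst-case output depth is 2 + 1 + 3 = 6.
	fmt.Println(worstCaseDepth([][]int{{2, 1}, {1}}, []int{3, 2}))
}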
     194              : 
     195              : // uniqueInputBlobMetadatas returns a slice of all unique blob file metadata
     196              : // objects referenced by tables in levels.
     197              : func uniqueInputBlobMetadatas(
     198              :         blobFileSet *manifest.BlobFileSet, levels []compactionLevel,
     199            1 : ) map[base.BlobFileID]*manifest.PhysicalBlobFile {
     200            1 :         m := make(map[base.BlobFileID]*manifest.PhysicalBlobFile)
     201            1 :         for _, level := range levels {
     202            1 :                 for t := range level.files.All() {
     203            1 :                         for _, ref := range t.BlobReferences {
     204            1 :                                 if _, ok := m[ref.FileID]; ok {
     205            1 :                                         continue
     206              :                                 }
     207            1 :                                 phys, ok := blobFileSet.LookupPhysical(ref.FileID)
     208            1 :                                 if !ok {
     209            0 :                                         panic(errors.AssertionFailedf("pebble: blob file %s not found", ref.FileID))
     210              :                                 }
     211            1 :                                 m[ref.FileID] = phys
     212              :                         }
     213              :                 }
     214              :         }
     215            1 :         return m
     216              : }
        

Generated by: LCOV version 2.0-1