// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/manifest"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/valsep"
	"github.com/cockroachdb/redact"
)

var neverSeparateValues getValueSeparation = func(JobID, *tableCompaction) valsep.ValueSeparation {
	return valsep.NeverSeparateValues{}
}

// determineCompactionValueSeparation determines whether a compaction should
// separate values into blob files. It returns a valsep.ValueSeparation
// implementation that should be used for the compaction.
//
// It assumes that the compaction will write tables at d.TableFormat() or above.
func (d *DB) determineCompactionValueSeparation(
	jobID JobID, c *tableCompaction,
) valsep.ValueSeparation {
	if d.FormatMajorVersion() < FormatValueSeparation ||
		d.opts.Experimental.ValueSeparationPolicy == nil {
		return valsep.NeverSeparateValues{}
	}
	policy := d.opts.Experimental.ValueSeparationPolicy()
	if !policy.Enabled {
		return valsep.NeverSeparateValues{}
	}

	// We're allowed to write blob references. Determine whether we should carry
	// forward existing blob references, or write new ones.

	var blobFileSet map[base.BlobFileID]*manifest.PhysicalBlobFile
	if c.version != nil {
		// c.version is nil for flushes, which have no existing blob
		// references to carry forward.
		blobFileSet = uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs)
	}
	if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy, d.opts.Experimental.SpanPolicyFunc, d.cmp); !writeBlobs {
		// This compaction should preserve existing blob references.
		return valsep.NewPreserveAllHotBlobReferences(
			blobFileSet,
			outputBlobReferenceDepth,
			policy.MinimumSize,
			policy.MinimumMVCCGarbageSize,
		)
	}

	// This compaction should write values to new blob files.
	return valsep.NewWriteNewBlobFiles(
		d.opts.Comparer,
		func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
			return d.newCompactionOutputBlob(
				jobID, c.kind, c.outputLevel.level, &c.metrics.bytesWritten, c.objCreateOpts)
		},
		d.makeBlobWriterOptions(c.outputLevel.level),
		policy.MinimumSize,
		policy.MinimumMVCCGarbageSize,
		valsep.WriteNewBlobFilesOptions{
			InputBlobPhysicalFiles: blobFileSet,
			ShortAttrExtractor:     d.opts.Experimental.ShortAttributeExtractor,
			InvalidValueCallback: func(userKey []byte, value []byte, err error) {
				// The value may not be safe, so it will be redacted when
				// redaction is enabled.
				d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
					Kind:    InvalidValue,
					UserKey: userKey,
					ExtraInfo: redact.Sprintf("callback=ShortAttributeExtractor,value=%x,err=%q",
						value, err),
				})
			},
		},
	)
}
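
// As a sketch, value separation might be enabled with a policy along these
// lines (all field values are illustrative, not recommendations):
//
//	opts.Experimental.ValueSeparationPolicy = func() ValueSeparationPolicy {
//		return ValueSeparationPolicy{
//			Enabled:               true,
//			MinimumSize:           1024, // only separate values >= 1KB
//			MaxBlobReferenceDepth: 10,
//		}
//	}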

// shouldWriteBlobFiles returns true if the compaction should write new blob
// files. If it returns false, the referenceDepth return value contains the
// maximum blob reference depth to assign to output sstables (the actual value
// may be lower if the output table references fewer distinct blob files).
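//
// The decision proceeds through the following cases, in order:
//  1. Flushes always write new blob files (they have no existing references).
//  2. Virtual rewrites never write new blob files; they preserve existing
//     references at the input reference depth.
//  3. If no input sstable references a blob file, write new blob files.
//  4. If the input blob reference depth exceeds the policy's maximum, write
//     new blob files to restore locality.
//  5. If any input table was written under value-separation settings that
//     differ from those currently in effect, write new blob files.
//  6. Otherwise, preserve the existing blob references.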
func shouldWriteBlobFiles(
	c *tableCompaction, policy ValueSeparationPolicy, spanPolicyFunc SpanPolicyFunc, cmp Compare,
) (writeBlobs bool, referenceDepth manifest.BlobReferenceDepth) {
	// Flushes will have no existing references to blob files and should write
	// their values to new blob files.
	if c.kind == compactionKindFlush {
		return true, 0
	}

	inputReferenceDepth := compactionBlobReferenceDepth(c.inputs)

	if c.kind == compactionKindVirtualRewrite {
		// A virtual rewrite is a compaction that just materializes a virtual
		// table. No new blob files should be written, and the reference depth
		// is unchanged.
		return false, inputReferenceDepth
	}

	if inputReferenceDepth == 0 {
		// None of the input sstables reference blob files. It may be the case
		// that these sstables were created before value separation was enabled.
		// We should try to write to new blob files.
		return true, 0
	}

	// If the compaction's output blob reference depth would be greater than the
	// configured maximum, we should rewrite the values into new blob files to
	// restore locality.
	if inputReferenceDepth > manifest.BlobReferenceDepth(policy.MaxBlobReferenceDepth) {
		return true, 0
	}
	// Compare the policies used by each input file. If every input file has
	// the same policy characteristics as the current policy, we can preserve
	// the existing blob references. This ensures that tables written while
	// value separation was disabled or configured differently will have their
	// values rewritten to new blob files.
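	// For example (hypothetical values): if an input table was written while
	// the policy's minimum value size was 1KB but the current minimum is 512
	// bytes, the mismatch check below forces a rewrite into new blob files.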
	for _, level := range c.inputs {
		for t := range level.files.All() {
			backingProps, backingPropsValid := t.TableBacking.Properties()
			if !backingPropsValid {
				continue
			}

			// Start with the global policy values as the expected policy, then
			// extract any overrides from the span policy.
			expectedMinSize := policy.MinimumSize
			expectedValSepBySuffixDisabled := false
			bounds := t.UserKeyBounds()
			spanPolicy, spanPolicyEndKey, err := spanPolicyFunc(bounds.Start)
			// For now, if we can't determine the span policy, assume the
			// default policy is in effect for this table.
			if err == nil {
				if len(spanPolicyEndKey) > 0 && cmp(bounds.End.Key, spanPolicyEndKey) >= 0 {
					// The table's key range spans multiple span policies.
					// Rewrite to new blob files so values are stored according
					// to the current policy.
					return true, 0
				}
				if spanPolicy.ValueStoragePolicy.DisableBlobSeparation {
					expectedMinSize = 0
				} else if spanPolicy.ValueStoragePolicy.ContainsOverrides() {
					expectedMinSize = spanPolicy.ValueStoragePolicy.OverrideBlobSeparationMinimumSize
					if expectedMinSize == 0 {
						// A zero minimum value size on the span policy
						// indicates the field was unset, but other parts of
						// value separation are being overridden. Use the
						// default minimum size.
						expectedMinSize = policy.MinimumSize
					}
					expectedValSepBySuffixDisabled = spanPolicy.ValueStoragePolicy.DisableSeparationBySuffix
				}
			}

			if int(backingProps.ValueSeparationMinSize) != expectedMinSize ||
				(expectedMinSize > 0 && backingProps.ValueSeparationBySuffixDisabled != expectedValSepBySuffixDisabled) {
				return true, 0
			}
		}
	}

	// Otherwise, we won't write any new blob files but will carry forward
	// existing references.
	return false, inputReferenceDepth
}

// compactionBlobReferenceDepth computes the blob reference depth for a
// compaction. It's computed by finding the maximum blob reference depth of
// input sstables in each level. These per-level depths are then summed to
// produce a worst-case approximation of the blob reference locality of the
// compaction's output sstables.
//
// The intuition is that as compactions combine files referencing distinct blob
// files, output sstables begin to reference more and more distinct blob files.
// In the worst case, these references are evenly distributed across the
// keyspace and there is very little locality.
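//
// For example (hypothetical): if a compaction's inputs include two L0
// sublevels whose tables have maximum blob reference depths of 2 and 1, plus
// L3 input tables with a maximum depth of 3, the computed depth is
// 2 + 1 + 3 = 6.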
func compactionBlobReferenceDepth(levels []compactionLevel) manifest.BlobReferenceDepth {
	// TODO(jackson): Consider using a range tree to precisely compute the
	// depth. This would require maintaining minimum and maximum keys.
	var depth manifest.BlobReferenceDepth
	for _, level := range levels {
		// L0 allows files to overlap one another, so it's not sufficient to
		// just take the maximum within the level. Instead, we need to sum the
		// max of each sublevel.
		//
		// TODO(jackson): This and other compaction logic would likely be
		// cleaner if we modeled each sublevel as its own `compactionLevel`.
		if level.level == 0 {
			for _, sublevel := range level.l0SublevelInfo {
				var sublevelDepth int
				for t := range sublevel.LevelSlice.All() {
					sublevelDepth = max(sublevelDepth, int(t.BlobReferenceDepth))
				}
				depth += manifest.BlobReferenceDepth(sublevelDepth)
			}
			continue
		}

		var levelDepth manifest.BlobReferenceDepth
		for t := range level.files.All() {
			levelDepth = max(levelDepth, t.BlobReferenceDepth)
		}
		depth += levelDepth
	}
	return depth
}

// uniqueInputBlobMetadatas returns a map of all unique physical blob file
// metadata objects referenced by tables in levels, keyed by blob file ID.
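// Every blob reference held by an input table is expected to resolve in
// blobFileSet; a failed lookup violates an invariant and results in an
// assertion-failure panic.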
func uniqueInputBlobMetadatas(
	blobFileSet *manifest.BlobFileSet, levels []compactionLevel,
) map[base.BlobFileID]*manifest.PhysicalBlobFile {
	m := make(map[base.BlobFileID]*manifest.PhysicalBlobFile)
	for _, level := range levels {
		for t := range level.files.All() {
			for _, ref := range t.BlobReferences {
				if _, ok := m[ref.FileID]; ok {
					continue
				}
				phys, ok := blobFileSet.LookupPhysical(ref.FileID)
				if !ok {
					panic(errors.AssertionFailedf("pebble: blob file %s not found", ref.FileID))
				}
				m[ref.FileID] = phys
			}
		}
	}
	return m
}