Line data Source code
1 : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "github.com/cockroachdb/errors"
9 : "github.com/cockroachdb/pebble/internal/base"
10 : "github.com/cockroachdb/pebble/internal/manifest"
11 : "github.com/cockroachdb/pebble/objstorage"
12 : "github.com/cockroachdb/pebble/sstable"
13 : "github.com/cockroachdb/pebble/valsep"
14 : "github.com/cockroachdb/redact"
15 : )
16 :
17 : // latencyTolerantMinimumSize is the minimum size, in bytes, of a value that
18 : // will be separated into a blob file when the value storage policy is
19 : // ValueStorageLatencyTolerant.
20 : const latencyTolerantMinimumSize = 10
21 :
22 1 : var neverSeparateValues getValueSeparation = func(JobID, *tableCompaction, ValueStoragePolicy) valsep.ValueSeparation {
23 1 : return valsep.NeverSeparateValues{}
24 1 : }
25 :
26 : // determineCompactionValueSeparation determines whether a compaction should
27 : // separate values into blob files. It returns a valsep.ValueSeparation
28 : // implementation that should be used for the compaction.
29 : //
30 : // It assumes that the compaction will write tables at d.TableFormat() or above.
31 : func (d *DB) determineCompactionValueSeparation(
32 : jobID JobID, c *tableCompaction, valueStorage ValueStoragePolicy,
33 1 : ) valsep.ValueSeparation {
34 1 : if d.FormatMajorVersion() < FormatValueSeparation ||
35 1 : d.opts.Experimental.ValueSeparationPolicy == nil {
36 1 : return valsep.NeverSeparateValues{}
37 1 : }
38 1 : policy := d.opts.Experimental.ValueSeparationPolicy()
39 1 : if !policy.Enabled {
40 1 : return valsep.NeverSeparateValues{}
41 1 : }
42 :
43 : // We're allowed to write blob references. Determine whether we should carry
44 : // forward existing blob references, or write new ones.
45 :
46 1 : var blobFileSet map[base.BlobFileID]*manifest.PhysicalBlobFile
47 1 : if c.version != nil {
48 1 : // For flushes, c.version is nil.
49 1 : blobFileSet = uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs)
50 1 : }
51 1 : minSize := policy.MinimumSize
52 1 : switch valueStorage {
53 0 : case ValueStorageLowReadLatency:
54 0 : return valsep.NeverSeparateValues{}
55 0 : case ValueStorageLatencyTolerant:
56 0 : minSize = latencyTolerantMinimumSize
57 1 : default:
58 : }
59 1 : if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy, uint64(minSize)); !writeBlobs {
60 1 : // This compaction should preserve existing blob references.
61 1 : kind := sstable.ValueSeparationDefault
62 1 : if valueStorage != ValueStorageDefault {
63 0 : kind = sstable.ValueSeparationSpanPolicy
64 0 : }
65 1 : return valsep.NewPreserveAllHotBlobReferences(
66 1 : blobFileSet,
67 1 : outputBlobReferenceDepth,
68 1 : kind,
69 1 : minSize,
70 1 : )
71 : }
72 :
73 : // This compaction should write values to new blob files.
74 1 : return valsep.NewWriteNewBlobFiles(
75 1 : d.opts.Comparer,
76 1 : func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
77 1 : return d.newCompactionOutputBlob(
78 1 : jobID, c.kind, c.outputLevel.level, &c.metrics.bytesWritten, c.objCreateOpts)
79 1 : },
80 : d.makeBlobWriterOptions(c.outputLevel.level),
81 : policy.MinimumSize,
82 : valsep.WriteNewBlobFilesOptions{
83 : InputBlobPhysicalFiles: blobFileSet,
84 : ShortAttrExtractor: d.opts.Experimental.ShortAttributeExtractor,
85 0 : InvalidValueCallback: func(userKey []byte, value []byte, err error) {
86 0 : // The value may not be safe to log, so it is redacted when
87 0 : // redaction is enabled.
88 0 : d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
89 0 : Kind: InvalidValue,
90 0 : UserKey: userKey,
91 0 : ExtraInfo: redact.Sprintf("callback=ShortAttributeExtractor,value=%x,err=%q",
92 0 : value, err),
93 0 : })
94 0 : },
95 : },
96 : )
97 : }
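
// The switch above folds a span's ValueStoragePolicy into the effective
// separation threshold. effectiveMinSize is a hypothetical helper (an
// illustrative sketch, not part of the Pebble source) that isolates that
// mapping:
func effectiveMinSize(policyMin int, storage ValueStoragePolicy) (minSize int, allowed bool) {
	switch storage {
	case ValueStorageLowReadLatency:
		// Low read latency: keep values inline in the sstable.
		return 0, false
	case ValueStorageLatencyTolerant:
		// Latency tolerant: separate even small values.
		return latencyTolerantMinimumSize, true
	default:
		// Otherwise, use the configured policy minimum unchanged.
		return policyMin, true
	}
}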
98 :
99 : // shouldWriteBlobFiles returns true if the compaction should write new blob
100 : // files. If it returns false, the referenceDepth return value contains the
101 : // maximum blob reference depth to assign to output sstables (the actual value
102 : // may be lower if the output table references fewer distinct blob files).
103 : func shouldWriteBlobFiles(
104 : c *tableCompaction, policy ValueSeparationPolicy, minimumValueSizeForCompaction uint64,
105 1 : ) (writeBlobs bool, referenceDepth manifest.BlobReferenceDepth) {
106 1 : // Flushes will have no existing references to blob files and should write
107 1 : // their values to new blob files.
108 1 : if c.kind == compactionKindFlush {
109 1 : return true, 0
110 1 : }
111 :
112 1 : inputReferenceDepth := compactionBlobReferenceDepth(c.inputs)
113 1 :
114 1 : if c.kind == compactionKindVirtualRewrite {
115 1 : // A virtual rewrite is a compaction that just materializes a
116 1 : // virtual table. No new blob files should be written, and the
117 1 : // reference depth is unchanged.
118 1 : return false, inputReferenceDepth
119 1 : }
120 :
121 1 : if inputReferenceDepth == 0 {
122 1 : // None of the input sstables reference blob files. It may be the case
123 1 : // that these sstables were created before value separation was enabled.
124 1 : // We should try to write to new blob files.
125 1 : return true, 0
126 1 : }
127 : // If the compaction's output blob reference depth would be greater than the
128 : // configured max, we should rewrite the values into new blob files to
129 : // restore locality.
130 1 : if inputReferenceDepth > manifest.BlobReferenceDepth(policy.MaxBlobReferenceDepth) {
131 1 : return true, 0
132 1 : }
133 : // Compare the value-separation policy used by each input file. If every
134 : // input file was written with the same minimum value size as the current
135 : // policy, we can preserve existing blob references. Otherwise, values in
136 : // tables written under a different policy are rewritten to new blob files.
137 1 : for _, level := range c.inputs {
138 1 : for t := range level.files.All() {
139 1 : backingProps, backingPropsValid := t.TableBacking.Properties()
140 1 : if !backingPropsValid {
141 1 : continue
142 : }
143 1 : if backingProps.ValueSeparationMinSize != minimumValueSizeForCompaction {
144 1 : return true, 0
145 1 : }
146 : }
147 : }
148 :
149 : // Otherwise, we won't write any new blob files but will carry forward
150 : // existing references.
151 1 : return false, inputReferenceDepth
152 : }
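
// The decision above reduces to a small table. The following is a
// hypothetical, simplified model (the name and boolean inputs are
// illustrative stand-ins for the real compaction state, not part of the
// Pebble source):
func shouldWriteBlobsSketch(
	isFlush, isVirtualRewrite bool,
	inputDepth, maxDepth int,
	minSizesMatch bool,
) (writeBlobs bool, refDepth int) {
	switch {
	case isFlush:
		return true, 0 // no existing references; write new blob files
	case isVirtualRewrite:
		return false, inputDepth // materialize only; depth unchanged
	case inputDepth == 0:
		return true, 0 // inputs predate value separation
	case inputDepth > maxDepth:
		return true, 0 // rewrite values to restore locality
	case !minSizesMatch:
		return true, 0 // inputs were written under a different policy
	default:
		return false, inputDepth // preserve existing references
	}
}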
153 :
154 : // compactionBlobReferenceDepth computes the blob reference depth for a
155 : // compaction. It's computed by finding the maximum blob reference depth of
156 : // input sstables in each level. These per-level depths are then summed to
157 : // produce a worst-case approximation of the blob reference locality of the
158 : // compaction's output sstables.
159 : //
160 : // The intuition is that as compactions combine files referencing distinct blob
161 : // files, output sstables begin to reference more and more distinct blob
162 : // files. In the worst case, these references are evenly distributed across the
163 : // keyspace and there is very little locality.
164 1 : func compactionBlobReferenceDepth(levels []compactionLevel) manifest.BlobReferenceDepth {
165 1 : // TODO(jackson): Consider using a range tree to precisely compute the
166 1 : // depth. This would require maintaining minimum and maximum keys.
167 1 : var depth manifest.BlobReferenceDepth
168 1 : for _, level := range levels {
169 1 : // L0 allows files to overlap one another, so it's not sufficient to
170 1 : // just take the maximum within the level. Instead, we need to sum the
171 1 : // max of each sublevel.
172 1 : //
173 1 : // TODO(jackson): This and other compaction logic would likely be
174 1 : // cleaner if we modeled each sublevel as its own `compactionLevel`.
175 1 : if level.level == 0 {
176 1 : for _, sublevel := range level.l0SublevelInfo {
177 1 : var sublevelDepth int
178 1 : for t := range sublevel.LevelSlice.All() {
179 1 : sublevelDepth = max(sublevelDepth, int(t.BlobReferenceDepth))
180 1 : }
181 1 : depth += manifest.BlobReferenceDepth(sublevelDepth)
182 : }
183 1 : continue
184 : }
185 :
186 1 : var levelDepth manifest.BlobReferenceDepth
187 1 : for t := range level.files.All() {
188 1 : levelDepth = max(levelDepth, t.BlobReferenceDepth)
189 1 : }
190 1 : depth += levelDepth
191 : }
192 1 : return depth
193 : }
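
// A worked example of the summation described above, using plain ints in
// place of manifest.BlobReferenceDepth (the depths are hypothetical): the
// overlapping L0 sublevels each contribute their own maximum, while a
// non-L0 input level contributes a single maximum.
func exampleBlobReferenceDepth() int {
	l0SublevelMaxDepths := []int{2, 1} // max depth of each L0 sublevel
	l3FileDepths := []int{1, 2}        // depths of the L3 input files

	depth := 0
	for _, d := range l0SublevelMaxDepths {
		depth += d // L0 sublevels overlap, so sum their maxima
	}
	l3Max := 0
	for _, d := range l3FileDepths {
		l3Max = max(l3Max, d) // non-L0 levels take only the maximum
	}
	return depth + l3Max // 2 + 1 + 2 = 5
}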
194 :
195 : // uniqueInputBlobMetadatas returns a map, keyed by blob file ID, of all
196 : // unique blob file metadata objects referenced by tables in levels.
197 : func uniqueInputBlobMetadatas(
198 : blobFileSet *manifest.BlobFileSet, levels []compactionLevel,
199 1 : ) map[base.BlobFileID]*manifest.PhysicalBlobFile {
200 1 : m := make(map[base.BlobFileID]*manifest.PhysicalBlobFile)
201 1 : for _, level := range levels {
202 1 : for t := range level.files.All() {
203 1 : for _, ref := range t.BlobReferences {
204 1 : if _, ok := m[ref.FileID]; ok {
205 1 : continue
206 : }
207 1 : phys, ok := blobFileSet.LookupPhysical(ref.FileID)
208 1 : if !ok {
209 0 : panic(errors.AssertionFailedf("pebble: blob file %s not found", ref.FileID))
210 : }
211 1 : m[ref.FileID] = phys
212 : }
213 : }
214 : }
215 1 : return m
216 : }
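
// A self-contained sketch of the dedup pattern above, with plain ints
// standing in for base.BlobFileID and a map standing in for
// manifest.BlobFileSet (both hypothetical here): each referenced blob
// file is resolved and recorded exactly once, no matter how many tables
// reference it.
func exampleUniqueBlobMetadatas() map[int]string {
	physical := map[int]string{1: "blob-000001", 2: "blob-000002"}
	tableRefs := [][]int{{1, 2}, {1}} // two tables both reference file 1

	m := make(map[int]string)
	for _, refs := range tableRefs {
		for _, id := range refs {
			if _, ok := m[id]; ok {
				continue // already collected
			}
			m[id] = physical[id]
		}
	}
	return m // exactly two entries: 1 and 2
}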