Line data Source code
1 : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "slices"
9 : "time"
10 :
11 : "github.com/cockroachdb/errors"
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/pebble/internal/compact"
14 : "github.com/cockroachdb/pebble/internal/invariants"
15 : "github.com/cockroachdb/pebble/internal/manifest"
16 : "github.com/cockroachdb/pebble/objstorage"
17 : "github.com/cockroachdb/pebble/sstable"
18 : "github.com/cockroachdb/pebble/sstable/blob"
19 : )
20 :
21 2 : var neverSeparateValues getValueSeparation = func(JobID, *compaction, sstable.TableFormat) compact.ValueSeparation {
22 2 : return compact.NeverSeparateValues{}
23 2 : }
24 :
25 : // determineCompactionValueSeparation determines whether a compaction should
26 : // separate values into blob files. It returns a compact.ValueSeparation
27 : // implementation that should be used for the compaction.
28 : func (d *DB) determineCompactionValueSeparation(
29 : jobID JobID, c *compaction, tableFormat sstable.TableFormat,
30 2 : ) compact.ValueSeparation {
31 2 : if tableFormat < sstable.TableFormatPebblev7 || d.FormatMajorVersion() < FormatValueSeparation ||
32 2 : d.opts.Experimental.ValueSeparationPolicy == nil {
33 2 : return compact.NeverSeparateValues{}
34 2 : }
35 2 : policy := d.opts.Experimental.ValueSeparationPolicy()
36 2 : if !policy.Enabled {
37 2 : return compact.NeverSeparateValues{}
38 2 : }
39 :
40 : // We're allowed to write blob references. Determine whether we should carry
41 : // forward existing blob references, or write new ones.
42 2 : if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy); !writeBlobs {
43 2 : // This compaction should preserve existing blob references.
44 2 : return &preserveBlobReferences{
45 2 : inputBlobPhysicalFiles: uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs),
46 2 : outputBlobReferenceDepth: outputBlobReferenceDepth,
47 2 : }
48 2 : }
49 :
50 : // This compaction should write values to new blob files.
51 2 : return &writeNewBlobFiles{
52 2 : comparer: d.opts.Comparer,
53 2 : newBlobObject: func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
54 2 : return d.newCompactionOutputBlob(jobID, c)
55 2 : },
56 : shortAttrExtractor: d.opts.Experimental.ShortAttributeExtractor,
57 : writerOpts: d.opts.MakeBlobWriterOptions(c.outputLevel.level),
58 : minimumSize: policy.MinimumSize,
59 : }
60 : }
61 :
62 : // shouldWriteBlobFiles returns true if the compaction should write new blob
63 : // files. If it returns false, the referenceDepth return value contains the
64 : // maximum blob reference depth to assign to output sstables (the actual value
65 : // may be lower iff the output table references fewer distinct blob files).
66 : func shouldWriteBlobFiles(
67 : c *compaction, policy ValueSeparationPolicy,
68 2 : ) (writeBlobs bool, referenceDepth manifest.BlobReferenceDepth) {
69 2 : // Flushes will have no existing references to blob files and should write
70 2 : // their values to new blob files.
71 2 : if c.kind == compactionKindFlush {
72 2 : return true, 0
73 2 : }
74 2 : inputReferenceDepth := compactionBlobReferenceDepth(c.inputs)
75 2 : if inputReferenceDepth == 0 {
76 2 : // None of the input sstables reference blob files. It may be the case
77 2 : // that these sstables were created before value separation was enabled.
78 2 : // We should try to write to new blob files.
79 2 : return true, 0
80 2 : }
81 : // If the compaction's output blob reference depth would be greater than the
82 : // configured max, we should rewrite the values into new blob files to
83 : // restore locality.
84 2 : if inputReferenceDepth > manifest.BlobReferenceDepth(policy.MaxBlobReferenceDepth) {
85 2 : return true, 0
86 2 : }
87 : // Otherwise, we won't write any new blob files but will carry forward
88 : // existing references.
89 2 : return false, inputReferenceDepth
90 : }
91 :
92 : // compactionBlobReferenceDepth computes the blob reference depth for a
93 : // compaction. It's computed by finding the maximum blob reference depth of
94 : // input sstables in each level. These per-level depths are then summed to
95 : // produce a worst-case approximation of the blob reference locality of the
96 : // compaction's output sstables.
97 : //
98 : // The intuition is that as compactions combine files referencing distinct blob
99 : // files, outputted sstables begin to reference more and more distinct blob
100 : // files. In the worst case, these references are evenly distributed across the
101 : // keyspace and there is very little locality.
102 2 : func compactionBlobReferenceDepth(levels []compactionLevel) manifest.BlobReferenceDepth {
103 2 : // TODO(jackson): Consider using a range tree to precisely compute the
104 2 : // depth. This would require maintaining minimum and maximum keys.
105 2 : var depth manifest.BlobReferenceDepth
106 2 : for _, level := range levels {
107 2 : // L0 allows files to overlap one another, so it's not sufficient to
108 2 : // just take the maximum within the level. Instead, we need to sum the
109 2 : // max of each sublevel.
110 2 : //
111 2 : // TODO(jackson): This and other compaction logic would likely be
112 2 : // cleaner if we modeled each sublevel as its own `compactionLevel`.
113 2 : if level.level == 0 {
114 2 : for _, sublevel := range level.l0SublevelInfo {
115 2 : var sublevelDepth int
116 2 : for t := range sublevel.LevelSlice.All() {
117 2 : sublevelDepth = max(sublevelDepth, int(t.BlobReferenceDepth))
118 2 : }
119 2 : depth += manifest.BlobReferenceDepth(sublevelDepth)
120 : }
121 2 : continue
122 : }
123 :
124 2 : var levelDepth manifest.BlobReferenceDepth
125 2 : for t := range level.files.All() {
126 2 : levelDepth = max(levelDepth, t.BlobReferenceDepth)
127 2 : }
128 2 : depth += levelDepth
129 : }
130 2 : return depth
131 : }
132 :
133 : // uniqueInputBlobMetadatas returns a slice of all unique blob file metadata
134 : // objects referenced by tables in levels.
135 : func uniqueInputBlobMetadatas(
136 : blobFileSet *manifest.BlobFileSet, levels []compactionLevel,
137 2 : ) map[base.BlobFileID]*manifest.PhysicalBlobFile {
138 2 : m := make(map[base.BlobFileID]*manifest.PhysicalBlobFile)
139 2 : for _, level := range levels {
140 2 : for t := range level.files.All() {
141 2 : for _, ref := range t.BlobReferences {
142 2 : if _, ok := m[ref.FileID]; ok {
143 1 : continue
144 : }
145 2 : phys, ok := blobFileSet.LookupPhysical(ref.FileID)
146 2 : if !ok {
147 0 : panic(errors.AssertionFailedf("pebble: blob file %s not found", ref.FileID))
148 : }
149 2 : m[ref.FileID] = phys
150 : }
151 : }
152 : }
153 2 : return m
154 : }
155 :
156 : // writeNewBlobFiles implements the strategy and mechanics for separating values
157 : // into external blob files.
158 : type writeNewBlobFiles struct {
159 : comparer *base.Comparer
160 : // newBlobObject constructs a new blob object for use in the compaction.
161 : newBlobObject func() (objstorage.Writable, objstorage.ObjectMetadata, error)
162 : shortAttrExtractor ShortAttributeExtractor
163 : // writerOpts is used to configure all constructed blob writers.
164 : writerOpts blob.FileWriterOptions
165 : // minimumSize imposes a lower bound on the size of values that can be
166 : // separated into a blob file. Values smaller than this are always written
167 : // to the sstable (but may still be written to a value block within the
168 : // sstable).
169 : minimumSize int
170 :
171 : // Current blob writer state
172 : writer *blob.FileWriter
173 : objMeta objstorage.ObjectMetadata
174 :
175 : buf []byte
176 : }
177 :
178 : // Assert that *writeNewBlobFiles implements the compact.ValueSeparation interface.
179 : var _ compact.ValueSeparation = (*writeNewBlobFiles)(nil)
180 :
181 : // EstimatedFileSize returns an estimate of the disk space consumed by the current
182 : // blob file if it were closed now.
183 2 : func (vs *writeNewBlobFiles) EstimatedFileSize() uint64 {
184 2 : if vs.writer == nil {
185 2 : return 0
186 2 : }
187 2 : return vs.writer.EstimatedSize()
188 : }
189 :
190 : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
191 : // current output sstable's blob references so far.
192 2 : func (vs *writeNewBlobFiles) EstimatedReferenceSize() uint64 {
193 2 : // When we're writing to new blob files, the size of the blob file itself is
194 2 : // a better estimate of the disk space consumed than the uncompressed value
195 2 : // sizes.
196 2 : return vs.EstimatedFileSize()
197 2 : }
198 :
199 : // Add adds the provided key-value pair to the sstable, possibly separating the
200 : // value into a blob file.
201 : func (vs *writeNewBlobFiles) Add(
202 : tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
203 2 : ) error {
204 2 : // We always fetch the value if we're rewriting blob files. We want to
205 2 : // replace any references to existing blob files with references to new blob
206 2 : // files that we write during the compaction.
207 2 : v, callerOwned, err := kv.Value(vs.buf)
208 2 : if err != nil {
209 0 : return err
210 0 : }
211 2 : if callerOwned {
212 0 : vs.buf = v[:0]
213 0 : }
214 :
215 : // Values that are too small are never separated.
216 2 : if len(v) < vs.minimumSize {
217 2 : return tw.Add(kv.K, v, forceObsolete)
218 2 : }
219 : // Merge and deletesized keys are never separated.
220 2 : switch kv.K.Kind() {
221 2 : case base.InternalKeyKindMerge, base.InternalKeyKindDeleteSized:
222 2 : return tw.Add(kv.K, v, forceObsolete)
223 : }
224 :
225 : // This KV met all the criteria and its value will be separated.
226 : // If there's a configured short attribute extractor, extract the value's
227 : // short attribute.
228 2 : var shortAttr base.ShortAttribute
229 2 : if vs.shortAttrExtractor != nil {
230 0 : keyPrefixLen := vs.comparer.Split(kv.K.UserKey)
231 0 : shortAttr, err = vs.shortAttrExtractor(kv.K.UserKey, keyPrefixLen, v)
232 0 : if err != nil {
233 0 : return err
234 0 : }
235 : }
236 :
237 : // If we don't have an open blob writer, create one. We create blob objects
238 : // lazily so that we don't create them unless a compaction will actually
239 : // write to a blob file. This avoids creating and deleting empty blob files
240 : // on every compaction in parts of the keyspace that a) are required to be
241 : // in-place or b) have small values.
242 2 : if vs.writer == nil {
243 2 : writable, objMeta, err := vs.newBlobObject()
244 2 : if err != nil {
245 0 : return err
246 0 : }
247 2 : vs.objMeta = objMeta
248 2 : vs.writer = blob.NewFileWriter(objMeta.DiskFileNum, writable, vs.writerOpts)
249 : }
250 :
251 : // Append the value to the blob file.
252 2 : handle := vs.writer.AddValue(v)
253 2 :
254 2 : // Write the key and the handle to the sstable. We need to map the
255 2 : // blob.Handle into a blob.InlineHandle. Everything is copied verbatim,
256 2 : // except the FileNum is translated into a reference index.
257 2 : inlineHandle := blob.InlineHandle{
258 2 : InlineHandlePreface: blob.InlineHandlePreface{
259 2 : // Since we're writing blob files and maintaining a 1:1 mapping
260 2 : // between sstables and blob files, the reference index is always 0
261 2 : // here. Only compactions that don't rewrite blob files will produce
262 2 : // handles with nonzero reference indices.
263 2 : ReferenceID: 0,
264 2 : ValueLen: handle.ValueLen,
265 2 : },
266 2 : HandleSuffix: blob.HandleSuffix{
267 2 : BlockID: handle.BlockID,
268 2 : ValueID: handle.ValueID,
269 2 : },
270 2 : }
271 2 : return tw.AddWithBlobHandle(kv.K, inlineHandle, shortAttr, forceObsolete)
272 : }
273 :
274 : // FinishOutput closes the current blob file (if any). It returns the stats
275 : // and metadata of the now completed blob file.
276 2 : func (vs *writeNewBlobFiles) FinishOutput() (compact.ValueSeparationMetadata, error) {
277 2 : if vs.writer == nil {
278 2 : return compact.ValueSeparationMetadata{}, nil
279 2 : }
280 2 : stats, err := vs.writer.Close()
281 2 : if err != nil {
282 0 : return compact.ValueSeparationMetadata{}, err
283 0 : }
284 2 : vs.writer = nil
285 2 : meta := &manifest.PhysicalBlobFile{
286 2 : FileNum: vs.objMeta.DiskFileNum,
287 2 : Size: stats.FileLen,
288 2 : ValueSize: stats.UncompressedValueBytes,
289 2 : CreationTime: uint64(time.Now().Unix()),
290 2 : }
291 2 :
292 2 : return compact.ValueSeparationMetadata{
293 2 : BlobReferences: manifest.BlobReferences{manifest.MakeBlobReference(
294 2 : base.BlobFileID(vs.objMeta.DiskFileNum),
295 2 : stats.UncompressedValueBytes,
296 2 : meta,
297 2 : )},
298 2 : BlobReferenceSize: stats.UncompressedValueBytes,
299 2 : BlobReferenceDepth: 1,
300 2 : BlobFileStats: stats,
301 2 : BlobFileObject: vs.objMeta,
302 2 : BlobFileMetadata: meta,
303 2 : }, nil
304 : }
305 :
306 : // preserveBlobReferences implements the compact.ValueSeparation interface. When
307 : // a compaction is configured with preserveBlobReferences, the compaction will
308 : // not create any new blob files. However, input references to existing blob
309 : // references will be preserved and metadata about the table's blob references
310 : // will be collected.
311 : type preserveBlobReferences struct {
312 : // inputBlobPhysicalFiles holds the *PhysicalBlobFile for every unique blob
313 : // file referenced by input sstables.
314 : inputBlobPhysicalFiles map[base.BlobFileID]*manifest.PhysicalBlobFile
315 : outputBlobReferenceDepth manifest.BlobReferenceDepth
316 :
317 : // state
318 : buf []byte
319 : // currReferences holds the pending references that have been referenced by
320 : // the current output sstable. The index of a reference with a given blob
321 : // file ID is the value of the blob.ReferenceID used by its value handles
322 : // within the output sstable.
323 : currReferences []pendingReference
324 : // totalValueSize is the sum of currReferenceValueSizes.
325 : //
326 : // INVARIANT: totalValueSize == sum(currReferenceValueSizes)
327 : totalValueSize uint64
328 : }
329 :
330 : type pendingReference struct {
331 : blobFileID base.BlobFileID
332 : valueSize uint64
333 : }
334 :
335 : // Assert that *preserveBlobReferences implements the compact.ValueSeparation
336 : // interface.
337 : var _ compact.ValueSeparation = (*preserveBlobReferences)(nil)
338 :
339 : // EstimatedFileSize returns an estimate of the disk space consumed by the current
340 : // blob file if it were closed now.
341 2 : func (vs *preserveBlobReferences) EstimatedFileSize() uint64 {
342 2 : return 0
343 2 : }
344 :
345 : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
346 : // current output sstable's blob references so far.
347 2 : func (vs *preserveBlobReferences) EstimatedReferenceSize() uint64 {
348 2 : // TODO(jackson): The totalValueSize is the uncompressed value sizes. With
349 2 : // compressible data, it overestimates the disk space consumed by the blob
350 2 : // references. It also does not include the blob file's index block or
351 2 : // footer, so it can underestimate if values are completely incompressible.
352 2 : //
353 2 : // Should we compute a compression ratio per blob file and scale the
354 2 : // references appropriately?
355 2 : return vs.totalValueSize
356 2 : }
357 :
358 : // Add implements compact.ValueSeparation. This implementation will write
359 : // existing blob references to the output table.
360 : func (vs *preserveBlobReferences) Add(
361 : tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
362 2 : ) error {
363 2 : if !kv.V.IsBlobValueHandle() {
364 2 : // If the value is not already a blob handle (either it's in-place or in
365 2 : // a value block), we retrieve the value and write it through Add. The
366 2 : // sstable writer may still decide to put the value in a value block,
367 2 : // but regardless the value will be written to the sstable itself and
368 2 : // not a blob file.
369 2 : v, callerOwned, err := kv.Value(vs.buf)
370 2 : if err != nil {
371 0 : return err
372 0 : }
373 2 : if callerOwned {
374 0 : vs.buf = v[:0]
375 0 : }
376 2 : return tw.Add(kv.K, v, forceObsolete)
377 : }
378 :
379 : // The value is an existing blob handle. We can copy it into the output
380 : // sstable, taking note of the reference for the table metadata.
381 2 : lv := kv.V.LazyValue()
382 2 : fileID := lv.Fetcher.BlobFileID
383 2 :
384 2 : var refID blob.ReferenceID
385 2 : if refIdx := slices.IndexFunc(vs.currReferences, func(ref pendingReference) bool {
386 2 : return ref.blobFileID == fileID
387 2 : }); refIdx != -1 {
388 2 : refID = blob.ReferenceID(refIdx)
389 2 : } else {
390 2 : refID = blob.ReferenceID(len(vs.currReferences))
391 2 : vs.currReferences = append(vs.currReferences, pendingReference{
392 2 : blobFileID: fileID,
393 2 : valueSize: 0,
394 2 : })
395 2 : }
396 :
397 2 : if invariants.Enabled && vs.currReferences[refID].blobFileID != fileID {
398 0 : panic("wrong reference index")
399 : }
400 :
401 2 : handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
402 2 : inlineHandle := blob.InlineHandle{
403 2 : InlineHandlePreface: blob.InlineHandlePreface{
404 2 : ReferenceID: refID,
405 2 : ValueLen: lv.Fetcher.Attribute.ValueLen,
406 2 : },
407 2 : HandleSuffix: handleSuffix,
408 2 : }
409 2 : err := tw.AddWithBlobHandle(kv.K, inlineHandle, lv.Fetcher.Attribute.ShortAttribute, forceObsolete)
410 2 : if err != nil {
411 0 : return err
412 0 : }
413 2 : vs.currReferences[refID].valueSize += uint64(lv.Fetcher.Attribute.ValueLen)
414 2 : vs.totalValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
415 2 : return nil
416 : }
417 :
418 : // FinishOutput implements compact.ValueSeparation.
419 2 : func (vs *preserveBlobReferences) FinishOutput() (compact.ValueSeparationMetadata, error) {
420 2 : if invariants.Enabled {
421 2 : // Assert that the incrementally-maintained totalValueSize matches the
422 2 : // sum of all the reference value sizes.
423 2 : totalValueSize := uint64(0)
424 2 : for _, ref := range vs.currReferences {
425 2 : totalValueSize += ref.valueSize
426 2 : }
427 2 : if totalValueSize != vs.totalValueSize {
428 0 : return compact.ValueSeparationMetadata{},
429 0 : errors.AssertionFailedf("totalValueSize mismatch: %d != %d", totalValueSize, vs.totalValueSize)
430 0 : }
431 : }
432 :
433 2 : references := make(manifest.BlobReferences, len(vs.currReferences))
434 2 : for i := range vs.currReferences {
435 2 : ref := vs.currReferences[i]
436 2 : phys, ok := vs.inputBlobPhysicalFiles[ref.blobFileID]
437 2 : if !ok {
438 0 : return compact.ValueSeparationMetadata{},
439 0 : errors.AssertionFailedf("pebble: blob file %s not found among input sstables", ref.blobFileID)
440 0 : }
441 2 : references[i] = manifest.MakeBlobReference(ref.blobFileID, ref.valueSize, phys)
442 : }
443 2 : referenceSize := vs.totalValueSize
444 2 : vs.currReferences = vs.currReferences[:0]
445 2 : vs.totalValueSize = 0
446 2 : return compact.ValueSeparationMetadata{
447 2 : BlobReferences: references,
448 2 : BlobReferenceSize: referenceSize,
449 2 : // The outputBlobReferenceDepth is computed from the input sstables,
450 2 : // reflecting the worst-case overlap of referenced blob files. If this
451 2 : // sstable references fewer unique blob files, reduce its depth to the
452 2 : // count of unique files.
453 2 : BlobReferenceDepth: min(vs.outputBlobReferenceDepth, manifest.BlobReferenceDepth(len(references))),
454 2 : }, nil
455 : }
|