Line data Source code
1 : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "slices"
9 : "time"
10 :
11 : "github.com/cockroachdb/errors"
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/pebble/internal/compact"
14 : "github.com/cockroachdb/pebble/internal/invariants"
15 : "github.com/cockroachdb/pebble/internal/manifest"
16 : "github.com/cockroachdb/pebble/objstorage"
17 : "github.com/cockroachdb/pebble/sstable"
18 : "github.com/cockroachdb/pebble/sstable/blob"
19 : "github.com/cockroachdb/redact"
20 : )
21 :
// latencyTolerantMinimumSize is the minimum size, in bytes, of a value that
// will be separated into a blob file when the value storage policy is
// ValueStorageLatencyTolerant.
//
// NOTE(review): nothing in this part of the file references the constant;
// presumably a caller passes it to ValueSeparation.SetNextOutputConfig as the
// per-output MinimumSize — confirm against the compaction code.
const latencyTolerantMinimumSize = 10
26 :
27 1 : var neverSeparateValues getValueSeparation = func(JobID, *tableCompaction, sstable.TableFormat) compact.ValueSeparation {
28 1 : return compact.NeverSeparateValues{}
29 1 : }
30 :
31 : // determineCompactionValueSeparation determines whether a compaction should
32 : // separate values into blob files. It returns a compact.ValueSeparation
33 : // implementation that should be used for the compaction.
34 : func (d *DB) determineCompactionValueSeparation(
35 : jobID JobID, c *tableCompaction, tableFormat sstable.TableFormat,
36 1 : ) compact.ValueSeparation {
37 1 : if tableFormat < sstable.TableFormatPebblev7 || d.FormatMajorVersion() < FormatValueSeparation ||
38 1 : d.opts.Experimental.ValueSeparationPolicy == nil {
39 1 : return compact.NeverSeparateValues{}
40 1 : }
41 1 : policy := d.opts.Experimental.ValueSeparationPolicy()
42 1 : if !policy.Enabled {
43 1 : return compact.NeverSeparateValues{}
44 1 : }
45 :
46 : // We're allowed to write blob references. Determine whether we should carry
47 : // forward existing blob references, or write new ones.
48 1 : if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy); !writeBlobs {
49 1 : // This compaction should preserve existing blob references.
50 1 : return &preserveBlobReferences{
51 1 : inputBlobPhysicalFiles: uniqueInputBlobMetadatas(&c.version.BlobFiles, c.inputs),
52 1 : outputBlobReferenceDepth: outputBlobReferenceDepth,
53 1 : }
54 1 : }
55 :
56 : // This compaction should write values to new blob files.
57 1 : return &writeNewBlobFiles{
58 1 : comparer: d.opts.Comparer,
59 1 : newBlobObject: func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
60 1 : return d.newCompactionOutputBlob(
61 1 : jobID, c.kind, c.outputLevel.level, &c.metrics.bytesWritten, c.objCreateOpts)
62 1 : },
63 : shortAttrExtractor: d.opts.Experimental.ShortAttributeExtractor,
64 : writerOpts: d.opts.MakeBlobWriterOptions(c.outputLevel.level),
65 : minimumSize: policy.MinimumSize,
66 : globalMinimumSize: policy.MinimumSize,
67 0 : invalidValueCallback: func(userKey []byte, value []byte, err error) {
68 0 : // The value may not be safe, so it will be redacted when redaction
69 0 : // is enabled.
70 0 : d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
71 0 : Kind: InvalidValue,
72 0 : UserKey: userKey,
73 0 : ExtraInfo: redact.Sprintf("callback=ShortAttributeExtractor,value=%x,err=%q",
74 0 : value, err),
75 0 : })
76 0 : },
77 : }
78 : }
79 :
80 : // shouldWriteBlobFiles returns true if the compaction should write new blob
81 : // files. If it returns false, the referenceDepth return value contains the
82 : // maximum blob reference depth to assign to output sstables (the actual value
83 : // may be lower iff the output table references fewer distinct blob files).
84 : func shouldWriteBlobFiles(
85 : c *tableCompaction, policy ValueSeparationPolicy,
86 1 : ) (writeBlobs bool, referenceDepth manifest.BlobReferenceDepth) {
87 1 : // Flushes will have no existing references to blob files and should write
88 1 : // their values to new blob files.
89 1 : if c.kind == compactionKindFlush {
90 1 : return true, 0
91 1 : }
92 1 : inputReferenceDepth := compactionBlobReferenceDepth(c.inputs)
93 1 : if inputReferenceDepth == 0 {
94 1 : // None of the input sstables reference blob files. It may be the case
95 1 : // that these sstables were created before value separation was enabled.
96 1 : // We should try to write to new blob files.
97 1 : return true, 0
98 1 : }
99 : // If the compaction's output blob reference depth would be greater than the
100 : // configured max, we should rewrite the values into new blob files to
101 : // restore locality.
102 1 : if inputReferenceDepth > manifest.BlobReferenceDepth(policy.MaxBlobReferenceDepth) {
103 1 : return true, 0
104 1 : }
105 : // Otherwise, we won't write any new blob files but will carry forward
106 : // existing references.
107 1 : return false, inputReferenceDepth
108 : }
109 :
110 : // compactionBlobReferenceDepth computes the blob reference depth for a
111 : // compaction. It's computed by finding the maximum blob reference depth of
112 : // input sstables in each level. These per-level depths are then summed to
113 : // produce a worst-case approximation of the blob reference locality of the
114 : // compaction's output sstables.
115 : //
116 : // The intuition is that as compactions combine files referencing distinct blob
117 : // files, outputted sstables begin to reference more and more distinct blob
118 : // files. In the worst case, these references are evenly distributed across the
119 : // keyspace and there is very little locality.
120 1 : func compactionBlobReferenceDepth(levels []compactionLevel) manifest.BlobReferenceDepth {
121 1 : // TODO(jackson): Consider using a range tree to precisely compute the
122 1 : // depth. This would require maintaining minimum and maximum keys.
123 1 : var depth manifest.BlobReferenceDepth
124 1 : for _, level := range levels {
125 1 : // L0 allows files to overlap one another, so it's not sufficient to
126 1 : // just take the maximum within the level. Instead, we need to sum the
127 1 : // max of each sublevel.
128 1 : //
129 1 : // TODO(jackson): This and other compaction logic would likely be
130 1 : // cleaner if we modeled each sublevel as its own `compactionLevel`.
131 1 : if level.level == 0 {
132 1 : for _, sublevel := range level.l0SublevelInfo {
133 1 : var sublevelDepth int
134 1 : for t := range sublevel.LevelSlice.All() {
135 1 : sublevelDepth = max(sublevelDepth, int(t.BlobReferenceDepth))
136 1 : }
137 1 : depth += manifest.BlobReferenceDepth(sublevelDepth)
138 : }
139 1 : continue
140 : }
141 :
142 1 : var levelDepth manifest.BlobReferenceDepth
143 1 : for t := range level.files.All() {
144 1 : levelDepth = max(levelDepth, t.BlobReferenceDepth)
145 1 : }
146 1 : depth += levelDepth
147 : }
148 1 : return depth
149 : }
150 :
151 : // uniqueInputBlobMetadatas returns a slice of all unique blob file metadata
152 : // objects referenced by tables in levels.
153 : func uniqueInputBlobMetadatas(
154 : blobFileSet *manifest.BlobFileSet, levels []compactionLevel,
155 1 : ) map[base.BlobFileID]*manifest.PhysicalBlobFile {
156 1 : m := make(map[base.BlobFileID]*manifest.PhysicalBlobFile)
157 1 : for _, level := range levels {
158 1 : for t := range level.files.All() {
159 1 : for _, ref := range t.BlobReferences {
160 1 : if _, ok := m[ref.FileID]; ok {
161 1 : continue
162 : }
163 1 : phys, ok := blobFileSet.LookupPhysical(ref.FileID)
164 1 : if !ok {
165 0 : panic(errors.AssertionFailedf("pebble: blob file %s not found", ref.FileID))
166 : }
167 1 : m[ref.FileID] = phys
168 : }
169 : }
170 : }
171 1 : return m
172 : }
173 :
// writeNewBlobFiles implements the strategy and mechanics for separating values
// into external blob files.
type writeNewBlobFiles struct {
	// comparer provides Split, which is used to compute the key prefix length
	// handed to shortAttrExtractor.
	comparer *base.Comparer
	// newBlobObject constructs a new blob object for use in the compaction.
	newBlobObject func() (objstorage.Writable, objstorage.ObjectMetadata, error)
	// shortAttrExtractor, if non-nil, is invoked for every value that is about
	// to be separated in order to compute the value's short attribute.
	shortAttrExtractor ShortAttributeExtractor
	// writerOpts is used to configure all constructed blob writers.
	writerOpts blob.FileWriterOptions
	// minimumSize imposes a lower bound on the size of values that can be
	// separated into a blob file. Values smaller than this are always written
	// to the sstable (but may still be written to a value block within the
	// sstable).
	//
	// minimumSize is set to globalMinimumSize by default and on every call to
	// FinishOutput. It may be overridden by SetNextOutputConfig (i.e., if a
	// SpanPolicy dictates a different minimum size for a span of the keyspace).
	minimumSize int
	// globalMinimumSize is the size threshold for separating values into blob
	// files globally across the keyspace. It may be overridden per-output by
	// SetNextOutputConfig.
	globalMinimumSize int
	// invalidValueCallback is called when a value is encountered for which the
	// short attribute extractor returns an error.
	invalidValueCallback func(userKey []byte, value []byte, err error)

	// Current blob writer state

	// writer is the open blob file writer. It is nil until Add first separates
	// a value for the current output, and is reset to nil by FinishOutput.
	writer *blob.FileWriter
	// objMeta is the metadata of the object backing the most recently created
	// writer.
	objMeta objstorage.ObjectMetadata

	// buf is a scratch buffer passed to kv.Value in Add. It is replaced with
	// the returned value's backing array when the value is caller-owned.
	buf []byte
}
206 :
207 : // Assert that *writeNewBlobFiles implements the compact.ValueSeparation interface.
208 : var _ compact.ValueSeparation = (*writeNewBlobFiles)(nil)
209 :
210 : // SetNextOutputConfig implements the ValueSeparation interface.
211 0 : func (vs *writeNewBlobFiles) SetNextOutputConfig(config compact.ValueSeparationOutputConfig) {
212 0 : vs.minimumSize = config.MinimumSize
213 0 : }
214 :
215 : // EstimatedFileSize returns an estimate of the disk space consumed by the current
216 : // blob file if it were closed now.
217 1 : func (vs *writeNewBlobFiles) EstimatedFileSize() uint64 {
218 1 : if vs.writer == nil {
219 1 : return 0
220 1 : }
221 1 : return vs.writer.EstimatedSize()
222 : }
223 :
224 : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
225 : // current output sstable's blob references so far.
226 1 : func (vs *writeNewBlobFiles) EstimatedReferenceSize() uint64 {
227 1 : // When we're writing to new blob files, the size of the blob file itself is
228 1 : // a better estimate of the disk space consumed than the uncompressed value
229 1 : // sizes.
230 1 : return vs.EstimatedFileSize()
231 1 : }
232 :
233 : // Add adds the provided key-value pair to the sstable, possibly separating the
234 : // value into a blob file.
235 : func (vs *writeNewBlobFiles) Add(
236 : tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
237 1 : ) error {
238 1 : // We always fetch the value if we're rewriting blob files. We want to
239 1 : // replace any references to existing blob files with references to new blob
240 1 : // files that we write during the compaction.
241 1 : v, callerOwned, err := kv.Value(vs.buf)
242 1 : if err != nil {
243 0 : return err
244 0 : }
245 1 : if callerOwned {
246 0 : vs.buf = v[:0]
247 0 : }
248 :
249 : // Values that are too small are never separated.
250 1 : if len(v) < vs.minimumSize {
251 1 : return tw.Add(kv.K, v, forceObsolete)
252 1 : }
253 : // Merge and deletesized keys are never separated.
254 1 : switch kv.K.Kind() {
255 1 : case base.InternalKeyKindMerge, base.InternalKeyKindDeleteSized:
256 1 : return tw.Add(kv.K, v, forceObsolete)
257 : }
258 :
259 : // This KV met all the criteria and its value will be separated.
260 : // If there's a configured short attribute extractor, extract the value's
261 : // short attribute.
262 1 : var shortAttr base.ShortAttribute
263 1 : if vs.shortAttrExtractor != nil {
264 0 : keyPrefixLen := vs.comparer.Split(kv.K.UserKey)
265 0 : shortAttr, err = vs.shortAttrExtractor(kv.K.UserKey, keyPrefixLen, v)
266 0 : if err != nil {
267 0 : // Report that there was a value for which the short attribute
268 0 : // extractor was unable to extract a short attribute.
269 0 : vs.invalidValueCallback(kv.K.UserKey, v, err)
270 0 :
271 0 : // Rather than erroring out and aborting the flush or compaction, we
272 0 : // fallback to writing the value verbatim to the sstable. Otherwise
273 0 : // a flush could busy loop, repeatedly attempting to write the same
274 0 : // memtable and repeatedly unable to extract a key's short attribute.
275 0 : return tw.Add(kv.K, v, forceObsolete)
276 0 : }
277 : }
278 :
279 : // If we don't have an open blob writer, create one. We create blob objects
280 : // lazily so that we don't create them unless a compaction will actually
281 : // write to a blob file. This avoids creating and deleting empty blob files
282 : // on every compaction in parts of the keyspace that a) are required to be
283 : // in-place or b) have small values.
284 1 : if vs.writer == nil {
285 1 : writable, objMeta, err := vs.newBlobObject()
286 1 : if err != nil {
287 0 : return err
288 0 : }
289 1 : vs.objMeta = objMeta
290 1 : vs.writer = blob.NewFileWriter(objMeta.DiskFileNum, writable, vs.writerOpts)
291 : }
292 :
293 : // Append the value to the blob file.
294 1 : handle := vs.writer.AddValue(v)
295 1 :
296 1 : // Write the key and the handle to the sstable. We need to map the
297 1 : // blob.Handle into a blob.InlineHandle. Everything is copied verbatim,
298 1 : // except the FileNum is translated into a reference index.
299 1 : inlineHandle := blob.InlineHandle{
300 1 : InlineHandlePreface: blob.InlineHandlePreface{
301 1 : // Since we're writing blob files and maintaining a 1:1 mapping
302 1 : // between sstables and blob files, the reference index is always 0
303 1 : // here. Only compactions that don't rewrite blob files will produce
304 1 : // handles with nonzero reference indices.
305 1 : ReferenceID: 0,
306 1 : ValueLen: handle.ValueLen,
307 1 : },
308 1 : HandleSuffix: blob.HandleSuffix{
309 1 : BlockID: handle.BlockID,
310 1 : ValueID: handle.ValueID,
311 1 : },
312 1 : }
313 1 : return tw.AddWithBlobHandle(kv.K, inlineHandle, shortAttr, forceObsolete)
314 : }
315 :
316 : // FinishOutput closes the current blob file (if any). It returns the stats
317 : // and metadata of the now completed blob file.
318 1 : func (vs *writeNewBlobFiles) FinishOutput() (compact.ValueSeparationMetadata, error) {
319 1 : if vs.writer == nil {
320 1 : return compact.ValueSeparationMetadata{}, nil
321 1 : }
322 1 : stats, err := vs.writer.Close()
323 1 : if err != nil {
324 0 : return compact.ValueSeparationMetadata{}, err
325 0 : }
326 1 : vs.writer = nil
327 1 : meta := &manifest.PhysicalBlobFile{
328 1 : FileNum: vs.objMeta.DiskFileNum,
329 1 : Size: stats.FileLen,
330 1 : ValueSize: stats.UncompressedValueBytes,
331 1 : CreationTime: uint64(time.Now().Unix()),
332 1 : }
333 1 : // Reset the minimum size for the next output.
334 1 : vs.minimumSize = vs.globalMinimumSize
335 1 :
336 1 : return compact.ValueSeparationMetadata{
337 1 : BlobReferences: manifest.BlobReferences{manifest.MakeBlobReference(
338 1 : base.BlobFileID(vs.objMeta.DiskFileNum),
339 1 : stats.UncompressedValueBytes,
340 1 : meta,
341 1 : )},
342 1 : BlobReferenceSize: stats.UncompressedValueBytes,
343 1 : BlobReferenceDepth: 1,
344 1 : BlobFileStats: stats,
345 1 : BlobFileObject: vs.objMeta,
346 1 : BlobFileMetadata: meta,
347 1 : }, nil
348 : }
349 :
// preserveBlobReferences implements the compact.ValueSeparation interface. When
// a compaction is configured with preserveBlobReferences, the compaction will
// not create any new blob files. However, the inputs' references to existing
// blob files will be preserved and metadata about the table's blob references
// will be collected.
type preserveBlobReferences struct {
	// inputBlobPhysicalFiles holds the *PhysicalBlobFile for every unique blob
	// file referenced by input sstables.
	inputBlobPhysicalFiles map[base.BlobFileID]*manifest.PhysicalBlobFile
	// outputBlobReferenceDepth is an upper bound on the blob reference depth
	// reported for each output; FinishOutput lowers it to the number of
	// references held by the output when that number is smaller.
	outputBlobReferenceDepth manifest.BlobReferenceDepth

	// state

	// buf is a scratch buffer passed to kv.Value in Add when a value is not a
	// blob handle.
	buf []byte
	// currReferences holds the pending references that have been referenced by
	// the current output sstable. The index of a reference with a given blob
	// file ID is the value of the blob.ReferenceID used by its value handles
	// within the output sstable.
	currReferences []pendingReference
	// totalValueSize is the sum of the valueSize fields of currReferences.
	//
	// INVARIANT: totalValueSize == sum of currReferences[i].valueSize
	totalValueSize uint64
}
373 :
// pendingReference tracks one blob file referenced by the output sstable that
// preserveBlobReferences is currently building.
type pendingReference struct {
	// blobFileID identifies the referenced blob file.
	blobFileID base.BlobFileID
	// valueSize accumulates the ValueLen of every value handle written to the
	// current output that points into blobFileID.
	valueSize uint64
}
378 :
379 : // Assert that *preserveBlobReferences implements the compact.ValueSeparation
380 : // interface.
381 : var _ compact.ValueSeparation = (*preserveBlobReferences)(nil)
382 :
383 : // SetNextOutputConfig implements the ValueSeparation interface.
384 0 : func (vs *preserveBlobReferences) SetNextOutputConfig(config compact.ValueSeparationOutputConfig) {}
385 :
386 : // EstimatedFileSize returns an estimate of the disk space consumed by the current
387 : // blob file if it were closed now.
388 1 : func (vs *preserveBlobReferences) EstimatedFileSize() uint64 {
389 1 : return 0
390 1 : }
391 :
392 : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
393 : // current output sstable's blob references so far.
394 1 : func (vs *preserveBlobReferences) EstimatedReferenceSize() uint64 {
395 1 : // TODO(jackson): The totalValueSize is the uncompressed value sizes. With
396 1 : // compressible data, it overestimates the disk space consumed by the blob
397 1 : // references. It also does not include the blob file's index block or
398 1 : // footer, so it can underestimate if values are completely incompressible.
399 1 : //
400 1 : // Should we compute a compression ratio per blob file and scale the
401 1 : // references appropriately?
402 1 : return vs.totalValueSize
403 1 : }
404 :
405 : // Add implements compact.ValueSeparation. This implementation will write
406 : // existing blob references to the output table.
407 : func (vs *preserveBlobReferences) Add(
408 : tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
409 1 : ) error {
410 1 : if !kv.V.IsBlobValueHandle() {
411 1 : // If the value is not already a blob handle (either it's in-place or in
412 1 : // a value block), we retrieve the value and write it through Add. The
413 1 : // sstable writer may still decide to put the value in a value block,
414 1 : // but regardless the value will be written to the sstable itself and
415 1 : // not a blob file.
416 1 : v, callerOwned, err := kv.Value(vs.buf)
417 1 : if err != nil {
418 0 : return err
419 0 : }
420 1 : if callerOwned {
421 0 : vs.buf = v[:0]
422 0 : }
423 1 : return tw.Add(kv.K, v, forceObsolete)
424 : }
425 :
426 : // The value is an existing blob handle. We can copy it into the output
427 : // sstable, taking note of the reference for the table metadata.
428 1 : lv := kv.V.LazyValue()
429 1 : fileID := lv.Fetcher.BlobFileID
430 1 :
431 1 : var refID blob.ReferenceID
432 1 : if refIdx := slices.IndexFunc(vs.currReferences, func(ref pendingReference) bool {
433 1 : return ref.blobFileID == fileID
434 1 : }); refIdx != -1 {
435 1 : refID = blob.ReferenceID(refIdx)
436 1 : } else {
437 1 : refID = blob.ReferenceID(len(vs.currReferences))
438 1 : vs.currReferences = append(vs.currReferences, pendingReference{
439 1 : blobFileID: fileID,
440 1 : valueSize: 0,
441 1 : })
442 1 : }
443 :
444 1 : if invariants.Enabled && vs.currReferences[refID].blobFileID != fileID {
445 0 : panic("wrong reference index")
446 : }
447 :
448 1 : handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
449 1 : inlineHandle := blob.InlineHandle{
450 1 : InlineHandlePreface: blob.InlineHandlePreface{
451 1 : ReferenceID: refID,
452 1 : ValueLen: lv.Fetcher.Attribute.ValueLen,
453 1 : },
454 1 : HandleSuffix: handleSuffix,
455 1 : }
456 1 : err := tw.AddWithBlobHandle(kv.K, inlineHandle, lv.Fetcher.Attribute.ShortAttribute, forceObsolete)
457 1 : if err != nil {
458 0 : return err
459 0 : }
460 1 : vs.currReferences[refID].valueSize += uint64(lv.Fetcher.Attribute.ValueLen)
461 1 : vs.totalValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
462 1 : return nil
463 : }
464 :
465 : // FinishOutput implements compact.ValueSeparation.
466 1 : func (vs *preserveBlobReferences) FinishOutput() (compact.ValueSeparationMetadata, error) {
467 1 : if invariants.Enabled {
468 1 : // Assert that the incrementally-maintained totalValueSize matches the
469 1 : // sum of all the reference value sizes.
470 1 : totalValueSize := uint64(0)
471 1 : for _, ref := range vs.currReferences {
472 1 : totalValueSize += ref.valueSize
473 1 : }
474 1 : if totalValueSize != vs.totalValueSize {
475 0 : return compact.ValueSeparationMetadata{},
476 0 : errors.AssertionFailedf("totalValueSize mismatch: %d != %d", totalValueSize, vs.totalValueSize)
477 0 : }
478 : }
479 :
480 1 : references := make(manifest.BlobReferences, len(vs.currReferences))
481 1 : for i := range vs.currReferences {
482 1 : ref := vs.currReferences[i]
483 1 : phys, ok := vs.inputBlobPhysicalFiles[ref.blobFileID]
484 1 : if !ok {
485 0 : return compact.ValueSeparationMetadata{},
486 0 : errors.AssertionFailedf("pebble: blob file %s not found among input sstables", ref.blobFileID)
487 0 : }
488 1 : references[i] = manifest.MakeBlobReference(ref.blobFileID, ref.valueSize, phys)
489 : }
490 1 : referenceSize := vs.totalValueSize
491 1 : vs.currReferences = vs.currReferences[:0]
492 1 : vs.totalValueSize = 0
493 1 : return compact.ValueSeparationMetadata{
494 1 : BlobReferences: references,
495 1 : BlobReferenceSize: referenceSize,
496 1 : // The outputBlobReferenceDepth is computed from the input sstables,
497 1 : // reflecting the worst-case overlap of referenced blob files. If this
498 1 : // sstable references fewer unique blob files, reduce its depth to the
499 1 : // count of unique files.
500 1 : BlobReferenceDepth: min(vs.outputBlobReferenceDepth, manifest.BlobReferenceDepth(len(references))),
501 1 : }, nil
502 : }
|