Line data Source code
1 : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "cmp"
9 : "maps"
10 : "slices"
11 : "time"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/compact"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/internal/manifest"
18 : "github.com/cockroachdb/pebble/objstorage"
19 : "github.com/cockroachdb/pebble/sstable"
20 : "github.com/cockroachdb/pebble/sstable/blob"
21 : )
22 :
23 0 : var neverSeparateValues getValueSeparation = func(JobID, *compaction, sstable.TableFormat) compact.ValueSeparation {
24 0 : return compact.NeverSeparateValues{}
25 0 : }
26 :
27 : // determineCompactionValueSeparation determines whether a compaction should
28 : // separate values into blob files. It returns a compact.ValueSeparation
29 : // implementation that should be used for the compaction.
30 : func (d *DB) determineCompactionValueSeparation(
31 : jobID JobID, c *compaction, tableFormat sstable.TableFormat,
32 1 : ) compact.ValueSeparation {
33 1 : if tableFormat < sstable.TableFormatPebblev6 || d.FormatMajorVersion() < FormatExperimentalValueSeparation ||
34 1 : d.opts.Experimental.ValueSeparationPolicy == nil {
35 1 : return compact.NeverSeparateValues{}
36 1 : }
37 1 : policy := d.opts.Experimental.ValueSeparationPolicy()
38 1 : if !policy.Enabled {
39 0 : return compact.NeverSeparateValues{}
40 0 : }
41 :
42 : // We're allowed to write blob references. Determine whether we should carry
43 : // forward existing blob references, or write new ones.
44 1 : if writeBlobs, outputBlobReferenceDepth := shouldWriteBlobFiles(c, policy); !writeBlobs {
45 1 : // This compaction should preserve existing blob references.
46 1 : return &preserveBlobReferences{
47 1 : inputBlobMetadatas: uniqueInputBlobMetadatas(c.inputs),
48 1 : outputBlobReferenceDepth: outputBlobReferenceDepth,
49 1 : }
50 1 : }
51 :
52 : // This compaction should write values to new blob files.
53 1 : return &writeNewBlobFiles{
54 1 : comparer: d.opts.Comparer,
55 1 : newBlobObject: func() (objstorage.Writable, objstorage.ObjectMetadata, error) {
56 1 : return d.newCompactionOutputBlob(jobID, c)
57 1 : },
58 : shortAttrExtractor: d.opts.Experimental.ShortAttributeExtractor,
59 : writerOpts: d.opts.MakeBlobWriterOptions(c.outputLevel.level),
60 : minimumSize: policy.MinimumSize,
61 : }
62 : }
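// Illustrative sketch, not part of this file: the checks above read a
// ValueSeparationPolicy from Options.Experimental. From a client package, a
// policy closure along these lines could be supplied (field names are taken
// from this file; the numeric values are hypothetical):
//
//	opts.Experimental.ValueSeparationPolicy = func() pebble.ValueSeparationPolicy {
//		return pebble.ValueSeparationPolicy{
//			Enabled:               true,
//			MinimumSize:           1024, // only separate values of at least 1 KiB
//			MaxBlobReferenceDepth: 10,   // rewrite blob files beyond this depth
//		}
//	}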
63 :
64 : // shouldWriteBlobFiles returns true if the compaction should write new blob
65 : // files. If it returns false, the referenceDepth return value contains the
66 : // maximum blob reference depth to assign to output sstables (the actual value
67 : // may be lower iff the output table references fewer distinct blob files).
68 : func shouldWriteBlobFiles(
69 : c *compaction, policy ValueSeparationPolicy,
70 1 : ) (writeBlobs bool, referenceDepth manifest.BlobReferenceDepth) {
71 1 : // Flushes will have no existing references to blob files and should write
72 1 : // their values to new blob files.
73 1 : if c.kind == compactionKindFlush {
74 1 : return true, 0
75 1 : }
76 1 : inputReferenceDepth := compactionBlobReferenceDepth(c.inputs)
77 1 : if inputReferenceDepth == 0 {
78 0 : // None of the input sstables reference blob files. It may be the case
79 0 : // that these sstables were created before value separation was enabled.
80 0 : // We should try to write to new blob files.
81 0 : return true, 0
82 0 : }
83 : // If the compaction's output blob reference depth would be greater than the
84 : // configured max, we should rewrite the values into new blob files to
85 : // restore locality.
86 1 : if inputReferenceDepth > manifest.BlobReferenceDepth(policy.MaxBlobReferenceDepth) {
87 1 : return true, 0
88 1 : }
89 : // Otherwise, we won't write any new blob files but will carry forward
90 : // existing references.
91 1 : return false, inputReferenceDepth
92 : }
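// Worked example of the decision above (hypothetical numbers): if the input
// sstables have a combined blob reference depth of 3 and
// policy.MaxBlobReferenceDepth is 5, the compaction preserves existing
// references and assigns output tables a reference depth of at most 3. Were
// the maximum 2 instead, the compaction would rewrite values into new blob
// files to restore locality.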
93 :
94 : // compactionBlobReferenceDepth computes the blob reference depth for a
95 : // compaction. It's computed by finding the maximum blob reference depth of
96 : // input sstables in each level. These per-level depths are then summed to
97 : // produce a worst-case approximation of the blob reference locality of the
98 : // compaction's output sstables.
99 : //
100 : // The intuition is that as compactions combine files referencing distinct blob
101 : // files, output sstables begin to reference more and more distinct blob
102 : // files. In the worst case, these references are evenly distributed across the
103 : // keyspace and there is very little locality.
104 1 : func compactionBlobReferenceDepth(levels []compactionLevel) manifest.BlobReferenceDepth {
105 1 : // TODO(jackson): Consider using a range tree to precisely compute the
106 1 : // depth. This would require maintaining minimum and maximum keys.
107 1 : var depth manifest.BlobReferenceDepth
108 1 : for _, level := range levels {
109 1 : // L0 allows files to overlap one another, so it's not sufficient to
110 1 : // just take the maximum within the level. Instead, we need to sum the
111 1 : // max of each sublevel.
112 1 : //
113 1 : // TODO(jackson): This and other compaction logic would likely be
114 1 : // cleaner if we modeled each sublevel as its own `compactionLevel`.
115 1 : if level.level == 0 {
116 1 : for _, sublevel := range level.l0SublevelInfo {
117 1 : var sublevelDepth int
118 1 : for t := range sublevel.LevelSlice.All() {
119 1 : sublevelDepth = max(sublevelDepth, int(t.BlobReferenceDepth))
120 1 : }
121 1 : depth += manifest.BlobReferenceDepth(sublevelDepth)
122 : }
123 1 : continue
124 : }
125 :
126 1 : var levelDepth manifest.BlobReferenceDepth
127 1 : for t := range level.files.All() {
128 1 : levelDepth = max(levelDepth, t.BlobReferenceDepth)
129 1 : }
130 1 : depth += levelDepth
131 : }
132 1 : return depth
133 : }
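// Worked example (hypothetical inputs): a compaction whose L0 inputs span two
// sublevels with per-sublevel maximum depths of 2 and 1, plus L3 inputs with a
// maximum depth of 3, is assigned a blob reference depth of 2 + 1 + 3 = 6.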
134 :
135 : // uniqueInputBlobMetadatas returns a slice of all unique blob file metadata
136 : // objects referenced by tables in levels.
137 1 : func uniqueInputBlobMetadatas(levels []compactionLevel) []*manifest.BlobFileMetadata {
138 1 : m := make(map[*manifest.BlobFileMetadata]struct{})
139 1 : for _, level := range levels {
140 1 : for t := range level.files.All() {
141 1 : for _, ref := range t.BlobReferences {
142 1 : m[ref.Metadata] = struct{}{}
143 1 : }
144 : }
145 : }
146 1 : metadatas := slices.Collect(maps.Keys(m))
147 1 : slices.SortFunc(metadatas, func(a, b *manifest.BlobFileMetadata) int {
148 1 : return cmp.Compare(a.FileNum, b.FileNum)
149 1 : })
150 1 : return metadatas
151 : }
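// Illustrative example (hypothetical file numbers): if several input tables
// reference blob files 000007 and 000012, some in common, the result is a
// two-element slice containing each *BlobFileMetadata exactly once, sorted by
// FileNum.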
152 :
153 : // writeNewBlobFiles implements the strategy and mechanics for separating values
154 : // into external blob files.
155 : type writeNewBlobFiles struct {
156 : comparer *base.Comparer
157 : // newBlobObject constructs a new blob object for use in the compaction.
158 : newBlobObject func() (objstorage.Writable, objstorage.ObjectMetadata, error)
159 : shortAttrExtractor ShortAttributeExtractor
160 : // writerOpts is used to configure all constructed blob writers.
161 : writerOpts blob.FileWriterOptions
162 : // minimumSize imposes a lower bound on the size of values that can be
163 : // separated into a blob file. Values smaller than this are always written
164 : // to the sstable (but may still be written to a value block within the
165 : // sstable).
166 : minimumSize int
167 :
168 : // Current blob writer state
169 : writer *blob.FileWriter
170 : objMeta objstorage.ObjectMetadata
171 :
172 : buf []byte
173 : }
174 :
175 : // Assert that *writeNewBlobFiles implements the compact.ValueSeparation interface.
176 : var _ compact.ValueSeparation = (*writeNewBlobFiles)(nil)
177 :
178 : // EstimatedFileSize returns an estimate of the disk space consumed by the current
179 : // blob file if it were closed now.
180 1 : func (vs *writeNewBlobFiles) EstimatedFileSize() uint64 {
181 1 : if vs.writer == nil {
182 1 : return 0
183 1 : }
184 1 : return vs.writer.EstimatedSize()
185 : }
186 :
187 : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
188 : // current output sstable's blob references so far.
189 1 : func (vs *writeNewBlobFiles) EstimatedReferenceSize() uint64 {
190 1 : // When we're writing to new blob files, the size of the blob file itself is
191 1 : // a better estimate of the disk space consumed than the uncompressed value
192 1 : // sizes.
193 1 : return vs.EstimatedFileSize()
194 1 : }
195 :
196 : // Add adds the provided key-value pair to the sstable, possibly separating the
197 : // value into a blob file.
198 : func (vs *writeNewBlobFiles) Add(
199 : tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
200 1 : ) error {
201 1 : // We always fetch the value if we're rewriting blob files. We want to
202 1 : // replace any references to existing blob files with references to new blob
203 1 : // files that we write during the compaction.
204 1 : v, callerOwned, err := kv.Value(vs.buf)
205 1 : if err != nil {
206 0 : return err
207 0 : }
208 1 : if callerOwned {
209 0 : vs.buf = v[:0]
210 0 : }
211 :
212 : // Values that are too small are never separated.
213 1 : if len(v) < vs.minimumSize {
214 1 : return tw.Add(kv.K, v, forceObsolete)
215 1 : }
216 : // Merge and DeleteSized keys are never separated.
217 1 : switch kv.K.Kind() {
218 1 : case base.InternalKeyKindMerge, base.InternalKeyKindDeleteSized:
219 1 : return tw.Add(kv.K, v, forceObsolete)
220 : }
221 :
222 : // This KV met all the criteria and its value will be separated.
223 : // If there's a configured short attribute extractor, extract the value's
224 : // short attribute.
225 1 : var shortAttr base.ShortAttribute
226 1 : if vs.shortAttrExtractor != nil {
227 0 : keyPrefixLen := vs.comparer.Split(kv.K.UserKey)
228 0 : shortAttr, err = vs.shortAttrExtractor(kv.K.UserKey, keyPrefixLen, v)
229 0 : if err != nil {
230 0 : return err
231 0 : }
232 : }
233 :
234 : // If we don't have an open blob writer, create one. We create blob objects
235 : // lazily so that we don't create them unless a compaction will actually
236 : // write to a blob file. This avoids creating and deleting empty blob files
237 : // on every compaction in parts of the keyspace that a) are required to be
238 : // in-place or b) have small values.
239 1 : if vs.writer == nil {
240 1 : writable, objMeta, err := vs.newBlobObject()
241 1 : if err != nil {
242 0 : return err
243 0 : }
244 1 : vs.objMeta = objMeta
245 1 : vs.writer = blob.NewFileWriter(objMeta.DiskFileNum, writable, vs.writerOpts)
246 : }
247 :
248 : // Append the value to the blob file.
249 1 : handle := vs.writer.AddValue(v)
250 1 :
251 1 : // Write the key and the handle to the sstable. We need to map the
252 1 : // blob.Handle into a blob.InlineHandle. Everything is copied verbatim,
253 1 : // except the FileNum is translated into a reference index.
254 1 : inlineHandle := blob.InlineHandle{
255 1 : InlineHandlePreface: blob.InlineHandlePreface{
256 1 : // Since we're writing blob files and maintaining a 1:1 mapping
257 1 : // between sstables and blob files, the reference index is always 0
258 1 : // here. Only compactions that don't rewrite blob files will produce
259 1 : // handles with nonzero reference indices.
260 1 : ReferenceID: 0,
261 1 : ValueLen: handle.ValueLen,
262 1 : },
263 1 : HandleSuffix: blob.HandleSuffix{
264 1 : BlockNum: handle.BlockNum,
265 1 : OffsetInBlock: handle.OffsetInBlock,
266 1 : },
267 1 : }
268 1 : return tw.AddWithBlobHandle(kv.K, inlineHandle, shortAttr, forceObsolete)
269 : }
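// Illustrative example of the handle translation above (hypothetical values):
// appending a 2000-byte value that lands at block 3, offset 512 of the open
// blob file is recorded in the sstable as
//
//	blob.InlineHandle{
//		InlineHandlePreface: blob.InlineHandlePreface{ReferenceID: 0, ValueLen: 2000},
//		HandleSuffix:        blob.HandleSuffix{BlockNum: 3, OffsetInBlock: 512},
//	}
//
// with ReferenceID fixed at 0 because this strategy maintains a 1:1 mapping
// between output sstables and blob files.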
270 :
271 : // FinishOutput closes the current blob file (if any). It returns the stats
272 : // and metadata of the now completed blob file.
273 1 : func (vs *writeNewBlobFiles) FinishOutput() (compact.ValueSeparationMetadata, error) {
274 1 : if vs.writer == nil {
275 1 : return compact.ValueSeparationMetadata{}, nil
276 1 : }
277 1 : stats, err := vs.writer.Close()
278 1 : if err != nil {
279 0 : return compact.ValueSeparationMetadata{}, err
280 0 : }
281 1 : vs.writer = nil
282 1 : meta := &manifest.BlobFileMetadata{
283 1 : FileNum: vs.objMeta.DiskFileNum,
284 1 : Size: stats.FileLen,
285 1 : ValueSize: stats.UncompressedValueBytes,
286 1 : CreationTime: uint64(time.Now().Unix()),
287 1 : }
288 1 : return compact.ValueSeparationMetadata{
289 1 : BlobReferences: manifest.BlobReferences{{
290 1 : FileNum: vs.objMeta.DiskFileNum,
291 1 : ValueSize: stats.UncompressedValueBytes,
292 1 : Metadata: meta,
293 1 : }},
294 1 : BlobReferenceSize: stats.UncompressedValueBytes,
295 1 : BlobReferenceDepth: 1,
296 1 : BlobFileStats: stats,
297 1 : BlobFileObject: vs.objMeta,
298 1 : BlobFileMetadata: meta,
299 1 : }, nil
300 : }
301 :
302 : // preserveBlobReferences implements the compact.ValueSeparation interface. When
303 : // a compaction is configured with preserveBlobReferences, the compaction will
304 : // not create any new blob files. However, references to existing blob files
305 : // held by the input sstables will be preserved, and metadata about each
306 : // output table's blob references will be collected.
307 : type preserveBlobReferences struct {
308 : // inputBlobMetadatas should be populated to include the *BlobFileMetadata
309 : // for every unique blob file referenced by input sstables.
310 : // inputBlobMetadatas must be sorted by FileNum.
311 : inputBlobMetadatas []*manifest.BlobFileMetadata
312 : outputBlobReferenceDepth manifest.BlobReferenceDepth
313 :
314 : // state
315 : buf []byte
316 : currReferences manifest.BlobReferences
317 : // totalValueSize is the sum of the ValueSize fields of all references in currReferences.
318 : totalValueSize uint64
319 : }
320 :
321 : // Assert that *preserveBlobReferences implements the compact.ValueSeparation
322 : // interface.
323 : var _ compact.ValueSeparation = (*preserveBlobReferences)(nil)
324 :
325 : // EstimatedFileSize returns an estimate of the disk space consumed by the current
326 : // blob file if it were closed now.
327 1 : func (vs *preserveBlobReferences) EstimatedFileSize() uint64 {
328 1 : return 0
329 1 : }
330 :
331 : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
332 : // current output sstable's blob references so far.
333 1 : func (vs *preserveBlobReferences) EstimatedReferenceSize() uint64 {
334 1 : // TODO(jackson): The totalValueSize is the uncompressed value sizes. With
335 1 : // compressible data, it overestimates the disk space consumed by the blob
336 1 : // references. It also does not include the blob file's index block or
337 1 : // footer, so it can underestimate if values are completely incompressible.
338 1 : //
339 1 : // Should we compute a compression ratio per blob file and scale the
340 1 : // references appropriately?
341 1 : return vs.totalValueSize
342 1 : }
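// Illustrative arithmetic for the caveat above (hypothetical numbers): if the
// current references cover 10 MB of uncompressed values that compress 2x on
// disk, the 10 MB estimate overstates the roughly 5 MB of blob file space
// actually attributable to them.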
343 :
344 : // Add implements compact.ValueSeparation. This implementation will write
345 : // existing blob references to the output table.
346 : func (vs *preserveBlobReferences) Add(
347 : tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
348 1 : ) error {
349 1 : if !kv.V.IsBlobValueHandle() {
350 1 : // If the value is not already a blob handle (either it's in-place or in
351 1 : // a value block), we retrieve the value and write it through Add. The
352 1 : // sstable writer may still decide to put the value in a value block,
353 1 : // but regardless the value will be written to the sstable itself and
354 1 : // not a blob file.
355 1 : v, callerOwned, err := kv.Value(vs.buf)
356 1 : if err != nil {
357 0 : return err
358 0 : }
359 1 : if callerOwned {
360 0 : vs.buf = v[:0]
361 0 : }
362 1 : return tw.Add(kv.K, v, forceObsolete)
363 : }
364 :
365 : // The value is an existing blob handle. We can copy it into the output
366 : // sstable, taking note of the reference for the table metadata.
367 1 : lv := kv.V.LazyValue()
368 1 : fn := lv.Fetcher.BlobFileNum
369 1 :
370 1 : refID, found := vs.currReferences.IDByFileNum(fn)
371 1 : if !found {
372 1 : // This is the first time we're seeing this blob file for this sstable.
373 1 : // Find the blob file metadata for this file among the input metadatas.
374 1 : idx, found := vs.findInputBlobMetadata(fn)
375 1 : if !found {
376 0 : return errors.AssertionFailedf("pebble: blob file %s not found among input sstables", fn)
377 0 : }
378 1 : refID = blob.ReferenceID(len(vs.currReferences))
379 1 : vs.currReferences = append(vs.currReferences, manifest.BlobReference{
380 1 : FileNum: fn,
381 1 : Metadata: vs.inputBlobMetadatas[idx],
382 1 : })
383 : }
384 :
385 1 : if invariants.Enabled && vs.currReferences[refID].Metadata.FileNum != fn {
386 0 : panic("wrong reference index")
387 : }
388 :
389 1 : handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
390 1 : inlineHandle := blob.InlineHandle{
391 1 : InlineHandlePreface: blob.InlineHandlePreface{
392 1 : ReferenceID: refID,
393 1 : ValueLen: lv.Fetcher.Attribute.ValueLen,
394 1 : },
395 1 : HandleSuffix: handleSuffix,
396 1 : }
397 1 : err := tw.AddWithBlobHandle(kv.K, inlineHandle, lv.Fetcher.Attribute.ShortAttribute, forceObsolete)
398 1 : if err != nil {
399 0 : return err
400 0 : }
401 1 : vs.currReferences[refID].ValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
402 1 : vs.totalValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
403 1 : return nil
404 : }
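// Illustrative example of the reference bookkeeping above (hypothetical file
// numbers): if an output sstable first copies a handle pointing into blob
// file 000009 and later one pointing into 000004, those handles are rewritten
// with ReferenceID 0 and 1 respectively, in order of first appearance, and
// each reference accumulates the ValueLen of every handle copied from its
// blob file.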
405 :
406 : // findInputBlobMetadata returns the index of the input blob metadata that
407 : // corresponds to the provided file number. If the file number is not found,
408 : // the function returns false in the second return value.
409 1 : func (vs *preserveBlobReferences) findInputBlobMetadata(fn base.DiskFileNum) (int, bool) {
410 1 : return slices.BinarySearchFunc(vs.inputBlobMetadatas, fn,
411 1 : func(bm *manifest.BlobFileMetadata, fn base.DiskFileNum) int {
412 1 : return cmp.Compare(bm.FileNum, fn)
413 1 : })
414 : }
415 :
416 : // FinishOutput implements compact.ValueSeparation.
417 1 : func (vs *preserveBlobReferences) FinishOutput() (compact.ValueSeparationMetadata, error) {
418 1 : references := slices.Clone(vs.currReferences)
419 1 : vs.currReferences = vs.currReferences[:0]
420 1 : vs.totalValueSize = 0
421 1 :
422 1 : referenceSize := uint64(0)
423 1 : for _, ref := range references {
424 1 : referenceSize += ref.ValueSize
425 1 : }
426 1 : return compact.ValueSeparationMetadata{
427 1 : BlobReferences: references,
428 1 : BlobReferenceSize: referenceSize,
429 1 : // The outputBlobReferenceDepth is computed from the input sstables,
430 1 : // reflecting the worst-case overlap of referenced blob files. If this
431 1 : // sstable references fewer unique blob files, reduce its depth to the
432 1 : // count of unique files.
433 1 : BlobReferenceDepth: min(vs.outputBlobReferenceDepth, manifest.BlobReferenceDepth(len(references))),
434 1 : }, nil
435 : }
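// Illustrative example of the depth clamp above: with an input-derived
// outputBlobReferenceDepth of 4 but only 2 distinct blob files referenced by
// this output sstable, the recorded BlobReferenceDepth is min(4, 2) = 2.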