1 : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "cmp"
9 : "slices"
10 : "time"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/compact"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/sstable"
19 : "github.com/cockroachdb/pebble/sstable/blob"
20 : )
21 :
22 : // writeNewBlobFiles implements the strategy and mechanics for separating values
23 : // into external blob files.
24 : type writeNewBlobFiles struct {
25 : comparer *base.Comparer
26 : // newBlobObject constructs a new blob object for use in the compaction.
27 : newBlobObject func() (objstorage.Writable, objstorage.ObjectMetadata, error)
28 : shortAttrExtractor ShortAttributeExtractor
29 : // writerOpts is used to configure all constructed blob writers.
30 : writerOpts blob.FileWriterOptions
31 : // minimumSize imposes a lower bound on the size of values that can be
32 : // separated into a blob file. Values smaller than this are always written
33 : // to the sstable (but may still be written to a value block within the
34 : // sstable).
35 : minimumSize int
36 : // requiredInPlaceValueBound configures a region of the keyspace whose
37 : // values must be written to the sstable in place and are not eligible
38 : // for value separation.
39 : requiredInPlaceValueBound UserKeyPrefixBound
40 :
41 : // Current blob writer state
42 : writer *blob.FileWriter
43 : objMeta objstorage.ObjectMetadata
44 :
45 : buf []byte
46 : }
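
// Illustrative construction of a writeNewBlobFiles policy. This is a sketch
// for orientation only, not how the compaction code actually wires it up;
// newBlobObjectFn and the literal threshold below are assumptions.
func newWriteNewBlobFilesExample(
	comparer *base.Comparer,
	newBlobObjectFn func() (objstorage.Writable, objstorage.ObjectMetadata, error),
	opts blob.FileWriterOptions,
) *writeNewBlobFiles {
	return &writeNewBlobFiles{
		comparer:      comparer,
		newBlobObject: newBlobObjectFn,
		writerOpts:    opts,
		// Example threshold: values under 1KiB stay in the sstable (possibly
		// in a value block).
		minimumSize: 1024,
		// An empty bound means no part of the keyspace is forced in place.
		requiredInPlaceValueBound: UserKeyPrefixBound{},
	}
}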
47 :
48 : // Assert that *writeNewBlobFiles implements the compact.ValueSeparation interface.
49 : var _ compact.ValueSeparation = (*writeNewBlobFiles)(nil)
50 :
51 : // EstimatedFileSize returns an estimate of the disk space consumed by the current
52 : // blob file if it were closed now.
53 1 : func (vs *writeNewBlobFiles) EstimatedFileSize() uint64 {
54 1 : if vs.writer == nil {
55 1 : return 0
56 1 : }
57 1 : return vs.writer.EstimatedSize()
58 : }
59 :
60 : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
61 : // current output sstable's blob references so far.
62 1 : func (vs *writeNewBlobFiles) EstimatedReferenceSize() uint64 {
63 1 : // When we're writing to new blob files, the size of the blob file itself is
64 1 : // a better estimate of the disk space consumed than the uncompressed value
65 1 : // sizes.
66 1 : return vs.EstimatedFileSize()
67 1 : }
68 :
69 : // Add adds the provided key-value pair to the sstable, possibly separating the
70 : // value into a blob file.
71 : func (vs *writeNewBlobFiles) Add(
72 : tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
73 1 : ) error {
74 1 : // We always fetch the value if we're rewriting blob files. We want to
75 1 : // replace any references to existing blob files with references to new blob
76 1 : // files that we write during the compaction.
77 1 : v, callerOwned, err := kv.Value(vs.buf)
78 1 : if err != nil {
79 0 : return err
80 0 : }
81 1 : if callerOwned {
82 0 : vs.buf = v[:0]
83 0 : }
84 :
85 : // Values that are too small are never separated.
86 1 : if len(v) < vs.minimumSize {
87 1 : return tw.Add(kv.K, v, forceObsolete)
88 1 : }
89 : // Merge keys are never separated.
90 1 : if kv.K.Kind() == base.InternalKeyKindMerge {
91 0 : return tw.Add(kv.K, v, forceObsolete)
92 0 : }
93 : // If the user configured bounds requiring some keys' values to be in-place,
94 : // compare the user key's prefix against the bounds.
95 1 : if !vs.requiredInPlaceValueBound.IsEmpty() {
96 1 : kPrefix := vs.comparer.Split.Prefix(kv.K.UserKey)
97 1 : if vs.comparer.Compare(vs.requiredInPlaceValueBound.Upper, kPrefix) <= 0 {
98 1 : // Common case for CockroachDB. Clear it since all future keys will
99 1 : // be >= this key.
100 1 : vs.requiredInPlaceValueBound = UserKeyPrefixBound{}
101 1 : } else if vs.comparer.Compare(kPrefix, vs.requiredInPlaceValueBound.Lower) >= 0 {
102 1 : // Don't separate the value if the key is within the bounds.
103 1 : return tw.Add(kv.K, v, forceObsolete)
104 1 : }
105 : }
106 :
107 : // This KV met all the criteria and its value will be separated.
108 :
109 : // If there's a configured short attribute extractor, extract the value's
110 : // short attribute.
111 1 : var shortAttr base.ShortAttribute
112 1 : if vs.shortAttrExtractor != nil {
113 0 : keyPrefixLen := vs.comparer.Split(kv.K.UserKey)
114 0 : shortAttr, err = vs.shortAttrExtractor(kv.K.UserKey, keyPrefixLen, v)
115 0 : if err != nil {
116 0 : return err
117 0 : }
118 : }
119 :
120 : // If we don't have an open blob writer, create one. We create blob objects
121 : // lazily so that we don't create them unless a compaction will actually
122 : // write to a blob file. This avoids creating and deleting empty blob files
123 : // on every compaction in parts of the keyspace that a) are required to be
124 : // in-place or b) have small values.
125 1 : if vs.writer == nil {
126 1 : writable, objMeta, err := vs.newBlobObject()
127 1 : if err != nil {
128 0 : return err
129 0 : }
130 1 : vs.objMeta = objMeta
131 1 : vs.writer = blob.NewFileWriter(objMeta.DiskFileNum, writable, vs.writerOpts)
132 : }
133 :
134 : // Append the value to the blob file.
135 1 : handle := vs.writer.AddValue(v)
136 1 :
137 1 : // Write the key and the handle to the sstable. We need to map the
138 1 : // blob.Handle into a blob.InlineHandle. Everything is copied verbatim,
139 1 : // except the FileNum is translated into a reference index.
140 1 : inlineHandle := blob.InlineHandle{
141 1 : InlineHandlePreface: blob.InlineHandlePreface{
142 1 : // Since we're writing blob files and maintaining a 1:1 mapping
143 1 : // between sstables and blob files, the reference index is always 0
144 1 : // here. Only compactions that don't rewrite blob files will produce
145 1 : // handles with nonzero reference indices.
146 1 : ReferenceID: 0,
147 1 : ValueLen: handle.ValueLen,
148 1 : },
149 1 : HandleSuffix: blob.HandleSuffix{
150 1 : BlockNum: handle.BlockNum,
151 1 : OffsetInBlock: handle.OffsetInBlock,
152 1 : },
153 1 : }
154 1 : return tw.AddWithBlobHandle(kv.K, inlineHandle, shortAttr, forceObsolete)
155 : }
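
// shouldSeparateExample distills the eligibility checks that Add applies
// above into a standalone predicate. It is an illustrative sketch (not part
// of the package API) and omits Add's stateful optimization of clearing
// requiredInPlaceValueBound once the compaction has moved past it.
func shouldSeparateExample(
	compare base.Compare,
	split base.Split,
	k base.InternalKey,
	valueLen int,
	minimumSize int,
	inPlaceBound UserKeyPrefixBound,
) bool {
	if valueLen < minimumSize {
		// Too small; keep the value in the sstable (possibly in a value block).
		return false
	}
	if k.Kind() == base.InternalKeyKindMerge {
		// MERGE values are never separated.
		return false
	}
	if !inPlaceBound.IsEmpty() {
		p := split.Prefix(k.UserKey)
		if compare(p, inPlaceBound.Lower) >= 0 && compare(inPlaceBound.Upper, p) > 0 {
			// The key's prefix falls within the required in-place bound.
			return false
		}
	}
	return true
}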
156 :
157 : // FinishOutput closes the current blob file (if any). It returns the stats
158 : // and metadata of the now completed blob file.
159 1 : func (vs *writeNewBlobFiles) FinishOutput() (compact.ValueSeparationMetadata, error) {
160 1 : if vs.writer == nil {
161 1 : return compact.ValueSeparationMetadata{}, nil
162 1 : }
163 1 : stats, err := vs.writer.Close()
164 1 : if err != nil {
165 0 : return compact.ValueSeparationMetadata{}, err
166 0 : }
167 1 : vs.writer = nil
168 1 : return compact.ValueSeparationMetadata{
169 1 : BlobReferences: manifest.BlobReferences{{
170 1 : FileNum: vs.objMeta.DiskFileNum,
171 1 : ValueSize: stats.UncompressedValueBytes,
172 1 : }},
173 1 : BlobReferenceSize: stats.UncompressedValueBytes,
174 1 : BlobReferenceDepth: 1,
175 1 : BlobFileStats: stats,
176 1 : BlobFileObject: vs.objMeta,
177 1 : BlobFileMetadata: &manifest.BlobFileMetadata{
178 1 : FileNum: vs.objMeta.DiskFileNum,
179 1 : Size: stats.FileLen,
180 1 : ValueSize: stats.UncompressedValueBytes,
181 1 : CreationTime: uint64(time.Now().Unix()),
182 1 : },
183 1 : }, nil
184 : }
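
// For orientation, a simplified sketch of the calling pattern: the compaction
// driver (which lives elsewhere and also handles output splitting, error
// handling, and flushing) feeds each KV through Add and then finalizes the
// blob file via FinishOutput. writeOutputTableExample is hypothetical.
func writeOutputTableExample(
	tw sstable.RawWriter, vs compact.ValueSeparation, kvs []*base.InternalKV,
) (compact.ValueSeparationMetadata, error) {
	for _, kv := range kvs {
		// Each value is either written in place or separated into a blob file,
		// per the policy implemented by vs.
		if err := vs.Add(tw, kv, false /* forceObsolete */); err != nil {
			return compact.ValueSeparationMetadata{}, err
		}
	}
	// FinishOutput closes the blob file (if one was opened) and returns the
	// blob references to record in the output table's metadata.
	return vs.FinishOutput()
}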
185 :
186 : // preserveBlobReferences implements the compact.ValueSeparation interface. When
187 : // a compaction is configured with preserveBlobReferences, the compaction will
188 : // not create any new blob files. However, input references to existing blob
189 : // references will be preserved and metadata about the table's blob references
190 : // will be collected.
191 : type preserveBlobReferences struct {
192 : // inputBlobMetadatas should be populated to include the *BlobFileMetadata
193 : // for every unique blob file referenced by input sstables.
194 : // inputBlobMetadatas must be sorted by FileNum.
195 : inputBlobMetadatas []*manifest.BlobFileMetadata
196 : outputBlobReferenceDepth manifest.BlobReferenceDepth
197 :
198 : // state
199 : buf []byte
200 : currReferences manifest.BlobReferences
201 : // totalValueSize is the sum of the ValueSizes of all references in currReferences.
202 : totalValueSize uint64
203 : }
204 :
205 : // Assert that *preserveBlobReferences implements the compact.ValueSeparation
206 : // interface.
207 : var _ compact.ValueSeparation = (*preserveBlobReferences)(nil)
208 :
209 : // EstimatedFileSize returns an estimate of the disk space consumed by the current
210 : // blob file if it were closed now.
211 1 : func (vs *preserveBlobReferences) EstimatedFileSize() uint64 {
212 1 : return 0
213 1 : }
214 :
215 : // EstimatedReferenceSize returns an estimate of the disk space consumed by the
216 : // current output sstable's blob references so far.
217 1 : func (vs *preserveBlobReferences) EstimatedReferenceSize() uint64 {
218 1 : // TODO(jackson): The totalValueSize is the uncompressed value sizes. With
219 1 : // compressible data, it overestimates the disk space consumed by the blob
220 1 : // references. It also does not include the blob file's index block or
221 1 : // footer, so it can underestimate if values are completely incompressible.
222 1 : //
223 1 : // Should we compute a compression ratio per blob file and scale the
224 1 : // references appropriately?
225 1 : return vs.totalValueSize
226 1 : }
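
// scaledReferenceSizeExample sketches the refinement suggested in the TODO
// above; it is hypothetical and unused. It scales each reference's
// uncompressed value size by the referenced blob file's on-disk to
// uncompressed ratio, falling back to the unscaled size when no metadata is
// available.
func scaledReferenceSizeExample(refs manifest.BlobReferences) uint64 {
	var est uint64
	for _, ref := range refs {
		if ref.Metadata == nil || ref.Metadata.ValueSize == 0 {
			est += ref.ValueSize
			continue
		}
		ratio := float64(ref.Metadata.Size) / float64(ref.Metadata.ValueSize)
		est += uint64(float64(ref.ValueSize) * ratio)
	}
	return est
}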
227 :
228 : // Add implements compact.ValueSeparation. This implementation will write
229 : // existing blob references to the output table.
230 : func (vs *preserveBlobReferences) Add(
231 : tw sstable.RawWriter, kv *base.InternalKV, forceObsolete bool,
232 1 : ) error {
233 1 : if !kv.V.IsBlobValueHandle() {
234 1 : // If the value is not already a blob handle (either it's in-place or in
235 1 : // a value block), we retrieve the value and write it through Add. The
236 1 : // sstable writer may still decide to put the value in a value block,
237 1 : // but regardless the value will be written to the sstable itself and
238 1 : // not a blob file.
239 1 : v, callerOwned, err := kv.Value(vs.buf)
240 1 : if err != nil {
241 0 : return err
242 0 : }
243 1 : if callerOwned {
244 0 : vs.buf = v[:0]
245 0 : }
246 1 : return tw.Add(kv.K, v, forceObsolete)
247 : }
248 :
249 : // The value is an existing blob handle. We can copy it into the output
250 : // sstable, taking note of the reference for the table metadata.
251 1 : lv := kv.V.LazyValue()
252 1 : fn := lv.Fetcher.BlobFileNum
253 1 :
254 1 : refID, found := vs.currReferences.IDByFileNum(fn)
255 1 : if !found {
256 1 : // This is the first time we're seeing this blob file for this sstable.
257 1 : // Find the blob file metadata for this file among the input metadatas.
258 1 : idx, found := vs.findInputBlobMetadata(fn)
259 1 : if !found {
260 0 : return errors.AssertionFailedf("pebble: blob file %s not found among input sstables", fn)
261 0 : }
262 1 : refID = blob.ReferenceID(len(vs.currReferences))
263 1 : vs.currReferences = append(vs.currReferences, manifest.BlobReference{
264 1 : FileNum: fn,
265 1 : Metadata: vs.inputBlobMetadatas[idx],
266 1 : })
267 : }
268 :
269 1 : if invariants.Enabled && vs.currReferences[refID].Metadata.FileNum != fn {
270 0 : panic("wrong reference index")
271 : }
272 :
273 1 : handleSuffix := blob.DecodeHandleSuffix(lv.ValueOrHandle)
274 1 : inlineHandle := blob.InlineHandle{
275 1 : InlineHandlePreface: blob.InlineHandlePreface{
276 1 : ReferenceID: refID,
277 1 : ValueLen: lv.Fetcher.Attribute.ValueLen,
278 1 : },
279 1 : HandleSuffix: handleSuffix,
280 1 : }
281 1 : err := tw.AddWithBlobHandle(kv.K, inlineHandle, lv.Fetcher.Attribute.ShortAttribute, forceObsolete)
282 1 : if err != nil {
283 0 : return err
284 0 : }
285 1 : vs.currReferences[refID].ValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
286 1 : vs.totalValueSize += uint64(lv.Fetcher.Attribute.ValueLen)
287 1 : return nil
288 : }
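
// Worked example of reference ID assignment: suppose an output sstable's KVs
// reference blob files 000007, 000012, and then 000007 again, in that order.
// The first handle appends a reference for 000007 and is written with
// ReferenceID 0; the second appends 000012 and gets ReferenceID 1; the third
// finds 000007 via IDByFileNum and reuses ReferenceID 0. The table's blob
// references are therefore ordered by first appearance, not by file number.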
289 :
290 : // findInputBlobMetadata returns the index of the input blob metadata that
291 : // corresponds to the provided file number. If the file number is not found,
292 : // the function returns false in the second return value.
293 1 : func (vs *preserveBlobReferences) findInputBlobMetadata(fn base.DiskFileNum) (int, bool) {
294 1 : return slices.BinarySearchFunc(vs.inputBlobMetadatas, fn,
295 1 : func(bm *manifest.BlobFileMetadata, fn base.DiskFileNum) int {
296 1 : return cmp.Compare(bm.FileNum, fn)
297 1 : })
298 : }
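
// The binary search above requires inputBlobMetadatas to be sorted by
// FileNum. A caller could establish that ordering with a sort like the
// following (illustrative only; not part of this file):
func sortInputBlobMetadatasExample(metas []*manifest.BlobFileMetadata) {
	slices.SortFunc(metas, func(a, b *manifest.BlobFileMetadata) int {
		return cmp.Compare(a.FileNum, b.FileNum)
	})
}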
299 :
300 : // FinishOutput implements compact.ValueSeparation.
301 1 : func (vs *preserveBlobReferences) FinishOutput() (compact.ValueSeparationMetadata, error) {
302 1 : references := slices.Clone(vs.currReferences)
303 1 : vs.currReferences = vs.currReferences[:0]
304 1 :
305 1 : referenceSize := uint64(0)
306 1 : for _, ref := range references {
307 1 : referenceSize += ref.ValueSize
308 1 : }
309 1 : return compact.ValueSeparationMetadata{
310 1 : BlobReferences: references,
311 1 : BlobReferenceSize: referenceSize,
312 1 : // The outputBlobReferenceDepth is computed from the input sstables,
313 1 : // reflecting the worst-case overlap of referenced blob files. If this
314 1 : // sstable references fewer unique blob files, reduce its depth to the
315 1 : // count of unique files.
316 1 : BlobReferenceDepth: min(vs.outputBlobReferenceDepth, manifest.BlobReferenceDepth(len(references))),
317 1 : }, nil
318 : }
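
// Worked example of the depth clamp above: if the inputs' worst-case blob
// reference depth is 3 but this output table ended up referencing only 2
// unique blob files, the recorded depth is min(3, 2) = 2. A table's blob
// reference depth never exceeds the number of distinct blob files it
// references.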