Line data Source code
1 : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "container/heap"
9 : "context"
10 : "iter"
11 : "runtime/pprof"
12 : "slices"
13 : "sync/atomic"
14 : "time"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/manifest"
19 : "github.com/cockroachdb/pebble/internal/problemspans"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
22 : "github.com/cockroachdb/pebble/sstable"
23 : "github.com/cockroachdb/pebble/sstable/blob"
24 : "github.com/cockroachdb/pebble/sstable/block"
25 : "github.com/cockroachdb/pebble/sstable/colblk"
26 : )
27 :
28 : // A pickedBlobFileCompaction is a blob file rewrite compaction that has been
29 : // picked by the compaction picker.
30 : type pickedBlobFileCompaction struct {
31 : vers *manifest.Version
32 : file manifest.BlobFileMetadata
33 : referencingTables []*manifest.TableMetadata
34 : }
35 :
36 : // Assert that *pickedBlobFileCompaction implements the pickedCompaction
37 : // interface.
38 : var _ pickedCompaction = (*pickedBlobFileCompaction)(nil)
39 :
40 1 : func (c *pickedBlobFileCompaction) ManualID() uint64 { return 0 }
41 :
42 1 : func (c *pickedBlobFileCompaction) WaitingCompaction() WaitingCompaction {
43 1 : entry := scheduledCompactionMap[compactionKindBlobFileRewrite]
44 1 : return WaitingCompaction{
45 1 : Optional: entry.optional,
46 1 : Priority: entry.priority,
47 1 : }
48 1 : }
49 :
50 : func (c *pickedBlobFileCompaction) ConstructCompaction(
51 : d *DB, grantHandle CompactionGrantHandle,
52 1 : ) compaction {
53 1 : // Add a reference to the version. The compaction will release the reference
54 1 : // when it completes.
55 1 : c.vers.Ref()
56 1 : return &blobFileRewriteCompaction{
57 1 : beganAt: d.timeNow(),
58 1 : grantHandle: grantHandle,
59 1 : version: c.vers,
60 1 : input: c.file,
61 1 : referencingTables: c.referencingTables,
62 1 : objCreateOpts: objstorage.CreateOptions{
63 1 : // TODO(jackson): Enable shared storage for blob files.
64 1 : PreferSharedStorage: false,
65 1 : WriteCategory: getDiskWriteCategoryForCompaction(d.opts, compactionKindBlobFileRewrite),
66 1 : },
67 1 : }
68 1 : }
69 :
70 : // A blobFileRewriteCompaction is a special variant of a compaction that
71 : // rewrites a blob file without rewriting sstables. When the compaction
72 : // completes, the Version's mapping of blob file ID to disk file number is
73 : // updated to point to the new blob file. The blob file is rewritten without
74 : // copying over values that are no longer referenced by any tables, reclaiming
75 : // disk space.
76 : type blobFileRewriteCompaction struct {
77 : // cancel is a bool that can be used by other goroutines to signal a compaction
78 : // to cancel, such as if a conflicting excise operation raced it to manifest
79 : // application. Only holders of the manifest lock will write to this atomic.
80 : cancel atomic.Bool
81 : // beganAt is the time when the compaction began.
82 : beganAt time.Time
83 : // grantHandle is a handle to the compaction that can be used to track
84 : // progress.
85 : grantHandle CompactionGrantHandle
86 : // version is a referenced version obtained when the compaction was picked.
87 : // This version must be unreferenced when the compaction is complete.
88 : version *manifest.Version
89 : // versionEditApplied is set to true when a compaction has completed and the
90 : // resulting version has been installed (if successful), but the compaction
91 : // goroutine is still cleaning up (eg, deleting obsolete files).
92 : versionEditApplied bool
93 : // input is the blob file that is being rewritten.
94 : input manifest.BlobFileMetadata
95 : // referencingTables is the set of sstables that reference the input blob
96 : // file in version.
97 : referencingTables []*manifest.TableMetadata
98 : objCreateOpts objstorage.CreateOptions
99 : internalIteratorStats base.InternalIteratorStats
100 : bytesWritten atomic.Int64 // Total bytes written to the new blob file.
101 : }
102 :
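// The remapping described above, as a simplified standalone sketch: a plain
// map stands in for the Version's blob file mapping, and none of these
// identifiers are pebble APIs. The blob file's logical ID is stable, so the
// sstables' blob references stay valid while only the physical file beneath
// that ID changes.
func exampleBlobFileRemap() map[uint64]uint64 {
	// Logical blob file ID -> physical disk file number.
	blobFiles := map[uint64]uint64{7: 123}
	// A rewrite compaction installs a new physical file under the same logical
	// ID; readers resolving references to blob file 7 now see disk file 456,
	// and the old disk file 123 becomes obsolete.
	blobFiles[7] = 456
	return blobFiles
}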
103 : // Assert that *blobFileRewriteCompaction implements the Compaction interface.
104 : var _ compaction = (*blobFileRewriteCompaction)(nil)
105 :
106 1 : func (c *blobFileRewriteCompaction) AddInProgressLocked(d *DB) {
107 1 : d.mu.compact.inProgress[c] = struct{}{}
108 1 : // TODO(jackson): Currently the compaction picker iterates through all
109 1 : // ongoing compactions in order to limit the number of concurrent blob
110 1 : // rewrite compactions to 1.
111 1 : //
112 1 : // Consider instead tracking which blob files are being rewritten, which
113 1 : // would allow multiple concurrent blob rewrite compactions as long as
114 1 : // they compact different blob files; a sketch follows this function.
115 1 : }
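// A minimal sketch of the alternative suggested in the TODO above: track which
// blob files currently have an in-progress rewrite and only pick a rewrite for
// a blob file that is not already in the set. The type and method names are
// illustrative assumptions, not pebble code (uint64 stands in for the blob
// file ID type).
type blobRewritesInProgress map[uint64]struct{}

// tryStart records a rewrite of the given blob file as in progress, returning
// false if a rewrite of that file is already running.
func (s blobRewritesInProgress) tryStart(blobFileID uint64) bool {
	if _, ok := s[blobFileID]; ok {
		return false
	}
	s[blobFileID] = struct{}{}
	return true
}

// finish removes the blob file from the set once its rewrite completes.
func (s blobRewritesInProgress) finish(blobFileID uint64) { delete(s, blobFileID) }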
116 :
117 1 : func (c *blobFileRewriteCompaction) BeganAt() time.Time { return c.beganAt }
118 1 : func (c *blobFileRewriteCompaction) Bounds() *base.UserKeyBounds { return nil }
119 0 : func (c *blobFileRewriteCompaction) Cancel() { c.cancel.Store(true) }
120 1 : func (c *blobFileRewriteCompaction) IsDownload() bool { return false }
121 1 : func (c *blobFileRewriteCompaction) IsFlush() bool { return false }
122 1 : func (c *blobFileRewriteCompaction) GrantHandle() CompactionGrantHandle { return c.grantHandle }
123 0 : func (c *blobFileRewriteCompaction) Tables() iter.Seq2[int, *manifest.TableMetadata] {
124 0 : // No tables; return an empty iterator.
125 0 : return func(yield func(int, *manifest.TableMetadata) bool) {}
126 : }
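// Usage note: because the iterator above never calls yield, ranging over it
// performs zero iterations (Go 1.23+ range-over-func semantics), e.g.:
//
//	for i, t := range c.Tables() { _ = i; _ = t } // body never executes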
127 :
128 0 : func (c *blobFileRewriteCompaction) ObjioTracingContext(ctx context.Context) context.Context {
129 0 : if objiotracing.Enabled {
130 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
131 0 : }
132 0 : return ctx
133 : }
134 :
135 1 : func (c *blobFileRewriteCompaction) PprofLabels(UserKeyCategories) pprof.LabelSet {
136 1 : return pprof.Labels("pebble", "blob-rewrite")
137 1 : }
138 :
139 0 : func (c *blobFileRewriteCompaction) VersionEditApplied() bool {
140 0 : return c.versionEditApplied
141 0 : }
142 :
143 1 : func (c *blobFileRewriteCompaction) Execute(jobID JobID, d *DB) error {
144 1 : ctx := context.TODO()
145 1 : if objiotracing.Enabled {
146 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
147 0 : }
148 1 : c.grantHandle.Started()
149 1 : // The version stored in the compaction is ref'd when the compaction is
150 1 : // created. We're responsible for un-refing it when the compaction is
151 1 : // complete.
152 1 : defer c.version.UnrefLocked()
153 1 :
154 1 : // Notify the event listener that the compaction has begun.
155 1 : info := BlobFileRewriteInfo{
156 1 : JobID: int(jobID),
157 1 : Input: BlobFileInfo{
158 1 : BlobFileID: c.input.FileID,
159 1 : DiskFileNum: c.input.Physical.FileNum,
160 1 : Size: c.input.Physical.Size,
161 1 : ValueSize: c.input.Physical.ValueSize,
162 1 : },
163 1 : }
164 1 : d.opts.EventListener.BlobFileRewriteBegin(info)
165 1 : startTime := d.timeNow()
166 1 :
167 1 : // Run the blob file rewrite.
168 1 : objMeta, ve, err := d.runBlobFileRewriteLocked(ctx, jobID, c)
169 1 :
170 1 : info.Duration = d.timeNow().Sub(startTime)
171 1 :
172 1 : // Update the version with the remapped blob file.
173 1 : if err == nil {
174 1 : info.Output.BlobFileID = ve.NewBlobFiles[0].FileID
175 1 : info.Output.DiskFileNum = ve.NewBlobFiles[0].Physical.FileNum
176 1 : info.Output.Size = ve.NewBlobFiles[0].Physical.Size
177 1 : info.Output.ValueSize = ve.NewBlobFiles[0].Physical.ValueSize
178 1 : err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
179 1 : // It's possible that concurrent compactions removed references to
180 1 : // the blob file while the blob file rewrite compaction was running.
181 1 : // Now that we have the manifest lock, check if the blob file is
182 1 : // still current. If not, we bubble up ErrCancelledCompaction.
183 1 : v := d.mu.versions.currentVersion()
184 1 : currentDiskFileNum, ok := v.BlobFiles.Lookup(c.input.FileID)
185 1 : if !ok {
186 0 : return versionUpdate{}, errors.Wrapf(ErrCancelledCompaction,
187 0 : "blob file %s became unreferenced", c.input.FileID)
188 0 : }
189 : // Assert that the current version's disk file number for the blob
190 : // matches the one we rewrote. This compaction should be the only
191 : // rewrite compaction running for this blob file.
192 1 : if currentDiskFileNum != c.input.Physical.FileNum {
193 0 : return versionUpdate{}, base.AssertionFailedf(
194 0 : "blob file %s was rewritten to %s during rewrite compaction of %s",
195 0 : c.input.FileID, currentDiskFileNum, c.input.Physical.FileNum)
196 0 : }
197 1 : return versionUpdate{
198 1 : VE: ve,
199 1 : JobID: jobID,
200 1 : InProgressCompactionsFn: func() []compactionInfo {
201 1 : return d.getInProgressCompactionInfoLocked(c)
202 1 : },
203 : }, nil
204 : })
205 : }
206 :
207 1 : d.mu.versions.incrementCompactions(compactionKindBlobFileRewrite, nil, c.bytesWritten.Load(), err)
208 1 : d.mu.versions.incrementCompactionBytes(-c.bytesWritten.Load())
209 1 :
210 1 : // Update the read state to publish the new version.
211 1 : if err == nil {
212 1 : d.updateReadStateLocked(d.opts.DebugCheck)
213 1 : }
214 :
215 : // Ensure we clean up the blob file we created on failure.
216 1 : if err != nil {
217 0 : if objMeta.DiskFileNum != 0 {
218 0 : d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, []obsoleteFile{
219 0 : {
220 0 : fileType: base.FileTypeBlob,
221 0 : fs: d.opts.FS,
222 0 : path: d.objProvider.Path(objMeta),
223 0 : fileNum: objMeta.DiskFileNum,
224 0 : // We don't know the size of the output blob file--it may have
225 0 : // been half-written. We use the input blob file size as an
226 0 : // approximation for deletion pacing.
227 0 : fileSize: c.input.Physical.Size,
228 0 : isLocal: true,
229 0 : },
230 0 : })
231 0 : }
232 : }
233 :
234 : // Notify the event listener that the compaction has ended.
235 1 : now := d.timeNow()
236 1 : info.TotalDuration = now.Sub(c.beganAt)
237 1 : info.Done = true
238 1 : info.Err = err
239 1 : d.opts.EventListener.BlobFileRewriteEnd(info)
239 1 : return err
241 : }
242 :
243 1 : func (c *blobFileRewriteCompaction) Info() compactionInfo {
244 1 : return compactionInfo{
245 1 : kind: compactionKindBlobFileRewrite,
246 1 : versionEditApplied: c.versionEditApplied,
247 1 : outputLevel: -1,
248 1 : }
249 1 : }
250 :
251 0 : func (c *blobFileRewriteCompaction) RecordError(*problemspans.ByLevel, error) {
252 0 : // TODO(jackson): Track problematic blob files and avoid re-picking the same
253 0 : // blob file compaction.
254 0 : }
255 :
256 : // runBlobFileRewriteLocked runs a blob file rewrite. d.mu must be held when
257 : // calling this, although it may be dropped and re-acquired during the course of
258 : // the method.
259 : func (d *DB) runBlobFileRewriteLocked(
260 : ctx context.Context, jobID JobID, c *blobFileRewriteCompaction,
261 1 : ) (objstorage.ObjectMetadata, *manifest.VersionEdit, error) {
262 1 : // Drop the database mutex while we perform the rewrite, and re-acquire it
263 1 : // before returning.
264 1 : d.mu.Unlock()
265 1 : defer d.mu.Lock()
266 1 :
267 1 : // Construct the block.ReadEnv configured with a buffer pool. Setting the
268 1 : // buffer pool ensures we won't cache blocks in the block cache. As soon as
269 1 : // the compaction finishes, new iterators will read the new blob file, so it
270 1 : // is unlikely that the cached blocks would be reused.
271 1 : var bufferPool block.BufferPool
272 1 : bufferPool.Init(4)
273 1 : defer bufferPool.Release()
274 1 : env := block.ReadEnv{
275 1 : Stats: &c.internalIteratorStats,
276 1 : BufferPool: &bufferPool,
277 1 : ReportCorruptionFn: d.reportCorruption,
278 1 : }
279 1 :
280 1 : // Create a new file for the rewritten blob file.
281 1 : writable, objMeta, err := d.newCompactionOutputBlob(jobID, compactionKindBlobFileRewrite, -1, &c.bytesWritten, c.objCreateOpts)
282 1 : if err != nil {
283 0 : return objstorage.ObjectMetadata{}, nil, err
284 0 : }
285 : // Initialize a blob file rewriter. We pass L6 to MakeBlobWriterOptions.
286 : // There's no single level associated with a blob file. A long-lived blob
287 : // file that gets rewritten is likely to mostly be referenced from L6.
288 : // TODO(jackson): Consider refactoring to remove the level association.
289 1 : rewriter := newBlobFileRewriter(
290 1 : d.fileCache,
291 1 : env,
292 1 : objMeta.DiskFileNum,
293 1 : writable,
294 1 : d.opts.MakeBlobWriterOptions(6),
295 1 : c.referencingTables,
296 1 : c.input,
297 1 : )
298 1 : // Perform the rewrite.
299 1 : stats, err := rewriter.Rewrite(ctx)
300 1 : if err != nil {
301 0 : return objstorage.ObjectMetadata{}, nil, err
302 0 : }
303 :
304 : // Sync the object provider to ensure the metadata for the blob file is
305 : // persisted.
306 1 : if err := d.objProvider.Sync(); err != nil {
307 0 : return objstorage.ObjectMetadata{}, nil, err
308 0 : }
309 :
310 1 : ve := &manifest.VersionEdit{
311 1 : DeletedBlobFiles: map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile{
312 1 : {
313 1 : FileID: c.input.FileID,
314 1 : FileNum: c.input.Physical.FileNum,
315 1 : }: c.input.Physical,
316 1 : },
317 1 : NewBlobFiles: []manifest.BlobFileMetadata{
318 1 : {
319 1 : FileID: c.input.FileID,
320 1 : Physical: &manifest.PhysicalBlobFile{
321 1 : FileNum: objMeta.DiskFileNum,
322 1 : Size: stats.FileLen,
323 1 : ValueSize: stats.UncompressedValueBytes,
324 1 : CreationTime: uint64(d.timeNow().Unix()),
325 1 : },
326 1 : },
327 1 : },
328 1 : }
329 1 : return objMeta, ve, nil
330 : }
331 :
332 : // blockHeap is a min-heap of blob reference liveness encodings, ordered by
333 : // blockID. We use this to help us determine the overall liveness of values in
334 : // each blob block by combining the blob reference liveness encodings of all
335 : // referencing sstables for a particular blockID.
336 : type blockHeap []*sstable.BlobRefLivenessEncoding
337 :
338 : // Len implements sort.Interface.
339 1 : func (h blockHeap) Len() int { return len(h) }
340 :
341 : // Less implements sort.Interface.
342 1 : func (h blockHeap) Less(i, j int) bool { return h[i].BlockID < h[j].BlockID }
343 :
344 : // Swap implements sort.Interface.
345 1 : func (h blockHeap) Swap(i, j int) {
346 1 : h[i], h[j] = h[j], h[i]
347 1 : }
348 :
349 : // Push implements heap.Interface.
350 1 : func (h *blockHeap) Push(x any) {
351 1 : blobEnc := x.(*sstable.BlobRefLivenessEncoding)
352 1 : *h = append(*h, blobEnc)
353 1 : }
354 :
355 : // Pop implements heap.Interface.
356 1 : func (h *blockHeap) Pop() any {
357 1 : old := *h
358 1 : n := len(old)
359 1 : item := old[n-1]
360 1 : old[n-1] = nil
361 1 : *h = old[0 : n-1]
362 1 : return item
363 1 : }
364 :
365 : // blockValues holds the accumulated liveness data for blockID.
366 : type blockValues struct {
367 : blockID blob.BlockID
368 : valuesSize int
369 : liveValueIDs []int
370 : }
371 :
372 : // blobFileRewriter is responsible for rewriting blob files by combining and
373 : // processing blob reference liveness encodings from multiple SSTables. It
374 : // maintains state for writing to an output blob file.
375 : type blobFileRewriter struct {
376 : fc *fileCacheHandle
377 : readEnv block.ReadEnv
378 : sstables []*manifest.TableMetadata
379 : inputBlob manifest.BlobFileMetadata
380 : rw *blob.FileRewriter
381 : blkHeap blockHeap
382 : }
383 :
384 : func newBlobFileRewriter(
385 : fc *fileCacheHandle,
386 : readEnv block.ReadEnv,
387 : outputFileNum base.DiskFileNum,
388 : w objstorage.Writable,
389 : opts blob.FileWriterOptions,
390 : sstables []*manifest.TableMetadata,
391 : inputBlob manifest.BlobFileMetadata,
392 1 : ) *blobFileRewriter {
393 1 : rw := blob.NewFileRewriter(inputBlob.FileID, inputBlob.Physical.FileNum, fc, readEnv, outputFileNum, w, opts)
394 1 : return &blobFileRewriter{
395 1 : fc: fc,
396 1 : readEnv: readEnv,
397 1 : rw: rw,
398 1 : sstables: sstables,
399 1 : inputBlob: inputBlob,
400 1 : blkHeap: blockHeap{},
401 1 : }
402 1 : }
403 :
404 : // generateHeap populates rw.blkHeap with the blob reference liveness encodings
405 : // for each referencing sstable, rw.sstables.
406 1 : func (rw *blobFileRewriter) generateHeap(ctx context.Context) error {
407 1 : heap.Init(&rw.blkHeap)
408 1 :
409 1 : var decoder colblk.ReferenceLivenessBlockDecoder
410 1 : // For each sstable that references the input blob file, push its
411 1 : // sstable.BlobLivenessEncoding on to the heap.
412 1 : for _, sst := range rw.sstables {
413 1 : // Validate that the sstable contains a reference to the input blob
414 1 : // file.
415 1 : refID, ok := sst.BlobReferences.IDByBlobFileID(rw.inputBlob.FileID)
416 1 : if !ok {
417 0 : return errors.AssertionFailedf("table %s doesn't contain a reference to blob file %s",
418 0 : sst.TableNum, rw.inputBlob.FileID)
419 0 : }
420 1 : err := rw.fc.withReader(ctx, rw.readEnv, sst, func(r *sstable.Reader, readEnv sstable.ReadEnv) error {
421 1 : h, err := r.ReadBlobRefIndexBlock(ctx, readEnv.Block)
422 1 : if err != nil {
423 0 : return err
424 0 : }
425 1 : defer h.Release()
426 1 : decoder.Init(h.BlockData())
427 1 : bitmapEncodings := slices.Clone(decoder.LivenessAtReference(int(refID)))
428 1 : // TODO(annie): We should maintain 1 heap item per sstable rather than
429 1 : // 1 heap item per sstable block ref to reduce the heap comparisons to
430 1 : // O(sstables); a sketch of one possible shape follows this function.
431 1 : for _, enc := range sstable.DecodeBlobRefLivenessEncoding(bitmapEncodings) {
432 1 : heap.Push(&rw.blkHeap, &enc)
433 1 : }
434 1 : return nil
435 : })
436 1 : if err != nil {
437 0 : return err
438 0 : }
439 : }
440 1 : return nil
441 : }
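// A minimal sketch of the per-sstable heap item suggested by the TODO above.
// The names and fields here are illustrative assumptions, not pebble code:
// each item is a cursor over one sstable's encodings (already ordered by block
// ID), and the heap compares the blocks the cursors currently point at. The
// remaining heap.Interface methods would mirror blockHeap's above, with the
// cursor advanced and re-pushed after each pop.
type encodingCursor struct {
	encodings []sstable.BlobRefLivenessEncoding // one sstable's encodings, ordered by BlockID
	i         int                               // index of the cursor's current encoding
}

type cursorHeap []*encodingCursor

func (h cursorHeap) Len() int { return len(h) }
func (h cursorHeap) Less(i, j int) bool {
	return h[i].encodings[h[i].i].BlockID < h[j].encodings[h[j].i].BlockID
}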
442 :
443 1 : func (rw *blobFileRewriter) Rewrite(ctx context.Context) (blob.FileWriterStats, error) {
444 1 : if err := rw.generateHeap(ctx); err != nil {
445 0 : return blob.FileWriterStats{}, err
446 0 : }
447 1 : if rw.blkHeap.Len() == 0 {
448 0 : return blob.FileWriterStats{}, errors.AssertionFailedf("heap empty")
449 0 : }
450 :
451 : // Begin constructing our output blob file. The heap yields encodings in
452 : // blockID order, so we accumulate liveness data one block at a time.
453 1 : firstBlock := heap.Pop(&rw.blkHeap).(*sstable.BlobRefLivenessEncoding)
454 1 : pending := blockValues{
455 1 : blockID: firstBlock.BlockID,
456 1 : valuesSize: firstBlock.ValuesSize,
457 1 : liveValueIDs: slices.Collect(sstable.IterSetBitsInRunLengthBitmap(firstBlock.Bitmap)),
458 1 : }
459 1 : for rw.blkHeap.Len() > 0 {
460 1 : nextBlock := heap.Pop(&rw.blkHeap).(*sstable.BlobRefLivenessEncoding)
461 1 :
462 1 : // If we are encountering a new block, write the last accumulated block
463 1 : // to the blob file.
464 1 : if pending.blockID != nextBlock.BlockID {
465 1 : // Write the last accumulated block's values to the blob file.
466 1 : err := rw.rw.CopyBlock(ctx, pending.blockID, pending.valuesSize, pending.liveValueIDs)
467 1 : if err != nil {
468 0 : return blob.FileWriterStats{}, err
469 0 : }
470 1 : pending = blockValues{blockID: nextBlock.BlockID, liveValueIDs: pending.liveValueIDs[:0]}
471 : }
472 : // Update the accumulated encoding for this block.
473 1 : pending.valuesSize += nextBlock.ValuesSize
474 1 : pending.liveValueIDs = slices.AppendSeq(pending.liveValueIDs,
475 1 : sstable.IterSetBitsInRunLengthBitmap(nextBlock.Bitmap))
476 : }
477 :
478 : // Copy the last accumulated block.
479 1 : err := rw.rw.CopyBlock(ctx, pending.blockID, pending.valuesSize, pending.liveValueIDs)
480 1 : if err != nil {
481 0 : return blob.FileWriterStats{}, err
482 0 : }
483 1 : return rw.rw.Close()
484 : }
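// A simplified, self-contained illustration of the merge Rewrite performs:
// because encodings pop off the heap in blockID order, each block's live value
// IDs and value sizes from all referencing sstables are accumulated and then
// flushed when the blockID changes. The types and data are illustrative
// (pre-decoded value IDs stand in for pebble's run-length bitmaps); this is a
// sketch, not pebble code.
type exampleEncoding struct {
	blockID    int
	valuesSize int
	liveIDs    []int
}

type exampleHeap []exampleEncoding

func (h exampleHeap) Len() int           { return len(h) }
func (h exampleHeap) Less(i, j int) bool { return h[i].blockID < h[j].blockID }
func (h exampleHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *exampleHeap) Push(x any)        { *h = append(*h, x.(exampleEncoding)) }
func (h *exampleHeap) Pop() any {
	old := *h
	item := old[len(old)-1]
	*h = old[:len(old)-1]
	return item
}

// exampleRewriteMerge merges per-(sstable, block) encodings and returns one
// accumulated entry per block, in blockID order, mirroring the CopyBlock calls
// made by Rewrite above.
func exampleRewriteMerge() []exampleEncoding {
	h := exampleHeap{
		{blockID: 0, valuesSize: 100, liveIDs: []int{0, 2}}, // sstable A, block 0
		{blockID: 1, valuesSize: 40, liveIDs: []int{1}},     // sstable A, block 1
		{blockID: 0, valuesSize: 30, liveIDs: []int{5}},     // sstable B, block 0
	}
	heap.Init(&h)

	var out []exampleEncoding
	pending := heap.Pop(&h).(exampleEncoding)
	for h.Len() > 0 {
		next := heap.Pop(&h).(exampleEncoding)
		if next.blockID != pending.blockID {
			out = append(out, pending) // "copy" the finished block
			pending = next
			continue
		}
		// Same block: union the live value IDs and sum the value sizes.
		pending.valuesSize += next.valuesSize
		pending.liveIDs = append(pending.liveIDs, next.liveIDs...)
	}
	// Result: block 0 with valuesSize 130 and live IDs {0, 2, 5}, then block 1
	// with valuesSize 40 and live ID {1}.
	return append(out, pending)
}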