LCOV - code coverage report
Current view: top level - pebble - blob_rewrite.go (source / functions) Coverage Total Hit
Test: 2025-10-06 08:19Z 3214797c - tests + meta.lcov Lines: 88.0 % 326 287
Test Date: 2025-10-06 08:22:01 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "container/heap"
       9              :         "context"
      10              :         "fmt"
      11              :         "iter"
      12              :         "runtime/pprof"
      13              :         "slices"
      14              :         "sync/atomic"
      15              :         "time"
      16              : 
      17              :         "github.com/cockroachdb/errors"
      18              :         "github.com/cockroachdb/pebble/internal/base"
      19              :         "github.com/cockroachdb/pebble/internal/manifest"
      20              :         "github.com/cockroachdb/pebble/internal/problemspans"
      21              :         "github.com/cockroachdb/pebble/objstorage"
      22              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      23              :         "github.com/cockroachdb/pebble/sstable"
      24              :         "github.com/cockroachdb/pebble/sstable/blob"
      25              :         "github.com/cockroachdb/pebble/sstable/block"
      26              :         "github.com/cockroachdb/pebble/sstable/colblk"
      27              : )
      28              : 
// A pickedBlobFileCompaction is a blob file rewrite compaction that has been
// picked by the compaction picker. ConstructCompaction turns it into a
// runnable blobFileRewriteCompaction.
type pickedBlobFileCompaction struct {
	// highPriority is set to true if the compaction was picked because the
	// ValueSeparationPolicy.GarbageThresholdHighPriority heuristic was
	// triggered. In this case, the resulting compaction will be permitted to
	// use a burst compaction concurrency slot to avoid starving default
	// compactions.
	highPriority      bool
	// vers is the version the compaction was picked against.
	// ConstructCompaction takes a reference on it and hands it to the
	// constructed compaction.
	vers              *manifest.Version
	// file is the blob file to rewrite; it becomes the constructed
	// compaction's input.
	file              manifest.BlobFileMetadata
	// referencingTables is handed to the constructed compaction as the set of
	// sstables that reference file.
	referencingTables []*manifest.TableMetadata
}
      42              : 
// Compile-time assertion that *pickedBlobFileCompaction implements the
// pickedCompaction interface.
var _ pickedCompaction = (*pickedBlobFileCompaction)(nil)
      46              : 
      47            2 : func (c *pickedBlobFileCompaction) ManualID() uint64 { return 0 }
      48              : 
      49            2 : func (c *pickedBlobFileCompaction) WaitingCompaction() WaitingCompaction {
      50            2 :         entry := scheduledCompactionMap[compactionKindBlobFileRewrite]
      51            2 :         return WaitingCompaction{
      52            2 :                 Optional: entry.optional,
      53            2 :                 Priority: entry.priority,
      54            2 :         }
      55            2 : }
      56              : 
      57              : func (c *pickedBlobFileCompaction) ConstructCompaction(
      58              :         d *DB, grantHandle CompactionGrantHandle,
      59            2 : ) compaction {
      60            2 :         // Add a reference to the version. The compaction will release the reference
      61            2 :         // when it completes.
      62            2 :         c.vers.Ref()
      63            2 :         return &blobFileRewriteCompaction{
      64            2 :                 beganAt:           d.timeNow(),
      65            2 :                 grantHandle:       grantHandle,
      66            2 :                 version:           c.vers,
      67            2 :                 input:             c.file,
      68            2 :                 referencingTables: c.referencingTables,
      69            2 :                 objCreateOpts: objstorage.CreateOptions{
      70            2 :                         // TODO(jackson): Enable shared storage for blob files.
      71            2 :                         PreferSharedStorage: false,
      72            2 :                         WriteCategory:       getDiskWriteCategoryForCompaction(d.opts, compactionKindBlobFileRewrite),
      73            2 :                 },
      74            2 :                 highPriority: c.highPriority,
      75            2 :         }
      76            2 : }
      77              : 
// A blobFileRewriteCompaction is a special variant of a compaction that
// rewrites a blob file without rewriting sstables. When the compaction
// completes, the Version's mapping of blob file ID to disk file number is
// updated to point to the new blob file. The blob file is rewritten without
// copying over values that are no longer referenced by any tables, reclaiming
// disk space.
type blobFileRewriteCompaction struct {
	// cancel is a bool that can be used by other goroutines to signal a compaction
	// to cancel, such as if a conflicting excise operation raced it to manifest
	// application. Only holders of the manifest lock will write to this atomic.
	cancel atomic.Bool
	// beganAt is the time when the compaction began.
	beganAt time.Time
	// grantHandle is a handle to the compaction that can be used to track
	// progress.
	grantHandle CompactionGrantHandle
	// version is a referenced version obtained when the compaction was picked.
	// This version must be unreferenced when the compaction is complete.
	version *manifest.Version
	// versionEditApplied is set to true when a compaction has completed and the
	// resulting version has been installed (if successful), but the compaction
	// goroutine is still cleaning up (eg, deleting obsolete files).
	versionEditApplied bool
	// input is the blob file that is being rewritten.
	input manifest.BlobFileMetadata
	// referencingTables is the set of sstables that reference the input blob
	// file in version.
	referencingTables     []*manifest.TableMetadata
	// objCreateOpts holds the options used when creating the output blob file
	// object.
	objCreateOpts         objstorage.CreateOptions
	// internalIteratorStats is installed as the Stats sink of the block.ReadEnv
	// used while reading during the rewrite.
	internalIteratorStats base.InternalIteratorStats
	bytesWritten          atomic.Int64 // Total bytes written to the new blob file.
	// highPriority is set to true if the compaction was picked because the
	// ValueSeparationPolicy.GarbageThresholdHighPriority heuristic was
	// triggered. In this case, the resulting compaction will be permitted to
	// use a burst compaction concurrency slot to avoid starving default
	// compactions. This field is set when the compaction is created.
	highPriority bool
}
     116              : 
     117            1 : func (c *blobFileRewriteCompaction) String() string {
     118            1 :         s := fmt.Sprintf("blob file (ID: %s) %s (%s) being rewritten",
     119            1 :                 c.input.FileID, c.input.Physical.FileNum, humanizeBytes(uint64(c.input.Physical.Size)))
     120            1 :         if c.highPriority {
     121            1 :                 s += " (high priority)"
     122            1 :         }
     123            1 :         return s
     124              : }
     125              : 
// Compile-time assertion that *blobFileRewriteCompaction implements the
// compaction interface.
var _ compaction = (*blobFileRewriteCompaction)(nil)
     128              : 
     129            2 : func (c *blobFileRewriteCompaction) AddInProgressLocked(d *DB) {
     130            2 :         d.mu.compact.inProgress[c] = struct{}{}
     131            2 :         if c.highPriority {
     132            2 :                 d.mu.compact.burstConcurrency.Add(1)
     133            2 :         }
     134              :         // TODO(jackson): Currently the compaction picker iterates through all
     135              :         // ongoing compactions in order to limit the number of concurrent blob
     136              :         // rewrite compactions to 1.
     137              :         //
     138              :         // Consider instead tracking which blob files are being rewritten, and we
     139              :         // can allow multiple concurrent blob rewrite compactions as long as they
     140              :         // compact different blob files.
     141              : }
     142              : 
     143            2 : func (c *blobFileRewriteCompaction) BeganAt() time.Time                 { return c.beganAt }
     144            2 : func (c *blobFileRewriteCompaction) Bounds() *base.UserKeyBounds        { return nil }
     145            0 : func (c *blobFileRewriteCompaction) Cancel()                            { c.cancel.Store(true) }
     146            2 : func (c *blobFileRewriteCompaction) IsDownload() bool                   { return false }
     147            2 : func (c *blobFileRewriteCompaction) IsFlush() bool                      { return false }
     148            2 : func (c *blobFileRewriteCompaction) GrantHandle() CompactionGrantHandle { return c.grantHandle }
     149            1 : func (c *blobFileRewriteCompaction) Tables() iter.Seq2[int, *manifest.TableMetadata] {
     150            1 :         // No tables; return an empty iterator.
     151            1 :         return func(yield func(int, *manifest.TableMetadata) bool) {}
     152              : }
     153              : 
     154            0 : func (c *blobFileRewriteCompaction) ObjioTracingContext(ctx context.Context) context.Context {
     155            0 :         if objiotracing.Enabled {
     156            0 :                 ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
     157            0 :         }
     158            0 :         return ctx
     159              : }
     160              : 
// PprofLabels returns the pprof label set applied to the goroutine running
// this compaction: the single pair "pebble" = "blob-rewrite". The
// UserKeyCategories argument is ignored.
func (c *blobFileRewriteCompaction) PprofLabels(UserKeyCategories) pprof.LabelSet {
	return pprof.Labels("pebble", "blob-rewrite")
}
     164              : 
// VersionEditApplied reports the value of the versionEditApplied field, i.e.
// whether the compaction's version edit has been recorded as applied.
func (c *blobFileRewriteCompaction) VersionEditApplied() bool {
	return c.versionEditApplied
}
     168              : 
     169            2 : func (c *blobFileRewriteCompaction) Execute(jobID JobID, d *DB) error {
     170            2 :         ctx := context.TODO()
     171            2 :         if objiotracing.Enabled {
     172            0 :                 ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
     173            0 :         }
     174            2 :         c.grantHandle.Started()
     175            2 :         // The version stored in the compaction is ref'd when the compaction is
     176            2 :         // created. We're responsible for un-refing it when the compaction is
     177            2 :         // complete.
     178            2 :         defer c.version.UnrefLocked()
     179            2 : 
     180            2 :         // Notify the event listener that the compaction has begun.
     181            2 :         info := BlobFileRewriteInfo{
     182            2 :                 JobID: int(jobID),
     183            2 :                 Input: BlobFileInfo{
     184            2 :                         BlobFileID:  c.input.FileID,
     185            2 :                         DiskFileNum: c.input.Physical.FileNum,
     186            2 :                         Size:        c.input.Physical.Size,
     187            2 :                         ValueSize:   c.input.Physical.ValueSize,
     188            2 :                 },
     189            2 :         }
     190            2 :         d.opts.EventListener.BlobFileRewriteBegin(info)
     191            2 :         startTime := d.timeNow()
     192            2 : 
     193            2 :         // Run the blob file rewrite.
     194            2 :         objMeta, ve, err := d.runBlobFileRewriteLocked(ctx, jobID, c)
     195            2 : 
     196            2 :         info.Duration = d.timeNow().Sub(startTime)
     197            2 : 
     198            2 :         // Update the version with the remapped blob file.
     199            2 :         if err == nil {
     200            2 :                 // Ensure the rewrite did reduce the aggregate value size. If it didn't,
     201            2 :                 // we should have never selected this blob file for rewrite and there
     202            2 :                 // must be a bug in the statistics we maintain.
     203            2 :                 if ve.NewBlobFiles[0].Physical.ValueSize >= c.input.Physical.ValueSize {
     204            0 :                         return errors.AssertionFailedf("pebble: blob file %s rewrite did not reduce value size", c.input.FileID)
     205            0 :                 }
     206              : 
     207            2 :                 info.Output.BlobFileID = ve.NewBlobFiles[0].FileID
     208            2 :                 info.Output.DiskFileNum = ve.NewBlobFiles[0].Physical.FileNum
     209            2 :                 info.Output.Size = ve.NewBlobFiles[0].Physical.Size
     210            2 :                 info.Output.ValueSize = ve.NewBlobFiles[0].Physical.ValueSize
     211            2 :                 _, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
     212            2 :                         // It's possible that concurrent compactions removed references to
     213            2 :                         // the blob file while the blob file rewrite compaction was running.
     214            2 :                         // Now that we have the manifest lock, check if the blob file is
     215            2 :                         // still current. If not, we bubble up ErrCancelledCompaction.
     216            2 :                         v := d.mu.versions.currentVersion()
     217            2 :                         currentDiskFile, ok := v.BlobFiles.LookupPhysical(c.input.FileID)
     218            2 :                         if !ok {
     219            1 :                                 return versionUpdate{}, errors.Wrapf(ErrCancelledCompaction,
     220            1 :                                         "blob file %s became unreferenced", c.input.FileID)
     221            1 :                         }
     222            2 :                         currentDiskFileNum := currentDiskFile.FileNum
     223            2 :                         // Assert that the current version's disk file number for the blob
     224            2 :                         // matches the one we rewrote. This compaction should be the only
     225            2 :                         // rewrite compaction running for this blob file.
     226            2 :                         if currentDiskFileNum != c.input.Physical.FileNum {
     227            0 :                                 return versionUpdate{}, base.AssertionFailedf(
     228            0 :                                         "blob file %s was rewritten to %s during rewrite compaction of %s",
     229            0 :                                         c.input.FileID, currentDiskFileNum, c.input.Physical.FileNum)
     230            0 :                         }
     231            2 :                         return versionUpdate{
     232            2 :                                 VE:    ve,
     233            2 :                                 JobID: jobID,
     234            2 :                                 InProgressCompactionsFn: func() []compactionInfo {
     235            2 :                                         return d.getInProgressCompactionInfoLocked(c)
     236            2 :                                 },
     237              :                         }, nil
     238              :                 })
     239              :         }
     240              : 
     241            2 :         d.mu.versions.incrementCompactions(compactionKindBlobFileRewrite, nil, c.bytesWritten.Load(), err)
     242            2 :         d.mu.versions.incrementCompactionBytes(-c.bytesWritten.Load())
     243            2 : 
     244            2 :         // Update the read state to publish the new version.
     245            2 :         if err == nil {
     246            2 :                 d.updateReadStateLocked(d.opts.DebugCheck)
     247            2 :         }
     248              : 
     249              :         // Ensure we clean up the blob file we created on failure.
     250            2 :         if err != nil {
     251            1 :                 if objMeta.DiskFileNum != 0 {
     252            1 :                         d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, []obsoleteFile{
     253            1 :                                 {
     254            1 :                                         fileType: base.FileTypeBlob,
     255            1 :                                         fs:       d.opts.FS,
     256            1 :                                         path:     d.objProvider.Path(objMeta),
     257            1 :                                         fileNum:  objMeta.DiskFileNum,
     258            1 :                                         // We don't know the size of the output blob file--it may have
     259            1 :                                         // been half-written. We use the input blob file size as an
     260            1 :                                         // approximation for deletion pacing.
     261            1 :                                         fileSize: c.input.Physical.Size,
     262            1 :                                         isLocal:  true,
     263            1 :                                 },
     264            1 :                         })
     265            1 :                 }
     266              :         }
     267              : 
     268              :         // Notify the event listener that the compaction has ended.
     269            2 :         now := d.timeNow()
     270            2 :         info.TotalDuration = now.Sub(c.beganAt)
     271            2 :         info.Done = true
     272            2 :         info.Err = err
     273            2 :         d.opts.EventListener.BlobFileRewriteEnd(info)
     274            2 :         return nil
     275              : }
     276              : 
     277            2 : func (c *blobFileRewriteCompaction) Info() compactionInfo {
     278            2 :         return compactionInfo{
     279            2 :                 kind:               compactionKindBlobFileRewrite,
     280            2 :                 versionEditApplied: c.versionEditApplied,
     281            2 :                 outputLevel:        -1,
     282            2 :         }
     283            2 : }
     284              : 
// UsesBurstConcurrency reports whether the compaction occupies a burst
// compaction concurrency slot, which is the case exactly when it was created
// as high priority.
func (c *blobFileRewriteCompaction) UsesBurstConcurrency() bool {
	return c.highPriority
}
     288              : 
// RecordError is currently a no-op for blob file rewrites: both arguments are
// ignored and nothing is recorded.
func (c *blobFileRewriteCompaction) RecordError(*problemspans.ByLevel, error) {
	// TODO(jackson): Track problematic blob files and avoid re-picking the same
	// blob file compaction.
}
     293              : 
     294              : // runBlobFileRewriteLocked runs a blob file rewrite. d.mu must be held when
     295              : // calling this, although it may be dropped and re-acquired during the course of
     296              : // the method.
     297              : func (d *DB) runBlobFileRewriteLocked(
     298              :         ctx context.Context, jobID JobID, c *blobFileRewriteCompaction,
     299            2 : ) (objstorage.ObjectMetadata, *manifest.VersionEdit, error) {
     300            2 :         // Drop the database mutex while we perform the rewrite, and re-acquire it
     301            2 :         // before returning.
     302            2 :         d.mu.Unlock()
     303            2 :         defer d.mu.Lock()
     304            2 : 
     305            2 :         // Construct the block.ReadEnv configured with a buffer pool. Setting the
     306            2 :         // buffer pool ensures we won't cache blocks in the block cache. As soon as
     307            2 :         // the compaction finishes new iterators will read the new blob file, so it
     308            2 :         // would be unlikely the cached blocks would be reused.
     309            2 :         var bufferPool block.BufferPool
     310            2 :         bufferPool.Init(4, block.ForBlobFileRewrite)
     311            2 :         defer bufferPool.Release()
     312            2 :         env := block.ReadEnv{
     313            2 :                 Stats:              &c.internalIteratorStats,
     314            2 :                 BufferPool:         &bufferPool,
     315            2 :                 ReportCorruptionFn: d.reportCorruption,
     316            2 :         }
     317            2 : 
     318            2 :         // Create a new file for the rewritten blob file.
     319            2 :         writable, objMeta, err := d.newCompactionOutputBlob(jobID, compactionKindBlobFileRewrite, -1, &c.bytesWritten, c.objCreateOpts)
     320            2 :         if err != nil {
     321            0 :                 return objstorage.ObjectMetadata{}, nil, err
     322            0 :         }
     323              :         // Initialize a blob file rewriter. We pass L6 to MakeBlobWriterOptions.
     324              :         // There's no single associated level with a blob file. A long-lived blob
     325              :         // file that gets rewritten is likely to mostly be referenced from L6.
     326              :         // TODO(jackson): Consider refactoring to remove the level association.
     327            2 :         rewriter := newBlobFileRewriter(
     328            2 :                 d.fileCache,
     329            2 :                 env,
     330            2 :                 objMeta.DiskFileNum,
     331            2 :                 writable,
     332            2 :                 d.opts.MakeBlobWriterOptions(6, d.BlobFileFormat()),
     333            2 :                 c.referencingTables,
     334            2 :                 c.input,
     335            2 :         )
     336            2 :         // Perform the rewrite.
     337            2 :         stats, err := rewriter.Rewrite(ctx)
     338            2 :         if err != nil {
     339            0 :                 return objstorage.ObjectMetadata{}, nil, err
     340            0 :         }
     341              : 
     342              :         // Sync the object provider to ensure the metadata for the blob file is
     343              :         // persisted.
     344            2 :         if err := d.objProvider.Sync(); err != nil {
     345            0 :                 return objstorage.ObjectMetadata{}, nil, err
     346            0 :         }
     347              : 
     348            2 :         physical := &manifest.PhysicalBlobFile{
     349            2 :                 FileNum:      objMeta.DiskFileNum,
     350            2 :                 Size:         stats.FileLen,
     351            2 :                 ValueSize:    stats.UncompressedValueBytes,
     352            2 :                 CreationTime: uint64(d.timeNow().Unix()),
     353            2 :         }
     354            2 :         physical.PopulateProperties(&stats.Properties)
     355            2 : 
     356            2 :         ve := &manifest.VersionEdit{
     357            2 :                 DeletedBlobFiles: map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile{
     358            2 :                         {
     359            2 :                                 FileID:  c.input.FileID,
     360            2 :                                 FileNum: c.input.Physical.FileNum,
     361            2 :                         }: c.input.Physical,
     362            2 :                 },
     363            2 :                 NewBlobFiles: []manifest.BlobFileMetadata{
     364            2 :                         {
     365            2 :                                 FileID:   c.input.FileID,
     366            2 :                                 Physical: physical,
     367            2 :                         },
     368            2 :                 },
     369            2 :         }
     370            2 :         return objMeta, ve, nil
     371              : }
     372              : 
     373              : // blockHeap is a min-heap of blob reference liveness encodings, ordered by
     374              : // blockID. We use this to help us determine the overall liveness of values in
     375              : // each blob block by combining the blob reference liveness encodings of all
     376              : // referencing sstables for a particular blockID.
     377              : type blockHeap []*sstable.BlobRefLivenessEncoding
     378              : 
     379              : // Len implements sort.Interface.
     380            2 : func (h blockHeap) Len() int { return len(h) }
     381              : 
     382              : // Less implements sort.Interface.
     383            2 : func (h blockHeap) Less(i, j int) bool { return h[i].BlockID < h[j].BlockID }
     384              : 
     385              : // Swap implements sort.Interface.
     386            2 : func (h blockHeap) Swap(i, j int) {
     387            2 :         h[i], h[j] = h[j], h[i]
     388            2 : }
     389              : 
     390              : // Push implements heap.Interface.
     391            2 : func (h *blockHeap) Push(x any) {
     392            2 :         blobEnc := x.(*sstable.BlobRefLivenessEncoding)
     393            2 :         *h = append(*h, blobEnc)
     394            2 : }
     395              : 
     396              : // Pop implements heap.Interface.
     397            2 : func (h *blockHeap) Pop() any {
     398            2 :         old := *h
     399            2 :         n := len(old)
     400            2 :         item := old[n-1]
     401            2 :         old[n-1] = nil
     402            2 :         *h = old[0 : n-1]
     403            2 :         return item
     404            2 : }
     405              : 
// blockValues holds the accumulated liveness data for blockID.
type blockValues struct {
	// blockID identifies the blob block the data below belongs to.
	blockID      blob.BlockID
	// NOTE(review): presumably the total number of values in the block (live
	// or not) -- confirm against the code that populates it.
	valuesSize   int
	// NOTE(review): presumably the IDs of the values within the block that are
	// still referenced -- confirm against the code that populates it.
	liveValueIDs []int
}
     412              : 
// blobFileRewriter is responsible for rewriting blob files by combining and
// processing blob reference liveness encodings from multiple SSTables. It
// maintains state for writing to an output blob file.
type blobFileRewriter struct {
	// fc is the file cache handle used to open readers on the referencing
	// sstables.
	fc        *fileCacheHandle
	// readEnv is the read environment passed along when reading those
	// sstables.
	readEnv   block.ReadEnv
	// sstables are the tables expected to reference inputBlob; generateHeap
	// fails with an assertion error if one of them does not.
	sstables  []*manifest.TableMetadata
	// inputBlob is the blob file being rewritten.
	inputBlob manifest.BlobFileMetadata
	// rw is the underlying blob.FileRewriter, constructed from the input blob
	// file and the output writable.
	rw        *blob.FileRewriter
	// blkHeap is populated by generateHeap with the liveness encodings of all
	// referencing sstables, ordered by block ID.
	blkHeap   blockHeap
}
     424              : 
     425              : func newBlobFileRewriter(
     426              :         fc *fileCacheHandle,
     427              :         readEnv block.ReadEnv,
     428              :         outputFileNum base.DiskFileNum,
     429              :         w objstorage.Writable,
     430              :         opts blob.FileWriterOptions,
     431              :         sstables []*manifest.TableMetadata,
     432              :         inputBlob manifest.BlobFileMetadata,
     433            2 : ) *blobFileRewriter {
     434            2 :         rw := blob.NewFileRewriter(inputBlob.FileID, inputBlob.Physical, fc, readEnv, outputFileNum, w, opts)
     435            2 :         return &blobFileRewriter{
     436            2 :                 fc:        fc,
     437            2 :                 readEnv:   readEnv,
     438            2 :                 rw:        rw,
     439            2 :                 sstables:  sstables,
     440            2 :                 inputBlob: inputBlob,
     441            2 :                 blkHeap:   blockHeap{},
     442            2 :         }
     443            2 : }
     444              : 
     445              : // generateHeap populates rw.blkHeap with the blob reference liveness encodings that each sstable in
     446              : // rw.sstables holds for rw.inputBlob. It fails if an sstable has no reference to that blob file or if its blob reference index block cannot be read.
     447            2 : func (rw *blobFileRewriter) generateHeap(ctx context.Context) error {
     448            2 :         heap.Init(&rw.blkHeap)
     449            2 : 
     450            2 :         var decoder colblk.ReferenceLivenessBlockDecoder // re-initialized for every sstable's index block
     451            2 :         // For each sstable that references the input blob file, push its
     452            2 :         // decoded sstable.BlobRefLivenessEncoding values on to the heap.
     453            2 :         for _, sst := range rw.sstables {
     454            2 :                 // Look up the sstable's reference ID for the input blob file; a
     455            2 :                 // missing reference is reported as an assertion failure.
     456            2 :                 refID, ok := sst.BlobReferences.IDByBlobFileID(rw.inputBlob.FileID)
     457            2 :                 if !ok {
     458            0 :                         return errors.AssertionFailedf("table %s doesn't contain a reference to blob file %s",
     459            0 :                                 sst.TableNum, rw.inputBlob.FileID)
     460            0 :                 }
     461            2 :                 err := rw.fc.withReader(ctx, rw.readEnv, sst, func(r *sstable.Reader, readEnv sstable.ReadEnv) error {
     462            2 :                         h, err := r.ReadBlobRefIndexBlock(ctx, readEnv.Block)
     463            2 :                         if err != nil {
     464            0 :                                 return err
     465            0 :                         }
     466            2 :                         defer h.Release()
     467            2 :                         decoder.Init(h.BlockData())
     468            2 :                         bitmapEncodings := slices.Clone(decoder.LivenessAtReference(int(refID))) // copied while h is still held; NOTE(review): presumably the decoded slice aliases h's block data - confirm
     469            2 :                         // TODO(annie): We should maintain 1 heap item per sstable instead
     470            2 :                         // of 1 heap item per sstable block ref, to reduce the heap
     471            2 :                         // comparisons to O(sstables).
     472            2 :                         for _, enc := range sstable.DecodeBlobRefLivenessEncoding(bitmapEncodings) {
     473            2 :                                 heap.Push(&rw.blkHeap, &enc)
     474            2 :                         }
     475            2 :                         return nil
     476              :                 })
     477            2 :                 if err != nil {
     478            0 :                         return err
     479            0 :                 }
     480              :         }
     481            2 :         return nil
     482              : }
     483              : // Rewrite builds the heap via generateHeap, calls rw.rw.CopyBlock once per distinct BlockID popped from it (passing that block's accumulated values size and live value IDs), and returns the result of rw.rw.Close. An empty heap is reported as an assertion failure.
     484            2 : func (rw *blobFileRewriter) Rewrite(ctx context.Context) (blob.FileWriterStats, error) {
     485            2 :         if err := rw.generateHeap(ctx); err != nil {
     486            0 :                 return blob.FileWriterStats{}, err
     487            0 :         }
     488            2 :         if rw.blkHeap.Len() == 0 {
     489            0 :                 return blob.FileWriterStats{}, errors.AssertionFailedf("heap empty")
     490            0 :         }
     491              : 
     492              :         // Begin constructing our output blob file. pending accumulates the liveness data of one blockID
     493              :         // across all referencing sstables. NOTE(review): this relies on the heap popping equal BlockIDs consecutively - confirm blockHeap's ordering.
     494            2 :         firstBlock := heap.Pop(&rw.blkHeap).(*sstable.BlobRefLivenessEncoding)
     495            2 :         pending := blockValues{
     496            2 :                 blockID:      firstBlock.BlockID,
     497            2 :                 valuesSize:   firstBlock.ValuesSize,
     498            2 :                 liveValueIDs: slices.Collect(sstable.IterSetBitsInRunLengthBitmap(firstBlock.Bitmap)),
     499            2 :         }
     500            2 :         for rw.blkHeap.Len() > 0 {
     501            2 :                 nextBlock := heap.Pop(&rw.blkHeap).(*sstable.BlobRefLivenessEncoding)
     502            2 : 
     503            2 :                 // If we are encountering a new block, the previously accumulated
     504            2 :                 // block is complete and can be written out.
     505            2 :                 if pending.blockID != nextBlock.BlockID {
     506            2 :                         // Flush pending via CopyBlock before starting the next block.
     507            2 :                         err := rw.rw.CopyBlock(ctx, pending.blockID, pending.valuesSize, pending.liveValueIDs)
     508            2 :                         if err != nil {
     509            0 :                                 return blob.FileWriterStats{}, err
     510            0 :                         }
     511            2 :                         pending = blockValues{blockID: nextBlock.BlockID, liveValueIDs: pending.liveValueIDs[:0]} // reuses the liveValueIDs backing array; valuesSize restarts at zero
     512              :                 }
     513              :                 // Fold nextBlock's values size and live value IDs into pending.
     514            2 :                 pending.valuesSize += nextBlock.ValuesSize
     515            2 :                 pending.liveValueIDs = slices.AppendSeq(pending.liveValueIDs,
     516            2 :                         sstable.IterSetBitsInRunLengthBitmap(nextBlock.Bitmap))
     517              :         }
     518              : 
     519              :         // Copy the last accumulated block, which the loop above never flushes.
     520            2 :         err := rw.rw.CopyBlock(ctx, pending.blockID, pending.valuesSize, pending.liveValueIDs)
     521            2 :         if err != nil {
     522            0 :                 return blob.FileWriterStats{}, err
     523            0 :         }
     524            2 :         return rw.rw.Close()
     525              : }
        

Generated by: LCOV version 2.0-1