LCOV - code coverage report
Current view: top level - pebble - blob_rewrite.go (source / functions) Coverage Total Hit
Test: 2025-07-11 08:18Z b35ff004 - tests only.lcov Lines: 80.1 % 307 246
Test Date: 2025-07-11 08:19:21 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "container/heap"
       9              :         "context"
      10              :         "iter"
      11              :         "runtime/pprof"
      12              :         "slices"
      13              :         "sync/atomic"
      14              :         "time"
      15              : 
      16              :         "github.com/cockroachdb/errors"
      17              :         "github.com/cockroachdb/pebble/internal/base"
      18              :         "github.com/cockroachdb/pebble/internal/manifest"
      19              :         "github.com/cockroachdb/pebble/internal/problemspans"
      20              :         "github.com/cockroachdb/pebble/objstorage"
      21              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
      22              :         "github.com/cockroachdb/pebble/sstable"
      23              :         "github.com/cockroachdb/pebble/sstable/blob"
      24              :         "github.com/cockroachdb/pebble/sstable/block"
      25              :         "github.com/cockroachdb/pebble/sstable/colblk"
      26              : )
      27              : 
      28              : // A pickedBlobFileCompaction is a blob file rewrite compaction that has been
      29              : // picked by the compaction picker.
      30              : type pickedBlobFileCompaction struct {
      31              :         vers              *manifest.Version
      32              :         file              manifest.BlobFileMetadata
      33              :         referencingTables []*manifest.TableMetadata
      34              : }
      35              : 
      36              : // Assert that *pickedBlobFileCompaction implements the pickedCompaction
      37              : // interface.
      38              : var _ pickedCompaction = (*pickedBlobFileCompaction)(nil)
      39              : 
      40            1 : func (c *pickedBlobFileCompaction) ManualID() uint64 { return 0 }
      41              : 
      42            1 : func (c *pickedBlobFileCompaction) WaitingCompaction() WaitingCompaction {
      43            1 :         entry := scheduledCompactionMap[compactionKindBlobFileRewrite]
      44            1 :         return WaitingCompaction{
      45            1 :                 Optional: entry.optional,
      46            1 :                 Priority: entry.priority,
      47            1 :         }
      48            1 : }
      49              : 
      50              : func (c *pickedBlobFileCompaction) ConstructCompaction(
      51              :         d *DB, grantHandle CompactionGrantHandle,
      52            1 : ) compaction {
      53            1 :         // Add a reference to the version. The compaction will release the reference
      54            1 :         // when it completes.
      55            1 :         c.vers.Ref()
      56            1 :         return &blobFileRewriteCompaction{
      57            1 :                 beganAt:           d.timeNow(),
      58            1 :                 grantHandle:       grantHandle,
      59            1 :                 version:           c.vers,
      60            1 :                 input:             c.file,
      61            1 :                 referencingTables: c.referencingTables,
      62            1 :                 objCreateOpts: objstorage.CreateOptions{
      63            1 :                         // TODO(jackson): Enable shared storage for blob files.
      64            1 :                         PreferSharedStorage: false,
      65            1 :                         WriteCategory:       getDiskWriteCategoryForCompaction(d.opts, compactionKindBlobFileRewrite),
      66            1 :                 },
      67            1 :         }
      68            1 : }
      69              : 
      70              : // A blobFileRewriteCompaction is a special variant of a compaction that
      71              : // rewrites a blob file without rewriting sstables. When the compaction
      72              : // completes, the Version's mapping of blob file ID to disk file number is
      73              : // updated to point to the new blob file. The blob file is rewritten without
      74              : // copying over values that are no longer referenced by any tables, reclaiming
      75              : // disk space.
      76              : type blobFileRewriteCompaction struct {
      77              :         // cancel is a bool that can be used by other goroutines to signal a compaction
      78              :         // to cancel, such as if a conflicting excise operation raced it to manifest
      79              :         // application. Only holders of the manifest lock will write to this atomic.
      80              :         cancel atomic.Bool
      81              :         // beganAt is the time when the compaction began.
      82              :         beganAt time.Time
      83              :         // grantHandle is a handle to the compaction that can be used to track
      84              :         // progress.
      85              :         grantHandle CompactionGrantHandle
      86              :         // version is a referenced version obtained when the compaction was picked.
      87              :         // This version must be unreferenced when the compaction is complete.
      88              :         version *manifest.Version
      89              :         // versionEditApplied is set to true when a compaction has completed and the
      90              :         // resulting version has been installed (if successful), but the compaction
      91              :         // goroutine is still cleaning up (eg, deleting obsolete files).
      92              :         versionEditApplied bool
      93              :         // input is the blob file that is being rewritten.
      94              :         input manifest.BlobFileMetadata
      95              :         // referencingTables is the set of sstables that reference the input blob
      96              :         // file in version.
      97              :         referencingTables     []*manifest.TableMetadata
      98              :         objCreateOpts         objstorage.CreateOptions
      99              :         internalIteratorStats base.InternalIteratorStats
     100              :         bytesWritten          atomic.Int64 // Total bytes written to the new blob file.
     101              : }
     102              : 
     103              : // Assert that *blobFileRewriteCompaction implements the Compaction interface.
     104              : var _ compaction = (*blobFileRewriteCompaction)(nil)
     105              : 
     106            1 : func (c *blobFileRewriteCompaction) AddInProgressLocked(d *DB) {
     107            1 :         d.mu.compact.inProgress[c] = struct{}{}
     108            1 :         // TODO(jackson): Currently the compaction picker iterates through all
     109            1 :         // ongoing compactions in order to limit the number of concurrent blob
     110            1 :         // rewrite compactions to 1.
     111            1 :         //
     112            1 :         // Consider instead tracking which blob files are being rewritten, and we
     113            1 :         // can allow multiple concurrent blob rewrite compactions as long as they
     114            1 :         // compact different blob files.
     115            1 : }
     116              : 
     117            1 : func (c *blobFileRewriteCompaction) BeganAt() time.Time                 { return c.beganAt }
     118            1 : func (c *blobFileRewriteCompaction) Bounds() *base.UserKeyBounds        { return nil }
     119            0 : func (c *blobFileRewriteCompaction) Cancel()                            { c.cancel.Store(true) }
     120            1 : func (c *blobFileRewriteCompaction) IsDownload() bool                   { return false }
     121            1 : func (c *blobFileRewriteCompaction) IsFlush() bool                      { return false }
     122            1 : func (c *blobFileRewriteCompaction) GrantHandle() CompactionGrantHandle { return c.grantHandle }
     123            0 : func (c *blobFileRewriteCompaction) Tables() iter.Seq2[int, *manifest.TableMetadata] {
     124            0 :         // No tables; return an empty iterator.
     125            0 :         return func(yield func(int, *manifest.TableMetadata) bool) {}
     126              : }
     127              : 
     128            0 : func (c *blobFileRewriteCompaction) ObjioTracingContext(ctx context.Context) context.Context {
     129            0 :         if objiotracing.Enabled {
     130            0 :                 ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
     131            0 :         }
     132            0 :         return ctx
     133              : }
     134              : 
     135            1 : func (c *blobFileRewriteCompaction) PprofLabels(UserKeyCategories) pprof.LabelSet {
     136            1 :         return pprof.Labels("pebble", "blob-rewrite")
     137            1 : }
     138              : 
     139            0 : func (c *blobFileRewriteCompaction) VersionEditApplied() bool {
     140            0 :         return c.versionEditApplied
     141            0 : }
     142              : 
     143            1 : func (c *blobFileRewriteCompaction) Execute(jobID JobID, d *DB) error {
     144            1 :         ctx := context.TODO()
     145            1 :         if objiotracing.Enabled {
     146            0 :                 ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
     147            0 :         }
     148            1 :         c.grantHandle.Started()
     149            1 :         // The version stored in the compaction is ref'd when the compaction is
     150            1 :         // created. We're responsible for un-refing it when the compaction is
     151            1 :         // complete.
     152            1 :         defer c.version.UnrefLocked()
     153            1 : 
     154            1 :         // Notify the event listener that the compaction has begun.
     155            1 :         info := BlobFileRewriteInfo{
     156            1 :                 JobID: int(jobID),
     157            1 :                 Input: BlobFileInfo{
     158            1 :                         BlobFileID:  c.input.FileID,
     159            1 :                         DiskFileNum: c.input.Physical.FileNum,
     160            1 :                         Size:        c.input.Physical.Size,
     161            1 :                         ValueSize:   c.input.Physical.ValueSize,
     162            1 :                 },
     163            1 :         }
     164            1 :         d.opts.EventListener.BlobFileRewriteBegin(info)
     165            1 :         startTime := d.timeNow()
     166            1 : 
     167            1 :         // Run the blob file rewrite.
     168            1 :         objMeta, ve, err := d.runBlobFileRewriteLocked(ctx, jobID, c)
     169            1 : 
     170            1 :         info.Duration = d.timeNow().Sub(startTime)
     171            1 : 
     172            1 :         // Update the version with the remapped blob file.
     173            1 :         if err == nil {
     174            1 :                 info.Output.BlobFileID = ve.NewBlobFiles[0].FileID
     175            1 :                 info.Output.DiskFileNum = ve.NewBlobFiles[0].Physical.FileNum
     176            1 :                 info.Output.Size = ve.NewBlobFiles[0].Physical.Size
     177            1 :                 info.Output.ValueSize = ve.NewBlobFiles[0].Physical.ValueSize
     178            1 :                 err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
     179            1 :                         // It's possible that concurrent compactions removed references to
     180            1 :                         // the blob file while the blob file rewrite compaction was running.
     181            1 :                         // Now that we have the manifest lock, check if the blob file is
     182            1 :                         // still current. If not, we bubble up ErrCancelledCompaction.
     183            1 :                         v := d.mu.versions.currentVersion()
     184            1 :                         currentDiskFileNum, ok := v.BlobFiles.Lookup(c.input.FileID)
     185            1 :                         if !ok {
     186            0 :                                 return versionUpdate{}, errors.Wrapf(ErrCancelledCompaction,
     187            0 :                                         "blob file %s became unreferenced", c.input.FileID)
     188            0 :                         }
     189              :                         // Assert that the current version's disk file number for the blob
     190              :                         // matches the one we rewrote. This compaction should be the only
     191              :                         // rewrite compaction running for this blob file.
     192            1 :                         if currentDiskFileNum != c.input.Physical.FileNum {
     193            0 :                                 return versionUpdate{}, base.AssertionFailedf(
     194            0 :                                         "blob file %s was rewritten to %s during rewrite compaction of %s",
     195            0 :                                         c.input.FileID, currentDiskFileNum, c.input.Physical.FileNum)
     196            0 :                         }
     197            1 :                         return versionUpdate{
     198            1 :                                 VE:    ve,
     199            1 :                                 JobID: jobID,
     200            1 :                                 InProgressCompactionsFn: func() []compactionInfo {
     201            1 :                                         return d.getInProgressCompactionInfoLocked(c)
     202            1 :                                 },
     203              :                         }, nil
     204              :                 })
     205              :         }
     206              : 
     207            1 :         d.mu.versions.incrementCompactions(compactionKindBlobFileRewrite, nil, c.bytesWritten.Load(), err)
     208            1 :         d.mu.versions.incrementCompactionBytes(-c.bytesWritten.Load())
     209            1 : 
     210            1 :         // Update the read state to publish the new version.
     211            1 :         if err == nil {
     212            1 :                 d.updateReadStateLocked(d.opts.DebugCheck)
     213            1 :         }
     214              : 
     215              :         // Ensure we clean up the blob file we created on failure.
     216            1 :         if err != nil {
     217            0 :                 if objMeta.DiskFileNum != 0 {
     218            0 :                         d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, []obsoleteFile{
     219            0 :                                 {
     220            0 :                                         fileType: base.FileTypeBlob,
     221            0 :                                         fs:       d.opts.FS,
     222            0 :                                         path:     d.objProvider.Path(objMeta),
     223            0 :                                         fileNum:  objMeta.DiskFileNum,
     224            0 :                                         // We don't know the size of the output blob file--it may have
     225            0 :                                         // been half-written. We use the input blob file size as an
     226            0 :                                         // approximation for deletion pacing.
     227            0 :                                         fileSize: c.input.Physical.Size,
     228            0 :                                         isLocal:  true,
     229            0 :                                 },
     230            0 :                         })
     231            0 :                 }
     232              :         }
     233              : 
     234              :         // Notify the event listener that the compaction has ended.
     235            1 :         now := d.timeNow()
     236            1 :         info.TotalDuration = now.Sub(c.beganAt)
     237            1 :         info.Done = true
     238            1 :         info.Err = err
     239            1 :         d.opts.EventListener.BlobFileRewriteEnd(info)
     240            1 :         return nil
     241              : }
     242              : 
     243            1 : func (c *blobFileRewriteCompaction) Info() compactionInfo {
     244            1 :         return compactionInfo{
     245            1 :                 kind:               compactionKindBlobFileRewrite,
     246            1 :                 versionEditApplied: c.versionEditApplied,
     247            1 :                 outputLevel:        -1,
     248            1 :         }
     249            1 : }
     250              : 
     251            0 : func (c *blobFileRewriteCompaction) RecordError(*problemspans.ByLevel, error) {
     252            0 :         // TODO(jackson): Track problematic blob files and avoid re-picking the same
     253            0 :         // blob file compaction.
     254            0 : }
     255              : 
     256              : // runBlobFileRewriteLocked runs a blob file rewrite. d.mu must be held when
     257              : // calling this, although it may be dropped and re-acquired during the course of
     258              : // the method.
     259              : func (d *DB) runBlobFileRewriteLocked(
     260              :         ctx context.Context, jobID JobID, c *blobFileRewriteCompaction,
     261            1 : ) (objstorage.ObjectMetadata, *manifest.VersionEdit, error) {
     262            1 :         // Drop the database mutex while we perform the rewrite, and re-acquire it
     263            1 :         // before returning.
     264            1 :         d.mu.Unlock()
     265            1 :         defer d.mu.Lock()
     266            1 : 
     267            1 :         // Construct the block.ReadEnv configured with a buffer pool. Setting the
     268            1 :         // buffer pool ensures we won't cache blocks in the block cache. As soon as
     269            1 :         // the compaction finishes new iterators will read the new blob file, so it
     270            1 :         // would be unlikely the cached blocks would be reused.
     271            1 :         var bufferPool block.BufferPool
     272            1 :         bufferPool.Init(4)
     273            1 :         defer bufferPool.Release()
     274            1 :         env := block.ReadEnv{
     275            1 :                 Stats:              &c.internalIteratorStats,
     276            1 :                 BufferPool:         &bufferPool,
     277            1 :                 ReportCorruptionFn: d.reportCorruption,
     278            1 :         }
     279            1 : 
     280            1 :         // Create a new file for the rewritten blob file.
     281            1 :         writable, objMeta, err := d.newCompactionOutputBlob(jobID, compactionKindBlobFileRewrite, -1, &c.bytesWritten, c.objCreateOpts)
     282            1 :         if err != nil {
     283            0 :                 return objstorage.ObjectMetadata{}, nil, err
     284            0 :         }
     285              :         // Initialize a blob file rewriter. We pass L6 to MakeBlobWriterOptions.
     286              :         // There's no single associated level with a blob file. A long-lived blob
     287              :         // file that gets rewritten is likely to mostly be referenced from L6.
     288              :         // TODO(jackson): Consider refactoring to remove the level association.
     289            1 :         rewriter := newBlobFileRewriter(
     290            1 :                 d.fileCache,
     291            1 :                 env,
     292            1 :                 objMeta.DiskFileNum,
     293            1 :                 writable,
     294            1 :                 d.opts.MakeBlobWriterOptions(6),
     295            1 :                 c.referencingTables,
     296            1 :                 c.input,
     297            1 :         )
     298            1 :         // Perform the rewrite.
     299            1 :         stats, err := rewriter.Rewrite(ctx)
     300            1 :         if err != nil {
     301            0 :                 return objstorage.ObjectMetadata{}, nil, err
     302            0 :         }
     303              : 
     304              :         // Sync the object provider to ensure the metadata for the blob file is
     305              :         // persisted.
     306            1 :         if err := d.objProvider.Sync(); err != nil {
     307            0 :                 return objstorage.ObjectMetadata{}, nil, err
     308            0 :         }
     309              : 
     310            1 :         ve := &manifest.VersionEdit{
     311            1 :                 DeletedBlobFiles: map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile{
     312            1 :                         {
     313            1 :                                 FileID:  c.input.FileID,
     314            1 :                                 FileNum: c.input.Physical.FileNum,
     315            1 :                         }: c.input.Physical,
     316            1 :                 },
     317            1 :                 NewBlobFiles: []manifest.BlobFileMetadata{
     318            1 :                         {
     319            1 :                                 FileID: c.input.FileID,
     320            1 :                                 Physical: &manifest.PhysicalBlobFile{
     321            1 :                                         FileNum:      objMeta.DiskFileNum,
     322            1 :                                         Size:         stats.FileLen,
     323            1 :                                         ValueSize:    stats.UncompressedValueBytes,
     324            1 :                                         CreationTime: uint64(d.timeNow().Unix()),
     325            1 :                                 },
     326            1 :                         },
     327            1 :                 },
     328            1 :         }
     329            1 :         return objMeta, ve, nil
     330              : }
     331              : 
     332              : // blockHeap is a min-heap of blob reference liveness encodings, ordered by
     333              : // blockID. We use this to help us determine the overall liveness of values in
     334              : // each blob block by combining the blob reference liveness encodings of all
     335              : // referencing sstables for a particular blockID.
     336              : type blockHeap []*sstable.BlobRefLivenessEncoding
     337              : 
     338              : // Len implements sort.Interface.
     339            1 : func (h blockHeap) Len() int { return len(h) }
     340              : 
     341              : // Less implements sort.Interface.
     342            1 : func (h blockHeap) Less(i, j int) bool { return h[i].BlockID < h[j].BlockID }
     343              : 
     344              : // Swap implements sort.Interface.
     345            1 : func (h blockHeap) Swap(i, j int) {
     346            1 :         h[i], h[j] = h[j], h[i]
     347            1 : }
     348              : 
     349              : // Push implements heap.Interface.
     350            1 : func (h *blockHeap) Push(x any) {
     351            1 :         blobEnc := x.(*sstable.BlobRefLivenessEncoding)
     352            1 :         *h = append(*h, blobEnc)
     353            1 : }
     354              : 
     355              : // Pop implements heap.Interface.
     356            1 : func (h *blockHeap) Pop() any {
     357            1 :         old := *h
     358            1 :         n := len(old)
     359            1 :         item := old[n-1]
     360            1 :         old[n-1] = nil
     361            1 :         *h = old[0 : n-1]
     362            1 :         return item
     363            1 : }
     364              : 
     365              : // blockValues holds the accumulated liveness data for blockID.
     366              : type blockValues struct {
     367              :         blockID      blob.BlockID
     368              :         valuesSize   int
     369              :         liveValueIDs []int
     370              : }
     371              : 
     372              : // blobFileRewriter is responsible for rewriting blob files by combining and
     373              : // processing blob reference liveness encodings from multiple SSTables. It
     374              : // maintains state for writing to an output blob file.
     375              : type blobFileRewriter struct {
     376              :         fc        *fileCacheHandle
     377              :         readEnv   block.ReadEnv
     378              :         sstables  []*manifest.TableMetadata
     379              :         inputBlob manifest.BlobFileMetadata
     380              :         rw        *blob.FileRewriter
     381              :         blkHeap   blockHeap
     382              : }
     383              : 
     384              : func newBlobFileRewriter(
     385              :         fc *fileCacheHandle,
     386              :         readEnv block.ReadEnv,
     387              :         outputFileNum base.DiskFileNum,
     388              :         w objstorage.Writable,
     389              :         opts blob.FileWriterOptions,
     390              :         sstables []*manifest.TableMetadata,
     391              :         inputBlob manifest.BlobFileMetadata,
     392            1 : ) *blobFileRewriter {
     393            1 :         rw := blob.NewFileRewriter(inputBlob.FileID, inputBlob.Physical.FileNum, fc, readEnv, outputFileNum, w, opts)
     394            1 :         return &blobFileRewriter{
     395            1 :                 fc:        fc,
     396            1 :                 readEnv:   readEnv,
     397            1 :                 rw:        rw,
     398            1 :                 sstables:  sstables,
     399            1 :                 inputBlob: inputBlob,
     400            1 :                 blkHeap:   blockHeap{},
     401            1 :         }
     402            1 : }
     403              : 
     404              : // generateHeap populates rw.blkHeap with the blob reference liveness encodings
     405              : // for each referencing sstable, rw.sstables.
     406            1 : func (rw *blobFileRewriter) generateHeap(ctx context.Context) error {
     407            1 :         heap.Init(&rw.blkHeap)
     408            1 : 
     409            1 :         var decoder colblk.ReferenceLivenessBlockDecoder
     410            1 :         // For each sstable that references the input blob file, push its
     411            1 :         // sstable.BlobLivenessEncoding on to the heap.
     412            1 :         for _, sst := range rw.sstables {
     413            1 :                 // Validate that the sstable contains a reference to the input blob
     414            1 :                 // file.
     415            1 :                 refID, ok := sst.BlobReferences.IDByBlobFileID(rw.inputBlob.FileID)
     416            1 :                 if !ok {
     417            0 :                         return errors.AssertionFailedf("table %s doesn't contain a reference to blob file %s",
     418            0 :                                 sst.TableNum, rw.inputBlob.FileID)
     419            0 :                 }
     420            1 :                 err := rw.fc.withReader(ctx, rw.readEnv, sst, func(r *sstable.Reader, readEnv sstable.ReadEnv) error {
     421            1 :                         h, err := r.ReadBlobRefIndexBlock(ctx, readEnv.Block)
     422            1 :                         if err != nil {
     423            0 :                                 return err
     424            0 :                         }
     425            1 :                         defer h.Release()
     426            1 :                         decoder.Init(h.BlockData())
     427            1 :                         bitmapEncodings := slices.Clone(decoder.LivenessAtReference(int(refID)))
     428            1 :                         // TODO(annie): We should instead maintain 1 heap item per sstable
     429            1 :                         // instead of 1 heap item per sstable block ref to reduce the heap
     430            1 :                         // comparisons to O(sstables).
     431            1 :                         for _, enc := range sstable.DecodeBlobRefLivenessEncoding(bitmapEncodings) {
     432            1 :                                 heap.Push(&rw.blkHeap, &enc)
     433            1 :                         }
     434            1 :                         return nil
     435              :                 })
     436            1 :                 if err != nil {
     437            0 :                         return err
     438            0 :                 }
     439              :         }
     440            1 :         return nil
     441              : }
     442              : 
     443            1 : func (rw *blobFileRewriter) Rewrite(ctx context.Context) (blob.FileWriterStats, error) {
     444            1 :         if err := rw.generateHeap(ctx); err != nil {
     445            0 :                 return blob.FileWriterStats{}, err
     446            0 :         }
     447            1 :         if rw.blkHeap.Len() == 0 {
     448            0 :                 return blob.FileWriterStats{}, errors.AssertionFailedf("heap empty")
     449            0 :         }
     450              : 
     451              :         // Begin constructing our output blob file. We maintain a map of blockID
     452              :         // to accumulated liveness data across all referencing sstables.
     453            1 :         firstBlock := heap.Pop(&rw.blkHeap).(*sstable.BlobRefLivenessEncoding)
     454            1 :         pending := blockValues{
     455            1 :                 blockID:      firstBlock.BlockID,
     456            1 :                 valuesSize:   firstBlock.ValuesSize,
     457            1 :                 liveValueIDs: slices.Collect(sstable.IterSetBitsInRunLengthBitmap(firstBlock.Bitmap)),
     458            1 :         }
     459            1 :         for rw.blkHeap.Len() > 0 {
     460            1 :                 nextBlock := heap.Pop(&rw.blkHeap).(*sstable.BlobRefLivenessEncoding)
     461            1 : 
     462            1 :                 // If we are encountering a new block, write the last accumulated block
     463            1 :                 // to the blob file.
     464            1 :                 if pending.blockID != nextBlock.BlockID {
     465            1 :                         // Write the last accumulated block's values to the blob file.
     466            1 :                         err := rw.rw.CopyBlock(ctx, pending.blockID, pending.valuesSize, pending.liveValueIDs)
     467            1 :                         if err != nil {
     468            0 :                                 return blob.FileWriterStats{}, err
     469            0 :                         }
     470            1 :                         pending = blockValues{blockID: nextBlock.BlockID, liveValueIDs: pending.liveValueIDs[:0]}
     471              :                 }
     472              :                 // Update the accumulated encoding for this block.
     473            1 :                 pending.valuesSize += nextBlock.ValuesSize
     474            1 :                 pending.liveValueIDs = slices.AppendSeq(pending.liveValueIDs,
     475            1 :                         sstable.IterSetBitsInRunLengthBitmap(nextBlock.Bitmap))
     476              :         }
     477              : 
     478              :         // Copy the last accumulated block.
     479            1 :         err := rw.rw.CopyBlock(ctx, pending.blockID, pending.valuesSize, pending.liveValueIDs)
     480            1 :         if err != nil {
     481            0 :                 return blob.FileWriterStats{}, err
     482            0 :         }
     483            1 :         return rw.rw.Close()
     484              : }
        

Generated by: LCOV version 2.0-1