Line data Source code
1 : // Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "container/heap"
9 : "context"
10 : "fmt"
11 : "iter"
12 : "runtime/pprof"
13 : "slices"
14 : "sync/atomic"
15 : "time"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/manifest"
20 : "github.com/cockroachdb/pebble/internal/problemspans"
21 : "github.com/cockroachdb/pebble/objstorage"
22 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
23 : "github.com/cockroachdb/pebble/sstable"
24 : "github.com/cockroachdb/pebble/sstable/blob"
25 : "github.com/cockroachdb/pebble/sstable/block"
26 : "github.com/cockroachdb/pebble/sstable/colblk"
27 : )
28 :
29 : // A pickedBlobFileCompaction is a blob file rewrite compaction that has been
30 : // picked by the compaction picker.
31 : type pickedBlobFileCompaction struct {
32 : // highPriority is set to true if the compaction was picked because the
33 : // ValueSeparationPolcy.GarbageThresholdHighPriority heuristic was
34 : // triggered. In this case, the resulting compaction will be permitted to
35 : // use a burst compaction concurrency slot to avoid starving default
36 : // compactions.
37 : highPriority bool
38 : vers *manifest.Version
39 : file manifest.BlobFileMetadata
40 : referencingTables []*manifest.TableMetadata
41 : }
42 :
43 : // Assert that *pickedBlobFileCompaction implements the pickedCompaction
44 : // interface.
45 : var _ pickedCompaction = (*pickedBlobFileCompaction)(nil)
46 :
47 2 : func (c *pickedBlobFileCompaction) ManualID() uint64 { return 0 }
48 :
49 2 : func (c *pickedBlobFileCompaction) WaitingCompaction() WaitingCompaction {
50 2 : entry := scheduledCompactionMap[compactionKindBlobFileRewrite]
51 2 : return WaitingCompaction{
52 2 : Optional: entry.optional,
53 2 : Priority: entry.priority,
54 2 : }
55 2 : }
56 :
57 : func (c *pickedBlobFileCompaction) ConstructCompaction(
58 : d *DB, grantHandle CompactionGrantHandle,
59 2 : ) compaction {
60 2 : // Add a reference to the version. The compaction will release the reference
61 2 : // when it completes.
62 2 : c.vers.Ref()
63 2 : return &blobFileRewriteCompaction{
64 2 : beganAt: d.timeNow(),
65 2 : grantHandle: grantHandle,
66 2 : version: c.vers,
67 2 : input: c.file,
68 2 : referencingTables: c.referencingTables,
69 2 : objCreateOpts: objstorage.CreateOptions{
70 2 : // TODO(jackson): Enable shared storage for blob files.
71 2 : PreferSharedStorage: false,
72 2 : WriteCategory: getDiskWriteCategoryForCompaction(d.opts, compactionKindBlobFileRewrite),
73 2 : },
74 2 : highPriority: c.highPriority,
75 2 : }
76 2 : }
77 :
78 : // A blobFileRewriteCompaction is a special variant of a compaction that
79 : // rewrites a blob file without rewriting sstables. When the compaction
80 : // completes, the Version's mapping of blob file ID to disk file number is
81 : // updated to point to the new blob file. The blob file is rewritten without
82 : // copying over values that are no longer referenced by any tables, reclaiming
83 : // disk space.
84 : type blobFileRewriteCompaction struct {
85 : // cancel is a bool that can be used by other goroutines to signal a compaction
86 : // to cancel, such as if a conflicting excise operation raced it to manifest
87 : // application. Only holders of the manifest lock will write to this atomic.
88 : cancel atomic.Bool
89 : // beganAt is the time when the compaction began.
90 : beganAt time.Time
91 : // grantHandle is a handle to the compaction that can be used to track
92 : // progress.
93 : grantHandle CompactionGrantHandle
94 : // version is a referenced version obtained when the compaction was picked.
95 : // This version must be unreferenced when the compaction is complete.
96 : version *manifest.Version
97 : // versionEditApplied is set to true when a compaction has completed and the
98 : // resulting version has been installed (if successful), but the compaction
99 : // goroutine is still cleaning up (eg, deleting obsolete files).
100 : versionEditApplied bool
101 : // input is the blob file that is being rewritten.
102 : input manifest.BlobFileMetadata
103 : // referencingTables is the set of sstables that reference the input blob
104 : // file in version.
105 : referencingTables []*manifest.TableMetadata
106 : objCreateOpts objstorage.CreateOptions
107 : internalIteratorStats base.InternalIteratorStats
108 : bytesWritten atomic.Int64 // Total bytes written to the new blob file.
109 : // highPriority is set to true if the compaction was picked because the
110 : // ValueSeparationPolcy.GarbageThresholdHighPriority heuristic was
111 : // triggered. In this case, the resulting compaction will be permitted to
112 : // use a burst compaction concurrency slot to avoid starving default
113 : // compactions. This field is set when the compaction is created.
114 : highPriority bool
115 : }
116 :
117 1 : func (c *blobFileRewriteCompaction) String() string {
118 1 : s := fmt.Sprintf("blob file (ID: %s) %s (%s) being rewritten",
119 1 : c.input.FileID, c.input.Physical.FileNum, humanizeBytes(uint64(c.input.Physical.Size)))
120 1 : if c.highPriority {
121 1 : s += " (high priority)"
122 1 : }
123 1 : return s
124 : }
125 :
126 : // Assert that *blobFileRewriteCompaction implements the compaction interface.
127 : var _ compaction = (*blobFileRewriteCompaction)(nil)
128 :
129 2 : func (c *blobFileRewriteCompaction) AddInProgressLocked(d *DB) {
130 2 : d.mu.compact.inProgress[c] = struct{}{}
131 2 : if c.highPriority {
132 2 : d.mu.compact.burstConcurrency.Add(1)
133 2 : }
134 : // TODO(jackson): Currently the compaction picker iterates through all
135 : // ongoing compactions in order to limit the number of concurrent blob
136 : // rewrite compactions to 1.
137 : //
138 : // Consider instead tracking which blob files are being rewritten, and we
139 : // can allow multiple concurrent blob rewrite compactions as long as they
140 : // compact different blob files.
141 : }
142 :
143 2 : func (c *blobFileRewriteCompaction) BeganAt() time.Time { return c.beganAt }
144 2 : func (c *blobFileRewriteCompaction) Bounds() *base.UserKeyBounds { return nil }
145 0 : func (c *blobFileRewriteCompaction) Cancel() { c.cancel.Store(true) }
146 2 : func (c *blobFileRewriteCompaction) IsDownload() bool { return false }
147 2 : func (c *blobFileRewriteCompaction) IsFlush() bool { return false }
148 2 : func (c *blobFileRewriteCompaction) GrantHandle() CompactionGrantHandle { return c.grantHandle }
149 1 : func (c *blobFileRewriteCompaction) Tables() iter.Seq2[int, *manifest.TableMetadata] {
150 1 : // No tables; return an empty iterator.
151 1 : return func(yield func(int, *manifest.TableMetadata) bool) {}
152 : }
153 :
154 0 : func (c *blobFileRewriteCompaction) ObjioTracingContext(ctx context.Context) context.Context {
155 0 : if objiotracing.Enabled {
156 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
157 0 : }
158 0 : return ctx
159 : }
160 :
161 2 : func (c *blobFileRewriteCompaction) PprofLabels(UserKeyCategories) pprof.LabelSet {
162 2 : return pprof.Labels("pebble", "blob-rewrite")
163 2 : }
164 :
165 1 : func (c *blobFileRewriteCompaction) VersionEditApplied() bool {
166 1 : return c.versionEditApplied
167 1 : }
168 :
169 2 : func (c *blobFileRewriteCompaction) Execute(jobID JobID, d *DB) error {
170 2 : ctx := context.TODO()
171 2 : if objiotracing.Enabled {
172 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
173 0 : }
174 2 : c.grantHandle.Started()
175 2 : // The version stored in the compaction is ref'd when the compaction is
176 2 : // created. We're responsible for un-refing it when the compaction is
177 2 : // complete.
178 2 : defer c.version.UnrefLocked()
179 2 :
180 2 : // Notify the event listener that the compaction has begun.
181 2 : info := BlobFileRewriteInfo{
182 2 : JobID: int(jobID),
183 2 : Input: BlobFileInfo{
184 2 : BlobFileID: c.input.FileID,
185 2 : DiskFileNum: c.input.Physical.FileNum,
186 2 : Size: c.input.Physical.Size,
187 2 : ValueSize: c.input.Physical.ValueSize,
188 2 : },
189 2 : }
190 2 : d.opts.EventListener.BlobFileRewriteBegin(info)
191 2 : startTime := d.timeNow()
192 2 :
193 2 : // Run the blob file rewrite.
194 2 : objMeta, ve, err := d.runBlobFileRewriteLocked(ctx, jobID, c)
195 2 :
196 2 : info.Duration = d.timeNow().Sub(startTime)
197 2 :
198 2 : // Update the version with the remapped blob file.
199 2 : if err == nil {
200 2 : // Ensure the rewrite did reduce the aggregate value size. If it didn't,
201 2 : // we should have never selected this blob file for rewrite and there
202 2 : // must be a bug in the statistics we maintain.
203 2 : if ve.NewBlobFiles[0].Physical.ValueSize >= c.input.Physical.ValueSize {
204 0 : return errors.AssertionFailedf("pebble: blob file %s rewrite did not reduce value size", c.input.FileID)
205 0 : }
206 :
207 2 : info.Output.BlobFileID = ve.NewBlobFiles[0].FileID
208 2 : info.Output.DiskFileNum = ve.NewBlobFiles[0].Physical.FileNum
209 2 : info.Output.Size = ve.NewBlobFiles[0].Physical.Size
210 2 : info.Output.ValueSize = ve.NewBlobFiles[0].Physical.ValueSize
211 2 : _, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
212 2 : // It's possible that concurrent compactions removed references to
213 2 : // the blob file while the blob file rewrite compaction was running.
214 2 : // Now that we have the manifest lock, check if the blob file is
215 2 : // still current. If not, we bubble up ErrCancelledCompaction.
216 2 : v := d.mu.versions.currentVersion()
217 2 : currentDiskFile, ok := v.BlobFiles.LookupPhysical(c.input.FileID)
218 2 : if !ok {
219 1 : return versionUpdate{}, errors.Wrapf(ErrCancelledCompaction,
220 1 : "blob file %s became unreferenced", c.input.FileID)
221 1 : }
222 2 : currentDiskFileNum := currentDiskFile.FileNum
223 2 : // Assert that the current version's disk file number for the blob
224 2 : // matches the one we rewrote. This compaction should be the only
225 2 : // rewrite compaction running for this blob file.
226 2 : if currentDiskFileNum != c.input.Physical.FileNum {
227 0 : return versionUpdate{}, base.AssertionFailedf(
228 0 : "blob file %s was rewritten to %s during rewrite compaction of %s",
229 0 : c.input.FileID, currentDiskFileNum, c.input.Physical.FileNum)
230 0 : }
231 2 : return versionUpdate{
232 2 : VE: ve,
233 2 : JobID: jobID,
234 2 : InProgressCompactionsFn: func() []compactionInfo {
235 2 : return d.getInProgressCompactionInfoLocked(c)
236 2 : },
237 : }, nil
238 : })
239 : }
240 :
241 2 : d.mu.versions.incrementCompactions(compactionKindBlobFileRewrite, nil, c.bytesWritten.Load(), err)
242 2 : d.mu.versions.incrementCompactionBytes(-c.bytesWritten.Load())
243 2 :
244 2 : // Update the read state to publish the new version.
245 2 : if err == nil {
246 2 : d.updateReadStateLocked(d.opts.DebugCheck)
247 2 : }
248 :
249 : // Ensure we clean up the blob file we created on failure.
250 2 : if err != nil {
251 1 : if objMeta.DiskFileNum != 0 {
252 1 : d.mu.versions.obsoleteBlobs = mergeObsoleteFiles(d.mu.versions.obsoleteBlobs, []obsoleteFile{
253 1 : {
254 1 : fileType: base.FileTypeBlob,
255 1 : fs: d.opts.FS,
256 1 : path: d.objProvider.Path(objMeta),
257 1 : fileNum: objMeta.DiskFileNum,
258 1 : // We don't know the size of the output blob file--it may have
259 1 : // been half-written. We use the input blob file size as an
260 1 : // approximation for deletion pacing.
261 1 : fileSize: c.input.Physical.Size,
262 1 : isLocal: true,
263 1 : },
264 1 : })
265 1 : }
266 : }
267 :
268 : // Notify the event listener that the compaction has ended.
269 2 : now := d.timeNow()
270 2 : info.TotalDuration = now.Sub(c.beganAt)
271 2 : info.Done = true
272 2 : info.Err = err
273 2 : d.opts.EventListener.BlobFileRewriteEnd(info)
274 2 : return nil
275 : }
276 :
277 2 : func (c *blobFileRewriteCompaction) Info() compactionInfo {
278 2 : return compactionInfo{
279 2 : kind: compactionKindBlobFileRewrite,
280 2 : versionEditApplied: c.versionEditApplied,
281 2 : outputLevel: -1,
282 2 : }
283 2 : }
284 :
285 2 : func (c *blobFileRewriteCompaction) UsesBurstConcurrency() bool {
286 2 : return c.highPriority
287 2 : }
288 :
289 0 : func (c *blobFileRewriteCompaction) RecordError(*problemspans.ByLevel, error) {
290 0 : // TODO(jackson): Track problematic blob files and avoid re-picking the same
291 0 : // blob file compaction.
292 0 : }
293 :
294 : // runBlobFileRewriteLocked runs a blob file rewrite. d.mu must be held when
295 : // calling this, although it may be dropped and re-acquired during the course of
296 : // the method.
297 : func (d *DB) runBlobFileRewriteLocked(
298 : ctx context.Context, jobID JobID, c *blobFileRewriteCompaction,
299 2 : ) (objstorage.ObjectMetadata, *manifest.VersionEdit, error) {
300 2 : // Drop the database mutex while we perform the rewrite, and re-acquire it
301 2 : // before returning.
302 2 : d.mu.Unlock()
303 2 : defer d.mu.Lock()
304 2 :
305 2 : // Construct the block.ReadEnv configured with a buffer pool. Setting the
306 2 : // buffer pool ensures we won't cache blocks in the block cache. As soon as
307 2 : // the compaction finishes new iterators will read the new blob file, so it
308 2 : // would be unlikely the cached blocks would be reused.
309 2 : var bufferPool block.BufferPool
310 2 : bufferPool.Init(4, block.ForBlobFileRewrite)
311 2 : defer bufferPool.Release()
312 2 : env := block.ReadEnv{
313 2 : Stats: &c.internalIteratorStats,
314 2 : BufferPool: &bufferPool,
315 2 : ReportCorruptionFn: d.reportCorruption,
316 2 : }
317 2 :
318 2 : // Create a new file for the rewritten blob file.
319 2 : writable, objMeta, err := d.newCompactionOutputBlob(jobID, compactionKindBlobFileRewrite, -1, &c.bytesWritten, c.objCreateOpts)
320 2 : if err != nil {
321 0 : return objstorage.ObjectMetadata{}, nil, err
322 0 : }
323 : // Initialize a blob file rewriter. We pass L6 to MakeBlobWriterOptions.
324 : // There's no single associated level with a blob file. A long-lived blob
325 : // file that gets rewritten is likely to mostly be referenced from L6.
326 : // TODO(jackson): Consider refactoring to remove the level association.
327 2 : rewriter := newBlobFileRewriter(
328 2 : d.fileCache,
329 2 : env,
330 2 : objMeta.DiskFileNum,
331 2 : writable,
332 2 : d.opts.MakeBlobWriterOptions(6, d.BlobFileFormat()),
333 2 : c.referencingTables,
334 2 : c.input,
335 2 : )
336 2 : // Perform the rewrite.
337 2 : stats, err := rewriter.Rewrite(ctx)
338 2 : if err != nil {
339 0 : return objstorage.ObjectMetadata{}, nil, err
340 0 : }
341 :
342 : // Sync the object provider to ensure the metadata for the blob file is
343 : // persisted.
344 2 : if err := d.objProvider.Sync(); err != nil {
345 0 : return objstorage.ObjectMetadata{}, nil, err
346 0 : }
347 :
348 2 : physical := &manifest.PhysicalBlobFile{
349 2 : FileNum: objMeta.DiskFileNum,
350 2 : Size: stats.FileLen,
351 2 : ValueSize: stats.UncompressedValueBytes,
352 2 : CreationTime: uint64(d.timeNow().Unix()),
353 2 : }
354 2 : physical.PopulateProperties(&stats.Properties)
355 2 :
356 2 : ve := &manifest.VersionEdit{
357 2 : DeletedBlobFiles: map[manifest.DeletedBlobFileEntry]*manifest.PhysicalBlobFile{
358 2 : {
359 2 : FileID: c.input.FileID,
360 2 : FileNum: c.input.Physical.FileNum,
361 2 : }: c.input.Physical,
362 2 : },
363 2 : NewBlobFiles: []manifest.BlobFileMetadata{
364 2 : {
365 2 : FileID: c.input.FileID,
366 2 : Physical: physical,
367 2 : },
368 2 : },
369 2 : }
370 2 : return objMeta, ve, nil
371 : }
372 :
373 : // blockHeap is a min-heap of blob reference liveness encodings, ordered by
374 : // blockID. We use this to help us determine the overall liveness of values in
375 : // each blob block by combining the blob reference liveness encodings of all
376 : // referencing sstables for a particular blockID.
377 : type blockHeap []*sstable.BlobRefLivenessEncoding
378 :
379 : // Len implements sort.Interface.
380 2 : func (h blockHeap) Len() int { return len(h) }
381 :
382 : // Less implements sort.Interface.
383 2 : func (h blockHeap) Less(i, j int) bool { return h[i].BlockID < h[j].BlockID }
384 :
385 : // Swap implements sort.Interface.
386 2 : func (h blockHeap) Swap(i, j int) {
387 2 : h[i], h[j] = h[j], h[i]
388 2 : }
389 :
390 : // Push implements heap.Interface.
391 2 : func (h *blockHeap) Push(x any) {
392 2 : blobEnc := x.(*sstable.BlobRefLivenessEncoding)
393 2 : *h = append(*h, blobEnc)
394 2 : }
395 :
396 : // Pop implements heap.Interface.
397 2 : func (h *blockHeap) Pop() any {
398 2 : old := *h
399 2 : n := len(old)
400 2 : item := old[n-1]
401 2 : old[n-1] = nil
402 2 : *h = old[0 : n-1]
403 2 : return item
404 2 : }
405 :
406 : // blockValues holds the accumulated liveness data for blockID.
407 : type blockValues struct {
408 : blockID blob.BlockID
409 : valuesSize int
410 : liveValueIDs []int
411 : }
412 :
413 : // blobFileRewriter is responsible for rewriting blob files by combining and
414 : // processing blob reference liveness encodings from multiple SSTables. It
415 : // maintains state for writing to an output blob file.
416 : type blobFileRewriter struct {
417 : fc *fileCacheHandle
418 : readEnv block.ReadEnv
419 : sstables []*manifest.TableMetadata
420 : inputBlob manifest.BlobFileMetadata
421 : rw *blob.FileRewriter
422 : blkHeap blockHeap
423 : }
424 :
425 : func newBlobFileRewriter(
426 : fc *fileCacheHandle,
427 : readEnv block.ReadEnv,
428 : outputFileNum base.DiskFileNum,
429 : w objstorage.Writable,
430 : opts blob.FileWriterOptions,
431 : sstables []*manifest.TableMetadata,
432 : inputBlob manifest.BlobFileMetadata,
433 2 : ) *blobFileRewriter {
434 2 : rw := blob.NewFileRewriter(inputBlob.FileID, inputBlob.Physical, fc, readEnv, outputFileNum, w, opts)
435 2 : return &blobFileRewriter{
436 2 : fc: fc,
437 2 : readEnv: readEnv,
438 2 : rw: rw,
439 2 : sstables: sstables,
440 2 : inputBlob: inputBlob,
441 2 : blkHeap: blockHeap{},
442 2 : }
443 2 : }
444 :
445 : // generateHeap populates rw.blkHeap with the blob reference liveness encodings
446 : // for each referencing sstable, rw.sstables.
447 2 : func (rw *blobFileRewriter) generateHeap(ctx context.Context) error {
448 2 : heap.Init(&rw.blkHeap)
449 2 :
450 2 : var decoder colblk.ReferenceLivenessBlockDecoder
451 2 : // For each sstable that references the input blob file, push its
452 2 : // sstable.BlobLivenessEncoding on to the heap.
453 2 : for _, sst := range rw.sstables {
454 2 : // Validate that the sstable contains a reference to the input blob
455 2 : // file.
456 2 : refID, ok := sst.BlobReferences.IDByBlobFileID(rw.inputBlob.FileID)
457 2 : if !ok {
458 0 : return errors.AssertionFailedf("table %s doesn't contain a reference to blob file %s",
459 0 : sst.TableNum, rw.inputBlob.FileID)
460 0 : }
461 2 : err := rw.fc.withReader(ctx, rw.readEnv, sst, func(r *sstable.Reader, readEnv sstable.ReadEnv) error {
462 2 : h, err := r.ReadBlobRefIndexBlock(ctx, readEnv.Block)
463 2 : if err != nil {
464 0 : return err
465 0 : }
466 2 : defer h.Release()
467 2 : decoder.Init(h.BlockData())
468 2 : bitmapEncodings := slices.Clone(decoder.LivenessAtReference(int(refID)))
469 2 : // TODO(annie): We should instead maintain 1 heap item per sstable
470 2 : // instead of 1 heap item per sstable block ref to reduce the heap
471 2 : // comparisons to O(sstables).
472 2 : for _, enc := range sstable.DecodeBlobRefLivenessEncoding(bitmapEncodings) {
473 2 : heap.Push(&rw.blkHeap, &enc)
474 2 : }
475 2 : return nil
476 : })
477 2 : if err != nil {
478 0 : return err
479 0 : }
480 : }
481 2 : return nil
482 : }
483 :
484 2 : func (rw *blobFileRewriter) Rewrite(ctx context.Context) (blob.FileWriterStats, error) {
485 2 : if err := rw.generateHeap(ctx); err != nil {
486 0 : return blob.FileWriterStats{}, err
487 0 : }
488 2 : if rw.blkHeap.Len() == 0 {
489 0 : return blob.FileWriterStats{}, errors.AssertionFailedf("heap empty")
490 0 : }
491 :
492 : // Begin constructing our output blob file. We maintain a map of blockID
493 : // to accumulated liveness data across all referencing sstables.
494 2 : firstBlock := heap.Pop(&rw.blkHeap).(*sstable.BlobRefLivenessEncoding)
495 2 : pending := blockValues{
496 2 : blockID: firstBlock.BlockID,
497 2 : valuesSize: firstBlock.ValuesSize,
498 2 : liveValueIDs: slices.Collect(sstable.IterSetBitsInRunLengthBitmap(firstBlock.Bitmap)),
499 2 : }
500 2 : for rw.blkHeap.Len() > 0 {
501 2 : nextBlock := heap.Pop(&rw.blkHeap).(*sstable.BlobRefLivenessEncoding)
502 2 :
503 2 : // If we are encountering a new block, write the last accumulated block
504 2 : // to the blob file.
505 2 : if pending.blockID != nextBlock.BlockID {
506 2 : // Write the last accumulated block's values to the blob file.
507 2 : err := rw.rw.CopyBlock(ctx, pending.blockID, pending.valuesSize, pending.liveValueIDs)
508 2 : if err != nil {
509 0 : return blob.FileWriterStats{}, err
510 0 : }
511 2 : pending = blockValues{blockID: nextBlock.BlockID, liveValueIDs: pending.liveValueIDs[:0]}
512 : }
513 : // Update the accumulated encoding for this block.
514 2 : pending.valuesSize += nextBlock.ValuesSize
515 2 : pending.liveValueIDs = slices.AppendSeq(pending.liveValueIDs,
516 2 : sstable.IterSetBitsInRunLengthBitmap(nextBlock.Bitmap))
517 : }
518 :
519 : // Copy the last accumulated block.
520 2 : err := rw.rw.CopyBlock(ctx, pending.blockID, pending.valuesSize, pending.liveValueIDs)
521 2 : if err != nil {
522 0 : return blob.FileWriterStats{}, err
523 0 : }
524 2 : return rw.rw.Close()
525 : }
|