LCOV - code coverage report
Current view: top level - pebble/sstable - copier.go (source / functions) Hit Total Coverage
Test: 2025-01-09 08:16Z 97ad2bcc - tests only.lcov Lines: 109 160 68.1 %
Date: 2025-01-09 08:17:03 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "context"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      13             :         "github.com/cockroachdb/pebble/objstorage"
      14             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      15             :         "github.com/cockroachdb/pebble/sstable/block"
      16             : )
      17             : 
      18             : // CopySpan produces a copy of an approximate subset of an input sstable.
      19             : //
      20             : // The produced sstable contains all keys from the input sstable in the span
      21             : // [start, end), as well as potentially some additional keys from the original
      22             : // file that were adjacent to but outside that span.
      23             : //
      24             : // CopySpan differs from simply seeking a reader to start and iterating until
      25             : // the end passing the results to a writer in that it does not write the new
      26             : // sstable from scratch, key-by-key, recompressing each key into new blocks and
      27             : // computing new filters and properties. Instead, it finds data _blocks_ that
      28             : // intersect the requested span and copies those, whole, to the new file,
      29             : // avoiding all decompression and recompression work. It then copies the
      30             : // original bloom filter - this filter is valid for the subset of data as well,
      31             : // just with potentially a higher false positive rate compared to one that would
      32             : // be computed just from the keys in it.
      33             : //
      34             : // The resulting sstable will have no block properties.
      35             : //
      36             : // The function might return ErrEmptySpan if there are no blocks that could
      37             : // include keys in the given range. See ErrEmptySpan for more details.
      38             : //
      39             : // Closes input and finishes or aborts output in all cases, including on errors.
      40             : //
      41             : // Note that CopySpan is not aware of any suffix or prefix replacement; the
      42             : // caller must account for those when specifying the bounds.
      43             : func CopySpan(
      44             :         ctx context.Context,
      45             :         input objstorage.Readable,
      46             :         r *Reader,
      47             :         rOpts ReaderOptions,
      48             :         output objstorage.Writable,
      49             :         o WriterOptions,
      50             :         start, end InternalKey,
      51           1 : ) (size uint64, _ error) {
      52           1 :         defer input.Close()
      53           1 : 
      54           1 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
      55           0 :                 return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
      56           0 :         }
      57             : 
      58             :         // If our input has no filters, our output cannot have filters either.
      59           1 :         if r.tableFilter == nil {
      60           1 :                 o.FilterPolicy = nil
      61           1 :         }
      62           1 :         o.TableFormat = r.tableFormat
      63           1 :         // We don't want the writer to attempt to write out block property data in
      64           1 :         // index blocks. This data won't be valid since we're not passing the actual
      65           1 :         // key data through the writer. We also remove the table-level properties
      66           1 :         // below.
      67           1 :         //
      68           1 :         // TODO(dt,radu): Figure out how to populate the prop collector state with
      69           1 :         // block props from the original sst.
      70           1 :         o.BlockPropertyCollectors = nil
      71           1 :         o.disableObsoleteCollector = true
      72           1 :         w := NewRawWriter(output, o)
      73           1 : 
      74           1 :         defer func() {
      75           1 :                 if w != nil {
      76           1 :                         w.Close()
      77           1 :                 }
      78             :         }()
      79             : 
      80           1 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
      81           0 :                 // We just checked for these conditions above.
      82           0 :                 return 0, base.AssertionFailedf("cannot CopySpan sstables with value blocks or range keys")
      83           0 :         }
      84             : 
      85           1 :         var preallocRH objstorageprovider.PreallocatedReadHandle
      86           1 :         // ReadBeforeForIndexAndFilter attempts to read the top-level index, filter
      87           1 :         // and lower-level index blocks with one read.
      88           1 :         rh := objstorageprovider.UsePreallocatedReadHandle(
      89           1 :                 r.readable, objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
      90           1 :         defer rh.Close()
      91           1 :         rh.SetupForCompaction()
      92           1 :         indexH, err := r.readTopLevelIndexBlock(ctx, noEnv, rh)
      93           1 :         if err != nil {
      94           0 :                 return 0, err
      95           0 :         }
      96           1 :         defer indexH.Release()
      97           1 : 
      98           1 :         // Set the filter block to be copied over if it exists. It will return false
      99           1 :         // positives for keys in blocks of the original file that we don't copy, but
     100           1 :         // filters can always have false positives, so this is fine.
     101           1 :         if r.tableFilter != nil {
     102           0 :                 filterBlock, err := r.readFilterBlock(ctx, noEnv, rh, r.filterBH)
     103           0 :                 if err != nil {
     104           0 :                         return 0, errors.Wrap(err, "reading filter")
     105           0 :                 }
     106           0 :                 filterBytes := append([]byte{}, filterBlock.BlockData()...)
     107           0 :                 filterBlock.Release()
     108           0 :                 w.copyFilter(filterBytes, r.Properties.FilterPolicyName)
     109             :         }
     110             : 
     111             :         // Copy all the props from the source file; we can't compute our own for many
     112             :         // that depend on seeing every key, such as total count or size so we copy the
     113             :         // original props instead. This will result in over-counts but that is safer
     114             :         // than under-counts.
     115           1 :         w.copyProperties(r.Properties)
     116           1 : 
     117           1 :         // Find the blocks that intersect our span.
     118           1 :         blocks, err := intersectingIndexEntries(ctx, r, rh, indexH, start, end)
     119           1 :         if err != nil {
     120           0 :                 return 0, err
     121           0 :         }
     122             : 
     123             :         // In theory an empty SST is fine, but #3409 means they are not. We could make
     124             :         // a non-empty sst by copying something outside the span, but #3907 means that
     125             :         // the empty virtual span would still be a problem, so don't bother.
     126           1 :         if len(blocks) < 1 {
     127           1 :                 return 0, ErrEmptySpan
     128           1 :         }
     129             : 
     130             :         // Copy all blocks byte-for-byte without doing any per-key processing.
     131           1 :         var blocksNotInCache []indexEntry
     132           1 : 
     133           1 :         for i := range blocks {
     134           1 :                 h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, blocks[i].bh.Offset)
     135           1 :                 if !h.Valid() {
     136           1 :                         // Cache miss. Add this block to the list of blocks that are not in cache.
     137           1 :                         blocksNotInCache = blocks[i-len(blocksNotInCache) : i+1]
     138           1 :                         continue
     139             :                 }
     140             : 
     141             :                 // Cache hit.
     142           1 :                 rh.RecordCacheHit(ctx, int64(blocks[i].bh.Offset), int64(blocks[i].bh.Length+block.TrailerLen))
     143           1 :                 if len(blocksNotInCache) > 0 {
     144           1 :                         // We have some blocks that were not in cache preceding this block.
     145           1 :                         // Copy them using objstorage.Copy.
     146           1 :                         if err := w.copyDataBlocks(ctx, blocksNotInCache, rh); err != nil {
     147           0 :                                 h.Release()
     148           0 :                                 return 0, err
     149           0 :                         }
     150           1 :                         blocksNotInCache = nil
     151             :                 }
     152             : 
     153           1 :                 err := w.addDataBlock(block.CacheBufferHandle(h).BlockData(), blocks[i].sep, blocks[i].bh)
     154           1 :                 h.Release()
     155           1 :                 if err != nil {
     156           0 :                         return 0, err
     157           0 :                 }
     158             :         }
     159             : 
     160           1 :         if len(blocksNotInCache) > 0 {
     161           0 :                 // We have some remaining blocks that were not in cache. Copy them
     162           0 :                 // using objstorage.Copy.
     163           0 :                 if err := w.copyDataBlocks(ctx, blocksNotInCache, rh); err != nil {
     164           0 :                         return 0, err
     165           0 :                 }
     166           0 :                 blocksNotInCache = nil
     167             :         }
     168             : 
     169             :         // TODO(dt): Copy range keys (the fact there are none is checked above).
     170             :         // TODO(dt): Copy valblocks keys (the fact there are none is checked above).
     171             : 
     172           1 :         if err := w.Close(); err != nil {
     173           0 :                 w = nil
     174           0 :                 return 0, err
     175           0 :         }
     176           1 :         meta, err := w.Metadata()
     177           1 :         if err != nil {
     178           0 :                 return 0, err
     179           0 :         }
     180           1 :         wrote := meta.Size
     181           1 :         w = nil
     182           1 :         return wrote, nil
     183             : }
     184             : 
     185             : // ErrEmptySpan is returned by CopySpan if the input sstable has no keys in the
     186             : // requested span.
     187             : //
     188             : // Note that CopySpan's determination of block overlap is best effort - we may
     189             : // copy a block that doesn't actually contain any keys in the span, in which
     190             : // case we won't generate this error. We currently only generate this error when
     191             : // the span start is beyond all keys in the physical sstable.
     192             : var ErrEmptySpan = errors.New("cannot copy empty span")
     193             : 
     194             : // indexEntry captures the two components of an sst index entry: the key and the
     195             : // decoded block handle value.
     196             : type indexEntry struct {
     197             :         sep []byte
     198             :         bh  block.HandleWithProperties
     199             : }
     200             : 
     201             : // intersectingIndexEntries returns the index entries whose blocks may
     202             : // intersect [start, end): the entries found by seeking to start, up to and
     203             : // including the first entry whose separator is at or beyond end.
     204             : func intersectingIndexEntries(
     205             :         ctx context.Context,
     206             :         r *Reader,
     207             :         rh objstorage.ReadHandle,
     208             :         indexH block.BufferHandle,
     209             :         start, end InternalKey,
     210           1 : ) ([]indexEntry, error) {
     211           1 :         top := r.tableFormat.newIndexIter()
     212           1 :         err := top.Init(r.Comparer, indexH.BlockData(), NoTransforms)
     213           1 :         if err != nil {
     214           0 :                 return nil, err
     215           0 :         }
     216           1 :         defer top.Close()
     217           1 : 
     218           1 :         var alloc bytealloc.A
     219           1 :         res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
     220           1 :         for valid := top.SeekGE(start.UserKey); valid; valid = top.Next() {
     221           1 :                 bh, err := top.BlockHandleWithProperties()
     222           1 :                 if err != nil {
     223           0 :                         return nil, err
     224           0 :                 }
     225           1 :                 if r.Properties.IndexType != twoLevelIndex {
     226           1 :                         entry := indexEntry{bh: bh, sep: top.Separator()}
     227           1 :                         alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
     228           1 :                         alloc, entry.sep = alloc.Copy(entry.sep)
     229           1 :                         res = append(res, entry)
     230           1 :                 } else {
     231           1 :                         subBlk, err := r.readIndexBlock(ctx, noEnv, rh, bh.Handle)
     232           1 :                         if err != nil {
     233           0 :                                 return nil, err
     234           0 :                         }
     235           1 :                         defer subBlk.Release() // in-loop, but it is a short loop.
     236           1 : 
     237           1 :                         sub := r.tableFormat.newIndexIter()
     238           1 :                         err = sub.Init(r.Comparer, subBlk.BlockData(), NoTransforms)
     239           1 :                         if err != nil {
     240           0 :                                 return nil, err
     241           0 :                         }
     242           1 :                         defer sub.Close() // in-loop, but it is a short loop.
     243           1 : 
     244           1 :                         for valid := sub.SeekGE(start.UserKey); valid; valid = sub.Next() {
     245           1 :                                 bh, err := sub.BlockHandleWithProperties()
     246           1 :                                 if err != nil {
     247           0 :                                         return nil, err
     248           0 :                                 }
     249           1 :                                 entry := indexEntry{bh: bh, sep: sub.Separator()}
     250           1 :                                 alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
     251           1 :                                 alloc, entry.sep = alloc.Copy(entry.sep)
     252           1 :                                 res = append(res, entry)
     253           1 :                                 if r.Compare(end.UserKey, entry.sep) <= 0 {
     254           1 :                                         break
     255             :                                 }
     256             :                         }
     257             :                 }
     258           1 :                 if top.SeparatorGT(end.UserKey, true /* inclusively */) {
     259           1 :                         break
     260             :                 }
     261             :         }
     262           1 :         return res, nil
     263             : }
     264             : 
     265             : // copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
     266             : // exists to ensure it is visible in profiles/stack traces if we are looking at
     267             : // a cluster copying more than expected.
     268             : //
     269             : // Finishes or Aborts output; does *not* Close input.
     270             : func copyWholeFileBecauseOfUnsupportedFeature(
     271             :         ctx context.Context, input objstorage.Readable, output objstorage.Writable,
     272           0 : ) (size uint64, _ error) {
     273           0 :         length := uint64(input.Size())
     274           0 :         rh := input.NewReadHandle(objstorage.NoReadBefore)
     275           0 :         rh.SetupForCompaction()
     276           0 :         if err := objstorage.Copy(ctx, rh, output, 0, length); err != nil {
     277           0 :                 output.Abort()
     278           0 :                 return 0, err
     279           0 :         }
     280           0 :         return length, output.Finish()
     281             : }

Generated by: LCOV version 1.14