LCOV - code coverage report
Current view: top level - pebble/sstable - copier.go (source / functions)
Test:         2024-08-28 08:16Z 34e929e1 - tests only.lcov
Date:         2024-08-28 08:16:42

                 Hit    Total    Coverage
Lines:           134    198      67.7 %
Functions:       0      0        -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "context"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      13             :         "github.com/cockroachdb/pebble/objstorage"
      14             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      15             :         "github.com/cockroachdb/pebble/sstable/block"
      16             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      17             : )
      18             : 
       19             : // CopySpan produces a copy of an approximate subset of an input sstable.
      20             : //
      21             : // The produced sstable contains all keys from the input sstable in the span
      22             : // [start, end), as well as potentially some additional keys from the original
      23             : // file that were adjacent to but outside that span.
      24             : //
       25             : // CopySpan differs from simply seeking a reader to start, iterating until the
       26             : // end, and passing the results to a writer, in that it does not write the new
       27             : // sstable from scratch, key-by-key, recompressing each key into new blocks and
      28             : // computing new filters and properties. Instead, it finds data _blocks_ that
      29             : // intersect the requested span and copies those, whole, to the new file,
      30             : // avoiding all decompression and recompression work. It then copies the
      31             : // original bloom filter - this filter is valid for the subset of data as well,
      32             : // just with potentially a higher false positive rate compared to one that would
      33             : // be computed just from the keys in it.
      34             : //
      35             : // The resulting sstable will have no block properties.
      36             : //
      37             : // The function might return ErrEmptySpan if there are no blocks that could
      38             : // include keys in the given range. See ErrEmptySpan for more details.
      39             : //
      40             : // Closes input and finishes or aborts output in all cases, including on errors.
      41             : //
      42             : // Note that CopySpan is not aware of any suffix or prefix replacement; the
      43             : // caller must account for those when specifying the bounds.
      44             : func CopySpan(
      45             :         ctx context.Context,
      46             :         input objstorage.Readable,
      47             :         r *Reader,
      48             :         rOpts ReaderOptions,
      49             :         output objstorage.Writable,
      50             :         o WriterOptions,
      51             :         start, end InternalKey,
      52           1 : ) (size uint64, _ error) {
      53           1 :         defer input.Close()
      54           1 : 
      55           1 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
      56           0 :                 return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
      57           0 :         }
      58             : 
       59             :         // If our input has no filters, our output cannot have filters either.
      60           1 :         if r.tableFilter == nil {
      61           1 :                 o.FilterPolicy = nil
      62           1 :         }
      63           1 :         o.TableFormat = r.tableFormat
      64           1 :         w := NewRawWriter(output, o)
      65           1 : 
      66           1 :         // We don't want the writer to attempt to write out block property data in
      67           1 :         // index blocks. This data won't be valid since we're not passing the actual
      68           1 :         // key data through the writer. We also remove the table-level properties
      69           1 :         // below.
      70           1 :         //
      71           1 :         // TODO(dt,radu): Figure out how to populate the prop collector state with
      72           1 :         // block props from the original sst.
      73           1 :         w.blockPropCollectors = nil
      74           1 : 
      75           1 :         defer func() {
      76           1 :                 if w != nil {
       77           1 :                         // Set w.err to a non-nil error just so Close aborts the output instead of finishing it.
      78           1 :                         w.err = base.ErrNotFound
      79           1 :                         // w.Close now owns calling output.Abort().
      80           1 :                         w.Close()
      81           1 :                 }
      82             :         }()
      83             : 
      84           1 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
      85           0 :                 // We just checked for these conditions above.
      86           0 :                 return 0, base.AssertionFailedf("cannot CopySpan sstables with value blocks or range keys")
      87           0 :         }
      88             : 
      89           1 :         var preallocRH objstorageprovider.PreallocatedReadHandle
      90           1 :         // ReadBeforeForIndexAndFilter attempts to read the top-level index, filter
      91           1 :         // and lower-level index blocks with one read.
      92           1 :         rh := objstorageprovider.UsePreallocatedReadHandle(
      93           1 :                 r.readable, objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
      94           1 :         defer rh.Close()
      95           1 :         rh.SetupForCompaction()
      96           1 :         indexH, err := r.readIndex(ctx, rh, nil, nil)
      97           1 :         if err != nil {
      98           0 :                 return 0, err
      99           0 :         }
     100           1 :         defer indexH.Release()
     101           1 : 
     102           1 :         // Set the filter block to be copied over if it exists. It will return false
     103           1 :         // positives for keys in blocks of the original file that we don't copy, but
     104           1 :         // filters can always have false positives, so this is fine.
     105           1 :         if r.tableFilter != nil {
     106           0 :                 filterBlock, err := r.readFilter(ctx, rh, nil, nil)
     107           0 :                 if err != nil {
     108           0 :                         return 0, errors.Wrap(err, "reading filter")
     109           0 :                 }
     110           0 :                 filterBytes := append([]byte{}, filterBlock.Get()...)
     111           0 :                 filterBlock.Release()
     112           0 :                 w.filter = copyFilterWriter{
     113           0 :                         origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBytes,
     114           0 :                 }
     115             :         }
     116             : 
      117             :         // Copy all the props from the source file; many of them, such as the total
      118             :         // key count or size, depend on seeing every key, so we cannot compute them
      119             :         // ourselves and instead reuse the originals. This will result in over-counts,
      120             :         // but that is safer than under-counts.
     121           1 :         w.props = r.Properties
     122           1 :         // Remove all user properties to disable block properties, which we do not
     123           1 :         // calculate.
     124           1 :         w.props.UserProperties = nil
     125           1 :         // Reset props that we'll re-derive as we build our own index.
     126           1 :         w.props.IndexPartitions = 0
     127           1 :         w.props.TopLevelIndexSize = 0
     128           1 :         w.props.IndexSize = 0
     129           1 :         w.props.IndexType = 0
     130           1 :         if w.filter != nil {
     131           0 :                 if err := checkWriterFilterMatchesReader(r, w); err != nil {
     132           0 :                         return 0, err
     133           0 :                 }
     134             :         }
     135             : 
     136             :         // Find the blocks that intersect our span.
     137           1 :         blocks, err := intersectingIndexEntries(ctx, r, rh, indexH, start, end)
     138           1 :         if err != nil {
     139           0 :                 return 0, err
     140           0 :         }
     141             : 
      142             :         // In theory an empty SST is fine, but #3409 means it is not. We could make
     143             :         // a non-empty sst by copying something outside the span, but #3907 means that
     144             :         // the empty virtual span would still be a problem, so don't bother.
     145           1 :         if len(blocks) < 1 {
     146           1 :                 return 0, ErrEmptySpan
     147           1 :         }
     148             : 
     149             :         // Copy all blocks byte-for-byte without doing any per-key processing.
     150           1 :         var blocksNotInCache []indexEntry
     151           1 : 
     152           1 :         copyBlocksToFile := func(blocks []indexEntry) error {
     153           1 :                 blockOffset := blocks[0].bh.Offset
      154           1 :                 // Block lengths don't include their trailers, which sit between blocks; the
      155           1 :                 // trailers between copied blocks come along implicitly, but the final
      156           1 :                 // block's trailer must be added to the length explicitly.
     157           1 :                 length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + block.TrailerLen - blockOffset
     158           1 :                 if spanEnd := length + blockOffset; spanEnd < blockOffset {
     159           0 :                         return base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", blockOffset, spanEnd)
     160           0 :                 }
     161           1 :                 if err := objstorage.Copy(ctx, rh, w.layout.writable, blockOffset, length); err != nil {
     162           0 :                         return err
     163           0 :                 }
     164             :                 // Update w.meta.Size so subsequently flushed metadata has correct offsets.
     165           1 :                 w.meta.Size += length
     166           1 :                 for i := range blocks {
     167           1 :                         blocks[i].bh.Offset = w.layout.offset
     168           1 :                         // blocks[i].bh.Length remains unmodified.
     169           1 :                         if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
     170           0 :                                 return err
     171           0 :                         }
     172           1 :                         w.layout.offset += uint64(blocks[i].bh.Length) + block.TrailerLen
     173             :                 }
     174           1 :                 return nil
     175             :         }
     176           1 :         for i := range blocks {
     177           1 :                 h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, blocks[i].bh.Offset)
     178           1 :                 if h.Get() == nil {
     179           1 :                         // Cache miss. Add this block to the list of blocks that are not in cache.
     180           1 :                         blocksNotInCache = blocks[i-len(blocksNotInCache) : i+1]
     181           1 :                         continue
     182             :                 }
     183             : 
     184             :                 // Cache hit.
     185           1 :                 rh.RecordCacheHit(ctx, int64(blocks[i].bh.Offset), int64(blocks[i].bh.Length+block.TrailerLen))
     186           1 :                 if len(blocksNotInCache) > 0 {
     187           0 :                         // We have some blocks that were not in cache preceding this block.
     188           0 :                         // Copy them using objstorage.Copy.
     189           0 :                         if err := copyBlocksToFile(blocksNotInCache); err != nil {
     190           0 :                                 h.Release()
     191           0 :                                 return 0, err
     192           0 :                         }
     193           0 :                         blocksNotInCache = nil
     194             :                 }
     195             : 
     196             :                 // layout.WriteDataBlock keeps layout.offset up-to-date for us.
     197           1 :                 bh, err := w.layout.WriteDataBlock(h.Get(), &w.dataBlockBuf.blockBuf)
     198           1 :                 h.Release()
     199           1 :                 if err != nil {
     200           0 :                         return 0, err
     201           0 :                 }
     202           1 :                 blocks[i].bh.Handle = bh
     203           1 :                 if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
     204           0 :                         return 0, err
     205           0 :                 }
     206           1 :                 w.meta.Size += uint64(bh.Length) + block.TrailerLen
     207             :         }
     208             : 
     209           1 :         if len(blocksNotInCache) > 0 {
     210           1 :                 // We have some remaining blocks that were not in cache. Copy them
     211           1 :                 // using objstorage.Copy.
     212           1 :                 if err := copyBlocksToFile(blocksNotInCache); err != nil {
     213           0 :                         return 0, err
     214           0 :                 }
     215           1 :                 blocksNotInCache = nil
     216             :         }
     217             : 
     218             :         // TODO(dt): Copy range keys (the fact there are none is checked above).
      219             :         // TODO(dt): Copy value blocks (the fact there are none is checked above).
     220             : 
     221           1 :         if err := w.Close(); err != nil {
     222           0 :                 w = nil
     223           0 :                 return 0, err
     224           0 :         }
     225           1 :         wrote := w.meta.Size
     226           1 :         w = nil
     227           1 :         return wrote, nil
     228             : }
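
The trailer arithmetic inside copyBlocksToFile above is easy to get wrong, so here is a minimal, self-contained Go sketch of the same length computation. The blockHandle type, the 5-byte trailer size, and the example offsets are illustrative assumptions for this sketch, not Pebble's actual definitions.

package main

import "fmt"

// blockHandle is an illustrative stand-in for an index entry's decoded
// offset/length pair; it is not Pebble's block.Handle.
type blockHandle struct {
	Offset, Length uint64
}

// trailerLen is an assumed per-block trailer size for this sketch.
const trailerLen = 5

// runLength mirrors the arithmetic in copyBlocksToFile: block lengths exclude
// their trailers, so the trailers between adjacent copied blocks are picked up
// implicitly, but the final block's trailer must be added explicitly.
func runLength(run []blockHandle) (offset, length uint64) {
	offset = run[0].Offset
	last := run[len(run)-1]
	length = last.Offset + last.Length + trailerLen - offset
	return offset, length
}

func main() {
	run := []blockHandle{
		{Offset: 0, Length: 100},   // trailer occupies bytes 100..105
		{Offset: 105, Length: 200}, // trailer occupies bytes 305..310
	}
	off, n := runLength(run)
	fmt.Printf("copy %d bytes starting at offset %d\n", n, off) // 310 bytes at offset 0
}
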
     229             : 
     230             : // ErrEmptySpan is returned by CopySpan if the input sstable has no keys in the
     231             : // requested span.
     232             : //
     233             : // Note that CopySpan's determination of block overlap is best effort - we may
     234             : // copy a block that doesn't actually contain any keys in the span, in which
     235             : // case we won't generate this error. We currently only generate this error when
     236             : // the span start is beyond all keys in the physical sstable.
     237             : var ErrEmptySpan = errors.New("cannot copy empty span")
     238             : 
     239             : // indexEntry captures the two components of an sst index entry: the key and the
     240             : // decoded block handle value.
     241             : type indexEntry struct {
     242             :         sep InternalKey
     243             :         bh  block.HandleWithProperties
     244             : }
     245             : 
      246             : // intersectingIndexEntries returns the entries from the index whose blocks
      247             : // may contain keys in [start, end), i.e. the subset of the sst's index that
      248             : // intersects the provided span.
     249             : func intersectingIndexEntries(
     250             :         ctx context.Context,
     251             :         r *Reader,
     252             :         rh objstorage.ReadHandle,
     253             :         indexH block.BufferHandle,
     254             :         start, end InternalKey,
     255           1 : ) ([]indexEntry, error) {
     256           1 :         top, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     257           1 :         if err != nil {
     258           0 :                 return nil, err
     259           0 :         }
     260           1 :         defer top.Close()
     261           1 : 
     262           1 :         var alloc bytealloc.A
     263           1 :         res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
     264           1 :         for kv := top.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = top.Next() {
     265           1 :                 bh, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
     266           1 :                 if err != nil {
     267           0 :                         return nil, err
     268           0 :                 }
     269           1 :                 if r.Properties.IndexType != twoLevelIndex {
     270           1 :                         entry := indexEntry{bh: bh, sep: kv.K}
     271           1 :                         alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
     272           1 :                         alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
     273           1 :                         res = append(res, entry)
     274           1 :                 } else {
     275           1 :                         subBlk, err := r.readBlock(ctx, bh.Handle, nil, rh, nil, nil, nil)
     276           1 :                         if err != nil {
     277           0 :                                 return nil, err
     278           0 :                         }
     279           1 :                         defer subBlk.Release() // in-loop, but it is a short loop.
     280           1 : 
     281           1 :                         sub, err := rowblk.NewIter(r.Compare, r.Split, subBlk.Get(), NoTransforms)
     282           1 :                         if err != nil {
     283           0 :                                 return nil, err
     284           0 :                         }
     285           1 :                         defer sub.Close() // in-loop, but it is a short loop.
     286           1 : 
     287           1 :                         for kv := sub.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = sub.Next() {
     288           1 :                                 bh, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
     289           1 :                                 if err != nil {
     290           0 :                                         return nil, err
     291           0 :                                 }
     292           1 :                                 entry := indexEntry{bh: bh, sep: kv.K}
     293           1 :                                 alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
     294           1 :                                 alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
     295           1 :                                 res = append(res, entry)
     296           1 :                                 if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
     297           1 :                                         break
     298             :                                 }
     299             :                         }
     300           1 :                         if err := sub.Error(); err != nil {
     301           0 :                                 return nil, err
     302           0 :                         }
     303             :                 }
     304           1 :                 if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
     305           1 :                         break
     306             :                 }
     307             :         }
     308           1 :         return res, top.Error()
     309             : }
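
To make the selection logic above concrete, here is a small self-contained Go sketch of the same idea over a toy single-level index of string separators. The types, data, and helper names are illustrative assumptions; the real code iterates rowblk iterators over InternalKeys and additionally handles two-level indexes.

package main

import (
	"fmt"
	"sort"
)

// toyIndexEntry is an illustrative index entry: sep is >= every key in its
// data block, and block is a stand-in for the decoded block handle.
type toyIndexEntry struct {
	sep   string
	block int
}

// intersecting mimics intersectingIndexEntries' loop: seek to the first entry
// whose separator is >= start, then append entries until one with separator
// >= end has been appended (that block may still hold keys below end, so it
// is included before stopping).
func intersecting(index []toyIndexEntry, start, end string) []toyIndexEntry {
	i := sort.Search(len(index), func(i int) bool { return index[i].sep >= start })
	var res []toyIndexEntry
	for ; i < len(index); i++ {
		res = append(res, index[i])
		if index[i].sep >= end {
			break
		}
	}
	return res
}

func main() {
	index := []toyIndexEntry{{"c", 0}, {"f", 1}, {"k", 2}, {"p", 3}}
	// Blocks 1 and 2 may contain keys in [d, g); blocks 0 and 3 cannot.
	fmt.Println(intersecting(index, "d", "g")) // [{f 1} {k 2}]
}
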
     310             : 
      311             : // copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
      312             : // exists to make it visible in profiles/stack traces if we find a cluster
      313             : // copying more than expected.
     314             : //
     315             : // Finishes or Aborts output; does *not* Close input.
     316             : func copyWholeFileBecauseOfUnsupportedFeature(
     317             :         ctx context.Context, input objstorage.Readable, output objstorage.Writable,
     318           0 : ) (size uint64, _ error) {
     319           0 :         length := uint64(input.Size())
     320           0 :         rh := input.NewReadHandle(objstorage.NoReadBefore)
     321           0 :         rh.SetupForCompaction()
     322           0 :         if err := objstorage.Copy(ctx, rh, output, 0, length); err != nil {
     323           0 :                 output.Abort()
     324           0 :                 return 0, err
     325           0 :         }
     326           0 :         return length, output.Finish()
     327             : }
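
Finally, the loop in CopySpan above batches consecutive block-cache misses into a single ranged copy and writes cache hits directly from memory. The following self-contained Go sketch shows just that batching pattern; copyRun and writeBlock are hypothetical stand-ins for this sketch, not Pebble APIs.

package main

import "fmt"

// copyRun stands in for one ranged copy from the source file covering a
// contiguous run of blocks; writeBlock stands in for writing a single cached
// block from memory. Both are illustrative, not Pebble's API.
func copyRun(run []int)    { fmt.Println("ranged copy of blocks", run) }
func writeBlock(block int) { fmt.Println("write cached block", block) }

func main() {
	// Whether each block in the span was found in the block cache.
	inCache := []bool{false, false, true, false, true, true, false}

	var pending []int // contiguous run of cache misses awaiting a ranged copy
	for i, hit := range inCache {
		if !hit {
			pending = append(pending, i)
			continue
		}
		// Cache hit: first flush any preceding misses as one ranged copy,
		// then write this block directly from the cached copy.
		if len(pending) > 0 {
			copyRun(pending)
			pending = nil
		}
		writeBlock(i)
	}
	// Flush any trailing run of misses.
	if len(pending) > 0 {
		copyRun(pending)
	}
}
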

Generated by: LCOV version 1.14