LCOV - code coverage report
Current view: top level - pebble/sstable - copier.go (source / functions) Hit Total Coverage
Test: 2024-04-06 08:15Z 45b7a800 - tests + meta.lcov Lines: 111 162 68.5 %
Date: 2024-04-06 08:16:41 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "context"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      13             :         "github.com/cockroachdb/pebble/objstorage"
      14             : )
      15             : 
      16             : // CopySpan produces a copy of an approximate subset of an input sstable.
      17             : //
      18             : // The produced sstable contains all keys from the input sstable in the span
      19             : // [start, end), as well as potentially some additional keys from the original
      20             : // file that were adjacent to but outside that span.
      21             : //
      22             : // CopySpan differs from simply seeking a reader to start and iterating until
      23             : // the end passing the results to a writer in that it does not write the new
      24             : // sstable from scratch, key-by-key, recompressing each key into new blocks and
      25             : // computing new filters and properties. Instead, it finds data _blocks_ that
      26             : // intersect the requested span and copies those, whole, to the new file,
      27             : // avoiding all decompression and recompression work. It then copies the
      28             : // original bloom filter - this filter is valid for the subset of data as well,
      29             : // just with potentially a higher false positive rate compared to one that would
      30             : // be computed just from the keys in it.
      31             : //
      32             : // The resulting sstable will have no block properties. If the input has value blocks, range keys or range deletions, the whole file is copied instead.
      33             : //
      34             : // Closes input and finishes or aborts output, including on non-nil errors. On success, size is the writer's final meta.Size (or input.Size() on the whole-file fallback).
      35             : //
      36             : // Note that CopySpan is not aware of any suffix or prefix replacement; the
      37             : // caller must account for those when specifying the bounds.
      38             : func CopySpan(
      39             :         ctx context.Context,
      40             :         input objstorage.Readable,
      41             :         rOpts ReaderOptions,
      42             :         output objstorage.Writable,
      43             :         o WriterOptions,
      44             :         start, end InternalKey,
      45           2 : ) (size uint64, _ error) {
      46           2 :         r, err := NewReader(input, rOpts)
      47           2 :         if err != nil {
      48           0 :                 input.Close()
      49           0 :                 output.Abort()
      50           0 :                 return 0, err
      51           0 :         }
      52           2 :         defer r.Close() // r.Close now owns calling input.Close().
      53           2 : 
      54           2 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
      55           0 :                 return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
      56           0 :         }
      57             : 
      58             :         // If our input has no filter, our output cannot have a filter either.
      59           2 :         if r.tableFilter == nil {
      60           2 :                 o.FilterPolicy = nil
      61           2 :         }
      62           2 :         o.TableFormat = r.tableFormat
      63           2 :         w := NewWriter(output, o)
      64           2 : 
      65           2 :         // We don't want the writer to attempt to write out block property data in
      66           2 :         // index blocks. This data won't be valid since we're not passing the actual
      67           2 :         // key data through the writer. We also remove the table-level properties
      68           2 :         // below.
      69           2 :         //
      70           2 :         // TODO(dt,radu): Figure out how to populate the prop collector state with
      71           2 :         // block props from the original sst.
      72           2 :         w.blockPropCollectors = nil
      73           2 : 
      74           2 :         defer func() {
      75           2 :                 if w != nil { // w is reset to nil once w.Close() has been called below, so this only fires on early returns.
      76           0 :                         // Set w.err to any non-nil error just so it aborts instead of finishing.
      77           0 :                         w.err = base.ErrNotFound
      78           0 :                         // w.Close now owns calling output.Abort().
      79           0 :                         w.Close()
      80           0 :                 }
      81             :         }()
      82             : 
      83           2 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 { // NOTE(review): repeats the condition that already returned via the whole-file fallback above, so this branch appears unreachable — confirm and remove.
      84           0 :                 return 0, errors.New("cannot CopySpan sstables with value blocks or range keys")
      85           0 :         }
      86             : 
      87             :         // Set the filter block to be copied over if it exists. It will return false
      88             :         // positives for keys in blocks of the original file that we don't copy, but
      89             :         // filters can always have false positives, so this is fine.
      90           2 :         if r.tableFilter != nil {
      91           1 :                 filterBlock, err := r.readFilter(ctx, nil, nil)
      92           1 :                 if err != nil {
      93           0 :                         return 0, errors.Wrap(err, "reading filter")
      94           0 :                 }
      95           1 :                 filterBytes := append([]byte{}, filterBlock.Get()...) // Copy the filter bytes out before the block handle is released.
      96           1 :                 filterBlock.Release()
      97           1 :                 w.filter = copyFilterWriter{
      98           1 :                         origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBytes,
      99           1 :                 }
     100             :         }
     101             : 
     102             :         // Copy all the props from the source file; we can't compute our own for many
     103             :         // that depend on seeing every key, such as total count or size so we copy the
     104             :         // original props instead. This will result in over-counts but that is safer
     105             :         // than under-counts.
     106           2 :         w.props = r.Properties
     107           2 :         // Remove all user properties to disable block properties, which we do not
     108           2 :         // calculate.
     109           2 :         w.props.UserProperties = nil
     110           2 :         // Reset props that we'll re-derive as we build our own index.
     111           2 :         w.props.IndexPartitions = 0
     112           2 :         w.props.TopLevelIndexSize = 0
     113           2 :         w.props.IndexSize = 0
     114           2 :         w.props.IndexType = 0
     115           2 :         if w.filter != nil {
     116           1 :                 if err := checkWriterFilterMatchesReader(r, w); err != nil {
     117           0 :                         return 0, err
     118           0 :                 }
     119             :         }
     120             : 
     121             :         // Find the blocks that intersect our span.
     122           2 :         blocks, err := intersectingIndexEntries(ctx, r, start, end)
     123           2 :         if err != nil {
     124           0 :                 return 0, err
     125           0 :         }
     126             : 
     127             :         // In theory an empty SST is fine, but #3409 means they are not. We could make
     128             :         // a non-empty sst by copying something outside the span, but #3907 means that
     129             :         // the empty virtual span would still be a problem, so don't bother.
     130           2 :         if len(blocks) < 1 {
     131           0 :                 return 0, errors.Newf("CopySpan cannot copy empty span %s %s", start, end)
     132           0 :         }
     133             : 
     134             :         // Find the span of the input file that contains all our blocks, and then copy
     135             :         // it byte-for-byte without doing any per-key processing.
     136           2 :         offset := blocks[0].bh.Offset
     137           2 : 
     138           2 :         // The block lengths don't include their trailers, which just sit after the
     139           2 :         // block length, before the next offset; we get the ones between the blocks
     140           2 :         // we copy implicitly but need to explicitly add the last trailer to length.
     141           2 :         length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + blockTrailerLen - offset
     142           2 : 
     143           2 :         if spanEnd := length + offset; spanEnd < offset { // Guards against unsigned wrap-around of the computed span.
     144           0 :                 return 0, base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", offset, spanEnd)
     145           0 :         }
     146             : 
     147           2 :         if err := objstorage.Copy(ctx, r.readable, w.writable, offset, length); err != nil {
     148           0 :                 return 0, err
     149           0 :         }
     150             :         // Update w.meta.Size so subsequently flushed metadata has correct offsets.
     151           2 :         w.meta.Size += length
     152           2 : 
     153           2 :         // Now we can setup index entries for all the blocks we just copied, pointing
     154           2 :         // into the copied span.
     155           2 :         for i := range blocks {
     156           2 :                 blocks[i].bh.Offset -= offset // Rebase: the copied span starts at offset 0 of the output.
     157           2 :                 if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
     158           0 :                         return 0, err
     159           0 :                 }
     160             :         }
     161             : 
     162             :         // TODO(dt): Copy range keys (the fact there are none is checked above).
     163             :         // TODO(dt): Copy valblocks keys (the fact there are none is checked above).
     164             : 
     165           2 :         if err := w.Close(); err != nil {
     166           0 :                 w = nil
     167           0 :                 return 0, err
     168           0 :         }
     169           2 :         wrote := w.meta.Size
     170           2 :         w = nil
     171           2 :         return wrote, nil
     172             : }
     173             : 
     174             : // indexEntry captures the two components of an sst index entry: the key and the
     175             : // decoded block handle value.
     176             : type indexEntry struct {
     177             :         sep InternalKey // separator key read from the index block
     178             :         bh  BlockHandleWithProperties // block handle decoded from the index entry's value
     179             : }
     180             : 
     181             : // intersectingIndexEntries returns the index entries of the sst that intersect
     182             : // the span [start, end): it seeks the index to start.UserKey and collects
     183             : // entries up to and including the first one whose separator key is >= end.
     184             : func intersectingIndexEntries(
     185             :         ctx context.Context, r *Reader, start, end InternalKey,
     186           2 : ) ([]indexEntry, error) {
     187           2 :         indexH, err := r.readIndex(ctx, nil, nil)
     188           2 :         if err != nil {
     189           0 :                 return nil, err
     190           0 :         }
     191           2 :         defer indexH.Release()
     192           2 :         top, err := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     193           2 :         if err != nil {
     194           0 :                 return nil, err
     195           0 :         }
     196           2 :         defer top.Close()
     197           2 : 
     198           2 :         var rh objstorage.ReadHandle
     199           2 :         if r.Properties.IndexType == twoLevelIndex { // A read handle is only used below to read second-level index blocks.
     200           2 :                 rh = r.readable.NewReadHandle(ctx)
     201           2 :                 defer rh.Close()
     202           2 :         }
     203             : 
     204           2 :         var alloc bytealloc.A
     205           2 :         res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
     206           2 :         for key, value := top.SeekGE(start.UserKey, base.SeekGEFlagsNone); key != nil; key, value = top.Next() {
     207           2 :                 bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
     208           2 :                 if err != nil {
     209           0 :                         return nil, err
     210           0 :                 }
     211           2 :                 if r.Properties.IndexType != twoLevelIndex {
     212           2 :                         entry := indexEntry{bh: bh, sep: *key}
     213           2 :                         alloc, entry.bh.Props = alloc.Copy(entry.bh.Props) // NOTE(review): presumably copied because props/key alias the index block, which is released on return — confirm.
     214           2 :                         alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
     215           2 :                         res = append(res, entry)
     216           2 :                 } else {
     217           2 :                         subBlk, err := r.readBlock(ctx, bh.BlockHandle, nil, rh, nil, nil, nil)
     218           2 :                         if err != nil {
     219           0 :                                 return nil, err
     220           0 :                         }
     221           2 :                         defer subBlk.Release() // Deferred in-loop, so it only runs when the function returns; acceptable as it is a short loop.
     222           2 : 
     223           2 :                         sub, err := newBlockIter(r.Compare, r.Split, subBlk.Get(), NoTransforms)
     224           2 :                         if err != nil {
     225           0 :                                 return nil, err
     226           0 :                         }
     227           2 :                         defer sub.Close() // Deferred in-loop, so it only runs when the function returns; acceptable as it is a short loop.
     228           2 : 
     229           2 :                         for key, value := sub.SeekGE(start.UserKey, base.SeekGEFlagsNone); key != nil; key, value = sub.Next() {
     230           2 :                                 bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
     231           2 :                                 if err != nil {
     232           0 :                                         return nil, err
     233           0 :                                 }
     234           2 :                                 entry := indexEntry{bh: bh, sep: *key}
     235           2 :                                 alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
     236           2 :                                 alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
     237           2 :                                 res = append(res, entry)
     238           2 :                                 if base.InternalCompare(r.Compare, end, *key) <= 0 { // Separator is at or past end: this entry is kept and is the last one taken from this sub-index.
     239           2 :                                         break
     240             :                                 }
     241             :                         }
     242           2 :                         if err := sub.Error(); err != nil {
     243           0 :                                 return nil, err
     244           0 :                         }
     245             :                 }
     246           2 :                 if base.InternalCompare(r.Compare, end, *key) <= 0 { // Top-level separator is at or past end: nothing further can intersect the span.
     247           2 :                         break
     248             :                 }
     249             :         }
     250           2 :         return res, top.Error()
     251             : }
     252             : 
     253             : // copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
     254             : // exists to ensure it is visible in profiles/stack traces if we are looking at
     255             : // a cluster that is copying more than expected.
     256             : //
     257             : // Finishes or Aborts output; does *not* Close input. Returns input.Size() together with the result of output.Finish(), or 0 and the error if the copy fails.
     258             : func copyWholeFileBecauseOfUnsupportedFeature(
     259             :         ctx context.Context, input objstorage.Readable, output objstorage.Writable,
     260           0 : ) (size uint64, _ error) {
     261           0 :         length := uint64(input.Size())
     262           0 :         if err := objstorage.Copy(ctx, input, output, 0, length); err != nil {
     263           0 :                 output.Abort()
     264           0 :                 return 0, err
     265           0 :         }
     266           0 :         return length, output.Finish()
     267             : }

Generated by: LCOV version 1.14