LCOV - code coverage report
Current view: top level - pebble/sstable - copier.go (source / functions) Hit Total Coverage
Test: 2024-08-01 08:16Z 7f76f1a8 - meta test only.lcov Lines: 125 171 73.1 %
Date: 2024-08-01 08:17:16 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "context"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      13             :         "github.com/cockroachdb/pebble/objstorage"
      14             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      15             :         "github.com/cockroachdb/pebble/sstable/block"
      16             :         "github.com/cockroachdb/pebble/sstable/rowblk"
      17             : )
      18             : 
      19             : // CopySpan produces a copy of a approximate subset of an input sstable.
      20             : //
      21             : // The produced sstable contains all keys from the input sstable in the span
      22             : // [start, end), as well as potentially some additional keys from the original
      23             : // file that were adjacent to but outside that span.
      24             : //
      25             : // CopySpan differs from simply seeking a reader to start and iterating until
      26             : // the end passing the results to a writer in that it does not write the new
      27             : // sstable from scratch, key-by-key, recompressing each key into new blocks and
      28             : // computing new filters and properties. Instead, it finds data _blocks_ that
      29             : // intersect the requested span and copies those, whole, to the new file,
      30             : // avoiding all decompression and recompression work. It then copies the
      31             : // original bloom filter - this filter is valid for the subset of data as well,
      32             : // just with potentially a higher false positive rate compared to one that would
      33             : // be computed just from the keys in it.
      34             : //
      35             : // The resulting sstable will have no block properties.
      36             : //
      37             : // The function might return ErrEmptySpan if there are no blocks that could
      38             : // include keys in the given range. See ErrEmptySpan for more details.
      39             : //
      40             : // Closes input and finishes or aborts output in all cases, including on errors.
      41             : //
      42             : // Note that CopySpan is not aware of any suffix or prefix replacement; the
      43             : // caller must account for those when specifying the bounds.
      44             : func CopySpan(
      45             :         ctx context.Context,
      46             :         input objstorage.Readable,
      47             :         rOpts ReaderOptions,
      48             :         output objstorage.Writable,
      49             :         o WriterOptions,
      50             :         start, end InternalKey,
      51           1 : ) (size uint64, _ error) {
      52           1 :         r, err := NewReader(ctx, input, rOpts)
      53           1 :         if err != nil {
      54           0 :                 input.Close()
      55           0 :                 output.Abort()
      56           0 :                 return 0, err
      57           0 :         }
      58           1 :         defer r.Close() // r.Close now owns calling input.Close().
      59           1 : 
      60           1 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
      61           1 :                 return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
      62           1 :         }
      63             : 
      64             :         // If our input has not filters, our output cannot have filters either.
      65           1 :         if r.tableFilter == nil {
      66           1 :                 o.FilterPolicy = nil
      67           1 :         }
      68           1 :         o.TableFormat = r.tableFormat
      69           1 :         w := NewRawWriter(output, o)
      70           1 : 
      71           1 :         // We don't want the writer to attempt to write out block property data in
      72           1 :         // index blocks. This data won't be valid since we're not passing the actual
      73           1 :         // key data through the writer. We also remove the table-level properties
      74           1 :         // below.
      75           1 :         //
      76           1 :         // TODO(dt,radu): Figure out how to populate the prop collector state with
      77           1 :         // block props from the original sst.
      78           1 :         w.blockPropCollectors = nil
      79           1 : 
      80           1 :         defer func() {
      81           1 :                 if w != nil {
      82           0 :                         // set w.err to any non-nil error just so it aborts instead of finishing.
      83           0 :                         w.err = base.ErrNotFound
      84           0 :                         // w.Close now owns calling output.Abort().
      85           0 :                         w.Close()
      86           0 :                 }
      87             :         }()
      88             : 
      89           1 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
      90           0 :                 // We just checked for these conditions above.
      91           0 :                 return 0, base.AssertionFailedf("cannot CopySpan sstables with value blocks or range keys")
      92           0 :         }
      93             : 
      94           1 :         var preallocRH objstorageprovider.PreallocatedReadHandle
      95           1 :         // ReadBeforeForIndexAndFilter attempts to read the top-level index, filter
      96           1 :         // and lower-level index blocks with one read.
      97           1 :         rh := objstorageprovider.UsePreallocatedReadHandle(
      98           1 :                 r.readable, objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
      99           1 :         defer rh.Close()
     100           1 :         indexH, err := r.readIndex(ctx, rh, nil, nil)
     101           1 :         if err != nil {
     102           0 :                 return 0, err
     103           0 :         }
     104           1 :         defer indexH.Release()
     105           1 : 
     106           1 :         // Set the filter block to be copied over if it exists. It will return false
     107           1 :         // positives for keys in blocks of the original file that we don't copy, but
     108           1 :         // filters can always have false positives, so this is fine.
     109           1 :         if r.tableFilter != nil {
     110           1 :                 filterBlock, err := r.readFilter(ctx, rh, nil, nil)
     111           1 :                 if err != nil {
     112           0 :                         return 0, errors.Wrap(err, "reading filter")
     113           0 :                 }
     114           1 :                 filterBytes := append([]byte{}, filterBlock.Get()...)
     115           1 :                 filterBlock.Release()
     116           1 :                 w.filter = copyFilterWriter{
     117           1 :                         origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBytes,
     118           1 :                 }
     119             :         }
     120             : 
     121             :         // Copy all the props from the source file; we can't compute our own for many
     122             :         // that depend on seeing every key, such as total count or size so we copy the
     123             :         // original props instead. This will result in over-counts but that is safer
     124             :         // than under-counts.
     125           1 :         w.props = r.Properties
     126           1 :         // Remove all user properties to disable block properties, which we do not
     127           1 :         // calculate.
     128           1 :         w.props.UserProperties = nil
     129           1 :         // Reset props that we'll re-derive as we build our own index.
     130           1 :         w.props.IndexPartitions = 0
     131           1 :         w.props.TopLevelIndexSize = 0
     132           1 :         w.props.IndexSize = 0
     133           1 :         w.props.IndexType = 0
     134           1 :         if w.filter != nil {
     135           1 :                 if err := checkWriterFilterMatchesReader(r, w); err != nil {
     136           0 :                         return 0, err
     137           0 :                 }
     138             :         }
     139             : 
     140             :         // Find the blocks that intersect our span.
     141           1 :         blocks, err := intersectingIndexEntries(ctx, r, rh, indexH, start, end)
     142           1 :         if err != nil {
     143           0 :                 return 0, err
     144           0 :         }
     145             : 
     146             :         // In theory an empty SST is fine, but #3409 means they are not. We could make
     147             :         // a non-empty sst by copying something outside the span, but #3907 means that
     148             :         // the empty virtual span would still be a problem, so don't bother.
     149           1 :         if len(blocks) < 1 {
     150           0 :                 return 0, ErrEmptySpan
     151           0 :         }
     152             : 
     153             :         // Find the span of the input file that contains all our blocks, and then copy
     154             :         // it byte-for-byte without doing any per-key processing.
     155           1 :         offset := blocks[0].bh.Offset
     156           1 : 
     157           1 :         // The block lengths don't include their trailers, which just sit after the
     158           1 :         // block length, before the next offset; We get the ones between the blocks
     159           1 :         // we copy implicitly but need to explicitly add the last trailer to length.
     160           1 :         length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + block.TrailerLen - offset
     161           1 : 
     162           1 :         if spanEnd := length + offset; spanEnd < offset {
     163           0 :                 return 0, base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", offset, spanEnd)
     164           0 :         }
     165             : 
     166           1 :         if err := objstorage.Copy(ctx, r.readable, w.layout.writable, offset, length); err != nil {
     167           0 :                 return 0, err
     168           0 :         }
     169           1 :         w.layout.offset += length
     170           1 : 
     171           1 :         // Update w.meta.Size so subsequently flushed metadata has correct offsets.
     172           1 :         w.meta.Size += length
     173           1 : 
     174           1 :         // Now we can setup index entries for all the blocks we just copied, pointing
     175           1 :         // into the copied span.
     176           1 :         for i := range blocks {
     177           1 :                 blocks[i].bh.Offset -= offset
     178           1 :                 if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
     179           0 :                         return 0, err
     180           0 :                 }
     181             :         }
     182             : 
     183             :         // TODO(dt): Copy range keys (the fact there are none is checked above).
     184             :         // TODO(dt): Copy valblocks keys (the fact there are none is checked above).
     185             : 
     186           1 :         if err := w.Close(); err != nil {
     187           0 :                 w = nil
     188           0 :                 return 0, err
     189           0 :         }
     190           1 :         wrote := w.meta.Size
     191           1 :         w = nil
     192           1 :         return wrote, nil
     193             : }
     194             : 
     195             : // ErrEmptySpan is returned by CopySpan if the input sstable has no keys in the
     196             : // requested span.
     197             : //
     198             : // Note that CopySpan's determination of block overlap is best effort - we may
     199             : // copy a block that doesn't actually contain any keys in the span, in which
     200             : // case we won't generate this error. We currently only generate this error when
     201             : // the span start is beyond all keys in the physical sstable.
     202             : var ErrEmptySpan = errors.New("cannot copy empty span")
     203             : 
     204             : // indexEntry captures the two components of an sst index entry: the key and the
     205             : // decoded block handle value.
     206             : type indexEntry struct {
     207             :         sep InternalKey
     208             :         bh  BlockHandleWithProperties
     209             : }
     210             : 
     211             : // intersectingIndexEntries returns the entries from the index with separator
     212             : // keys contained by [start, end), i.e. the subset of the sst's index that
     213             : // intersects the provided span.
     214             : func intersectingIndexEntries(
     215             :         ctx context.Context,
     216             :         r *Reader,
     217             :         rh objstorage.ReadHandle,
     218             :         indexH block.BufferHandle,
     219             :         start, end InternalKey,
     220           1 : ) ([]indexEntry, error) {
     221           1 :         top, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     222           1 :         if err != nil {
     223           0 :                 return nil, err
     224           0 :         }
     225           1 :         defer top.Close()
     226           1 : 
     227           1 :         var alloc bytealloc.A
     228           1 :         res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
     229           1 :         for kv := top.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = top.Next() {
     230           1 :                 bh, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     231           1 :                 if err != nil {
     232           0 :                         return nil, err
     233           0 :                 }
     234           1 :                 if r.Properties.IndexType != twoLevelIndex {
     235           1 :                         entry := indexEntry{bh: bh, sep: kv.K}
     236           1 :                         alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
     237           1 :                         alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
     238           1 :                         res = append(res, entry)
     239           1 :                 } else {
     240           1 :                         subBlk, err := r.readBlock(ctx, bh.Handle, nil, rh, nil, nil, nil)
     241           1 :                         if err != nil {
     242           0 :                                 return nil, err
     243           0 :                         }
     244           1 :                         defer subBlk.Release() // in-loop, but it is a short loop.
     245           1 : 
     246           1 :                         sub, err := rowblk.NewIter(r.Compare, r.Split, subBlk.Get(), NoTransforms)
     247           1 :                         if err != nil {
     248           0 :                                 return nil, err
     249           0 :                         }
     250           1 :                         defer sub.Close() // in-loop, but it is a short loop.
     251           1 : 
     252           1 :                         for kv := sub.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = sub.Next() {
     253           1 :                                 bh, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     254           1 :                                 if err != nil {
     255           0 :                                         return nil, err
     256           0 :                                 }
     257           1 :                                 entry := indexEntry{bh: bh, sep: kv.K}
     258           1 :                                 alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
     259           1 :                                 alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
     260           1 :                                 res = append(res, entry)
     261           1 :                                 if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
     262           1 :                                         break
     263             :                                 }
     264             :                         }
     265           1 :                         if err := sub.Error(); err != nil {
     266           0 :                                 return nil, err
     267           0 :                         }
     268             :                 }
     269           1 :                 if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
     270           1 :                         break
     271             :                 }
     272             :         }
     273           1 :         return res, top.Error()
     274             : }
     275             : 
     276             : // copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
     277             : // exists to ensure it is visible in profiles/stack traces if we are looking at
     278             : // cluster copying more than expected.
     279             : //
     280             : // Finishes or Aborts output; does *not* Close input.
     281             : func copyWholeFileBecauseOfUnsupportedFeature(
     282             :         ctx context.Context, input objstorage.Readable, output objstorage.Writable,
     283           1 : ) (size uint64, _ error) {
     284           1 :         length := uint64(input.Size())
     285           1 :         if err := objstorage.Copy(ctx, input, output, 0, length); err != nil {
     286           0 :                 output.Abort()
     287           0 :                 return 0, err
     288           0 :         }
     289           1 :         return length, output.Finish()
     290             : }

Generated by: LCOV version 1.14