LCOV - code coverage report
Current view: top level - pebble/sstable - copier.go (source / functions) Hit Total Coverage
Test: 2024-05-29 08:16Z 7cdb5eed - tests only.lcov Lines: 115 168 68.5 %
Date: 2024-05-29 08:16:55 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package sstable
       6             : 
       7             : import (
       8             :         "context"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      13             :         "github.com/cockroachdb/pebble/objstorage"
      14             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      15             : )
      16             : 
      17             : // CopySpan produces a copy of an approximate subset of an input sstable.
      18             : //
      19             : // The produced sstable contains all keys from the input sstable in the span
      20             : // [start, end), as well as potentially some additional keys from the original
      21             : // file that were adjacent to but outside that span.
      22             : //
      23             : // CopySpan differs from simply seeking a reader to start and iterating until
      24             : // the end passing the results to a writer in that it does not write the new
      25             : // sstable from scratch, key-by-key, recompressing each key into new blocks and
      26             : // computing new filters and properties. Instead, it finds data _blocks_ that
      27             : // intersect the requested span and copies those, whole, to the new file,
      28             : // avoiding all decompression and recompression work. It then copies the
      29             : // original bloom filter - this filter is valid for the subset of data as well,
      30             : // just with potentially a higher false positive rate compared to one that would
      31             : // be computed just from the keys in it.
      32             : //
      33             : // The resulting sstable will have no block properties.
      34             : //
      35             : // The function might return ErrEmptySpan if there are no blocks that could
      36             : // include keys in the given range. See ErrEmptySpan for more details.
      37             : //
      38             : // Closes input and finishes or aborts output in all cases, including on errors.
      39             : //
      40             : // Note that CopySpan is not aware of any suffix or prefix replacement; the
      41             : // caller must account for those when specifying the bounds.
      42             : func CopySpan(
      43             :         ctx context.Context,
      44             :         input objstorage.Readable,
      45             :         rOpts ReaderOptions,
      46             :         output objstorage.Writable,
      47             :         o WriterOptions,
      48             :         start, end InternalKey,
      49           1 : ) (size uint64, _ error) {
      50           1 :         r, err := NewReader(input, rOpts)
      51           1 :         if err != nil {
      52           0 :                 input.Close()
      53           0 :                 output.Abort()
      54           0 :                 return 0, err
      55           0 :         }
      56           1 :         defer r.Close() // r.Close now owns calling input.Close().
      57           1 : 
      58           1 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
      59           0 :                 return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
      60           0 :         }
      61             : 
      62             :         // If our input has no filters, our output cannot have filters either.
      63           1 :         if r.tableFilter == nil {
      64           1 :                 o.FilterPolicy = nil
      65           1 :         }
      66           1 :         o.TableFormat = r.tableFormat
      67           1 :         w := NewWriter(output, o)
      68           1 : 
      69           1 :         // We don't want the writer to attempt to write out block property data in
      70           1 :         // index blocks. This data won't be valid since we're not passing the actual
      71           1 :         // key data through the writer. We also remove the table-level properties
      72           1 :         // below.
      73           1 :         //
      74           1 :         // TODO(dt,radu): Figure out how to populate the prop collector state with
      75           1 :         // block props from the original sst.
      76           1 :         w.blockPropCollectors = nil
      77           1 : 
      78           1 :         defer func() {
      79           1 :                 if w != nil {
      80           1 :                         // set w.err to any non-nil error just so it aborts instead of finishing.
      81           1 :                         w.err = base.ErrNotFound
      82           1 :                         // w.Close now owns calling output.Abort().
      83           1 :                         w.Close()
      84           1 :                 }
      85             :         }()
      86             : 
      87           1 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
      88           0 :                 // We just checked for these conditions above.
      89           0 :                 return 0, base.AssertionFailedf("cannot CopySpan sstables with value blocks or range keys")
      90           0 :         }
      91             : 
      92           1 :         var preallocRH objstorageprovider.PreallocatedReadHandle
      93           1 :         // ReadBeforeForIndexAndFilter attempts to read the top-level index, filter
      94           1 :         // and lower-level index blocks with one read.
      95           1 :         rh := objstorageprovider.UsePreallocatedReadHandle(
      96           1 :                 ctx, r.readable, objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
      97           1 :         defer rh.Close()
      98           1 :         indexH, err := r.readIndex(ctx, rh, nil, nil)
      99           1 :         if err != nil {
     100           0 :                 return 0, err
     101           0 :         }
     102           1 :         defer indexH.Release()
     103           1 : 
     104           1 :         // Set the filter block to be copied over if it exists. It will return false
     105           1 :         // positives for keys in blocks of the original file that we don't copy, but
     106           1 :         // filters can always have false positives, so this is fine.
     107           1 :         if r.tableFilter != nil {
     108           0 :                 filterBlock, err := r.readFilter(ctx, rh, nil, nil)
     109           0 :                 if err != nil {
     110           0 :                         return 0, errors.Wrap(err, "reading filter")
     111           0 :                 }
     112           0 :                 filterBytes := append([]byte{}, filterBlock.Get()...)
     113           0 :                 filterBlock.Release()
     114           0 :                 w.filter = copyFilterWriter{
     115           0 :                         origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBytes,
     116           0 :                 }
     117             :         }
     118             : 
     119             :         // Copy all the props from the source file; we can't compute our own for many
     120             :         // that depend on seeing every key, such as total count or size so we copy the
     121             :         // original props instead. This will result in over-counts but that is safer
     122             :         // than under-counts.
     123           1 :         w.props = r.Properties
     124           1 :         // Remove all user properties to disable block properties, which we do not
     125           1 :         // calculate.
     126           1 :         w.props.UserProperties = nil
     127           1 :         // Reset props that we'll re-derive as we build our own index.
     128           1 :         w.props.IndexPartitions = 0
     129           1 :         w.props.TopLevelIndexSize = 0
     130           1 :         w.props.IndexSize = 0
     131           1 :         w.props.IndexType = 0
     132           1 :         if w.filter != nil {
     133           0 :                 if err := checkWriterFilterMatchesReader(r, w); err != nil {
     134           0 :                         return 0, err
     135           0 :                 }
     136             :         }
     137             : 
     138             :         // Find the blocks that intersect our span.
     139           1 :         blocks, err := intersectingIndexEntries(ctx, r, rh, indexH, start, end)
     140           1 :         if err != nil {
     141           0 :                 return 0, err
     142           0 :         }
     143             : 
     144             :         // In theory an empty SST is fine, but #3409 means they are not. We could make
     145             :         // a non-empty sst by copying something outside the span, but #3907 means that
     146             :         // the empty virtual span would still be a problem, so don't bother.
     147           1 :         if len(blocks) < 1 {
     148           1 :                 return 0, ErrEmptySpan
     149           1 :         }
     150             : 
     151             :         // Find the span of the input file that contains all our blocks, and then copy
     152             :         // it byte-for-byte without doing any per-key processing.
     153           1 :         offset := blocks[0].bh.Offset
     154           1 : 
     155           1 :         // The block lengths don't include their trailers, which just sit after the
     156           1 :         // block length, before the next offset; we get the ones between the blocks
     157           1 :         // we copy implicitly but need to explicitly add the last trailer to length.
     158           1 :         length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + blockTrailerLen - offset
     159           1 : 
     160           1 :         if spanEnd := length + offset; spanEnd < offset {
     161           0 :                 return 0, base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", offset, spanEnd)
     162           0 :         }
     163             : 
     164           1 :         if err := objstorage.Copy(ctx, r.readable, w.writable, offset, length); err != nil {
     165           0 :                 return 0, err
     166           0 :         }
     167             :         // Update w.meta.Size so subsequently flushed metadata has correct offsets.
     168           1 :         w.meta.Size += length
     169           1 : 
     170           1 :         // Now we can setup index entries for all the blocks we just copied, pointing
     171           1 :         // into the copied span.
     172           1 :         for i := range blocks {
     173           1 :                 blocks[i].bh.Offset -= offset
     174           1 :                 if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
     175           0 :                         return 0, err
     176           0 :                 }
     177             :         }
     178             : 
     179             :         // TODO(dt): Copy range keys (the fact there are none is checked above).
     180             :         // TODO(dt): Copy valblocks keys (the fact there are none is checked above).
     181             : 
     182           1 :         if err := w.Close(); err != nil {
     183           0 :                 w = nil
     184           0 :                 return 0, err
     185           0 :         }
     186           1 :         wrote := w.meta.Size
     187           1 :         w = nil
     188           1 :         return wrote, nil
     189             : }
     190             : 
     191             : // ErrEmptySpan is returned by CopySpan if the input sstable has no keys in the
     192             : // requested span.
     193             : //
     194             : // Note that CopySpan's determination of block overlap is best effort - we may
     195             : // copy a block that doesn't actually contain any keys in the span, in which
     196             : // case we won't generate this error. We currently only generate this error when
     197             : // the span start is beyond all keys in the physical sstable.
     198             : var ErrEmptySpan = errors.New("cannot copy empty span")
     199             : 
     200             : // indexEntry captures the two components of an sst index entry: the key and the
     201             : // decoded block handle value.
     202             : type indexEntry struct {
     203             :         sep InternalKey
     204             :         bh  BlockHandleWithProperties
     205             : }
     206             : 
     207             : // intersectingIndexEntries returns the index entries that intersect the span
     208             : // [start, end): every entry from the first separator >= start.UserKey up to
     209             : // and including the first entry whose separator is >= end.
     210             : func intersectingIndexEntries(
     211             :         ctx context.Context,
     212             :         r *Reader,
     213             :         rh objstorage.ReadHandle,
     214             :         indexH bufferHandle,
     215             :         start, end InternalKey,
     216           1 : ) ([]indexEntry, error) {
     217           1 :         top, err := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
     218           1 :         if err != nil {
     219           0 :                 return nil, err
     220           0 :         }
     221           1 :         defer top.Close()
     222           1 : 
     223           1 :         var alloc bytealloc.A
     224           1 :         res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
     225           1 :         for kv := top.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = top.Next() {
     226           1 :                 bh, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     227           1 :                 if err != nil {
     228           0 :                         return nil, err
     229           0 :                 }
     230           1 :                 if r.Properties.IndexType != twoLevelIndex {
     231           1 :                         entry := indexEntry{bh: bh, sep: kv.K}
     232           1 :                         alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
     233           1 :                         alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
     234           1 :                         res = append(res, entry)
     235           1 :                 } else {
     236           1 :                         subBlk, err := r.readBlock(ctx, bh.BlockHandle, nil, rh, nil, nil, nil)
     237           1 :                         if err != nil {
     238           0 :                                 return nil, err
     239           0 :                         }
     240           1 :                         defer subBlk.Release() // in-loop, but it is a short loop.
     241           1 : 
     242           1 :                         sub, err := newBlockIter(r.Compare, r.Split, subBlk.Get(), NoTransforms)
     243           1 :                         if err != nil {
     244           0 :                                 return nil, err
     245           0 :                         }
     246           1 :                         defer sub.Close() // in-loop, but it is a short loop.
     247           1 : 
     248           1 :                         for kv := sub.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = sub.Next() {
     249           1 :                                 bh, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
     250           1 :                                 if err != nil {
     251           0 :                                         return nil, err
     252           0 :                                 }
     253           1 :                                 entry := indexEntry{bh: bh, sep: kv.K}
     254           1 :                                 alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
     255           1 :                                 alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
     256           1 :                                 res = append(res, entry)
     257           1 :                                 if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
     258           1 :                                         break
     259             :                                 }
     260             :                         }
     261           1 :                         if err := sub.Error(); err != nil {
     262           0 :                                 return nil, err
     263           0 :                         }
     264             :                 }
     265           1 :                 if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
     266           1 :                         break
     267             :                 }
     268             :         }
     269           1 :         return res, top.Error()
     270             : }
     271             : 
     272             : // copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
     273             : // exists to ensure it is visible in profiles/stack traces if we are looking at
     274             : // cluster copying more than expected.
     275             : //
     276             : // Finishes or Aborts output; does *not* Close input.
     277             : func copyWholeFileBecauseOfUnsupportedFeature(
     278             :         ctx context.Context, input objstorage.Readable, output objstorage.Writable,
     279           0 : ) (size uint64, _ error) {
     280           0 :         length := uint64(input.Size())
     281           0 :         if err := objstorage.Copy(ctx, input, output, 0, length); err != nil {
     282           0 :                 output.Abort()
     283           0 :                 return 0, err
     284           0 :         }
     285           0 :         return length, output.Finish()
     286             : }

Generated by: LCOV version 1.14