LCOV - code coverage report
Current view: top level - pebble/sstable - suffix_rewriter.go (source / functions)
Test: 2024-03-28 08:15Z b6e563f6 - tests only.lcov
Date: 2024-03-28 08:16:17
Coverage: Lines: 289 of 379 hit (76.3 %) | Functions: 0 of 0 (-)

          Line data    Source code
       1             : package sstable
       2             : 
       3             : import (
       4             :         "bytes"
       5             :         "context"
       6             :         "math"
       7             :         "sync"
       8             : 
       9             :         "github.com/cespare/xxhash/v2"
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      13             :         "github.com/cockroachdb/pebble/internal/invariants"
      14             :         "github.com/cockroachdb/pebble/internal/rangekey"
      15             :         "github.com/cockroachdb/pebble/objstorage"
      16             : )
      17             : 
      18             : // RewriteKeySuffixes is deprecated.
      19             : //
      20             : // TODO(sumeer): remove after switching CockroachDB to RewriteKeySuffixesAndReturnFormat.
      21             : func RewriteKeySuffixes(
      22             :         sst []byte,
      23             :         rOpts ReaderOptions,
      24             :         out objstorage.Writable,
      25             :         o WriterOptions,
      26             :         from, to []byte,
      27             :         concurrency int,
      28           0 : ) (*WriterMetadata, error) {
      29           0 :         meta, _, err := RewriteKeySuffixesAndReturnFormat(sst, rOpts, out, o, from, to, concurrency)
      30           0 :         return meta, err
      31           0 : }
      32             : 
      33             : // RewriteKeySuffixesAndReturnFormat copies the content of the passed SSTable
      34             : // bytes to a new sstable, written to `out`, in which the suffix `from` has is
      35             : // bytes to a new sstable, written to `out`, in which the suffix `from` is
      36             : // Sets or RangeKeySets and every key must have `from` as its suffix as
      37             : // determined by the Split function of the Comparer in the passed
      38             : // WriterOptions. Range deletes must not exist in this sstable, as they will
      39             : // be ignored.
      40             : //
      41             : // Data blocks are rewritten in parallel by `concurrency` workers and then
      42             : // assembled into a final SST. Filters are copied from the original SST without
      43             : // modification as they are not affected by the suffix, while block and table
      44             : // properties are only minimally recomputed.
      45             : //
      46             : // TODO(sumeer): document limitations, if any, due to this limited
      47             : // re-computation of properties (is there any loss of fidelity?).
      48             : //
      49             : // Any block property collectors configured in the WriterOptions must implement
      50             : // SuffixReplaceableBlockCollector.
      51             : //
      52             : // The WriterOptions.TableFormat is ignored, and the output sstable has the
      53             : // same TableFormat as the input, which is returned in case the caller wants
      54             : // to do some error checking. Suffix rewriting is meant to be efficient, and
      55             : // allowing changes in the TableFormat detracts from that efficiency.
      56             : //
      57             : // Any obsolete bits that key-value pairs may be annotated with are ignored
      58             : // and lost during the rewrite. Additionally, the output sstable has the
      59             : // pebble.obsolete.is_strict property set to false. These limitations could be
      60             : // removed if needed. The current use case for
      61             : // RewriteKeySuffixesAndReturnFormat in CockroachDB is for MVCC-compliant file
      62             : // ingestion, where these files do not contain RANGEDELs and have one
      63             : // key-value pair per userkey -- so they trivially satisfy the strict
      64             : // criteria, and we don't need the obsolete bit as a performance optimization.
      65             : // For disaggregated storage, strict obsolete sstables are needed for L5 and
      66             : // L6, but at the time of writing, we expect such MVCC-compliant file
      67             : // ingestion to only ingest into levels L4 and higher. If this changes, we can
      68             : // do one of two things to get rid of this limitation:
      69             : //   - Validate that there are no duplicate userkeys and no RANGEDELs/MERGEs
      70             : //     in the sstable to be rewritten. Validating no duplicate userkeys is
      71             : //     non-trivial when rewriting blocks in parallel, so we could encode the
      72             : //     pre-existing condition in the (existing) SnapshotPinnedKeys property --
      73             : //     we need to update the external sst writer to calculate and encode this
      74             : //     property.
      75             : //   - Preserve the obsolete bit (with changes to the blockIter).
      76             : func RewriteKeySuffixesAndReturnFormat(
      77             :         sst []byte,
      78             :         rOpts ReaderOptions,
      79             :         out objstorage.Writable,
      80             :         o WriterOptions,
      81             :         from, to []byte,
      82             :         concurrency int,
      83           0 : ) (*WriterMetadata, TableFormat, error) {
      84           0 :         r, err := NewMemReader(sst, rOpts)
      85           0 :         if err != nil {
      86           0 :                 return nil, TableFormatUnspecified, err
      87           0 :         }
      88           0 :         defer r.Close()
      89           0 :         return rewriteKeySuffixesInBlocks(r, out, o, from, to, concurrency)
      90             : }
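
// Illustrative usage (a minimal sketch, not part of this file's API): a caller
// that rewrites the suffix of every key in an in-memory sstable. The function
// name is hypothetical; the caller supplies ReaderOptions/WriterOptions whose
// Comparer.Split isolates the suffix, an objstorage.Writable for the output,
// and the old/new suffixes. The worker count of 4 is an arbitrary choice.
func rewriteSuffixSketch(
        sst []byte, out objstorage.Writable, rOpts ReaderOptions, wOpts WriterOptions, from, to []byte,
) (TableFormat, error) {
        meta, format, err := RewriteKeySuffixesAndReturnFormat(sst, rOpts, out, wOpts, from, to, 4)
        if err != nil {
                return TableFormatUnspecified, err
        }
        // meta describes the rewritten table (bounds, size, etc.); the returned
        // format is the input's TableFormat, useful for sanity checking.
        _ = meta
        return format, nil
}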
      91             : 
      92             : func rewriteKeySuffixesInBlocks(
      93             :         r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte, concurrency int,
      94           1 : ) (*WriterMetadata, TableFormat, error) {
      95           1 :         if o.Comparer == nil || o.Comparer.Split == nil {
      96           1 :                 return nil, TableFormatUnspecified,
      97           1 :                         errors.New("a valid splitter is required to rewrite suffixes")
      98           1 :         }
      99           1 :         if concurrency < 1 {
     100           0 :                 return nil, TableFormatUnspecified, errors.New("concurrency must be >= 1")
     101           0 :         }
     102             :         // Even though NumValueBlocks = 0 => NumValuesInValueBlocks = 0, check both
     103             :         // as a defensive measure.
     104           1 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumValuesInValueBlocks > 0 {
     105           0 :                 return nil, TableFormatUnspecified,
     106           0 :                         errors.New("sstable with a single suffix should not have value blocks")
     107           0 :         }
     108             : 
     109           1 :         tableFormat := r.tableFormat
     110           1 :         o.TableFormat = tableFormat
     111           1 :         w := NewWriter(out, o)
     112           1 :         defer func() {
     113           1 :                 if w != nil {
     114           1 :                         w.Close()
     115           1 :                 }
     116             :         }()
     117             : 
     118           1 :         for _, c := range w.blockPropCollectors {
     119           1 :                 if _, ok := c.(SuffixReplaceableBlockCollector); !ok {
     120           0 :                         return nil, TableFormatUnspecified,
     121           0 :                                 errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
     122           0 :                 }
     123             :         }
     124             : 
     125           1 :         l, err := r.Layout()
     126           1 :         if err != nil {
     127           0 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "reading layout")
     128           0 :         }
     129             : 
     130           1 :         if err := rewriteDataBlocksToWriter(r, w, l.Data, from, to, w.split, concurrency); err != nil {
     131           1 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting data blocks")
     132           1 :         }
     133             : 
     134             :         // Copy over the range key block and replace suffixes in it if it exists.
     135           1 :         if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
     136           0 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting range key blocks")
     137           0 :         }
     138             : 
     139             :         // Copy over the filter block if it exists (rewriteDataBlocksToWriter will
     140             :         // already have ensured this is valid if it exists).
     141           1 :         if w.filter != nil && l.Filter.Length > 0 {
     142           1 :                 filterBlock, _, err := readBlockBuf(r, l.Filter, nil)
     143           1 :                 if err != nil {
     144           0 :                         return nil, TableFormatUnspecified, errors.Wrap(err, "reading filter")
     145           0 :                 }
     146           1 :                 w.filter = copyFilterWriter{
     147           1 :                         origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBlock,
     148           1 :                 }
     149             :         }
     150             : 
     151           1 :         if err := w.Close(); err != nil {
     152           0 :                 w = nil
     153           0 :                 return nil, TableFormatUnspecified, err
     154           0 :         }
     155           1 :         writerMeta, err := w.Metadata()
     156           1 :         w = nil
     157           1 :         return writerMeta, tableFormat, err
     158             : }
     159             : 
     160             : var errBadKind = errors.New("key does not have expected kind (set)")
     161             : 
     162             : type blockWithSpan struct {
     163             :         start, end InternalKey
     164             :         data       []byte
     165             : }
     166             : 
     167             : func rewriteBlocks(
     168             :         r *Reader,
     169             :         restartInterval int,
     170             :         checksumType ChecksumType,
     171             :         compression Compression,
     172             :         input []BlockHandleWithProperties,
     173             :         output []blockWithSpan,
     174             :         totalWorkers, worker int,
     175             :         from, to []byte,
     176             :         split Split,
     177           1 : ) error {
     178           1 :         bw := blockWriter{
     179           1 :                 restartInterval: restartInterval,
     180           1 :         }
     181           1 :         buf := blockBuf{checksummer: checksummer{checksumType: checksumType}}
     182           1 :         if checksumType == ChecksumTypeXXHash {
     183           0 :                 buf.checksummer.xxHasher = xxhash.New()
     184           0 :         }
     185             : 
     186           1 :         var blockAlloc bytealloc.A
     187           1 :         var keyAlloc bytealloc.A
     188           1 :         var scratch InternalKey
     189           1 : 
     190           1 :         var inputBlock, inputBlockBuf []byte
     191           1 : 
     192           1 :         iter := &blockIter{}
     193           1 : 
      194           1 :         // We'll assume all blocks are _roughly_ equal in size, so a static round-robin
      195           1 :         // partition, with each worker handling every i-th block, is probably enough; see the partition sketch after this function.
     196           1 :         for i := worker; i < len(input); i += totalWorkers {
     197           1 :                 bh := input[i]
     198           1 : 
     199           1 :                 var err error
     200           1 :                 inputBlock, inputBlockBuf, err = readBlockBuf(r, bh.BlockHandle, inputBlockBuf)
     201           1 :                 if err != nil {
     202           0 :                         return err
     203           0 :                 }
     204           1 :                 if err := iter.init(r.Compare, r.Split, inputBlock, NoTransforms); err != nil {
     205           0 :                         return err
     206           0 :                 }
     207             : 
      208           1 :                 if cap(bw.restarts) < int(iter.numRestarts) {
      209           1 :                         bw.restarts = make([]uint32, 0, iter.numRestarts)
      210           1 :                 }
      211           1 :                 if cap(bw.buf) == 0 {
      212           1 :                         bw.buf = make([]byte, 0, len(inputBlock))
      213           1 :                 }
      214             :                 // NB: the restarts scratch slice is sized from iter.numRestarts (the
      215             :                 // restart count); iter.restarts is the byte offset of the restart
      216             :                 // array within the block and would drastically over-allocate.
     217             : 
     218           1 :                 for key, val := iter.First(); key != nil; key, val = iter.Next() {
     219           1 :                         if key.Kind() != InternalKeyKindSet {
     220           0 :                                 return errBadKind
     221           0 :                         }
     222           1 :                         si := split(key.UserKey)
     223           1 :                         oldSuffix := key.UserKey[si:]
     224           1 :                         if !bytes.Equal(oldSuffix, from) {
     225           1 :                                 err := errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
     226           1 :                                 return err
     227           1 :                         }
     228           1 :                         newLen := si + len(to)
     229           1 :                         if cap(scratch.UserKey) < newLen {
     230           1 :                                 scratch.UserKey = make([]byte, 0, len(key.UserKey)*2+len(to)-len(from))
     231           1 :                         }
     232             : 
     233           1 :                         scratch.Trailer = key.Trailer
     234           1 :                         scratch.UserKey = scratch.UserKey[:newLen]
     235           1 :                         copy(scratch.UserKey, key.UserKey[:si])
     236           1 :                         copy(scratch.UserKey[si:], to)
     237           1 : 
     238           1 :                         // NB: for TableFormatPebblev3 and higher, since
     239           1 :                         // !iter.lazyValueHandling.hasValuePrefix, it will return the raw value
     240           1 :                         // in the block, which includes the 1-byte prefix. This is fine since bw
     241           1 :                         // also does not know about the prefix and will preserve it in bw.add.
     242           1 :                         v := val.InPlaceValue()
     243           1 :                         if invariants.Enabled && r.tableFormat >= TableFormatPebblev3 &&
     244           1 :                                 key.Kind() == InternalKeyKindSet {
     245           1 :                                 if len(v) < 1 {
     246           0 :                                         return errors.Errorf("value has no prefix")
     247           0 :                                 }
     248           1 :                                 prefix := valuePrefix(v[0])
     249           1 :                                 if isValueHandle(prefix) {
     250           0 :                                         return errors.Errorf("value prefix is incorrect")
     251           0 :                                 }
     252           1 :                                 if setHasSamePrefix(prefix) {
     253           0 :                                         return errors.Errorf("multiple keys with same key prefix")
     254           0 :                                 }
     255             :                         }
     256           1 :                         bw.add(scratch, v)
     257           1 :                         if output[i].start.UserKey == nil {
     258           1 :                                 keyAlloc, output[i].start = cloneKeyWithBuf(scratch, keyAlloc)
     259           1 :                         }
     260             :                 }
     261           1 :                 *iter = iter.resetForReuse()
     262           1 : 
     263           1 :                 keyAlloc, output[i].end = cloneKeyWithBuf(scratch, keyAlloc)
     264           1 : 
     265           1 :                 finished := compressAndChecksum(bw.finish(), compression, &buf)
     266           1 : 
     267           1 :                 // copy our finished block into the output buffer.
     268           1 :                 blockAlloc, output[i].data = blockAlloc.Alloc(len(finished) + blockTrailerLen)
     269           1 :                 copy(output[i].data, finished)
     270           1 :                 copy(output[i].data[len(finished):], buf.tmp[:blockTrailerLen])
     271             :         }
     272           1 :         return nil
     273             : }
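
// A tiny illustration (hypothetical helper, not used by the rewriter) of the
// static round-robin partition used above: block i is handled by worker
// i % totalWorkers. With 8 blocks and 3 workers, worker 0 gets blocks 0, 3, 6,
// worker 1 gets 1, 4, 7, and worker 2 gets 2, 5.
func partitionSketch(numBlocks, totalWorkers int) [][]int {
        assignment := make([][]int, totalWorkers)
        for w := 0; w < totalWorkers; w++ {
                for i := w; i < numBlocks; i += totalWorkers {
                        assignment[w] = append(assignment[w], i)
                }
        }
        return assignment
}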
     274             : 
     275           1 : func checkWriterFilterMatchesReader(r *Reader, w *Writer) error {
     276           1 :         if r.Properties.FilterPolicyName != w.filter.policyName() {
     277           0 :                 return errors.New("mismatched filters")
     278           0 :         }
     279           1 :         if was, is := r.Properties.ComparerName, w.props.ComparerName; was != is {
     280           1 :                 return errors.Errorf("mismatched Comparer %s vs %s, replacement requires same splitter to copy filters", was, is)
     281           1 :         }
     282           1 :         return nil
     283             : }
     284             : 
     285             : func rewriteDataBlocksToWriter(
     286             :         r *Reader,
     287             :         w *Writer,
     288             :         data []BlockHandleWithProperties,
     289             :         from, to []byte,
     290             :         split Split,
     291             :         concurrency int,
     292           1 : ) error {
     293           1 :         if r.Properties.NumEntries == 0 {
     294           1 :                 // No point keys.
     295           1 :                 return nil
     296           1 :         }
     297           1 :         blocks := make([]blockWithSpan, len(data))
     298           1 : 
     299           1 :         if w.filter != nil {
     300           1 :                 if err := checkWriterFilterMatchesReader(r, w); err != nil {
     301           1 :                         return err
     302           1 :                 }
     303             :         }
     304             : 
     305           1 :         g := &sync.WaitGroup{}
     306           1 :         g.Add(concurrency)
     307           1 :         errCh := make(chan error, concurrency)
     308           1 :         for i := 0; i < concurrency; i++ {
     309           1 :                 worker := i
     310           1 :                 go func() {
     311           1 :                         defer g.Done()
     312           1 :                         err := rewriteBlocks(
     313           1 :                                 r,
     314           1 :                                 w.dataBlockBuf.dataBlock.restartInterval,
     315           1 :                                 w.blockBuf.checksummer.checksumType,
     316           1 :                                 w.compression,
     317           1 :                                 data,
     318           1 :                                 blocks,
     319           1 :                                 concurrency,
     320           1 :                                 worker,
     321           1 :                                 from, to,
     322           1 :                                 split,
     323           1 :                         )
     324           1 :                         if err != nil {
     325           1 :                                 errCh <- err
     326           1 :                         }
     327             :                 }()
     328             :         }
     329           1 :         g.Wait()
     330           1 :         close(errCh)
     331           1 :         if err, ok := <-errCh; ok {
     332           1 :                 return err
     333           1 :         }
     334             : 
     335           1 :         var decoder blockPropertiesDecoder
     336           1 :         var oldShortIDs []shortID
     337           1 :         var oldProps [][]byte
     338           1 :         if len(w.blockPropCollectors) > 0 {
     339           1 :                 oldProps = make([][]byte, len(w.blockPropCollectors))
     340           1 :                 oldShortIDs = make([]shortID, math.MaxUint8)
     341           1 :                 for i, p := range w.blockPropCollectors {
     342           1 :                         if prop, ok := r.Properties.UserProperties[p.Name()]; ok {
     343           1 :                                 was, is := shortID(byte(prop[0])), shortID(i)
     344           1 :                                 oldShortIDs[was] = is
     345           1 :                         }
     346             :                 }
     347             :         }
     348             : 
     349           1 :         for i := range blocks {
     350           1 :                 // Write the rewritten block to the file.
     351           1 :                 if err := w.writable.Write(blocks[i].data); err != nil {
     352           0 :                         return err
     353           0 :                 }
     354             : 
     355           1 :                 n := len(blocks[i].data)
     356           1 :                 bh := BlockHandle{Offset: w.meta.Size, Length: uint64(n) - blockTrailerLen}
     357           1 :                 // Update the overall size.
     358           1 :                 w.meta.Size += uint64(n)
     359           1 : 
     360           1 :                 // Load any previous values for our prop collectors into oldProps.
     361           1 :                 for i := range oldProps {
     362           1 :                         oldProps[i] = nil
     363           1 :                 }
     364           1 :                 decoder.props = data[i].Props
     365           1 :                 for !decoder.done() {
     366           1 :                         id, val, err := decoder.next()
     367           1 :                         if err != nil {
     368           0 :                                 return err
     369           0 :                         }
     370           1 :                         oldProps[oldShortIDs[id]] = val
     371             :                 }
     372             : 
     373           1 :                 for i, p := range w.blockPropCollectors {
     374           1 :                         if err := p.(SuffixReplaceableBlockCollector).UpdateKeySuffixes(oldProps[i], from, to); err != nil {
     375           0 :                                 return err
     376           0 :                         }
     377             :                 }
     378             : 
     379           1 :                 bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
     380           1 :                 if err != nil {
     381           0 :                         return err
     382           0 :                 }
     383           1 :                 var nextKey InternalKey
     384           1 :                 if i+1 < len(blocks) {
     385           1 :                         nextKey = blocks[i+1].start
     386           1 :                 }
     387           1 :                 if err = w.addIndexEntrySync(blocks[i].end, nextKey, bhp, w.dataBlockBuf.tmp[:]); err != nil {
     388           0 :                         return err
     389           0 :                 }
     390             :         }
     391             : 
     392           1 :         w.meta.updateSeqNum(blocks[0].start.SeqNum())
     393           1 :         w.props.NumEntries = r.Properties.NumEntries
     394           1 :         w.props.RawKeySize = r.Properties.RawKeySize
     395           1 :         w.props.RawValueSize = r.Properties.RawValueSize
     396           1 :         w.meta.SetSmallestPointKey(blocks[0].start)
     397           1 :         w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
     398           1 :         return nil
     399             : }
     400             : 
     401           1 : func rewriteRangeKeyBlockToWriter(r *Reader, w *Writer, from, to []byte) error {
     402           1 :         iter, err := r.NewRawRangeKeyIter(NoTransforms)
     403           1 :         if err != nil {
     404           0 :                 return err
     405           0 :         }
     406           1 :         if iter == nil {
     407           1 :                 // No range keys.
     408           1 :                 return nil
     409           1 :         }
     410           1 :         defer iter.Close()
     411           1 : 
     412           1 :         s, err := iter.First()
     413           1 :         for ; s != nil; s, err = iter.Next() {
     414           1 :                 if !s.Valid() {
     415           0 :                         break
     416             :                 }
     417           1 :                 for i := range s.Keys {
     418           1 :                         if s.Keys[i].Kind() != base.InternalKeyKindRangeKeySet {
     419           0 :                                 return errBadKind
     420           0 :                         }
     421           1 :                         if !bytes.Equal(s.Keys[i].Suffix, from) {
     422           0 :                                 return errors.Errorf("key has suffix %q, expected %q", s.Keys[i].Suffix, from)
     423           0 :                         }
     424           1 :                         s.Keys[i].Suffix = to
     425             :                 }
     426             : 
     427           1 :                 err = rangekey.Encode(s, func(k base.InternalKey, v []byte) error {
     428           1 :                         // Calling AddRangeKey instead of addRangeKeySpan bypasses the fragmenter.
     429           1 :                         // This is okay because the raw fragments off of `iter` are already
     430           1 :                         // fragmented, and suffix replacement should not affect fragmentation.
     431           1 :                         return w.AddRangeKey(k, v)
     432           1 :                 })
     433           1 :                 if err != nil {
     434           0 :                         return err
     435           0 :                 }
     436             :         }
     437           1 :         return err
     438             : }
     439             : 
     440             : type copyFilterWriter struct {
     441             :         origMetaName   string
     442             :         origPolicyName string
     443             :         data           []byte
     444             : }
     445             : 
     446           0 : func (copyFilterWriter) addKey(key []byte)         { panic("unimplemented") }
     447           1 : func (c copyFilterWriter) finish() ([]byte, error) { return c.data, nil }
     448           1 : func (c copyFilterWriter) metaName() string        { return c.origMetaName }
     449           1 : func (c copyFilterWriter) policyName() string      { return c.origPolicyName }
     450             : 
     451             : // RewriteKeySuffixesViaWriter is similar to RewriteKeySuffixes but uses just a
     452             : // single loop over the Reader that writes each key to the Writer with the new
      453             : // suffix. This is significantly slower than the parallelized rewriter, and does
     454             : // more work to rederive filters, props, etc.
     455             : //
     456             : // Any obsolete bits that key-value pairs may be annotated with are ignored
     457             : // and lost during the rewrite. Some of the obsolete bits may be recreated --
     458             : // specifically when there are multiple keys with the same user key.
     459             : // Additionally, the output sstable has the pebble.obsolete.is_strict property
     460             : // set to false. See the longer comment at RewriteKeySuffixesAndReturnFormat.
     461             : func RewriteKeySuffixesViaWriter(
     462             :         r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte,
     463           1 : ) (*WriterMetadata, error) {
     464           1 :         if o.Comparer == nil || o.Comparer.Split == nil {
     465           0 :                 return nil, errors.New("a valid splitter is required to rewrite suffixes")
     466           0 :         }
     467             : 
     468           1 :         o.IsStrictObsolete = false
     469           1 :         w := NewWriter(out, o)
     470           1 :         defer func() {
     471           1 :                 if w != nil {
     472           0 :                         w.Close()
     473           0 :                 }
     474             :         }()
     475           1 :         i, err := r.NewIter(NoTransforms, nil, nil)
     476           1 :         if err != nil {
     477           0 :                 return nil, err
     478           0 :         }
     479           1 :         defer i.Close()
     480           1 : 
     481           1 :         k, v := i.First()
     482           1 :         var scratch InternalKey
     483           1 :         for k != nil {
     484           1 :                 if k.Kind() != InternalKeyKindSet {
     485           0 :                         return nil, errors.New("invalid key type")
     486           0 :                 }
     487           1 :                 oldSuffix := k.UserKey[r.Split(k.UserKey):]
     488           1 :                 if !bytes.Equal(oldSuffix, from) {
     489           0 :                         return nil, errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
     490           0 :                 }
     491           1 :                 scratch.UserKey = append(scratch.UserKey[:0], k.UserKey[:len(k.UserKey)-len(from)]...)
     492           1 :                 scratch.UserKey = append(scratch.UserKey, to...)
     493           1 :                 scratch.Trailer = k.Trailer
     494           1 : 
     495           1 :                 val, _, err := v.Value(nil)
     496           1 :                 if err != nil {
     497           0 :                         return nil, err
     498           0 :                 }
      499           1 :         if err := w.addPoint(scratch, val, false); err != nil {
     500           0 :                         return nil, err
     501           0 :                 }
     502           1 :                 k, v = i.Next()
     503             :         }
     504           1 :         if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
     505           0 :                 return nil, err
     506           0 :         }
     507           1 :         if err := w.Close(); err != nil {
     508           0 :                 w = nil
     509           0 :                 return nil, err
     510           0 :         }
     511           1 :         writerMeta, err := w.Metadata()
     512           1 :         w = nil
     513           1 :         return writerMeta, err
     514             : }
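
// A minimal sketch (hypothetical caller) of the Writer-based path: open a
// Reader over in-memory sst bytes and rewrite every suffix in a single pass.
// This is the fallback when the parallel block rewriter cannot be used, e.g.
// when a block property collector does not implement
// SuffixReplaceableBlockCollector. Options and output are caller-supplied.
func rewriteViaWriterSketch(
        sst []byte, out objstorage.Writable, rOpts ReaderOptions, wOpts WriterOptions, from, to []byte,
) (*WriterMetadata, error) {
        r, err := NewMemReader(sst, rOpts)
        if err != nil {
                return nil, err
        }
        defer r.Close()
        return RewriteKeySuffixesViaWriter(r, out, wOpts, from, to)
}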
     515             : 
     516             : // NewMemReader opens a reader over the SST stored in the passed []byte.
     517           1 : func NewMemReader(sst []byte, o ReaderOptions) (*Reader, error) {
     518           1 :         return NewReader(newMemReader(sst), o)
     519           1 : }
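
// A small sketch (hypothetical helper) pairing NewMemReader with the point-key
// iterator used elsewhere in this file: open the in-memory sstable and count
// its keys. The ReaderOptions must use the same Comparer the sstable was
// written with.
func countKeysSketch(sst []byte, o ReaderOptions) (int, error) {
        r, err := NewMemReader(sst, o)
        if err != nil {
                return 0, err
        }
        defer r.Close()
        it, err := r.NewIter(NoTransforms, nil, nil)
        if err != nil {
                return 0, err
        }
        defer it.Close()
        n := 0
        for k, _ := it.First(); k != nil; k, _ = it.Next() {
                n++
        }
        return n, nil
}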
     520             : 
     521           1 : func readBlockBuf(r *Reader, bh BlockHandle, buf []byte) ([]byte, []byte, error) {
     522           1 :         raw := r.readable.(*memReader).b[bh.Offset : bh.Offset+bh.Length+blockTrailerLen]
     523           1 :         if err := checkChecksum(r.checksumType, raw, bh, 0); err != nil {
     524           0 :                 return nil, buf, err
     525           0 :         }
     526           1 :         typ := blockType(raw[bh.Length])
     527           1 :         raw = raw[:bh.Length]
     528           1 :         if typ == noCompressionBlockType {
     529           1 :                 return raw, buf, nil
     530           1 :         }
     531           1 :         decompressedLen, prefix, err := decompressedLen(typ, raw)
     532           1 :         if err != nil {
     533           0 :                 return nil, buf, err
     534           0 :         }
     535           1 :         if cap(buf) < decompressedLen {
     536           1 :                 buf = make([]byte, decompressedLen)
     537           1 :         }
     538           1 :         dst := buf[:decompressedLen]
     539           1 :         err = decompressInto(typ, raw[prefix:], dst)
     540           1 :         return dst, buf, err
     541             : }
     542             : 
     543             : // memReader is a thin wrapper around a []byte such that it can be passed to
     544             : // sstable.Reader. It supports concurrent use, and does so without locking in
     545             : // contrast to the heavier read/write vfs.MemFile.
     546             : type memReader struct {
     547             :         b  []byte
     548             :         r  *bytes.Reader
     549             :         rh objstorage.NoopReadHandle
     550             : }
     551             : 
     552             : var _ objstorage.Readable = (*memReader)(nil)
     553             : 
     554           1 : func newMemReader(b []byte) *memReader {
     555           1 :         r := &memReader{
     556           1 :                 b: b,
     557           1 :                 r: bytes.NewReader(b),
     558           1 :         }
     559           1 :         r.rh = objstorage.MakeNoopReadHandle(r)
     560           1 :         return r
     561           1 : }
     562             : 
     563             : // ReadAt is part of objstorage.Readable.
     564           1 : func (m *memReader) ReadAt(_ context.Context, p []byte, off int64) error {
     565           1 :         n, err := m.r.ReadAt(p, off)
     566           1 :         if invariants.Enabled && err == nil && n != len(p) {
     567           0 :                 panic("short read")
     568             :         }
     569           1 :         return err
     570             : }
     571             : 
     572             : // Close is part of objstorage.Readable.
     573           1 : func (*memReader) Close() error {
     574           1 :         return nil
     575           1 : }
     576             : 
      577             : // Size is part of objstorage.Readable.
     578           1 : func (m *memReader) Size() int64 {
     579           1 :         return int64(len(m.b))
     580           1 : }
     581             : 
     582             : // NewReadHandle is part of objstorage.Readable.
     583           1 : func (m *memReader) NewReadHandle(_ context.Context) objstorage.ReadHandle {
     584           1 :         return &m.rh
     585           1 : }

Generated by: LCOV version 1.14