LCOV - code coverage report
Current view: top level - pebble/sstable - suffix_rewriter.go (source / functions) Hit Total Coverage
Test: 2024-05-10 08:16Z a43bb5d5 - tests only.lcov Lines: 295 383 77.0 %
Date: 2024-05-10 08:18:12 Functions: 0 0 -

          Line data    Source code
       1             : package sstable
       2             : 
       3             : import (
       4             :         "bytes"
       5             :         "context"
       6             :         "math"
       7             :         "sync"
       8             : 
       9             :         "github.com/cespare/xxhash/v2"
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      13             :         "github.com/cockroachdb/pebble/internal/invariants"
      14             :         "github.com/cockroachdb/pebble/internal/rangekey"
      15             :         "github.com/cockroachdb/pebble/objstorage"
      16             : )
      17             : 
      18             : // RewriteKeySuffixesAndReturnFormat copies the content of the passed SSTable
      19             : // bytes to a new sstable, written to `out`, in which the suffix `from` has is
      20             : // replaced with `to` in every key. The input sstable must consist of only
      21             : // Sets or RangeKeySets and every key must have `from` as its suffix as
      22             : // determined by the Split function of the Comparer in the passed
      23             : // WriterOptions. Range deletes must not exist in this sstable, as they will
      24             : // be ignored.
      25             : //
      26             : // Data blocks are rewritten in parallel by `concurrency` workers and then
      27             : // assembled into a final SST. Filters are copied from the original SST without
      28             : // modification as they are not affected by the suffix, while block and table
      29             : // properties are only minimally recomputed.
      30             : //
      31             : // TODO(sumeer): document limitations, if any, due to this limited
      32             : // re-computation of properties (is there any loss of fidelity?).
      33             : //
      34             : // Any block property collectors configured in the WriterOptions must implement
      35             : // AddCollectedWithSuffixChange.
      36             : //
      37             : // The WriterOptions.TableFormat is ignored, and the output sstable has the
      38             : // same TableFormat as the input, which is returned in case the caller wants
      39             : // to do some error checking. Suffix rewriting is meant to be efficient, and
      40             : // allowing changes in the TableFormat detracts from that efficiency.
      41             : //
      42             : // Any obsolete bits that key-value pairs may be annotated with are ignored
      43             : // and lost during the rewrite. Additionally, the output sstable has the
      44             : // pebble.obsolete.is_strict property set to false. These limitations could be
      45             : // removed if needed. The current use case for
      46             : // RewriteKeySuffixesAndReturnFormat in CockroachDB is for MVCC-compliant file
      47             : // ingestion, where these files do not contain RANGEDELs and have one
      48             : // key-value pair per userkey -- so they trivially satisfy the strict
      49             : // criteria, and we don't need the obsolete bit as a performance optimization.
      50             : // For disaggregated storage, strict obsolete sstables are needed for L5 and
      51             : // L6, but at the time of writing, we expect such MVCC-compliant file
      52             : // ingestion to only ingest into levels L4 and higher. If this changes, we can
      53             : // do one of two things to get rid of this limitation:
      54             : //   - Validate that there are no duplicate userkeys and no RANGEDELs/MERGEs
      55             : //     in the sstable to be rewritten. Validating no duplicate userkeys is
      56             : //     non-trivial when rewriting blocks in parallel, so we could encode the
      57             : //     pre-existing condition in the (existing) SnapshotPinnedKeys property --
      58             : //     we need to update the external sst writer to calculate and encode this
      59             : //     property.
      60             : //   - Preserve the obsolete bit (with changes to the blockIter).
      61             : func RewriteKeySuffixesAndReturnFormat(
      62             :         sst []byte,
      63             :         rOpts ReaderOptions,
      64             :         out objstorage.Writable,
      65             :         o WriterOptions,
      66             :         from, to []byte,
      67             :         concurrency int,
      68           0 : ) (*WriterMetadata, TableFormat, error) {
      69           0 :         r, err := NewMemReader(sst, rOpts)
      70           0 :         if err != nil {
      71           0 :                 return nil, TableFormatUnspecified, err
      72           0 :         }
      73           0 :         defer r.Close()
      74           0 :         return rewriteKeySuffixesInBlocks(r, out, o, from, to, concurrency)
      75             : }
      76             : 
      77             : func rewriteKeySuffixesInBlocks(
      78             :         r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte, concurrency int,
      79           1 : ) (*WriterMetadata, TableFormat, error) {
      80           1 :         if o.Comparer == nil || o.Comparer.Split == nil {
      81           1 :                 return nil, TableFormatUnspecified,
      82           1 :                         errors.New("a valid splitter is required to rewrite suffixes")
      83           1 :         }
      84           1 :         if concurrency < 1 {
      85           0 :                 return nil, TableFormatUnspecified, errors.New("concurrency must be >= 1")
      86           0 :         }
      87             :         // Even though NumValueBlocks = 0 => NumValuesInValueBlocks = 0, check both
      88             :         // as a defensive measure.
      89           1 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumValuesInValueBlocks > 0 {
      90           0 :                 return nil, TableFormatUnspecified,
      91           0 :                         errors.New("sstable with a single suffix should not have value blocks")
      92           0 :         }
      93             : 
      94           1 :         tableFormat := r.tableFormat
      95           1 :         o.TableFormat = tableFormat
      96           1 :         w := NewWriter(out, o)
      97           1 :         defer func() {
      98           1 :                 if w != nil {
      99           1 :                         w.Close()
     100           1 :                 }
     101             :         }()
     102             : 
     103           1 :         for _, c := range w.blockPropCollectors {
     104           1 :                 if !c.SupportsSuffixReplacement() {
     105           0 :                         return nil, TableFormatUnspecified,
     106           0 :                                 errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
     107           0 :                 }
     108             :         }
     109             : 
     110           1 :         l, err := r.Layout()
     111           1 :         if err != nil {
     112           0 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "reading layout")
     113           0 :         }
     114             : 
     115           1 :         if err := rewriteDataBlocksToWriter(r, w, l.Data, from, to, w.split, concurrency); err != nil {
     116           1 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting data blocks")
     117           1 :         }
     118             : 
     119             :         // Copy over the range key block and replace suffixes in it if it exists.
     120           1 :         if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
     121           0 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting range key blocks")
     122           0 :         }
     123             : 
     124             :         // Copy over the filter block if it exists (rewriteDataBlocksToWriter will
     125             :         // already have ensured this is valid if it exists).
     126           1 :         if w.filter != nil && l.Filter.Length > 0 {
     127           1 :                 filterBlock, _, err := readBlockBuf(r, l.Filter, nil)
     128           1 :                 if err != nil {
     129           0 :                         return nil, TableFormatUnspecified, errors.Wrap(err, "reading filter")
     130           0 :                 }
     131           1 :                 w.filter = copyFilterWriter{
     132           1 :                         origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBlock,
     133           1 :                 }
     134             :         }
     135             : 
     136           1 :         if err := w.Close(); err != nil {
     137           0 :                 w = nil
     138           0 :                 return nil, TableFormatUnspecified, err
     139           0 :         }
     140           1 :         writerMeta, err := w.Metadata()
     141           1 :         w = nil
     142           1 :         return writerMeta, tableFormat, err
     143             : }
     144             : 
     145             : var errBadKind = errors.New("key does not have expected kind (set)")
     146             : 
     147             : type blockWithSpan struct {
     148             :         start, end InternalKey
     149             :         data       []byte
     150             : }
     151             : 
     152             : func rewriteBlocks(
     153             :         r *Reader,
     154             :         restartInterval int,
     155             :         checksumType ChecksumType,
     156             :         compression Compression,
     157             :         input []BlockHandleWithProperties,
     158             :         output []blockWithSpan,
     159             :         totalWorkers, worker int,
     160             :         from, to []byte,
     161             :         split Split,
     162           1 : ) error {
     163           1 :         bw := blockWriter{
     164           1 :                 restartInterval: restartInterval,
     165           1 :         }
     166           1 :         buf := blockBuf{checksummer: checksummer{checksumType: checksumType}}
     167           1 :         if checksumType == ChecksumTypeXXHash {
     168           0 :                 buf.checksummer.xxHasher = xxhash.New()
     169           0 :         }
     170             : 
     171           1 :         var blockAlloc bytealloc.A
     172           1 :         var keyAlloc bytealloc.A
     173           1 :         var scratch InternalKey
     174           1 : 
     175           1 :         var inputBlock, inputBlockBuf []byte
     176           1 : 
     177           1 :         iter := &blockIter{}
     178           1 : 
     179           1 :         // We'll assume all blocks are _roughly_ equal so round-robin static partition
     180           1 :         // of each worker doing every ith block is probably enough.
     181           1 :         for i := worker; i < len(input); i += totalWorkers {
     182           1 :                 bh := input[i]
     183           1 : 
     184           1 :                 var err error
     185           1 :                 inputBlock, inputBlockBuf, err = readBlockBuf(r, bh.BlockHandle, inputBlockBuf)
     186           1 :                 if err != nil {
     187           0 :                         return err
     188           0 :                 }
     189           1 :                 if err := iter.init(r.Compare, r.Split, inputBlock, NoTransforms); err != nil {
     190           0 :                         return err
     191           0 :                 }
     192             : 
     193           1 :                 if cap(bw.restarts) < int(iter.restarts) {
     194           1 :                         bw.restarts = make([]uint32, 0, iter.restarts)
     195           1 :                 }
     196           1 :                 if cap(bw.buf) == 0 {
     197           1 :                         bw.buf = make([]byte, 0, len(inputBlock))
     198           1 :                 }
     199           1 :                 if cap(bw.restarts) < int(iter.numRestarts) {
     200           0 :                         bw.restarts = make([]uint32, 0, iter.numRestarts)
     201           0 :                 }
     202             : 
     203           1 :                 for kv := iter.First(); kv != nil; kv = iter.Next() {
     204           1 :                         if kv.Kind() != InternalKeyKindSet {
     205           0 :                                 return errBadKind
     206           0 :                         }
     207           1 :                         si := split(kv.K.UserKey)
     208           1 :                         oldSuffix := kv.K.UserKey[si:]
     209           1 :                         if !bytes.Equal(oldSuffix, from) {
     210           1 :                                 err := errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
     211           1 :                                 return err
     212           1 :                         }
     213           1 :                         newLen := si + len(to)
     214           1 :                         if cap(scratch.UserKey) < newLen {
     215           1 :                                 scratch.UserKey = make([]byte, 0, len(kv.K.UserKey)*2+len(to)-len(from))
     216           1 :                         }
     217             : 
     218           1 :                         scratch.Trailer = kv.K.Trailer
     219           1 :                         scratch.UserKey = scratch.UserKey[:newLen]
     220           1 :                         copy(scratch.UserKey, kv.K.UserKey[:si])
     221           1 :                         copy(scratch.UserKey[si:], to)
     222           1 : 
     223           1 :                         // NB: for TableFormatPebblev3 and higher, since
     224           1 :                         // !iter.lazyValueHandling.hasValuePrefix, it will return the raw value
     225           1 :                         // in the block, which includes the 1-byte prefix. This is fine since bw
     226           1 :                         // also does not know about the prefix and will preserve it in bw.add.
     227           1 :                         v := kv.InPlaceValue()
     228           1 :                         if invariants.Enabled && r.tableFormat >= TableFormatPebblev3 &&
     229           1 :                                 kv.Kind() == InternalKeyKindSet {
     230           1 :                                 if len(v) < 1 {
     231           0 :                                         return errors.Errorf("value has no prefix")
     232           0 :                                 }
     233           1 :                                 prefix := valuePrefix(v[0])
     234           1 :                                 if isValueHandle(prefix) {
     235           0 :                                         return errors.Errorf("value prefix is incorrect")
     236           0 :                                 }
     237           1 :                                 if setHasSamePrefix(prefix) {
     238           0 :                                         return errors.Errorf("multiple keys with same key prefix")
     239           0 :                                 }
     240             :                         }
     241           1 :                         bw.add(scratch, v)
     242           1 :                         if output[i].start.UserKey == nil {
     243           1 :                                 keyAlloc, output[i].start = cloneKeyWithBuf(scratch, keyAlloc)
     244           1 :                         }
     245             :                 }
     246           1 :                 *iter = iter.resetForReuse()
     247           1 : 
     248           1 :                 keyAlloc, output[i].end = cloneKeyWithBuf(scratch, keyAlloc)
     249           1 : 
     250           1 :                 finished := compressAndChecksum(bw.finish(), compression, &buf)
     251           1 : 
     252           1 :                 // copy our finished block into the output buffer.
     253           1 :                 blockAlloc, output[i].data = blockAlloc.Alloc(len(finished) + blockTrailerLen)
     254           1 :                 copy(output[i].data, finished)
     255           1 :                 copy(output[i].data[len(finished):], buf.tmp[:blockTrailerLen])
     256             :         }
     257           1 :         return nil
     258             : }
     259             : 
     260           1 : func checkWriterFilterMatchesReader(r *Reader, w *Writer) error {
     261           1 :         if r.Properties.FilterPolicyName != w.filter.policyName() {
     262           0 :                 return errors.New("mismatched filters")
     263           0 :         }
     264           1 :         if was, is := r.Properties.ComparerName, w.props.ComparerName; was != is {
     265           1 :                 return errors.Errorf("mismatched Comparer %s vs %s, replacement requires same splitter to copy filters", was, is)
     266           1 :         }
     267           1 :         return nil
     268             : }
     269             : 
     270             : func rewriteDataBlocksToWriter(
     271             :         r *Reader,
     272             :         w *Writer,
     273             :         data []BlockHandleWithProperties,
     274             :         from, to []byte,
     275             :         split Split,
     276             :         concurrency int,
     277           1 : ) error {
     278           1 :         if r.Properties.NumEntries == 0 {
     279           1 :                 // No point keys.
     280           1 :                 return nil
     281           1 :         }
     282           1 :         blocks := make([]blockWithSpan, len(data))
     283           1 : 
     284           1 :         if w.filter != nil {
     285           1 :                 if err := checkWriterFilterMatchesReader(r, w); err != nil {
     286           1 :                         return err
     287           1 :                 }
     288             :         }
     289             : 
     290           1 :         g := &sync.WaitGroup{}
     291           1 :         g.Add(concurrency)
     292           1 :         errCh := make(chan error, concurrency)
     293           1 :         for i := 0; i < concurrency; i++ {
     294           1 :                 worker := i
     295           1 :                 go func() {
     296           1 :                         defer g.Done()
     297           1 :                         err := rewriteBlocks(
     298           1 :                                 r,
     299           1 :                                 w.dataBlockBuf.dataBlock.restartInterval,
     300           1 :                                 w.blockBuf.checksummer.checksumType,
     301           1 :                                 w.compression,
     302           1 :                                 data,
     303           1 :                                 blocks,
     304           1 :                                 concurrency,
     305           1 :                                 worker,
     306           1 :                                 from, to,
     307           1 :                                 split,
     308           1 :                         )
     309           1 :                         if err != nil {
     310           1 :                                 errCh <- err
     311           1 :                         }
     312             :                 }()
     313             :         }
     314           1 :         g.Wait()
     315           1 :         close(errCh)
     316           1 :         if err, ok := <-errCh; ok {
     317           1 :                 return err
     318           1 :         }
     319             : 
     320             :         // oldShortIDs maps the shortID for the block property collector in the old
     321             :         // blocks to the shortID in the new blocks. Initialized once for the sstable.
     322           1 :         var oldShortIDs []shortID
     323           1 :         // oldProps is the property value in an old block, indexed by the shortID in
     324           1 :         // the new block. Slice is reused for each block.
     325           1 :         var oldProps [][]byte
     326           1 :         if len(w.blockPropCollectors) > 0 {
     327           1 :                 oldProps = make([][]byte, len(w.blockPropCollectors))
     328           1 :                 oldShortIDs = make([]shortID, math.MaxUint8)
     329           1 :                 for i := range oldShortIDs {
     330           1 :                         oldShortIDs[i] = invalidShortID
     331           1 :                 }
     332           1 :                 for i, p := range w.blockPropCollectors {
     333           1 :                         if prop, ok := r.Properties.UserProperties[p.Name()]; ok {
     334           1 :                                 was, is := shortID(prop[0]), shortID(i)
     335           1 :                                 oldShortIDs[was] = is
     336           1 :                         } else {
     337           0 :                                 return errors.Errorf("sstable does not contain property %s", p.Name())
     338           0 :                         }
     339             :                 }
     340             :         }
     341             : 
     342           1 :         for i := range blocks {
     343           1 :                 // Write the rewritten block to the file.
     344           1 :                 if err := w.writable.Write(blocks[i].data); err != nil {
     345           0 :                         return err
     346           0 :                 }
     347             : 
     348           1 :                 n := len(blocks[i].data)
     349           1 :                 bh := BlockHandle{Offset: w.meta.Size, Length: uint64(n) - blockTrailerLen}
     350           1 :                 // Update the overall size.
     351           1 :                 w.meta.Size += uint64(n)
     352           1 : 
     353           1 :                 // Load any previous values for our prop collectors into oldProps.
     354           1 :                 for i := range oldProps {
     355           1 :                         oldProps[i] = nil
     356           1 :                 }
     357           1 :                 decoder := makeBlockPropertiesDecoder(len(oldProps), data[i].Props)
     358           1 :                 for !decoder.Done() {
     359           1 :                         id, val, err := decoder.Next()
     360           1 :                         if err != nil {
     361           0 :                                 return err
     362           0 :                         }
     363           1 :                         if oldShortIDs[id].IsValid() {
     364           1 :                                 oldProps[oldShortIDs[id]] = val
     365           1 :                         }
     366             :                 }
     367             : 
     368           1 :                 for i, p := range w.blockPropCollectors {
     369           1 :                         if err := p.AddCollectedWithSuffixReplacement(oldProps[i], from, to); err != nil {
     370           0 :                                 return err
     371           0 :                         }
     372             :                 }
     373             : 
     374           1 :                 bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
     375           1 :                 if err != nil {
     376           0 :                         return err
     377           0 :                 }
     378           1 :                 var nextKey InternalKey
     379           1 :                 if i+1 < len(blocks) {
     380           1 :                         nextKey = blocks[i+1].start
     381           1 :                 }
     382           1 :                 if err = w.addIndexEntrySync(blocks[i].end, nextKey, bhp, w.dataBlockBuf.tmp[:]); err != nil {
     383           0 :                         return err
     384           0 :                 }
     385             :         }
     386             : 
     387           1 :         w.meta.updateSeqNum(blocks[0].start.SeqNum())
     388           1 :         w.props.NumEntries = r.Properties.NumEntries
     389           1 :         w.props.RawKeySize = r.Properties.RawKeySize
     390           1 :         w.props.RawValueSize = r.Properties.RawValueSize
     391           1 :         w.meta.SetSmallestPointKey(blocks[0].start)
     392           1 :         w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
     393           1 :         return nil
     394             : }
     395             : 
     396           1 : func rewriteRangeKeyBlockToWriter(r *Reader, w *Writer, from, to []byte) error {
     397           1 :         iter, err := r.NewRawRangeKeyIter(NoTransforms)
     398           1 :         if err != nil {
     399           0 :                 return err
     400           0 :         }
     401           1 :         if iter == nil {
     402           1 :                 // No range keys.
     403           1 :                 return nil
     404           1 :         }
     405           1 :         defer iter.Close()
     406           1 : 
     407           1 :         s, err := iter.First()
     408           1 :         for ; s != nil; s, err = iter.Next() {
     409           1 :                 if !s.Valid() {
     410           0 :                         break
     411             :                 }
     412           1 :                 for i := range s.Keys {
     413           1 :                         if s.Keys[i].Kind() != base.InternalKeyKindRangeKeySet {
     414           0 :                                 return errBadKind
     415           0 :                         }
     416           1 :                         if !bytes.Equal(s.Keys[i].Suffix, from) {
     417           0 :                                 return errors.Errorf("key has suffix %q, expected %q", s.Keys[i].Suffix, from)
     418           0 :                         }
     419           1 :                         s.Keys[i].Suffix = to
     420             :                 }
     421             : 
     422           1 :                 err = rangekey.Encode(s, func(k base.InternalKey, v []byte) error {
     423           1 :                         // Calling AddRangeKey instead of addRangeKeySpan bypasses the fragmenter.
     424           1 :                         // This is okay because the raw fragments off of `iter` are already
     425           1 :                         // fragmented, and suffix replacement should not affect fragmentation.
     426           1 :                         return w.AddRangeKey(k, v)
     427           1 :                 })
     428           1 :                 if err != nil {
     429           0 :                         return err
     430           0 :                 }
     431             :         }
     432           1 :         return err
     433             : }
     434             : 
     435             : type copyFilterWriter struct {
     436             :         origMetaName   string
     437             :         origPolicyName string
     438             :         data           []byte
     439             : }
     440             : 
     441           0 : func (copyFilterWriter) addKey(key []byte)         { panic("unimplemented") }
     442           1 : func (c copyFilterWriter) finish() ([]byte, error) { return c.data, nil }
     443           1 : func (c copyFilterWriter) metaName() string        { return c.origMetaName }
     444           1 : func (c copyFilterWriter) policyName() string      { return c.origPolicyName }
     445             : 
     446             : // RewriteKeySuffixesViaWriter is similar to RewriteKeySuffixes but uses just a
     447             : // single loop over the Reader that writes each key to the Writer with the new
     448             : // suffix. The is significantly slower than the parallelized rewriter, and does
     449             : // more work to rederive filters, props, etc.
     450             : //
     451             : // Any obsolete bits that key-value pairs may be annotated with are ignored
     452             : // and lost during the rewrite. Some of the obsolete bits may be recreated --
     453             : // specifically when there are multiple keys with the same user key.
     454             : // Additionally, the output sstable has the pebble.obsolete.is_strict property
     455             : // set to false. See the longer comment at RewriteKeySuffixesAndReturnFormat.
     456             : func RewriteKeySuffixesViaWriter(
     457             :         r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte,
     458           1 : ) (*WriterMetadata, error) {
     459           1 :         if o.Comparer == nil || o.Comparer.Split == nil {
     460           0 :                 return nil, errors.New("a valid splitter is required to rewrite suffixes")
     461           0 :         }
     462             : 
     463           1 :         o.IsStrictObsolete = false
     464           1 :         w := NewWriter(out, o)
     465           1 :         defer func() {
     466           1 :                 if w != nil {
     467           0 :                         w.Close()
     468           0 :                 }
     469             :         }()
     470           1 :         i, err := r.NewIter(NoTransforms, nil, nil)
     471           1 :         if err != nil {
     472           0 :                 return nil, err
     473           0 :         }
     474           1 :         defer i.Close()
     475           1 : 
     476           1 :         kv := i.First()
     477           1 :         var scratch InternalKey
     478           1 :         for kv != nil {
     479           1 :                 if kv.Kind() != InternalKeyKindSet {
     480           0 :                         return nil, errors.New("invalid key type")
     481           0 :                 }
     482           1 :                 oldSuffix := kv.K.UserKey[r.Split(kv.K.UserKey):]
     483           1 :                 if !bytes.Equal(oldSuffix, from) {
     484           0 :                         return nil, errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
     485           0 :                 }
     486           1 :                 scratch.UserKey = append(scratch.UserKey[:0], kv.K.UserKey[:len(kv.K.UserKey)-len(from)]...)
     487           1 :                 scratch.UserKey = append(scratch.UserKey, to...)
     488           1 :                 scratch.Trailer = kv.K.Trailer
     489           1 : 
     490           1 :                 val, _, err := kv.Value(nil)
     491           1 :                 if err != nil {
     492           0 :                         return nil, err
     493           0 :                 }
     494           1 :                 if w.addPoint(scratch, val, false); err != nil {
     495           0 :                         return nil, err
     496           0 :                 }
     497           1 :                 kv = i.Next()
     498             :         }
     499           1 :         if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
     500           0 :                 return nil, err
     501           0 :         }
     502           1 :         if err := w.Close(); err != nil {
     503           0 :                 w = nil
     504           0 :                 return nil, err
     505           0 :         }
     506           1 :         writerMeta, err := w.Metadata()
     507           1 :         w = nil
     508           1 :         return writerMeta, err
     509             : }
     510             : 
     511             : // NewMemReader opens a reader over the SST stored in the passed []byte.
     512           1 : func NewMemReader(sst []byte, o ReaderOptions) (*Reader, error) {
     513           1 :         return NewReader(newMemReader(sst), o)
     514           1 : }
     515             : 
     516           1 : func readBlockBuf(r *Reader, bh BlockHandle, buf []byte) ([]byte, []byte, error) {
     517           1 :         raw := r.readable.(*memReader).b[bh.Offset : bh.Offset+bh.Length+blockTrailerLen]
     518           1 :         if err := checkChecksum(r.checksumType, raw, bh, 0); err != nil {
     519           0 :                 return nil, buf, err
     520           0 :         }
     521           1 :         typ := blockType(raw[bh.Length])
     522           1 :         raw = raw[:bh.Length]
     523           1 :         if typ == noCompressionBlockType {
     524           1 :                 return raw, buf, nil
     525           1 :         }
     526           1 :         decompressedLen, prefix, err := decompressedLen(typ, raw)
     527           1 :         if err != nil {
     528           0 :                 return nil, buf, err
     529           0 :         }
     530           1 :         if cap(buf) < decompressedLen {
     531           1 :                 buf = make([]byte, decompressedLen)
     532           1 :         }
     533           1 :         dst := buf[:decompressedLen]
     534           1 :         err = decompressInto(typ, raw[prefix:], dst)
     535           1 :         return dst, buf, err
     536             : }
     537             : 
     538             : // memReader is a thin wrapper around a []byte such that it can be passed to
     539             : // sstable.Reader. It supports concurrent use, and does so without locking in
     540             : // contrast to the heavier read/write vfs.MemFile.
     541             : type memReader struct {
     542             :         b  []byte
     543             :         r  *bytes.Reader
     544             :         rh objstorage.NoopReadHandle
     545             : }
     546             : 
     547             : var _ objstorage.Readable = (*memReader)(nil)
     548             : 
     549           1 : func newMemReader(b []byte) *memReader {
     550           1 :         r := &memReader{
     551           1 :                 b: b,
     552           1 :                 r: bytes.NewReader(b),
     553           1 :         }
     554           1 :         r.rh = objstorage.MakeNoopReadHandle(r)
     555           1 :         return r
     556           1 : }
     557             : 
     558             : // ReadAt is part of objstorage.Readable.
     559           1 : func (m *memReader) ReadAt(_ context.Context, p []byte, off int64) error {
     560           1 :         n, err := m.r.ReadAt(p, off)
     561           1 :         if invariants.Enabled && err == nil && n != len(p) {
     562           0 :                 panic("short read")
     563             :         }
     564           1 :         return err
     565             : }
     566             : 
     567             : // Close is part of objstorage.Readable.
     568           1 : func (*memReader) Close() error {
     569           1 :         return nil
     570           1 : }
     571             : 
     572             : // Stat is part of objstorage.Readable.
     573           1 : func (m *memReader) Size() int64 {
     574           1 :         return int64(len(m.b))
     575           1 : }
     576             : 
     577             : // NewReadHandle is part of objstorage.Readable.
     578           1 : func (m *memReader) NewReadHandle(_ context.Context) objstorage.ReadHandle {
     579           1 :         return &m.rh
     580           1 : }

Generated by: LCOV version 1.14