LCOV - code coverage report
Current view: top level - pebble/sstable - suffix_rewriter.go (source / functions)
Test:         2024-02-18 08:16Z d10a640f - meta test only.lcov
Date:         2024-02-18 08:16:46

              Hit    Total    Coverage
Lines:        0      373      0.0 %
Functions:    0      0        -

          Line data    Source code
       1             : package sstable
       2             : 
       3             : import (
       4             :         "bytes"
       5             :         "context"
       6             :         "math"
       7             :         "sync"
       8             : 
       9             :         "github.com/cespare/xxhash/v2"
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      13             :         "github.com/cockroachdb/pebble/internal/invariants"
      14             :         "github.com/cockroachdb/pebble/internal/rangekey"
      15             :         "github.com/cockroachdb/pebble/objstorage"
      16             : )
      17             : 
      18             : // RewriteKeySuffixes is deprecated.
      19             : //
      20             : // TODO(sumeer): remove after switching CockroachDB to RewriteKeySuffixesAndReturnFormat.
      21             : func RewriteKeySuffixes(
      22             :         sst []byte,
      23             :         rOpts ReaderOptions,
      24             :         out objstorage.Writable,
      25             :         o WriterOptions,
      26             :         from, to []byte,
      27             :         concurrency int,
      28           0 : ) (*WriterMetadata, error) {
      29           0 :         meta, _, err := RewriteKeySuffixesAndReturnFormat(sst, rOpts, out, o, from, to, concurrency)
      30           0 :         return meta, err
      31           0 : }
      32             : 
      33             : // RewriteKeySuffixesAndReturnFormat copies the content of the passed SSTable
       34             : // bytes to a new sstable, written to `out`, in which the suffix `from` is
      35             : // replaced with `to` in every key. The input sstable must consist of only
      36             : // Sets or RangeKeySets and every key must have `from` as its suffix as
      37             : // determined by the Split function of the Comparer in the passed
       38             : // WriterOptions. Range deletes must not be present in this sstable; if they
       39             : // are, they will be silently ignored.
      40             : //
      41             : // Data blocks are rewritten in parallel by `concurrency` workers and then
      42             : // assembled into a final SST. Filters are copied from the original SST without
      43             : // modification as they are not affected by the suffix, while block and table
      44             : // properties are only minimally recomputed.
      45             : //
      46             : // TODO(sumeer): document limitations, if any, due to this limited
      47             : // re-computation of properties (is there any loss of fidelity?).
      48             : //
      49             : // Any block property collectors configured in the WriterOptions must implement
      50             : // SuffixReplaceableBlockCollector.
      51             : //
      52             : // The WriterOptions.TableFormat is ignored, and the output sstable has the
      53             : // same TableFormat as the input, which is returned in case the caller wants
      54             : // to do some error checking. Suffix rewriting is meant to be efficient, and
      55             : // allowing changes in the TableFormat detracts from that efficiency.
      56             : //
      57             : // Any obsolete bits that key-value pairs may be annotated with are ignored
      58             : // and lost during the rewrite. Additionally, the output sstable has the
      59             : // pebble.obsolete.is_strict property set to false. These limitations could be
      60             : // removed if needed. The current use case for
      61             : // RewriteKeySuffixesAndReturnFormat in CockroachDB is for MVCC-compliant file
      62             : // ingestion, where these files do not contain RANGEDELs and have one
      63             : // key-value pair per userkey -- so they trivially satisfy the strict
      64             : // criteria, and we don't need the obsolete bit as a performance optimization.
      65             : // For disaggregated storage, strict obsolete sstables are needed for L5 and
      66             : // L6, but at the time of writing, we expect such MVCC-compliant file
      67             : // ingestion to only ingest into levels L4 and higher. If this changes, we can
      68             : // do one of two things to get rid of this limitation:
      69             : //   - Validate that there are no duplicate userkeys and no RANGEDELs/MERGEs
      70             : //     in the sstable to be rewritten. Validating no duplicate userkeys is
      71             : //     non-trivial when rewriting blocks in parallel, so we could encode the
      72             : //     pre-existing condition in the (existing) SnapshotPinnedKeys property --
      73             : //     we need to update the external sst writer to calculate and encode this
      74             : //     property.
      75             : //   - Preserve the obsolete bit (with changes to the blockIter).
      76             : func RewriteKeySuffixesAndReturnFormat(
      77             :         sst []byte,
      78             :         rOpts ReaderOptions,
      79             :         out objstorage.Writable,
      80             :         o WriterOptions,
      81             :         from, to []byte,
      82             :         concurrency int,
      83           0 : ) (*WriterMetadata, TableFormat, error) {
      84           0 :         r, err := NewMemReader(sst, rOpts)
      85           0 :         if err != nil {
      86           0 :                 return nil, TableFormatUnspecified, err
      87           0 :         }
      88           0 :         defer r.Close()
      89           0 :         return rewriteKeySuffixesInBlocks(r, out, o, from, to, concurrency)
      90             : }
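                      : 
                      : // A minimal usage sketch (hypothetical caller, not part of this package):
                      : // `cmp`, `collectors`, `out`, `sstBytes` and the suffixes are assumptions.
                      : // The Comparer must match the one that built `sstBytes`, and every collector
                      : // must implement SuffixReplaceableBlockCollector.
                      : //
                      : //      meta, format, err := RewriteKeySuffixesAndReturnFormat(
                      : //              sstBytes, ReaderOptions{Comparer: cmp},
                      : //              out, WriterOptions{Comparer: cmp, BlockPropertyCollectors: collectors},
                      : //              oldSuffix, newSuffix, 4 /* concurrency */)
                      : //      if err != nil {
                      : //              return err
                      : //      }
                      : //      _ = meta   // bounds, sizes and seqnums of the rewritten sstable
                      : //      _ = format // same TableFormat as the input sstable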
      91             : 
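                      : // rewriteKeySuffixesInBlocks is the parallel rewrite path: it validates the
                      : // reader and writer configuration, rewrites the data and range key blocks
                      : // with `from` replaced by `to`, copies the filter block verbatim, and closes
                      : // the writer, returning its metadata and the (unchanged) table format.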
      92             : func rewriteKeySuffixesInBlocks(
      93             :         r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte, concurrency int,
      94           0 : ) (*WriterMetadata, TableFormat, error) {
      95           0 :         if o.Comparer == nil || o.Comparer.Split == nil {
      96           0 :                 return nil, TableFormatUnspecified,
      97           0 :                         errors.New("a valid splitter is required to rewrite suffixes")
      98           0 :         }
      99           0 :         if concurrency < 1 {
     100           0 :                 return nil, TableFormatUnspecified, errors.New("concurrency must be >= 1")
     101           0 :         }
     102             :         // Even though NumValueBlocks = 0 => NumValuesInValueBlocks = 0, check both
     103             :         // as a defensive measure.
     104           0 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumValuesInValueBlocks > 0 {
     105           0 :                 return nil, TableFormatUnspecified,
     106           0 :                         errors.New("sstable with a single suffix should not have value blocks")
     107           0 :         }
     108             : 
     109           0 :         tableFormat := r.tableFormat
     110           0 :         o.TableFormat = tableFormat
     111           0 :         w := NewWriter(out, o)
     112           0 :         defer func() {
     113           0 :                 if w != nil {
     114           0 :                         w.Close()
     115           0 :                 }
     116             :         }()
     117             : 
     118           0 :         for _, c := range w.blockPropCollectors {
     119           0 :                 if _, ok := c.(SuffixReplaceableBlockCollector); !ok {
     120           0 :                         return nil, TableFormatUnspecified,
     121           0 :                                 errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
     122           0 :                 }
     123             :         }
     124             : 
     125           0 :         l, err := r.Layout()
     126           0 :         if err != nil {
     127           0 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "reading layout")
     128           0 :         }
     129             : 
     130           0 :         if err := rewriteDataBlocksToWriter(r, w, l.Data, from, to, w.split, concurrency); err != nil {
     131           0 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting data blocks")
     132           0 :         }
     133             : 
     134             :         // Copy over the range key block and replace suffixes in it if it exists.
     135           0 :         if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
     136           0 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting range key blocks")
     137           0 :         }
     138             : 
     139             :         // Copy over the filter block if it exists (rewriteDataBlocksToWriter will
     140             :         // already have ensured this is valid if it exists).
     141           0 :         if w.filter != nil && l.Filter.Length > 0 {
     142           0 :                 filterBlock, _, err := readBlockBuf(r, l.Filter, nil)
     143           0 :                 if err != nil {
     144           0 :                         return nil, TableFormatUnspecified, errors.Wrap(err, "reading filter")
     145           0 :                 }
     146           0 :                 w.filter = copyFilterWriter{
     147           0 :                         origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBlock,
     148           0 :                 }
     149             :         }
     150             : 
     151           0 :         if err := w.Close(); err != nil {
     152           0 :                 w = nil
     153           0 :                 return nil, TableFormatUnspecified, err
     154           0 :         }
     155           0 :         writerMeta, err := w.Metadata()
     156           0 :         w = nil
     157           0 :         return writerMeta, tableFormat, err
     158             : }
     159             : 
     160             : var errBadKind = errors.New("key does not have expected kind (set)")
     161             : 
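                      : // blockWithSpan holds a single rewritten, compressed data block along with
                      : // the smallest and largest internal keys it contains.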
     162             : type blockWithSpan struct {
     163             :         start, end InternalKey
     164             :         data       []byte
     165             : }
     166             : 
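                      : // rewriteBlocks is the per-worker loop: the worker handles every
                      : // totalWorkers-th block starting at index `worker`, replacing the suffix
                      : // `from` with `to` in each key, re-encoding and compressing the block, and
                      : // storing the result (plus its new start/end keys) in the matching output
                      : // slot.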
     167             : func rewriteBlocks(
     168             :         r *Reader,
     169             :         restartInterval int,
     170             :         checksumType ChecksumType,
     171             :         compression Compression,
     172             :         input []BlockHandleWithProperties,
     173             :         output []blockWithSpan,
     174             :         totalWorkers, worker int,
     175             :         from, to []byte,
     176             :         split Split,
     177           0 : ) error {
     178           0 :         bw := blockWriter{
     179           0 :                 restartInterval: restartInterval,
     180           0 :         }
     181           0 :         buf := blockBuf{checksummer: checksummer{checksumType: checksumType}}
     182           0 :         if checksumType == ChecksumTypeXXHash {
     183           0 :                 buf.checksummer.xxHasher = xxhash.New()
     184           0 :         }
     185             : 
     186           0 :         var blockAlloc bytealloc.A
     187           0 :         var keyAlloc bytealloc.A
     188           0 :         var scratch InternalKey
     189           0 : 
     190           0 :         var inputBlock, inputBlockBuf []byte
     191           0 : 
     192           0 :         iter := &blockIter{}
     193           0 : 
      194           0 :         // We assume all blocks are roughly equal in size, so a static round-robin
      195           0 :         // partition in which each worker handles every i-th block should be enough.
     196           0 :         for i := worker; i < len(input); i += totalWorkers {
     197           0 :                 bh := input[i]
     198           0 : 
     199           0 :                 var err error
     200           0 :                 inputBlock, inputBlockBuf, err = readBlockBuf(r, bh.BlockHandle, inputBlockBuf)
     201           0 :                 if err != nil {
     202           0 :                         return err
     203           0 :                 }
     204           0 :                 if err := iter.init(r.Compare, r.Split, inputBlock, r.Properties.GlobalSeqNum, false, nil); err != nil {
     205           0 :                         return err
     206           0 :                 }
     207             : 
     208           0 :                 if cap(bw.restarts) < int(iter.restarts) {
     209           0 :                         bw.restarts = make([]uint32, 0, iter.restarts)
     210           0 :                 }
     211           0 :                 if cap(bw.buf) == 0 {
     212           0 :                         bw.buf = make([]byte, 0, len(inputBlock))
     213           0 :                 }
     214           0 :                 if cap(bw.restarts) < int(iter.numRestarts) {
     215           0 :                         bw.restarts = make([]uint32, 0, iter.numRestarts)
     216           0 :                 }
     217             : 
     218           0 :                 for key, val := iter.First(); key != nil; key, val = iter.Next() {
     219           0 :                         if key.Kind() != InternalKeyKindSet {
     220           0 :                                 return errBadKind
     221           0 :                         }
     222           0 :                         si := split(key.UserKey)
     223           0 :                         oldSuffix := key.UserKey[si:]
     224           0 :                         if !bytes.Equal(oldSuffix, from) {
     225           0 :                                 err := errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
     226           0 :                                 return err
     227           0 :                         }
     228           0 :                         newLen := si + len(to)
     229           0 :                         if cap(scratch.UserKey) < newLen {
     230           0 :                                 scratch.UserKey = make([]byte, 0, len(key.UserKey)*2+len(to)-len(from))
     231           0 :                         }
     232             : 
     233           0 :                         scratch.Trailer = key.Trailer
     234           0 :                         scratch.UserKey = scratch.UserKey[:newLen]
     235           0 :                         copy(scratch.UserKey, key.UserKey[:si])
     236           0 :                         copy(scratch.UserKey[si:], to)
     237           0 : 
     238           0 :                         // NB: for TableFormatPebblev3 and higher, since
     239           0 :                         // !iter.lazyValueHandling.hasValuePrefix, it will return the raw value
     240           0 :                         // in the block, which includes the 1-byte prefix. This is fine since bw
     241           0 :                         // also does not know about the prefix and will preserve it in bw.add.
     242           0 :                         v := val.InPlaceValue()
     243           0 :                         if invariants.Enabled && r.tableFormat >= TableFormatPebblev3 &&
     244           0 :                                 key.Kind() == InternalKeyKindSet {
     245           0 :                                 if len(v) < 1 {
     246           0 :                                         return errors.Errorf("value has no prefix")
     247           0 :                                 }
     248           0 :                                 prefix := valuePrefix(v[0])
     249           0 :                                 if isValueHandle(prefix) {
     250           0 :                                         return errors.Errorf("value prefix is incorrect")
     251           0 :                                 }
     252           0 :                                 if setHasSamePrefix(prefix) {
     253           0 :                                         return errors.Errorf("multiple keys with same key prefix")
     254           0 :                                 }
     255             :                         }
     256           0 :                         bw.add(scratch, v)
     257           0 :                         if output[i].start.UserKey == nil {
     258           0 :                                 keyAlloc, output[i].start = cloneKeyWithBuf(scratch, keyAlloc)
     259           0 :                         }
     260             :                 }
     261           0 :                 *iter = iter.resetForReuse()
     262           0 : 
     263           0 :                 keyAlloc, output[i].end = cloneKeyWithBuf(scratch, keyAlloc)
     264           0 : 
     265           0 :                 finished := compressAndChecksum(bw.finish(), compression, &buf)
     266           0 : 
     267           0 :                 // copy our finished block into the output buffer.
     268           0 :                 blockAlloc, output[i].data = blockAlloc.Alloc(len(finished) + blockTrailerLen)
     269           0 :                 copy(output[i].data, finished)
     270           0 :                 copy(output[i].data[len(finished):], buf.tmp[:blockTrailerLen])
     271             :         }
     272           0 :         return nil
     273             : }
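                      : 
                      : // To make the per-key rewrite above concrete, a small sketch under an
                      : // assumed comparer whose Split puts "@<version>" in the suffix (all values
                      : // are illustrative only):
                      : //
                      : //      // key.UserKey = "foo@7", from = "@7", to = "@9"
                      : //      si := split(key.UserKey)          // si == 3 ("foo")
                      : //      // oldSuffix = key.UserKey[3:] == "@7", which matches `from`
                      : //      // scratch.UserKey = "foo" + "@9" == "foo@9"; scratch.Trailer is
                      : //      // copied unchanged, so seqnum and kind are preserved.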
     274             : 
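                      : // rewriteDataBlocksToWriter rewrites all data blocks across `concurrency`
                      : // workers via rewriteBlocks, then sequentially writes each block to w,
                      : // re-deriving its block properties through UpdateKeySuffixes and adding a
                      : // matching index entry, and finally fixes up the writer's table-wide
                      : // properties and point-key bounds.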
     275             : func rewriteDataBlocksToWriter(
     276             :         r *Reader,
     277             :         w *Writer,
     278             :         data []BlockHandleWithProperties,
     279             :         from, to []byte,
     280             :         split Split,
     281             :         concurrency int,
     282           0 : ) error {
     283           0 :         if r.Properties.NumEntries == 0 {
     284           0 :                 // No point keys.
     285           0 :                 return nil
     286           0 :         }
     287           0 :         blocks := make([]blockWithSpan, len(data))
     288           0 : 
     289           0 :         if w.filter != nil {
     290           0 :                 if r.Properties.FilterPolicyName != w.filter.policyName() {
     291           0 :                         return errors.New("mismatched filters")
     292           0 :                 }
     293           0 :                 if was, is := r.Properties.ComparerName, w.props.ComparerName; was != is {
     294           0 :                         return errors.Errorf("mismatched Comparer %s vs %s, replacement requires same splitter to copy filters", was, is)
     295           0 :                 }
     296             :         }
     297             : 
     298           0 :         g := &sync.WaitGroup{}
     299           0 :         g.Add(concurrency)
     300           0 :         errCh := make(chan error, concurrency)
     301           0 :         for i := 0; i < concurrency; i++ {
     302           0 :                 worker := i
     303           0 :                 go func() {
     304           0 :                         defer g.Done()
     305           0 :                         err := rewriteBlocks(
     306           0 :                                 r,
     307           0 :                                 w.dataBlockBuf.dataBlock.restartInterval,
     308           0 :                                 w.blockBuf.checksummer.checksumType,
     309           0 :                                 w.compression,
     310           0 :                                 data,
     311           0 :                                 blocks,
     312           0 :                                 concurrency,
     313           0 :                                 worker,
     314           0 :                                 from, to,
     315           0 :                                 split,
     316           0 :                         )
     317           0 :                         if err != nil {
     318           0 :                                 errCh <- err
     319           0 :                         }
     320             :                 }()
     321             :         }
     322           0 :         g.Wait()
     323           0 :         close(errCh)
     324           0 :         if err, ok := <-errCh; ok {
     325           0 :                 return err
     326           0 :         }
     327             : 
     328           0 :         var decoder blockPropertiesDecoder
     329           0 :         var oldShortIDs []shortID
     330           0 :         var oldProps [][]byte
     331           0 :         if len(w.blockPropCollectors) > 0 {
     332           0 :                 oldProps = make([][]byte, len(w.blockPropCollectors))
     333           0 :                 oldShortIDs = make([]shortID, math.MaxUint8)
     334           0 :                 for i, p := range w.blockPropCollectors {
     335           0 :                         if prop, ok := r.Properties.UserProperties[p.Name()]; ok {
     336           0 :                                 was, is := shortID(byte(prop[0])), shortID(i)
     337           0 :                                 oldShortIDs[was] = is
     338           0 :                         }
     339             :                 }
     340             :         }
     341             : 
     342           0 :         for i := range blocks {
     343           0 :                 // Write the rewritten block to the file.
     344           0 :                 if err := w.writable.Write(blocks[i].data); err != nil {
     345           0 :                         return err
     346           0 :                 }
     347             : 
     348           0 :                 n := len(blocks[i].data)
     349           0 :                 bh := BlockHandle{Offset: w.meta.Size, Length: uint64(n) - blockTrailerLen}
     350           0 :                 // Update the overall size.
     351           0 :                 w.meta.Size += uint64(n)
     352           0 : 
     353           0 :                 // Load any previous values for our prop collectors into oldProps.
     354           0 :                 for i := range oldProps {
     355           0 :                         oldProps[i] = nil
     356           0 :                 }
     357           0 :                 decoder.props = data[i].Props
     358           0 :                 for !decoder.done() {
     359           0 :                         id, val, err := decoder.next()
     360           0 :                         if err != nil {
     361           0 :                                 return err
     362           0 :                         }
     363           0 :                         oldProps[oldShortIDs[id]] = val
     364             :                 }
     365             : 
     366           0 :                 for i, p := range w.blockPropCollectors {
     367           0 :                         if err := p.(SuffixReplaceableBlockCollector).UpdateKeySuffixes(oldProps[i], from, to); err != nil {
     368           0 :                                 return err
     369           0 :                         }
     370             :                 }
     371             : 
     372           0 :                 bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
     373           0 :                 if err != nil {
     374           0 :                         return err
     375           0 :                 }
     376           0 :                 var nextKey InternalKey
     377           0 :                 if i+1 < len(blocks) {
     378           0 :                         nextKey = blocks[i+1].start
     379           0 :                 }
     380           0 :                 if err = w.addIndexEntrySync(blocks[i].end, nextKey, bhp, w.dataBlockBuf.tmp[:]); err != nil {
     381           0 :                         return err
     382           0 :                 }
     383             :         }
     384             : 
     385           0 :         w.meta.updateSeqNum(blocks[0].start.SeqNum())
     386           0 :         w.props.NumEntries = r.Properties.NumEntries
     387           0 :         w.props.RawKeySize = r.Properties.RawKeySize
     388           0 :         w.props.RawValueSize = r.Properties.RawValueSize
     389           0 :         w.meta.SetSmallestPointKey(blocks[0].start)
     390           0 :         w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
     391           0 :         return nil
     392             : }
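                      : 
                      : // A sketch of the shortID remapping above (collector name and IDs are
                      : // assumptions for illustration): suppose the input sstable encoded a
                      : // collector "mvcc-props" under shortID 2 while this writer registers it at
                      : // index 0. Then oldShortIDs[2] == 0, so a decoded (id=2, val) pair from the
                      : // old index block is stored in oldProps[0], the slot that collector's
                      : // UpdateKeySuffixes call reads from.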
     393             : 
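                      : // rewriteRangeKeyBlockToWriter copies the input's range keys into w,
                      : // replacing the suffix `from` with `to` in every RangeKeySet; any other
                      : // range key kind results in errBadKind.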
     394           0 : func rewriteRangeKeyBlockToWriter(r *Reader, w *Writer, from, to []byte) error {
     395           0 :         iter, err := r.NewRawRangeKeyIter()
     396           0 :         if err != nil {
     397           0 :                 return err
     398           0 :         }
     399           0 :         if iter == nil {
     400           0 :                 // No range keys.
     401           0 :                 return nil
     402           0 :         }
     403           0 :         defer iter.Close()
     404           0 : 
     405           0 :         s, err := iter.First()
     406           0 :         for ; s != nil; s, err = iter.Next() {
     407           0 :                 if !s.Valid() {
     408           0 :                         break
     409             :                 }
     410           0 :                 for i := range s.Keys {
     411           0 :                         if s.Keys[i].Kind() != base.InternalKeyKindRangeKeySet {
     412           0 :                                 return errBadKind
     413           0 :                         }
     414           0 :                         if !bytes.Equal(s.Keys[i].Suffix, from) {
     415           0 :                                 return errors.Errorf("key has suffix %q, expected %q", s.Keys[i].Suffix, from)
     416           0 :                         }
     417           0 :                         s.Keys[i].Suffix = to
     418             :                 }
     419             : 
     420           0 :                 err = rangekey.Encode(s, func(k base.InternalKey, v []byte) error {
     421           0 :                         // Calling AddRangeKey instead of addRangeKeySpan bypasses the fragmenter.
     422           0 :                         // This is okay because the raw fragments off of `iter` are already
     423           0 :                         // fragmented, and suffix replacement should not affect fragmentation.
     424           0 :                         return w.AddRangeKey(k, v)
     425           0 :                 })
     426           0 :                 if err != nil {
     427           0 :                         return err
     428           0 :                 }
     429             :         }
     430           0 :         return err
     431             : }
     432             : 
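                      : // copyFilterWriter plugs into the writer's filter hooks (addKey, finish,
                      : // metaName, policyName) but simply returns a filter block copied verbatim
                      : // from the input sstable: suffix replacement leaves key prefixes unchanged,
                      : // so the original filter remains valid and addKey must never be called.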
     433             : type copyFilterWriter struct {
     434             :         origMetaName   string
     435             :         origPolicyName string
     436             :         data           []byte
     437             : }
     438             : 
     439           0 : func (copyFilterWriter) addKey(key []byte)         { panic("unimplemented") }
     440           0 : func (c copyFilterWriter) finish() ([]byte, error) { return c.data, nil }
     441           0 : func (c copyFilterWriter) metaName() string        { return c.origMetaName }
     442           0 : func (c copyFilterWriter) policyName() string      { return c.origPolicyName }
     443             : 
     444             : // RewriteKeySuffixesViaWriter is similar to RewriteKeySuffixes but uses just a
     445             : // single loop over the Reader that writes each key to the Writer with the new
      446             : // suffix. This is significantly slower than the parallelized rewriter, and does
     447             : // more work to rederive filters, props, etc.
     448             : //
     449             : // Any obsolete bits that key-value pairs may be annotated with are ignored
     450             : // and lost during the rewrite. Some of the obsolete bits may be recreated --
     451             : // specifically when there are multiple keys with the same user key.
     452             : // Additionally, the output sstable has the pebble.obsolete.is_strict property
     453             : // set to false. See the longer comment at RewriteKeySuffixesAndReturnFormat.
     454             : func RewriteKeySuffixesViaWriter(
     455             :         r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte,
     456           0 : ) (*WriterMetadata, error) {
     457           0 :         if o.Comparer == nil || o.Comparer.Split == nil {
     458           0 :                 return nil, errors.New("a valid splitter is required to rewrite suffixes")
     459           0 :         }
     460             : 
     461           0 :         o.IsStrictObsolete = false
     462           0 :         w := NewWriter(out, o)
     463           0 :         defer func() {
     464           0 :                 if w != nil {
     465           0 :                         w.Close()
     466           0 :                 }
     467             :         }()
     468           0 :         i, err := r.NewIter(nil, nil)
     469           0 :         if err != nil {
     470           0 :                 return nil, err
     471           0 :         }
     472           0 :         defer i.Close()
     473           0 : 
     474           0 :         k, v := i.First()
     475           0 :         var scratch InternalKey
     476           0 :         for k != nil {
     477           0 :                 if k.Kind() != InternalKeyKindSet {
     478           0 :                         return nil, errors.New("invalid key type")
     479           0 :                 }
     480           0 :                 oldSuffix := k.UserKey[r.Split(k.UserKey):]
     481           0 :                 if !bytes.Equal(oldSuffix, from) {
     482           0 :                         return nil, errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
     483           0 :                 }
     484           0 :                 scratch.UserKey = append(scratch.UserKey[:0], k.UserKey[:len(k.UserKey)-len(from)]...)
     485           0 :                 scratch.UserKey = append(scratch.UserKey, to...)
     486           0 :                 scratch.Trailer = k.Trailer
     487           0 : 
     488           0 :                 val, _, err := v.Value(nil)
     489           0 :                 if err != nil {
     490           0 :                         return nil, err
     491           0 :                 }
      492           0 :         if err := w.addPoint(scratch, val, false); err != nil {
     493           0 :                         return nil, err
     494           0 :                 }
     495           0 :                 k, v = i.Next()
     496             :         }
     497           0 :         if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
     498           0 :                 return nil, err
     499           0 :         }
     500           0 :         if err := w.Close(); err != nil {
     501           0 :                 w = nil
     502           0 :                 return nil, err
     503           0 :         }
     504           0 :         writerMeta, err := w.Metadata()
     505           0 :         w = nil
     506           0 :         return writerMeta, err
     507             : }
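                      : 
                      : // A minimal sketch of the slow path (hypothetical caller; `cmp`, `out`,
                      : // `sstBytes` and the suffixes are assumptions): unlike the block rewriter,
                      : // this takes an already-open Reader and rebuilds filters and properties
                      : // from scratch.
                      : //
                      : //      r, err := NewMemReader(sstBytes, ReaderOptions{Comparer: cmp})
                      : //      if err != nil {
                      : //              return err
                      : //      }
                      : //      defer r.Close()
                      : //      meta, err := RewriteKeySuffixesViaWriter(
                      : //              r, out, WriterOptions{Comparer: cmp}, oldSuffix, newSuffix)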
     508             : 
     509             : // NewMemReader opens a reader over the SST stored in the passed []byte.
     510           0 : func NewMemReader(sst []byte, o ReaderOptions) (*Reader, error) {
     511           0 :         return NewReader(newMemReader(sst), o)
     512           0 : }
     513             : 
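                      : // readBlockBuf slices the block at bh directly out of the in-memory sstable
                      : // backing r, verifies its checksum, and decompresses it into buf (grown if
                      : // too small), returning the block contents and the possibly-reallocated
                      : // buffer for reuse.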
     514           0 : func readBlockBuf(r *Reader, bh BlockHandle, buf []byte) ([]byte, []byte, error) {
     515           0 :         raw := r.readable.(*memReader).b[bh.Offset : bh.Offset+bh.Length+blockTrailerLen]
     516           0 :         if err := checkChecksum(r.checksumType, raw, bh, 0); err != nil {
     517           0 :                 return nil, buf, err
     518           0 :         }
     519           0 :         typ := blockType(raw[bh.Length])
     520           0 :         raw = raw[:bh.Length]
     521           0 :         if typ == noCompressionBlockType {
     522           0 :                 return raw, buf, nil
     523           0 :         }
     524           0 :         decompressedLen, prefix, err := decompressedLen(typ, raw)
     525           0 :         if err != nil {
     526           0 :                 return nil, buf, err
     527           0 :         }
     528           0 :         if cap(buf) < decompressedLen {
     529           0 :                 buf = make([]byte, decompressedLen)
     530           0 :         }
     531           0 :         res, err := decompressInto(typ, raw[prefix:], buf[:decompressedLen])
     532           0 :         return res, buf, err
     533             : }
     534             : 
     535             : // memReader is a thin wrapper around a []byte such that it can be passed to
     536             : // sstable.Reader. It supports concurrent use, and does so without locking in
     537             : // contrast to the heavier read/write vfs.MemFile.
     538             : type memReader struct {
     539             :         b  []byte
     540             :         r  *bytes.Reader
     541             :         rh objstorage.NoopReadHandle
     542             : }
     543             : 
     544             : var _ objstorage.Readable = (*memReader)(nil)
     545             : 
     546           0 : func newMemReader(b []byte) *memReader {
     547           0 :         r := &memReader{
     548           0 :                 b: b,
     549           0 :                 r: bytes.NewReader(b),
     550           0 :         }
     551           0 :         r.rh = objstorage.MakeNoopReadHandle(r)
     552           0 :         return r
     553           0 : }
     554             : 
     555             : // ReadAt is part of objstorage.Readable.
     556           0 : func (m *memReader) ReadAt(_ context.Context, p []byte, off int64) error {
     557           0 :         n, err := m.r.ReadAt(p, off)
     558           0 :         if invariants.Enabled && err == nil && n != len(p) {
     559           0 :                 panic("short read")
     560             :         }
     561           0 :         return err
     562             : }
     563             : 
     564             : // Close is part of objstorage.Readable.
     565           0 : func (*memReader) Close() error {
     566           0 :         return nil
     567           0 : }
     568             : 
      569             : // Size is part of objstorage.Readable.
     570           0 : func (m *memReader) Size() int64 {
     571           0 :         return int64(len(m.b))
     572           0 : }
     573             : 
     574             : // NewReadHandle is part of objstorage.Readable.
     575           0 : func (m *memReader) NewReadHandle(_ context.Context) objstorage.ReadHandle {
     576           0 :         return &m.rh
     577           0 : }
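                      : 
                      : // A small sketch of the in-memory read path (hypothetical caller; `sstBytes`
                      : // is an assumption): NewMemReader wraps the byte slice in a memReader, so
                      : // readBlockBuf can slice blocks straight out of the backing buffer without
                      : // extra copies.
                      : //
                      : //      r, err := NewMemReader(sstBytes, ReaderOptions{})
                      : //      if err != nil {
                      : //              return err
                      : //      }
                      : //      defer r.Close()
                      : //      l, _ := r.Layout()
                      : //      block, _, _ := readBlockBuf(r, l.Data[0].BlockHandle, nil)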

Generated by: LCOV version 1.14