LCOV - code coverage report
Current view: top level - pebble/sstable - suffix_rewriter.go (source / functions)
Test: 2023-12-12 08:16Z 9848bcdb - tests + meta.lcov
Date: 2023-12-12 08:17:04
                 Hit    Total   Coverage
Lines:           286    381     75.1 %
Functions:       0      0       -

          Line data    Source code
       1             : package sstable
       2             : 
       3             : import (
       4             :         "bytes"
       5             :         "context"
       6             :         "math"
       7             :         "sync"
       8             : 
       9             :         "github.com/cespare/xxhash/v2"
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/bytealloc"
      13             :         "github.com/cockroachdb/pebble/internal/invariants"
      14             :         "github.com/cockroachdb/pebble/internal/rangekey"
      15             :         "github.com/cockroachdb/pebble/objstorage"
      16             : )
      17             : 
      18             : // RewriteKeySuffixes is deprecated.
      19             : //
      20             : // TODO(sumeer): remove after switching CockroachDB to RewriteKeySuffixesAndReturnFormat.
      21             : func RewriteKeySuffixes(
      22             :         sst []byte,
      23             :         rOpts ReaderOptions,
      24             :         out objstorage.Writable,
      25             :         o WriterOptions,
      26             :         from, to []byte,
      27             :         concurrency int,
      28           0 : ) (*WriterMetadata, error) {
      29           0 :         meta, _, err := RewriteKeySuffixesAndReturnFormat(sst, rOpts, out, o, from, to, concurrency)
      30           0 :         return meta, err
      31           0 : }
      32             : 
      33             : // RewriteKeySuffixesAndReturnFormat copies the content of the passed SSTable
      34             : // bytes to a new sstable, written to `out`, in which the suffix `from` is
      35             : // replaced with `to` in every key. The input sstable must consist of only
      36             : // Sets or RangeKeySets and every key must have `from` as its suffix as
      37             : // determined by the Split function of the Comparer in the passed
      38             : // WriterOptions. Range deletes must not exist in this sstable, as they will
      39             : // be ignored.
      40             : //
      41             : // Data blocks are rewritten in parallel by `concurrency` workers and then
      42             : // assembled into a final SST. Filters are copied from the original SST without
      43             : // modification as they are not affected by the suffix, while block and table
      44             : // properties are only minimally recomputed.
      45             : //
      46             : // TODO(sumeer): document limitations, if any, due to this limited
      47             : // re-computation of properties (is there any loss of fidelity?).
      48             : //
      49             : // Any block and table property collectors configured in the WriterOptions must
      50             : // implement SuffixReplaceableTableCollector/SuffixReplaceableBlockCollector.
      51             : //
      52             : // The WriterOptions.TableFormat is ignored, and the output sstable has the
      53             : // same TableFormat as the input, which is returned in case the caller wants
      54             : // to do some error checking. Suffix rewriting is meant to be efficient, and
      55             : // allowing changes in the TableFormat detracts from that efficiency.
      56             : //
      57             : // Any obsolete bits that key-value pairs may be annotated with are ignored
      58             : // and lost during the rewrite. Additionally, the output sstable has the
      59             : // pebble.obsolete.is_strict property set to false. These limitations could be
      60             : // removed if needed. The current use case for
      61             : // RewriteKeySuffixesAndReturnFormat in CockroachDB is for MVCC-compliant file
      62             : // ingestion, where these files do not contain RANGEDELs and have one
      63             : // key-value pair per userkey -- so they trivially satisfy the strict
      64             : // criteria, and we don't need the obsolete bit as a performance optimization.
      65             : // For disaggregated storage, strict obsolete sstables are needed for L5 and
      66             : // L6, but at the time of writing, we expect such MVCC-compliant file
      67             : // ingestion to only ingest into levels L4 and higher. If this changes, we can
      68             : // do one of two things to get rid of this limitation:
      69             : //   - Validate that there are no duplicate userkeys and no RANGEDELs/MERGEs
      70             : //     in the sstable to be rewritten. Validating no duplicate userkeys is
      71             : //     non-trivial when rewriting blocks in parallel, so we could encode the
      72             : //     pre-existing condition in the (existing) SnapshotPinnedKeys property --
      73             : //     we need to update the external sst writer to calculate and encode this
      74             : //     property.
      75             : //   - Preserve the obsolete bit (with changes to the blockIter).
      76             : func RewriteKeySuffixesAndReturnFormat(
      77             :         sst []byte,
      78             :         rOpts ReaderOptions,
      79             :         out objstorage.Writable,
      80             :         o WriterOptions,
      81             :         from, to []byte,
      82             :         concurrency int,
      83           0 : ) (*WriterMetadata, TableFormat, error) {
      84           0 :         r, err := NewMemReader(sst, rOpts)
      85           0 :         if err != nil {
      86           0 :                 return nil, TableFormatUnspecified, err
      87           0 :         }
      88           0 :         defer r.Close()
      89           0 :         return rewriteKeySuffixesInBlocks(r, out, o, from, to, concurrency)
      90             : }
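
// rewriteSuffixExample is a minimal caller-side sketch of driving the
// parallel rewriter; the function name and the concurrency of 4 are
// illustrative only. The WriterOptions must carry a Comparer with a non-nil
// Split, and any configured property collectors must implement the
// SuffixReplaceable interfaces checked in rewriteKeySuffixesInBlocks below.
func rewriteSuffixExample(
	sst []byte,
	rOpts ReaderOptions,
	o WriterOptions,
	out objstorage.Writable,
	from, to []byte,
) (*WriterMetadata, TableFormat, error) {
	// Every point key in sst must end in `from`; the output inherits the
	// input's TableFormat regardless of o.TableFormat.
	return RewriteKeySuffixesAndReturnFormat(sst, rOpts, out, o, from, to, 4 /* concurrency */)
}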
      91             : 
      92             : func rewriteKeySuffixesInBlocks(
      93             :         r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte, concurrency int,
      94           1 : ) (*WriterMetadata, TableFormat, error) {
      95           1 :         if o.Comparer == nil || o.Comparer.Split == nil {
      96           1 :                 return nil, TableFormatUnspecified,
      97           1 :                         errors.New("a valid splitter is required to rewrite suffixes")
      98           1 :         }
      99           1 :         if concurrency < 1 {
     100           0 :                 return nil, TableFormatUnspecified, errors.New("concurrency must be >= 1")
     101           0 :         }
     102             :         // Even though NumValueBlocks = 0 => NumValuesInValueBlocks = 0, check both
     103             :         // as a defensive measure.
     104           1 :         if r.Properties.NumValueBlocks > 0 || r.Properties.NumValuesInValueBlocks > 0 {
     105           0 :                 return nil, TableFormatUnspecified,
     106           0 :                         errors.New("sstable with a single suffix should not have value blocks")
     107           0 :         }
     108             : 
     109           1 :         tableFormat := r.tableFormat
     110           1 :         o.TableFormat = tableFormat
     111           1 :         w := NewWriter(out, o)
     112           1 :         defer func() {
     113           1 :                 if w != nil {
     114           1 :                         w.Close()
     115           1 :                 }
     116             :         }()
     117             : 
     118           1 :         for _, c := range w.propCollectors {
     119           1 :                 if _, ok := c.(SuffixReplaceableTableCollector); !ok {
     120           0 :                         return nil, TableFormatUnspecified,
     121           0 :                                 errors.Errorf("property collector %s does not support suffix replacement", c.Name())
     122           0 :                 }
     123             :         }
     124           1 :         for _, c := range w.blockPropCollectors {
     125           1 :                 if _, ok := c.(SuffixReplaceableBlockCollector); !ok {
     126           0 :                         return nil, TableFormatUnspecified,
     127           0 :                                 errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
     128           0 :                 }
     129             :         }
     130             : 
     131           1 :         l, err := r.Layout()
     132           1 :         if err != nil {
     133           0 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "reading layout")
     134           0 :         }
     135             : 
     136           1 :         if err := rewriteDataBlocksToWriter(r, w, l.Data, from, to, w.split, concurrency); err != nil {
     137           1 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting data blocks")
     138           1 :         }
     139             : 
     140             :         // Copy over the range key block and replace suffixes in it if it exists.
     141           1 :         if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
     142           0 :                 return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting range key blocks")
     143           0 :         }
     144             : 
     145             :         // Copy over the filter block if it exists (rewriteDataBlocksToWriter will
     146             :         // already have ensured this is valid if it exists).
     147           1 :         if w.filter != nil && l.Filter.Length > 0 {
     148           1 :                 filterBlock, _, err := readBlockBuf(r, l.Filter, nil)
     149           1 :                 if err != nil {
     150           0 :                         return nil, TableFormatUnspecified, errors.Wrap(err, "reading filter")
     151           0 :                 }
     152           1 :                 w.filter = copyFilterWriter{
     153           1 :                         origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBlock,
     154           1 :                 }
     155             :         }
     156             : 
     157           1 :         if err := w.Close(); err != nil {
     158           0 :                 w = nil
     159           0 :                 return nil, TableFormatUnspecified, err
     160           0 :         }
     161           1 :         writerMeta, err := w.Metadata()
     162           1 :         w = nil
     163           1 :         return writerMeta, tableFormat, err
     164             : }
     165             : 
     166             : var errBadKind = errors.New("key does not have expected kind (set)")
     167             : 
     168             : type blockWithSpan struct {
     169             :         start, end InternalKey
     170             :         data       []byte
     171             : }
     172             : 
     173             : func rewriteBlocks(
     174             :         r *Reader,
     175             :         restartInterval int,
     176             :         checksumType ChecksumType,
     177             :         compression Compression,
     178             :         input []BlockHandleWithProperties,
     179             :         output []blockWithSpan,
     180             :         totalWorkers, worker int,
     181             :         from, to []byte,
     182             :         split Split,
     183           1 : ) error {
     184           1 :         bw := blockWriter{
     185           1 :                 restartInterval: restartInterval,
     186           1 :         }
     187           1 :         buf := blockBuf{checksummer: checksummer{checksumType: checksumType}}
     188           1 :         if checksumType == ChecksumTypeXXHash {
     189           0 :                 buf.checksummer.xxHasher = xxhash.New()
     190           0 :         }
     191             : 
     192           1 :         var blockAlloc bytealloc.A
     193           1 :         var keyAlloc bytealloc.A
     194           1 :         var scratch InternalKey
     195           1 : 
     196           1 :         var inputBlock, inputBlockBuf []byte
     197           1 : 
     198           1 :         iter := &blockIter{}
     199           1 : 
      200           1 :         // We'll assume all blocks are _roughly_ equal in size, so a static round-robin
      201           1 :         // partition, with each worker handling every i-th block, is probably enough.
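	// For example, with totalWorkers = 3 and 8 input blocks, worker 0
	// rewrites blocks {0, 3, 6}, worker 1 rewrites {1, 4, 7}, and worker 2
	// rewrites {2, 5}.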
     202           1 :         for i := worker; i < len(input); i += totalWorkers {
     203           1 :                 bh := input[i]
     204           1 : 
     205           1 :                 var err error
     206           1 :                 inputBlock, inputBlockBuf, err = readBlockBuf(r, bh.BlockHandle, inputBlockBuf)
     207           1 :                 if err != nil {
     208           0 :                         return err
     209           0 :                 }
     210           1 :                 if err := iter.init(r.Compare, inputBlock, r.Properties.GlobalSeqNum, false); err != nil {
     211           0 :                         return err
     212           0 :                 }
     213             : 
     214           1 :                 if cap(bw.restarts) < int(iter.restarts) {
     215           1 :                         bw.restarts = make([]uint32, 0, iter.restarts)
     216           1 :                 }
     217           1 :                 if cap(bw.buf) == 0 {
     218           1 :                         bw.buf = make([]byte, 0, len(inputBlock))
     219           1 :                 }
     220           1 :                 if cap(bw.restarts) < int(iter.numRestarts) {
     221           0 :                         bw.restarts = make([]uint32, 0, iter.numRestarts)
     222           0 :                 }
     223             : 
     224           1 :                 for key, val := iter.First(); key != nil; key, val = iter.Next() {
     225           1 :                         if key.Kind() != InternalKeyKindSet {
     226           0 :                                 return errBadKind
     227           0 :                         }
     228           1 :                         si := split(key.UserKey)
     229           1 :                         oldSuffix := key.UserKey[si:]
     230           1 :                         if !bytes.Equal(oldSuffix, from) {
     231           1 :                                 err := errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
     232           1 :                                 return err
     233           1 :                         }
     234           1 :                         newLen := si + len(to)
     235           1 :                         if cap(scratch.UserKey) < newLen {
     236           1 :                                 scratch.UserKey = make([]byte, 0, len(key.UserKey)*2+len(to)-len(from))
     237           1 :                         }
     238             : 
     239           1 :                         scratch.Trailer = key.Trailer
     240           1 :                         scratch.UserKey = scratch.UserKey[:newLen]
     241           1 :                         copy(scratch.UserKey, key.UserKey[:si])
     242           1 :                         copy(scratch.UserKey[si:], to)
     243           1 : 
     244           1 :                         // NB: for TableFormatPebblev3 and higher, since
     245           1 :                         // !iter.lazyValueHandling.hasValuePrefix, it will return the raw value
     246           1 :                         // in the block, which includes the 1-byte prefix. This is fine since bw
     247           1 :                         // also does not know about the prefix and will preserve it in bw.add.
     248           1 :                         v := val.InPlaceValue()
     249           1 :                         if invariants.Enabled && r.tableFormat >= TableFormatPebblev3 &&
     250           1 :                                 key.Kind() == InternalKeyKindSet {
     251           1 :                                 if len(v) < 1 {
     252           0 :                                         return errors.Errorf("value has no prefix")
     253           0 :                                 }
     254           1 :                                 prefix := valuePrefix(v[0])
     255           1 :                                 if isValueHandle(prefix) {
     256           0 :                                         return errors.Errorf("value prefix is incorrect")
     257           0 :                                 }
     258           1 :                                 if setHasSamePrefix(prefix) {
     259           0 :                                         return errors.Errorf("multiple keys with same key prefix")
     260           0 :                                 }
     261             :                         }
     262           1 :                         bw.add(scratch, v)
     263           1 :                         if output[i].start.UserKey == nil {
     264           1 :                                 keyAlloc, output[i].start = cloneKeyWithBuf(scratch, keyAlloc)
     265           1 :                         }
     266             :                 }
     267           1 :                 *iter = iter.resetForReuse()
     268           1 : 
     269           1 :                 keyAlloc, output[i].end = cloneKeyWithBuf(scratch, keyAlloc)
     270           1 : 
     271           1 :                 finished := compressAndChecksum(bw.finish(), compression, &buf)
     272           1 : 
     273           1 :                 // copy our finished block into the output buffer.
     274           1 :                 blockAlloc, output[i].data = blockAlloc.Alloc(len(finished) + blockTrailerLen)
     275           1 :                 copy(output[i].data, finished)
     276           1 :                 copy(output[i].data[len(finished):], buf.tmp[:blockTrailerLen])
     277             :         }
     278           1 :         return nil
     279             : }
     280             : 
     281             : func rewriteDataBlocksToWriter(
     282             :         r *Reader,
     283             :         w *Writer,
     284             :         data []BlockHandleWithProperties,
     285             :         from, to []byte,
     286             :         split Split,
     287             :         concurrency int,
     288           1 : ) error {
     289           1 :         if r.Properties.NumEntries == 0 {
     290           1 :                 // No point keys.
     291           1 :                 return nil
     292           1 :         }
     293           1 :         blocks := make([]blockWithSpan, len(data))
     294           1 : 
     295           1 :         if w.filter != nil {
     296           1 :                 if r.Properties.FilterPolicyName != w.filter.policyName() {
     297           0 :                         return errors.New("mismatched filters")
     298           0 :                 }
     299           1 :                 if was, is := r.Properties.ComparerName, w.props.ComparerName; was != is {
     300           1 :                         return errors.Errorf("mismatched Comparer %s vs %s, replacement requires same splitter to copy filters", was, is)
     301           1 :                 }
     302             :         }
     303             : 
     304           1 :         g := &sync.WaitGroup{}
     305           1 :         g.Add(concurrency)
     306           1 :         errCh := make(chan error, concurrency)
     307           1 :         for i := 0; i < concurrency; i++ {
     308           1 :                 worker := i
     309           1 :                 go func() {
     310           1 :                         defer g.Done()
     311           1 :                         err := rewriteBlocks(
     312           1 :                                 r,
     313           1 :                                 w.dataBlockBuf.dataBlock.restartInterval,
     314           1 :                                 w.blockBuf.checksummer.checksumType,
     315           1 :                                 w.compression,
     316           1 :                                 data,
     317           1 :                                 blocks,
     318           1 :                                 concurrency,
     319           1 :                                 worker,
     320           1 :                                 from, to,
     321           1 :                                 split,
     322           1 :                         )
     323           1 :                         if err != nil {
     324           1 :                                 errCh <- err
     325           1 :                         }
     326             :                 }()
     327             :         }
     328           1 :         g.Wait()
     329           1 :         close(errCh)
     330           1 :         if err, ok := <-errCh; ok {
     331           1 :                 return err
     332           1 :         }
     333             : 
     334           1 :         for _, p := range w.propCollectors {
     335           1 :                 if err := p.(SuffixReplaceableTableCollector).UpdateKeySuffixes(r.Properties.UserProperties, from, to); err != nil {
     336           0 :                         return err
     337           0 :                 }
     338             :         }
     339             : 
     340           1 :         var decoder blockPropertiesDecoder
     341           1 :         var oldShortIDs []shortID
     342           1 :         var oldProps [][]byte
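	// oldShortIDs maps the shortID a collector had in the original sstable
	// (the first byte of its table-level user property) to its index in
	// w.blockPropCollectors, so that each block's previously-encoded property
	// values can be routed to the matching collector below.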
     343           1 :         if len(w.blockPropCollectors) > 0 {
     344           1 :                 oldProps = make([][]byte, len(w.blockPropCollectors))
     345           1 :                 oldShortIDs = make([]shortID, math.MaxUint8)
     346           1 :                 for i, p := range w.blockPropCollectors {
     347           1 :                         if prop, ok := r.Properties.UserProperties[p.Name()]; ok {
     348           1 :                                 was, is := shortID(byte(prop[0])), shortID(i)
     349           1 :                                 oldShortIDs[was] = is
     350           1 :                         }
     351             :                 }
     352             :         }
     353             : 
     354           1 :         for i := range blocks {
     355           1 :                 // Write the rewritten block to the file.
     356           1 :                 if err := w.writable.Write(blocks[i].data); err != nil {
     357           0 :                         return err
     358           0 :                 }
     359             : 
     360           1 :                 n := len(blocks[i].data)
     361           1 :                 bh := BlockHandle{Offset: w.meta.Size, Length: uint64(n) - blockTrailerLen}
     362           1 :                 // Update the overall size.
     363           1 :                 w.meta.Size += uint64(n)
     364           1 : 
     365           1 :                 // Load any previous values for our prop collectors into oldProps.
     366           1 :                 for i := range oldProps {
     367           1 :                         oldProps[i] = nil
     368           1 :                 }
     369           1 :                 decoder.props = data[i].Props
     370           1 :                 for !decoder.done() {
     371           1 :                         id, val, err := decoder.next()
     372           1 :                         if err != nil {
     373           0 :                                 return err
     374           0 :                         }
     375           1 :                         oldProps[oldShortIDs[id]] = val
     376             :                 }
     377             : 
     378           1 :                 for i, p := range w.blockPropCollectors {
     379           1 :                         if err := p.(SuffixReplaceableBlockCollector).UpdateKeySuffixes(oldProps[i], from, to); err != nil {
     380           0 :                                 return err
     381           0 :                         }
     382             :                 }
     383             : 
     384           1 :                 bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
     385           1 :                 if err != nil {
     386           0 :                         return err
     387           0 :                 }
     388           1 :                 var nextKey InternalKey
     389           1 :                 if i+1 < len(blocks) {
     390           1 :                         nextKey = blocks[i+1].start
     391           1 :                 }
     392           1 :                 if err = w.addIndexEntrySync(blocks[i].end, nextKey, bhp, w.dataBlockBuf.tmp[:]); err != nil {
     393           0 :                         return err
     394           0 :                 }
     395             :         }
     396             : 
     397           1 :         w.meta.updateSeqNum(blocks[0].start.SeqNum())
     398           1 :         w.props.NumEntries = r.Properties.NumEntries
     399           1 :         w.props.RawKeySize = r.Properties.RawKeySize
     400           1 :         w.props.RawValueSize = r.Properties.RawValueSize
     401           1 :         w.meta.SetSmallestPointKey(blocks[0].start)
     402           1 :         w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
     403           1 :         return nil
     404             : }
     405             : 
     406           1 : func rewriteRangeKeyBlockToWriter(r *Reader, w *Writer, from, to []byte) error {
     407           1 :         iter, err := r.NewRawRangeKeyIter()
     408           1 :         if err != nil {
     409           0 :                 return err
     410           0 :         }
     411           1 :         if iter == nil {
     412           1 :                 // No range keys.
     413           1 :                 return nil
     414           1 :         }
     415           1 :         defer iter.Close()
     416           1 : 
     417           1 :         for s := iter.First(); s != nil; s = iter.Next() {
     418           1 :                 if !s.Valid() {
     419           0 :                         break
     420             :                 }
     421           1 :                 for i := range s.Keys {
     422           1 :                         if s.Keys[i].Kind() != base.InternalKeyKindRangeKeySet {
     423           0 :                                 return errBadKind
     424           0 :                         }
     425           1 :                         if !bytes.Equal(s.Keys[i].Suffix, from) {
     426           0 :                                 return errors.Errorf("key has suffix %q, expected %q", s.Keys[i].Suffix, from)
     427           0 :                         }
     428           1 :                         s.Keys[i].Suffix = to
     429             :                 }
     430             : 
     431           1 :                 err := rangekey.Encode(s, func(k base.InternalKey, v []byte) error {
     432           1 :                         // Calling AddRangeKey instead of addRangeKeySpan bypasses the fragmenter.
     433           1 :                         // This is okay because the raw fragments off of `iter` are already
     434           1 :                         // fragmented, and suffix replacement should not affect fragmentation.
     435           1 :                         return w.AddRangeKey(k, v)
     436           1 :                 })
     437           1 :                 if err != nil {
     438           0 :                         return err
     439           0 :                 }
     440             :         }
     441             : 
     442           1 :         return nil
     443             : }
     444             : 
     445             : type copyFilterWriter struct {
     446             :         origMetaName   string
     447             :         origPolicyName string
     448             :         data           []byte
     449             : }
     450             : 
     451           0 : func (copyFilterWriter) addKey(key []byte)         { panic("unimplemented") }
     452           1 : func (c copyFilterWriter) finish() ([]byte, error) { return c.data, nil }
     453           1 : func (c copyFilterWriter) metaName() string        { return c.origMetaName }
     454           1 : func (c copyFilterWriter) policyName() string      { return c.origPolicyName }
     455             : 
     456             : // RewriteKeySuffixesViaWriter is similar to RewriteKeySuffixes but uses just a
     457             : // single loop over the Reader that writes each key to the Writer with the new
      458             : // suffix. This is significantly slower than the parallelized rewriter, and does
     459             : // more work to rederive filters, props, etc.
     460             : //
     461             : // Any obsolete bits that key-value pairs may be annotated with are ignored
     462             : // and lost during the rewrite. Some of the obsolete bits may be recreated --
     463             : // specifically when there are multiple keys with the same user key.
     464             : // Additionally, the output sstable has the pebble.obsolete.is_strict property
     465             : // set to false. See the longer comment at RewriteKeySuffixesAndReturnFormat.
     466             : func RewriteKeySuffixesViaWriter(
     467             :         r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte,
     468           1 : ) (*WriterMetadata, error) {
     469           1 :         if o.Comparer == nil || o.Comparer.Split == nil {
     470           0 :                 return nil, errors.New("a valid splitter is required to rewrite suffixes")
     471           0 :         }
     472             : 
     473           1 :         o.IsStrictObsolete = false
     474           1 :         w := NewWriter(out, o)
     475           1 :         defer func() {
     476           1 :                 if w != nil {
     477           0 :                         w.Close()
     478           0 :                 }
     479             :         }()
     480           1 :         i, err := r.NewIter(nil, nil)
     481           1 :         if err != nil {
     482           0 :                 return nil, err
     483           0 :         }
     484           1 :         defer i.Close()
     485           1 : 
     486           1 :         k, v := i.First()
     487           1 :         var scratch InternalKey
     488           1 :         for k != nil {
     489           1 :                 if k.Kind() != InternalKeyKindSet {
     490           0 :                         return nil, errors.New("invalid key type")
     491           0 :                 }
     492           1 :                 oldSuffix := k.UserKey[r.Split(k.UserKey):]
     493           1 :                 if !bytes.Equal(oldSuffix, from) {
     494           0 :                         return nil, errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
     495           0 :                 }
     496           1 :                 scratch.UserKey = append(scratch.UserKey[:0], k.UserKey[:len(k.UserKey)-len(from)]...)
     497           1 :                 scratch.UserKey = append(scratch.UserKey, to...)
     498           1 :                 scratch.Trailer = k.Trailer
     499           1 : 
     500           1 :                 val, _, err := v.Value(nil)
     501           1 :                 if err != nil {
     502           0 :                         return nil, err
     503           0 :                 }
      504           1 :                 if err := w.addPoint(scratch, val, false); err != nil {
     505           0 :                         return nil, err
     506           0 :                 }
     507           1 :                 k, v = i.Next()
     508             :         }
     509           1 :         if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
     510           0 :                 return nil, err
     511           0 :         }
     512           1 :         if err := w.Close(); err != nil {
     513           0 :                 w = nil
     514           0 :                 return nil, err
     515           0 :         }
     516           1 :         writerMeta, err := w.Metadata()
     517           1 :         w = nil
     518           1 :         return writerMeta, err
     519             : }
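
// rewriteWithFallbackExample is a sketch of one possible fallback pattern,
// assuming the caller can supply a fresh Writable per attempt: if the
// block-based rewriter fails (for example because a collector does not
// implement the SuffixReplaceable interfaces), the slower Writer-based path
// is tried instead. Nothing in this package prescribes this policy.
func rewriteWithFallbackExample(
	sst []byte,
	rOpts ReaderOptions,
	o WriterOptions,
	fast, slow objstorage.Writable,
	from, to []byte,
) (*WriterMetadata, error) {
	if meta, _, err := RewriteKeySuffixesAndReturnFormat(sst, rOpts, fast, o, from, to, 8); err == nil {
		return meta, nil
	}
	r, err := NewMemReader(sst, rOpts)
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return RewriteKeySuffixesViaWriter(r, slow, o, from, to)
}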
     520             : 
     521             : // NewMemReader opens a reader over the SST stored in the passed []byte.
     522           1 : func NewMemReader(sst []byte, o ReaderOptions) (*Reader, error) {
     523           1 :         return NewReader(newMemReader(sst), o)
     524           1 : }
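
// scanMemSSTExample is a sketch of using NewMemReader directly, assuming
// rOpts carries the Comparer the sstable was written with: the reader works
// entirely over the in-memory bytes, so no filesystem access is involved.
func scanMemSSTExample(sst []byte, rOpts ReaderOptions, visit func(k *InternalKey, v []byte) error) error {
	r, err := NewMemReader(sst, rOpts)
	if err != nil {
		return err
	}
	defer r.Close()
	it, err := r.NewIter(nil /* lower */, nil /* upper */)
	if err != nil {
		return err
	}
	defer it.Close()
	for k, lv := it.First(); k != nil; k, lv = it.Next() {
		v, _, err := lv.Value(nil)
		if err != nil {
			return err
		}
		if err := visit(k, v); err != nil {
			return err
		}
	}
	return nil
}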
     525             : 
     526           1 : func readBlockBuf(r *Reader, bh BlockHandle, buf []byte) ([]byte, []byte, error) {
     527           1 :         raw := r.readable.(*memReader).b[bh.Offset : bh.Offset+bh.Length+blockTrailerLen]
     528           1 :         if err := checkChecksum(r.checksumType, raw, bh, 0); err != nil {
     529           0 :                 return nil, buf, err
     530           0 :         }
     531           1 :         typ := blockType(raw[bh.Length])
     532           1 :         raw = raw[:bh.Length]
     533           1 :         if typ == noCompressionBlockType {
     534           1 :                 return raw, buf, nil
     535           1 :         }
     536           1 :         decompressedLen, prefix, err := decompressedLen(typ, raw)
     537           1 :         if err != nil {
     538           0 :                 return nil, buf, err
     539           0 :         }
     540           1 :         if cap(buf) < decompressedLen {
     541           1 :                 buf = make([]byte, decompressedLen)
     542           1 :         }
     543           1 :         res, err := decompressInto(typ, raw[prefix:], buf[:decompressedLen])
     544           1 :         return res, buf, err
     545             : }
     546             : 
     547             : // memReader is a thin wrapper around a []byte such that it can be passed to
      548             : // sstable.Reader. It supports concurrent use without locking, in contrast to
      549             : // the heavier read/write vfs.MemFile.
     550             : type memReader struct {
     551             :         b  []byte
     552             :         r  *bytes.Reader
     553             :         rh objstorage.NoopReadHandle
     554             : }
     555             : 
     556             : var _ objstorage.Readable = (*memReader)(nil)
     557             : 
     558           1 : func newMemReader(b []byte) *memReader {
     559           1 :         r := &memReader{
     560           1 :                 b: b,
     561           1 :                 r: bytes.NewReader(b),
     562           1 :         }
     563           1 :         r.rh = objstorage.MakeNoopReadHandle(r)
     564           1 :         return r
     565           1 : }
     566             : 
     567             : // ReadAt is part of objstorage.Readable.
     568           1 : func (m *memReader) ReadAt(_ context.Context, p []byte, off int64) error {
     569           1 :         n, err := m.r.ReadAt(p, off)
     570           1 :         if invariants.Enabled && err == nil && n != len(p) {
     571           0 :                 panic("short read")
     572             :         }
     573           1 :         return err
     574             : }
     575             : 
     576             : // Close is part of objstorage.Readable.
     577           1 : func (*memReader) Close() error {
     578           1 :         return nil
     579           1 : }
     580             : 
      581             : // Size is part of objstorage.Readable.
     582           1 : func (m *memReader) Size() int64 {
     583           1 :         return int64(len(m.b))
     584           1 : }
     585             : 
     586             : // NewReadHandle is part of objstorage.Readable.
     587           1 : func (m *memReader) NewReadHandle(_ context.Context) objstorage.ReadHandle {
     588           1 :         return &m.rh
     589           1 : }

Generated by: LCOV version 1.14