pebble/sstable/suffix_rewriter.go: LCOV line coverage 178 of 231 lines (77.1%); test 649e50ad (tests + meta.lcov), run 2024-10-06 08:16Z, report generated 2024-10-06 08:17:18.

// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package sstable

import (
        "bytes"
        "cmp"
        "context"
        "math"
        "slices"
        "sync"

        "github.com/cockroachdb/errors"
        "github.com/cockroachdb/pebble/internal/base"
        "github.com/cockroachdb/pebble/internal/bytealloc"
        "github.com/cockroachdb/pebble/internal/invariants"
        "github.com/cockroachdb/pebble/objstorage"
        "github.com/cockroachdb/pebble/sstable/block"
)

// RewriteKeySuffixesAndReturnFormat copies the content of the passed SSTable
// bytes to a new sstable, written to `out`, in which the suffix `from` is
// replaced with `to` in every key. The input sstable must consist of only
// Sets or RangeKeySets, and every key must have `from` as its suffix, as
// determined by the Split function of the Comparer in the passed
// WriterOptions. Range deletes must not exist in this sstable, as they will
// be ignored.
//
// Data blocks are rewritten in parallel by `concurrency` workers and then
// assembled into a final SST. Filters are copied from the original SST without
// modification as they are not affected by the suffix, while block and table
// properties are only minimally recomputed.
//
// TODO(sumeer): document limitations, if any, due to this limited
// re-computation of properties (is there any loss of fidelity?).
//
// Any block property collectors configured in the WriterOptions must implement
// AddCollectedWithSuffixChange.
//
// The WriterOptions.TableFormat is ignored, and the output sstable has the
// same TableFormat as the input, which is returned in case the caller wants
// to do some error checking. Suffix rewriting is meant to be efficient, and
// allowing changes in the TableFormat detracts from that efficiency.
//
// Any obsolete bits that key-value pairs may be annotated with are ignored
// and lost during the rewrite. Additionally, the output sstable has the
// pebble.obsolete.is_strict property set to false. These limitations could be
// removed if needed. The current use case for
// RewriteKeySuffixesAndReturnFormat in CockroachDB is for MVCC-compliant file
// ingestion, where these files do not contain RANGEDELs and have one
// key-value pair per userkey -- so they trivially satisfy the strict
// criteria, and we don't need the obsolete bit as a performance optimization.
// For disaggregated storage, strict obsolete sstables are needed for L5 and
// L6, but at the time of writing, we expect such MVCC-compliant file
// ingestion to only ingest into levels L4 and higher. If this changes, we can
// do one of two things to get rid of this limitation:
//   - Validate that there are no duplicate userkeys and no RANGEDELs/MERGEs
//     in the sstable to be rewritten. Validating no duplicate userkeys is
//     non-trivial when rewriting blocks in parallel, so we could encode the
//     pre-existing condition in the (existing) SnapshotPinnedKeys property --
//     we need to update the external sst writer to calculate and encode this
//     property.
//   - Preserve the obsolete bit (with changes to the blockIter).
func RewriteKeySuffixesAndReturnFormat(
        sst []byte,
        rOpts ReaderOptions,
        out objstorage.Writable,
        o WriterOptions,
        from, to []byte,
        concurrency int,
) (*WriterMetadata, TableFormat, error) {
        r, err := NewMemReader(sst, rOpts)
        if err != nil {
                return nil, TableFormatUnspecified, err
        }
        defer r.Close()
        return rewriteKeySuffixesInBlocks(r, out, o, from, to, concurrency)
}
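
// exampleRewriteSuffixesFast is an illustrative sketch of one possible caller of
// RewriteKeySuffixesAndReturnFormat for an sstable already held in memory. The helper
// name, the minimal option plumbing, and the concurrency of 4 are placeholder
// assumptions, not part of the original API.
func exampleRewriteSuffixesFast(
        sst []byte, out objstorage.Writable, comparer *base.Comparer, from, to []byte,
) (*WriterMetadata, TableFormat, error) {
        if comparer == nil || comparer.Split == nil {
                // The rewrite locates each key's suffix via the Comparer's Split function.
                return nil, TableFormatUnspecified, errors.New("a comparer with a Split function is required")
        }
        // WriterOptions.TableFormat is ignored by the rewrite; the output always keeps
        // the input's format, which is returned so the caller can run its own checks.
        return RewriteKeySuffixesAndReturnFormat(
                sst, ReaderOptions{Comparer: comparer}, out,
                WriterOptions{Comparer: comparer}, from, to, 4 /* concurrency */)
}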

func rewriteKeySuffixesInBlocks(
        r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte, concurrency int,
) (*WriterMetadata, TableFormat, error) {
        o = o.ensureDefaults()
        switch {
        case concurrency < 1:
                return nil, TableFormatUnspecified, errors.New("concurrency must be >= 1")
        case r.Properties.NumValueBlocks > 0 || r.Properties.NumValuesInValueBlocks > 0:
                return nil, TableFormatUnspecified,
                        errors.New("sstable with a single suffix should not have value blocks")
        case r.Properties.ComparerName != o.Comparer.Name:
                return nil, TableFormatUnspecified, errors.Errorf("mismatched Comparer %s vs %s, replacement requires same splitter to copy filters",
                        r.Properties.ComparerName, o.Comparer.Name)
        case o.FilterPolicy != nil && r.Properties.FilterPolicyName != o.FilterPolicy.Name():
                return nil, TableFormatUnspecified, errors.New("mismatched filters")
        }

        o.TableFormat = r.tableFormat
        w := NewRawWriter(out, o)
        defer func() {
                if w != nil {
                        w.Close()
                }
        }()

        if err := w.rewriteSuffixes(r, o, from, to, concurrency); err != nil {
                return nil, TableFormatUnspecified, err
        }

        if err := w.Close(); err != nil {
                w = nil
                return nil, TableFormatUnspecified, err
        }
        writerMeta, err := w.Metadata()
        w = nil
        return writerMeta, r.tableFormat, err
}

var errBadKind = errors.New("key does not have expected kind (set)")

type blockWithSpan struct {
        start, end base.InternalKey
        physical   block.PhysicalBlock
}

type blockRewriter interface {
        RewriteSuffixes(
                input []byte, from []byte, to []byte,
        ) (start, end base.InternalKey, rewritten []byte, err error)
}

func rewriteDataBlocksInParallel(
        r *Reader,
        opts WriterOptions,
        input []block.HandleWithProperties,
        from, to []byte,
        concurrency int,
        newDataBlockRewriter func() blockRewriter,
) ([]blockWithSpan, error) {
        if r.Properties.NumEntries == 0 {
                // No point keys.
                return nil, nil
        }
        output := make([]blockWithSpan, len(input))

        g := &sync.WaitGroup{}
        g.Add(concurrency)
        type workerErr struct {
                worker int
                err    error
        }
        errCh := make(chan workerErr, concurrency)
        for j := 0; j < concurrency; j++ {
                worker := j
                go func() {
                        defer g.Done()
                        rw := newDataBlockRewriter()
                        var blockAlloc bytealloc.A
                        var compressedBuf []byte
                        var inputBlock, inputBlockBuf []byte
                        checksummer := block.Checksummer{Type: opts.Checksum}
                        // We'll assume all blocks are _roughly_ equal in size, so a static
                        // round-robin partition, with each worker rewriting every
                        // concurrency-th block, is probably good enough.
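                        // For example, with concurrency = 3, worker 0 rewrites blocks
                        // 0, 3, 6, ..., worker 1 rewrites blocks 1, 4, 7, ..., and
                        // worker 2 rewrites blocks 2, 5, 8, ....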
                        err := func() error {
                                for i := worker; i < len(input); i += concurrency {
                                        bh := input[i]
                                        var err error
                                        inputBlock, inputBlockBuf, err = readBlockBuf(r, bh.Handle, inputBlockBuf)
                                        if err != nil {
                                                return err
                                        }
                                        var outputBlock []byte
                                        output[i].start, output[i].end, outputBlock, err =
                                                rw.RewriteSuffixes(inputBlock, from, to)
                                        if err != nil {
                                                return err
                                        }
                                        compressedBuf = compressedBuf[:cap(compressedBuf)]
                                        finished := block.CompressAndChecksum(&compressedBuf, outputBlock, opts.Compression, &checksummer)
                                        output[i].physical = finished.CloneWithByteAlloc(&blockAlloc)
                                }
                                return nil
                        }()
                        if err != nil {
                                errCh <- workerErr{worker: worker, err: err}
                        }
                }()
        }
        g.Wait()
        close(errCh)
        if werr, ok := <-errCh; ok {
                // Collect errors from all workers and sort them by worker for determinism.
                werrs := []workerErr{werr}
                for werr := range errCh {
                        werrs = append(werrs, werr)
                }
                slices.SortFunc(werrs, func(a, b workerErr) int { return cmp.Compare(a.worker, b.worker) })
                return nil, werrs[0].err
        }
        return output, nil
}

func rewriteRangeKeyBlockToWriter(r *Reader, w RawWriter, from, to []byte) error {
        iter, err := r.NewRawRangeKeyIter(context.TODO(), NoFragmentTransforms)
        if err != nil {
                return err
        }
        if iter == nil {
                // No range keys.
                return nil
        }
        defer iter.Close()

        s, err := iter.First()
        for ; s != nil; s, err = iter.Next() {
                if !s.Valid() {
                        break
                }
                for i := range s.Keys {
                        if s.Keys[i].Kind() != base.InternalKeyKindRangeKeySet {
                                return errBadKind
                        }
                        if !bytes.Equal(s.Keys[i].Suffix, from) {
                                return errors.Errorf("key has suffix %q, expected %q", s.Keys[i].Suffix, from)
                        }
                        s.Keys[i].Suffix = to
                }

                if err := w.EncodeSpan(*s); err != nil {
                        return err
                }
        }
        return err
}

// getShortIDs returns a slice keyed by the shortIDs of the block property
// collectors in r; each value is the new shortID, i.e. the index of the
// corresponding block property collector in collectors.
//
// getShortIDs returns an error if any of the collectors are not found in the
// sstable.
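//
// For example, if r stored the property for a collector under shortID 3 and that
// collector is at index 0 of collectors, the returned slice maps shortIDs[3] == 0;
// entries for shortIDs that r never used remain invalidShortID.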
func getShortIDs(r *Reader, collectors []BlockPropertyCollector) ([]shortID, error) {
        if len(collectors) == 0 {
                return nil, nil
        }
        shortIDs := make([]shortID, math.MaxUint8)
        for i := range shortIDs {
                shortIDs[i] = invalidShortID
        }
        for i, p := range collectors {
                prop, ok := r.Properties.UserProperties[p.Name()]
                if !ok {
                        return nil, errors.Errorf("sstable does not contain property %s", p.Name())
                }
                shortIDs[shortID(prop[0])] = shortID(i)
        }
        return shortIDs, nil
}

type copyFilterWriter struct {
        origMetaName   string
        origPolicyName string
        data           []byte
}

func (copyFilterWriter) addKey(key []byte)         { panic("unimplemented") }
func (c copyFilterWriter) finish() ([]byte, error) { return c.data, nil }
func (c copyFilterWriter) metaName() string        { return c.origMetaName }
func (c copyFilterWriter) policyName() string      { return c.origPolicyName }

// RewriteKeySuffixesViaWriter is similar to RewriteKeySuffixesAndReturnFormat but
// uses just a single loop over the Reader that writes each key to the Writer with
// the new suffix. This is significantly slower than the parallelized rewriter, and
// does more work to re-derive filters, properties, etc.
//
// Any obsolete bits that key-value pairs may be annotated with are ignored
// and lost during the rewrite. Some of the obsolete bits may be recreated --
// specifically when there are multiple keys with the same user key.
// Additionally, the output sstable has the pebble.obsolete.is_strict property
// set to false. See the longer comment at RewriteKeySuffixesAndReturnFormat.
func RewriteKeySuffixesViaWriter(
        r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte,
) (*WriterMetadata, error) {
        if o.Comparer == nil || o.Comparer.Split == nil {
                return nil, errors.New("a valid splitter is required to rewrite suffixes")
        }

        o.IsStrictObsolete = false
        w := newRowWriter(out, o)
        defer func() {
                if w != nil {
                        w.Close()
                }
        }()
        i, err := r.NewIter(NoTransforms, nil, nil)
        if err != nil {
                return nil, err
        }
        defer i.Close()

        kv := i.First()
        var scratch InternalKey
        for kv != nil {
                if kv.Kind() != InternalKeyKindSet {
                        return nil, errors.New("invalid key type")
                }
                oldSuffix := kv.K.UserKey[r.Split(kv.K.UserKey):]
                if !bytes.Equal(oldSuffix, from) {
                        return nil, errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
                }
                scratch.UserKey = append(scratch.UserKey[:0], kv.K.UserKey[:len(kv.K.UserKey)-len(from)]...)
                scratch.UserKey = append(scratch.UserKey, to...)
                scratch.Trailer = kv.K.Trailer

                val, _, err := kv.Value(nil)
                if err != nil {
                        return nil, err
                }
                w.addPoint(scratch, val, false)
                kv = i.Next()
        }
        if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
                return nil, err
        }
        if err := w.Close(); err != nil {
                w = nil
                return nil, err
        }
        writerMeta, err := w.Metadata()
        w = nil
        return writerMeta, err
}
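
// exampleRewriteSuffixesViaWriter is an illustrative sketch, not part of the original
// API: it drives the slower Writer-based rewrite over an sstable that is already in
// memory, the sort of path a caller might choose when the parallel block rewrite is
// not applicable. The helper name and minimal option plumbing are assumptions.
func exampleRewriteSuffixesViaWriter(
        sst []byte, out objstorage.Writable, o WriterOptions, from, to []byte,
) (*WriterMetadata, error) {
        r, err := NewMemReader(sst, ReaderOptions{Comparer: o.Comparer})
        if err != nil {
                return nil, err
        }
        // The Reader is only needed for the duration of the rewrite.
        defer r.Close()
        return RewriteKeySuffixesViaWriter(r, out, o, from, to)
}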

// NewMemReader opens a reader over the SST stored in the passed []byte.
func NewMemReader(sst []byte, o ReaderOptions) (*Reader, error) {
        // Since all operations are from memory, plumbing a context here is not useful.
        return NewReader(context.Background(), newMemReader(sst), o)
}
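
// examplePointKeyCount is an illustrative sketch of an assumed caller, not part of
// the original file: because NewMemReader works directly over a byte slice, it is a
// cheap way to inspect an in-memory sstable's table properties, for example to count
// its point keys before deciding how to process it.
func examplePointKeyCount(sst []byte, o ReaderOptions) (uint64, error) {
        r, err := NewMemReader(sst, o)
        if err != nil {
                return 0, err
        }
        defer r.Close()
        // NumEntries is the number of point keys recorded in the table properties.
        return r.Properties.NumEntries, nil
}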

func readBlockBuf(r *Reader, bh block.Handle, buf []byte) ([]byte, []byte, error) {
        raw := r.readable.(*memReader).b[bh.Offset : bh.Offset+bh.Length+block.TrailerLen]
        if err := checkChecksum(r.checksumType, raw, bh, 0); err != nil {
                return nil, buf, err
        }
        algo := block.CompressionIndicator(raw[bh.Length])
        raw = raw[:bh.Length]
        if algo == block.NoCompressionIndicator {
                return raw, buf, nil
        }
        decompressedLen, prefix, err := block.DecompressedLen(algo, raw)
        if err != nil {
                return nil, buf, err
        }
        if cap(buf) < decompressedLen {
                buf = make([]byte, decompressedLen)
        }
        dst := buf[:decompressedLen]
        err = block.DecompressInto(algo, raw[prefix:], dst)
        return dst, buf, err
}

// memReader is a thin wrapper around a []byte such that it can be passed to
// sstable.Reader. It supports concurrent use and, in contrast to the heavier
// read/write vfs.MemFile, does so without locking.
type memReader struct {
        b  []byte
        r  *bytes.Reader
        rh objstorage.NoopReadHandle
}

var _ objstorage.Readable = (*memReader)(nil)

func newMemReader(b []byte) *memReader {
        r := &memReader{
                b: b,
                r: bytes.NewReader(b),
        }
        r.rh = objstorage.MakeNoopReadHandle(r)
        return r
}

// ReadAt is part of objstorage.Readable.
func (m *memReader) ReadAt(_ context.Context, p []byte, off int64) error {
        n, err := m.r.ReadAt(p, off)
        if invariants.Enabled && err == nil && n != len(p) {
                panic("short read")
        }
        return err
}

// Close is part of objstorage.Readable.
func (*memReader) Close() error {
        return nil
}

// Size is part of objstorage.Readable.
func (m *memReader) Size() int64 {
        return int64(len(m.b))
}

// NewReadHandle is part of objstorage.Readable.
func (m *memReader) NewReadHandle(readBeforeSize objstorage.ReadBeforeSize) objstorage.ReadHandle {
        return &m.rh
}
