LCOV - code coverage report
Current view: top level - pebble/metamorphic - build.go (source / functions) Hit Total Coverage
Test: 2024-08-11 08:16Z 791b3749 - tests + meta.lcov Lines: 172 212 81.1 %
Date: 2024-08-11 08:17:31 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "slices"
      11             : 
      12             :         "github.com/cockroachdb/pebble"
      13             :         "github.com/cockroachdb/pebble/internal/base"
      14             :         "github.com/cockroachdb/pebble/internal/keyspan"
      15             :         "github.com/cockroachdb/pebble/internal/private"
      16             :         "github.com/cockroachdb/pebble/internal/rangekey"
      17             :         "github.com/cockroachdb/pebble/objstorage"
      18             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      19             :         "github.com/cockroachdb/pebble/sstable"
      20             :         "github.com/cockroachdb/pebble/vfs"
      21             : )
      22             : 
      23             : // writeSSTForIngestion writes an SST that is to be ingested, either directly or
      24             : // as an external file. Returns the sstable metadata.
      25             : //
      26             : // Closes the iterators in all cases.
      27             : func writeSSTForIngestion(
      28             :         t *Test,
      29             :         pointIter base.InternalIterator,
      30             :         rangeDelIter keyspan.FragmentIterator,
      31             :         rangeKeyIter keyspan.FragmentIterator,
      32             :         uniquePrefixes bool,
      33             :         syntheticSuffix sstable.SyntheticSuffix,
      34             :         syntheticPrefix sstable.SyntheticPrefix,
      35             :         writable objstorage.Writable,
      36             :         targetFMV pebble.FormatMajorVersion,
      37           2 : ) (*sstable.WriterMetadata, error) {
      38           2 :         writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
      39           2 :         if t.testOpts.disableValueBlocksForIngestSSTables {
      40           2 :                 writerOpts.DisableValueBlocks = true
      41           2 :         }
      42           2 :         w := sstable.NewWriter(writable, writerOpts)
      43           2 :         pointIterCloser := base.CloseHelper(pointIter)
      44           2 :         defer func() {
      45           2 :                 _ = pointIterCloser.Close()
      46           2 :                 if rangeDelIter != nil {
      47           0 :                         rangeDelIter.Close()
      48           0 :                 }
      49           2 :                 if rangeKeyIter != nil {
      50           0 :                         rangeKeyIter.Close()
      51           0 :                 }
      52             :         }()
      53             : 
      54           2 :         outputKey := func(key []byte, syntheticSuffix sstable.SyntheticSuffix) []byte {
      55           2 :                 if !syntheticPrefix.IsSet() && !syntheticSuffix.IsSet() {
      56           2 :                         return slices.Clone(key)
      57           2 :                 }
      58           2 :                 if syntheticPrefix.IsSet() {
      59           2 :                         key = syntheticPrefix.Apply(key)
      60           2 :                 }
      61           2 :                 if syntheticSuffix.IsSet() {
      62           2 :                         n := t.opts.Comparer.Split(key)
      63           2 :                         key = append(key[:n:n], syntheticSuffix...)
      64           2 :                 }
      65           2 :                 return key
      66             :         }
      67             : 
      68           2 :         var lastUserKey []byte
      69           2 :         for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
      70           2 :                 // Ignore duplicate keys.
      71           2 :                 if lastUserKey != nil {
      72           2 :                         last := lastUserKey
      73           2 :                         this := kv.K.UserKey
      74           2 :                         if uniquePrefixes {
      75           2 :                                 last = last[:t.opts.Comparer.Split(last)]
      76           2 :                                 this = this[:t.opts.Comparer.Split(this)]
      77           2 :                         }
      78           2 :                         if t.opts.Comparer.Equal(last, this) {
      79           2 :                                 continue
      80             :                         }
      81             :                 }
      82           2 :                 lastUserKey = append(lastUserKey[:0], kv.K.UserKey...)
      83           2 : 
      84           2 :                 k := *kv
      85           2 :                 k.K.SetSeqNum(base.SeqNumZero)
      86           2 :                 k.K.UserKey = outputKey(k.K.UserKey, syntheticSuffix)
      87           2 :                 value := kv.V
      88           2 :                 // It's possible that we wrote the key on a batch from a db that supported
      89           2 :                 // DeleteSized, but will be ingesting into a db that does not. Detect this
      90           2 :                 // case and translate the key to an InternalKeyKindDelete.
      91           2 :                 if targetFMV < pebble.FormatDeleteSizedAndObsolete && kv.Kind() == pebble.InternalKeyKindDeleteSized {
      92           1 :                         value = pebble.LazyValue{}
      93           1 :                         k.K.SetKind(pebble.InternalKeyKindDelete)
      94           1 :                 }
      95           2 :                 valBytes, _, err := value.Value(nil)
      96           2 :                 if err != nil {
      97           0 :                         return nil, err
      98           0 :                 }
      99           2 :                 if err := w.Raw().Add(k.K, valBytes); err != nil {
     100           0 :                         return nil, err
     101           0 :                 }
     102             :         }
     103           2 :         if err := pointIterCloser.Close(); err != nil {
     104           0 :                 return nil, err
     105           0 :         }
     106             : 
     107           2 :         if rangeDelIter != nil {
     108           2 :                 span, err := rangeDelIter.First()
     109           2 :                 for ; span != nil; span, err = rangeDelIter.Next() {
     110           2 :                         if syntheticSuffix.IsSet() {
     111           0 :                                 panic("synthetic suffix with RangeDel")
     112             :                         }
     113           2 :                         if err := w.DeleteRange(outputKey(span.Start, nil), outputKey(span.End, nil)); err != nil {
     114           0 :                                 return nil, err
     115           0 :                         }
     116             :                 }
     117           2 :                 if err != nil {
     118           0 :                         return nil, err
     119           0 :                 }
     120           2 :                 rangeDelIter.Close()
     121           2 :                 rangeDelIter = nil
     122             :         }
     123             : 
     124           2 :         if rangeKeyIter != nil {
     125           2 :                 span, err := rangeKeyIter.First()
     126           2 :                 for ; span != nil; span, err = rangeKeyIter.Next() {
     127           2 :                         // Coalesce the keys of this span and then zero the sequence
     128           2 :                         // numbers. This is necessary in order to make the range keys within
     129           2 :                         // the ingested sstable internally consistent at the sequence number
     130           2 :                         // it's ingested at. The individual keys within a batch are
     131           2 :                         // committed at unique sequence numbers, whereas all the keys of an
     132           2 :                         // ingested sstable are given the same sequence number. A span
     133           2 :                         // containing keys that both set and unset the same suffix at the
     134           2 :                         // same sequence number is nonsensical, so we "coalesce" or collapse
     135           2 :                         // the keys.
     136           2 :                         collapsed := keyspan.Span{
     137           2 :                                 Start: outputKey(span.Start, nil),
     138           2 :                                 End:   outputKey(span.End, nil),
     139           2 :                                 Keys:  make([]keyspan.Key, 0, len(span.Keys)),
     140           2 :                         }
     141           2 :                         keys := span.Keys
     142           2 :                         if syntheticSuffix.IsSet() {
     143           0 :                                 keys = slices.Clone(span.Keys)
     144           0 :                                 for i := range keys {
     145           0 :                                         if keys[i].Kind() == base.InternalKeyKindRangeKeyUnset {
     146           0 :                                                 panic("RangeKeyUnset with synthetic suffix")
     147             :                                         }
     148           0 :                                         if len(keys[i].Suffix) > 0 {
     149           0 :                                                 keys[i].Suffix = syntheticSuffix
     150           0 :                                         }
     151             :                                 }
     152             :                         }
     153           2 :                         rangekey.Coalesce(t.opts.Comparer.CompareSuffixes, keys, &collapsed.Keys)
     154           2 :                         for i := range collapsed.Keys {
     155           2 :                                 collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
     156           2 :                         }
     157           2 :                         keyspan.SortKeysByTrailer(collapsed.Keys)
     158           2 :                         if err := w.Raw().EncodeSpan(collapsed); err != nil {
     159           0 :                                 return nil, err
     160           0 :                         }
     161             :                 }
     162           2 :                 if err != nil {
     163           0 :                         return nil, err
     164           0 :                 }
     165           2 :                 rangeKeyIter.Close()
     166           2 :                 rangeKeyIter = nil
     167             :         }
     168             : 
     169           2 :         if err := w.Close(); err != nil {
     170           0 :                 return nil, err
     171           0 :         }
     172           2 :         sstMeta, err := w.Raw().Metadata()
     173           2 :         if err != nil {
     174           0 :                 return nil, err
     175           0 :         }
     176           2 :         return sstMeta, nil
     177             : }
     178             : 
     179             : // buildForIngest builds a local SST file containing the keys in the given batch
     180             : // and returns its path and metadata.
     181             : func buildForIngest(
     182             :         t *Test, dbID objID, b *pebble.Batch, i int,
     183           2 : ) (path string, _ *sstable.WriterMetadata, _ error) {
     184           2 :         path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
     185           2 :         f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
     186           2 :         if err != nil {
     187           0 :                 return "", nil, err
     188           0 :         }
     189           2 :         db := t.getDB(dbID)
     190           2 : 
     191           2 :         iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     192           2 : 
     193           2 :         writable := objstorageprovider.NewFileWritable(f)
     194           2 :         meta, err := writeSSTForIngestion(
     195           2 :                 t,
     196           2 :                 iter, rangeDelIter, rangeKeyIter,
     197           2 :                 false, /* uniquePrefixes */
     198           2 :                 nil,   /* syntheticSuffix */
     199           2 :                 nil,   /* syntheticPrefix */
     200           2 :                 writable,
     201           2 :                 db.FormatMajorVersion(),
     202           2 :         )
     203           2 :         return path, meta, err
     204             : }
     205             : 
     206             : // buildForIngest builds a local SST file containing the keys in the given
     207             : // external object (truncated to the given bounds) and returns its path and
     208             : // metadata.
     209             : func buildForIngestExternalEmulation(
     210             :         t *Test,
     211             :         dbID objID,
     212             :         externalObjID objID,
     213             :         bounds pebble.KeyRange,
     214             :         syntheticSuffix sstable.SyntheticSuffix,
     215             :         syntheticPrefix sstable.SyntheticPrefix,
     216             :         i int,
     217           2 : ) (path string, _ *sstable.WriterMetadata) {
     218           2 :         path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
     219           2 :         f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
     220           2 :         panicIfErr(err)
     221           2 : 
     222           2 :         reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
     223           2 :         defer reader.Close()
     224           2 : 
     225           2 :         writable := objstorageprovider.NewFileWritable(f)
     226           2 :         // The underlying file should already have unique prefixes. Plus we are
     227           2 :         // emulating the external ingestion path which won't remove duplicate prefixes
     228           2 :         // if they exist.
     229           2 :         const uniquePrefixes = false
     230           2 :         meta, err := writeSSTForIngestion(
     231           2 :                 t,
     232           2 :                 pointIter, rangeDelIter, rangeKeyIter,
     233           2 :                 uniquePrefixes,
     234           2 :                 syntheticSuffix,
     235           2 :                 syntheticPrefix,
     236           2 :                 writable,
     237           2 :                 t.minFMV(),
     238           2 :         )
     239           2 :         if err != nil {
     240           0 :                 panic(err)
     241             :         }
     242           2 :         return path, meta
     243             : }
     244             : 
     245             : func openExternalObj(
     246             :         t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
     247             : ) (
     248             :         reader *sstable.Reader,
     249             :         pointIter base.InternalIterator,
     250             :         rangeDelIter keyspan.FragmentIterator,
     251             :         rangeKeyIter keyspan.FragmentIterator,
     252           2 : ) {
     253           2 :         objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), externalObjName(externalObjID))
     254           2 :         panicIfErr(err)
     255           2 :         opts := sstable.ReaderOptions{
     256           2 :                 Comparer: t.opts.Comparer,
     257           2 :         }
     258           2 :         reader, err = sstable.NewReader(
     259           2 :                 context.Background(),
     260           2 :                 objstorageprovider.NewRemoteReadable(objReader, objSize),
     261           2 :                 opts,
     262           2 :         )
     263           2 :         panicIfErr(err)
     264           2 : 
     265           2 :         start := bounds.Start
     266           2 :         end := bounds.End
     267           2 :         if syntheticPrefix.IsSet() {
     268           2 :                 start = syntheticPrefix.Invert(start)
     269           2 :                 end = syntheticPrefix.Invert(end)
     270           2 :         }
     271           2 :         pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
     272           2 :         panicIfErr(err)
     273           2 : 
     274           2 :         rangeDelIter, err = reader.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms)
     275           2 :         panicIfErr(err)
     276           2 :         if rangeDelIter != nil {
     277           1 :                 rangeDelIter = keyspan.Truncate(
     278           1 :                         t.opts.Comparer.Compare,
     279           1 :                         rangeDelIter,
     280           1 :                         base.UserKeyBoundsEndExclusive(start, end),
     281           1 :                 )
     282           1 :         }
     283             : 
     284           2 :         rangeKeyIter, err = reader.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms)
     285           2 :         panicIfErr(err)
     286           2 :         if rangeKeyIter != nil {
     287           0 :                 rangeKeyIter = keyspan.Truncate(
     288           0 :                         t.opts.Comparer.Compare,
     289           0 :                         rangeKeyIter,
     290           0 :                         base.UserKeyBoundsEndExclusive(start, end),
     291           0 :                 )
     292           0 :         }
     293           2 :         return reader, pointIter, rangeDelIter, rangeKeyIter
     294             : }
     295             : 
     296           2 : func panicIfErr(err error) {
     297           2 :         if err != nil {
     298           0 :                 panic(err)
     299             :         }
     300             : }

Generated by: LCOV version 1.14