LCOV - code coverage report
Current view: top level - pebble/metamorphic - build.go (source / functions) Hit Total Coverage
Test: 2024-04-10 08:16Z 6cdb88d4 - tests only.lcov Lines: 95 216 44.0 %
Date: 2024-04-10 08:16:32 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package metamorphic
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "slices"
      11             : 
      12             :         "github.com/cockroachdb/pebble"
      13             :         "github.com/cockroachdb/pebble/internal/base"
      14             :         "github.com/cockroachdb/pebble/internal/keyspan"
      15             :         "github.com/cockroachdb/pebble/internal/private"
      16             :         "github.com/cockroachdb/pebble/internal/rangekey"
      17             :         "github.com/cockroachdb/pebble/objstorage"
      18             :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      19             :         "github.com/cockroachdb/pebble/sstable"
      20             :         "github.com/cockroachdb/pebble/vfs"
      21             : )
      22             : 
      23             : // writeSSTForIngestion writes an SST that is to be ingested, either directly or
      24             : // as an external file. Returns the sstable metadata.
      25             : //
      26             : // Closes the iterators in all cases.
      27             : func writeSSTForIngestion(
      28             :         t *Test,
      29             :         pointIter base.InternalIterator,
      30             :         rangeDelIter keyspan.FragmentIterator,
      31             :         rangeKeyIter keyspan.FragmentIterator,
      32             :         uniquePrefixes bool,
      33             :         syntheticSuffix sstable.SyntheticSuffix,
      34             :         syntheticPrefix sstable.SyntheticPrefix,
      35             :         writable objstorage.Writable,
      36             :         targetFMV pebble.FormatMajorVersion,
      37           1 : ) (*sstable.WriterMetadata, error) {
      38           1 :         writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
      39           1 :         if t.testOpts.disableValueBlocksForIngestSSTables {
      40           1 :                 writerOpts.DisableValueBlocks = true
      41           1 :         }
      42           1 :         w := sstable.NewWriter(writable, writerOpts)
      43           1 :         pointIterCloser := base.CloseHelper(pointIter)
      44           1 :         defer pointIterCloser.Close()
      45           1 :         rangeDelIterCloser := base.CloseHelper(rangeDelIter)
      46           1 :         defer rangeDelIterCloser.Close()
      47           1 :         rangeKeyIterCloser := base.CloseHelper(rangeKeyIter)
      48           1 :         defer rangeKeyIterCloser.Close()
      49           1 : 
      50           1 :         outputKey := func(key []byte) []byte {
      51           1 :                 if !syntheticPrefix.IsSet() && !syntheticSuffix.IsSet() {
      52           1 :                         return slices.Clone(key)
      53           1 :                 }
      54           0 :                 if syntheticPrefix.IsSet() {
      55           0 :                         key = syntheticPrefix.Apply(key)
      56           0 :                 }
      57           0 :                 if syntheticSuffix.IsSet() {
      58           0 :                         n := t.opts.Comparer.Split(key)
      59           0 :                         key = append(key[:n:n], syntheticSuffix...)
      60           0 :                 }
      61           0 :                 return key
      62             :         }
      63             : 
      64           1 :         var lastUserKey []byte
      65           1 :         for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
      66           1 :                 // Ignore duplicate keys.
      67           1 :                 if lastUserKey != nil {
      68           1 :                         last := lastUserKey
      69           1 :                         this := key.UserKey
      70           1 :                         if uniquePrefixes {
      71           0 :                                 last = last[:t.opts.Comparer.Split(last)]
      72           0 :                                 this = this[:t.opts.Comparer.Split(this)]
      73           0 :                         }
      74           1 :                         if t.opts.Comparer.Equal(last, this) {
      75           1 :                                 continue
      76             :                         }
      77             :                 }
      78           1 :                 lastUserKey = append(lastUserKey[:0], key.UserKey...)
      79           1 : 
      80           1 :                 key.SetSeqNum(base.SeqNumZero)
      81           1 :                 // It's possible that we wrote the key on a batch from a db that supported
      82           1 :                 // DeleteSized, but will be ingesting into a db that does not. Detect this
      83           1 :                 // case and translate the key to an InternalKeyKindDelete.
      84           1 :                 if targetFMV < pebble.FormatDeleteSizedAndObsolete && key.Kind() == pebble.InternalKeyKindDeleteSized {
      85           0 :                         value = pebble.LazyValue{}
      86           0 :                         key.SetKind(pebble.InternalKeyKindDelete)
      87           0 :                 }
      88           1 :                 valBytes, _, err := value.Value(nil)
      89           1 :                 if err != nil {
      90           0 :                         return nil, err
      91           0 :                 }
      92           1 :                 k := *key
      93           1 :                 k.UserKey = outputKey(k.UserKey)
      94           1 :                 if err := w.Add(k, valBytes); err != nil {
      95           0 :                         return nil, err
      96           0 :                 }
      97             :         }
      98           1 :         if err := pointIterCloser.Close(); err != nil {
      99           0 :                 return nil, err
     100           0 :         }
     101             : 
     102           1 :         if rangeDelIter != nil {
     103           1 :                 span, err := rangeDelIter.First()
     104           1 :                 for ; span != nil; span, err = rangeDelIter.Next() {
     105           1 :                         if err := w.DeleteRange(outputKey(span.Start), outputKey(span.End)); err != nil {
     106           0 :                                 return nil, err
     107           0 :                         }
     108             :                 }
     109           1 :                 if err != nil {
     110           0 :                         return nil, err
     111           0 :                 }
     112           1 :                 if err := rangeDelIterCloser.Close(); err != nil {
     113           0 :                         return nil, err
     114           0 :                 }
     115             :         }
     116             : 
     117           1 :         if rangeKeyIter != nil {
     118           1 :                 span, err := rangeKeyIter.First()
     119           1 :                 for ; span != nil; span, err = rangeKeyIter.Next() {
     120           1 :                         // Coalesce the keys of this span and then zero the sequence
     121           1 :                         // numbers. This is necessary in order to make the range keys within
     122           1 :                         // the ingested sstable internally consistent at the sequence number
     123           1 :                         // it's ingested at. The individual keys within a batch are
     124           1 :                         // committed at unique sequence numbers, whereas all the keys of an
     125           1 :                         // ingested sstable are given the same sequence number. A span
     126           1 :                         // containing keys that both set and unset the same suffix at the
     127           1 :                         // same sequence number is nonsensical, so we "coalesce" or collapse
     128           1 :                         // the keys.
     129           1 :                         collapsed := keyspan.Span{
     130           1 :                                 Start: outputKey(span.Start),
     131           1 :                                 End:   outputKey(span.End),
     132           1 :                                 Keys:  make([]keyspan.Key, 0, len(span.Keys)),
     133           1 :                         }
     134           1 :                         rangekey.Coalesce(
     135           1 :                                 t.opts.Comparer.Compare, t.opts.Comparer.Equal, span.Keys, &collapsed.Keys,
     136           1 :                         )
     137           1 :                         for i := range collapsed.Keys {
     138           1 :                                 collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
     139           1 :                         }
     140           1 :                         keyspan.SortKeysByTrailer(&collapsed.Keys)
     141           1 :                         if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil {
     142           0 :                                 return nil, err
     143           0 :                         }
     144             :                 }
     145           1 :                 if err != nil {
     146           0 :                         return nil, err
     147           0 :                 }
     148           1 :                 if err := rangeKeyIterCloser.Close(); err != nil {
     149           0 :                         return nil, err
     150           0 :                 }
     151             :         }
     152             : 
     153           1 :         if err := w.Close(); err != nil {
     154           0 :                 return nil, err
     155           0 :         }
     156           1 :         sstMeta, err := w.Metadata()
     157           1 :         if err != nil {
     158           0 :                 return nil, err
     159           0 :         }
     160           1 :         return sstMeta, nil
     161             : }
     162             : 
     163             : // buildForIngest builds a local SST file containing the keys in the given batch
     164             : // and returns its path and metadata.
     165             : func buildForIngest(
     166             :         t *Test, dbID objID, b *pebble.Batch, i int,
     167           1 : ) (path string, _ *sstable.WriterMetadata, _ error) {
     168           1 :         path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
     169           1 :         f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
     170           1 :         if err != nil {
     171           0 :                 return "", nil, err
     172           0 :         }
     173           1 :         db := t.getDB(dbID)
     174           1 : 
     175           1 :         iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     176           1 : 
     177           1 :         writable := objstorageprovider.NewFileWritable(f)
     178           1 :         meta, err := writeSSTForIngestion(
     179           1 :                 t,
     180           1 :                 iter, rangeDelIter, rangeKeyIter,
     181           1 :                 false, /* uniquePrefixes */
     182           1 :                 nil,   /* syntheticSuffix */
     183           1 :                 nil,   /* syntheticPrefix */
     184           1 :                 writable,
     185           1 :                 db.FormatMajorVersion(),
     186           1 :         )
     187           1 :         return path, meta, err
     188             : }
     189             : 
     190             : // buildForIngestExternalEmulation builds a local SST file containing the keys
     191             : // in the given external object (truncated to the given bounds) and returns its
     192             : // path and metadata.
     193             : func buildForIngestExternalEmulation(
     194             :         t *Test,
     195             :         dbID objID,
     196             :         externalObjID objID,
     197             :         bounds pebble.KeyRange,
     198             :         syntheticSuffix sstable.SyntheticSuffix,
     199             :         syntheticPrefix sstable.SyntheticPrefix,
     200             :         i int,
     201           0 : ) (path string, _ *sstable.WriterMetadata) {
     202           0 :         path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
     203           0 :         f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
     204           0 :         panicIfErr(err)
     205           0 : 
     206           0 :         reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
     207           0 :         defer reader.Close()
     208           0 : 
     209           0 :         writable := objstorageprovider.NewFileWritable(f)
     210           0 :         // The underlying file should already have unique prefixes. Plus we are
     211           0 :         // emulating the external ingestion path which won't remove duplicate prefixes
     212           0 :         // if they exist.
     213           0 :         const uniquePrefixes = false
     214           0 :         meta, err := writeSSTForIngestion(
     215           0 :                 t,
     216           0 :                 pointIter, rangeDelIter, rangeKeyIter,
     217           0 :                 uniquePrefixes,
     218           0 :                 syntheticSuffix,
     219           0 :                 syntheticPrefix,
     220           0 :                 writable,
     221           0 :                 t.minFMV(),
     222           0 :         )
     223           0 :         if err != nil {
     224           0 :                 panic(err)
     225             :         }
     226           0 :         return path, meta
     227             : }
     228             : 
     229             : func openExternalObj(
     230             :         t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
     231             : ) (
     232             :         reader *sstable.Reader,
     233             :         pointIter base.InternalIterator,
     234             :         rangeDelIter keyspan.FragmentIterator,
     235             :         rangeKeyIter keyspan.FragmentIterator,
     236           0 : ) {
     237           0 :         objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), externalObjName(externalObjID))
     238           0 :         panicIfErr(err)
     239           0 :         opts := sstable.ReaderOptions{
     240           0 :                 Comparer: t.opts.Comparer,
     241           0 :         }
     242           0 :         reader, err = sstable.NewReader(objstorageprovider.NewRemoteReadable(objReader, objSize), opts)
     243           0 :         panicIfErr(err)
     244           0 : 
     245           0 :         start := bounds.Start
     246           0 :         end := bounds.End
     247           0 :         if syntheticPrefix.IsSet() {
     248           0 :                 start = syntheticPrefix.Invert(start)
     249           0 :                 end = syntheticPrefix.Invert(end)
     250           0 :         }
     251           0 :         pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
     252           0 :         panicIfErr(err)
     253           0 : 
     254           0 :         rangeDelIter, err = reader.NewRawRangeDelIter(sstable.NoTransforms)
     255           0 :         panicIfErr(err)
     256           0 :         if rangeDelIter != nil {
     257           0 :                 rangeDelIter = keyspan.Truncate(
     258           0 :                         t.opts.Comparer.Compare,
     259           0 :                         rangeDelIter,
     260           0 :                         base.UserKeyBoundsEndExclusive(start, end),
     261           0 :                 )
     262           0 :         }
     263             : 
     264           0 :         rangeKeyIter, err = reader.NewRawRangeKeyIter(sstable.NoTransforms)
     265           0 :         panicIfErr(err)
     266           0 :         if rangeKeyIter != nil {
     267           0 :                 rangeKeyIter = keyspan.Truncate(
     268           0 :                         t.opts.Comparer.Compare,
     269           0 :                         rangeKeyIter,
     270           0 :                         base.UserKeyBoundsEndExclusive(start, end),
     271           0 :                 )
     272           0 :         }
     273           0 :         return reader, pointIter, rangeDelIter, rangeKeyIter
     274             : }
     275             : 
     276             : // externalObjIsEmpty returns true if the given external object has no point or
     277             : // range keys within the given bounds.
     278             : func externalObjIsEmpty(
     279             :         t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
     280           0 : ) bool {
     281           0 :         reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
     282           0 :         defer reader.Close()
     283           0 :         defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
     284           0 : 
     285           0 :         key, _ := pointIter.First()
     286           0 :         panicIfErr(pointIter.Error())
     287           0 :         if key != nil {
     288           0 :                 return false
     289           0 :         }
     290           0 :         for _, it := range []keyspan.FragmentIterator{rangeDelIter, rangeKeyIter} {
     291           0 :                 if it != nil {
     292           0 :                         span, err := it.First()
     293           0 :                         panicIfErr(err)
     294           0 :                         if span != nil {
     295           0 :                                 return false
     296           0 :                         }
     297             :                 }
     298             :         }
     299           0 :         return true
     300             : }
     301             : 
     302           0 : func panicIfErr(err error) {
     303           0 :         if err != nil {
     304           0 :                 panic(err)
     305             :         }
     306             : }

Generated by: LCOV version 1.14