LCOV - code coverage report
Current view: top level - pebble/metamorphic/metamorphic - build.go (source / functions) Coverage Total Hit
Test: 2025-06-22 08:18Z 23da05ed - tests only.lcov Lines: 79.3 % 217 172
Test Date: 2025-06-22 08:19:38 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package metamorphic
       6              : 
       7              : import (
       8              :         "context"
       9              :         "fmt"
      10              :         "slices"
      11              : 
      12              :         "github.com/cockroachdb/errors"
      13              :         "github.com/cockroachdb/pebble"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/keyspan"
      16              :         "github.com/cockroachdb/pebble/internal/private"
      17              :         "github.com/cockroachdb/pebble/internal/rangekey"
      18              :         "github.com/cockroachdb/pebble/objstorage"
      19              :         "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
      20              :         "github.com/cockroachdb/pebble/sstable"
      21              :         "github.com/cockroachdb/pebble/vfs"
      22              : )
      23              : 
      24              : // writeSSTForIngestion writes an SST that is to be ingested, either directly or
      25              : // as an external file. Returns the sstable metadata.
      26              : //
      27              : // Closes the iterators in all cases.
      28              : func writeSSTForIngestion(
      29              :         t *Test,
      30              :         pointIter base.InternalIterator,
      31              :         rangeDelIter keyspan.FragmentIterator,
      32              :         rangeKeyIter keyspan.FragmentIterator,
      33              :         uniquePrefixes bool,
      34              :         syntheticSuffix sstable.SyntheticSuffix,
      35              :         syntheticPrefix sstable.SyntheticPrefix,
      36              :         writable objstorage.Writable,
      37              :         targetFMV pebble.FormatMajorVersion,
      38            1 : ) (*sstable.WriterMetadata, error) {
      39            1 :         writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
      40            1 :         if t.testOpts.disableValueBlocksForIngestSSTables {
      41            1 :                 writerOpts.DisableValueBlocks = true
      42            1 :         }
      43            1 :         w := sstable.NewWriter(writable, writerOpts)
      44            1 :         pointIterCloser := base.CloseHelper(pointIter)
      45            1 :         defer func() {
      46            1 :                 _ = pointIterCloser.Close()
      47            1 :                 if rangeDelIter != nil {
      48            0 :                         rangeDelIter.Close()
      49            0 :                 }
      50            1 :                 if rangeKeyIter != nil {
      51            0 :                         rangeKeyIter.Close()
      52            0 :                 }
      53              :         }()
      54              : 
      55            1 :         outputKey := func(key []byte, syntheticSuffix sstable.SyntheticSuffix) []byte {
      56            1 :                 if !syntheticPrefix.IsSet() && !syntheticSuffix.IsSet() {
      57            1 :                         return slices.Clone(key)
      58            1 :                 }
      59            1 :                 if syntheticPrefix.IsSet() {
      60            1 :                         key = syntheticPrefix.Apply(key)
      61            1 :                 }
      62            1 :                 if syntheticSuffix.IsSet() {
      63            1 :                         n := t.opts.Comparer.Split(key)
      64            1 :                         key = append(key[:n:n], syntheticSuffix...)
      65            1 :                 }
      66            1 :                 return key
      67              :         }
      68              : 
      69            1 :         var lastUserKey []byte
      70            1 :         for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
      71            1 :                 // Ignore duplicate keys.
      72            1 :                 if lastUserKey != nil {
      73            1 :                         last := lastUserKey
      74            1 :                         this := kv.K.UserKey
      75            1 :                         if uniquePrefixes {
      76            1 :                                 last = last[:t.opts.Comparer.Split(last)]
      77            1 :                                 this = this[:t.opts.Comparer.Split(this)]
      78            1 :                         }
      79            1 :                         if t.opts.Comparer.Equal(last, this) {
      80            0 :                                 continue
      81              :                         }
      82              :                 }
      83            1 :                 lastUserKey = append(lastUserKey[:0], kv.K.UserKey...)
      84            1 : 
      85            1 :                 k := *kv
      86            1 :                 k.K.SetSeqNum(base.SeqNumZero)
      87            1 :                 k.K.UserKey = outputKey(k.K.UserKey, syntheticSuffix)
      88            1 :                 value := kv.LazyValue()
      89            1 :                 // It's possible that we wrote the key on a batch from a db that supported
      90            1 :                 // DeleteSized, but will be ingesting into a db that does not. Detect this
      91            1 :                 // case and translate the key to an InternalKeyKindDelete.
      92            1 :                 if targetFMV < pebble.FormatDeleteSizedAndObsolete && kv.Kind() == pebble.InternalKeyKindDeleteSized {
      93            0 :                         value = pebble.LazyValue{}
      94            0 :                         k.K.SetKind(pebble.InternalKeyKindDelete)
      95            0 :                 }
      96            1 :                 valBytes, _, err := value.Value(nil)
      97            1 :                 if err != nil {
      98            0 :                         return nil, err
      99            0 :                 }
     100            1 :                 t.opts.Comparer.ValidateKey.MustValidate(k.K.UserKey)
     101            1 :                 if err := w.Raw().Add(k.K, valBytes, false); err != nil {
     102            0 :                         return nil, err
     103            0 :                 }
     104              :         }
     105            1 :         if err := pointIterCloser.Close(); err != nil {
     106            0 :                 return nil, err
     107            0 :         }
     108              : 
     109            1 :         if rangeDelIter != nil {
     110            1 :                 span, err := rangeDelIter.First()
     111            1 :                 for ; span != nil; span, err = rangeDelIter.Next() {
     112            1 :                         if syntheticSuffix.IsSet() {
     113            0 :                                 panic("synthetic suffix with RangeDel")
     114              :                         }
     115            1 :                         start := outputKey(span.Start, nil)
     116            1 :                         end := outputKey(span.End, nil)
     117            1 :                         t.opts.Comparer.ValidateKey.MustValidate(start)
     118            1 :                         t.opts.Comparer.ValidateKey.MustValidate(end)
     119            1 :                         if err := w.DeleteRange(start, end); err != nil {
     120            0 :                                 return nil, err
     121            0 :                         }
     122              :                 }
     123            1 :                 if err != nil {
     124            0 :                         return nil, err
     125            0 :                 }
     126            1 :                 rangeDelIter.Close()
     127            1 :                 rangeDelIter = nil
     128              :         }
     129              : 
     130            1 :         if rangeKeyIter != nil {
     131            1 :                 span, err := rangeKeyIter.First()
     132            1 :                 for ; span != nil; span, err = rangeKeyIter.Next() {
     133            1 :                         // Coalesce the keys of this span and then zero the sequence
     134            1 :                         // numbers. This is necessary in order to make the range keys within
     135            1 :                         // the ingested sstable internally consistent at the sequence number
     136            1 :                         // it's ingested at. The individual keys within a batch are
     137            1 :                         // committed at unique sequence numbers, whereas all the keys of an
     138            1 :                         // ingested sstable are given the same sequence number. A span
     139            1 :                         // containing keys that both set and unset the same suffix at the
     140            1 :                         // same sequence number is nonsensical, so we "coalesce" or collapse
     141            1 :                         // the keys.
     142            1 :                         collapsed := keyspan.Span{
     143            1 :                                 Start: outputKey(span.Start, nil),
     144            1 :                                 End:   outputKey(span.End, nil),
     145            1 :                                 Keys:  make([]keyspan.Key, 0, len(span.Keys)),
     146            1 :                         }
     147            1 :                         t.opts.Comparer.ValidateKey.MustValidate(collapsed.Start)
     148            1 :                         t.opts.Comparer.ValidateKey.MustValidate(collapsed.End)
     149            1 :                         keys := span.Keys
     150            1 :                         if syntheticSuffix.IsSet() {
     151            0 :                                 keys = slices.Clone(span.Keys)
     152            0 :                                 for i := range keys {
     153            0 :                                         if keys[i].Kind() == base.InternalKeyKindRangeKeyUnset {
     154            0 :                                                 panic("RangeKeyUnset with synthetic suffix")
     155              :                                         }
     156            0 :                                         if len(keys[i].Suffix) > 0 {
     157            0 :                                                 keys[i].Suffix = syntheticSuffix
     158            0 :                                         }
     159              :                                 }
     160              :                         }
     161            1 :                         rangekey.Coalesce(t.opts.Comparer.CompareRangeSuffixes, keys, &collapsed.Keys)
     162            1 :                         for i := range collapsed.Keys {
     163            1 :                                 collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
     164            1 :                         }
     165            1 :                         keyspan.SortKeysByTrailer(collapsed.Keys)
     166            1 :                         if err := w.Raw().EncodeSpan(collapsed); err != nil {
     167            0 :                                 return nil, err
     168            0 :                         }
     169              :                 }
     170            1 :                 if err != nil {
     171            0 :                         return nil, err
     172            0 :                 }
     173            1 :                 rangeKeyIter.Close()
     174            1 :                 rangeKeyIter = nil
     175              :         }
     176              : 
     177            1 :         if err := w.Close(); err != nil {
     178            0 :                 return nil, err
     179            0 :         }
     180            1 :         sstMeta, err := w.Raw().Metadata()
     181            1 :         if err != nil {
     182            0 :                 return nil, err
     183            0 :         }
     184            1 :         return sstMeta, nil
     185              : }
     186              : 
     187              : // buildForIngest builds a local SST file containing the keys in the given batch
     188              : // and returns its path and metadata.
     189              : func buildForIngest(
     190              :         t *Test, dbID objID, b *pebble.Batch, i int,
     191            1 : ) (path string, _ *sstable.WriterMetadata, _ error) {
     192            1 :         path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
     193            1 :         f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
     194            1 :         if err != nil {
     195            0 :                 return "", nil, err
     196            0 :         }
     197            1 :         db := t.getDB(dbID)
     198            1 : 
     199            1 :         iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
     200            1 : 
     201            1 :         writable := objstorageprovider.NewFileWritable(f)
     202            1 :         meta, err := writeSSTForIngestion(
     203            1 :                 t,
     204            1 :                 iter, rangeDelIter, rangeKeyIter,
     205            1 :                 false, /* uniquePrefixes */
     206            1 :                 nil,   /* syntheticSuffix */
     207            1 :                 nil,   /* syntheticPrefix */
     208            1 :                 writable,
     209            1 :                 db.FormatMajorVersion(),
     210            1 :         )
     211            1 :         return path, meta, err
     212              : }
     213              : 
     214              : // buildForIngest builds a local SST file containing the keys in the given
     215              : // external object (truncated to the given bounds) and returns its path and
     216              : // metadata.
     217              : func buildForIngestExternalEmulation(
     218              :         t *Test,
     219              :         dbID objID,
     220              :         externalObjID objID,
     221              :         bounds pebble.KeyRange,
     222              :         syntheticSuffix sstable.SyntheticSuffix,
     223              :         syntheticPrefix sstable.SyntheticPrefix,
     224              :         i int,
     225            1 : ) (path string, _ *sstable.WriterMetadata) {
     226            1 :         path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
     227            1 :         f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
     228            1 :         panicIfErr(err)
     229            1 : 
     230            1 :         reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
     231            1 :         defer func() { _ = reader.Close() }()
     232              : 
     233            1 :         writable := objstorageprovider.NewFileWritable(f)
     234            1 :         // The underlying file should already have unique prefixes. Plus we are
     235            1 :         // emulating the external ingestion path which won't remove duplicate prefixes
     236            1 :         // if they exist.
     237            1 :         const uniquePrefixes = false
     238            1 :         meta, err := writeSSTForIngestion(
     239            1 :                 t,
     240            1 :                 pointIter, rangeDelIter, rangeKeyIter,
     241            1 :                 uniquePrefixes,
     242            1 :                 syntheticSuffix,
     243            1 :                 syntheticPrefix,
     244            1 :                 writable,
     245            1 :                 t.minFMV(),
     246            1 :         )
     247            1 :         if err != nil {
     248            0 :                 panic(err)
     249              :         }
     250            1 :         return path, meta
     251              : }
     252              : 
     253              : func openExternalObj(
     254              :         t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
     255              : ) (
     256              :         reader *sstable.Reader,
     257              :         pointIter base.InternalIterator,
     258              :         rangeDelIter keyspan.FragmentIterator,
     259              :         rangeKeyIter keyspan.FragmentIterator,
     260            1 : ) {
     261            1 :         objMeta := t.getExternalObj(externalObjID)
     262            1 :         objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), objMeta.objName)
     263            1 :         panicIfErr(err)
     264            1 :         opts := t.opts.MakeReaderOptions()
     265            1 :         reader, err = sstable.NewReader(
     266            1 :                 context.Background(),
     267            1 :                 objstorageprovider.NewRemoteReadable(objReader, objSize),
     268            1 :                 opts,
     269            1 :         )
     270            1 :         if err != nil {
     271            0 :                 panic(errors.CombineErrors(err, objReader.Close()))
     272              :         }
     273              : 
     274            1 :         start := bounds.Start
     275            1 :         end := bounds.End
     276            1 :         if syntheticPrefix.IsSet() {
     277            1 :                 start = syntheticPrefix.Invert(start)
     278            1 :                 end = syntheticPrefix.Invert(end)
     279            1 :         }
     280            1 :         pointIter, err = reader.NewIter(sstable.NoTransforms, start, end, sstable.AssertNoBlobHandles)
     281            1 :         panicIfErr(err)
     282            1 : 
     283            1 :         rangeDelIter, err = reader.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms, sstable.NoReadEnv)
     284            1 :         panicIfErr(err)
     285            1 :         if rangeDelIter != nil {
     286            1 :                 rangeDelIter = keyspan.Truncate(
     287            1 :                         t.opts.Comparer.Compare,
     288            1 :                         rangeDelIter,
     289            1 :                         base.UserKeyBoundsEndExclusive(start, end),
     290            1 :                 )
     291            1 :         }
     292              : 
     293            1 :         rangeKeyIter, err = reader.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms, sstable.NoReadEnv)
     294            1 :         panicIfErr(err)
     295            1 :         if rangeKeyIter != nil {
     296            0 :                 rangeKeyIter = keyspan.Truncate(
     297            0 :                         t.opts.Comparer.Compare,
     298            0 :                         rangeKeyIter,
     299            0 :                         base.UserKeyBoundsEndExclusive(start, end),
     300            0 :                 )
     301            0 :         }
     302            1 :         return reader, pointIter, rangeDelIter, rangeKeyIter
     303              : }
     304              : 
     305            1 : func panicIfErr(err error) {
     306            1 :         if err != nil {
     307            0 :                 panic(err)
     308              :         }
     309              : }
        

Generated by: LCOV version 2.0-1