LCOV - code coverage report
Current view: top level - pebble - ingest.go (source / functions) Coverage Total Hit
Test: 2025-09-24 08:18Z e1277c42 - tests + meta.lcov Lines: 87.7 % 1593 1397
Test Date: 2025-09-24 08:22:21 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "context"
       9              :         "fmt"
      10              :         "slices"
      11              :         "sort"
      12              :         "time"
      13              : 
      14              :         "github.com/cockroachdb/crlib/crtime"
      15              :         "github.com/cockroachdb/errors"
      16              :         "github.com/cockroachdb/pebble/internal/base"
      17              :         "github.com/cockroachdb/pebble/internal/cache"
      18              :         "github.com/cockroachdb/pebble/internal/invariants"
      19              :         "github.com/cockroachdb/pebble/internal/keyspan"
      20              :         "github.com/cockroachdb/pebble/internal/manifest"
      21              :         "github.com/cockroachdb/pebble/internal/overlap"
      22              :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      23              :         "github.com/cockroachdb/pebble/objstorage"
      24              :         "github.com/cockroachdb/pebble/objstorage/remote"
      25              :         "github.com/cockroachdb/pebble/sstable"
      26              :         "github.com/cockroachdb/pebble/sstable/block"
      27              : )
      28              : 
      29            2 : func sstableKeyCompare(userCmp Compare, a, b InternalKey) int {
      30            2 :         c := userCmp(a.UserKey, b.UserKey)
      31            2 :         if c != 0 {
      32            2 :                 return c
      33            2 :         }
      34            2 :         if a.IsExclusiveSentinel() {
      35            2 :                 if !b.IsExclusiveSentinel() {
      36            2 :                         return -1
      37            2 :                 }
      38            2 :         } else if b.IsExclusiveSentinel() {
      39            2 :                 return +1
      40            2 :         }
      41            2 :         return 0
      42              : }
      43              : 
      44            2 : func ingestValidateKey(opts *Options, key *InternalKey) error {
      45            2 :         if key.Kind() == InternalKeyKindInvalid {
      46            1 :                 return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s",
      47            1 :                         key.Pretty(opts.Comparer.FormatKey))
      48            1 :         }
      49            2 :         if key.SeqNum() != 0 {
      50            1 :                 return base.CorruptionErrorf("pebble: external sstable has non-zero seqnum: %s",
      51            1 :                         key.Pretty(opts.Comparer.FormatKey))
      52            1 :         }
      53            2 :         if err := opts.Comparer.ValidateKey.Validate(key.UserKey); err != nil {
      54            0 :                 return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s, %w",
      55            0 :                         key.Pretty(opts.Comparer.FormatKey), err)
      56            0 :         }
      57            2 :         return nil
      58              : }
      59              : 
      60              : // ingestSynthesizeShared constructs a fileMetadata for one shared sstable owned
      61              : // or shared by another node.
      62              : func ingestSynthesizeShared(
      63              :         opts *Options, sm SharedSSTMeta, tableNum base.TableNum,
      64            2 : ) (*manifest.TableMetadata, error) {
      65            2 :         if sm.Size == 0 {
      66            0 :                 // Disallow 0 file sizes
      67            0 :                 return nil, errors.New("pebble: cannot ingest shared file with size 0")
      68            0 :         }
      69              :         // Don't load table stats. Doing a round trip to shared storage, one SST
      70              :         // at a time is not worth it as it slows down ingestion.
      71            2 :         meta := &manifest.TableMetadata{
      72            2 :                 TableNum:     tableNum,
      73            2 :                 CreationTime: time.Now().Unix(),
      74            2 :                 Virtual:      true,
      75            2 :                 Size:         sm.Size,
      76            2 :         }
      77            2 :         if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil {
      78            2 :                 // Initialize meta.{HasPointKeys,Smallest,Largest}, etc.
      79            2 :                 //
      80            2 :                 // NB: We create new internal keys and pass them into ExtendPointKeyBounds
      81            2 :                 // so that we can sub a zero sequence number into the bounds. We can set
      82            2 :                 // the sequence number to anything here; it'll be reset in ingestUpdateSeqNum
      83            2 :                 // anyway. However, we do need to use the same sequence number across all
      84            2 :                 // bound keys at this step so that we end up with bounds that are consistent
      85            2 :                 // across point/range keys.
      86            2 :                 //
      87            2 :                 // Because of the sequence number rewriting, we cannot use the Kind of
      88            2 :                 // sm.SmallestPointKey. For example, the original SST might start with
      89            2 :                 // a.SET.2 and a.RANGEDEL.1 (with a.SET.2 being the smallest key); after
      90            2 :                 // rewriting the sequence numbers, these keys become a.SET.100 and
      91            2 :                 // a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a
      92            2 :                 // correct bound, we just use the maximum key kind (which sorts first).
      93            2 :                 // Similarly, we use the smallest key kind for the largest key.
      94            2 :                 smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMaxForSSTable)
      95            2 :                 largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0)
      96            2 :                 if sm.LargestPointKey.IsExclusiveSentinel() {
      97            2 :                         largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey)
      98            2 :                 }
      99            2 :                 if opts.Comparer.Equal(smallestPointKey.UserKey, largestPointKey.UserKey) &&
     100            2 :                         smallestPointKey.Trailer < largestPointKey.Trailer {
     101            0 :                         // We get kinds from the sender, however we substitute our own sequence
     102            0 :                         // numbers. This can result in cases where an sstable [b#5,SET-b#4,DELSIZED]
     103            0 :                         // becomes [b#0,SET-b#0,DELSIZED] when we synthesize it here, but the
     104            0 :                         // kinds need to be reversed now because DelSized > Set.
     105            0 :                         smallestPointKey, largestPointKey = largestPointKey, smallestPointKey
     106            0 :                 }
     107            2 :                 meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallestPointKey, largestPointKey)
     108              :         }
     109            2 :         if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil {
     110            2 :                 // Initialize meta.{HasRangeKeys,Smallest,Largest}, etc.
     111            2 :                 //
     112            2 :                 // See comment above on why we use a zero sequence number and these key
     113            2 :                 // kinds here.
     114            2 :                 smallestRangeKey := base.MakeInternalKey(sm.SmallestRangeKey.UserKey, 0, base.InternalKeyKindRangeKeyMax)
     115            2 :                 largestRangeKey := base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeyMin, sm.LargestRangeKey.UserKey)
     116            2 :                 meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallestRangeKey, largestRangeKey)
     117            2 :         }
     118              : 
     119              :         // For simplicity, we use the same number for both the FileNum and the
     120              :         // DiskFileNum (even though this is a virtual sstable). Pass the underlying
     121              :         // TableBacking's size to the same size as the virtualized view of the sstable.
     122              :         // This ensures that we don't over-prioritize this sstable for compaction just
     123              :         // yet, as we do not have a clear sense of what parts of this sstable are
     124              :         // referenced by other nodes.
     125            2 :         meta.InitVirtualBacking(base.DiskFileNum(tableNum), sm.Size)
     126            2 : 
     127            2 :         if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
     128            0 :                 return nil, err
     129            0 :         }
     130            2 :         return meta, nil
     131              : }
     132              : 
     133              : // ingestLoad1External loads the fileMetadata for one external sstable.
     134              : // Sequence number and target level calculation happens during prepare/apply.
     135              : func ingestLoad1External(
     136              :         opts *Options, e ExternalFile, tableNum base.TableNum,
     137            2 : ) (*manifest.TableMetadata, error) {
     138            2 :         if e.Size == 0 {
     139            0 :                 return nil, errors.New("pebble: cannot ingest external file with size 0")
     140            0 :         }
     141            2 :         if !e.HasRangeKey && !e.HasPointKey {
     142            0 :                 return nil, errors.New("pebble: cannot ingest external file with no point or range keys")
     143            0 :         }
     144              : 
     145            2 :         if opts.Comparer.Compare(e.StartKey, e.EndKey) > 0 {
     146            1 :                 return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
     147            1 :         }
     148            2 :         if opts.Comparer.Compare(e.StartKey, e.EndKey) == 0 && !e.EndKeyIsInclusive {
     149            0 :                 return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
     150            0 :         }
     151            2 :         if n := opts.Comparer.Split(e.StartKey); n != len(e.StartKey) {
     152            1 :                 return nil, errors.Newf("pebble: external file bounds start key %q has suffix", e.StartKey)
     153            1 :         }
     154            2 :         if n := opts.Comparer.Split(e.EndKey); n != len(e.EndKey) {
     155            1 :                 return nil, errors.Newf("pebble: external file bounds end key %q has suffix", e.EndKey)
     156            1 :         }
     157              : 
     158              :         // Don't load table stats. Doing a round trip to shared storage, one SST
     159              :         // at a time is not worth it as it slows down ingestion.
     160            2 :         meta := &manifest.TableMetadata{
     161            2 :                 TableNum:     tableNum,
     162            2 :                 CreationTime: time.Now().Unix(),
     163            2 :                 Size:         e.Size,
     164            2 :                 Virtual:      true,
     165            2 :         }
     166            2 :         // In the name of keeping this ingestion as fast as possible, we avoid *all*
     167            2 :         // existence checks and synthesize a table metadata with smallest/largest
     168            2 :         // keys that overlap whatever the passed-in span was.
     169            2 :         smallestCopy := slices.Clone(e.StartKey)
     170            2 :         largestCopy := slices.Clone(e.EndKey)
     171            2 :         if e.HasPointKey {
     172            2 :                 // Sequence numbers are updated later by
     173            2 :                 // ingestUpdateSeqNum, applying a squence number that
     174            2 :                 // is applied to all keys in the sstable.
     175            2 :                 if e.EndKeyIsInclusive {
     176            1 :                         meta.ExtendPointKeyBounds(
     177            1 :                                 opts.Comparer.Compare,
     178            1 :                                 base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable),
     179            1 :                                 base.MakeInternalKey(largestCopy, 0, 0))
     180            2 :                 } else {
     181            2 :                         meta.ExtendPointKeyBounds(
     182            2 :                                 opts.Comparer.Compare,
     183            2 :                                 base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable),
     184            2 :                                 base.MakeRangeDeleteSentinelKey(largestCopy))
     185            2 :                 }
     186              :         }
     187            2 :         if e.HasRangeKey {
     188            2 :                 meta.ExtendRangeKeyBounds(
     189            2 :                         opts.Comparer.Compare,
     190            2 :                         base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeyMax),
     191            2 :                         base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyMin, largestCopy),
     192            2 :                 )
     193            2 :         }
     194              : 
     195            2 :         meta.SyntheticPrefixAndSuffix = sstable.MakeSyntheticPrefixAndSuffix(e.SyntheticPrefix, e.SyntheticSuffix)
     196            2 : 
     197            2 :         return meta, nil
     198              : }
     199              : 
// rangeKeyIngestValidator carries the state needed to check, while loading a
// sequence of sstables for ingestion, that a suffixed range key at the end of
// one file lines up with the first range key of the next file. The zero value
// performs no validation.
type rangeKeyIngestValidator struct {
	// lastRangeKey is the last range key seen in the previous file.
	lastRangeKey keyspan.Span
	// comparer, if unset, disables range key validation.
	comparer *base.Comparer
}
     206              : 
     207            2 : func disableRangeKeyChecks() rangeKeyIngestValidator {
     208            2 :         return rangeKeyIngestValidator{}
     209            2 : }
     210              : 
     211              : func validateSuffixedBoundaries(
     212              :         cmp *base.Comparer, lastRangeKey keyspan.Span,
     213            2 : ) rangeKeyIngestValidator {
     214            2 :         return rangeKeyIngestValidator{
     215            2 :                 lastRangeKey: lastRangeKey,
     216            2 :                 comparer:     cmp,
     217            2 :         }
     218            2 : }
     219              : 
     220              : // Validate valides if the stored state of this rangeKeyIngestValidator allows for
     221              : // a file with the given nextFileSmallestKey to be ingested, such that the stored
     222              : // last file's largest range key defragments cleanly with the next file's smallest
     223              : // key if it was suffixed. If a value of nil is passed in for nextFileSmallestKey,
     224              : // that denotes the next file does not have a range key or there is no next file.
     225            2 : func (r *rangeKeyIngestValidator) Validate(nextFileSmallestKey *keyspan.Span) error {
     226            2 :         if r.comparer == nil {
     227            2 :                 return nil
     228            2 :         }
     229            2 :         if r.lastRangeKey.Valid() {
     230            2 :                 if r.comparer.Split.HasSuffix(r.lastRangeKey.End) {
     231            1 :                         if nextFileSmallestKey == nil || !r.comparer.Equal(r.lastRangeKey.End, nextFileSmallestKey.Start) {
     232            1 :                                 // The last range key has a suffix, and it doesn't defragment cleanly with this range key.
     233            1 :                                 return errors.AssertionFailedf("pebble: ingest sstable has suffixed largest range key that does not match the start key of the next sstable: %s",
     234            1 :                                         r.comparer.FormatKey(r.lastRangeKey.End))
     235            1 :                         } else if !keyspan.DefragmentInternal.ShouldDefragment(r.comparer.CompareRangeSuffixes, &r.lastRangeKey, nextFileSmallestKey) {
     236            0 :                                 // The last range key has a suffix, and it doesn't defragment cleanly with this range key.
     237            0 :                                 return errors.AssertionFailedf("pebble: ingest sstable has suffixed range key that won't defragment with next sstable: %s",
     238            0 :                                         r.comparer.FormatKey(r.lastRangeKey.End))
     239            0 :                         }
     240              :                 }
     241            2 :         } else if nextFileSmallestKey != nil && r.comparer.Split.HasSuffix(nextFileSmallestKey.Start) {
     242            0 :                 return errors.Newf("pebble: ingest sstable has suffixed range key start that won't defragment: %s",
     243            0 :                         r.comparer.FormatKey(nextFileSmallestKey.Start))
     244            0 :         }
     245            2 :         return nil
     246              : }
     247              : 
     248              : // ingestLoad1 creates the TableMetadata for one file. This file will be owned
     249              : // by this store.
     250              : //
     251              : // rangeKeyValidator carries the last range key from the previous file. It is
     252              : // used to ensure that the range keys defragment cleanly across files. These
     253              : // checks are disabled if it was built with disableRangeKeyChecks.
     254              : func ingestLoad1(
     255              :         ctx context.Context,
     256              :         opts *Options,
     257              :         fmv FormatMajorVersion,
     258              :         readable objstorage.Readable,
     259              :         cacheHandle *cache.Handle,
     260              :         tableNum base.TableNum,
     261              :         rangeKeyValidator rangeKeyIngestValidator,
     262              : ) (
     263              :         meta *manifest.TableMetadata,
     264              :         lastRangeKey keyspan.Span,
     265              :         blockReadStats base.BlockReadStats,
     266              :         err error,
     267            2 : ) {
     268            2 :         o := opts.MakeReaderOptions()
     269            2 :         o.CacheOpts = sstableinternal.CacheOptions{
     270            2 :                 CacheHandle: cacheHandle,
     271            2 :                 FileNum:     base.PhysicalTableDiskFileNum(tableNum),
     272            2 :         }
     273            2 :         r, err := sstable.NewReader(ctx, readable, o)
     274            2 :         if err != nil {
     275            1 :                 return nil, keyspan.Span{}, base.BlockReadStats{}, errors.CombineErrors(err, readable.Close())
     276            1 :         }
     277            2 :         defer func() { _ = r.Close() }()
     278              : 
     279              :         // Avoid ingesting tables with format versions this DB doesn't support.
     280            2 :         tf, err := r.TableFormat()
     281            2 :         if err != nil {
     282            0 :                 return nil, keyspan.Span{}, base.BlockReadStats{}, err
     283            0 :         }
     284            2 :         if tf < fmv.MinTableFormat() || tf > fmv.MaxTableFormat() {
     285            1 :                 return nil, keyspan.Span{}, base.BlockReadStats{}, errors.Newf(
     286            1 :                         "pebble: table format %s is not within range supported at DB format major version %d, (%s,%s)",
     287            1 :                         tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
     288            1 :                 )
     289            1 :         }
     290              : 
     291            2 :         if r.Attributes.Has(sstable.AttributeBlobValues) {
     292            1 :                 return nil, keyspan.Span{}, base.BlockReadStats{}, errors.Newf(
     293            1 :                         "pebble: ingesting tables with blob references is not supported")
     294            1 :         }
     295              : 
     296            2 :         props, err := r.ReadPropertiesBlock(ctx, nil /* buffer pool */)
     297            2 :         if err != nil {
     298            1 :                 return nil, keyspan.Span{}, base.BlockReadStats{}, err
     299            1 :         }
     300              : 
     301              :         // If this is a columnar block, read key schema name from properties block.
     302            2 :         if tf.BlockColumnar() {
     303            2 :                 if _, ok := opts.KeySchemas[props.KeySchemaName]; !ok {
     304            0 :                         return nil, keyspan.Span{}, base.BlockReadStats{}, errors.Newf(
     305            0 :                                 "pebble: table uses key schema %q unknown to the database",
     306            0 :                                 props.KeySchemaName)
     307            0 :                 }
     308              :         }
     309              : 
     310            2 :         meta = &manifest.TableMetadata{}
     311            2 :         meta.TableNum = tableNum
     312            2 :         meta.Size = max(uint64(readable.Size()), 1)
     313            2 :         meta.CreationTime = time.Now().Unix()
     314            2 :         meta.InitPhysicalBacking()
     315            2 : 
     316            2 :         // Avoid loading into the file cache for collecting stats if we
     317            2 :         // don't need to. If there are no range deletions, we have all the
     318            2 :         // information to compute the stats here.
     319            2 :         //
     320            2 :         // This is helpful in tests for avoiding awkwardness around deletion of
     321            2 :         // ingested files from MemFS. MemFS implements the Windows semantics of
     322            2 :         // disallowing removal of an open file. Under MemFS, if we don't populate
     323            2 :         // meta.Stats here, the file will be loaded into the file cache for
     324            2 :         // calculating stats before we can remove the original link.
     325            2 :         maybeSetStatsFromProperties(meta.PhysicalMeta(), &props)
     326            2 : 
     327            2 :         var iterStats base.InternalIteratorStats
     328            2 :         env := sstable.ReadEnv{
     329            2 :                 Block: block.ReadEnv{
     330            2 :                         Stats: &iterStats,
     331            2 :                 },
     332            2 :         }
     333            2 :         {
     334            2 :                 iterOpts := sstable.IterOptions{
     335            2 :                         Lower:                nil,
     336            2 :                         Upper:                nil,
     337            2 :                         Transforms:           sstable.NoTransforms,
     338            2 :                         Filterer:             nil,
     339            2 :                         FilterBlockSizeLimit: sstable.AlwaysUseFilterBlock,
     340            2 :                         Env:                  env,
     341            2 :                         ReaderProvider:       sstable.MakeTrivialReaderProvider(r),
     342            2 :                         BlobContext:          sstable.AssertNoBlobHandles,
     343            2 :                 }
     344            2 :                 iter, err := r.NewPointIter(ctx, iterOpts)
     345            2 :                 if err != nil {
     346            0 :                         return nil, keyspan.Span{}, base.BlockReadStats{}, err
     347            0 :                 }
     348            2 :                 defer func() { _ = iter.Close() }()
     349            2 :                 var smallest InternalKey
     350            2 :                 if kv := iter.First(); kv != nil {
     351            2 :                         if err := ingestValidateKey(opts, &kv.K); err != nil {
     352            1 :                                 return nil, keyspan.Span{}, base.BlockReadStats{}, err
     353            1 :                         }
     354            2 :                         smallest = kv.K.Clone()
     355              :                 }
     356            2 :                 if err := iter.Error(); err != nil {
     357            1 :                         return nil, keyspan.Span{}, base.BlockReadStats{}, err
     358            1 :                 }
     359            2 :                 if kv := iter.Last(); kv != nil {
     360            2 :                         if err := ingestValidateKey(opts, &kv.K); err != nil {
     361            0 :                                 return nil, keyspan.Span{}, base.BlockReadStats{}, err
     362            0 :                         }
     363            2 :                         meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, kv.K.Clone())
     364              :                 }
     365            2 :                 if err := iter.Error(); err != nil {
     366            1 :                         return nil, keyspan.Span{}, base.BlockReadStats{}, err
     367            1 :                 }
     368              :         }
     369              : 
     370            2 :         iter, err := r.NewRawRangeDelIter(ctx, sstable.NoFragmentTransforms, env)
     371            2 :         if err != nil {
     372            0 :                 return nil, keyspan.Span{}, base.BlockReadStats{}, err
     373            0 :         }
     374            2 :         if iter != nil {
     375            2 :                 defer iter.Close()
     376            2 :                 var smallest InternalKey
     377            2 :                 if s, err := iter.First(); err != nil {
     378            0 :                         return nil, keyspan.Span{}, base.BlockReadStats{}, err
     379            2 :                 } else if s != nil {
     380            2 :                         key := s.SmallestKey()
     381            2 :                         if err := ingestValidateKey(opts, &key); err != nil {
     382            0 :                                 return nil, keyspan.Span{}, base.BlockReadStats{}, err
     383            0 :                         }
     384            2 :                         smallest = key.Clone()
     385              :                 }
     386            2 :                 if s, err := iter.Last(); err != nil {
     387            0 :                         return nil, keyspan.Span{}, base.BlockReadStats{}, err
     388            2 :                 } else if s != nil {
     389            2 :                         k := s.SmallestKey()
     390            2 :                         if err := ingestValidateKey(opts, &k); err != nil {
     391            0 :                                 return nil, keyspan.Span{}, base.BlockReadStats{}, err
     392            0 :                         }
     393            2 :                         largest := s.LargestKey().Clone()
     394            2 :                         meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest)
     395              :                 }
     396              :         }
     397              : 
     398              :         // Update the range-key bounds for the table.
     399            2 :         {
     400            2 :                 iter, err := r.NewRawRangeKeyIter(ctx, sstable.NoFragmentTransforms, env)
     401            2 :                 if err != nil {
     402            0 :                         return nil, keyspan.Span{}, base.BlockReadStats{}, err
     403            0 :                 }
     404            2 :                 if iter != nil {
     405            2 :                         defer iter.Close()
     406            2 :                         var smallest InternalKey
     407            2 :                         if s, err := iter.First(); err != nil {
     408            0 :                                 return nil, keyspan.Span{}, base.BlockReadStats{}, err
     409            2 :                         } else if s != nil {
     410            2 :                                 key := s.SmallestKey()
     411            2 :                                 if err := ingestValidateKey(opts, &key); err != nil {
     412            0 :                                         return nil, keyspan.Span{}, base.BlockReadStats{}, err
     413            0 :                                 }
     414            2 :                                 smallest = key.Clone()
     415            2 :                                 // Range keys need some additional validation as we need to ensure they
     416            2 :                                 // defragment cleanly with the lastRangeKey from the previous file.
     417            2 :                                 if err := rangeKeyValidator.Validate(s); err != nil {
     418            0 :                                         return nil, keyspan.Span{}, base.BlockReadStats{}, err
     419            0 :                                 }
     420              :                         }
     421            2 :                         lastRangeKey = keyspan.Span{}
     422            2 :                         if s, err := iter.Last(); err != nil {
     423            0 :                                 return nil, keyspan.Span{}, base.BlockReadStats{}, err
     424            2 :                         } else if s != nil {
     425            2 :                                 k := s.SmallestKey()
     426            2 :                                 if err := ingestValidateKey(opts, &k); err != nil {
     427            0 :                                         return nil, keyspan.Span{}, base.BlockReadStats{}, err
     428            0 :                                 }
     429              :                                 // As range keys are fragmented, the end key of the last range key in
     430              :                                 // the table provides the upper bound for the table.
     431            2 :                                 largest := s.LargestKey().Clone()
     432            2 :                                 meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallest, largest)
     433            2 :                                 lastRangeKey = s.Clone()
     434            0 :                         } else {
     435            0 :                                 // s == nil.
     436            0 :                                 if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
     437            0 :                                         return nil, keyspan.Span{}, base.BlockReadStats{}, err
     438            0 :                                 }
     439              :                         }
     440            2 :                 } else {
     441            2 :                         if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
     442            0 :                                 return nil, keyspan.Span{}, base.BlockReadStats{}, err
     443            0 :                         }
     444            2 :                         lastRangeKey = keyspan.Span{}
     445              :                 }
     446              :         }
     447              : 
     448            2 :         if !meta.HasPointKeys && !meta.HasRangeKeys {
     449            2 :                 return nil, keyspan.Span{}, base.BlockReadStats{}, nil
     450            2 :         }
     451              : 
     452              :         // Sanity check that the various bounds on the file were set consistently.
     453            2 :         if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
     454            0 :                 return nil, keyspan.Span{}, base.BlockReadStats{}, err
     455            0 :         }
     456              : 
     457            2 :         return meta, lastRangeKey, iterStats.TotalBlockReads(), nil
     458              : }
     459              : 
// ingestLoadResult is the value produced by ingestLoad: the table metadata
// built for every file taking part in an ingestion, grouped by the kind of
// file (local, shared, external).
type ingestLoadResult struct {
	// local has one entry per local sstable for which ingestLoad1 returned
	// non-nil metadata.
	local    []ingestLocalMeta
	// shared has one entry per shared sstable, in the order left by the
	// level sort performed in ingestLoad.
	shared   []ingestSharedMeta
	// external has one entry per external file, in input order (later
	// re-sorted by smallest key in ingestSortAndVerify).
	external []ingestExternalMeta

	// externalFilesHaveLevel is set by ingestLoad when the external files
	// carry a non-zero Level; ingestLoad rejects a mix of files with and
	// without a level.
	externalFilesHaveLevel bool
	// blockReadStats holds block read statistics reported by ingestLoad1
	// while loading local sstables.
	blockReadStats         base.BlockReadStats
}
     468              : 
// ingestLocalMeta pairs the metadata of a local sstable being ingested with
// the path it was loaded from.
type ingestLocalMeta struct {
	*manifest.TableMetadata
	// path is the location of the sstable on opts.FS; it is the source of
	// the link-or-copy performed by ingestLinkLocal.
	path string
}
     473              : 
// ingestSharedMeta pairs the metadata synthesized for a shared sstable with
// the caller-provided description of that sstable.
type ingestSharedMeta struct {
	*manifest.TableMetadata
	// shared is the SharedSSTMeta the metadata was synthesized from; its
	// Level and Backing fields are consulted later in the ingestion.
	shared SharedSSTMeta
}
     478              : 
// ingestExternalMeta pairs the metadata built for an external file with the
// caller-provided description of that file.
type ingestExternalMeta struct {
	*manifest.TableMetadata
	// external is the ExternalFile the metadata was built from.
	external ExternalFile
	// usedExistingBacking is true if the external file is reusing a backing
	// that existed before this ingestion. In this case, we called
	// VirtualBackings.Protect() on that backing; we will need to call
	// Unprotect() after the ingestion.
	usedExistingBacking bool
}
     488              : 
     489            2 : func (r *ingestLoadResult) fileCount() int {
     490            2 :         return len(r.local) + len(r.shared) + len(r.external)
     491            2 : }
     492              : 
     493              : func ingestLoad(
     494              :         ctx context.Context,
     495              :         opts *Options,
     496              :         fmv FormatMajorVersion,
     497              :         paths []string,
     498              :         shared []SharedSSTMeta,
     499              :         external []ExternalFile,
     500              :         cacheHandle *cache.Handle,
     501              :         pending []base.TableNum,
     502            2 : ) (ingestLoadResult, error) {
     503            2 :         localFileNums := pending[:len(paths)]
     504            2 :         sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
     505            2 :         externalFileNums := pending[len(paths)+len(shared) : len(paths)+len(shared)+len(external)]
     506            2 : 
     507            2 :         var result ingestLoadResult
     508            2 :         result.local = make([]ingestLocalMeta, 0, len(paths))
     509            2 :         var lastRangeKey keyspan.Span
     510            2 :         var blockReadStats base.BlockReadStats
     511            2 :         // NB: we disable range key boundary assertions if we have shared or external files
     512            2 :         // present in this ingestion. This is because a suffixed range key in a local file
     513            2 :         // can possibly defragment with a suffixed range key in a shared or external file.
     514            2 :         // We also disable range key boundary assertions if we have CreateOnShared set to
     515            2 :         // true, as that means we could have suffixed RangeKeyDels or Unsets in the local
     516            2 :         // files that won't ever be surfaced, even if there are no shared or external files
     517            2 :         // in the ingestion.
     518            2 :         shouldDisableRangeKeyChecks := len(shared) > 0 || len(external) > 0 || opts.Experimental.CreateOnShared != remote.CreateOnSharedNone
     519            2 :         for i := range paths {
     520            2 :                 f, err := opts.FS.Open(paths[i])
     521            2 :                 if err != nil {
     522            1 :                         return ingestLoadResult{}, err
     523            1 :                 }
     524              : 
     525            2 :                 readable, err := sstable.NewSimpleReadable(f)
     526            2 :                 if err != nil {
     527            1 :                         return ingestLoadResult{}, err
     528            1 :                 }
     529            2 :                 var m *manifest.TableMetadata
     530            2 :                 rangeKeyValidator := disableRangeKeyChecks()
     531            2 :                 if !shouldDisableRangeKeyChecks {
     532            2 :                         rangeKeyValidator = validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
     533            2 :                 }
     534            2 :                 m, lastRangeKey, blockReadStats, err = ingestLoad1(ctx, opts, fmv, readable, cacheHandle, localFileNums[i], rangeKeyValidator)
     535            2 :                 if err != nil {
     536            1 :                         return ingestLoadResult{}, err
     537            1 :                 }
     538            2 :                 if m != nil {
     539            2 :                         result.local = append(result.local, ingestLocalMeta{
     540            2 :                                 TableMetadata: m,
     541            2 :                                 path:          paths[i],
     542            2 :                         })
     543            2 :                         result.blockReadStats = blockReadStats
     544            2 :                 }
     545              :         }
     546              : 
     547            2 :         if !shouldDisableRangeKeyChecks {
     548            2 :                 rangeKeyValidator := validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
     549            2 :                 if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
     550            1 :                         return ingestLoadResult{}, err
     551            1 :                 }
     552              :         }
     553              : 
     554              :         // Sort the shared files according to level.
     555            2 :         sort.Sort(sharedByLevel(shared))
     556            2 : 
     557            2 :         result.shared = make([]ingestSharedMeta, 0, len(shared))
     558            2 :         for i := range shared {
     559            2 :                 m, err := ingestSynthesizeShared(opts, shared[i], sharedFileNums[i])
     560            2 :                 if err != nil {
     561            0 :                         return ingestLoadResult{}, err
     562            0 :                 }
     563            2 :                 if shared[i].Level < sharedLevelsStart {
     564            0 :                         return ingestLoadResult{}, errors.New("cannot ingest shared file in level below sharedLevelsStart")
     565            0 :                 }
     566            2 :                 result.shared = append(result.shared, ingestSharedMeta{
     567            2 :                         TableMetadata: m,
     568            2 :                         shared:        shared[i],
     569            2 :                 })
     570              :         }
     571            2 :         result.external = make([]ingestExternalMeta, 0, len(external))
     572            2 :         for i := range external {
     573            2 :                 m, err := ingestLoad1External(opts, external[i], externalFileNums[i])
     574            2 :                 if err != nil {
     575            1 :                         return ingestLoadResult{}, err
     576            1 :                 }
     577            2 :                 result.external = append(result.external, ingestExternalMeta{
     578            2 :                         TableMetadata: m,
     579            2 :                         external:      external[i],
     580            2 :                 })
     581            2 :                 if external[i].Level > 0 {
     582            1 :                         if i != 0 && !result.externalFilesHaveLevel {
     583            0 :                                 return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
     584            0 :                         }
     585            1 :                         result.externalFilesHaveLevel = true
     586            2 :                 } else if result.externalFilesHaveLevel {
     587            0 :                         return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
     588            0 :                 }
     589              :         }
     590            2 :         return result, nil
     591              : }
     592              : 
     593            2 : func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error {
     594            2 :         // Verify that all the shared files (i.e. files in sharedMeta)
     595            2 :         // fit within the exciseSpan.
     596            2 :         for _, f := range lr.shared {
     597            2 :                 if !exciseSpan.Contains(cmp, f.Smallest()) || !exciseSpan.Contains(cmp, f.Largest()) {
     598            0 :                         return errors.Newf("pebble: shared file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
     599            0 :                 }
     600              :         }
     601              : 
     602            2 :         if lr.externalFilesHaveLevel {
     603            1 :                 for _, f := range lr.external {
     604            1 :                         if !exciseSpan.Contains(cmp, f.Smallest()) || !exciseSpan.Contains(cmp, f.Largest()) {
     605            0 :                                 return base.AssertionFailedf("pebble: external file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
     606            0 :                         }
     607              :                 }
     608              :         }
     609              : 
     610            2 :         if len(lr.external) > 0 {
     611            2 :                 if len(lr.shared) > 0 {
     612            0 :                         // If external files are present alongside shared files,
     613            0 :                         // return an error.
     614            0 :                         return base.AssertionFailedf("pebble: external files cannot be ingested atomically alongside shared files")
     615            0 :                 }
     616              : 
     617              :                 // Sort according to the smallest key.
     618            2 :                 slices.SortFunc(lr.external, func(a, b ingestExternalMeta) int {
     619            2 :                         return cmp(a.Smallest().UserKey, b.Smallest().UserKey)
     620            2 :                 })
     621            2 :                 for i := 1; i < len(lr.external); i++ {
     622            2 :                         if sstableKeyCompare(cmp, lr.external[i-1].Largest(), lr.external[i].Smallest()) >= 0 {
     623            1 :                                 return errors.Newf("pebble: external sstables have overlapping ranges")
     624            1 :                         }
     625              :                 }
     626            2 :                 return nil
     627              :         }
     628            2 :         if len(lr.local) <= 1 {
     629            2 :                 return nil
     630            2 :         }
     631              : 
     632              :         // Sort according to the smallest key.
     633            2 :         slices.SortFunc(lr.local, func(a, b ingestLocalMeta) int {
     634            2 :                 return cmp(a.Smallest().UserKey, b.Smallest().UserKey)
     635            2 :         })
     636              : 
     637            2 :         for i := 1; i < len(lr.local); i++ {
     638            2 :                 if sstableKeyCompare(cmp, lr.local[i-1].Largest(), lr.local[i].Smallest()) >= 0 {
     639            2 :                         return errors.Newf("pebble: local ingestion sstables have overlapping ranges")
     640            2 :                 }
     641              :         }
     642            2 :         if len(lr.shared) == 0 {
     643            2 :                 return nil
     644            2 :         }
     645            0 :         filesInLevel := make([]*manifest.TableMetadata, 0, len(lr.shared))
     646            0 :         for l := sharedLevelsStart; l < numLevels; l++ {
     647            0 :                 filesInLevel = filesInLevel[:0]
     648            0 :                 for i := range lr.shared {
     649            0 :                         if lr.shared[i].shared.Level == uint8(l) {
     650            0 :                                 filesInLevel = append(filesInLevel, lr.shared[i].TableMetadata)
     651            0 :                         }
     652              :                 }
     653            0 :                 for i := range lr.external {
     654            0 :                         if lr.external[i].external.Level == uint8(l) {
     655            0 :                                 filesInLevel = append(filesInLevel, lr.external[i].TableMetadata)
     656            0 :                         }
     657              :                 }
     658            0 :                 slices.SortFunc(filesInLevel, func(a, b *manifest.TableMetadata) int {
     659            0 :                         return cmp(a.Smallest().UserKey, b.Smallest().UserKey)
     660            0 :                 })
     661            0 :                 for i := 1; i < len(filesInLevel); i++ {
     662            0 :                         if sstableKeyCompare(cmp, filesInLevel[i-1].Largest(), filesInLevel[i].Smallest()) >= 0 {
     663            0 :                                 return base.AssertionFailedf("pebble: external shared sstables have overlapping ranges")
     664            0 :                         }
     665              :                 }
     666              :         }
     667            0 :         return nil
     668              : }
     669              : 
     670            1 : func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) error {
     671            1 :         var firstErr error
     672            1 :         for i := range meta {
     673            1 :                 if err := objProvider.Remove(base.FileTypeTable, meta[i].TableBacking.DiskFileNum); err != nil {
     674            1 :                         firstErr = firstError(firstErr, err)
     675            1 :                 }
     676              :         }
     677            1 :         return firstErr
     678              : }
     679              : 
     680              : // ingestLinkLocal creates new objects which are backed by either hardlinks to or
     681              : // copies of the ingested files.
     682              : func ingestLinkLocal(
     683              :         ctx context.Context,
     684              :         jobID JobID,
     685              :         opts *Options,
     686              :         objProvider objstorage.Provider,
     687              :         localMetas []ingestLocalMeta,
     688            2 : ) error {
     689            2 :         for i := range localMetas {
     690            2 :                 objMeta, err := objProvider.LinkOrCopyFromLocal(
     691            2 :                         ctx, opts.FS, localMetas[i].path, base.FileTypeTable, localMetas[i].TableBacking.DiskFileNum,
     692            2 :                         objstorage.CreateOptions{PreferSharedStorage: true},
     693            2 :                 )
     694            2 :                 if err != nil {
     695            1 :                         if err2 := ingestCleanup(objProvider, localMetas[:i]); err2 != nil {
     696            0 :                                 opts.Logger.Errorf("ingest cleanup failed: %v", err2)
     697            0 :                         }
     698            1 :                         return err
     699              :                 }
     700            2 :                 if opts.EventListener.TableCreated != nil {
     701            2 :                         opts.EventListener.TableCreated(TableCreateInfo{
     702            2 :                                 JobID:   int(jobID),
     703            2 :                                 Reason:  "ingesting",
     704            2 :                                 Path:    objProvider.Path(objMeta),
     705            2 :                                 FileNum: base.PhysicalTableDiskFileNum(localMetas[i].TableNum),
     706            2 :                         })
     707            2 :                 }
     708              :         }
     709            2 :         return nil
     710              : }
     711              : 
     712              : // ingestAttachRemote attaches remote objects to the storage provider.
     713              : //
     714              : // For external objects, we reuse existing FileBackings from the current version
     715              : // when possible.
     716              : //
     717              : // ingestUnprotectExternalBackings() must be called after this function (even in
     718              : // error cases).
     719            2 : func (d *DB) ingestAttachRemote(jobID JobID, lr ingestLoadResult) error {
     720            2 :         remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(lr.shared)+len(lr.external))
     721            2 :         for i := range lr.shared {
     722            2 :                 backing, err := lr.shared[i].shared.Backing.Get()
     723            2 :                 if err != nil {
     724            0 :                         return err
     725            0 :                 }
     726            2 :                 remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
     727            2 :                         FileNum:  lr.shared[i].TableBacking.DiskFileNum,
     728            2 :                         FileType: base.FileTypeTable,
     729            2 :                         Backing:  backing,
     730            2 :                 })
     731              :         }
     732              : 
     733            2 :         d.findExistingBackingsForExternalObjects(lr.external)
     734            2 : 
     735            2 :         newTableBackings := make(map[remote.ObjectKey]*manifest.TableBacking, len(lr.external))
     736            2 :         for i := range lr.external {
     737            2 :                 meta := lr.external[i].TableMetadata
     738            2 :                 if meta.TableBacking != nil {
     739            2 :                         // The backing was filled in by findExistingBackingsForExternalObjects().
     740            2 :                         continue
     741              :                 }
     742            2 :                 key := remote.MakeObjectKey(lr.external[i].external.Locator, lr.external[i].external.ObjName)
     743            2 :                 if backing, ok := newTableBackings[key]; ok {
     744            2 :                         // We already created the same backing in this loop. Update its size.
     745            2 :                         backing.Size += lr.external[i].external.Size
     746            2 :                         meta.AttachVirtualBacking(backing)
     747            2 :                         continue
     748              :                 }
     749            2 :                 providerBacking, err := d.objProvider.CreateExternalObjectBacking(key.Locator, key.ObjectName)
     750            2 :                 if err != nil {
     751            0 :                         return err
     752            0 :                 }
     753              :                 // We have to attach the remote object (and assign it a DiskFileNum). For
     754              :                 // simplicity, we use the same number for both the FileNum and the
     755              :                 // DiskFileNum (even though this is a virtual sstable).
     756            2 :                 size := max(lr.external[i].external.Size, 1)
     757            2 :                 meta.InitVirtualBacking(base.DiskFileNum(meta.TableNum), size)
     758            2 : 
     759            2 :                 // Set the underlying TableBacking's size to the same size as the virtualized
     760            2 :                 // view of the sstable. This ensures that we don't over-prioritize this
     761            2 :                 // sstable for compaction just yet, as we do not have a clear sense of
     762            2 :                 // what parts of this sstable are referenced by other nodes.
     763            2 :                 meta.TableBacking.Size = size
     764            2 :                 newTableBackings[key] = meta.TableBacking
     765            2 : 
     766            2 :                 remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
     767            2 :                         FileNum:  meta.TableBacking.DiskFileNum,
     768            2 :                         FileType: base.FileTypeTable,
     769            2 :                         Backing:  providerBacking,
     770            2 :                 })
     771              :         }
     772              : 
     773            2 :         for i := range lr.external {
     774            2 :                 if err := lr.external[i].Validate(d.opts.Comparer.Compare, d.opts.Comparer.FormatKey); err != nil {
     775            0 :                         return err
     776            0 :                 }
     777              :         }
     778              : 
     779            2 :         remoteObjMetas, err := d.objProvider.AttachRemoteObjects(remoteObjs)
     780            2 :         if err != nil {
     781            0 :                 return err
     782            0 :         }
     783              : 
     784            2 :         for i := range lr.shared {
     785            2 :                 // One corner case around file sizes we need to be mindful of, is that
     786            2 :                 // if one of the shareObjs was initially created by us (and has boomeranged
     787            2 :                 // back from another node), we'll need to update the TableBacking's size
     788            2 :                 // to be the true underlying size. Otherwise, we could hit errors when we
     789            2 :                 // open the db again after a crash/restart (see checkConsistency in open.go),
     790            2 :                 // plus it more accurately allows us to prioritize compactions of files
     791            2 :                 // that were originally created by us.
     792            2 :                 if remoteObjMetas[i].IsShared() && !d.objProvider.IsSharedForeign(remoteObjMetas[i]) {
     793            2 :                         size, err := d.objProvider.Size(remoteObjMetas[i])
     794            2 :                         if err != nil {
     795            0 :                                 return err
     796            0 :                         }
     797            2 :                         lr.shared[i].TableBacking.Size = max(uint64(size), 1)
     798              :                 }
     799              :         }
     800              : 
     801            2 :         if d.opts.EventListener.TableCreated != nil {
     802            2 :                 for i := range remoteObjMetas {
     803            2 :                         d.opts.EventListener.TableCreated(TableCreateInfo{
     804            2 :                                 JobID:   int(jobID),
     805            2 :                                 Reason:  "ingesting",
     806            2 :                                 Path:    d.objProvider.Path(remoteObjMetas[i]),
     807            2 :                                 FileNum: remoteObjMetas[i].DiskFileNum,
     808            2 :                         })
     809            2 :                 }
     810              :         }
     811              : 
     812            2 :         return nil
     813              : }
     814              : 
     815              : // findExistingBackingsForExternalObjects populates the TableBacking for external
     816              : // files which are already in use by the current version.
     817              : //
     818              : // We take a Ref and LatestRef on populated backings.
     819            2 : func (d *DB) findExistingBackingsForExternalObjects(metas []ingestExternalMeta) {
     820            2 :         d.mu.Lock()
     821            2 :         defer d.mu.Unlock()
     822            2 : 
     823            2 :         for i := range metas {
     824            2 :                 diskFileNums := d.objProvider.GetExternalObjects(metas[i].external.Locator, metas[i].external.ObjName)
     825            2 :                 // We cross-check against fileBackings in the current version because it is
     826            2 :                 // possible that the external object is referenced by an sstable which only
     827            2 :                 // exists in a previous version. In that case, that object could be removed
     828            2 :                 // at any time so we cannot reuse it.
     829            2 :                 for _, n := range diskFileNums {
     830            2 :                         if backing, ok := d.mu.versions.latest.virtualBackings.Get(n); ok {
     831            2 :                                 // Protect this backing from being removed from the latest version. We
     832            2 :                                 // will unprotect in ingestUnprotectExternalBackings.
     833            2 :                                 d.mu.versions.latest.virtualBackings.Protect(n)
     834            2 :                                 metas[i].usedExistingBacking = true
     835            2 :                                 metas[i].AttachVirtualBacking(backing)
     836            2 : 
     837            2 :                                 // We can't update the size of the backing here, so make sure the
     838            2 :                                 // virtual size is sane.
     839            2 :                                 // TODO(radu): investigate what would it take to update the backing size.
     840            2 :                                 metas[i].Size = min(metas[i].Size, backing.Size)
     841            2 :                                 break
     842              :                         }
     843              :                 }
     844              :         }
     845              : }
     846              : 
     847              : // ingestUnprotectExternalBackings unprotects the file backings that were reused
     848              : // for external objects when the ingestion fails.
     849            2 : func (d *DB) ingestUnprotectExternalBackings(lr ingestLoadResult) {
     850            2 :         d.mu.Lock()
     851            2 :         defer d.mu.Unlock()
     852            2 : 
     853            2 :         for _, meta := range lr.external {
     854            2 :                 if meta.usedExistingBacking {
     855            2 :                         // If the backing is not use anywhere else and the ingest failed (or the
     856            2 :                         // ingested tables were already compacted away), this call will cause in
     857            2 :                         // the next version update to remove the backing.
     858            2 :                         d.mu.versions.latest.virtualBackings.Unprotect(meta.TableBacking.DiskFileNum)
     859            2 :                 }
     860              :         }
     861              : }
     862              : 
     863              : func setSeqNumInMetadata(
     864              :         m *manifest.TableMetadata, seqNum base.SeqNum, cmp Compare, format base.FormatKey,
     865            2 : ) error {
     866            2 :         setSeqFn := func(k base.InternalKey) base.InternalKey {
     867            2 :                 return base.MakeInternalKey(k.UserKey, seqNum, k.Kind())
     868            2 :         }
     869              :         // NB: we set the fields directly here, rather than via their Extend*
     870              :         // methods, as we are updating sequence numbers.
     871            2 :         if m.HasPointKeys {
     872            2 :                 m.PointKeyBounds.SetSmallest(setSeqFn(m.PointKeyBounds.Smallest()))
     873            2 :         }
     874            2 :         if m.HasRangeKeys {
     875            2 :                 m.RangeKeyBounds.SetSmallest(setSeqFn(m.RangeKeyBounds.Smallest()))
     876            2 :         }
     877              :         // Only update the seqnum for the largest key if that key is not an
     878              :         // "exclusive sentinel" (i.e. a range deletion sentinel or a range key
     879              :         // boundary), as doing so effectively drops the exclusive sentinel (by
     880              :         // lowering the seqnum from the max value), and extends the bounds of the
     881              :         // table.
     882              :         // NB: as the largest range key is always an exclusive sentinel, it is never
     883              :         // updated.
     884            2 :         if m.HasPointKeys && !m.PointKeyBounds.Largest().IsExclusiveSentinel() {
     885            2 :                 m.PointKeyBounds.SetLargest(setSeqFn(m.PointKeyBounds.Largest()))
     886            2 :         }
     887              :         // Setting smallestSeqNum == largestSeqNum triggers the setting of
     888              :         // Properties.GlobalSeqNum when an sstable is loaded.
     889            2 :         m.SmallestSeqNum = seqNum
     890            2 :         m.LargestSeqNum = seqNum
     891            2 :         m.LargestSeqNumAbsolute = seqNum
     892            2 :         // Ensure the new bounds are consistent.
     893            2 :         if err := m.Validate(cmp, format); err != nil {
     894            0 :                 return err
     895            0 :         }
     896            2 :         return nil
     897              : }
     898              : 
     899              : func ingestUpdateSeqNum(
     900              :         cmp Compare, format base.FormatKey, seqNum base.SeqNum, loadResult ingestLoadResult,
     901            2 : ) error {
     902            2 :         // Shared sstables are required to be sorted by level ascending. We then
     903            2 :         // iterate the shared sstables in reverse, assigning the lower sequence
     904            2 :         // numbers to the shared sstables that will be ingested into the lower
     905            2 :         // (larger numbered) levels first. This ensures sequence number shadowing is
     906            2 :         // correct.
     907            2 :         for i := len(loadResult.shared) - 1; i >= 0; i-- {
     908            2 :                 if i-1 >= 0 && loadResult.shared[i-1].shared.Level > loadResult.shared[i].shared.Level {
     909            0 :                         panic(errors.AssertionFailedf("shared files %s, %s out of order", loadResult.shared[i-1], loadResult.shared[i]))
     910              :                 }
     911            2 :                 if err := setSeqNumInMetadata(loadResult.shared[i].TableMetadata, seqNum, cmp, format); err != nil {
     912            0 :                         return err
     913            0 :                 }
     914            2 :                 seqNum++
     915              :         }
     916            2 :         for i := range loadResult.external {
     917            2 :                 if err := setSeqNumInMetadata(loadResult.external[i].TableMetadata, seqNum, cmp, format); err != nil {
     918            0 :                         return err
     919            0 :                 }
     920            2 :                 seqNum++
     921              :         }
     922            2 :         for i := range loadResult.local {
     923            2 :                 if err := setSeqNumInMetadata(loadResult.local[i].TableMetadata, seqNum, cmp, format); err != nil {
     924            0 :                         return err
     925            0 :                 }
     926            2 :                 seqNum++
     927              :         }
     928            2 :         return nil
     929              : }
     930              : 
     931              : // ingestTargetLevel returns the target level for a file being ingested.
     932              : // If suggestSplit is true, it accounts for ingest-time splitting as part of
     933              : // its target level calculation, and if a split candidate is found, that file
     934              : // is returned as the splitFile.
     935              : func ingestTargetLevel(
     936              :         ctx context.Context,
     937              :         cmp base.Compare,
     938              :         lsmOverlap overlap.WithLSM,
     939              :         baseLevel int,
     940              :         compactions map[compaction]struct{},
     941              :         meta *manifest.TableMetadata,
     942              :         suggestSplit bool,
     943            2 : ) (targetLevel int, splitFile *manifest.TableMetadata, err error) {
     944            2 :         // Find the lowest level which does not have any files which overlap meta. We
     945            2 :         // search from L0 to L6 looking for whether there are any files in the level
     946            2 :         // which overlap meta. We want the "lowest" level (where lower means
     947            2 :         // increasing level number) in order to reduce write amplification.
     948            2 :         //
     949            2 :         // There are 2 kinds of overlap we need to check for: file boundary overlap
     950            2 :         // and data overlap. Data overlap implies file boundary overlap. Note that it
     951            2 :         // is always possible to ingest into L0.
     952            2 :         //
     953            2 :         // To place meta at level i where i > 0:
     954            2 :         // - there must not be any data overlap with levels <= i, since that will
     955            2 :         //   violate the sequence number invariant.
     956            2 :         // - no file boundary overlap with level i, since that will violate the
     957            2 :         //   invariant that files do not overlap in levels i > 0.
     958            2 :         //   - if there is only a file overlap at a given level, and no data overlap,
     959            2 :         //     we can still slot a file at that level. We return the TableMetadata with
     960            2 :         //     which we have file boundary overlap (must be only one file, as sstable
     961            2 :         //     bounds are usually tight on user keys) and the caller is expected to split
     962            2 :         //     that sstable into two virtual sstables, allowing this file to go into that
     963            2 :         //     level. Note that if we have file boundary overlap with two files, which
     964            2 :         //     should only happen on rare occasions, we treat it as data overlap and
     965            2 :         //     don't use this optimization.
     966            2 :         //
     967            2 :         // The file boundary overlap check is simpler to conceptualize. Consider the
     968            2 :         // following example, in which the ingested file lies completely before or
     969            2 :         // after the file being considered.
     970            2 :         //
     971            2 :         //   |--|           |--|  ingested file: [a,b] or [f,g]
     972            2 :         //         |-----|        existing file: [c,e]
     973            2 :         //  _____________________
     974            2 :         //   a  b  c  d  e  f  g
     975            2 :         //
     976            2 :         // In both cases the ingested file can move to considering the next level.
     977            2 :         //
     978            2 :         // File boundary overlap does not necessarily imply data overlap. The check
     979            2 :         // for data overlap is a little more nuanced. Consider the following examples:
     980            2 :         //
     981            2 :         //  1. No data overlap:
     982            2 :         //
     983            2 :         //          |-|   |--|    ingested file: [cc-d] or [ee-ff]
     984            2 :         //  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
     985            2 :         //  _____________________
     986            2 :         //   a  b  c  d  e  f  g
     987            2 :         //
     988            2 :         // In this case the ingested files can "fall through" this level. The checks
     989            2 :         // continue at the next level.
     990            2 :         //
     991            2 :         //  2. Data overlap:
     992            2 :         //
     993            2 :         //            |--|        ingested file: [d-e]
     994            2 :         //  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
     995            2 :         //  _____________________
     996            2 :         //   a  b  c  d  e  f  g
     997            2 :         //
     998            2 :         // In this case the file cannot be ingested into this level as the point 'dd'
     999            2 :         // is in the way.
    1000            2 :         //
    1001            2 :         // It is worth noting that the check for data overlap is only approximate. In
    1002            2 :         // the previous example, the ingested table [d-e] could contain only the
    1003            2 :         // points 'd' and 'e', in which case the table would be eligible for
    1004            2 :         // considering lower levels. However, such a fine-grained check would need to
    1005            2 :         // be exhaustive (comparing points and ranges in both the ingested and existing
    1006            2 :         // tables) and such a check is prohibitively expensive. Thus Pebble treats any
    1007            2 :         // existing point that falls within the ingested table bounds as being "data
    1008            2 :         // overlap".
    1009            2 : 
    1010            2 :         if lsmOverlap[0].Result == overlap.Data {
    1011            2 :                 return 0, nil, nil
    1012            2 :         }
    1013            2 :         targetLevel = 0
    1014            2 :         splitFile = nil
    1015            2 :         metaBounds := meta.UserKeyBounds()
    1016            2 :         for level := baseLevel; level < numLevels; level++ {
    1017            2 :                 var candidateSplitFile *manifest.TableMetadata
    1018            2 :                 switch lsmOverlap[level].Result {
    1019            2 :                 case overlap.Data:
    1020            2 :                         // We cannot ingest into or under this level; return the best target level
    1021            2 :                         // so far.
    1022            2 :                         return targetLevel, splitFile, nil
    1023              : 
    1024            2 :                 case overlap.OnlyBoundary:
    1025            2 :                         if !suggestSplit || lsmOverlap[level].SplitFile == nil {
    1026            2 :                                 // We can ingest under this level, but not into this level.
    1027            2 :                                 continue
    1028              :                         }
    1029              :                         // We can ingest into this level if we split this file.
    1030            2 :                         candidateSplitFile = lsmOverlap[level].SplitFile
    1031              : 
    1032            2 :                 case overlap.None:
    1033              :                 // We can ingest into this level.
    1034              : 
    1035            0 :                 default:
    1036            0 :                         return 0, nil, base.AssertionFailedf("unexpected WithLevel.Result: %v", lsmOverlap[level].Result)
    1037              :                 }
    1038              : 
    1039              :                 // Check boundary overlap with any ongoing compactions. We consider an
    1040              :                 // overlapping compaction that's writing files to an output level as
    1041              :                 // equivalent to boundary overlap with files in that output level.
    1042              :                 //
    1043              :                 // We cannot check for data overlap with the new SSTs compaction will produce
    1044              :                 // since compaction hasn't been done yet. However, there's no need to check
    1045              :                 // since all keys in them will be from levels in [c.startLevel,
    1046              :                 // c.outputLevel], and all those levels have already had their data overlap
    1047              :                 // tested negative (else we'd have returned earlier).
    1048              :                 //
    1049              :                 // An alternative approach would be to cancel these compactions and proceed
    1050              :                 // with an ingest-time split on this level if necessary. However, compaction
    1051              :                 // cancellation can result in significant wasted effort and is best avoided
    1052              :                 // unless necessary.
    1053            2 :                 overlaps := false
    1054            2 :                 for c := range compactions {
    1055            2 :                         tblCompaction, ok := c.(*tableCompaction)
    1056            2 :                         if !ok {
    1057            1 :                                 continue
    1058              :                         }
    1059            2 :                         if tblCompaction.outputLevel == nil || level != tblCompaction.outputLevel.level {
    1060            2 :                                 continue
    1061              :                         }
    1062            2 :                         bounds := tblCompaction.Bounds()
    1063            2 :                         if bounds != nil && metaBounds.Overlaps(cmp, bounds) {
    1064            2 :                                 overlaps = true
    1065            2 :                                 break
    1066              :                         }
    1067              :                 }
    1068            2 :                 if !overlaps {
    1069            2 :                         targetLevel = level
    1070            2 :                         splitFile = candidateSplitFile
    1071            2 :                 }
    1072              :         }
    1073            2 :         return targetLevel, splitFile, nil
    1074              : }
    1075              : 
    1076              : // Ingest ingests a set of sstables into the DB. Ingestion of the files is
    1077              : // atomic and semantically equivalent to creating a single batch containing all
    1078              : // of the mutations in the sstables. Ingestion may require the memtable to be
    1079              : // flushed. The ingested sstable files are moved into the DB and must reside on
    1080              : // the same filesystem as the DB. Sstables can be created for ingestion using
    1081              : // sstable.Writer. On success, Ingest removes the input paths.
    1082              : //
    1083              : // Ingested sstables must have been created with a known KeySchema (when written
    1084              : // with columnar blocks) and Comparer. They must not contain any references to
    1085              : // external blob files.
    1086              : //
    1087              : // Two types of sstables are accepted for ingestion: one is sstables present
    1088              : // in the instance's vfs.FS and can be referenced locally. The other is sstables
    1089              : // present in remote.Storage, referred to as shared or foreign sstables. These
    1090              : // shared sstables can be linked through objstorageprovider.Provider, and do not
    1091              : // need to already be present on the local vfs.FS. Foreign sstables must all fit
    1092              : // in an excise span, and are destined for a level specified in SharedSSTMeta.
    1093              : //
    1094              : // All sstables *must* be Sync()'d by the caller after all bytes are written
    1095              : // and before its file handle is closed; failure to do so could violate
    1096              : // durability or lead to corrupted on-disk state. This method cannot, in a
    1097              : // platform-and-FS-agnostic way, ensure that all sstables in the input are
    1098              : // properly synced to disk. Opening new file handles and Sync()-ing them
    1099              : // does not always guarantee durability; see the discussion here on that:
    1100              : // https://github.com/cockroachdb/pebble/pull/835#issuecomment-663075379
    1101              : //
    1102              : // Ingestion loads each sstable into the lowest level of the LSM which it
    1103              : // doesn't overlap (see ingestTargetLevel). If an sstable overlaps a memtable,
    1104              : // ingestion forces the memtable to flush, and then waits for the flush to
    1105              : // occur. In some cases, such as with no foreign sstables and no excise span,
    1106              : // ingestion that gets blocked on a memtable can join the flushable queue and
    1107              : // finish even before the memtable has been flushed.
    1108              : //
    1109              : // The steps for ingestion are:
    1110              : //
    1111              : //  1. Allocate table numbers for every sstable being ingested.
    1112              : //  2. Load the metadata for all sstables being ingested.
    1113              : //  3. Sort the sstables by smallest key, verifying non overlap (for local
    1114              : //     sstables).
    1115              : //  4. Hard link (or copy) the local sstables into the DB directory.
    1116              : //  5. Allocate a sequence number to use for all of the entries in the
    1117              : //     local sstables. This is the step where overlap with memtables is
    1118              : //     determined. If there is overlap, we remember the most recent memtable
    1119              : //     that overlaps.
    1120              : //  6. Update the sequence number in the ingested local sstables. (Remote
    1121              : //     sstables get fixed sequence numbers that were determined at load time.)
    1122              : //  7. Wait for the most recent memtable that overlaps to flush (if any).
    1123              : //  8. Add the ingested sstables to the version (DB.ingestApply).
    1124              : //     8.1.  If an excise span was specified, figure out what sstables in the
    1125              : //     current version overlap with the excise span, and create new virtual
    1126              : //     sstables out of those sstables that exclude the excised span (DB.excise).
    1127              : //  9. Publish the ingestion sequence number.
    1128              : //
    1129              : // Note that if the mutable memtable overlaps with ingestion, a flush of the
    1130              : // memtable is forced, equivalent to DB.Flush. Additionally, subsequent
    1131              : // mutations that get sequence numbers larger than the ingestion sequence
    1132              : // number get queued up behind the ingestion waiting for it to complete. This
    1133              : // can produce a noticeable hiccup in performance. See
    1134              : // https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
    1135              : // this hiccup.
    1136            2 : func (d *DB) Ingest(ctx context.Context, paths []string) error {
    1137            2 :         if err := d.closed.Load(); err != nil {
    1138            1 :                 panic(err)
    1139              :         }
    1140            2 :         if d.opts.ReadOnly {
    1141            1 :                 return ErrReadOnly
    1142            1 :         }
    1143            2 :         _, err := d.ingest(ctx, ingestArgs{Local: paths})
    1144            2 :         return err
    1145              : }
    1146              : 
    1147              : // IngestOperationStats provides some information about where in the LSM the
    1148              : // bytes were ingested.
    1149              : type IngestOperationStats struct {
    1150              :         // Bytes is the total bytes in the ingested sstables.
    1151              :         Bytes uint64
    1152              :         // ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
    1153              :         // into L0. This value is approximate when flushable ingests are active and
    1154              :         // an ingest overlaps an entry in the flushable queue. Currently, this
    1155              :         // approximation is very rough, only including tables that overlapped the
    1156              :         // memtable. This estimate may be improved with #2112.
    1157              :         ApproxIngestedIntoL0Bytes uint64
    1158              :         // MemtableOverlappingFiles is the count of ingested sstables
    1159              :         // that overlapped keys in the memtables.
    1160              :         MemtableOverlappingFiles int
    1161              : }
    1162              : 
    1163              : // ExternalFile describes an external sstable that can be referenced through
    1164              : // objprovider and ingested as a remote file that will not be refcounted or
    1165              : // cleaned up. For use with online restore. Note that the underlying sstable
    1166              : // could contain keys outside the [Smallest,Largest) bounds; however Pebble
    1167              : // is expected to only read the keys within those bounds.
    1168              : type ExternalFile struct {
    1169              :         // Locator is the remote.Locator that can be used with objProvider to
    1170              :         // resolve a reference to this external sstable.
    1171              :         Locator remote.Locator
    1172              : 
    1173              :         // ObjName is the unique name of this sstable on Locator.
    1174              :         ObjName string
    1175              : 
    1176              :         // Size of the referenced portion of the virtualized sstable. An estimate
    1177              :         // is acceptable in lieu of the backing file size.
    1178              :         Size uint64
    1179              : 
    1180              :         // StartKey and EndKey define the bounds of the sstable; the ingestion
    1181              :         // of this file will only result in keys within [StartKey, EndKey) if
    1182              :         // EndKeyIsInclusive is false or [StartKey, EndKey] if it is true.
    1183              :         // These bounds are loose i.e. it's possible for keys to not span the
    1184              :         // entirety of this range.
    1185              :         //
    1186              :         // StartKey and EndKey user keys must not have suffixes.
    1187              :         //
    1188              :         // Multiple ExternalFiles in one ingestion must all have non-overlapping
    1189              :         // bounds.
    1190              :         StartKey, EndKey []byte
    1191              : 
    1192              :         // EndKeyIsInclusive is true if EndKey should be treated as inclusive.
    1193              :         EndKeyIsInclusive bool
    1194              : 
    1195              :         // HasPointKey and HasRangeKey denote whether this file contains point keys
    1196              :         // or range keys. If both fields are false, an error is returned during
    1197              :         // ingestion.
    1198              :         HasPointKey, HasRangeKey bool
    1199              : 
    1200              :         // SyntheticPrefix will prepend this prefix to all keys in the file during
    1201              :         // iteration. Note that the backing file itself is not modified.
    1202              :         //
    1203              :         // SyntheticPrefix must be a prefix of both StartKey and EndKey.
    1204              :         SyntheticPrefix []byte
    1205              : 
    1206              :         // SyntheticSuffix will replace the suffix of every key in the file during
    1207              :         // iteration. Note that the file itself is not modified, rather, every key
    1208              :         // returned by an iterator will have the synthetic suffix.
    1209              :         //
    1210              :         // SyntheticSuffix can only be used under the following conditions:
    1211              :         //  - the synthetic suffix must sort before any non-empty suffixes in the
    1212              :         //    backing sst (the entire sst, not just the part restricted to Bounds).
    1213              :         //  - the backing sst must not contain multiple keys with the same prefix.
    1214              :         SyntheticSuffix []byte
    1215              : 
    1216              :         // Level denotes the level at which this file was present at read time
    1217              :         // if the external file was returned by a scan of an existing Pebble
    1218              :         // instance. If Level is 0, this field is ignored.
    1219              :         Level uint8
    1220              : }
    1221              : 
    1222              : // IngestWithStats does the same as Ingest, and additionally returns
    1223              : // IngestOperationStats.
    1224            1 : func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperationStats, error) {
    1225            1 :         if err := d.closed.Load(); err != nil {
    1226            0 :                 panic(err)
    1227              :         }
    1228            1 :         if d.opts.ReadOnly {
    1229            0 :                 return IngestOperationStats{}, ErrReadOnly
    1230            0 :         }
    1231            1 :         return d.ingest(ctx, ingestArgs{Local: paths})
    1232              : }
    1233              : 
    1234              : // IngestExternalFiles does the same as IngestWithStats, and additionally
    1235              : // accepts external files (with locator info that can be resolved using
    1236              : // d.opts.Experimental.RemoteStorage). These files must also be non-overlapping with
    1237              : // each other, and must be resolvable through d.objProvider.
    1238              : func (d *DB) IngestExternalFiles(
    1239              :         ctx context.Context, external []ExternalFile,
    1240            2 : ) (IngestOperationStats, error) {
    1241            2 :         if err := d.closed.Load(); err != nil {
    1242            0 :                 panic(err)
    1243              :         }
    1244              : 
    1245            2 :         if d.opts.ReadOnly {
    1246            0 :                 return IngestOperationStats{}, ErrReadOnly
    1247            0 :         }
    1248            2 :         if d.opts.Experimental.RemoteStorage == nil {
    1249            0 :                 return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
    1250            0 :         }
    1251            2 :         return d.ingest(ctx, ingestArgs{External: external})
    1252              : }
    1253              : 
    1254              : // IngestAndExcise does the same as IngestWithStats, and additionally accepts a
    1255              : // list of shared files to ingest that can be read from a remote.Storage through
    1256              : // a Provider. All the shared files must live within exciseSpan, and any existing
    1257              : // keys in exciseSpan are deleted by turning existing sstables into virtual
    1258              : // sstables (if not virtual already) and shrinking their spans to exclude
    1259              : // exciseSpan. See the comment at Ingest for a more complete picture of the
    1260              : // ingestion process.
    1261              : //
    1262              : // Panics if this DB instance was not instantiated with a remote.Storage and
    1263              : // shared sstables are present.
    1264              : func (d *DB) IngestAndExcise(
    1265              :         ctx context.Context,
    1266              :         paths []string,
    1267              :         shared []SharedSSTMeta,
    1268              :         external []ExternalFile,
    1269              :         exciseSpan KeyRange,
    1270            2 : ) (IngestOperationStats, error) {
    1271            2 :         if err := d.closed.Load(); err != nil {
    1272            0 :                 panic(err)
    1273              :         }
    1274            2 :         if d.opts.ReadOnly {
    1275            0 :                 return IngestOperationStats{}, ErrReadOnly
    1276            0 :         }
    1277              :         // Excise is only supported on prefix keys.
    1278            2 :         if d.opts.Comparer.Split(exciseSpan.Start) != len(exciseSpan.Start) {
    1279            0 :                 return IngestOperationStats{}, errors.New("IngestAndExcise called with suffixed start key")
    1280            0 :         }
    1281            2 :         if d.opts.Comparer.Split(exciseSpan.End) != len(exciseSpan.End) {
    1282            0 :                 return IngestOperationStats{}, errors.New("IngestAndExcise called with suffixed end key")
    1283            0 :         }
    1284            2 :         if v := d.FormatMajorVersion(); v < FormatMinForSharedObjects {
    1285            0 :                 return IngestOperationStats{}, errors.Newf(
    1286            0 :                         "store has format major version %d; IngestAndExcise requires at least %d",
    1287            0 :                         v, FormatMinForSharedObjects,
    1288            0 :                 )
    1289            0 :         }
    1290            2 :         args := ingestArgs{
    1291            2 :                 Local:              paths,
    1292            2 :                 Shared:             shared,
    1293            2 :                 External:           external,
    1294            2 :                 ExciseSpan:         exciseSpan,
    1295            2 :                 ExciseBoundsPolicy: tightExciseBounds,
    1296            2 :         }
    1297            2 :         return d.ingest(ctx, args)
    1298              : }
    1299              : 
    1300              : // newIngestedFlushableEntry wraps the ingested tables in meta in a flushableEntry. Both DB.mu and commitPipeline.mu must be held while this is called.
    1301              : func (d *DB) newIngestedFlushableEntry(
    1302              :         meta []*manifest.TableMetadata, seqNum base.SeqNum, logNum base.DiskFileNum, exciseSpan KeyRange,
    1303            2 : ) (*flushableEntry, error) {
    1304            2 :         // If there's an excise being done atomically with the same ingest, we
    1305            2 :         // assign the lowest sequence number in the set of sequence numbers for this
    1306            2 :         // ingestion to the excise. Note that we've already allocated fileCount+1
    1307            2 :         // sequence numbers in this case.
    1308            2 :         //
    1309            2 :         // This mimics the behaviour in the non-flushable ingest case (see the callsite
    1310            2 :         // for ingestUpdateSeqNum).
    1311            2 :         fileSeqNumStart := seqNum
    1312            2 :         if exciseSpan.Valid() {
    1313            2 :                 fileSeqNumStart = seqNum + 1 // the first seqNum is reserved for the excise.
    1314            2 :                 // The excise span will be retained by the flushable, outliving the
    1315            2 :                 // caller's ingestion call. Copy it.
    1316            2 :                 exciseSpan = KeyRange{
    1317            2 :                         Start: slices.Clone(exciseSpan.Start),
    1318            2 :                         End:   slices.Clone(exciseSpan.End),
    1319            2 :                 }
    1320            2 :         }
    1321              :         // Update the sequence number for all of the sstables in the
    1322              :         // metadata: table i receives fileSeqNumStart+i, and any error from
    1323              :         // setSeqNumInMetadata is returned as-is. Writing the metadata to the
    1324              :         // manifest when the version edit is applied is the mechanism that
    1325              :         // persists the sequence number; the sstables themselves are unmodified.
    1326              :         // A version edit will only be written to the manifest when the flushable
    1327              :         // is eventually flushed. If Pebble restarts before then, we'll lose the
    1328              :         // sequence number information, but it will be reconstructed on restart.
    1329            2 :         for i, m := range meta {
    1330            2 :                 if err := setSeqNumInMetadata(m, fileSeqNumStart+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil {
    1331            0 :                         return nil, err
    1332            0 :                 }
    1333              :         }
    1334              : 
    1335            2 :         f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, seqNum)
    1336            2 : 
    1337            2 :         // NB: The logNum/seqNum are the WAL number which we're writing this entry
    1338            2 :         // to and the sequence number within the WAL which we'll write this entry
    1339            2 :         // to.
    1340            2 :         entry := d.newFlushableEntry(f, logNum, seqNum)
    1341            2 :         // The flushable entry starts off with a single reader ref, so increment
    1342            2 :         // the TableMetadata.Refs.
    1343            2 :         for _, file := range f.files {
    1344            2 :                 file.Ref()
    1345            2 :         }
    1346            2 :         entry.unrefFiles = func(of *manifest.ObsoleteFiles) {
    1347            2 :                 // Invoke Unref on each table. If any files become obsolete, they'll be
    1348            2 :                 // added to the set of obsolete files.
    1349            2 :                 for _, file := range f.files {
    1350            2 :                         file.Unref(of)
    1351            2 :                 }
    1352              :         }
    1353              :         // An ingested flushable is always marked flushForced, and its releaseMemAccounting hook is a no-op.
    1354            2 :         entry.flushForced = true
    1355            2 :         entry.releaseMemAccounting = func() {}
    1356            2 :         return entry, nil
    1357              : }
    1358              : 
    1359              : // handleIngestAsFlushable places the ingested tables in meta (and the excise, if any) on the flushable queue
    1360              : // above the existing memtables, recording them in a dedicated WAL unless the WAL is disabled. Both DB.mu and
    1361              : // commitPipeline.mu must be held while this is called. Since we're holding both locks, the order in which we rotate
    1362              : // the memtable or recycle the WAL is irrelevant as long as the correct log numbers are assigned to the appropriate flushable.
    1363              : func (d *DB) handleIngestAsFlushable(
    1364              :         meta []*manifest.TableMetadata, seqNum base.SeqNum, exciseSpan KeyRange,
    1365            2 : ) error {
    1366            2 :         b := d.NewBatch()
    1367            2 :         if exciseSpan.Valid() {
    1368            2 :                 b.excise(exciseSpan.Start, exciseSpan.End)
    1369            2 :         }
    1370            2 :         for _, m := range meta {
    1371            2 :                 b.ingestSST(m.TableNum)
    1372            2 :         }
    1373            2 :         b.setSeqNum(seqNum)
    1374            2 : 
    1375            2 :         // If the WAL is disabled, then the logNum used to create the flushable
    1376            2 :         // entry doesn't matter. We just use the logNum assigned to the current
    1377            2 :         // mutable memtable. If the WAL is enabled, then this logNum will be
    1378            2 :         // overwritten by the logNum of the log which will contain the log entry
    1379            2 :         // for the ingestedFlushable.
    1380            2 :         logNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
    1381            2 :         if !d.opts.DisableWAL {
    1382            2 :                 // We create a new WAL for the flushable instead of reusing the end of
    1383            2 :                 // the previous WAL. This simplifies the increment of the minimum
    1384            2 :                 // unflushed log number, and also simplifies WAL replay.
    1385            2 :                 var prevLogSize uint64
    1386            2 :                 logNum, prevLogSize = d.rotateWAL()
    1387            2 :                 // As the rotator of the WAL, we're responsible for updating the
    1388            2 :                 // previous flushable queue tail's log size.
    1389            2 :                 d.mu.mem.queue[len(d.mu.mem.queue)-1].logSize = prevLogSize
    1390            2 :                 // DB.mu is released while d.commit.directWrite(b) runs and reacquired afterwards; a write error is fatal.
    1391            2 :                 d.mu.Unlock()
    1392            2 :                 err := d.commit.directWrite(b)
    1393            2 :                 if err != nil {
    1394            0 :                         d.opts.Logger.Fatalf("%v", err)
    1395            0 :                 }
    1396            2 :                 d.mu.Lock()
    1397              :         }
    1398              :         // Build the flushable entry, then compute the sequence number that follows this ingest (seqNum + b.Count()).
    1399            2 :         entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
    1400            2 :         if err != nil {
    1401            0 :                 return err
    1402            0 :         }
    1403            2 :         nextSeqNum := seqNum + base.SeqNum(b.Count())
    1404            2 : 
    1405            2 :         // Set newLogNum to the logNum of the previous flushable. This value is
    1406            2 :         // irrelevant if the WAL is disabled. If the WAL is enabled, then we set
    1407            2 :         // the appropriate value below.
    1408            2 :         newLogNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
    1409            2 :         if !d.opts.DisableWAL {
    1410            2 :                 // newLogNum will be the WAL num of the next mutable memtable which
    1411            2 :                 // comes after the ingestedFlushable in the flushable queue. The mutable
    1412            2 :                 // memtable will be created below.
    1413            2 :                 //
    1414            2 :                 // The prevLogSize returned by rotateWAL is the size of the WAL to which
    1415            2 :                 // the flushable ingest keys were appended. This intermediary WAL is only
    1416            2 :                 // used to record the flushable ingest and nothing else.
    1417            2 :                 newLogNum, entry.logSize = d.rotateWAL()
    1418            2 :         }
    1419              : 
    1420            2 :         d.mu.versions.metrics.Ingest.Count++
    1421            2 :         currMem := d.mu.mem.mutable
    1422            2 :         // NB: Placing ingested sstables above the current memtables
    1423            2 :         // requires rotating of the existing memtables/WAL. There is
    1424            2 :         // some concern of churning through tiny memtables due to
    1425            2 :         // ingested sstables being placed on top of them, but those
    1426            2 :         // memtables would have to be flushed anyways.
    1427            2 :         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1428            2 :         d.rotateMemtable(newLogNum, nextSeqNum, currMem, 0 /* minSize */)
    1429            2 :         d.updateReadStateLocked(d.opts.DebugCheck)
    1430            2 :         // TODO(aaditya): is this necessary? we call this already in rotateMemtable above
    1431            2 :         d.maybeScheduleFlush()
    1432            2 :         return nil
    1433              : }
    1434              : 
    1435              : type ingestArgs struct {
    1436              :         // Local holds the paths of the local sstables to ingest.
    1437              :         Local []string
    1438              :         // Shared holds the metadata of the shared sstables to ingest.
    1439              :         Shared []SharedSSTMeta
    1440              :         // External describes the external sstables to ingest.
    1441              :         External []ExternalFile
    1442              :         // ExciseSpan is the key range to excise; it is left unset (not Valid) when not excising.
    1443              :         ExciseSpan         KeyRange
    1444              :         ExciseBoundsPolicy exciseBoundsPolicy // forwarded to ingestApply along with ExciseSpan
    1445              : }
    1446              : 
    1447              : // See comment at Ingest() for details on how this works.
    1448            2 : func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats, error) {
    1449            2 :         paths := args.Local
    1450            2 :         shared := args.Shared
    1451            2 :         external := args.External
    1452            2 :         if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
    1453            0 :                 panic("cannot ingest shared sstables with nil SharedStorage")
    1454              :         }
    1455            2 :         if (args.ExciseSpan.Valid() || len(shared) > 0 || len(external) > 0) && d.FormatMajorVersion() < FormatVirtualSSTables {
    1456            0 :                 return IngestOperationStats{}, errors.New("pebble: format major version too old for excise, shared or external sstable ingestion")
    1457            0 :         }
    1458            2 :         if len(external) > 0 && d.FormatMajorVersion() < FormatSyntheticPrefixSuffix {
    1459            1 :                 for i := range external {
    1460            1 :                         if len(external[i].SyntheticPrefix) > 0 {
    1461            1 :                                 return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic prefix ingestion")
    1462            1 :                         }
    1463            1 :                         if len(external[i].SyntheticSuffix) > 0 {
    1464            1 :                                 return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic suffix ingestion")
    1465            1 :                         }
    1466              :                 }
    1467              :         }
    1468              :         // Allocate table numbers for all files being ingested and mark them as
    1469              :         // pending in order to prevent them from being deleted. Note that this causes
    1470              :         // the file number ordering to be out of alignment with sequence number
    1471              :         // ordering. The sorting of L0 tables by sequence number avoids relying on
    1472              :         // that (busted) invariant.
    1473            2 :         pendingOutputs := make([]base.TableNum, len(paths)+len(shared)+len(external))
    1474            2 :         for i := 0; i < len(paths)+len(shared)+len(external); i++ {
    1475            2 :                 pendingOutputs[i] = d.mu.versions.getNextTableNum()
    1476            2 :         }
    1477              : 
    1478            2 :         jobID := d.newJobID()
    1479            2 : 
    1480            2 :         // Load the metadata for all the files being ingested. This step detects
    1481            2 :         // and elides empty sstables.
    1482            2 :         loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external,
    1483            2 :                 d.cacheHandle, pendingOutputs)
    1484            2 :         if err != nil {
    1485            1 :                 return IngestOperationStats{}, err
    1486            1 :         }
    1487              : 
    1488            2 :         if loadResult.fileCount() == 0 && !args.ExciseSpan.Valid() {
    1489            2 :                 // All of the sstables to be ingested were empty. Nothing to do.
    1490            2 :                 return IngestOperationStats{}, nil
    1491            2 :         }
    1492              : 
    1493              :         // Verify the sstables do not overlap.
    1494            2 :         if err := ingestSortAndVerify(d.cmp, loadResult, args.ExciseSpan); err != nil {
    1495            2 :                 return IngestOperationStats{}, err
    1496            2 :         }
    1497              : 
    1498              :         // Hard link the sstables into the DB directory. Since the sstables aren't
    1499              :         // referenced by a version, they won't be used. If the hard linking fails
    1500              :         // (e.g. because the files reside on a different filesystem), ingestLinkLocal
    1501              :         // will fall back to copying, and if that fails we undo our work and return an
    1502              :         // error.
    1503            2 :         if err := ingestLinkLocal(ctx, jobID, d.opts, d.objProvider, loadResult.local); err != nil {
    1504            0 :                 return IngestOperationStats{}, err
    1505            0 :         }
    1506              : 
    1507            2 :         err = d.ingestAttachRemote(jobID, loadResult)
    1508            2 :         defer d.ingestUnprotectExternalBackings(loadResult)
    1509            2 :         if err != nil {
    1510            0 :                 return IngestOperationStats{}, err
    1511            0 :         }
    1512              : 
    1513              :         // Make the new tables durable. We need to do this at some point before we
    1514              :         // update the MANIFEST (via UpdateVersionLocked), otherwise a crash can have
    1515              :         // the tables referenced in the MANIFEST, but not present in the provider.
    1516            2 :         if err := d.objProvider.Sync(); err != nil {
    1517            1 :                 return IngestOperationStats{}, err
    1518            1 :         }
    1519              : 
    1520              :         // metaFlushableOverlaps is a map indicating which of the ingested sstables
    1521              :         // overlap some table in the flushable queue. It's used to approximate
    1522              :         // ingest-into-L0 stats when using flushable ingests.
    1523            2 :         metaFlushableOverlaps := make(map[base.TableNum]bool, loadResult.fileCount())
    1524            2 :         var mem *flushableEntry
    1525            2 :         var mut *memTable
    1526            2 :         // asFlushable indicates whether the sstable was ingested as a flushable.
    1527            2 :         var asFlushable bool
    1528            2 :         var waitFlushStart crtime.Mono
    1529            2 :         prepare := func(seqNum base.SeqNum) {
    1530            2 :                 // Note that d.commit.mu is held by commitPipeline when calling prepare.
    1531            2 : 
    1532            2 :                 // Determine the set of bounds we care about for the purpose of checking
    1533            2 :                 // for overlap among the flushables. If there's an excise span, we need
    1534            2 :                 // to check for overlap with its bounds as well.
    1535            2 :                 overlapBounds := make([]bounded, 0, loadResult.fileCount()+1)
    1536            2 :                 for _, m := range loadResult.local {
    1537            2 :                         overlapBounds = append(overlapBounds, m.TableMetadata)
    1538            2 :                 }
    1539            2 :                 for _, m := range loadResult.shared {
    1540            2 :                         overlapBounds = append(overlapBounds, m.TableMetadata)
    1541            2 :                 }
    1542            2 :                 for _, m := range loadResult.external {
    1543            2 :                         overlapBounds = append(overlapBounds, m.TableMetadata)
    1544            2 :                 }
    1545            2 :                 if args.ExciseSpan.Valid() {
    1546            2 :                         overlapBounds = append(overlapBounds, &args.ExciseSpan)
    1547            2 :                 }
    1548              : 
    1549            2 :                 d.mu.Lock()
    1550            2 :                 defer d.mu.Unlock()
    1551            2 : 
    1552            2 :                 if args.ExciseSpan.Valid() {
    1553            2 :                         // Check if any of the currently-open EventuallyFileOnlySnapshots
    1554            2 :                         // overlap in key ranges with the excise span. If so, we need to
    1555            2 :                         // check for memtable overlaps with all bounds of that
    1556            2 :                         // EventuallyFileOnlySnapshot in addition to the ingestion's own
    1557            2 :                         // bounds too.
    1558            2 :                         overlapBounds = append(overlapBounds, exciseOverlapBounds(
    1559            2 :                                 d.cmp, &d.mu.snapshots.snapshotList, args.ExciseSpan, seqNum)...)
    1560            2 :                 }
    1561              : 
    1562              :                 // Check to see if any files overlap with any of the memtables. The queue
    1563              :                 // is ordered from oldest to newest with the mutable memtable being the
    1564              :                 // last element in the slice. We want to wait for the newest table that
    1565              :                 // overlaps.
    1566              : 
    1567            2 :                 for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
    1568            2 :                         m := d.mu.mem.queue[i]
    1569            2 :                         m.computePossibleOverlaps(func(b bounded) shouldContinue {
    1570            2 :                                 // If this is the first table to overlap a flushable, save
    1571            2 :                                 // the flushable. This ingest must be ingested or flushed
    1572            2 :                                 // after it.
    1573            2 :                                 if mem == nil {
    1574            2 :                                         mem = m
    1575            2 :                                 }
    1576              : 
    1577            2 :                                 switch v := b.(type) {
    1578            2 :                                 case *manifest.TableMetadata:
    1579            2 :                                         // NB: False positives are possible if `m` is a flushable
    1580            2 :                                         // ingest that overlaps the file `v` in bounds but doesn't
    1581            2 :                                         // contain overlapping data. This is considered acceptable
    1582            2 :                                         // because it's rare (in CockroachDB a bound overlap likely
    1583            2 :                                         // indicates a data overlap), and blocking the commit
    1584            2 :                                         // pipeline while we perform I/O to check for overlap may be
    1585            2 :                                         // more disruptive than enqueueing this ingestion on the
    1586            2 :                                         // flushable queue and switching to a new memtable.
    1587            2 :                                         metaFlushableOverlaps[v.TableNum] = true
    1588            2 :                                 case *KeyRange:
    1589              :                                         // An excise span or an EventuallyFileOnlySnapshot protected range;
    1590              :                                         // not a file.
    1591            0 :                                 default:
    1592            0 :                                         panic("unreachable")
    1593              :                                 }
    1594            2 :                                 return continueIteration
    1595              :                         }, overlapBounds...)
    1596              :                 }
    1597              : 
    1598            2 :                 if mem == nil {
    1599            2 :                         // No overlap with any of the queued flushables, so no need to queue
    1600            2 :                         // after them.
    1601            2 : 
    1602            2 :                         // New writes with higher sequence numbers may be concurrently
    1603            2 :                         // committed. We must ensure they don't flush before this ingest
    1604            2 :                         // completes. To do that, we ref the mutable memtable as a writer,
    1605            2 :                         // preventing its flushing (and the flushing of all subsequent
    1606            2 :                         // flushables in the queue). Once we've acquired the manifest lock
    1607            2 :                         // to add the ingested sstables to the LSM, we can unref as we're
    1608            2 :                         // guaranteed that the flush won't edit the LSM before this ingest.
    1609            2 :                         mut = d.mu.mem.mutable
    1610            2 :                         mut.writerRef()
    1611            2 :                         return
    1612            2 :                 }
    1613              : 
    1614              :                 // The ingestion overlaps with some entry in the flushable queue. If the
    1615              :                 // pre-conditions are met below, we can treat this ingestion as a flushable
    1616              :                 // ingest, otherwise we wait on the memtable flush before ingestion.
    1617              :                 //
    1618              :                 // TODO(aaditya): We should make flushableIngest compatible with remote
    1619              :                 // files.
    1620            2 :                 hasRemoteFiles := len(shared) > 0 || len(external) > 0
    1621            2 :                 canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest &&
    1622            2 :                         // We require that either the queue of flushables is below the
    1623            2 :                         // stop-writes threshold (note that this is typically a conservative
    1624            2 :                         // check, since not every element of this queue will contribute the full
    1625            2 :                         // memtable memory size that could result in a write stall), or WAL
    1626            2 :                         // failover is permitting an unlimited queue without causing a write
    1627            2 :                         // stall. The latter condition is important to avoid delays in
    1628            2 :                         // visibility of concurrent writes that happen to get a sequence number
    1629            2 :                         // after this ingest and then must wait for this ingest that is itself
    1630            2 :                         // waiting on a large flush. See
    1631            2 :                         // https://github.com/cockroachdb/pebble/issues/4944 for an illustration
    1632            2 :                         // of this problem.
    1633            2 :                         (len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold ||
    1634            2 :                                 d.mu.log.manager.ElevateWriteStallThresholdForFailover()) &&
    1635            2 :                         !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles &&
    1636            2 :                         (!args.ExciseSpan.Valid() || d.FormatMajorVersion() >= FormatFlushableIngestExcises)
    1637            2 :                 if !canIngestFlushable {
    1638            2 :                         // We're not able to ingest as a flushable,
    1639            2 :                         // so we must synchronously flush.
    1640            2 :                         //
    1641            2 :                         // TODO(bilal): Currently, if any of the files being ingested are shared,
    1642            2 :                         // we cannot use flushable ingests and need
    1643            2 :                         // to wait synchronously.
    1644            2 :                         if mem.flushable == d.mu.mem.mutable {
    1645            2 :                                 err = d.makeRoomForWrite(nil)
    1646            2 :                         }
    1647              :                         // New writes with higher sequence numbers may be concurrently
    1648              :                         // committed. We must ensure they don't flush before this ingest
    1649              :                         // completes. To do that, we ref the mutable memtable as a writer,
    1650              :                         // preventing its flushing (and the flushing of all subsequent
    1651              :                         // flushables in the queue). Once we've acquired the manifest lock
    1652              :                         // to add the ingested sstables to the LSM, we can unref as we're
    1653              :                         // guaranteed that the flush won't edit the LSM before this ingest.
    1654            2 :                         mut = d.mu.mem.mutable
    1655            2 :                         mut.writerRef()
    1656            2 :                         mem.flushForced = true
    1657            2 :                         waitFlushStart = crtime.NowMono()
    1658            2 :                         d.maybeScheduleFlush()
    1659            2 :                         return
    1660              :                 }
    1661              :                 // Since there aren't too many memtables already queued up, we can
    1662              :                 // slide the ingested sstables on top of the existing memtables.
    1663            2 :                 asFlushable = true
    1664            2 :                 fileMetas := make([]*manifest.TableMetadata, len(loadResult.local))
    1665            2 :                 for i := range fileMetas {
    1666            2 :                         fileMetas[i] = loadResult.local[i].TableMetadata
    1667            2 :                 }
    1668            2 :                 err = d.handleIngestAsFlushable(fileMetas, seqNum, args.ExciseSpan)
    1669              :         }
    1670              : 
    1671            2 :         var ve *manifest.VersionEdit
    1672            2 :         var waitFlushDuration time.Duration
    1673            2 :         var manifestUpdateDuration time.Duration
    1674            2 :         apply := func(seqNum base.SeqNum) {
    1675            2 :                 if err != nil || asFlushable {
    1676            2 :                         // Either an error occurred during prepare, or the ingest was queued as a flushable there; nothing to apply.
    1677            2 :                         if mut != nil {
    1678            0 :                                 if mut.writerUnref() {
    1679            0 :                                         d.mu.Lock()
    1680            0 :                                         d.maybeScheduleFlush()
    1681            0 :                                         d.mu.Unlock()
    1682            0 :                                 }
    1683              :                         }
    1684            2 :                         return
    1685              :                 }
    1686              : 
    1687              :                 // If there's an excise being done atomically with the same ingest, we
    1688              :                 // assign the lowest sequence number in the set of sequence numbers for this
    1689              :                 // ingestion to the excise. Note that we've already allocated fileCount+1
    1690              :                 // sequence numbers in this case.
    1691            2 :                 if args.ExciseSpan.Valid() {
    1692            2 :                         seqNum++ // the first seqNum is reserved for the excise.
    1693            2 :                 }
    1694              :                 // Update the sequence numbers for all ingested sstables'
    1695              :                 // metadata. When the version edit is applied, the metadata is
    1696              :                 // written to the manifest, persisting the sequence number.
    1697              :                 // The sstables themselves are left unmodified.
    1698            2 :                 if err = ingestUpdateSeqNum(
    1699            2 :                         d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult,
    1700            2 :                 ); err != nil {
    1701            0 :                         if mut != nil {
    1702            0 :                                 if mut.writerUnref() {
    1703            0 :                                         d.mu.Lock()
    1704            0 :                                         d.maybeScheduleFlush()
    1705            0 :                                         d.mu.Unlock()
    1706            0 :                                 }
    1707              :                         }
    1708            0 :                         return
    1709              :                 }
    1710              : 
    1711              :                 // If we overlapped with a memtable in prepare wait for the flush to
    1712              :                 // finish.
    1713            2 :                 if mem != nil {
    1714            2 :                         <-mem.flushed
    1715            2 :                         waitFlushDuration = waitFlushStart.Elapsed()
    1716            2 :                 }
    1717              : 
    1718              :                 // Assign the sstables to the correct level in the LSM and apply the
    1719              :                 // version edit.
    1720            2 :                 ve, manifestUpdateDuration, err = d.ingestApply(ctx, jobID, loadResult, mut, args.ExciseSpan, args.ExciseBoundsPolicy, seqNum)
    1721              :         }
    1722              : 
    1723              :         // Only one ingest can occur at a time because if not, one would block waiting
    1724              :         // for the other to finish applying. This blocking would happen while holding
    1725              :         // the commit mutex which would prevent unrelated batches from writing their
    1726              :         // changes to the WAL and memtable. This will cause a bigger commit hiccup
    1727              :         // during ingestion.
    1728            2 :         seqNumCount := loadResult.fileCount()
    1729            2 :         if args.ExciseSpan.Valid() {
    1730            2 :                 seqNumCount++
    1731            2 :         }
    1732            2 :         d.commit.ingestSem <- struct{}{}
    1733            2 :         d.commit.AllocateSeqNum(seqNumCount, prepare, apply)
    1734            2 :         <-d.commit.ingestSem
    1735            2 : 
    1736            2 :         if err != nil {
    1737            1 :                 if err2 := ingestCleanup(d.objProvider, loadResult.local); err2 != nil {
    1738            0 :                         d.opts.Logger.Errorf("ingest cleanup failed: %v", err2)
    1739            0 :                 }
    1740            2 :         } else {
    1741            2 :                 // Since we either created a hard link to the ingesting files, or copied
    1742            2 :                 // them over, it is safe to remove the originals paths.
    1743            2 :                 for i := range loadResult.local {
    1744            2 :                         path := loadResult.local[i].path
    1745            2 :                         if err2 := d.opts.FS.Remove(path); err2 != nil {
    1746            1 :                                 d.opts.Logger.Errorf("ingest failed to remove original file: %s", err2)
    1747            1 :                         }
    1748              :                 }
    1749              :         }
    1750              : 
    1751              :         // TODO(jackson): Refactor this so that the case where there are no files
    1752              :         // but a valid excise span is not so exceptional.
    1753              : 
    1754            2 :         var stats IngestOperationStats
    1755            2 :         if loadResult.fileCount() > 0 {
    1756            2 :                 info := TableIngestInfo{
    1757            2 :                         JobID:                  int(jobID),
    1758            2 :                         Err:                    err,
    1759            2 :                         flushable:              asFlushable,
    1760            2 :                         WaitFlushDuration:      waitFlushDuration,
    1761            2 :                         ManifestUpdateDuration: manifestUpdateDuration,
    1762            2 :                         BlockReadDuration:      loadResult.blockReadStats.BlockReadDuration,
    1763            2 :                         BlockReadBytes:         loadResult.blockReadStats.BlockBytes - loadResult.blockReadStats.BlockBytesInCache,
    1764            2 :                 }
    1765            2 :                 if len(loadResult.local) > 0 {
    1766            2 :                         info.GlobalSeqNum = loadResult.local[0].SmallestSeqNum
    1767            2 :                 } else if len(loadResult.shared) > 0 {
    1768            2 :                         info.GlobalSeqNum = loadResult.shared[0].SmallestSeqNum
    1769            2 :                 } else {
    1770            2 :                         info.GlobalSeqNum = loadResult.external[0].SmallestSeqNum
    1771            2 :                 }
    1772            2 :                 if ve != nil {
    1773            2 :                         info.Tables = make([]struct {
    1774            2 :                                 TableInfo
    1775            2 :                                 Level int
    1776            2 :                         }, len(ve.NewTables))
    1777            2 :                         for i := range ve.NewTables {
    1778            2 :                                 e := &ve.NewTables[i]
    1779            2 :                                 info.Tables[i].Level = e.Level
    1780            2 :                                 info.Tables[i].TableInfo = e.Meta.TableInfo()
    1781            2 :                                 stats.Bytes += e.Meta.Size
    1782            2 :                                 if e.Level == 0 {
    1783            2 :                                         stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
    1784            2 :                                 }
    1785            2 :                                 if metaFlushableOverlaps[e.Meta.TableNum] {
    1786            2 :                                         stats.MemtableOverlappingFiles++
    1787            2 :                                 }
    1788              :                         }
    1789            2 :                 } else if asFlushable {
    1790            2 :                         // NB: If asFlushable == true, there are no shared sstables.
    1791            2 :                         info.Tables = make([]struct {
    1792            2 :                                 TableInfo
    1793            2 :                                 Level int
    1794            2 :                         }, len(loadResult.local))
    1795            2 :                         for i, f := range loadResult.local {
    1796            2 :                                 info.Tables[i].Level = -1
    1797            2 :                                 info.Tables[i].TableInfo = f.TableInfo()
    1798            2 :                                 stats.Bytes += f.Size
    1799            2 :                                 // We don't have exact stats on which files will be ingested into
    1800            2 :                                 // L0, because actual ingestion into the LSM has been deferred until
    1801            2 :                                 // flush time. Instead, we infer based on memtable overlap.
    1802            2 :                                 //
    1803            2 :                                 // TODO(jackson): If we optimistically compute data overlap (#2112)
    1804            2 :                                 // before entering the commit pipeline, we can use that overlap to
    1805            2 :                                 // improve our approximation by incorporating overlap with L0, not
    1806            2 :                                 // just memtables.
    1807            2 :                                 if metaFlushableOverlaps[f.TableNum] {
    1808            2 :                                         stats.ApproxIngestedIntoL0Bytes += f.Size
    1809            2 :                                         stats.MemtableOverlappingFiles++
    1810            2 :                                 }
    1811              :                         }
    1812              :                 }
    1813            2 :                 d.opts.EventListener.TableIngested(info)
    1814              :         }
    1815              : 
    1816            2 :         return stats, err
    1817              : }
    1818              : 
    1819              : type ingestSplitFile struct {
    1820              :         // ingestFile is the metadata of the table being ingested.
    1821              :         ingestFile *manifest.TableMetadata
    1822              :         // splitFile is the existing table that needs to be split to allow
    1823              :         // ingestFile to slot into level `level`.
    1824              :         splitFile *manifest.TableMetadata
    1825              :         // level is where ingestFile will go (and where splitFile already is).
    1826              :         level int
    1827              : }
    1828              : 
    1829              : // ingestSplit splits the existing tables listed in `files` and updates ve
    1830              : // in-place to account for each split table being replaced by the virtual
    1831              : // sstables that d.exciseTable leaves around the ingested table's bounds. The
    1832              : // map `replacedTables` tracks every table replaced so far in this version edit
    1833              : // and its replacements; it is updated in-place, and updateMetrics is invoked
    1834              : // once per split. Returns the first error reported by d.exciseTable.
    1835              : // d.mu as well as the manifest lock must be held when calling this method.
    1836              : func (d *DB) ingestSplit(
    1837              :         ctx context.Context,
    1838              :         ve *manifest.VersionEdit,
    1839              :         updateMetrics func(*manifest.TableMetadata, int, []manifest.NewTableEntry),
    1840              :         files []ingestSplitFile,
    1841              :         replacedTables map[base.TableNum][]manifest.NewTableEntry,
    1842            2 : ) error {
    1843            2 :         for _, s := range files {
    1844            2 :                 ingestFileBounds := s.ingestFile.UserKeyBounds()
    1845            2 :                 // replacedTables can be thought of as a tree, where we start iterating with
    1846            2 :                 // s.splitFile and run its TableNum through replacedTables, then find which of
    1847            2 :                 // the replacement tables overlaps with s.ingestFile, which becomes the new
    1848            2 :                 // splitFile, then we check splitFile's replacements in replacedTables again
    1849            2 :                 // for overlap with s.ingestFile, and so on until we either can't find the
    1850            2 :                 // current splitFile in replacedTables (i.e. that's the file that now needs to
    1851            2 :                 // be split), or we don't find a file that overlaps with s.ingestFile, which
    1852            2 :                 // means a prior ingest split already produced enough room for s.ingestFile
    1853            2 :                 // to go into this level without necessitating another ingest split.
    1854            2 :                 splitFile := s.splitFile
    1855            2 :                 for splitFile != nil {
    1856            2 :                         replaced, ok := replacedTables[splitFile.TableNum]
    1857            2 :                         if !ok {
    1858            2 :                                 break
    1859              :                         }
    1860            2 :                         updatedSplitFile := false
    1861            2 :                         for i := range replaced {
    1862            2 :                                 if replaced[i].Meta.Overlaps(d.cmp, &ingestFileBounds) {
    1863            2 :                                         if updatedSplitFile {
    1864            0 :                                                 // This should never happen because the earlier ingestTargetLevel
    1865            0 :                                                 // function only finds split file candidates that are guaranteed to
    1866            0 :                                                 // have no data overlap, only boundary overlap. See the comments
    1867            0 :                                                 // in that method to see the definitions of data vs boundary
    1868            0 :                                                 // overlap. That, plus the fact that files in `replaced` are
    1869            0 :                                                 // guaranteed to have file bounds that are tight on user keys
    1870            0 :                                                 // (as that's what `d.excise` produces), means that the only case
    1871            0 :                                                 // where we overlap with two or more files in `replaced` is if we
    1872            0 :                                                 // actually had data overlap all along, or if the ingestion files
    1873            0 :                                                 // were overlapping, either of which is an invariant violation.
    1874            0 :                                                 panic("updated with two files in ingestSplit")
    1875              :                                         }
    1876            2 :                                         splitFile = replaced[i].Meta
    1877            2 :                                         updatedSplitFile = true
    1878              :                                 }
    1879              :                         }
    1880            2 :                         if !updatedSplitFile {
    1881            2 :                                 // None of the replaced files overlapped with the file being ingested.
    1882            2 :                                 // This can happen if we've already excised a span overlapping with
    1883            2 :                                 // this file, or if we have consecutive ingested files that can slide
    1884            2 :                                 // within the same gap between keys in an existing file. For instance,
    1885            2 :                                 // if an existing file has keys a and g and we're ingesting b-c, d-e,
    1886            2 :                                 // the first loop iteration will split the existing file into one that
    1887            2 :                                 // ends in a and another that starts at g, and the second iteration will
    1888            2 :                                 // fall into this case and require no splitting.
    1889            2 :                                 //
    1890            2 :                                 // No splitting necessary.
    1891            2 :                                 splitFile = nil
    1892            2 :                         }
    1893              :                 }
    1894            2 :                 if splitFile == nil {
    1895            2 :                         continue
    1896              :                 }
    1897              :                 // NB: excise operates on [start, end). We're splitting at [start, end]
    1898              :                 // (assuming !s.ingestFile.Largest().IsExclusiveSentinel()). The conflation
    1899              :                 // of exclusive vs inclusive end bounds should not make a difference here
    1900              :                 // as we're guaranteed to not have any data overlap between splitFile and
    1901              :                 // s.ingestFile. d.excise will return an error if we pass an inclusive user
    1902              :                 // key bound _and_ we end up seeing data overlap at the end key.
    1903            2 :                 exciseBounds := base.UserKeyBoundsFromInternal(s.ingestFile.Smallest(), s.ingestFile.Largest())
    1904            2 :                 leftTable, rightTable, err := d.exciseTable(ctx, exciseBounds, splitFile, s.level, tightExciseBounds)
    1905            2 :                 if err != nil {
    1906            0 :                         return err
    1907            0 :                 }
    1908            2 :                 added := applyExciseToVersionEdit(ve, splitFile, leftTable, rightTable, s.level)
    1909            2 :                 replacedTables[splitFile.TableNum] = added
    1910            2 :                 for i := range added {
    1911            2 :                         addedBounds := added[i].Meta.UserKeyBounds()
    1912            2 :                         if s.ingestFile.Overlaps(d.cmp, &addedBounds) {
    1913            0 :                                 panic("ingest-time split produced a file that overlaps with ingested file")
    1914              :                         }
    1915              :                 }
    1916            2 :                 updateMetrics(splitFile, s.level, added)
    1917              :         }
    1918              :         // Flatten the version edit: an entry present in both ve.NewTables and
    1919              :         // ve.DeletedTables cancels out, so it is dropped from both.
    1920            2 :         newNewFiles := ve.NewTables[:0]
    1921            2 :         for i := range ve.NewTables {
    1922            2 :                 fn := ve.NewTables[i].Meta.TableNum
    1923            2 :                 deEntry := manifest.DeletedTableEntry{Level: ve.NewTables[i].Level, FileNum: fn}
    1924            2 :                 if _, ok := ve.DeletedTables[deEntry]; ok {
    1925            2 :                         delete(ve.DeletedTables, deEntry)
    1926            2 :                 } else {
    1927            2 :                         newNewFiles = append(newNewFiles, ve.NewTables[i])
    1928            2 :                 }
    1929              :         }
    1930            2 :         ve.NewTables = newNewFiles
    1931            2 :         return nil
    1932              : }
    1933              : 
    1934              : func (d *DB) ingestApply(
    1935              :         ctx context.Context,
    1936              :         jobID JobID,
    1937              :         lr ingestLoadResult,
    1938              :         mut *memTable,
    1939              :         exciseSpan KeyRange,
    1940              :         exciseBoundsPolicy exciseBoundsPolicy,
    1941              :         exciseSeqNum base.SeqNum,
    1942            2 : ) (*manifest.VersionEdit, time.Duration, error) {
    1943            2 :         d.mu.Lock()
    1944            2 :         defer d.mu.Unlock()
    1945            2 : 
    1946            2 :         ve := &manifest.VersionEdit{
    1947            2 :                 NewTables: make([]manifest.NewTableEntry, lr.fileCount()),
    1948            2 :         }
    1949            2 :         if exciseSpan.Valid() || (d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit()) {
    1950            2 :                 ve.DeletedTables = map[manifest.DeletedTableEntry]*manifest.TableMetadata{}
    1951            2 :         }
    1952            2 :         var metrics levelMetricsDelta
    1953            2 : 
    1954            2 :         // Determine the target level inside UpdateVersionLocked. This prevents two
    1955            2 :         // concurrent ingestion jobs from using the same version to determine the
    1956            2 :         // target level, and also provides serialization with concurrent compaction
    1957            2 :         // and flush jobs.
    1958            2 :         manifestUpdateDuration, err := d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
    1959            2 :                 if mut != nil {
    1960            2 :                         // Unref the mutable memtable to allows its flush to proceed. Now that we've
    1961            2 :                         // acquired the manifest lock, we can be certain that if the mutable
    1962            2 :                         // memtable has received more recent conflicting writes, the flush won't
    1963            2 :                         // beat us to applying to the manifest resulting in sequence number
    1964            2 :                         // inversion. Even though we call maybeScheduleFlush right now, this flush
    1965            2 :                         // will apply after our ingestion.
    1966            2 :                         if mut.writerUnref() {
    1967            1 :                                 d.maybeScheduleFlush()
    1968            1 :                         }
    1969              :                 }
    1970              : 
    1971            2 :                 current := d.mu.versions.currentVersion()
    1972            2 :                 overlapChecker := &overlapChecker{
    1973            2 :                         comparer: d.opts.Comparer,
    1974            2 :                         newIters: d.newIters,
    1975            2 :                         opts: IterOptions{
    1976            2 :                                 logger:   d.opts.Logger,
    1977            2 :                                 Category: categoryIngest,
    1978            2 :                         },
    1979            2 :                         v: current,
    1980            2 :                 }
    1981            2 :                 shouldIngestSplit := d.opts.Experimental.IngestSplit != nil &&
    1982            2 :                         d.opts.Experimental.IngestSplit() && d.FormatMajorVersion() >= FormatVirtualSSTables
    1983            2 :                 baseLevel := d.mu.versions.picker.getBaseLevel()
    1984            2 :                 // filesToSplit is a list where each element is a pair consisting of a file
    1985            2 :                 // being ingested and a file being split to make room for an ingestion into
    1986            2 :                 // that level. Each ingested file will appear at most once in this list. It
    1987            2 :                 // is possible for split files to appear twice in this list.
    1988            2 :                 filesToSplit := make([]ingestSplitFile, 0)
    1989            2 :                 checkCompactions := false
    1990            2 :                 for i := 0; i < lr.fileCount(); i++ {
    1991            2 :                         // Determine the lowest level in the LSM for which the sstable doesn't
    1992            2 :                         // overlap any existing files in the level.
    1993            2 :                         var m *manifest.TableMetadata
    1994            2 :                         specifiedLevel := -1
    1995            2 :                         isShared := false
    1996            2 :                         isExternal := false
    1997            2 :                         if i < len(lr.local) {
    1998            2 :                                 // local file.
    1999            2 :                                 m = lr.local[i].TableMetadata
    2000            2 :                         } else if (i - len(lr.local)) < len(lr.shared) {
    2001            2 :                                 // shared file.
    2002            2 :                                 isShared = true
    2003            2 :                                 sharedIdx := i - len(lr.local)
    2004            2 :                                 m = lr.shared[sharedIdx].TableMetadata
    2005            2 :                                 specifiedLevel = int(lr.shared[sharedIdx].shared.Level)
    2006            2 :                         } else {
    2007            2 :                                 // external file.
    2008            2 :                                 isExternal = true
    2009            2 :                                 externalIdx := i - (len(lr.local) + len(lr.shared))
    2010            2 :                                 m = lr.external[externalIdx].TableMetadata
    2011            2 :                                 if lr.externalFilesHaveLevel {
    2012            1 :                                         specifiedLevel = int(lr.external[externalIdx].external.Level)
    2013            1 :                                 }
    2014              :                         }
    2015              : 
    2016              :                         // Add to CreatedBackingTables if this is a new backing.
    2017              :                         //
    2018              :                         // Shared files always have a new backing. External files have new backings
    2019              :                         // iff the backing disk file num and the file num match (see ingestAttachRemote).
    2020            2 :                         if isShared || (isExternal && m.TableBacking.DiskFileNum == base.DiskFileNum(m.TableNum)) {
    2021            2 :                                 ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.TableBacking)
    2022            2 :                         }
    2023              : 
    2024            2 :                         f := &ve.NewTables[i]
    2025            2 :                         var err error
    2026            2 :                         if specifiedLevel != -1 {
    2027            2 :                                 f.Level = specifiedLevel
    2028            2 :                         } else {
    2029            2 :                                 var splitTable *manifest.TableMetadata
    2030            2 :                                 if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest()) && exciseSpan.Contains(d.cmp, m.Largest()) {
    2031            2 :                                         // This file fits perfectly within the excise span. We can slot it at
    2032            2 :                                         // L6, or sharedLevelsStart - 1 if we have shared files.
    2033            2 :                                         if len(lr.shared) > 0 || lr.externalFilesHaveLevel {
    2034            2 :                                                 f.Level = sharedLevelsStart - 1
    2035            2 :                                                 if baseLevel > f.Level {
    2036            2 :                                                         f.Level = 0
    2037            2 :                                                 }
    2038            2 :                                         } else {
    2039            2 :                                                 f.Level = 6
    2040            2 :                                         }
    2041            2 :                                 } else {
    2042            2 :                                         // We check overlap against the LSM without holding DB.mu. Note that we
    2043            2 :                                         // are still holding the log lock, so the version cannot change.
    2044            2 :                                         // TODO(radu): perform this check optimistically outside of the log lock.
    2045            2 :                                         var lsmOverlap overlap.WithLSM
    2046            2 :                                         lsmOverlap, err = func() (overlap.WithLSM, error) {
    2047            2 :                                                 d.mu.Unlock()
    2048            2 :                                                 defer d.mu.Lock()
    2049            2 :                                                 return overlapChecker.DetermineLSMOverlap(ctx, m.UserKeyBounds())
    2050            2 :                                         }()
    2051            2 :                                         if err == nil {
    2052            2 :                                                 f.Level, splitTable, err = ingestTargetLevel(
    2053            2 :                                                         ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit,
    2054            2 :                                                 )
    2055            2 :                                         }
    2056              :                                 }
    2057              : 
    2058            2 :                                 if splitTable != nil {
    2059            2 :                                         if invariants.Enabled {
    2060            2 :                                                 if lf := current.Levels[f.Level].Find(d.cmp, splitTable); lf.Empty() {
    2061            0 :                                                         panic("splitFile returned is not in level it should be")
    2062              :                                                 }
    2063              :                                         }
    2064              :                                         // We take advantage of the fact that we won't drop the db mutex
    2065              :                                         // between now and the call to UpdateVersionLocked. So, no files should
    2066              :                                         // get added to a new in-progress compaction at this point. We can
    2067              :                                         // avoid having to iterate on in-progress compactions to cancel them
    2068              :                                         // if none of the files being split have a compacting state.
    2069            2 :                                         if splitTable.IsCompacting() {
    2070            1 :                                                 checkCompactions = true
    2071            1 :                                         }
    2072            2 :                                         filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitTable, level: f.Level})
    2073              :                                 }
    2074              :                         }
    2075            2 :                         if err != nil {
    2076            0 :                                 return versionUpdate{}, err
    2077            0 :                         }
    2078            2 :                         if isShared && f.Level < sharedLevelsStart {
    2079            0 :                                 panic(fmt.Sprintf("cannot slot a shared file higher than the highest shared level: %d < %d",
    2080            0 :                                         f.Level, sharedLevelsStart))
    2081              :                         }
    2082            2 :                         f.Meta = m
    2083            2 :                         levelMetrics := metrics[f.Level]
    2084            2 :                         if levelMetrics == nil {
    2085            2 :                                 levelMetrics = &LevelMetrics{}
    2086            2 :                                 metrics[f.Level] = levelMetrics
    2087            2 :                         }
    2088            2 :                         levelMetrics.TablesCount++
    2089            2 :                         levelMetrics.TablesSize += int64(m.Size)
    2090            2 :                         levelMetrics.EstimatedReferencesSize += m.EstimatedReferenceSize()
    2091            2 :                         levelMetrics.TableBytesIngested += m.Size
    2092            2 :                         levelMetrics.TablesIngested++
    2093              :                 }
    2094              :                 // replacedTables maps files excised due to exciseSpan (or splitFiles returned
    2095              :                 // by ingestTargetLevel), to files that were created to replace it. This map
    2096              :                 // is used to resolve references to split files in filesToSplit, as it is
    2097              :                 // possible for a file that we want to split to no longer exist or have a
    2098              :                 // newer fileMetadata due to a split induced by another ingestion file, or an
    2099              :                 // excise.
    2100            2 :                 replacedTables := make(map[base.TableNum][]manifest.NewTableEntry)
    2101            2 :                 updateLevelMetricsOnExcise := func(m *manifest.TableMetadata, level int, added []manifest.NewTableEntry) {
    2102            2 :                         levelMetrics := metrics[level]
    2103            2 :                         if levelMetrics == nil {
    2104            2 :                                 levelMetrics = &LevelMetrics{}
    2105            2 :                                 metrics[level] = levelMetrics
    2106            2 :                         }
    2107            2 :                         levelMetrics.TablesCount--
    2108            2 :                         levelMetrics.TablesSize -= int64(m.Size)
    2109            2 :                         levelMetrics.EstimatedReferencesSize -= m.EstimatedReferenceSize()
    2110            2 :                         for i := range added {
    2111            2 :                                 levelMetrics.TablesCount++
    2112            2 :                                 levelMetrics.TablesSize += int64(added[i].Meta.Size)
    2113            2 :                                 levelMetrics.EstimatedReferencesSize += added[i].Meta.EstimatedReferenceSize()
    2114            2 :                         }
    2115              :                 }
    2116            2 :                 var exciseBounds base.UserKeyBounds
    2117            2 :                 if exciseSpan.Valid() {
    2118            2 :                         exciseBounds = exciseSpan.UserKeyBounds()
    2119            2 :                         d.mu.versions.metrics.Ingest.ExciseIngestCount++
    2120            2 :                         // Iterate through all levels and find files that intersect with exciseSpan.
    2121            2 :                         //
    2122            2 :                         // TODO(bilal): We could drop the DB mutex here as we don't need it for
    2123            2 :                         // excises; we only need to hold the version lock which we already are
    2124            2 :                         // holding. However releasing the DB mutex could mess with the
    2125            2 :                         // ingestTargetLevel calculation that happened above, as it assumed that it
    2126            2 :                         // had a complete view of in-progress compactions that wouldn't change
    2127            2 :                         // until UpdateVersionLocked is called. If we were to drop the mutex now,
    2128            2 :                         // we could schedule another in-progress compaction that would go into the
    2129            2 :                         // chosen target level and lead to file overlap within level (which would
    2130            2 :                         // panic in UpdateVersionLocked). We should drop the db mutex here, do the
    2131            2 :                         // excise, then re-grab the DB mutex and rerun just the in-progress
    2132            2 :                         // compaction check to see if any new compactions are conflicting with our
    2133            2 :                         // chosen target levels for files, and if they are, we should signal those
    2134            2 :                         // compactions to error out.
    2135            2 :                         for layer, ls := range current.AllLevelsAndSublevels() {
    2136            2 :                                 for m := range ls.Overlaps(d.cmp, exciseSpan.UserKeyBounds()).All() {
    2137            2 :                                         leftTable, rightTable, err := d.exciseTable(ctx, exciseBounds, m, layer.Level(), exciseBoundsPolicy)
    2138            2 :                                         if err != nil {
    2139            0 :                                                 return versionUpdate{}, err
    2140            0 :                                         }
    2141            2 :                                         newFiles := applyExciseToVersionEdit(ve, m, leftTable, rightTable, layer.Level())
    2142            2 :                                         replacedTables[m.TableNum] = newFiles
    2143            2 :                                         updateLevelMetricsOnExcise(m, layer.Level(), newFiles)
    2144              :                                 }
    2145              :                         }
    2146            2 :                         if d.FormatMajorVersion() >= FormatExciseBoundsRecord {
    2147            2 :                                 ve.ExciseBoundsRecord = append(ve.ExciseBoundsRecord, manifest.ExciseOpEntry{
    2148            2 :                                         Bounds: exciseBounds,
    2149            2 :                                         SeqNum: exciseSeqNum,
    2150            2 :                                 })
    2151            2 :                         }
    2152              :                 }
    2153            2 :                 if len(filesToSplit) > 0 {
    2154            2 :                         // For the same reasons as the above call to excise, we hold the db mutex
    2155            2 :                         // while calling this method.
    2156            2 :                         if err := d.ingestSplit(ctx, ve, updateLevelMetricsOnExcise, filesToSplit, replacedTables); err != nil {
    2157            0 :                                 return versionUpdate{}, err
    2158            0 :                         }
    2159              :                 }
    2160            2 :                 if len(filesToSplit) > 0 || exciseSpan.Valid() {
    2161            2 :                         for c := range d.mu.compact.inProgress {
    2162            2 :                                 if c.VersionEditApplied() {
    2163            2 :                                         continue
    2164              :                                 }
    2165              :                                 // Check if this compaction overlaps with the excise span. Note that just
    2166              :                                 // checking if the inputs individually overlap with the excise span
    2167              :                                 // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
    2168              :                                 // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
    2169              :                                 // doing a [c,d) excise at the same time as this compaction, we will have
    2170              :                                 // to error out the whole compaction as we can't guarantee it hasn't/won't
    2171              :                                 // write a file overlapping with the excise span.
    2172            2 :                                 bounds := c.Bounds()
    2173            2 :                                 if bounds != nil && bounds.Overlaps(d.cmp, &exciseBounds) {
    2174            2 :                                         c.Cancel()
    2175            2 :                                 }
    2176              :                                 // Check if this compaction's inputs have been replaced due to an
    2177              :                                 // ingest-time split. In that case, cancel the compaction as a newly picked
    2178              :                                 // compaction would need to include any new files that slid in between
    2179              :                                 // previously-existing files. Note that we cancel any compaction that has a
    2180              :                                 // file that was ingest-split as an input, even if it started before this
    2181              :                                 // ingestion.
    2182            2 :                                 if checkCompactions {
    2183            1 :                                         for _, table := range c.Tables() {
    2184            1 :                                                 if _, ok := replacedTables[table.TableNum]; ok {
    2185            1 :                                                         c.Cancel()
    2186            1 :                                                         break
    2187              :                                                 }
    2188              :                                         }
    2189              :                                 }
    2190              :                         }
    2191              :                 }
    2192              : 
    2193            2 :                 return versionUpdate{
    2194            2 :                         VE:                      ve,
    2195            2 :                         JobID:                   jobID,
    2196            2 :                         Metrics:                 metrics,
    2197            2 :                         InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) },
    2198              :                 }, nil
    2199              :         })
    2200            2 :         if err != nil {
    2201            1 :                 return nil, 0, err
    2202            1 :         }
    2203              :         // Check for any EventuallyFileOnlySnapshots that could be watching for
    2204              :         // an excise on this span. There should be none as the
    2205              :         // computePossibleOverlaps steps should have forced these EFOS to transition
    2206              :         // to file-only snapshots by now. If we see any that conflict with this
    2207              :         // excise, panic.
    2208            2 :         if exciseSpan.Valid() {
    2209            2 :                 for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
    2210            1 :                         // Skip non-EFOS snapshots, and also skip any EFOS that were created
    2211            1 :                         // *after* the excise.
    2212            1 :                         if s.efos == nil || base.Visible(exciseSeqNum, s.efos.seqNum, base.SeqNumMax) {
    2213            0 :                                 continue
    2214              :                         }
    2215            1 :                         efos := s.efos
    2216            1 :                         // TODO(bilal): We can make this faster by taking advantage of the sorted
    2217            1 :                         // nature of protectedRanges to do a sort.Search, or even maintaining a
    2218            1 :                         // global list of all protected ranges instead of having to peer into every
    2219            1 :                         // snapshot.
    2220            1 :                         for i := range efos.protectedRanges {
    2221            1 :                                 if efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
    2222            0 :                                         panic("unexpected excise of an EventuallyFileOnlySnapshot's bounds")
    2223              :                                 }
    2224              :                         }
    2225              :                 }
    2226              :         }
    2227              : 
    2228            2 :         d.mu.versions.metrics.Ingest.Count++
    2229            2 : 
    2230            2 :         d.updateReadStateLocked(d.opts.DebugCheck)
    2231            2 :         // updateReadStateLocked could have generated obsolete tables, schedule a
    2232            2 :         // cleanup job if necessary.
    2233            2 :         d.deleteObsoleteFiles(jobID)
    2234            2 :         d.updateTableStatsLocked(ve.NewTables)
    2235            2 :         // The ingestion may have pushed a level over the threshold for compaction,
    2236            2 :         // so check to see if one is necessary and schedule it.
    2237            2 :         d.maybeScheduleCompaction()
    2238            2 :         var toValidate []manifest.NewTableEntry
    2239            2 :         dedup := make(map[base.DiskFileNum]struct{})
    2240            2 :         for _, entry := range ve.NewTables {
    2241            2 :                 if _, ok := dedup[entry.Meta.TableBacking.DiskFileNum]; !ok {
    2242            2 :                         toValidate = append(toValidate, entry)
    2243            2 :                         dedup[entry.Meta.TableBacking.DiskFileNum] = struct{}{}
    2244            2 :                 }
    2245              :         }
    2246            2 :         d.maybeValidateSSTablesLocked(toValidate)
    2247            2 : 
    2248            2 :         return ve, manifestUpdateDuration, nil
    2249              : }
    2250              : 
    2251              : // maybeValidateSSTablesLocked appends newFiles to the pending queue of tables
    2252              : // to be validated and, if shouldValidateSSTablesLocked allows it, starts a
    2253              : // validateSSTables goroutine. It does nothing when ValidateOnIngest is off.
    2254              : // Note that if two entries with the same backing file are added twice, then the
    2255              : // block checksums for the backing file will be validated twice.
    2256              : //
    2257              : // DB.mu must be locked when calling.
    2258            2 : func (d *DB) maybeValidateSSTablesLocked(newFiles []manifest.NewTableEntry) {
    2259            2 :         // Only add to the validation queue when the feature is enabled.
    2260            2 :         if !d.opts.Experimental.ValidateOnIngest {
    2261            2 :                 return
    2262            2 :         }
    2263              :         // Queue the tables, then start a background validation round if one can run now.
    2264            2 :         d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...)
    2265            2 :         if d.shouldValidateSSTablesLocked() {
    2266            2 :                 go d.validateSSTables()
    2267            2 :         }
    2268              : }
    2269              : 
    2270              : // shouldValidateSSTablesLocked returns true if a new round of SSTable validation
    2271              : // should be started; all four conditions below must hold. DB.mu must be locked when calling.
    2272            2 : func (d *DB) shouldValidateSSTablesLocked() bool {
    2273            2 :         return !d.mu.tableValidation.validating && // no validation round is already running
    2274            2 :                 d.closed.Load() == nil && // d.closed holds nil (presumably: DB not closed)
    2275            2 :                 d.opts.Experimental.ValidateOnIngest && // the feature is enabled
    2276            2 :                 len(d.mu.tableValidation.pending) > 0 // there is at least one queued table
    2277            2 : }
    2278              : 
    2279              : // validateSSTables runs a round of block-checksum validation on the tables in the
    2280              : // pending queue. It acquires DB.mu itself, drops it during IO, and re-queues tables that failed with a non-corruption error.
    2281            2 : func (d *DB) validateSSTables() {
    2282            2 :         d.mu.Lock()
    2283            2 :         if !d.shouldValidateSSTablesLocked() {
    2284            2 :                 d.mu.Unlock()
    2285            2 :                 return
    2286            2 :         }
    2287              :         // Claim the whole queue and mark a round as running; shouldValidateSSTablesLocked returns false while validating is set.
    2288            2 :         pending := d.mu.tableValidation.pending
    2289            2 :         d.mu.tableValidation.pending = nil
    2290            2 :         d.mu.tableValidation.validating = true
    2291            2 :         jobID := d.newJobIDLocked()
    2292            2 :         rs := d.loadReadState()
    2293            2 : 
    2294            2 :         // Drop DB.mu before performing IO.
    2295            2 :         d.mu.Unlock()
    2296            2 : 
    2297            2 :         // Validate all tables in the pending queue. This could lead to a situation
    2298            2 :         // where we are starving IO from other tasks due to having to page through
    2299            2 :         // all the blocks in all the sstables in the queue.
    2300            2 :         // TODO(travers): Add some form of pacing to avoid IO starvation.
    2301            2 : 
    2302            2 :         // If we fail to validate any files due to reasons other than uncovered
    2303            2 :         // corruption, accumulate them and re-queue them for another attempt.
    2304            2 :         var retry []manifest.NewTableEntry
    2305            2 : 
    2306            2 :         for _, f := range pending {
    2307            2 :                 // The file may have been moved or deleted since it was ingested, in
    2308            2 :                 // which case we skip.
    2309            2 :                 if !rs.current.Contains(f.Level, f.Meta) {
    2310            2 :                         // Assume the file was moved to a lower level. It is rare enough
    2311            2 :                         // that a table is moved or deleted between the time it was ingested
    2312            2 :                         // and the time the validation routine runs that the overall cost of
    2313            2 :                         // this inner loop is tolerably low, when amortized over all
    2314            2 :                         // ingested tables.
    2315            2 :                         found := false
    2316            2 :                         for i := f.Level + 1; i < numLevels; i++ {
    2317            2 :                                 if rs.current.Contains(i, f.Meta) {
    2318            1 :                                         found = true
    2319            1 :                                         break
    2320              :                                 }
    2321              :                         }
    2322            2 :                         if !found {
    2323            2 :                                 continue
    2324              :                         }
    2325              :                 }
    2326              : 
    2327              :                 // TODO(radu): plumb a ReadEnv with a CategoryIngest stats collector through
    2328              :                 // to ValidateBlockChecksums.
    2329            2 :                 err := d.fileCache.withReader(context.TODO(), block.NoReadEnv,
    2330            2 :                         f.Meta, func(r *sstable.Reader, _ sstable.ReadEnv) error {
    2331            2 :                                 return r.ValidateBlockChecksums()
    2332            2 :                         })
    2333              : 
    2334            2 :                 if err != nil {
    2335            1 :                         if IsCorruptionError(err) {
    2336            1 :                                 // TODO(travers): Hook into the corruption reporting pipeline, once
    2337            1 :                                 // available. See pebble#1192. NOTE(review): if Fatalf returns, control falls through to TableValidated below - confirm intended.
    2338            1 :                                 d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err)
    2339            1 :                         } else {
    2340            1 :                                 // If there was some other, possibly transient, error that
    2341            1 :                                 // caused table validation to fail inform the EventListener and
    2342            1 :                                 // move on. We remember the table so that we can retry it in a
    2343            1 :                                 // subsequent table validation job.
    2344            1 :                                 //
    2345            1 :                                 // TODO(jackson): If the error is not transient, this will retry
    2346            1 :                                 // validation indefinitely. While not great, it's the same
    2347            1 :                                 // behavior as erroring flushes and compactions. We should
    2348            1 :                                 // address this as a part of #270.
    2349            1 :                                 d.opts.EventListener.BackgroundError(err)
    2350            1 :                                 retry = append(retry, f)
    2351            1 :                                 continue
    2352              :                         }
    2353              :                 }
    2354              : 
    2355            2 :                 d.opts.EventListener.TableValidated(TableValidatedInfo{
    2356            2 :                         JobID: int(jobID),
    2357            2 :                         Meta:  f.Meta,
    2358            2 :                 })
    2359              :         }
    2360            2 :         rs.unref() // release the read state obtained from loadReadState above
    2361            2 :         d.mu.Lock()
    2362            2 :         defer d.mu.Unlock()
    2363            2 :         d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, retry...) // re-queue tables that hit a non-corruption error
    2364            2 :         d.mu.tableValidation.validating = false
    2365            2 :         d.mu.tableValidation.cond.Broadcast() // wake any waiters on tableValidation.cond
    2366            2 :         if d.shouldValidateSSTablesLocked() { // the queue may hold retries or tables appended while DB.mu was released
    2367            2 :                 go d.validateSSTables()
    2368            2 :         }
    2369              : }
        

Generated by: LCOV version 2.0-1