LCOV - code coverage report
Current view: top level - pebble - ingest.go (source / functions) Hit Total Coverage
Test: 2024-12-09 08:17Z 0f9bf63d - tests + meta.lcov Lines: 1600 1824 87.7 %
Date: 2024-12-09 08:18:29 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "slices"
      11             :         "sort"
      12             :         "time"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/internal/cache"
      17             :         "github.com/cockroachdb/pebble/internal/invariants"
      18             :         "github.com/cockroachdb/pebble/internal/keyspan"
      19             :         "github.com/cockroachdb/pebble/internal/manifest"
      20             :         "github.com/cockroachdb/pebble/internal/overlap"
      21             :         "github.com/cockroachdb/pebble/internal/sstableinternal"
      22             :         "github.com/cockroachdb/pebble/objstorage"
      23             :         "github.com/cockroachdb/pebble/objstorage/remote"
      24             :         "github.com/cockroachdb/pebble/sstable"
      25             : )
      26             : 
      27           2 : func sstableKeyCompare(userCmp Compare, a, b InternalKey) int {
      28           2 :         c := userCmp(a.UserKey, b.UserKey)
      29           2 :         if c != 0 {
      30           2 :                 return c
      31           2 :         }
      32           2 :         if a.IsExclusiveSentinel() {
      33           2 :                 if !b.IsExclusiveSentinel() {
      34           2 :                         return -1
      35           2 :                 }
      36           2 :         } else if b.IsExclusiveSentinel() {
      37           2 :                 return +1
      38           2 :         }
      39           2 :         return 0
      40             : }
      41             : 
      42             : // KeyRange encodes a key range in user key space. A KeyRange's Start is
      43             : // inclusive while its End is exclusive.
      44             : //
      45             : // KeyRange is equivalent to base.UserKeyBounds with exclusive end.
      46             : type KeyRange struct {
      47             :         Start, End []byte
      48             : }
      49             : 
      50             : // Valid returns true if the KeyRange is defined.
      51           2 : func (k *KeyRange) Valid() bool {
      52           2 :         return k.Start != nil && k.End != nil
      53           2 : }
      54             : 
      55             : // Contains returns whether the specified key exists in the KeyRange.
      56           2 : func (k *KeyRange) Contains(cmp base.Compare, key InternalKey) bool {
      57           2 :         v := cmp(key.UserKey, k.End)
      58           2 :         return (v < 0 || (v == 0 && key.IsExclusiveSentinel())) && cmp(k.Start, key.UserKey) <= 0
      59           2 : }
      60             : 
      61             : // UserKeyBounds returns the KeyRange as UserKeyBounds. Also implements the internal `bounded` interface.
      62           2 : func (k KeyRange) UserKeyBounds() base.UserKeyBounds {
      63           2 :         return base.UserKeyBoundsEndExclusive(k.Start, k.End)
      64           2 : }
      65             : 
      66             : // OverlapsInternalKeyRange checks if the specified internal key range has an
      67             : // overlap with the KeyRange. Note that we aren't checking for full containment
      68             : // of smallest-largest within k, rather just that there's some intersection
      69             : // between the two ranges.
      70           2 : func (k *KeyRange) OverlapsInternalKeyRange(cmp base.Compare, smallest, largest InternalKey) bool {
      71           2 :         ukb := k.UserKeyBounds()
      72           2 :         b := base.UserKeyBoundsFromInternal(smallest, largest)
      73           2 :         return ukb.Overlaps(cmp, &b)
      74           2 : }
      75             : 
      76             : // Overlaps checks if the specified file has an overlap with the KeyRange.
      77             : // Note that we aren't checking for full containment of m within k, rather just
      78             : // that there's some intersection between m and k's bounds.
      79           1 : func (k *KeyRange) Overlaps(cmp base.Compare, m *fileMetadata) bool {
      80           1 :         b := k.UserKeyBounds()
      81           1 :         return m.Overlaps(cmp, &b)
      82           1 : }
      83             : 
      84             : // OverlapsKeyRange checks if this span overlaps with the provided KeyRange.
      85             : // Note that we aren't checking for full containment of either span in the other,
      86             : // just that there's a key x that is in both key ranges.
      87           2 : func (k *KeyRange) OverlapsKeyRange(cmp Compare, span KeyRange) bool {
      88           2 :         return cmp(k.Start, span.End) < 0 && cmp(k.End, span.Start) > 0
      89           2 : }
      90             : 
      91           2 : func ingestValidateKey(opts *Options, key *InternalKey) error {
      92           2 :         if key.Kind() == InternalKeyKindInvalid {
      93           1 :                 return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s",
      94           1 :                         key.Pretty(opts.Comparer.FormatKey))
      95           1 :         }
      96           2 :         if key.SeqNum() != 0 {
      97           1 :                 return base.CorruptionErrorf("pebble: external sstable has non-zero seqnum: %s",
      98           1 :                         key.Pretty(opts.Comparer.FormatKey))
      99           1 :         }
     100           2 :         return nil
     101             : }
     102             : 
     103             : // ingestSynthesizeShared constructs a fileMetadata for one shared sstable owned
     104             : // or shared by another node.
     105             : func ingestSynthesizeShared(
     106             :         opts *Options, sm SharedSSTMeta, fileNum base.FileNum,
     107           2 : ) (*fileMetadata, error) {
     108           2 :         if sm.Size == 0 {
     109           0 :                 // Disallow 0 file sizes
     110           0 :                 return nil, errors.New("pebble: cannot ingest shared file with size 0")
     111           0 :         }
     112             :         // Don't load table stats. Doing a round trip to shared storage, one SST
     113             :         // at a time is not worth it as it slows down ingestion.
     114           2 :         meta := &fileMetadata{
     115           2 :                 FileNum:      fileNum,
     116           2 :                 CreationTime: time.Now().Unix(),
     117           2 :                 Virtual:      true,
     118           2 :                 Size:         sm.Size,
     119           2 :         }
     120           2 :         // For simplicity, we use the same number for both the FileNum and the
     121           2 :         // DiskFileNum (even though this is a virtual sstable). Pass the underlying
     122           2 :         // FileBacking's size to the same size as the virtualized view of the sstable.
     123           2 :         // This ensures that we don't over-prioritize this sstable for compaction just
     124           2 :         // yet, as we do not have a clear sense of what parts of this sstable are
     125           2 :         // referenced by other nodes.
     126           2 :         meta.InitProviderBacking(base.DiskFileNum(fileNum), sm.Size)
     127           2 : 
     128           2 :         if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil {
     129           2 :                 // Initialize meta.{HasPointKeys,Smallest,Largest}, etc.
     130           2 :                 //
     131           2 :                 // NB: We create new internal keys and pass them into ExtendPointKeyBounds
     132           2 :                 // so that we can sub a zero sequence number into the bounds. We can set
     133           2 :                 // the sequence number to anything here; it'll be reset in ingestUpdateSeqNum
     134           2 :                 // anyway. However, we do need to use the same sequence number across all
     135           2 :                 // bound keys at this step so that we end up with bounds that are consistent
     136           2 :                 // across point/range keys.
     137           2 :                 //
     138           2 :                 // Because of the sequence number rewriting, we cannot use the Kind of
     139           2 :                 // sm.SmallestPointKey. For example, the original SST might start with
     140           2 :                 // a.SET.2 and a.RANGEDEL.1 (with a.SET.2 being the smallest key); after
     141           2 :                 // rewriting the sequence numbers, these keys become a.SET.100 and
     142           2 :                 // a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a
     143           2 :                 // correct bound, we just use the maximum key kind (which sorts first).
     144           2 :                 // Similarly, we use the smallest key kind for the largest key.
     145           2 :                 smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMaxForSSTable)
     146           2 :                 largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0)
     147           2 :                 if sm.LargestPointKey.IsExclusiveSentinel() {
     148           2 :                         largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey)
     149           2 :                 }
     150           2 :                 if opts.Comparer.Equal(smallestPointKey.UserKey, largestPointKey.UserKey) &&
     151           2 :                         smallestPointKey.Trailer < largestPointKey.Trailer {
     152           0 :                         // We get kinds from the sender, however we substitute our own sequence
     153           0 :                         // numbers. This can result in cases where an sstable [b#5,SET-b#4,DELSIZED]
     154           0 :                         // becomes [b#0,SET-b#0,DELSIZED] when we synthesize it here, but the
     155           0 :                         // kinds need to be reversed now because DelSized > Set.
     156           0 :                         smallestPointKey, largestPointKey = largestPointKey, smallestPointKey
     157           0 :                 }
     158           2 :                 meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallestPointKey, largestPointKey)
     159             :         }
     160           2 :         if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil {
     161           2 :                 // Initialize meta.{HasRangeKeys,Smallest,Largest}, etc.
     162           2 :                 //
     163           2 :                 // See comment above on why we use a zero sequence number and these key
     164           2 :                 // kinds here.
     165           2 :                 smallestRangeKey := base.MakeInternalKey(sm.SmallestRangeKey.UserKey, 0, base.InternalKeyKindRangeKeyMax)
     166           2 :                 largestRangeKey := base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeyMin, sm.LargestRangeKey.UserKey)
     167           2 :                 meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallestRangeKey, largestRangeKey)
     168           2 :         }
     169           2 :         if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
     170           0 :                 return nil, err
     171           0 :         }
     172           2 :         return meta, nil
     173             : }
     174             : 
     175             : // ingestLoad1External loads the fileMetadata for one external sstable.
     176             : // Sequence number and target level calculation happens during prepare/apply.
     177             : func ingestLoad1External(
     178             :         opts *Options, e ExternalFile, fileNum base.FileNum,
     179           2 : ) (*fileMetadata, error) {
     180           2 :         if e.Size == 0 {
     181           0 :                 return nil, errors.New("pebble: cannot ingest external file with size 0")
     182           0 :         }
     183           2 :         if !e.HasRangeKey && !e.HasPointKey {
     184           0 :                 return nil, errors.New("pebble: cannot ingest external file with no point or range keys")
     185           0 :         }
     186             : 
     187           2 :         if opts.Comparer.Compare(e.StartKey, e.EndKey) > 0 {
     188           1 :                 return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
     189           1 :         }
     190           2 :         if opts.Comparer.Compare(e.StartKey, e.EndKey) == 0 && !e.EndKeyIsInclusive {
     191           0 :                 return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
     192           0 :         }
     193           2 :         if n := opts.Comparer.Split(e.StartKey); n != len(e.StartKey) {
     194           1 :                 return nil, errors.Newf("pebble: external file bounds start key %q has suffix", e.StartKey)
     195           1 :         }
     196           2 :         if n := opts.Comparer.Split(e.EndKey); n != len(e.EndKey) {
     197           1 :                 return nil, errors.Newf("pebble: external file bounds end key %q has suffix", e.EndKey)
     198           1 :         }
     199             : 
     200             :         // Don't load table stats. Doing a round trip to shared storage, one SST
     201             :         // at a time is not worth it as it slows down ingestion.
     202           2 :         meta := &fileMetadata{
     203           2 :                 FileNum:      fileNum,
     204           2 :                 CreationTime: time.Now().Unix(),
     205           2 :                 Virtual:      true,
     206           2 :                 Size:         e.Size,
     207           2 :         }
     208           2 : 
     209           2 :         // In the name of keeping this ingestion as fast as possible, we avoid
     210           2 :         // *all* existence checks and synthesize a file metadata with smallest/largest
     211           2 :         // keys that overlap whatever the passed-in span was.
     212           2 :         smallestCopy := slices.Clone(e.StartKey)
     213           2 :         largestCopy := slices.Clone(e.EndKey)
     214           2 :         if e.HasPointKey {
     215           2 :                 // Sequence numbers are updated later by
     216           2 :                 // ingestUpdateSeqNum, applying a squence number that
     217           2 :                 // is applied to all keys in the sstable.
     218           2 :                 if e.EndKeyIsInclusive {
     219           1 :                         meta.ExtendPointKeyBounds(
     220           1 :                                 opts.Comparer.Compare,
     221           1 :                                 base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable),
     222           1 :                                 base.MakeInternalKey(largestCopy, 0, 0))
     223           2 :                 } else {
     224           2 :                         meta.ExtendPointKeyBounds(
     225           2 :                                 opts.Comparer.Compare,
     226           2 :                                 base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable),
     227           2 :                                 base.MakeRangeDeleteSentinelKey(largestCopy))
     228           2 :                 }
     229             :         }
     230           2 :         if e.HasRangeKey {
     231           2 :                 meta.ExtendRangeKeyBounds(
     232           2 :                         opts.Comparer.Compare,
     233           2 :                         base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeyMax),
     234           2 :                         base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyMin, largestCopy),
     235           2 :                 )
     236           2 :         }
     237             : 
     238           2 :         meta.SyntheticPrefixAndSuffix = sstable.MakeSyntheticPrefixAndSuffix(e.SyntheticPrefix, e.SyntheticSuffix)
     239           2 : 
     240           2 :         return meta, nil
     241             : }
     242             : 
     243             : type rangeKeyIngestValidator struct {
     244             :         // lastRangeKey is the last range key seen in the previous file.
     245             :         lastRangeKey keyspan.Span
     246             :         // comparer, if unset, disables range key validation.
     247             :         comparer *base.Comparer
     248             : }
     249             : 
     250           2 : func disableRangeKeyChecks() rangeKeyIngestValidator {
     251           2 :         return rangeKeyIngestValidator{}
     252           2 : }
     253             : 
     254             : func validateSuffixedBoundaries(
     255             :         cmp *base.Comparer, lastRangeKey keyspan.Span,
     256           2 : ) rangeKeyIngestValidator {
     257           2 :         return rangeKeyIngestValidator{
     258           2 :                 lastRangeKey: lastRangeKey,
     259           2 :                 comparer:     cmp,
     260           2 :         }
     261           2 : }
     262             : 
     263             : // Validate valides if the stored state of this rangeKeyIngestValidator allows for
     264             : // a file with the given nextFileSmallestKey to be ingested, such that the stored
     265             : // last file's largest range key defragments cleanly with the next file's smallest
     266             : // key if it was suffixed. If a value of nil is passed in for nextFileSmallestKey,
     267             : // that denotes the next file does not have a range key or there is no next file.
     268           2 : func (r *rangeKeyIngestValidator) Validate(nextFileSmallestKey *keyspan.Span) error {
     269           2 :         if r.comparer == nil {
     270           2 :                 return nil
     271           2 :         }
     272           2 :         if r.lastRangeKey.Valid() {
     273           2 :                 if r.comparer.Split.HasSuffix(r.lastRangeKey.End) {
     274           1 :                         if nextFileSmallestKey == nil || !r.comparer.Equal(r.lastRangeKey.End, nextFileSmallestKey.Start) {
     275           1 :                                 // The last range key has a suffix, and it doesn't defragment cleanly with this range key.
     276           1 :                                 return errors.AssertionFailedf("pebble: ingest sstable has suffixed largest range key that does not match the start key of the next sstable: %s",
     277           1 :                                         r.comparer.FormatKey(r.lastRangeKey.End))
     278           1 :                         } else if !keyspan.DefragmentInternal.ShouldDefragment(r.comparer.CompareRangeSuffixes, &r.lastRangeKey, nextFileSmallestKey) {
     279           0 :                                 // The last range key has a suffix, and it doesn't defragment cleanly with this range key.
     280           0 :                                 return errors.AssertionFailedf("pebble: ingest sstable has suffixed range key that won't defragment with next sstable: %s",
     281           0 :                                         r.comparer.FormatKey(r.lastRangeKey.End))
     282           0 :                         }
     283             :                 }
     284           2 :         } else if nextFileSmallestKey != nil && r.comparer.Split.HasSuffix(nextFileSmallestKey.Start) {
     285           0 :                 return errors.Newf("pebble: ingest sstable has suffixed range key start that won't defragment: %s",
     286           0 :                         r.comparer.FormatKey(nextFileSmallestKey.Start))
     287           0 :         }
     288           2 :         return nil
     289             : }
     290             : 
     291             : // ingestLoad1 creates the FileMetadata for one file. This file will be owned
     292             : // by this store.
     293             : //
     294             : // prevLastRangeKey is the last range key from the previous file. It is used to
     295             : // ensure that the range keys defragment cleanly across files. These checks
     296             : // are disabled if disableRangeKeyChecks is true.
     297             : func ingestLoad1(
     298             :         ctx context.Context,
     299             :         opts *Options,
     300             :         fmv FormatMajorVersion,
     301             :         readable objstorage.Readable,
     302             :         cacheID cache.ID,
     303             :         fileNum base.FileNum,
     304             :         rangeKeyValidator rangeKeyIngestValidator,
     305           2 : ) (meta *fileMetadata, lastRangeKey keyspan.Span, err error) {
     306           2 :         o := opts.MakeReaderOptions()
     307           2 :         o.SetInternalCacheOpts(sstableinternal.CacheOptions{
     308           2 :                 Cache:   opts.Cache,
     309           2 :                 CacheID: cacheID,
     310           2 :                 FileNum: base.PhysicalTableDiskFileNum(fileNum),
     311           2 :         })
     312           2 :         r, err := sstable.NewReader(ctx, readable, o)
     313           2 :         if err != nil {
     314           1 :                 return nil, keyspan.Span{}, err
     315           1 :         }
     316           2 :         defer r.Close()
     317           2 : 
     318           2 :         // Avoid ingesting tables with format versions this DB doesn't support.
     319           2 :         tf, err := r.TableFormat()
     320           2 :         if err != nil {
     321           0 :                 return nil, keyspan.Span{}, err
     322           0 :         }
     323           2 :         if tf < fmv.MinTableFormat() || tf > fmv.MaxTableFormat() {
     324           1 :                 return nil, keyspan.Span{}, errors.Newf(
     325           1 :                         "pebble: table format %s is not within range supported at DB format major version %d, (%s,%s)",
     326           1 :                         tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
     327           1 :                 )
     328           1 :         }
     329           2 :         if tf.BlockColumnar() {
     330           2 :                 if _, ok := opts.KeySchemas[r.Properties.KeySchemaName]; !ok {
     331           0 :                         return nil, keyspan.Span{}, errors.Newf(
     332           0 :                                 "pebble: table uses key schema %q unknown to the database",
     333           0 :                                 r.Properties.KeySchemaName)
     334           0 :                 }
     335             :         }
     336             : 
     337           2 :         meta = &fileMetadata{}
     338           2 :         meta.FileNum = fileNum
     339           2 :         meta.Size = uint64(readable.Size())
     340           2 :         meta.CreationTime = time.Now().Unix()
     341           2 :         meta.InitPhysicalBacking()
     342           2 : 
     343           2 :         // Avoid loading into the file cache for collecting stats if we
     344           2 :         // don't need to. If there are no range deletions, we have all the
     345           2 :         // information to compute the stats here.
     346           2 :         //
     347           2 :         // This is helpful in tests for avoiding awkwardness around deletion of
     348           2 :         // ingested files from MemFS. MemFS implements the Windows semantics of
     349           2 :         // disallowing removal of an open file. Under MemFS, if we don't populate
     350           2 :         // meta.Stats here, the file will be loaded into the file cache for
     351           2 :         // calculating stats before we can remove the original link.
     352           2 :         maybeSetStatsFromProperties(meta.PhysicalMeta(), &r.Properties)
     353           2 : 
     354           2 :         {
     355           2 :                 iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
     356           2 :                 if err != nil {
     357           1 :                         return nil, keyspan.Span{}, err
     358           1 :                 }
     359           2 :                 defer iter.Close()
     360           2 :                 var smallest InternalKey
     361           2 :                 if kv := iter.First(); kv != nil {
     362           2 :                         if err := ingestValidateKey(opts, &kv.K); err != nil {
     363           1 :                                 return nil, keyspan.Span{}, err
     364           1 :                         }
     365           2 :                         smallest = kv.K.Clone()
     366             :                 }
     367           2 :                 if err := iter.Error(); err != nil {
     368           1 :                         return nil, keyspan.Span{}, err
     369           1 :                 }
     370           2 :                 if kv := iter.Last(); kv != nil {
     371           2 :                         if err := ingestValidateKey(opts, &kv.K); err != nil {
     372           0 :                                 return nil, keyspan.Span{}, err
     373           0 :                         }
     374           2 :                         meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, kv.K.Clone())
     375             :                 }
     376           2 :                 if err := iter.Error(); err != nil {
     377           1 :                         return nil, keyspan.Span{}, err
     378           1 :                 }
     379             :         }
     380             : 
     381           2 :         iter, err := r.NewRawRangeDelIter(ctx, sstable.NoFragmentTransforms)
     382           2 :         if err != nil {
     383           0 :                 return nil, keyspan.Span{}, err
     384           0 :         }
     385           2 :         if iter != nil {
     386           2 :                 defer iter.Close()
     387           2 :                 var smallest InternalKey
     388           2 :                 if s, err := iter.First(); err != nil {
     389           0 :                         return nil, keyspan.Span{}, err
     390           2 :                 } else if s != nil {
     391           2 :                         key := s.SmallestKey()
     392           2 :                         if err := ingestValidateKey(opts, &key); err != nil {
     393           0 :                                 return nil, keyspan.Span{}, err
     394           0 :                         }
     395           2 :                         smallest = key.Clone()
     396             :                 }
     397           2 :                 if s, err := iter.Last(); err != nil {
     398           0 :                         return nil, keyspan.Span{}, err
     399           2 :                 } else if s != nil {
     400           2 :                         k := s.SmallestKey()
     401           2 :                         if err := ingestValidateKey(opts, &k); err != nil {
     402           0 :                                 return nil, keyspan.Span{}, err
     403           0 :                         }
     404           2 :                         largest := s.LargestKey().Clone()
     405           2 :                         meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest)
     406             :                 }
     407             :         }
     408             : 
     409             :         // Update the range-key bounds for the table.
     410           2 :         {
     411           2 :                 iter, err := r.NewRawRangeKeyIter(ctx, sstable.NoFragmentTransforms)
     412           2 :                 if err != nil {
     413           0 :                         return nil, keyspan.Span{}, err
     414           0 :                 }
     415           2 :                 if iter != nil {
     416           2 :                         defer iter.Close()
     417           2 :                         var smallest InternalKey
     418           2 :                         if s, err := iter.First(); err != nil {
     419           0 :                                 return nil, keyspan.Span{}, err
     420           2 :                         } else if s != nil {
     421           2 :                                 key := s.SmallestKey()
     422           2 :                                 if err := ingestValidateKey(opts, &key); err != nil {
     423           0 :                                         return nil, keyspan.Span{}, err
     424           0 :                                 }
     425           2 :                                 smallest = key.Clone()
     426           2 :                                 // Range keys need some additional validation as we need to ensure they
     427           2 :                                 // defragment cleanly with the lastRangeKey from the previous file.
     428           2 :                                 if err := rangeKeyValidator.Validate(s); err != nil {
     429           0 :                                         return nil, keyspan.Span{}, err
     430           0 :                                 }
     431             :                         }
     432           2 :                         lastRangeKey = keyspan.Span{}
     433           2 :                         if s, err := iter.Last(); err != nil {
     434           0 :                                 return nil, keyspan.Span{}, err
     435           2 :                         } else if s != nil {
     436           2 :                                 k := s.SmallestKey()
     437           2 :                                 if err := ingestValidateKey(opts, &k); err != nil {
     438           0 :                                         return nil, keyspan.Span{}, err
     439           0 :                                 }
     440             :                                 // As range keys are fragmented, the end key of the last range key in
     441             :                                 // the table provides the upper bound for the table.
     442           2 :                                 largest := s.LargestKey().Clone()
     443           2 :                                 meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallest, largest)
     444           2 :                                 lastRangeKey = s.Clone()
     445           0 :                         } else {
     446           0 :                                 // s == nil.
     447           0 :                                 rangeKeyValidator.Validate(nil /* nextFileSmallestKey */)
     448           0 :                         }
     449           2 :                 } else {
     450           2 :                         rangeKeyValidator.Validate(nil /* nextFileSmallestKey */)
     451           2 :                         lastRangeKey = keyspan.Span{}
     452           2 :                 }
     453             :         }
     454             : 
     455           2 :         if !meta.HasPointKeys && !meta.HasRangeKeys {
     456           2 :                 return nil, keyspan.Span{}, nil
     457           2 :         }
     458             : 
     459             :         // Sanity check that the various bounds on the file were set consistently.
     460           2 :         if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
     461           0 :                 return nil, keyspan.Span{}, err
     462           0 :         }
     463             : 
     464           2 :         return meta, lastRangeKey, nil
     465             : }
     466             : 
     467             : type ingestLoadResult struct {
     468             :         local    []ingestLocalMeta
     469             :         shared   []ingestSharedMeta
     470             :         external []ingestExternalMeta
     471             : 
     472             :         externalFilesHaveLevel bool
     473             : }
     474             : 
     475             : type ingestLocalMeta struct {
     476             :         *fileMetadata
     477             :         path string
     478             : }
     479             : 
     480             : type ingestSharedMeta struct {
     481             :         *fileMetadata
     482             :         shared SharedSSTMeta
     483             : }
     484             : 
     485             : type ingestExternalMeta struct {
     486             :         *fileMetadata
     487             :         external ExternalFile
     488             :         // usedExistingBacking is true if the external file is reusing a backing
     489             :         // that existed before this ingestion. In this case, we called
     490             :         // VirtualBackings.Protect() on that backing; we will need to call
     491             :         // Unprotect() after the ingestion.
     492             :         usedExistingBacking bool
     493             : }
     494             : 
     495           2 : func (r *ingestLoadResult) fileCount() int {
     496           2 :         return len(r.local) + len(r.shared) + len(r.external)
     497           2 : }
     498             : 
     499             : func ingestLoad(
     500             :         ctx context.Context,
     501             :         opts *Options,
     502             :         fmv FormatMajorVersion,
     503             :         paths []string,
     504             :         shared []SharedSSTMeta,
     505             :         external []ExternalFile,
     506             :         cacheID cache.ID,
     507             :         pending []base.FileNum,
     508           2 : ) (ingestLoadResult, error) {
     509           2 :         localFileNums := pending[:len(paths)]
     510           2 :         sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
     511           2 :         externalFileNums := pending[len(paths)+len(shared) : len(paths)+len(shared)+len(external)]
     512           2 : 
     513           2 :         var result ingestLoadResult
     514           2 :         result.local = make([]ingestLocalMeta, 0, len(paths))
     515           2 :         var lastRangeKey keyspan.Span
     516           2 :         // NB: we disable range key boundary assertions if we have shared or external files
     517           2 :         // present in this ingestion. This is because a suffixed range key in a local file
     518           2 :         // can possibly defragment with a suffixed range key in a shared or external file.
     519           2 :         // We also disable range key boundary assertions if we have CreateOnShared set to
     520           2 :         // true, as that means we could have suffixed RangeKeyDels or Unsets in the local
     521           2 :         // files that won't ever be surfaced, even if there are no shared or external files
     522           2 :         // in the ingestion.
     523           2 :         shouldDisableRangeKeyChecks := len(shared) > 0 || len(external) > 0 || opts.Experimental.CreateOnShared != remote.CreateOnSharedNone
     524           2 :         for i := range paths {
     525           2 :                 f, err := opts.FS.Open(paths[i])
     526           2 :                 if err != nil {
     527           1 :                         return ingestLoadResult{}, err
     528           1 :                 }
     529             : 
     530           2 :                 readable, err := sstable.NewSimpleReadable(f)
     531           2 :                 if err != nil {
     532           1 :                         return ingestLoadResult{}, err
     533           1 :                 }
     534           2 :                 var m *fileMetadata
     535           2 :                 rangeKeyValidator := disableRangeKeyChecks()
     536           2 :                 if !shouldDisableRangeKeyChecks {
     537           2 :                         rangeKeyValidator = validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
     538           2 :                 }
     539           2 :                 m, lastRangeKey, err = ingestLoad1(ctx, opts, fmv, readable, cacheID, localFileNums[i], rangeKeyValidator)
     540           2 :                 if err != nil {
     541           1 :                         return ingestLoadResult{}, err
     542           1 :                 }
     543           2 :                 if m != nil {
     544           2 :                         result.local = append(result.local, ingestLocalMeta{
     545           2 :                                 fileMetadata: m,
     546           2 :                                 path:         paths[i],
     547           2 :                         })
     548           2 :                 }
     549             :         }
     550             : 
     551           2 :         if !shouldDisableRangeKeyChecks {
     552           2 :                 rangeKeyValidator := validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
     553           2 :                 if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
     554           1 :                         return ingestLoadResult{}, err
     555           1 :                 }
     556             :         }
     557             : 
     558             :         // Sort the shared files according to level.
     559           2 :         sort.Sort(sharedByLevel(shared))
     560           2 : 
     561           2 :         result.shared = make([]ingestSharedMeta, 0, len(shared))
     562           2 :         for i := range shared {
     563           2 :                 m, err := ingestSynthesizeShared(opts, shared[i], sharedFileNums[i])
     564           2 :                 if err != nil {
     565           0 :                         return ingestLoadResult{}, err
     566           0 :                 }
     567           2 :                 if shared[i].Level < sharedLevelsStart {
     568           0 :                         return ingestLoadResult{}, errors.New("cannot ingest shared file in level below sharedLevelsStart")
     569           0 :                 }
     570           2 :                 result.shared = append(result.shared, ingestSharedMeta{
     571           2 :                         fileMetadata: m,
     572           2 :                         shared:       shared[i],
     573           2 :                 })
     574             :         }
     575           2 :         result.external = make([]ingestExternalMeta, 0, len(external))
     576           2 :         for i := range external {
     577           2 :                 m, err := ingestLoad1External(opts, external[i], externalFileNums[i])
     578           2 :                 if err != nil {
     579           1 :                         return ingestLoadResult{}, err
     580           1 :                 }
     581           2 :                 result.external = append(result.external, ingestExternalMeta{
     582           2 :                         fileMetadata: m,
     583           2 :                         external:     external[i],
     584           2 :                 })
     585           2 :                 if external[i].Level > 0 {
     586           1 :                         if i != 0 && !result.externalFilesHaveLevel {
     587           0 :                                 return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
     588           0 :                         }
     589           1 :                         result.externalFilesHaveLevel = true
     590           2 :                 } else if result.externalFilesHaveLevel {
     591           0 :                         return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
     592           0 :                 }
     593             :         }
     594           2 :         return result, nil
     595             : }
     596             : 
     597           2 : func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error {
     598           2 :         // Verify that all the shared files (i.e. files in sharedMeta)
     599           2 :         // fit within the exciseSpan.
     600           2 :         for _, f := range lr.shared {
     601           2 :                 if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
     602           0 :                         return errors.Newf("pebble: shared file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
     603           0 :                 }
     604             :         }
     605             : 
     606           2 :         if lr.externalFilesHaveLevel {
     607           1 :                 for _, f := range lr.external {
     608           1 :                         if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
     609           0 :                                 return base.AssertionFailedf("pebble: external file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
     610           0 :                         }
     611             :                 }
     612             :         }
     613             : 
     614           2 :         if len(lr.external) > 0 {
     615           2 :                 if len(lr.shared) > 0 {
     616           0 :                         // If external files are present alongside shared files,
     617           0 :                         // return an error.
     618           0 :                         return base.AssertionFailedf("pebble: external files cannot be ingested atomically alongside shared files")
     619           0 :                 }
     620             : 
     621             :                 // Sort according to the smallest key.
     622           2 :                 slices.SortFunc(lr.external, func(a, b ingestExternalMeta) int {
     623           2 :                         return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
     624           2 :                 })
     625           2 :                 for i := 1; i < len(lr.external); i++ {
     626           2 :                         if sstableKeyCompare(cmp, lr.external[i-1].Largest, lr.external[i].Smallest) >= 0 {
     627           1 :                                 return errors.Newf("pebble: external sstables have overlapping ranges")
     628           1 :                         }
     629             :                 }
     630           2 :                 return nil
     631             :         }
     632           2 :         if len(lr.local) <= 1 {
     633           2 :                 return nil
     634           2 :         }
     635             : 
     636             :         // Sort according to the smallest key.
     637           2 :         slices.SortFunc(lr.local, func(a, b ingestLocalMeta) int {
     638           2 :                 return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
     639           2 :         })
     640             : 
     641           2 :         for i := 1; i < len(lr.local); i++ {
     642           2 :                 if sstableKeyCompare(cmp, lr.local[i-1].Largest, lr.local[i].Smallest) >= 0 {
     643           2 :                         return errors.Newf("pebble: local ingestion sstables have overlapping ranges")
     644           2 :                 }
     645             :         }
     646           2 :         if len(lr.shared) == 0 {
     647           2 :                 return nil
     648           2 :         }
     649           0 :         filesInLevel := make([]*fileMetadata, 0, len(lr.shared))
     650           0 :         for l := sharedLevelsStart; l < numLevels; l++ {
     651           0 :                 filesInLevel = filesInLevel[:0]
     652           0 :                 for i := range lr.shared {
     653           0 :                         if lr.shared[i].shared.Level == uint8(l) {
     654           0 :                                 filesInLevel = append(filesInLevel, lr.shared[i].fileMetadata)
     655           0 :                         }
     656             :                 }
     657           0 :                 for i := range lr.external {
     658           0 :                         if lr.external[i].external.Level == uint8(l) {
     659           0 :                                 filesInLevel = append(filesInLevel, lr.external[i].fileMetadata)
     660           0 :                         }
     661             :                 }
     662           0 :                 slices.SortFunc(filesInLevel, func(a, b *fileMetadata) int {
     663           0 :                         return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
     664           0 :                 })
     665           0 :                 for i := 1; i < len(filesInLevel); i++ {
     666           0 :                         if sstableKeyCompare(cmp, filesInLevel[i-1].Largest, filesInLevel[i].Smallest) >= 0 {
     667           0 :                                 return base.AssertionFailedf("pebble: external shared sstables have overlapping ranges")
     668           0 :                         }
     669             :                 }
     670             :         }
     671           0 :         return nil
     672             : }
     673             : 
     674           1 : func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) error {
     675           1 :         var firstErr error
     676           1 :         for i := range meta {
     677           1 :                 if err := objProvider.Remove(fileTypeTable, meta[i].FileBacking.DiskFileNum); err != nil {
     678           1 :                         firstErr = firstError(firstErr, err)
     679           1 :                 }
     680             :         }
     681           1 :         return firstErr
     682             : }
     683             : 
     684             : // ingestLinkLocal creates new objects which are backed by either hardlinks to or
     685             : // copies of the ingested files.
     686             : func ingestLinkLocal(
     687             :         ctx context.Context,
     688             :         jobID JobID,
     689             :         opts *Options,
     690             :         objProvider objstorage.Provider,
     691             :         localMetas []ingestLocalMeta,
     692           2 : ) error {
     693           2 :         for i := range localMetas {
     694           2 :                 objMeta, err := objProvider.LinkOrCopyFromLocal(
     695           2 :                         ctx, opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum,
     696           2 :                         objstorage.CreateOptions{PreferSharedStorage: true},
     697           2 :                 )
     698           2 :                 if err != nil {
     699           1 :                         if err2 := ingestCleanup(objProvider, localMetas[:i]); err2 != nil {
     700           0 :                                 opts.Logger.Errorf("ingest cleanup failed: %v", err2)
     701           0 :                         }
     702           1 :                         return err
     703             :                 }
     704           2 :                 if opts.EventListener.TableCreated != nil {
     705           2 :                         opts.EventListener.TableCreated(TableCreateInfo{
     706           2 :                                 JobID:   int(jobID),
     707           2 :                                 Reason:  "ingesting",
     708           2 :                                 Path:    objProvider.Path(objMeta),
     709           2 :                                 FileNum: base.PhysicalTableDiskFileNum(localMetas[i].FileNum),
     710           2 :                         })
     711           2 :                 }
     712             :         }
     713           2 :         return nil
     714             : }
     715             : 
     716             : // ingestAttachRemote attaches remote objects to the storage provider.
     717             : //
     718             : // For external objects, we reuse existing FileBackings from the current version
     719             : // when possible.
     720             : //
     721             : // ingestUnprotectExternalBackings() must be called after this function (even in
     722             : // error cases).
     723           2 : func (d *DB) ingestAttachRemote(jobID JobID, lr ingestLoadResult) error {
     724           2 :         remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(lr.shared)+len(lr.external))
     725           2 :         for i := range lr.shared {
     726           2 :                 backing, err := lr.shared[i].shared.Backing.Get()
     727           2 :                 if err != nil {
     728           0 :                         return err
     729           0 :                 }
     730           2 :                 remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
     731           2 :                         FileNum:  lr.shared[i].FileBacking.DiskFileNum,
     732           2 :                         FileType: fileTypeTable,
     733           2 :                         Backing:  backing,
     734           2 :                 })
     735             :         }
     736             : 
     737           2 :         d.findExistingBackingsForExternalObjects(lr.external)
     738           2 : 
     739           2 :         newFileBackings := make(map[remote.ObjectKey]*fileBacking, len(lr.external))
     740           2 :         for i := range lr.external {
     741           2 :                 meta := lr.external[i].fileMetadata
     742           2 :                 if meta.FileBacking != nil {
     743           2 :                         // The backing was filled in by findExistingBackingsForExternalObjects().
     744           2 :                         continue
     745             :                 }
     746           2 :                 key := remote.MakeObjectKey(lr.external[i].external.Locator, lr.external[i].external.ObjName)
     747           2 :                 if backing, ok := newFileBackings[key]; ok {
     748           2 :                         // We already created the same backing in this loop.
     749           2 :                         meta.FileBacking = backing
     750           2 :                         continue
     751             :                 }
     752           2 :                 providerBacking, err := d.objProvider.CreateExternalObjectBacking(key.Locator, key.ObjectName)
     753           2 :                 if err != nil {
     754           0 :                         return err
     755           0 :                 }
     756             :                 // We have to attach the remote object (and assign it a DiskFileNum). For
     757             :                 // simplicity, we use the same number for both the FileNum and the
     758             :                 // DiskFileNum (even though this is a virtual sstable).
     759           2 :                 meta.InitProviderBacking(base.DiskFileNum(meta.FileNum), lr.external[i].external.Size)
     760           2 : 
     761           2 :                 // Set the underlying FileBacking's size to the same size as the virtualized
     762           2 :                 // view of the sstable. This ensures that we don't over-prioritize this
     763           2 :                 // sstable for compaction just yet, as we do not have a clear sense of
     764           2 :                 // what parts of this sstable are referenced by other nodes.
     765           2 :                 meta.FileBacking.Size = lr.external[i].external.Size
     766           2 :                 newFileBackings[key] = meta.FileBacking
     767           2 : 
     768           2 :                 remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
     769           2 :                         FileNum:  meta.FileBacking.DiskFileNum,
     770           2 :                         FileType: fileTypeTable,
     771           2 :                         Backing:  providerBacking,
     772           2 :                 })
     773             :         }
     774             : 
     775           2 :         for i := range lr.external {
     776           2 :                 if err := lr.external[i].Validate(d.opts.Comparer.Compare, d.opts.Comparer.FormatKey); err != nil {
     777           0 :                         return err
     778           0 :                 }
     779             :         }
     780             : 
     781           2 :         remoteObjMetas, err := d.objProvider.AttachRemoteObjects(remoteObjs)
     782           2 :         if err != nil {
     783           0 :                 return err
     784           0 :         }
     785             : 
     786           2 :         for i := range lr.shared {
     787           2 :                 // One corner case around file sizes we need to be mindful of, is that
     788           2 :                 // if one of the shareObjs was initially created by us (and has boomeranged
     789           2 :                 // back from another node), we'll need to update the FileBacking's size
     790           2 :                 // to be the true underlying size. Otherwise, we could hit errors when we
     791           2 :                 // open the db again after a crash/restart (see checkConsistency in open.go),
     792           2 :                 // plus it more accurately allows us to prioritize compactions of files
     793           2 :                 // that were originally created by us.
     794           2 :                 if remoteObjMetas[i].IsShared() && !d.objProvider.IsSharedForeign(remoteObjMetas[i]) {
     795           2 :                         size, err := d.objProvider.Size(remoteObjMetas[i])
     796           2 :                         if err != nil {
     797           0 :                                 return err
     798           0 :                         }
     799           2 :                         lr.shared[i].FileBacking.Size = uint64(size)
     800             :                 }
     801             :         }
     802             : 
     803           2 :         if d.opts.EventListener.TableCreated != nil {
     804           2 :                 for i := range remoteObjMetas {
     805           2 :                         d.opts.EventListener.TableCreated(TableCreateInfo{
     806           2 :                                 JobID:   int(jobID),
     807           2 :                                 Reason:  "ingesting",
     808           2 :                                 Path:    d.objProvider.Path(remoteObjMetas[i]),
     809           2 :                                 FileNum: remoteObjMetas[i].DiskFileNum,
     810           2 :                         })
     811           2 :                 }
     812             :         }
     813             : 
     814           2 :         return nil
     815             : }
     816             : 
     817             : // findExistingBackingsForExternalObjects populates the FileBacking for external
     818             : // files which are already in use by the current version.
     819             : //
     820             : // We take a Ref and LatestRef on populated backings.
     821           2 : func (d *DB) findExistingBackingsForExternalObjects(metas []ingestExternalMeta) {
     822           2 :         d.mu.Lock()
     823           2 :         defer d.mu.Unlock()
     824           2 : 
     825           2 :         for i := range metas {
     826           2 :                 diskFileNums := d.objProvider.GetExternalObjects(metas[i].external.Locator, metas[i].external.ObjName)
     827           2 :                 // We cross-check against fileBackings in the current version because it is
     828           2 :                 // possible that the external object is referenced by an sstable which only
     829           2 :                 // exists in a previous version. In that case, that object could be removed
     830           2 :                 // at any time so we cannot reuse it.
     831           2 :                 for _, n := range diskFileNums {
     832           2 :                         if backing, ok := d.mu.versions.virtualBackings.Get(n); ok {
     833           2 :                                 // Protect this backing from being removed from the latest version. We
     834           2 :                                 // will unprotect in ingestUnprotectExternalBackings.
     835           2 :                                 d.mu.versions.virtualBackings.Protect(n)
     836           2 :                                 metas[i].usedExistingBacking = true
     837           2 :                                 metas[i].FileBacking = backing
     838           2 :                                 break
     839             :                         }
     840             :                 }
     841             :         }
     842             : }
     843             : 
     844             : // ingestUnprotectExternalBackings unprotects the file backings that were reused
     845             : // for external objects when the ingestion fails.
     846           2 : func (d *DB) ingestUnprotectExternalBackings(lr ingestLoadResult) {
     847           2 :         d.mu.Lock()
     848           2 :         defer d.mu.Unlock()
     849           2 : 
     850           2 :         for _, meta := range lr.external {
     851           2 :                 if meta.usedExistingBacking {
     852           2 :                         // If the backing is not use anywhere else and the ingest failed (or the
     853           2 :                         // ingested tables were already compacted away), this call will cause in
     854           2 :                         // the next version update to remove the backing.
     855           2 :                         d.mu.versions.virtualBackings.Unprotect(meta.FileBacking.DiskFileNum)
     856           2 :                 }
     857             :         }
     858             : }
     859             : 
     860             : func setSeqNumInMetadata(
     861             :         m *fileMetadata, seqNum base.SeqNum, cmp Compare, format base.FormatKey,
     862           2 : ) error {
     863           2 :         setSeqFn := func(k base.InternalKey) base.InternalKey {
     864           2 :                 return base.MakeInternalKey(k.UserKey, seqNum, k.Kind())
     865           2 :         }
     866             :         // NB: we set the fields directly here, rather than via their Extend*
     867             :         // methods, as we are updating sequence numbers.
     868           2 :         if m.HasPointKeys {
     869           2 :                 m.SmallestPointKey = setSeqFn(m.SmallestPointKey)
     870           2 :         }
     871           2 :         if m.HasRangeKeys {
     872           2 :                 m.SmallestRangeKey = setSeqFn(m.SmallestRangeKey)
     873           2 :         }
     874           2 :         m.Smallest = setSeqFn(m.Smallest)
     875           2 :         // Only update the seqnum for the largest key if that key is not an
     876           2 :         // "exclusive sentinel" (i.e. a range deletion sentinel or a range key
     877           2 :         // boundary), as doing so effectively drops the exclusive sentinel (by
     878           2 :         // lowering the seqnum from the max value), and extends the bounds of the
     879           2 :         // table.
     880           2 :         // NB: as the largest range key is always an exclusive sentinel, it is never
     881           2 :         // updated.
     882           2 :         if m.HasPointKeys && !m.LargestPointKey.IsExclusiveSentinel() {
     883           2 :                 m.LargestPointKey = setSeqFn(m.LargestPointKey)
     884           2 :         }
     885           2 :         if !m.Largest.IsExclusiveSentinel() {
     886           2 :                 m.Largest = setSeqFn(m.Largest)
     887           2 :         }
     888             :         // Setting smallestSeqNum == largestSeqNum triggers the setting of
     889             :         // Properties.GlobalSeqNum when an sstable is loaded.
     890           2 :         m.SmallestSeqNum = seqNum
     891           2 :         m.LargestSeqNum = seqNum
     892           2 :         m.LargestSeqNumAbsolute = seqNum
     893           2 :         // Ensure the new bounds are consistent.
     894           2 :         if err := m.Validate(cmp, format); err != nil {
     895           0 :                 return err
     896           0 :         }
     897           2 :         return nil
     898             : }
     899             : 
     900             : func ingestUpdateSeqNum(
     901             :         cmp Compare, format base.FormatKey, seqNum base.SeqNum, loadResult ingestLoadResult,
     902           2 : ) error {
     903           2 :         // Shared sstables are required to be sorted by level ascending. We then
     904           2 :         // iterate the shared sstables in reverse, assigning the lower sequence
     905           2 :         // numbers to the shared sstables that will be ingested into the lower
     906           2 :         // (larger numbered) levels first. This ensures sequence number shadowing is
     907           2 :         // correct.
     908           2 :         for i := len(loadResult.shared) - 1; i >= 0; i-- {
     909           2 :                 if i-1 >= 0 && loadResult.shared[i-1].shared.Level > loadResult.shared[i].shared.Level {
     910           0 :                         panic(errors.AssertionFailedf("shared files %s, %s out of order", loadResult.shared[i-1], loadResult.shared[i]))
     911             :                 }
     912           2 :                 if err := setSeqNumInMetadata(loadResult.shared[i].fileMetadata, seqNum, cmp, format); err != nil {
     913           0 :                         return err
     914           0 :                 }
     915           2 :                 seqNum++
     916             :         }
     917           2 :         for i := range loadResult.external {
     918           2 :                 if err := setSeqNumInMetadata(loadResult.external[i].fileMetadata, seqNum, cmp, format); err != nil {
     919           0 :                         return err
     920           0 :                 }
     921           2 :                 seqNum++
     922             :         }
     923           2 :         for i := range loadResult.local {
     924           2 :                 if err := setSeqNumInMetadata(loadResult.local[i].fileMetadata, seqNum, cmp, format); err != nil {
     925           0 :                         return err
     926           0 :                 }
     927           2 :                 seqNum++
     928             :         }
     929           2 :         return nil
     930             : }
     931             : 
     932             : // ingestTargetLevel returns the target level for a file being ingested.
     933             : // If suggestSplit is true, it accounts for ingest-time splitting as part of
     934             : // its target level calculation, and if a split candidate is found, that file
     935             : // is returned as the splitFile.
     936             : func ingestTargetLevel(
     937             :         ctx context.Context,
     938             :         cmp base.Compare,
     939             :         lsmOverlap overlap.WithLSM,
     940             :         baseLevel int,
     941             :         compactions map[*compaction]struct{},
     942             :         meta *fileMetadata,
     943             :         suggestSplit bool,
     944           2 : ) (targetLevel int, splitFile *fileMetadata, err error) {
     945           2 :         // Find the lowest level which does not have any files which overlap meta. We
     946           2 :         // search from L0 to L6 looking for whether there are any files in the level
     947           2 :         // which overlap meta. We want the "lowest" level (where lower means
     948           2 :         // increasing level number) in order to reduce write amplification.
     949           2 :         //
     950           2 :         // There are 2 kinds of overlap we need to check for: file boundary overlap
     951           2 :         // and data overlap. Data overlap implies file boundary overlap. Note that it
     952           2 :         // is always possible to ingest into L0.
     953           2 :         //
     954           2 :         // To place meta at level i where i > 0:
     955           2 :         // - there must not be any data overlap with levels <= i, since that will
     956           2 :         //   violate the sequence number invariant.
     957           2 :         // - no file boundary overlap with level i, since that will violate the
     958           2 :         //   invariant that files do not overlap in levels i > 0.
     959           2 :         //   - if there is only a file overlap at a given level, and no data overlap,
     960           2 :         //     we can still slot a file at that level. We return the fileMetadata with
     961           2 :         //     which we have file boundary overlap (must be only one file, as sstable
     962           2 :         //     bounds are usually tight on user keys) and the caller is expected to split
     963           2 :         //     that sstable into two virtual sstables, allowing this file to go into that
     964           2 :         //     level. Note that if we have file boundary overlap with two files, which
     965           2 :         //     should only happen on rare occasions, we treat it as data overlap and
     966           2 :         //     don't use this optimization.
     967           2 :         //
     968           2 :         // The file boundary overlap check is simpler to conceptualize. Consider the
     969           2 :         // following example, in which the ingested file lies completely before or
     970           2 :         // after the file being considered.
     971           2 :         //
     972           2 :         //   |--|           |--|  ingested file: [a,b] or [f,g]
     973           2 :         //         |-----|        existing file: [c,e]
     974           2 :         //  _____________________
     975           2 :         //   a  b  c  d  e  f  g
     976           2 :         //
     977           2 :         // In both cases the ingested file can move to considering the next level.
     978           2 :         //
     979           2 :         // File boundary overlap does not necessarily imply data overlap. The check
     980           2 :         // for data overlap is a little more nuanced. Consider the following examples:
     981           2 :         //
     982           2 :         //  1. No data overlap:
     983           2 :         //
     984           2 :         //          |-|   |--|    ingested file: [cc-d] or [ee-ff]
     985           2 :         //  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
     986           2 :         //  _____________________
     987           2 :         //   a  b  c  d  e  f  g
     988           2 :         //
     989           2 :         // In this case the ingested files can "fall through" this level. The checks
     990           2 :         // continue at the next level.
     991           2 :         //
     992           2 :         //  2. Data overlap:
     993           2 :         //
     994           2 :         //            |--|        ingested file: [d-e]
     995           2 :         //  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
     996           2 :         //  _____________________
     997           2 :         //   a  b  c  d  e  f  g
     998           2 :         //
     999           2 :         // In this case the file cannot be ingested into this level as the point 'dd'
    1000           2 :         // is in the way.
    1001           2 :         //
    1002           2 :         // It is worth noting that the check for data overlap is only approximate. In
    1003           2 :         // the previous example, the ingested table [d-e] could contain only the
    1004           2 :         // points 'd' and 'e', in which case the table would be eligible for
    1005           2 :         // considering lower levels. However, such a fine-grained check would need to
    1006           2 :         // be exhaustive (comparing points and ranges in both the ingested existing
    1007           2 :         // tables) and such a check is prohibitively expensive. Thus Pebble treats any
    1008           2 :         // existing point that falls within the ingested table bounds as being "data
    1009           2 :         // overlap".
    1010           2 : 
    1011           2 :         if lsmOverlap[0].Result == overlap.Data {
    1012           2 :                 return 0, nil, nil
    1013           2 :         }
    1014           2 :         targetLevel = 0
    1015           2 :         splitFile = nil
    1016           2 :         for level := baseLevel; level < numLevels; level++ {
    1017           2 :                 var candidateSplitFile *fileMetadata
    1018           2 :                 switch lsmOverlap[level].Result {
    1019           2 :                 case overlap.Data:
    1020           2 :                         // We cannot ingest into or under this level; return the best target level
    1021           2 :                         // so far.
    1022           2 :                         return targetLevel, splitFile, nil
    1023             : 
    1024           2 :                 case overlap.OnlyBoundary:
    1025           2 :                         if !suggestSplit || lsmOverlap[level].SplitFile == nil {
    1026           2 :                                 // We can ingest under this level, but not into this level.
    1027           2 :                                 continue
    1028             :                         }
    1029             :                         // We can ingest into this level if we split this file.
    1030           2 :                         candidateSplitFile = lsmOverlap[level].SplitFile
    1031             : 
    1032           2 :                 case overlap.None:
    1033             :                 // We can ingest into this level.
    1034             : 
    1035           0 :                 default:
    1036           0 :                         return 0, nil, base.AssertionFailedf("unexpected WithLevel.Result: %v", lsmOverlap[level].Result)
    1037             :                 }
    1038             : 
    1039             :                 // Check boundary overlap with any ongoing compactions. We consider an
    1040             :                 // overlapping compaction that's writing files to an output level as
    1041             :                 // equivalent to boundary overlap with files in that output level.
    1042             :                 //
    1043             :                 // We cannot check for data overlap with the new SSTs compaction will produce
    1044             :                 // since compaction hasn't been done yet. However, there's no need to check
    1045             :                 // since all keys in them will be from levels in [c.startLevel,
    1046             :                 // c.outputLevel], and all those levels have already had their data overlap
    1047             :                 // tested negative (else we'd have returned earlier).
    1048             :                 //
    1049             :                 // An alternative approach would be to cancel these compactions and proceed
    1050             :                 // with an ingest-time split on this level if necessary. However, compaction
    1051             :                 // cancellation can result in significant wasted effort and is best avoided
    1052             :                 // unless necessary.
    1053           2 :                 overlaps := false
    1054           2 :                 for c := range compactions {
    1055           2 :                         if c.outputLevel == nil || level != c.outputLevel.level {
    1056           2 :                                 continue
    1057             :                         }
    1058           2 :                         if cmp(meta.Smallest.UserKey, c.largest.UserKey) <= 0 &&
    1059           2 :                                 cmp(meta.Largest.UserKey, c.smallest.UserKey) >= 0 {
    1060           2 :                                 overlaps = true
    1061           2 :                                 break
    1062             :                         }
    1063             :                 }
    1064           2 :                 if !overlaps {
    1065           2 :                         targetLevel = level
    1066           2 :                         splitFile = candidateSplitFile
    1067           2 :                 }
    1068             :         }
    1069           2 :         return targetLevel, splitFile, nil
    1070             : }
    1071             : 
    1072             : // Ingest ingests a set of sstables into the DB. Ingestion of the files is
    1073             : // atomic and semantically equivalent to creating a single batch containing all
    1074             : // of the mutations in the sstables. Ingestion may require the memtable to be
    1075             : // flushed. The ingested sstable files are moved into the DB and must reside on
    1076             : // the same filesystem as the DB. Sstables can be created for ingestion using
    1077             : // sstable.Writer. On success, Ingest removes the input paths.
    1078             : //
    1079             : // Two types of sstables are accepted for ingestion(s): one is sstables present
    1080             : // in the instance's vfs.FS and can be referenced locally. The other is sstables
    1081             : // present in remote.Storage, referred to as shared or foreign sstables. These
    1082             : // shared sstables can be linked through objstorageprovider.Provider, and do not
    1083             : // need to already be present on the local vfs.FS. Foreign sstables must all fit
    1084             : // in an excise span, and are destined for a level specified in SharedSSTMeta.
    1085             : //
    1086             : // All sstables *must* be Sync()'d by the caller after all bytes are written
    1087             : // and before its file handle is closed; failure to do so could violate
    1088             : // durability or lead to corrupted on-disk state. This method cannot, in a
    1089             : // platform-and-FS-agnostic way, ensure that all sstables in the input are
    1090             : // properly synced to disk. Opening new file handles and Sync()-ing them
    1091             : // does not always guarantee durability; see the discussion here on that:
    1092             : // https://github.com/cockroachdb/pebble/pull/835#issuecomment-663075379
    1093             : //
    1094             : // Ingestion loads each sstable into the lowest level of the LSM which it
    1095             : // doesn't overlap (see ingestTargetLevel). If an sstable overlaps a memtable,
    1096             : // ingestion forces the memtable to flush, and then waits for the flush to
    1097             : // occur. In some cases, such as with no foreign sstables and no excise span,
    1098             : // ingestion that gets blocked on a memtable can join the flushable queue and
    1099             : // finish even before the memtable has been flushed.
    1100             : //
    1101             : // The steps for ingestion are:
    1102             : //
    1103             : //  1. Allocate file numbers for every sstable being ingested.
    1104             : //  2. Load the metadata for all sstables being ingested.
    1105             : //  3. Sort the sstables by smallest key, verifying non overlap (for local
    1106             : //     sstables).
    1107             : //  4. Hard link (or copy) the local sstables into the DB directory.
    1108             : //  5. Allocate a sequence number to use for all of the entries in the
    1109             : //     local sstables. This is the step where overlap with memtables is
    1110             : //     determined. If there is overlap, we remember the most recent memtable
    1111             : //     that overlaps.
    1112             : //  6. Update the sequence number in the ingested local sstables. (Remote
    1113             : //     sstables get fixed sequence numbers that were determined at load time.)
    1114             : //  7. Wait for the most recent memtable that overlaps to flush (if any).
    1115             : //  8. Add the ingested sstables to the version (DB.ingestApply).
    1116             : //     8.1.  If an excise span was specified, figure out what sstables in the
    1117             : //     current version overlap with the excise span, and create new virtual
    1118             : //     sstables out of those sstables that exclude the excised span (DB.excise).
    1119             : //  9. Publish the ingestion sequence number.
    1120             : //
    1121             : // Note that if the mutable memtable overlaps with ingestion, a flush of the
    1122             : // memtable is forced equivalent to DB.Flush. Additionally, subsequent
    1123             : // mutations that get sequence numbers larger than the ingestion sequence
    1124             : // number get queued up behind the ingestion waiting for it to complete. This
    1125             : // can produce a noticeable hiccup in performance. See
    1126             : // https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
    1127             : // this hiccup.
    1128           2 : func (d *DB) Ingest(ctx context.Context, paths []string) error {
    1129           2 :         if err := d.closed.Load(); err != nil {
    1130           1 :                 panic(err)
    1131             :         }
    1132           2 :         if d.opts.ReadOnly {
    1133           1 :                 return ErrReadOnly
    1134           1 :         }
    1135           2 :         _, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, nil /* external */)
    1136           2 :         return err
    1137             : }
    1138             : 
    1139             : // IngestOperationStats provides some information about where in the LSM the
    1140             : // bytes were ingested.
    1141             : type IngestOperationStats struct {
    1142             :         // Bytes is the total bytes in the ingested sstables.
    1143             :         Bytes uint64
    1144             :         // ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
    1145             :         // into L0. This value is approximate when flushable ingests are active and
    1146             :         // an ingest overlaps an entry in the flushable queue. Currently, this
    1147             :         // approximation is very rough, only including tables that overlapped the
    1148             :         // memtable. This estimate may be improved with #2112.
    1149             :         ApproxIngestedIntoL0Bytes uint64
    1150             :         // MemtableOverlappingFiles is the count of ingested sstables
    1151             :         // that overlapped keys in the memtables.
    1152             :         MemtableOverlappingFiles int
    1153             : }
    1154             : 
    1155             : // ExternalFile are external sstables that can be referenced through
    1156             : // objprovider and ingested as remote files that will not be refcounted or
    1157             : // cleaned up. For use with online restore. Note that the underlying sstable
    1158             : // could contain keys outside the [Smallest,Largest) bounds; however Pebble
    1159             : // is expected to only read the keys within those bounds.
    1160             : type ExternalFile struct {
    1161             :         // Locator is the shared.Locator that can be used with objProvider to
    1162             :         // resolve a reference to this external sstable.
    1163             :         Locator remote.Locator
    1164             : 
    1165             :         // ObjName is the unique name of this sstable on Locator.
    1166             :         ObjName string
    1167             : 
    1168             :         // Size of the referenced proportion of the virtualized sstable. An estimate
    1169             :         // is acceptable in lieu of the backing file size.
    1170             :         Size uint64
    1171             : 
    1172             :         // StartKey and EndKey define the bounds of the sstable; the ingestion
    1173             :         // of this file will only result in keys within [StartKey, EndKey) if
    1174             :         // EndKeyIsInclusive is false or [StartKey, EndKey] if it is true.
    1175             :         // These bounds are loose i.e. it's possible for keys to not span the
    1176             :         // entirety of this range.
    1177             :         //
    1178             :         // StartKey and EndKey user keys must not have suffixes.
    1179             :         //
    1180             :         // Multiple ExternalFiles in one ingestion must all have non-overlapping
    1181             :         // bounds.
    1182             :         StartKey, EndKey []byte
    1183             : 
    1184             :         // EndKeyIsInclusive is true if EndKey should be treated as inclusive.
    1185             :         EndKeyIsInclusive bool
    1186             : 
    1187             :         // HasPointKey and HasRangeKey denote whether this file contains point keys
    1188             :         // or range keys. If both structs are false, an error is returned during
    1189             :         // ingestion.
    1190             :         HasPointKey, HasRangeKey bool
    1191             : 
    1192             :         // SyntheticPrefix will prepend this suffix to all keys in the file during
    1193             :         // iteration. Note that the backing file itself is not modified.
    1194             :         //
    1195             :         // SyntheticPrefix must be a prefix of both Bounds.Start and Bounds.End.
    1196             :         SyntheticPrefix []byte
    1197             : 
    1198             :         // SyntheticSuffix will replace the suffix of every key in the file during
    1199             :         // iteration. Note that the file itself is not modified, rather, every key
    1200             :         // returned by an iterator will have the synthetic suffix.
    1201             :         //
    1202             :         // SyntheticSuffix can only be used under the following conditions:
    1203             :         //  - the synthetic suffix must sort before any non-empty suffixes in the
    1204             :         //    backing sst (the entire sst, not just the part restricted to Bounds).
    1205             :         //  - the backing sst must not contain multiple keys with the same prefix.
    1206             :         SyntheticSuffix []byte
    1207             : 
    1208             :         // Level denotes the level at which this file was present at read time
    1209             :         // if the external file was returned by a scan of an existing Pebble
    1210             :         // instance. If Level is 0, this field is ignored.
    1211             :         Level uint8
    1212             : }
    1213             : 
    1214             : // IngestWithStats does the same as Ingest, and additionally returns
    1215             : // IngestOperationStats.
    1216           1 : func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperationStats, error) {
    1217           1 :         if err := d.closed.Load(); err != nil {
    1218           0 :                 panic(err)
    1219             :         }
    1220           1 :         if d.opts.ReadOnly {
    1221           0 :                 return IngestOperationStats{}, ErrReadOnly
    1222           0 :         }
    1223           1 :         return d.ingest(ctx, paths, nil, KeyRange{}, nil)
    1224             : }
    1225             : 
    1226             : // IngestExternalFiles does the same as IngestWithStats, and additionally
    1227             : // accepts external files (with locator info that can be resolved using
    1228             : // d.opts.SharedStorage). These files must also be non-overlapping with
    1229             : // each other, and must be resolvable through d.objProvider.
    1230             : func (d *DB) IngestExternalFiles(
    1231             :         ctx context.Context, external []ExternalFile,
    1232           2 : ) (IngestOperationStats, error) {
    1233           2 :         if err := d.closed.Load(); err != nil {
    1234           0 :                 panic(err)
    1235             :         }
    1236             : 
    1237           2 :         if d.opts.ReadOnly {
    1238           0 :                 return IngestOperationStats{}, ErrReadOnly
    1239           0 :         }
    1240           2 :         if d.opts.Experimental.RemoteStorage == nil {
    1241           0 :                 return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
    1242           0 :         }
    1243           2 :         return d.ingest(ctx, nil, nil, KeyRange{}, external)
    1244             : }
    1245             : 
    1246             : // IngestAndExcise does the same as IngestWithStats, and additionally accepts a
    1247             : // list of shared files to ingest that can be read from a remote.Storage through
    1248             : // a Provider. All the shared files must live within exciseSpan, and any existing
    1249             : // keys in exciseSpan are deleted by turning existing sstables into virtual
    1250             : // sstables (if not virtual already) and shrinking their spans to exclude
    1251             : // exciseSpan. See the comment at Ingest for a more complete picture of the
    1252             : // ingestion process.
    1253             : //
    1254             : // Panics if this DB instance was not instantiated with a remote.Storage and
    1255             : // shared sstables are present.
    1256             : func (d *DB) IngestAndExcise(
    1257             :         ctx context.Context,
    1258             :         paths []string,
    1259             :         shared []SharedSSTMeta,
    1260             :         external []ExternalFile,
    1261             :         exciseSpan KeyRange,
    1262           2 : ) (IngestOperationStats, error) {
    1263           2 :         if err := d.closed.Load(); err != nil {
    1264           0 :                 panic(err)
    1265             :         }
    1266           2 :         if d.opts.ReadOnly {
    1267           0 :                 return IngestOperationStats{}, ErrReadOnly
    1268           0 :         }
    1269           2 :         if invariants.Enabled {
    1270           2 :                 // Excise is only supported on prefix keys.
    1271           2 :                 if d.opts.Comparer.Split(exciseSpan.Start) != len(exciseSpan.Start) {
    1272           0 :                         panic("IngestAndExcise called with suffixed start key")
    1273             :                 }
    1274           2 :                 if d.opts.Comparer.Split(exciseSpan.End) != len(exciseSpan.End) {
    1275           0 :                         panic("IngestAndExcise called with suffixed end key")
    1276             :                 }
    1277             :         }
    1278           2 :         if v := d.FormatMajorVersion(); v < FormatMinForSharedObjects {
    1279           0 :                 return IngestOperationStats{}, errors.Errorf(
    1280           0 :                         "store has format major version %d; IngestAndExcise requires at least %d",
    1281           0 :                         v, FormatMinForSharedObjects,
    1282           0 :                 )
    1283           0 :         }
    1284           2 :         return d.ingest(ctx, paths, shared, exciseSpan, external)
    1285             : }
    1286             : 
    1287             : // Both DB.mu and commitPipeline.mu must be held while this is called.
    1288             : func (d *DB) newIngestedFlushableEntry(
    1289             :         meta []*fileMetadata, seqNum base.SeqNum, logNum base.DiskFileNum, exciseSpan KeyRange,
    1290           2 : ) (*flushableEntry, error) {
    1291           2 :         // If there's an excise being done atomically with the same ingest, we
    1292           2 :         // assign the lowest sequence number in the set of sequence numbers for this
    1293           2 :         // ingestion to the excise. Note that we've already allocated fileCount+1
    1294           2 :         // sequence numbers in this case.
    1295           2 :         //
    1296           2 :         // This mimics the behaviour in the non-flushable ingest case (see the callsite
    1297           2 :         // for ingestUpdateSeqNum).
    1298           2 :         fileSeqNumStart := seqNum
    1299           2 :         if exciseSpan.Valid() {
    1300           2 :                 fileSeqNumStart = seqNum + 1 // the first seqNum is reserved for the excise.
    1301           2 :         }
    1302             :         // Update the sequence number for all of the sstables in the
    1303             :         // metadata. Writing the metadata to the manifest when the
    1304             :         // version edit is applied is the mechanism that persists the
    1305             :         // sequence number. The sstables themselves are left unmodified.
    1306             :         // In this case, a version edit will only be written to the manifest
    1307             :         // when the flushable is eventually flushed. If Pebble restarts in that
    1308             :         // time, then we'll lose the ingest sequence number information. But this
    1309             :         // information will also be reconstructed on node restart.
    1310           2 :         for i, m := range meta {
    1311           2 :                 if err := setSeqNumInMetadata(m, fileSeqNumStart+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil {
    1312           0 :                         return nil, err
    1313           0 :                 }
    1314             :         }
    1315             : 
    1316           2 :         f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, seqNum)
    1317           2 : 
    1318           2 :         // NB: The logNum/seqNum are the WAL number which we're writing this entry
    1319           2 :         // to and the sequence number within the WAL which we'll write this entry
    1320           2 :         // to.
    1321           2 :         entry := d.newFlushableEntry(f, logNum, seqNum)
    1322           2 :         // The flushable entry starts off with a single reader ref, so increment
    1323           2 :         // the FileMetadata.Refs.
    1324           2 :         for _, file := range f.files {
    1325           2 :                 file.FileBacking.Ref()
    1326           2 :         }
    1327           2 :         entry.unrefFiles = func() []*fileBacking {
    1328           2 :                 var obsolete []*fileBacking
    1329           2 :                 for _, file := range f.files {
    1330           2 :                         if file.FileBacking.Unref() == 0 {
    1331           2 :                                 obsolete = append(obsolete, file.FileMetadata.FileBacking)
    1332           2 :                         }
    1333             :                 }
    1334           2 :                 return obsolete
    1335             :         }
    1336             : 
    1337           2 :         entry.flushForced = true
    1338           2 :         entry.releaseMemAccounting = func() {}
    1339           2 :         return entry, nil
    1340             : }
    1341             : 
    1342             : // Both DB.mu and commitPipeline.mu must be held while this is called. Since
    1343             : // we're holding both locks, the order in which we rotate the memtable or
    1344             : // recycle the WAL in this function is irrelevant as long as the correct log
    1345             : // numbers are assigned to the appropriate flushable.
    1346             : func (d *DB) handleIngestAsFlushable(
    1347             :         meta []*fileMetadata, seqNum base.SeqNum, exciseSpan KeyRange,
    1348           2 : ) error {
    1349           2 :         b := d.NewBatch()
    1350           2 :         if exciseSpan.Valid() {
    1351           2 :                 b.excise(exciseSpan.Start, exciseSpan.End)
    1352           2 :         }
    1353           2 :         for _, m := range meta {
    1354           2 :                 b.ingestSST(m.FileNum)
    1355           2 :         }
    1356           2 :         b.setSeqNum(seqNum)
    1357           2 : 
    1358           2 :         // If the WAL is disabled, then the logNum used to create the flushable
    1359           2 :         // entry doesn't matter. We just use the logNum assigned to the current
    1360           2 :         // mutable memtable. If the WAL is enabled, then this logNum will be
    1361           2 :         // overwritten by the logNum of the log which will contain the log entry
    1362           2 :         // for the ingestedFlushable.
    1363           2 :         logNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
    1364           2 :         if !d.opts.DisableWAL {
    1365           2 :                 // We create a new WAL for the flushable instead of reusing the end of
    1366           2 :                 // the previous WAL. This simplifies the increment of the minimum
    1367           2 :                 // unflushed log number, and also simplifies WAL replay.
    1368           2 :                 var prevLogSize uint64
    1369           2 :                 logNum, prevLogSize = d.rotateWAL()
    1370           2 :                 // As the rotator of the WAL, we're responsible for updating the
    1371           2 :                 // previous flushable queue tail's log size.
    1372           2 :                 d.mu.mem.queue[len(d.mu.mem.queue)-1].logSize = prevLogSize
    1373           2 : 
    1374           2 :                 d.mu.Unlock()
    1375           2 :                 err := d.commit.directWrite(b)
    1376           2 :                 if err != nil {
    1377           0 :                         d.opts.Logger.Fatalf("%v", err)
    1378           0 :                 }
    1379           2 :                 d.mu.Lock()
    1380             :         }
    1381             : 
    1382             :         // The excise span is going to outlive this ingestion call. Copy it.
    1383           2 :         exciseSpan = KeyRange{
    1384           2 :                 Start: slices.Clone(exciseSpan.Start),
    1385           2 :                 End:   slices.Clone(exciseSpan.End),
    1386           2 :         }
    1387           2 :         entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
    1388           2 :         if err != nil {
    1389           0 :                 return err
    1390           0 :         }
    1391           2 :         nextSeqNum := seqNum + base.SeqNum(b.Count())
    1392           2 : 
    1393           2 :         // Set newLogNum to the logNum of the previous flushable. This value is
    1394           2 :         // irrelevant if the WAL is disabled. If the WAL is enabled, then we set
    1395           2 :         // the appropriate value below.
    1396           2 :         newLogNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
    1397           2 :         if !d.opts.DisableWAL {
    1398           2 :                 // newLogNum will be the WAL num of the next mutable memtable which
    1399           2 :                 // comes after the ingestedFlushable in the flushable queue. The mutable
    1400           2 :                 // memtable will be created below.
    1401           2 :                 //
    1402           2 :                 // The prevLogSize returned by rotateWAL is the WAL to which the
    1403           2 :                 // flushable ingest keys were appended. This intermediary WAL is only
    1404           2 :                 // used to record the flushable ingest and nothing else.
    1405           2 :                 newLogNum, entry.logSize = d.rotateWAL()
    1406           2 :         }
    1407             : 
    1408           2 :         d.mu.versions.metrics.Ingest.Count++
    1409           2 :         currMem := d.mu.mem.mutable
    1410           2 :         // NB: Placing ingested sstables above the current memtables
    1411           2 :         // requires rotating of the existing memtables/WAL. There is
    1412           2 :         // some concern of churning through tiny memtables due to
    1413           2 :         // ingested sstables being placed on top of them, but those
    1414           2 :         // memtables would have to be flushed anyways.
    1415           2 :         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1416           2 :         d.rotateMemtable(newLogNum, nextSeqNum, currMem, 0 /* minSize */)
    1417           2 :         d.updateReadStateLocked(d.opts.DebugCheck)
    1418           2 :         // TODO(aaditya): is this necessary? we call this already in rotateMemtable above
    1419           2 :         d.maybeScheduleFlush()
    1420           2 :         return nil
    1421             : }
    1422             : 
    1423             : // See comment at Ingest() for details on how this works.
    1424             : func (d *DB) ingest(
    1425             :         ctx context.Context,
    1426             :         paths []string,
    1427             :         shared []SharedSSTMeta,
    1428             :         exciseSpan KeyRange,
    1429             :         external []ExternalFile,
    1430           2 : ) (IngestOperationStats, error) {
    1431           2 :         if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
    1432           0 :                 panic("cannot ingest shared sstables with nil SharedStorage")
    1433             :         }
    1434           2 :         if (exciseSpan.Valid() || len(shared) > 0 || len(external) > 0) && d.FormatMajorVersion() < FormatVirtualSSTables {
    1435           0 :                 return IngestOperationStats{}, errors.New("pebble: format major version too old for excise, shared or external sstable ingestion")
    1436           0 :         }
    1437           2 :         if len(external) > 0 && d.FormatMajorVersion() < FormatSyntheticPrefixSuffix {
    1438           1 :                 for i := range external {
    1439           1 :                         if len(external[i].SyntheticPrefix) > 0 {
    1440           1 :                                 return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic prefix ingestion")
    1441           1 :                         }
    1442           1 :                         if len(external[i].SyntheticSuffix) > 0 {
    1443           1 :                                 return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic suffix ingestion")
    1444           1 :                         }
    1445             :                 }
    1446             :         }
    1447             :         // Allocate file numbers for all of the files being ingested and mark them as
    1448             :         // pending in order to prevent them from being deleted. Note that this causes
    1449             :         // the file number ordering to be out of alignment with sequence number
    1450             :         // ordering. The sorting of L0 tables by sequence number avoids relying on
    1451             :         // that (busted) invariant.
    1452           2 :         pendingOutputs := make([]base.FileNum, len(paths)+len(shared)+len(external))
    1453           2 :         for i := 0; i < len(paths)+len(shared)+len(external); i++ {
    1454           2 :                 pendingOutputs[i] = d.mu.versions.getNextFileNum()
    1455           2 :         }
    1456             : 
    1457           2 :         jobID := d.newJobID()
    1458           2 : 
    1459           2 :         // Load the metadata for all the files being ingested. This step detects
    1460           2 :         // and elides empty sstables.
    1461           2 :         loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
    1462           2 :         if err != nil {
    1463           1 :                 return IngestOperationStats{}, err
    1464           1 :         }
    1465             : 
    1466           2 :         if loadResult.fileCount() == 0 {
    1467           2 :                 // All of the sstables to be ingested were empty. Nothing to do.
    1468           2 :                 return IngestOperationStats{}, nil
    1469           2 :         }
    1470             : 
    1471             :         // Verify the sstables do not overlap.
    1472           2 :         if err := ingestSortAndVerify(d.cmp, loadResult, exciseSpan); err != nil {
    1473           2 :                 return IngestOperationStats{}, err
    1474           2 :         }
    1475             : 
    1476             :         // Hard link the sstables into the DB directory. Since the sstables aren't
    1477             :         // referenced by a version, they won't be used. If the hard linking fails
    1478             :         // (e.g. because the files reside on a different filesystem), ingestLinkLocal
    1479             :         // will fall back to copying, and if that fails we undo our work and return an
    1480             :         // error.
    1481           2 :         if err := ingestLinkLocal(ctx, jobID, d.opts, d.objProvider, loadResult.local); err != nil {
    1482           0 :                 return IngestOperationStats{}, err
    1483           0 :         }
    1484             : 
    1485           2 :         err = d.ingestAttachRemote(jobID, loadResult)
    1486           2 :         defer d.ingestUnprotectExternalBackings(loadResult)
    1487           2 :         if err != nil {
    1488           0 :                 return IngestOperationStats{}, err
    1489           0 :         }
    1490             : 
    1491             :         // Make the new tables durable. We need to do this at some point before we
    1492             :         // update the MANIFEST (via logAndApply), otherwise a crash can have the
    1493             :         // tables referenced in the MANIFEST, but not present in the provider.
    1494           2 :         if err := d.objProvider.Sync(); err != nil {
    1495           1 :                 return IngestOperationStats{}, err
    1496           1 :         }
    1497             : 
    1498             :         // metaFlushableOverlaps is a map indicating which of the ingested sstables
    1499             :         // overlap some table in the flushable queue. It's used to approximate
    1500             :         // ingest-into-L0 stats when using flushable ingests.
    1501           2 :         metaFlushableOverlaps := make(map[FileNum]bool, loadResult.fileCount())
    1502           2 :         var mem *flushableEntry
    1503           2 :         var mut *memTable
    1504           2 :         // asFlushable indicates whether the sstable was ingested as a flushable.
    1505           2 :         var asFlushable bool
    1506           2 :         prepare := func(seqNum base.SeqNum) {
    1507           2 :                 // Note that d.commit.mu is held by commitPipeline when calling prepare.
    1508           2 : 
    1509           2 :                 // Determine the set of bounds we care about for the purpose of checking
    1510           2 :                 // for overlap among the flushables. If there's an excise span, we need
    1511           2 :                 // to check for overlap with its bounds as well.
    1512           2 :                 overlapBounds := make([]bounded, 0, loadResult.fileCount()+1)
    1513           2 :                 for _, m := range loadResult.local {
    1514           2 :                         overlapBounds = append(overlapBounds, m.fileMetadata)
    1515           2 :                 }
    1516           2 :                 for _, m := range loadResult.shared {
    1517           2 :                         overlapBounds = append(overlapBounds, m.fileMetadata)
    1518           2 :                 }
    1519           2 :                 for _, m := range loadResult.external {
    1520           2 :                         overlapBounds = append(overlapBounds, m.fileMetadata)
    1521           2 :                 }
    1522           2 :                 if exciseSpan.Valid() {
    1523           2 :                         overlapBounds = append(overlapBounds, &exciseSpan)
    1524           2 :                 }
    1525             : 
    1526           2 :                 d.mu.Lock()
    1527           2 :                 defer d.mu.Unlock()
    1528           2 : 
    1529           2 :                 // Check if any of the currently-open EventuallyFileOnlySnapshots overlap
    1530           2 :                 // in key ranges with the excise span. If so, we need to check for memtable
    1531           2 :                 // overlaps with all bounds of that EventuallyFileOnlySnapshot in addition
    1532           2 :                 // to the ingestion's own bounds too.
    1533           2 : 
    1534           2 :                 if exciseSpan.Valid() {
    1535           2 :                         for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
    1536           2 :                                 if s.efos == nil {
    1537           0 :                                         continue
    1538             :                                 }
    1539           2 :                                 if base.Visible(seqNum, s.efos.seqNum, base.SeqNumMax) {
    1540           0 :                                         // We only worry about snapshots older than the excise. Any snapshots
    1541           0 :                                         // created after the excise should see the excised view of the LSM
    1542           0 :                                         // anyway.
    1543           0 :                                         //
    1544           0 :                                         // Since we delay publishing the excise seqnum as visible until after
    1545           0 :                                         // the apply step, this case will never be hit in practice until we
    1546           0 :                                         // make excises flushable ingests.
    1547           0 :                                         continue
    1548             :                                 }
    1549           2 :                                 if invariants.Enabled {
    1550           2 :                                         if s.efos.hasTransitioned() {
    1551           0 :                                                 panic("unexpected transitioned EFOS in snapshots list")
    1552             :                                         }
    1553             :                                 }
    1554           2 :                                 for i := range s.efos.protectedRanges {
    1555           2 :                                         if !s.efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
    1556           2 :                                                 continue
    1557             :                                         }
    1558             :                                         // Our excise conflicts with this EFOS. We need to add its protected
    1559             :                                         // ranges to our overlapBounds. Grow overlapBounds in one allocation
    1560             :                                         // if necesary.
    1561           2 :                                         prs := s.efos.protectedRanges
    1562           2 :                                         if cap(overlapBounds) < len(overlapBounds)+len(prs) {
    1563           2 :                                                 oldOverlapBounds := overlapBounds
    1564           2 :                                                 overlapBounds = make([]bounded, len(oldOverlapBounds), len(oldOverlapBounds)+len(prs))
    1565           2 :                                                 copy(overlapBounds, oldOverlapBounds)
    1566           2 :                                         }
    1567           2 :                                         for i := range prs {
    1568           2 :                                                 overlapBounds = append(overlapBounds, &prs[i])
    1569           2 :                                         }
    1570           2 :                                         break
    1571             :                                 }
    1572             :                         }
    1573             :                 }
    1574             : 
    1575             :                 // Check to see if any files overlap with any of the memtables. The queue
    1576             :                 // is ordered from oldest to newest with the mutable memtable being the
    1577             :                 // last element in the slice. We want to wait for the newest table that
    1578             :                 // overlaps.
    1579             : 
    1580           2 :                 for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
    1581           2 :                         m := d.mu.mem.queue[i]
    1582           2 :                         m.computePossibleOverlaps(func(b bounded) shouldContinue {
    1583           2 :                                 // If this is the first table to overlap a flushable, save
    1584           2 :                                 // the flushable. This ingest must be ingested or flushed
    1585           2 :                                 // after it.
    1586           2 :                                 if mem == nil {
    1587           2 :                                         mem = m
    1588           2 :                                 }
    1589             : 
    1590           2 :                                 switch v := b.(type) {
    1591           2 :                                 case *fileMetadata:
    1592           2 :                                         // NB: False positives are possible if `m` is a flushable
    1593           2 :                                         // ingest that overlaps the file `v` in bounds but doesn't
    1594           2 :                                         // contain overlapping data. This is considered acceptable
    1595           2 :                                         // because it's rare (in CockroachDB a bound overlap likely
    1596           2 :                                         // indicates a data overlap), and blocking the commit
    1597           2 :                                         // pipeline while we perform I/O to check for overlap may be
    1598           2 :                                         // more disruptive than enqueueing this ingestion on the
    1599           2 :                                         // flushable queue and switching to a new memtable.
    1600           2 :                                         metaFlushableOverlaps[v.FileNum] = true
    1601           2 :                                 case *KeyRange:
    1602             :                                         // An excise span or an EventuallyFileOnlySnapshot protected range;
    1603             :                                         // not a file.
    1604           0 :                                 default:
    1605           0 :                                         panic("unreachable")
    1606             :                                 }
    1607           2 :                                 return continueIteration
    1608             :                         }, overlapBounds...)
    1609             :                 }
    1610             : 
    1611           2 :                 if mem == nil {
    1612           2 :                         // No overlap with any of the queued flushables, so no need to queue
    1613           2 :                         // after them.
    1614           2 : 
    1615           2 :                         // New writes with higher sequence numbers may be concurrently
    1616           2 :                         // committed. We must ensure they don't flush before this ingest
    1617           2 :                         // completes. To do that, we ref the mutable memtable as a writer,
    1618           2 :                         // preventing its flushing (and the flushing of all subsequent
    1619           2 :                         // flushables in the queue). Once we've acquired the manifest lock
    1620           2 :                         // to add the ingested sstables to the LSM, we can unref as we're
    1621           2 :                         // guaranteed that the flush won't edit the LSM before this ingest.
    1622           2 :                         mut = d.mu.mem.mutable
    1623           2 :                         mut.writerRef()
    1624           2 :                         return
    1625           2 :                 }
    1626             : 
    1627             :                 // The ingestion overlaps with some entry in the flushable queue. If the
    1628             :                 // pre-conditions are met below, we can treat this ingestion as a flushable
    1629             :                 // ingest, otherwise we wait on the memtable flush before ingestion.
    1630             :                 //
    1631             :                 // TODO(aaditya): We should make flushableIngest compatible with remote
    1632             :                 // files.
    1633           2 :                 hasRemoteFiles := len(shared) > 0 || len(external) > 0
    1634           2 :                 canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest &&
    1635           2 :                         (len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold) &&
    1636           2 :                         !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles &&
    1637           2 :                         (!exciseSpan.Valid() || d.FormatMajorVersion() >= FormatFlushableIngestExcises)
    1638           2 : 
    1639           2 :                 if !canIngestFlushable {
    1640           2 :                         // We're not able to ingest as a flushable,
    1641           2 :                         // so we must synchronously flush.
    1642           2 :                         //
    1643           2 :                         // TODO(bilal): Currently, if any of the files being ingested are shared,
    1644           2 :                         // we cannot use flushable ingests and need
    1645           2 :                         // to wait synchronously.
    1646           2 :                         if mem.flushable == d.mu.mem.mutable {
    1647           2 :                                 err = d.makeRoomForWrite(nil)
    1648           2 :                         }
    1649             :                         // New writes with higher sequence numbers may be concurrently
    1650             :                         // committed. We must ensure they don't flush before this ingest
    1651             :                         // completes. To do that, we ref the mutable memtable as a writer,
    1652             :                         // preventing its flushing (and the flushing of all subsequent
    1653             :                         // flushables in the queue). Once we've acquired the manifest lock
    1654             :                         // to add the ingested sstables to the LSM, we can unref as we're
    1655             :                         // guaranteed that the flush won't edit the LSM before this ingest.
    1656           2 :                         mut = d.mu.mem.mutable
    1657           2 :                         mut.writerRef()
    1658           2 :                         mem.flushForced = true
    1659           2 :                         d.maybeScheduleFlush()
    1660           2 :                         return
    1661             :                 }
    1662             :                 // Since there aren't too many memtables already queued up, we can
    1663             :                 // slide the ingested sstables on top of the existing memtables.
    1664           2 :                 asFlushable = true
    1665           2 :                 fileMetas := make([]*fileMetadata, len(loadResult.local))
    1666           2 :                 for i := range fileMetas {
    1667           2 :                         fileMetas[i] = loadResult.local[i].fileMetadata
    1668           2 :                 }
    1669           2 :                 err = d.handleIngestAsFlushable(fileMetas, seqNum, exciseSpan)
    1670             :         }
    1671             : 
    1672           2 :         var ve *versionEdit
    1673           2 :         apply := func(seqNum base.SeqNum) {
    1674           2 :                 if err != nil || asFlushable {
    1675           2 :                         // An error occurred during prepare.
    1676           2 :                         if mut != nil {
    1677           0 :                                 if mut.writerUnref() {
    1678           0 :                                         d.mu.Lock()
    1679           0 :                                         d.maybeScheduleFlush()
    1680           0 :                                         d.mu.Unlock()
    1681           0 :                                 }
    1682             :                         }
    1683           2 :                         return
    1684             :                 }
    1685             : 
    1686             :                 // If there's an excise being done atomically with the same ingest, we
    1687             :                 // assign the lowest sequence number in the set of sequence numbers for this
    1688             :                 // ingestion to the excise. Note that we've already allocated fileCount+1
    1689             :                 // sequence numbers in this case.
    1690           2 :                 if exciseSpan.Valid() {
    1691           2 :                         seqNum++ // the first seqNum is reserved for the excise.
    1692           2 :                 }
    1693             :                 // Update the sequence numbers for all ingested sstables'
    1694             :                 // metadata. When the version edit is applied, the metadata is
    1695             :                 // written to the manifest, persisting the sequence number.
    1696             :                 // The sstables themselves are left unmodified.
    1697           2 :                 if err = ingestUpdateSeqNum(
    1698           2 :                         d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult,
    1699           2 :                 ); err != nil {
    1700           0 :                         if mut != nil {
    1701           0 :                                 if mut.writerUnref() {
    1702           0 :                                         d.mu.Lock()
    1703           0 :                                         d.maybeScheduleFlush()
    1704           0 :                                         d.mu.Unlock()
    1705           0 :                                 }
    1706             :                         }
    1707           0 :                         return
    1708             :                 }
    1709             : 
    1710             :                 // If we overlapped with a memtable in prepare wait for the flush to
    1711             :                 // finish.
    1712           2 :                 if mem != nil {
    1713           2 :                         <-mem.flushed
    1714           2 :                 }
    1715             : 
    1716             :                 // Assign the sstables to the correct level in the LSM and apply the
    1717             :                 // version edit.
    1718           2 :                 ve, err = d.ingestApply(ctx, jobID, loadResult, mut, exciseSpan, seqNum)
    1719             :         }
    1720             : 
    1721             :         // Only one ingest can occur at a time because if not, one would block waiting
    1722             :         // for the other to finish applying. This blocking would happen while holding
    1723             :         // the commit mutex which would prevent unrelated batches from writing their
    1724             :         // changes to the WAL and memtable. This will cause a bigger commit hiccup
    1725             :         // during ingestion.
    1726           2 :         seqNumCount := loadResult.fileCount()
    1727           2 :         if exciseSpan.Valid() {
    1728           2 :                 seqNumCount++
    1729           2 :         }
    1730           2 :         d.commit.ingestSem <- struct{}{}
    1731           2 :         d.commit.AllocateSeqNum(seqNumCount, prepare, apply)
    1732           2 :         <-d.commit.ingestSem
    1733           2 : 
    1734           2 :         if err != nil {
    1735           1 :                 if err2 := ingestCleanup(d.objProvider, loadResult.local); err2 != nil {
    1736           0 :                         d.opts.Logger.Errorf("ingest cleanup failed: %v", err2)
    1737           0 :                 }
    1738           2 :         } else {
    1739           2 :                 // Since we either created a hard link to the ingesting files, or copied
    1740           2 :                 // them over, it is safe to remove the originals paths.
    1741           2 :                 for i := range loadResult.local {
    1742           2 :                         path := loadResult.local[i].path
    1743           2 :                         if err2 := d.opts.FS.Remove(path); err2 != nil {
    1744           1 :                                 d.opts.Logger.Errorf("ingest failed to remove original file: %s", err2)
    1745           1 :                         }
    1746             :                 }
    1747             :         }
    1748             : 
    1749           2 :         info := TableIngestInfo{
    1750           2 :                 JobID:     int(jobID),
    1751           2 :                 Err:       err,
    1752           2 :                 flushable: asFlushable,
    1753           2 :         }
    1754           2 :         if len(loadResult.local) > 0 {
    1755           2 :                 info.GlobalSeqNum = loadResult.local[0].SmallestSeqNum
    1756           2 :         } else if len(loadResult.shared) > 0 {
    1757           2 :                 info.GlobalSeqNum = loadResult.shared[0].SmallestSeqNum
    1758           2 :         } else {
    1759           2 :                 info.GlobalSeqNum = loadResult.external[0].SmallestSeqNum
    1760           2 :         }
    1761           2 :         var stats IngestOperationStats
    1762           2 :         if ve != nil {
    1763           2 :                 info.Tables = make([]struct {
    1764           2 :                         TableInfo
    1765           2 :                         Level int
    1766           2 :                 }, len(ve.NewFiles))
    1767           2 :                 for i := range ve.NewFiles {
    1768           2 :                         e := &ve.NewFiles[i]
    1769           2 :                         info.Tables[i].Level = e.Level
    1770           2 :                         info.Tables[i].TableInfo = e.Meta.TableInfo()
    1771           2 :                         stats.Bytes += e.Meta.Size
    1772           2 :                         if e.Level == 0 {
    1773           2 :                                 stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
    1774           2 :                         }
    1775           2 :                         if metaFlushableOverlaps[e.Meta.FileNum] {
    1776           2 :                                 stats.MemtableOverlappingFiles++
    1777           2 :                         }
    1778             :                 }
    1779           2 :         } else if asFlushable {
    1780           2 :                 // NB: If asFlushable == true, there are no shared sstables.
    1781           2 :                 info.Tables = make([]struct {
    1782           2 :                         TableInfo
    1783           2 :                         Level int
    1784           2 :                 }, len(loadResult.local))
    1785           2 :                 for i, f := range loadResult.local {
    1786           2 :                         info.Tables[i].Level = -1
    1787           2 :                         info.Tables[i].TableInfo = f.TableInfo()
    1788           2 :                         stats.Bytes += f.Size
    1789           2 :                         // We don't have exact stats on which files will be ingested into
    1790           2 :                         // L0, because actual ingestion into the LSM has been deferred until
    1791           2 :                         // flush time. Instead, we infer based on memtable overlap.
    1792           2 :                         //
    1793           2 :                         // TODO(jackson): If we optimistically compute data overlap (#2112)
    1794           2 :                         // before entering the commit pipeline, we can use that overlap to
    1795           2 :                         // improve our approximation by incorporating overlap with L0, not
    1796           2 :                         // just memtables.
    1797           2 :                         if metaFlushableOverlaps[f.FileNum] {
    1798           2 :                                 stats.ApproxIngestedIntoL0Bytes += f.Size
    1799           2 :                                 stats.MemtableOverlappingFiles++
    1800           2 :                         }
    1801             :                 }
    1802             :         }
    1803           2 :         d.opts.EventListener.TableIngested(info)
    1804           2 : 
    1805           2 :         return stats, err
    1806             : }
    1807             : 
    1808             : // excise updates ve to include a replacement of the file m with new virtual
    1809             : // sstables that exclude exciseSpan, returning a slice of newly-created files if
    1810             : // any. If the entirety of m is deleted by exciseSpan, no new sstables are added
    1811             : // and m is deleted. Note that ve is updated in-place.
    1812             : //
    1813             : // This method is agnostic to whether d.mu is held or not. Some cases call it with
    1814             : // the db mutex held (eg. ingest-time excises), while in the case of compactions
    1815             : // the mutex is not held.
    1816             : func (d *DB) excise(
    1817             :         ctx context.Context, exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int,
    1818           2 : ) ([]manifest.NewFileEntry, error) {
    1819           2 :         numCreatedFiles := 0
    1820           2 :         // Check if there's actually an overlap between m and exciseSpan.
    1821           2 :         mBounds := base.UserKeyBoundsFromInternal(m.Smallest, m.Largest)
    1822           2 :         if !exciseSpan.Overlaps(d.cmp, &mBounds) {
    1823           2 :                 return nil, nil
    1824           2 :         }
    1825           2 :         ve.DeletedFiles[deletedFileEntry{
    1826           2 :                 Level:   level,
    1827           2 :                 FileNum: m.FileNum,
    1828           2 :         }] = m
    1829           2 :         // Fast path: m sits entirely within the exciseSpan, so just delete it.
    1830           2 :         if exciseSpan.ContainsInternalKey(d.cmp, m.Smallest) && exciseSpan.ContainsInternalKey(d.cmp, m.Largest) {
    1831           2 :                 return nil, nil
    1832           2 :         }
    1833             : 
    1834           2 :         var iters iterSet
    1835           2 :         var itersLoaded bool
    1836           2 :         defer iters.CloseAll()
    1837           2 :         loadItersIfNecessary := func() error {
    1838           2 :                 if itersLoaded {
    1839           2 :                         return nil
    1840           2 :                 }
    1841           2 :                 var err error
    1842           2 :                 iters, err = d.newIters(ctx, m, &IterOptions{
    1843           2 :                         Category: categoryIngest,
    1844           2 :                         layer:    manifest.Level(level),
    1845           2 :                 }, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
    1846           2 :                 itersLoaded = true
    1847           2 :                 return err
    1848             :         }
    1849             : 
    1850           2 :         needsBacking := false
    1851           2 :         // Create a file to the left of the excise span, if necessary.
    1852           2 :         // The bounds of this file will be [m.Smallest, lastKeyBefore(exciseSpan.Start)].
    1853           2 :         //
    1854           2 :         // We create bounds that are tight on user keys, and we make the effort to find
    1855           2 :         // the last key in the original sstable that's smaller than exciseSpan.Start
    1856           2 :         // even though it requires some sstable reads. We could choose to create
    1857           2 :         // virtual sstables on loose userKey bounds, in which case we could just set
    1858           2 :         // leftFile.Largest to an exclusive sentinel at exciseSpan.Start. The biggest
    1859           2 :         // issue with that approach would be that it'd lead to lots of small virtual
    1860           2 :         // sstables in the LSM that have no guarantee on containing even a single user
    1861           2 :         // key within the file bounds. This has the potential to increase both read and
    1862           2 :         // write-amp as we will be opening up these sstables only to find no relevant
    1863           2 :         // keys in the read path, and compacting sstables on top of them instead of
    1864           2 :         // directly into the space occupied by them. We choose to incur the cost of
    1865           2 :         // calculating tight bounds at this time instead of creating more work in the
    1866           2 :         // future.
    1867           2 :         //
    1868           2 :         // TODO(bilal): Some of this work can happen without grabbing the manifest
    1869           2 :         // lock; we could grab one currentVersion, release the lock, calculate excised
    1870           2 :         // files, then grab the lock again and recalculate for just the files that
    1871           2 :         // have changed since our previous calculation. Do this optimiaztino as part of
    1872           2 :         // https://github.com/cockroachdb/pebble/issues/2112 .
    1873           2 :         if d.cmp(m.Smallest.UserKey, exciseSpan.Start) < 0 {
    1874           2 :                 leftFile := &fileMetadata{
    1875           2 :                         Virtual:     true,
    1876           2 :                         FileBacking: m.FileBacking,
    1877           2 :                         FileNum:     d.mu.versions.getNextFileNum(),
    1878           2 :                         // Note that these are loose bounds for smallest/largest seqnums, but they're
    1879           2 :                         // sufficient for maintaining correctness.
    1880           2 :                         SmallestSeqNum:           m.SmallestSeqNum,
    1881           2 :                         LargestSeqNum:            m.LargestSeqNum,
    1882           2 :                         LargestSeqNumAbsolute:    m.LargestSeqNumAbsolute,
    1883           2 :                         SyntheticPrefixAndSuffix: m.SyntheticPrefixAndSuffix,
    1884           2 :                 }
    1885           2 :                 if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.SmallestPointKey) {
    1886           2 :                         // This file will probably contain point keys.
    1887           2 :                         if err := loadItersIfNecessary(); err != nil {
    1888           0 :                                 return nil, err
    1889           0 :                         }
    1890           2 :                         smallestPointKey := m.SmallestPointKey
    1891           2 :                         if kv := iters.Point().SeekLT(exciseSpan.Start, base.SeekLTFlagsNone); kv != nil {
    1892           2 :                                 leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, kv.K.Clone())
    1893           2 :                         }
    1894             :                         // Store the min of (exciseSpan.Start, rdel.End) in lastRangeDel. This
    1895             :                         // needs to be a copy if the key is owned by the range del iter.
    1896           2 :                         var lastRangeDel []byte
    1897           2 :                         if rdel, err := iters.RangeDeletion().SeekLT(exciseSpan.Start); err != nil {
    1898           0 :                                 return nil, err
    1899           2 :                         } else if rdel != nil {
    1900           2 :                                 lastRangeDel = append(lastRangeDel[:0], rdel.End...)
    1901           2 :                                 if d.cmp(lastRangeDel, exciseSpan.Start) > 0 {
    1902           2 :                                         lastRangeDel = exciseSpan.Start
    1903           2 :                                 }
    1904             :                         }
    1905           2 :                         if lastRangeDel != nil {
    1906           2 :                                 leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, lastRangeDel))
    1907           2 :                         }
    1908             :                 }
    1909           2 :                 if m.HasRangeKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.SmallestRangeKey) {
    1910           2 :                         // This file will probably contain range keys.
    1911           2 :                         if err := loadItersIfNecessary(); err != nil {
    1912           0 :                                 return nil, err
    1913           0 :                         }
    1914           2 :                         smallestRangeKey := m.SmallestRangeKey
    1915           2 :                         // Store the min of (exciseSpan.Start, rkey.End) in lastRangeKey. This
    1916           2 :                         // needs to be a copy if the key is owned by the range key iter.
    1917           2 :                         var lastRangeKey []byte
    1918           2 :                         var lastRangeKeyKind InternalKeyKind
    1919           2 :                         if rkey, err := iters.RangeKey().SeekLT(exciseSpan.Start); err != nil {
    1920           0 :                                 return nil, err
    1921           2 :                         } else if rkey != nil {
    1922           2 :                                 lastRangeKey = append(lastRangeKey[:0], rkey.End...)
    1923           2 :                                 if d.cmp(lastRangeKey, exciseSpan.Start) > 0 {
    1924           2 :                                         lastRangeKey = exciseSpan.Start
    1925           2 :                                 }
    1926           2 :                                 lastRangeKeyKind = rkey.Keys[0].Kind()
    1927             :                         }
    1928           2 :                         if lastRangeKey != nil {
    1929           2 :                                 leftFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, base.MakeExclusiveSentinelKey(lastRangeKeyKind, lastRangeKey))
    1930           2 :                         }
    1931             :                 }
    1932           2 :                 if leftFile.HasRangeKeys || leftFile.HasPointKeys {
    1933           2 :                         var err error
    1934           2 :                         leftFile.Size, err = d.fileCache.estimateSize(m, leftFile.Smallest.UserKey, leftFile.Largest.UserKey)
    1935           2 :                         if err != nil {
    1936           0 :                                 return nil, err
    1937           0 :                         }
    1938           2 :                         if leftFile.Size == 0 {
    1939           2 :                                 // On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
    1940           2 :                                 // such as if the excised file only has range keys/dels and no point
    1941           2 :                                 // keys. This can cause panics in places where we divide by file sizes.
    1942           2 :                                 // Correct for it here.
    1943           2 :                                 leftFile.Size = 1
    1944           2 :                         }
    1945           2 :                         if err := leftFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
    1946           0 :                                 return nil, err
    1947           0 :                         }
    1948           2 :                         leftFile.ValidateVirtual(m)
    1949           2 :                         ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: leftFile})
    1950           2 :                         needsBacking = true
    1951           2 :                         numCreatedFiles++
    1952             :                 }
    1953             :         }
    1954             :         // Create a file to the right, if necessary.
    1955           2 :         if exciseSpan.ContainsInternalKey(d.cmp, m.Largest) {
    1956           2 :                 // No key exists to the right of the excise span in this file.
    1957           2 :                 if needsBacking && !m.Virtual {
    1958           2 :                         // If m is virtual, then its file backing is already known to the manifest.
    1959           2 :                         // We don't need to create another file backing. Note that there must be
    1960           2 :                         // only one CreatedBackingTables entry per backing sstable. This is
    1961           2 :                         // indicated by the VersionEdit.CreatedBackingTables invariant.
    1962           2 :                         ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
    1963           2 :                 }
    1964           2 :                 return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
    1965             :         }
    1966             :         // Create a new file, rightFile, between [firstKeyAfter(exciseSpan.End), m.Largest].
    1967             :         //
    1968             :         // See comment before the definition of leftFile for the motivation behind
    1969             :         // calculating tight user-key bounds.
    1970           2 :         rightFile := &fileMetadata{
    1971           2 :                 Virtual:     true,
    1972           2 :                 FileBacking: m.FileBacking,
    1973           2 :                 FileNum:     d.mu.versions.getNextFileNum(),
    1974           2 :                 // Note that these are loose bounds for smallest/largest seqnums, but they're
    1975           2 :                 // sufficient for maintaining correctness.
    1976           2 :                 SmallestSeqNum:           m.SmallestSeqNum,
    1977           2 :                 LargestSeqNum:            m.LargestSeqNum,
    1978           2 :                 LargestSeqNumAbsolute:    m.LargestSeqNumAbsolute,
    1979           2 :                 SyntheticPrefixAndSuffix: m.SyntheticPrefixAndSuffix,
    1980           2 :         }
    1981           2 :         if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.LargestPointKey) {
    1982           2 :                 // This file will probably contain point keys
    1983           2 :                 if err := loadItersIfNecessary(); err != nil {
    1984           0 :                         return nil, err
    1985           0 :                 }
    1986           2 :                 largestPointKey := m.LargestPointKey
    1987           2 :                 if kv := iters.Point().SeekGE(exciseSpan.End.Key, base.SeekGEFlagsNone); kv != nil {
    1988           2 :                         if exciseSpan.End.Kind == base.Inclusive && d.equal(exciseSpan.End.Key, kv.K.UserKey) {
    1989           0 :                                 return nil, base.AssertionFailedf("cannot excise with an inclusive end key and data overlap at end key")
    1990           0 :                         }
    1991           2 :                         rightFile.ExtendPointKeyBounds(d.cmp, kv.K.Clone(), largestPointKey)
    1992             :                 }
    1993             :                 // Store the max of (exciseSpan.End, rdel.Start) in firstRangeDel. This
    1994             :                 // needs to be a copy if the key is owned by the range del iter.
    1995           2 :                 var firstRangeDel []byte
    1996           2 :                 rdel, err := iters.RangeDeletion().SeekGE(exciseSpan.End.Key)
    1997           2 :                 if err != nil {
    1998           0 :                         return nil, err
    1999           2 :                 } else if rdel != nil {
    2000           2 :                         firstRangeDel = append(firstRangeDel[:0], rdel.Start...)
    2001           2 :                         if d.cmp(firstRangeDel, exciseSpan.End.Key) < 0 {
    2002           2 :                                 // NB: This can only be done if the end bound is exclusive.
    2003           2 :                                 if exciseSpan.End.Kind != base.Exclusive {
    2004           0 :                                         return nil, base.AssertionFailedf("cannot truncate rangedel during excise with an inclusive upper bound")
    2005           0 :                                 }
    2006           2 :                                 firstRangeDel = exciseSpan.End.Key
    2007             :                         }
    2008             :                 }
    2009           2 :                 if firstRangeDel != nil {
    2010           2 :                         smallestPointKey := rdel.SmallestKey()
    2011           2 :                         smallestPointKey.UserKey = firstRangeDel
    2012           2 :                         rightFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, largestPointKey)
    2013           2 :                 }
    2014             :         }
    2015           2 :         if m.HasRangeKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.LargestRangeKey) {
    2016           2 :                 // This file will probably contain range keys.
    2017           2 :                 if err := loadItersIfNecessary(); err != nil {
    2018           0 :                         return nil, err
    2019           0 :                 }
    2020           2 :                 largestRangeKey := m.LargestRangeKey
    2021           2 :                 // Store the max of (exciseSpan.End, rkey.Start) in firstRangeKey. This
    2022           2 :                 // needs to be a copy if the key is owned by the range key iter.
    2023           2 :                 var firstRangeKey []byte
    2024           2 :                 rkey, err := iters.RangeKey().SeekGE(exciseSpan.End.Key)
    2025           2 :                 if err != nil {
    2026           0 :                         return nil, err
    2027           2 :                 } else if rkey != nil {
    2028           2 :                         firstRangeKey = append(firstRangeKey[:0], rkey.Start...)
    2029           2 :                         if d.cmp(firstRangeKey, exciseSpan.End.Key) < 0 {
    2030           2 :                                 if exciseSpan.End.Kind != base.Exclusive {
    2031           0 :                                         return nil, base.AssertionFailedf("cannot truncate range key during excise with an inclusive upper bound")
    2032           0 :                                 }
    2033           2 :                                 firstRangeKey = exciseSpan.End.Key
    2034             :                         }
    2035             :                 }
    2036           2 :                 if firstRangeKey != nil {
    2037           2 :                         smallestRangeKey := rkey.SmallestKey()
    2038           2 :                         smallestRangeKey.UserKey = firstRangeKey
    2039           2 :                         // We call ExtendRangeKeyBounds so any internal boundType fields are
    2040           2 :                         // set correctly. Note that this is mildly wasteful as we'll be comparing
    2041           2 :                         // rightFile.{Smallest,Largest}RangeKey with themselves, which can be
    2042           2 :                         // avoided if we exported ExtendOverallKeyBounds or so.
    2043           2 :                         rightFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, largestRangeKey)
    2044           2 :                 }
    2045             :         }
    2046           2 :         if rightFile.HasRangeKeys || rightFile.HasPointKeys {
    2047           2 :                 var err error
    2048           2 :                 rightFile.Size, err = d.fileCache.estimateSize(m, rightFile.Smallest.UserKey, rightFile.Largest.UserKey)
    2049           2 :                 if err != nil {
    2050           0 :                         return nil, err
    2051           0 :                 }
    2052           2 :                 if rightFile.Size == 0 {
    2053           2 :                         // On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
    2054           2 :                         // such as if the excised file only has range keys/dels and no point keys.
    2055           2 :                         // This can cause panics in places where we divide by file sizes. Correct
    2056           2 :                         // for it here.
    2057           2 :                         rightFile.Size = 1
    2058           2 :                 }
    2059           2 :                 if err := rightFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
    2060           0 :                         return nil, err
    2061           0 :                 }
    2062           2 :                 rightFile.ValidateVirtual(m)
    2063           2 :                 ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: rightFile})
    2064           2 :                 needsBacking = true
    2065           2 :                 numCreatedFiles++
    2066             :         }
    2067             : 
    2068           2 :         if needsBacking && !m.Virtual {
    2069           2 :                 // If m is virtual, then its file backing is already known to the manifest.
    2070           2 :                 // We don't need to create another file backing. Note that there must be
    2071           2 :                 // only one CreatedBackingTables entry per backing sstable. This is
    2072           2 :                 // indicated by the VersionEdit.CreatedBackingTables invariant.
    2073           2 :                 ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
    2074           2 :         }
    2075             : 
    2076           2 :         return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
    2077             : }
    2078             : 
    2079             : type ingestSplitFile struct {
    2080             :         // ingestFile is the file being ingested.
    2081             :         ingestFile *fileMetadata
    2082             :         // splitFile is the file that needs to be split to allow ingestFile to slot
    2083             :         // into `level` level.
    2084             :         splitFile *fileMetadata
    2085             :         // The level where ingestFile will go (and where splitFile already is).
    2086             :         level int
    2087             : }
    2088             : 
    2089             : // ingestSplit splits files specified in `files` and updates ve in-place to
    2090             : // account for existing files getting split into two virtual sstables. The map
    2091             : // `replacedFiles` contains an in-progress map of all files that have been
    2092             : // replaced with new virtual sstables in this version edit so far, which is also
    2093             : // updated in-place.
    2094             : //
    2095             : // d.mu as well as the manifest lock must be held when calling this method.
    2096             : func (d *DB) ingestSplit(
    2097             :         ctx context.Context,
    2098             :         ve *versionEdit,
    2099             :         updateMetrics func(*fileMetadata, int, []newFileEntry),
    2100             :         files []ingestSplitFile,
    2101             :         replacedFiles map[base.FileNum][]newFileEntry,
    2102           2 : ) error {
    2103           2 :         for _, s := range files {
    2104           2 :                 ingestFileBounds := s.ingestFile.UserKeyBounds()
    2105           2 :                 // replacedFiles can be thought of as a tree, where we start iterating with
    2106           2 :                 // s.splitFile and run its fileNum through replacedFiles, then find which of
    2107           2 :                 // the replaced files overlaps with s.ingestFile, which becomes the new
    2108           2 :                 // splitFile, then we check splitFile's replacements in replacedFiles again
    2109           2 :                 // for overlap with s.ingestFile, and so on until we either can't find the
    2110           2 :                 // current splitFile in replacedFiles (i.e. that's the file that now needs to
    2111           2 :                 // be split), or we don't find a file that overlaps with s.ingestFile, which
    2112           2 :                 // means a prior ingest split already produced enough room for s.ingestFile
    2113           2 :                 // to go into this level without necessitating another ingest split.
    2114           2 :                 splitFile := s.splitFile
    2115           2 :                 for splitFile != nil {
    2116           2 :                         replaced, ok := replacedFiles[splitFile.FileNum]
    2117           2 :                         if !ok {
    2118           2 :                                 break
    2119             :                         }
    2120           2 :                         updatedSplitFile := false
    2121           2 :                         for i := range replaced {
    2122           2 :                                 if replaced[i].Meta.Overlaps(d.cmp, &ingestFileBounds) {
    2123           2 :                                         if updatedSplitFile {
    2124           0 :                                                 // This should never happen because the earlier ingestTargetLevel
    2125           0 :                                                 // function only finds split file candidates that are guaranteed to
    2126           0 :                                                 // have no data overlap, only boundary overlap. See the comments
    2127           0 :                                                 // in that method to see the definitions of data vs boundary
    2128           0 :                                                 // overlap. That, plus the fact that files in `replaced` are
    2129           0 :                                                 // guaranteed to have file bounds that are tight on user keys
    2130           0 :                                                 // (as that's what `d.excise` produces), means that the only case
    2131           0 :                                                 // where we overlap with two or more files in `replaced` is if we
    2132           0 :                                                 // actually had data overlap all along, or if the ingestion files
    2133           0 :                                                 // were overlapping, either of which is an invariant violation.
    2134           0 :                                                 panic("updated with two files in ingestSplit")
    2135             :                                         }
    2136           2 :                                         splitFile = replaced[i].Meta
    2137           2 :                                         updatedSplitFile = true
    2138             :                                 }
    2139             :                         }
    2140           2 :                         if !updatedSplitFile {
    2141           2 :                                 // None of the replaced files overlapped with the file being ingested.
    2142           2 :                                 // This can happen if we've already excised a span overlapping with
    2143           2 :                                 // this file, or if we have consecutive ingested files that can slide
    2144           2 :                                 // within the same gap between keys in an existing file. For instance,
    2145           2 :                                 // if an existing file has keys a and g and we're ingesting b-c, d-e,
    2146           2 :                                 // the first loop iteration will split the existing file into one that
    2147           2 :                                 // ends in a and another that starts at g, and the second iteration will
    2148           2 :                                 // fall into this case and require no splitting.
    2149           2 :                                 //
    2150           2 :                                 // No splitting necessary.
    2151           2 :                                 splitFile = nil
    2152           2 :                         }
    2153             :                 }
    2154           2 :                 if splitFile == nil {
    2155           2 :                         continue
    2156             :                 }
    2157             :                 // NB: excise operates on [start, end). We're splitting at [start, end]
    2158             :                 // (assuming !s.ingestFile.Largest.IsExclusiveSentinel()). The conflation
    2159             :                 // of exclusive vs inclusive end bounds should not make a difference here
    2160             :                 // as we're guaranteed to not have any data overlap between splitFile and
    2161             :                 // s.ingestFile. d.excise will return an error if we pass an inclusive user
    2162             :                 // key bound _and_ we end up seeing data overlap at the end key.
    2163           2 :                 added, err := d.excise(ctx, base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level)
    2164           2 :                 if err != nil {
    2165           0 :                         return err
    2166           0 :                 }
    2167           2 :                 if _, ok := ve.DeletedFiles[deletedFileEntry{
    2168           2 :                         Level:   s.level,
    2169           2 :                         FileNum: splitFile.FileNum,
    2170           2 :                 }]; !ok {
    2171           0 :                         panic("did not split file that was expected to be split")
    2172             :                 }
    2173           2 :                 replacedFiles[splitFile.FileNum] = added
    2174           2 :                 for i := range added {
    2175           2 :                         addedBounds := added[i].Meta.UserKeyBounds()
    2176           2 :                         if s.ingestFile.Overlaps(d.cmp, &addedBounds) {
    2177           0 :                                 panic("ingest-time split produced a file that overlaps with ingested file")
    2178             :                         }
    2179             :                 }
    2180           2 :                 updateMetrics(splitFile, s.level, added)
    2181             :         }
    2182             :         // Flatten the version edit by removing any entries from ve.NewFiles that
    2183             :         // are also in ve.DeletedFiles.
    2184           2 :         newNewFiles := ve.NewFiles[:0]
    2185           2 :         for i := range ve.NewFiles {
    2186           2 :                 fn := ve.NewFiles[i].Meta.FileNum
    2187           2 :                 deEntry := deletedFileEntry{Level: ve.NewFiles[i].Level, FileNum: fn}
    2188           2 :                 if _, ok := ve.DeletedFiles[deEntry]; ok {
    2189           2 :                         delete(ve.DeletedFiles, deEntry)
    2190           2 :                 } else {
    2191           2 :                         newNewFiles = append(newNewFiles, ve.NewFiles[i])
    2192           2 :                 }
    2193             :         }
    2194           2 :         ve.NewFiles = newNewFiles
    2195           2 :         return nil
    2196             : }
    2197             : 
    2198             : func (d *DB) ingestApply(
    2199             :         ctx context.Context,
    2200             :         jobID JobID,
    2201             :         lr ingestLoadResult,
    2202             :         mut *memTable,
    2203             :         exciseSpan KeyRange,
    2204             :         exciseSeqNum base.SeqNum,
    2205           2 : ) (*versionEdit, error) {
    2206           2 :         d.mu.Lock()
    2207           2 :         defer d.mu.Unlock()
    2208           2 : 
    2209           2 :         ve := &versionEdit{
    2210           2 :                 NewFiles: make([]newFileEntry, lr.fileCount()),
    2211           2 :         }
    2212           2 :         if exciseSpan.Valid() || (d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit()) {
    2213           2 :                 ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{}
    2214           2 :         }
    2215           2 :         metrics := make(map[int]*LevelMetrics)
    2216           2 : 
    2217           2 :         // Lock the manifest for writing before we use the current version to
    2218           2 :         // determine the target level. This prevents two concurrent ingestion jobs
    2219           2 :         // from using the same version to determine the target level, and also
    2220           2 :         // provides serialization with concurrent compaction and flush jobs.
    2221           2 :         // logAndApply unconditionally releases the manifest lock, but any earlier
    2222           2 :         // returns must unlock the manifest.
    2223           2 :         d.mu.versions.logLock()
    2224           2 : 
    2225           2 :         if mut != nil {
    2226           2 :                 // Unref the mutable memtable to allows its flush to proceed. Now that we've
    2227           2 :                 // acquired the manifest lock, we can be certain that if the mutable
    2228           2 :                 // memtable has received more recent conflicting writes, the flush won't
    2229           2 :                 // beat us to applying to the manifest resulting in sequence number
    2230           2 :                 // inversion. Even though we call maybeScheduleFlush right now, this flush
    2231           2 :                 // will apply after our ingestion.
    2232           2 :                 if mut.writerUnref() {
    2233           2 :                         d.maybeScheduleFlush()
    2234           2 :                 }
    2235             :         }
    2236             : 
    2237           2 :         current := d.mu.versions.currentVersion()
    2238           2 :         overlapChecker := &overlapChecker{
    2239           2 :                 comparer: d.opts.Comparer,
    2240           2 :                 newIters: d.newIters,
    2241           2 :                 opts: IterOptions{
    2242           2 :                         logger:   d.opts.Logger,
    2243           2 :                         Category: categoryIngest,
    2244           2 :                 },
    2245           2 :                 v: current,
    2246           2 :         }
    2247           2 :         shouldIngestSplit := d.opts.Experimental.IngestSplit != nil &&
    2248           2 :                 d.opts.Experimental.IngestSplit() && d.FormatMajorVersion() >= FormatVirtualSSTables
    2249           2 :         baseLevel := d.mu.versions.picker.getBaseLevel()
    2250           2 :         // filesToSplit is a list where each element is a pair consisting of a file
    2251           2 :         // being ingested and a file being split to make room for an ingestion into
    2252           2 :         // that level. Each ingested file will appear at most once in this list. It
    2253           2 :         // is possible for split files to appear twice in this list.
    2254           2 :         filesToSplit := make([]ingestSplitFile, 0)
    2255           2 :         checkCompactions := false
    2256           2 :         for i := 0; i < lr.fileCount(); i++ {
    2257           2 :                 // Determine the lowest level in the LSM for which the sstable doesn't
    2258           2 :                 // overlap any existing files in the level.
    2259           2 :                 var m *fileMetadata
    2260           2 :                 specifiedLevel := -1
    2261           2 :                 isShared := false
    2262           2 :                 isExternal := false
    2263           2 :                 if i < len(lr.local) {
    2264           2 :                         // local file.
    2265           2 :                         m = lr.local[i].fileMetadata
    2266           2 :                 } else if (i - len(lr.local)) < len(lr.shared) {
    2267           2 :                         // shared file.
    2268           2 :                         isShared = true
    2269           2 :                         sharedIdx := i - len(lr.local)
    2270           2 :                         m = lr.shared[sharedIdx].fileMetadata
    2271           2 :                         specifiedLevel = int(lr.shared[sharedIdx].shared.Level)
    2272           2 :                 } else {
    2273           2 :                         // external file.
    2274           2 :                         isExternal = true
    2275           2 :                         externalIdx := i - (len(lr.local) + len(lr.shared))
    2276           2 :                         m = lr.external[externalIdx].fileMetadata
    2277           2 :                         if lr.externalFilesHaveLevel {
    2278           1 :                                 specifiedLevel = int(lr.external[externalIdx].external.Level)
    2279           1 :                         }
    2280             :                 }
    2281             : 
    2282             :                 // Add to CreatedBackingTables if this is a new backing.
    2283             :                 //
    2284             :                 // Shared files always have a new backing. External files have new backings
    2285             :                 // iff the backing disk file num and the file num match (see ingestAttachRemote).
    2286           2 :                 if isShared || (isExternal && m.FileBacking.DiskFileNum == base.DiskFileNum(m.FileNum)) {
    2287           2 :                         ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
    2288           2 :                 }
    2289             : 
    2290           2 :                 f := &ve.NewFiles[i]
    2291           2 :                 var err error
    2292           2 :                 if specifiedLevel != -1 {
    2293           2 :                         f.Level = specifiedLevel
    2294           2 :                 } else {
    2295           2 :                         var splitFile *fileMetadata
    2296           2 :                         if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
    2297           2 :                                 // This file fits perfectly within the excise span. We can slot it at
    2298           2 :                                 // L6, or sharedLevelsStart - 1 if we have shared files.
    2299           2 :                                 if len(lr.shared) > 0 || lr.externalFilesHaveLevel {
    2300           2 :                                         f.Level = sharedLevelsStart - 1
    2301           2 :                                         if baseLevel > f.Level {
    2302           2 :                                                 f.Level = 0
    2303           2 :                                         }
    2304           2 :                                 } else {
    2305           2 :                                         f.Level = 6
    2306           2 :                                 }
    2307           2 :                         } else {
    2308           2 :                                 // We check overlap against the LSM without holding DB.mu. Note that we
    2309           2 :                                 // are still holding the log lock, so the version cannot change.
    2310           2 :                                 // TODO(radu): perform this check optimistically outside of the log lock.
    2311           2 :                                 var lsmOverlap overlap.WithLSM
    2312           2 :                                 lsmOverlap, err = func() (overlap.WithLSM, error) {
    2313           2 :                                         d.mu.Unlock()
    2314           2 :                                         defer d.mu.Lock()
    2315           2 :                                         return overlapChecker.DetermineLSMOverlap(ctx, m.UserKeyBounds())
    2316           2 :                                 }()
    2317           2 :                                 if err == nil {
    2318           2 :                                         f.Level, splitFile, err = ingestTargetLevel(
    2319           2 :                                                 ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit,
    2320           2 :                                         )
    2321           2 :                                 }
    2322             :                         }
    2323             : 
    2324           2 :                         if splitFile != nil {
    2325           2 :                                 if invariants.Enabled {
    2326           2 :                                         if lf := current.Levels[f.Level].Find(d.cmp, splitFile); lf.Empty() {
    2327           0 :                                                 panic("splitFile returned is not in level it should be")
    2328             :                                         }
    2329             :                                 }
    2330             :                                 // We take advantage of the fact that we won't drop the db mutex
    2331             :                                 // between now and the call to logAndApply. So, no files should
    2332             :                                 // get added to a new in-progress compaction at this point. We can
    2333             :                                 // avoid having to iterate on in-progress compactions to cancel them
    2334             :                                 // if none of the files being split have a compacting state.
    2335           2 :                                 if splitFile.IsCompacting() {
    2336           1 :                                         checkCompactions = true
    2337           1 :                                 }
    2338           2 :                                 filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitFile, level: f.Level})
    2339             :                         }
    2340             :                 }
    2341           2 :                 if err != nil {
    2342           0 :                         d.mu.versions.logUnlock()
    2343           0 :                         return nil, err
    2344           0 :                 }
    2345           2 :                 if isShared && f.Level < sharedLevelsStart {
    2346           0 :                         panic(fmt.Sprintf("cannot slot a shared file higher than the highest shared level: %d < %d",
    2347           0 :                                 f.Level, sharedLevelsStart))
    2348             :                 }
    2349           2 :                 f.Meta = m
    2350           2 :                 levelMetrics := metrics[f.Level]
    2351           2 :                 if levelMetrics == nil {
    2352           2 :                         levelMetrics = &LevelMetrics{}
    2353           2 :                         metrics[f.Level] = levelMetrics
    2354           2 :                 }
    2355           2 :                 levelMetrics.NumFiles++
    2356           2 :                 levelMetrics.Size += int64(m.Size)
    2357           2 :                 levelMetrics.BytesIngested += m.Size
    2358           2 :                 levelMetrics.TablesIngested++
    2359             :         }
    2360             :         // replacedFiles maps files excised due to exciseSpan (or splitFiles returned
    2361             :         // by ingestTargetLevel), to files that were created to replace it. This map
    2362             :         // is used to resolve references to split files in filesToSplit, as it is
    2363             :         // possible for a file that we want to split to no longer exist or have a
    2364             :         // newer fileMetadata due to a split induced by another ingestion file, or an
    2365             :         // excise.
    2366           2 :         replacedFiles := make(map[base.FileNum][]newFileEntry)
    2367           2 :         updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
    2368           2 :                 levelMetrics := metrics[level]
    2369           2 :                 if levelMetrics == nil {
    2370           2 :                         levelMetrics = &LevelMetrics{}
    2371           2 :                         metrics[level] = levelMetrics
    2372           2 :                 }
    2373           2 :                 levelMetrics.NumFiles--
    2374           2 :                 levelMetrics.Size -= int64(m.Size)
    2375           2 :                 for i := range added {
    2376           2 :                         levelMetrics.NumFiles++
    2377           2 :                         levelMetrics.Size += int64(added[i].Meta.Size)
    2378           2 :                 }
    2379             :         }
    2380           2 :         if exciseSpan.Valid() {
    2381           2 :                 // Iterate through all levels and find files that intersect with exciseSpan.
    2382           2 :                 //
    2383           2 :                 // TODO(bilal): We could drop the DB mutex here as we don't need it for
    2384           2 :                 // excises; we only need to hold the version lock which we already are
    2385           2 :                 // holding. However releasing the DB mutex could mess with the
    2386           2 :                 // ingestTargetLevel calculation that happened above, as it assumed that it
    2387           2 :                 // had a complete view of in-progress compactions that wouldn't change
    2388           2 :                 // until logAndApply is called. If we were to drop the mutex now, we could
    2389           2 :                 // schedule another in-progress compaction that would go into the chosen target
    2390           2 :                 // level and lead to file overlap within level (which would panic in
    2391           2 :                 // logAndApply). We should drop the db mutex here, do the excise, then
    2392           2 :                 // re-grab the DB mutex and rerun just the in-progress compaction check to
    2393           2 :                 // see if any new compactions are conflicting with our chosen target levels
    2394           2 :                 // for files, and if they are, we should signal those compactions to error
    2395           2 :                 // out.
    2396           2 :                 for level := range current.Levels {
    2397           2 :                         overlaps := current.Overlaps(level, exciseSpan.UserKeyBounds())
    2398           2 :                         iter := overlaps.Iter()
    2399           2 : 
    2400           2 :                         for m := iter.First(); m != nil; m = iter.Next() {
    2401           2 :                                 newFiles, err := d.excise(ctx, exciseSpan.UserKeyBounds(), m, ve, level)
    2402           2 :                                 if err != nil {
    2403           0 :                                         return nil, err
    2404           0 :                                 }
    2405             : 
    2406           2 :                                 if _, ok := ve.DeletedFiles[deletedFileEntry{
    2407           2 :                                         Level:   level,
    2408           2 :                                         FileNum: m.FileNum,
    2409           2 :                                 }]; !ok {
    2410           2 :                                         // We did not excise this file.
    2411           2 :                                         continue
    2412             :                                 }
    2413           2 :                                 replacedFiles[m.FileNum] = newFiles
    2414           2 :                                 updateLevelMetricsOnExcise(m, level, newFiles)
    2415             :                         }
    2416             :                 }
    2417             :         }
    2418           2 :         if len(filesToSplit) > 0 {
    2419           2 :                 // For the same reasons as the above call to excise, we hold the db mutex
    2420           2 :                 // while calling this method.
    2421           2 :                 if err := d.ingestSplit(ctx, ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
    2422           0 :                         return nil, err
    2423           0 :                 }
    2424             :         }
    2425           2 :         if len(filesToSplit) > 0 || exciseSpan.Valid() {
    2426           2 :                 for c := range d.mu.compact.inProgress {
    2427           2 :                         if c.versionEditApplied {
    2428           1 :                                 continue
    2429             :                         }
    2430             :                         // Check if this compaction overlaps with the excise span. Note that just
    2431             :                         // checking if the inputs individually overlap with the excise span
    2432             :                         // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
    2433             :                         // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
    2434             :                         // doing a [c,d) excise at the same time as this compaction, we will have
    2435             :                         // to error out the whole compaction as we can't guarantee it hasn't/won't
    2436             :                         // write a file overlapping with the excise span.
    2437           2 :                         if exciseSpan.OverlapsInternalKeyRange(d.cmp, c.smallest, c.largest) {
    2438           2 :                                 c.cancel.Store(true)
    2439           2 :                         }
    2440             :                         // Check if this compaction's inputs have been replaced due to an
    2441             :                         // ingest-time split. In that case, cancel the compaction as a newly picked
    2442             :                         // compaction would need to include any new files that slid in between
    2443             :                         // previously-existing files. Note that we cancel any compaction that has a
    2444             :                         // file that was ingest-split as an input, even if it started before this
    2445             :                         // ingestion.
    2446           2 :                         if checkCompactions {
    2447           1 :                                 for i := range c.inputs {
    2448           1 :                                         iter := c.inputs[i].files.Iter()
    2449           1 :                                         for f := iter.First(); f != nil; f = iter.Next() {
    2450           1 :                                                 if _, ok := replacedFiles[f.FileNum]; ok {
    2451           1 :                                                         c.cancel.Store(true)
    2452           1 :                                                         break
    2453             :                                                 }
    2454             :                                         }
    2455             :                                 }
    2456             :                         }
    2457             :                 }
    2458             :         }
    2459             : 
    2460           2 :         if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
    2461           2 :                 return d.getInProgressCompactionInfoLocked(nil)
    2462           2 :         }); err != nil {
    2463           1 :                 // Note: any error during logAndApply is fatal; this won't be reachable in production.
    2464           1 :                 return nil, err
    2465           1 :         }
    2466             : 
    2467             :         // Check for any EventuallyFileOnlySnapshots that could be watching for
    2468             :         // an excise on this span. There should be none as the
    2469             :         // computePossibleOverlaps steps should have forced these EFOS to transition
    2470             :         // to file-only snapshots by now. If we see any that conflict with this
    2471             :         // excise, panic.
    2472           2 :         if exciseSpan.Valid() {
    2473           2 :                 for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
    2474           1 :                         // Skip non-EFOS snapshots, and also skip any EFOS that were created
    2475           1 :                         // *after* the excise.
    2476           1 :                         if s.efos == nil || base.Visible(exciseSeqNum, s.efos.seqNum, base.SeqNumMax) {
    2477           0 :                                 continue
    2478             :                         }
    2479           1 :                         efos := s.efos
    2480           1 :                         // TODO(bilal): We can make this faster by taking advantage of the sorted
    2481           1 :                         // nature of protectedRanges to do a sort.Search, or even maintaining a
    2482           1 :                         // global list of all protected ranges instead of having to peer into every
    2483           1 :                         // snapshot.
    2484           1 :                         for i := range efos.protectedRanges {
    2485           1 :                                 if efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
    2486           0 :                                         panic("unexpected excise of an EventuallyFileOnlySnapshot's bounds")
    2487             :                                 }
    2488             :                         }
    2489             :                 }
    2490             :         }
    2491             : 
    2492           2 :         d.mu.versions.metrics.Ingest.Count++
    2493           2 : 
    2494           2 :         d.updateReadStateLocked(d.opts.DebugCheck)
    2495           2 :         // updateReadStateLocked could have generated obsolete tables, schedule a
    2496           2 :         // cleanup job if necessary.
    2497           2 :         d.deleteObsoleteFiles(jobID)
    2498           2 :         d.updateTableStatsLocked(ve.NewFiles)
    2499           2 :         // The ingestion may have pushed a level over the threshold for compaction,
    2500           2 :         // so check to see if one is necessary and schedule it.
    2501           2 :         d.maybeScheduleCompaction()
    2502           2 :         var toValidate []manifest.NewFileEntry
    2503           2 :         dedup := make(map[base.DiskFileNum]struct{})
    2504           2 :         for _, entry := range ve.NewFiles {
    2505           2 :                 if _, ok := dedup[entry.Meta.FileBacking.DiskFileNum]; !ok {
    2506           2 :                         toValidate = append(toValidate, entry)
    2507           2 :                         dedup[entry.Meta.FileBacking.DiskFileNum] = struct{}{}
    2508           2 :                 }
    2509             :         }
    2510           2 :         d.maybeValidateSSTablesLocked(toValidate)
    2511           2 :         return ve, nil
    2512             : }
    2513             : 
    2514             : // maybeValidateSSTablesLocked adds the slice of newFileEntrys to the pending
    2515             : // queue of files to be validated, when the feature is enabled.
    2516             : //
    2517             : // Note that if two entries with the same backing file are added twice, then the
    2518             : // block checksums for the backing file will be validated twice.
    2519             : //
    2520             : // DB.mu must be locked when calling.
    2521           2 : func (d *DB) maybeValidateSSTablesLocked(newFiles []newFileEntry) {
    2522           2 :         // Only add to the validation queue when the feature is enabled.
    2523           2 :         if !d.opts.Experimental.ValidateOnIngest {
    2524           2 :                 return
    2525           2 :         }
    2526             : 
    2527           2 :         d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...)
    2528           2 :         if d.shouldValidateSSTablesLocked() {
    2529           2 :                 go d.validateSSTables()
    2530           2 :         }
    2531             : }
    2532             : 
    2533             : // shouldValidateSSTablesLocked returns true if SSTable validation should run.
    2534             : // DB.mu must be locked when calling.
    2535           2 : func (d *DB) shouldValidateSSTablesLocked() bool {
    2536           2 :         return !d.mu.tableValidation.validating &&
    2537           2 :                 d.closed.Load() == nil &&
    2538           2 :                 d.opts.Experimental.ValidateOnIngest &&
    2539           2 :                 len(d.mu.tableValidation.pending) > 0
    2540           2 : }
    2541             : 
    2542             : // validateSSTables runs a round of validation on the tables in the pending
    2543             : // queue.
    2544           2 : func (d *DB) validateSSTables() {
    2545           2 :         d.mu.Lock()
    2546           2 :         if !d.shouldValidateSSTablesLocked() {
    2547           2 :                 d.mu.Unlock()
    2548           2 :                 return
    2549           2 :         }
    2550             : 
    2551           2 :         pending := d.mu.tableValidation.pending
    2552           2 :         d.mu.tableValidation.pending = nil
    2553           2 :         d.mu.tableValidation.validating = true
    2554           2 :         jobID := d.newJobIDLocked()
    2555           2 :         rs := d.loadReadState()
    2556           2 : 
    2557           2 :         // Drop DB.mu before performing IO.
    2558           2 :         d.mu.Unlock()
    2559           2 : 
    2560           2 :         // Validate all tables in the pending queue. This could lead to a situation
    2561           2 :         // where we are starving IO from other tasks due to having to page through
    2562           2 :         // all the blocks in all the sstables in the queue.
    2563           2 :         // TODO(travers): Add some form of pacing to avoid IO starvation.
    2564           2 : 
    2565           2 :         // If we fail to validate any files due to reasons other than uncovered
    2566           2 :         // corruption, accumulate them and re-queue them for another attempt.
    2567           2 :         var retry []manifest.NewFileEntry
    2568           2 : 
    2569           2 :         for _, f := range pending {
    2570           2 :                 // The file may have been moved or deleted since it was ingested, in
    2571           2 :                 // which case we skip.
    2572           2 :                 if !rs.current.Contains(f.Level, f.Meta) {
    2573           2 :                         // Assume the file was moved to a lower level. It is rare enough
    2574           2 :                         // that a table is moved or deleted between the time it was ingested
    2575           2 :                         // and the time the validation routine runs that the overall cost of
    2576           2 :                         // this inner loop is tolerably low, when amortized over all
    2577           2 :                         // ingested tables.
    2578           2 :                         found := false
    2579           2 :                         for i := f.Level + 1; i < numLevels; i++ {
    2580           2 :                                 if rs.current.Contains(i, f.Meta) {
    2581           1 :                                         found = true
    2582           1 :                                         break
    2583             :                                 }
    2584             :                         }
    2585           2 :                         if !found {
    2586           2 :                                 continue
    2587             :                         }
    2588             :                 }
    2589             : 
    2590           2 :                 var err error
    2591           2 :                 if f.Meta.Virtual {
    2592           2 :                         err = d.fileCache.withVirtualReader(
    2593           2 :                                 f.Meta.VirtualMeta(), func(v sstable.VirtualReader) error {
    2594           2 :                                         return v.ValidateBlockChecksumsOnBacking()
    2595           2 :                                 })
    2596           2 :                 } else {
    2597           2 :                         err = d.fileCache.withReader(
    2598           2 :                                 f.Meta.PhysicalMeta(), func(r *sstable.Reader) error {
    2599           2 :                                         return r.ValidateBlockChecksums()
    2600           2 :                                 })
    2601             :                 }
    2602             : 
    2603           2 :                 if err != nil {
    2604           1 :                         if IsCorruptionError(err) {
    2605           1 :                                 // TODO(travers): Hook into the corruption reporting pipeline, once
    2606           1 :                                 // available. See pebble#1192.
    2607           1 :                                 d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err)
    2608           1 :                         } else {
    2609           1 :                                 // If there was some other, possibly transient, error that
    2610           1 :                                 // caused table validation to fail inform the EventListener and
    2611           1 :                                 // move on. We remember the table so that we can retry it in a
    2612           1 :                                 // subsequent table validation job.
    2613           1 :                                 //
    2614           1 :                                 // TODO(jackson): If the error is not transient, this will retry
    2615           1 :                                 // validation indefinitely. While not great, it's the same
    2616           1 :                                 // behavior as erroring flushes and compactions. We should
    2617           1 :                                 // address this as a part of #270.
    2618           1 :                                 d.opts.EventListener.BackgroundError(err)
    2619           1 :                                 retry = append(retry, f)
    2620           1 :                                 continue
    2621             :                         }
    2622             :                 }
    2623             : 
    2624           2 :                 d.opts.EventListener.TableValidated(TableValidatedInfo{
    2625           2 :                         JobID: int(jobID),
    2626           2 :                         Meta:  f.Meta,
    2627           2 :                 })
    2628             :         }
    2629           2 :         rs.unref()
    2630           2 :         d.mu.Lock()
    2631           2 :         defer d.mu.Unlock()
    2632           2 :         d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, retry...)
    2633           2 :         d.mu.tableValidation.validating = false
    2634           2 :         d.mu.tableValidation.cond.Broadcast()
    2635           2 :         if d.shouldValidateSSTablesLocked() {
    2636           2 :                 go d.validateSSTables()
    2637           2 :         }
    2638             : }

Generated by: LCOV version 1.14