LCOV - code coverage report
Current view: top level - pebble - ingest.go (source / functions)
Test: 2024-02-11 08:15Z 3e083df5 - tests only.lcov
Date: 2024-02-11 08:16:03

                   Hit     Total   Coverage
Lines:            1534      1781     86.1 %
Functions:           0         0        -

          Line data    Source code
       1             : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "slices"
      10             :         "sort"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/invariants"
      16             :         "github.com/cockroachdb/pebble/internal/keyspan"
      17             :         "github.com/cockroachdb/pebble/internal/manifest"
      18             :         "github.com/cockroachdb/pebble/internal/private"
      19             :         "github.com/cockroachdb/pebble/objstorage"
      20             :         "github.com/cockroachdb/pebble/objstorage/remote"
      21             :         "github.com/cockroachdb/pebble/sstable"
      22             : )
      23             : 
      24           1 : func sstableKeyCompare(userCmp Compare, a, b InternalKey) int {
      25           1 :         c := userCmp(a.UserKey, b.UserKey)
      26           1 :         if c != 0 {
      27           1 :                 return c
      28           1 :         }
      29           1 :         if a.IsExclusiveSentinel() {
      30           1 :                 if !b.IsExclusiveSentinel() {
      31           1 :                         return -1
      32           1 :                 }
      33           1 :         } else if b.IsExclusiveSentinel() {
      34           1 :                 return +1
      35           1 :         }
      36           1 :         return 0
      37             : }
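
A minimal sketch of the ordering above, assuming the pebble package context and base.DefaultComparer; an exclusive sentinel at a user key sorts before any regular key with the same user key, so tables whose bounds meet at a sentinel do not overlap:

        // Hypothetical bounds: table A ends with the range-del sentinel at "b",
        // table B starts at b#5,SET. The sentinel is an exclusive end bound, so
        // sstableKeyCompare reports A's largest as strictly before B's smallest.
        largestA := base.MakeRangeDeleteSentinelKey([]byte("b"))
        smallestB := base.MakeInternalKey([]byte("b"), 5, base.InternalKeyKindSet)
        if sstableKeyCompare(base.DefaultComparer.Compare, largestA, smallestB) < 0 {
                // No overlap between A and B at user key "b".
        }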
      38             : 
      39             : // KeyRange encodes a key range in user key space. A KeyRange's Start is
      40             : // inclusive while its End is exclusive.
      41             : type KeyRange struct {
      42             :         Start, End []byte
      43             : }
      44             : 
      45             : // Valid returns true if the KeyRange is defined.
      46           1 : func (k *KeyRange) Valid() bool {
      47           1 :         return k.Start != nil && k.End != nil
      48           1 : }
      49             : 
      50             : // Contains returns whether the specified key exists in the KeyRange.
      51           1 : func (k *KeyRange) Contains(cmp base.Compare, key InternalKey) bool {
      52           1 :         v := cmp(key.UserKey, k.End)
      53           1 :         return (v < 0 || (v == 0 && key.IsExclusiveSentinel())) && cmp(k.Start, key.UserKey) <= 0
      54           1 : }
      55             : 
      56             : // InternalKeyBounds returns the key range as internal key bounds, with the end
      57             : // boundary represented as an exclusive range delete sentinel key.
      58           1 : func (k KeyRange) InternalKeyBounds() (InternalKey, InternalKey) {
      59           1 :         return base.MakeInternalKey(k.Start, InternalKeySeqNumMax, InternalKeyKindMax),
      60           1 :                 base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, k.End)
      61           1 : }
      62             : 
      63             : // OverlapsInternalKeyRange checks if the specified internal key range has an
      64             : // overlap with the KeyRange. Note that we aren't checking for full containment
      65             : // of smallest-largest within k, rather just that there's some intersection
      66             : // between the two ranges.
      67           1 : func (k *KeyRange) OverlapsInternalKeyRange(cmp base.Compare, smallest, largest InternalKey) bool {
      68           1 :         v := cmp(k.Start, largest.UserKey)
      69           1 :         return v <= 0 && !(largest.IsExclusiveSentinel() && v == 0) &&
      70           1 :                 cmp(k.End, smallest.UserKey) > 0
      71           1 : }
      72             : 
      73             : // Overlaps checks if the specified file has an overlap with the KeyRange.
      74             : // Note that we aren't checking for full containment of m within k, rather just
      75             : // that there's some intersection between m and k's bounds.
      76           1 : func (k *KeyRange) Overlaps(cmp base.Compare, m *fileMetadata) bool {
      77           1 :         return k.OverlapsInternalKeyRange(cmp, m.Smallest, m.Largest)
      78           1 : }
      79             : 
      80             : // OverlapsKeyRange checks if this span overlaps with the provided KeyRange.
      81             : // Note that we aren't checking for full containment of either span in the other,
      82             : // just that there's a key x that is in both key ranges.
      83           1 : func (k *KeyRange) OverlapsKeyRange(cmp Compare, span KeyRange) bool {
      84           1 :         return cmp(k.Start, span.End) < 0 && cmp(k.End, span.Start) > 0
      85           1 : }
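
A minimal sketch of the [Start, End) semantics above, assuming the pebble package context and base.DefaultComparer:

        // Hypothetical range: Start "a" is inclusive, End "c" is exclusive.
        k := KeyRange{Start: []byte("a"), End: []byte("c")}
        cmp := base.DefaultComparer.Compare
        _ = k.Contains(cmp, base.MakeInternalKey([]byte("a"), 1, base.InternalKeyKindSet)) // true: Start is inclusive
        _ = k.Contains(cmp, base.MakeInternalKey([]byte("c"), 1, base.InternalKeyKindSet)) // false: End is exclusive
        _ = k.Contains(cmp, base.MakeRangeDeleteSentinelKey([]byte("c")))                  // true: the exclusive sentinel at End is inside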
      86             : 
      87           1 : func ingestValidateKey(opts *Options, key *InternalKey) error {
      88           1 :         if key.Kind() == InternalKeyKindInvalid {
      89           1 :                 return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s",
      90           1 :                         key.Pretty(opts.Comparer.FormatKey))
      91           1 :         }
      92           1 :         if key.SeqNum() != 0 {
      93           1 :                 return base.CorruptionErrorf("pebble: external sstable has non-zero seqnum: %s",
      94           1 :                         key.Pretty(opts.Comparer.FormatKey))
      95           1 :         }
      96           1 :         return nil
      97             : }
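
A minimal sketch of the zero-seqnum contract enforced above, assuming the pebble package context and base.DefaultComparer:

        // Hypothetical key: sstables handed to ingestion must carry sequence
        // number zero; a non-zero seqnum is surfaced as a corruption error.
        opts := &Options{Comparer: base.DefaultComparer}
        bad := base.MakeInternalKey([]byte("k"), 7, base.InternalKeyKindSet)
        if err := ingestValidateKey(opts, &bad); err != nil {
                // err: "pebble: external sstable has non-zero seqnum: ..."
        }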
      98             : 
      99             : // ingestSynthesizeShared constructs a fileMetadata for one shared sstable owned
     100             : // or shared by another node.
     101             : func ingestSynthesizeShared(
     102             :         opts *Options, sm SharedSSTMeta, fileNum base.FileNum,
     103           1 : ) (*fileMetadata, error) {
     104           1 :         if sm.Size == 0 {
     105           0 :                 // Disallow 0 file sizes
     106           0 :                 return nil, errors.New("pebble: cannot ingest shared file with size 0")
     107           0 :         }
     108             :         // Don't load table stats. Doing a round trip to shared storage, one SST
     109             :         // at a time is not worth it as it slows down ingestion.
     110           1 :         meta := &fileMetadata{
     111           1 :                 FileNum:      fileNum,
     112           1 :                 CreationTime: time.Now().Unix(),
     113           1 :                 Virtual:      true,
     114           1 :                 Size:         sm.Size,
     115           1 :         }
     116           1 :         // For simplicity, we use the same number for both the FileNum and the
     117           1 :         // DiskFileNum (even though this is a virtual sstable).
     118           1 :         meta.InitProviderBacking(base.DiskFileNum(fileNum))
     119           1 :         // Set the underlying FileBacking's size to the same size as the virtualized
     120           1 :         // view of the sstable. This ensures that we don't over-prioritize this
     121           1 :         // sstable for compaction just yet, as we do not have a clear sense of what
     122           1 :         // parts of this sstable are referenced by other nodes.
     123           1 :         meta.FileBacking.Size = sm.Size
     124           1 :         if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil {
     125           1 :                 // Initialize meta.{HasPointKeys,Smallest,Largest}, etc.
     126           1 :                 //
     127           1 :                 // NB: We create new internal keys and pass them into ExtendPointKeyBounds
     128           1 :                 // so that we can sub a zero sequence number into the bounds. We can set
     129           1 :                 // the sequence number to anything here; it'll be reset in ingestUpdateSeqNum
     130           1 :                 // anyway. However, we do need to use the same sequence number across all
     131           1 :                 // bound keys at this step so that we end up with bounds that are consistent
     132           1 :                 // across point/range keys.
     133           1 :                 //
     134           1 :                 // Because of the sequence number rewriting, we cannot use the Kind of
     135           1 :                 // sm.SmallestPointKey. For example, the original SST might start with
     136           1 :                 // a.SET.2 and a.RANGEDEL.1 (with a.SET.2 being the smallest key); after
     137           1 :                 // rewriting the sequence numbers, these keys become a.SET.100 and
     138           1 :                 // a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a
     139           1 :                 // correct bound, we just use the maximum key kind (which sorts first).
     140           1 :                 // Similarly, we use the smallest key kind for the largest key.
     141           1 :                 smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMax)
     142           1 :                 largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0)
     143           1 :                 if sm.LargestPointKey.IsExclusiveSentinel() {
     144           1 :                         largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey)
     145           1 :                 }
     146           1 :                 if opts.Comparer.Equal(smallestPointKey.UserKey, largestPointKey.UserKey) &&
     147           1 :                         smallestPointKey.Trailer < largestPointKey.Trailer {
     148           0 :                         // We get kinds from the sender, however we substitute our own sequence
     149           0 :                         // numbers. This can result in cases where an sstable [b#5,SET-b#4,DELSIZED]
     150           0 :                         // becomes [b#0,SET-b#0,DELSIZED] when we synthesize it here, but the
     151           0 :                         // kinds need to be reversed now because DelSized > Set.
     152           0 :                         smallestPointKey, largestPointKey = largestPointKey, smallestPointKey
     153           0 :                 }
     154           1 :                 meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallestPointKey, largestPointKey)
     155             :         }
     156           1 :         if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil {
     157           1 :                 // Initialize meta.{HasRangeKeys,Smallest,Largest}, etc.
     158           1 :                 //
     159           1 :                 // See comment above on why we use a zero sequence number and these key
     160           1 :                 // kinds here.
     161           1 :                 smallestRangeKey := base.MakeInternalKey(sm.SmallestRangeKey.UserKey, 0, base.InternalKeyKindRangeKeyMax)
     162           1 :                 largestRangeKey := base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeyMin, sm.LargestRangeKey.UserKey)
     163           1 :                 meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallestRangeKey, largestRangeKey)
     164           1 :         }
     165           1 :         if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
     166           0 :                 return nil, err
     167           0 :         }
     168           1 :         return meta, nil
     169             : }
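
A minimal sketch of the kind trick used above, assuming the pebble package context; with equal user keys and sequence numbers, internal keys sort by descending kind, which is why the synthesized smallest bound uses the maximum kind and the largest bound uses kind zero (swapped only in the single-user-key case handled above):

        // Hypothetical bounds on the same user key at seqnum 0.
        lo := base.MakeInternalKey([]byte("b"), 0, base.InternalKeyKindMax)
        hi := base.MakeInternalKey([]byte("b"), 0, 0)
        _ = base.InternalCompare(base.DefaultComparer.Compare, lo, hi) // < 0: the larger kind sorts first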
     170             : 
     171             : // ingestLoad1External loads the fileMetadata for one external sstable.
     172             : // Sequence number and target level calculation happens during prepare/apply.
     173             : func ingestLoad1External(
     174             :         opts *Options, e ExternalFile, fileNum base.FileNum, objProvider objstorage.Provider, jobID int,
     175           1 : ) (*fileMetadata, error) {
     176           1 :         if e.Size == 0 {
     177           0 :                 // Disallow 0 file sizes
     178           0 :                 return nil, errors.New("pebble: cannot ingest external file with size 0")
     179           0 :         }
     180           1 :         if !e.HasRangeKey && !e.HasPointKey {
     181           0 :                 return nil, errors.New("pebble: cannot ingest external file with no point or range keys")
     182           0 :         }
     183             :         // Don't load table stats. Doing a round trip to shared storage, one SST
     184             :         // at a time is not worth it as it slows down ingestion.
     185           1 :         meta := &fileMetadata{
     186           1 :                 FileNum:      fileNum,
     187           1 :                 CreationTime: time.Now().Unix(),
     188           1 :                 Virtual:      true,
     189           1 :                 Size:         e.Size,
     190           1 :         }
     191           1 :         // For simplicity, we use the same number for both the FileNum and the
     192           1 :         // DiskFileNum (even though this is a virtual sstable).
     193           1 :         meta.InitProviderBacking(base.DiskFileNum(fileNum))
     194           1 : 
     195           1 :         // In the name of keeping this ingestion as fast as possible, we avoid
     196           1 :         // *all* existence checks and synthesize a file metadata with smallest/largest
     197           1 :         // keys that overlap whatever the passed-in span was.
     198           1 :         smallestCopy := make([]byte, len(e.SmallestUserKey))
     199           1 :         copy(smallestCopy, e.SmallestUserKey)
     200           1 :         largestCopy := make([]byte, len(e.LargestUserKey))
     201           1 :         copy(largestCopy, e.LargestUserKey)
     202           1 :         if e.HasPointKey {
     203           1 :                 meta.ExtendPointKeyBounds(
     204           1 :                         opts.Comparer.Compare,
     205           1 :                         base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax),
     206           1 :                         base.MakeRangeDeleteSentinelKey(largestCopy),
     207           1 :                 )
     208           1 :         }
     209           1 :         if e.HasRangeKey {
     210           0 :                 meta.ExtendRangeKeyBounds(
     211           0 :                         opts.Comparer.Compare,
     212           0 :                         base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeyMax),
     213           0 :                         base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyMin, largestCopy),
     214           0 :                 )
     215           0 :         }
     216             : 
     217             :         // Set the underlying FileBacking's size to the same size as the virtualized
     218             :         // view of the sstable. This ensures that we don't over-prioritize this
     219             :         // sstable for compaction just yet, as we do not have a clear sense of
     220             :         // what parts of this sstable are referenced by other nodes.
     221           1 :         meta.FileBacking.Size = e.Size
     222           1 : 
     223           1 :         if len(e.SyntheticPrefix) != 0 {
     224           1 :                 meta.PrefixReplacement = &manifest.PrefixReplacement{
     225           1 :                         ContentPrefix:   e.ContentPrefix,
     226           1 :                         SyntheticPrefix: e.SyntheticPrefix,
     227           1 :                 }
     228           1 :         }
     229             : 
     230           1 :         if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
     231           0 :                 return nil, err
     232           0 :         }
     233           1 :         return meta, nil
     234             : }
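
A minimal sketch of the ExternalFile fields consulted above (other fields omitted); the Locator and ObjName values are hypothetical, and the largest bound is synthesized as an exclusive sentinel by the code above:

        ext := ExternalFile{
                Locator:         "shared-bucket", // hypothetical remote.Locator
                ObjName:         "000123.sst",    // hypothetical object name
                Size:            4 << 20,         // must be non-zero
                SmallestUserKey: []byte("d"),
                LargestUserKey:  []byte("f"),
                HasPointKey:     true,            // at least one of HasPointKey/HasRangeKey must be set
        }
        _ = ext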
     235             : 
     236             : // ingestLoad1 creates the FileMetadata for one file. This file will be owned
     237             : // by this store.
     238             : func ingestLoad1(
     239             :         opts *Options,
     240             :         fmv FormatMajorVersion,
     241             :         readable objstorage.Readable,
     242             :         cacheID uint64,
     243             :         fileNum base.FileNum,
     244           1 : ) (*fileMetadata, error) {
     245           1 :         cacheOpts := private.SSTableCacheOpts(cacheID, base.PhysicalTableDiskFileNum(fileNum)).(sstable.ReaderOption)
     246           1 :         r, err := sstable.NewReader(readable, opts.MakeReaderOptions(), cacheOpts)
     247           1 :         if err != nil {
     248           1 :                 return nil, err
     249           1 :         }
     250           1 :         defer r.Close()
     251           1 : 
     252           1 :         // Avoid ingesting tables with format versions this DB doesn't support.
     253           1 :         tf, err := r.TableFormat()
     254           1 :         if err != nil {
     255           0 :                 return nil, err
     256           0 :         }
     257           1 :         if tf < fmv.MinTableFormat() || tf > fmv.MaxTableFormat() {
     258           1 :                 return nil, errors.Newf(
     259           1 :                         "pebble: table format %s is not within range supported at DB format major version %d, (%s,%s)",
     260           1 :                         tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
     261           1 :                 )
     262           1 :         }
     263             : 
     264           1 :         meta := &fileMetadata{}
     265           1 :         meta.FileNum = fileNum
     266           1 :         meta.Size = uint64(readable.Size())
     267           1 :         meta.CreationTime = time.Now().Unix()
     268           1 :         meta.InitPhysicalBacking()
     269           1 : 
     270           1 :         // Avoid loading into the table cache for collecting stats if we
     271           1 :         // don't need to. If there are no range deletions, we have all the
     272           1 :         // information to compute the stats here.
     273           1 :         //
     274           1 :         // This is helpful in tests for avoiding awkwardness around deletion of
     275           1 :         // ingested files from MemFS. MemFS implements the Windows semantics of
     276           1 :         // disallowing removal of an open file. Under MemFS, if we don't populate
     277           1 :         // meta.Stats here, the file will be loaded into the table cache for
     278           1 :         // calculating stats before we can remove the original link.
     279           1 :         maybeSetStatsFromProperties(meta.PhysicalMeta(), &r.Properties)
     280           1 : 
     281           1 :         {
     282           1 :                 iter, err := r.NewIter(nil /* lower */, nil /* upper */)
     283           1 :                 if err != nil {
     284           1 :                         return nil, err
     285           1 :                 }
     286           1 :                 defer iter.Close()
     287           1 :                 var smallest InternalKey
     288           1 :                 if key, _ := iter.First(); key != nil {
     289           1 :                         if err := ingestValidateKey(opts, key); err != nil {
     290           1 :                                 return nil, err
     291           1 :                         }
     292           1 :                         smallest = (*key).Clone()
     293             :                 }
     294           1 :                 if err := iter.Error(); err != nil {
     295           1 :                         return nil, err
     296           1 :                 }
     297           1 :                 if key, _ := iter.Last(); key != nil {
     298           1 :                         if err := ingestValidateKey(opts, key); err != nil {
     299           0 :                                 return nil, err
     300           0 :                         }
     301           1 :                         meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, key.Clone())
     302             :                 }
     303           1 :                 if err := iter.Error(); err != nil {
     304           1 :                         return nil, err
     305           1 :                 }
     306             :         }
     307             : 
     308           1 :         iter, err := r.NewRawRangeDelIter()
     309           1 :         if err != nil {
     310           0 :                 return nil, err
     311           0 :         }
     312           1 :         if iter != nil {
     313           1 :                 defer iter.Close()
     314           1 :                 var smallest InternalKey
     315           1 :                 if s, err := iter.First(); err != nil {
     316           0 :                         return nil, err
     317           1 :                 } else if s != nil {
     318           1 :                         key := s.SmallestKey()
     319           1 :                         if err := ingestValidateKey(opts, &key); err != nil {
     320           0 :                                 return nil, err
     321           0 :                         }
     322           1 :                         smallest = key.Clone()
     323             :                 }
     324           1 :                 if s, err := iter.Last(); err != nil {
     325           0 :                         return nil, err
     326           1 :                 } else if s != nil {
     327           1 :                         k := s.SmallestKey()
     328           1 :                         if err := ingestValidateKey(opts, &k); err != nil {
     329           0 :                                 return nil, err
     330           0 :                         }
     331           1 :                         largest := s.LargestKey().Clone()
     332           1 :                         meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest)
     333             :                 }
     334             :         }
     335             : 
     336             :         // Update the range-key bounds for the table.
     337           1 :         {
     338           1 :                 iter, err := r.NewRawRangeKeyIter()
     339           1 :                 if err != nil {
     340           0 :                         return nil, err
     341           0 :                 }
     342           1 :                 if iter != nil {
     343           1 :                         defer iter.Close()
     344           1 :                         var smallest InternalKey
     345           1 :                         if s, err := iter.First(); err != nil {
     346           0 :                                 return nil, err
     347           1 :                         } else if s != nil {
     348           1 :                                 key := s.SmallestKey()
     349           1 :                                 if err := ingestValidateKey(opts, &key); err != nil {
     350           0 :                                         return nil, err
     351           0 :                                 }
     352           1 :                                 smallest = key.Clone()
     353             :                         }
     354           1 :                         if s, err := iter.Last(); err != nil {
     355           0 :                                 return nil, err
     356           1 :                         } else if s != nil {
     357           1 :                                 k := s.SmallestKey()
     358           1 :                                 if err := ingestValidateKey(opts, &k); err != nil {
     359           0 :                                         return nil, err
     360           0 :                                 }
     361             :                                 // As range keys are fragmented, the end key of the last range key in
     362             :                                 // the table provides the upper bound for the table.
     363           1 :                                 largest := s.LargestKey().Clone()
     364           1 :                                 meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallest, largest)
     365             :                         }
     366             :                 }
     367             :         }
     368             : 
     369           1 :         if !meta.HasPointKeys && !meta.HasRangeKeys {
     370           1 :                 return nil, nil
     371           1 :         }
     372             : 
     373             :         // Sanity check that the various bounds on the file were set consistently.
     374           1 :         if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
     375           0 :                 return nil, err
     376           0 :         }
     377             : 
     378           1 :         return meta, nil
     379             : }
     380             : 
     381             : type ingestLoadResult struct {
     382             :         localMeta, sharedMeta []*fileMetadata
     383             :         externalMeta          []*fileMetadata
     384             :         localPaths            []string
     385             :         sharedLevels          []uint8
     386             :         fileCount             int
     387             : }
     388             : 
     389             : func ingestLoad(
     390             :         opts *Options,
     391             :         fmv FormatMajorVersion,
     392             :         paths []string,
     393             :         shared []SharedSSTMeta,
     394             :         external []ExternalFile,
     395             :         cacheID uint64,
     396             :         pending []base.FileNum,
     397             :         objProvider objstorage.Provider,
     398             :         jobID int,
     399           1 : ) (ingestLoadResult, error) {
     400           1 :         meta := make([]*fileMetadata, 0, len(paths))
     401           1 :         newPaths := make([]string, 0, len(paths))
     402           1 :         for i := range paths {
     403           1 :                 f, err := opts.FS.Open(paths[i])
     404           1 :                 if err != nil {
     405           1 :                         return ingestLoadResult{}, err
     406           1 :                 }
     407             : 
     408           1 :                 readable, err := sstable.NewSimpleReadable(f)
     409           1 :                 if err != nil {
     410           1 :                         return ingestLoadResult{}, err
     411           1 :                 }
     412           1 :                 m, err := ingestLoad1(opts, fmv, readable, cacheID, pending[i])
     413           1 :                 if err != nil {
     414           1 :                         return ingestLoadResult{}, err
     415           1 :                 }
     416           1 :                 if m != nil {
     417           1 :                         meta = append(meta, m)
     418           1 :                         newPaths = append(newPaths, paths[i])
     419           1 :                 }
     420             :         }
     421           1 :         if len(shared) == 0 && len(external) == 0 {
     422           1 :                 return ingestLoadResult{localMeta: meta, localPaths: newPaths, fileCount: len(meta)}, nil
     423           1 :         }
     424             : 
     425             :         // Sort the shared files according to level.
     426           1 :         sort.Sort(sharedByLevel(shared))
     427           1 : 
     428           1 :         sharedMeta := make([]*fileMetadata, 0, len(shared))
     429           1 :         levels := make([]uint8, 0, len(shared))
     430           1 :         for i := range shared {
     431           1 :                 m, err := ingestSynthesizeShared(opts, shared[i], pending[len(paths)+i])
     432           1 :                 if err != nil {
     433           0 :                         return ingestLoadResult{}, err
     434           0 :                 }
     435           1 :                 if shared[i].Level < sharedLevelsStart {
     436           0 :                         return ingestLoadResult{}, errors.New("cannot ingest shared file in level below sharedLevelsStart")
     437           0 :                 }
     438           1 :                 sharedMeta = append(sharedMeta, m)
     439           1 :                 levels = append(levels, shared[i].Level)
     440             :         }
     441           1 :         externalMeta := make([]*fileMetadata, 0, len(external))
     442           1 :         for i := range external {
     443           1 :                 m, err := ingestLoad1External(opts, external[i], pending[len(paths)+len(shared)+i], objProvider, jobID)
     444           1 :                 if err != nil {
     445           0 :                         return ingestLoadResult{}, err
     446           0 :                 }
     447           1 :                 externalMeta = append(externalMeta, m)
     448             :         }
     449           1 :         result := ingestLoadResult{
     450           1 :                 localMeta:    meta,
     451           1 :                 sharedMeta:   sharedMeta,
     452           1 :                 externalMeta: externalMeta,
     453           1 :                 localPaths:   newPaths,
     454           1 :                 sharedLevels: levels,
     455           1 :                 fileCount:    len(meta) + len(sharedMeta) + len(externalMeta),
     456           1 :         }
     457           1 :         return result, nil
     458             : }
     459             : 
      460             : // metaAndPaths sorts file metadata by smallest user key, while ensuring the
      461             : // matching path also gets swapped to the same index. For use in
     462             : // ingestSortAndVerify.
     463             : type metaAndPaths struct {
     464             :         meta  []*fileMetadata
     465             :         paths []string
     466             :         cmp   Compare
     467             : }
     468             : 
     469           1 : func (m metaAndPaths) Len() int {
     470           1 :         return len(m.meta)
     471           1 : }
     472             : 
     473           1 : func (m metaAndPaths) Less(i, j int) bool {
     474           1 :         return m.cmp(m.meta[i].Smallest.UserKey, m.meta[j].Smallest.UserKey) < 0
     475           1 : }
     476             : 
     477           1 : func (m metaAndPaths) Swap(i, j int) {
     478           1 :         m.meta[i], m.meta[j] = m.meta[j], m.meta[i]
     479           1 :         if m.paths != nil {
     480           1 :                 m.paths[i], m.paths[j] = m.paths[j], m.paths[i]
     481           1 :         }
     482             : }
     483             : 
     484           1 : func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error {
     485           1 :         // Verify that all the shared files (i.e. files in sharedMeta)
     486           1 :         // fit within the exciseSpan.
     487           1 :         for i := range lr.sharedMeta {
     488           1 :                 f := lr.sharedMeta[i]
     489           1 :                 if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
     490           0 :                         return errors.AssertionFailedf("pebble: shared file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
     491           0 :                 }
     492             :         }
     493           1 :         if len(lr.externalMeta) > 0 {
     494           1 :                 if len(lr.localMeta) > 0 || len(lr.sharedMeta) > 0 {
     495           0 :                         // Currently we only support external ingests on their own. If external
     496           0 :                         // files are present alongside local/shared files, return an error.
     497           0 :                         return errors.AssertionFailedf("pebble: external files cannot be ingested atomically alongside other types of files")
     498           0 :                 }
     499           1 :                 sort.Sort(&metaAndPaths{
     500           1 :                         meta: lr.externalMeta,
     501           1 :                         cmp:  cmp,
     502           1 :                 })
     503           1 :                 for i := 1; i < len(lr.externalMeta); i++ {
     504           1 :                         if sstableKeyCompare(cmp, lr.externalMeta[i-1].Largest, lr.externalMeta[i].Smallest) >= 0 {
     505           1 :                                 return errors.AssertionFailedf("pebble: external sstables have overlapping ranges")
     506           1 :                         }
     507             :                 }
     508           1 :                 return nil
     509             :         }
     510           1 :         if len(lr.localMeta) <= 1 || len(lr.localPaths) <= 1 {
     511           1 :                 return nil
     512           1 :         }
     513             : 
     514           1 :         sort.Sort(&metaAndPaths{
     515           1 :                 meta:  lr.localMeta,
     516           1 :                 paths: lr.localPaths,
     517           1 :                 cmp:   cmp,
     518           1 :         })
     519           1 : 
     520           1 :         for i := 1; i < len(lr.localPaths); i++ {
     521           1 :                 if sstableKeyCompare(cmp, lr.localMeta[i-1].Largest, lr.localMeta[i].Smallest) >= 0 {
     522           1 :                         return errors.AssertionFailedf("pebble: local ingestion sstables have overlapping ranges")
     523           1 :                 }
     524             :         }
     525           1 :         if len(lr.sharedMeta) == 0 {
     526           1 :                 return nil
     527           1 :         }
     528           0 :         filesInLevel := make([]*fileMetadata, 0, len(lr.sharedMeta))
     529           0 :         for l := sharedLevelsStart; l < numLevels; l++ {
     530           0 :                 filesInLevel = filesInLevel[:0]
     531           0 :                 for i := range lr.sharedMeta {
     532           0 :                         if lr.sharedLevels[i] == uint8(l) {
     533           0 :                                 filesInLevel = append(filesInLevel, lr.sharedMeta[i])
     534           0 :                         }
     535             :                 }
     536           0 :                 slices.SortFunc(filesInLevel, func(a, b *fileMetadata) int {
     537           0 :                         return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
     538           0 :                 })
     539           0 :                 for i := 1; i < len(filesInLevel); i++ {
     540           0 :                         if sstableKeyCompare(cmp, filesInLevel[i-1].Largest, filesInLevel[i].Smallest) >= 0 {
     541           0 :                                 return errors.AssertionFailedf("pebble: external shared sstables have overlapping ranges")
     542           0 :                         }
     543             :                 }
     544             :         }
     545           0 :         return nil
     546             : }
     547             : 
     548           1 : func ingestCleanup(objProvider objstorage.Provider, meta []*fileMetadata) error {
     549           1 :         var firstErr error
     550           1 :         for i := range meta {
     551           1 :                 if err := objProvider.Remove(fileTypeTable, meta[i].FileBacking.DiskFileNum); err != nil {
     552           1 :                         firstErr = firstError(firstErr, err)
     553           1 :                 }
     554             :         }
     555           1 :         return firstErr
     556             : }
     557             : 
     558             : // ingestLink creates new objects which are backed by either hardlinks to or
     559             : // copies of the ingested files. It also attaches shared objects to the provider.
     560             : func ingestLink(
     561             :         jobID int,
     562             :         opts *Options,
     563             :         objProvider objstorage.Provider,
     564             :         lr ingestLoadResult,
     565             :         shared []SharedSSTMeta,
     566             :         external []ExternalFile,
     567           1 : ) error {
     568           1 :         for i := range lr.localPaths {
     569           1 :                 objMeta, err := objProvider.LinkOrCopyFromLocal(
     570           1 :                         context.TODO(), opts.FS, lr.localPaths[i], fileTypeTable, lr.localMeta[i].FileBacking.DiskFileNum,
     571           1 :                         objstorage.CreateOptions{PreferSharedStorage: true},
     572           1 :                 )
     573           1 :                 if err != nil {
     574           1 :                         if err2 := ingestCleanup(objProvider, lr.localMeta[:i]); err2 != nil {
     575           0 :                                 opts.Logger.Errorf("ingest cleanup failed: %v", err2)
     576           0 :                         }
     577           1 :                         return err
     578             :                 }
     579           1 :                 if opts.EventListener.TableCreated != nil {
     580           1 :                         opts.EventListener.TableCreated(TableCreateInfo{
     581           1 :                                 JobID:   jobID,
     582           1 :                                 Reason:  "ingesting",
     583           1 :                                 Path:    objProvider.Path(objMeta),
     584           1 :                                 FileNum: base.PhysicalTableDiskFileNum(lr.localMeta[i].FileNum),
     585           1 :                         })
     586           1 :                 }
     587             :         }
     588           1 :         remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(shared)+len(external))
     589           1 :         for i := range shared {
     590           1 :                 backing, err := shared[i].Backing.Get()
     591           1 :                 if err != nil {
     592           0 :                         return err
     593           0 :                 }
     594           1 :                 remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
     595           1 :                         FileNum:  lr.sharedMeta[i].FileBacking.DiskFileNum,
     596           1 :                         FileType: fileTypeTable,
     597           1 :                         Backing:  backing,
     598           1 :                 })
     599             :         }
     600           1 :         for i := range external {
     601           1 :                 // Try to resolve a reference to the external file.
     602           1 :                 backing, err := objProvider.CreateExternalObjectBacking(external[i].Locator, external[i].ObjName)
     603           1 :                 if err != nil {
     604           0 :                         return err
     605           0 :                 }
     606           1 :                 remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
     607           1 :                         FileNum:  lr.externalMeta[i].FileBacking.DiskFileNum,
     608           1 :                         FileType: fileTypeTable,
     609           1 :                         Backing:  backing,
     610           1 :                 })
     611             :         }
     612             : 
     613           1 :         remoteObjMetas, err := objProvider.AttachRemoteObjects(remoteObjs)
     614           1 :         if err != nil {
     615           0 :                 return err
     616           0 :         }
     617             : 
     618           1 :         for i := range shared {
      619           1 :                 // One corner case around file sizes we need to be mindful of is that
      620           1 :                 // if one of the shared objects was initially created by us (and has
      621           1 :                 // boomeranged back from another node), we'll need to update the FileBacking's
      622           1 :                 // size to be the true underlying size. Otherwise, we could hit errors when
      623           1 :                 // we open the DB again after a crash/restart (see checkConsistency in
      624           1 :                 // open.go), and it also lets us more accurately prioritize compactions of
      625           1 :                 // files that were originally created by us.
     626           1 :                 if remoteObjMetas[i].IsShared() && !objProvider.IsSharedForeign(remoteObjMetas[i]) {
     627           1 :                         size, err := objProvider.Size(remoteObjMetas[i])
     628           1 :                         if err != nil {
     629           0 :                                 return err
     630           0 :                         }
     631           1 :                         lr.sharedMeta[i].FileBacking.Size = uint64(size)
     632             :                 }
     633             :         }
     634             : 
     635           1 :         if opts.EventListener.TableCreated != nil {
     636           1 :                 for i := range remoteObjMetas {
     637           1 :                         opts.EventListener.TableCreated(TableCreateInfo{
     638           1 :                                 JobID:   jobID,
     639           1 :                                 Reason:  "ingesting",
     640           1 :                                 Path:    objProvider.Path(remoteObjMetas[i]),
     641           1 :                                 FileNum: remoteObjMetas[i].DiskFileNum,
     642           1 :                         })
     643           1 :                 }
     644             :         }
     645             : 
     646           1 :         return nil
     647             : }
     648             : 
     649             : func ingestUpdateSeqNum(
     650             :         cmp Compare, format base.FormatKey, seqNum uint64, loadResult ingestLoadResult,
     651           1 : ) error {
     652           1 :         setSeqFn := func(k base.InternalKey) base.InternalKey {
     653           1 :                 return base.MakeInternalKey(k.UserKey, seqNum, k.Kind())
     654           1 :         }
     655           1 :         updateMetadata := func(m *fileMetadata) error {
     656           1 :                 // NB: we set the fields directly here, rather than via their Extend*
     657           1 :                 // methods, as we are updating sequence numbers.
     658           1 :                 if m.HasPointKeys {
     659           1 :                         m.SmallestPointKey = setSeqFn(m.SmallestPointKey)
     660           1 :                 }
     661           1 :                 if m.HasRangeKeys {
     662           1 :                         m.SmallestRangeKey = setSeqFn(m.SmallestRangeKey)
     663           1 :                 }
     664           1 :                 m.Smallest = setSeqFn(m.Smallest)
     665           1 :                 // Only update the seqnum for the largest key if that key is not an
     666           1 :                 // "exclusive sentinel" (i.e. a range deletion sentinel or a range key
     667           1 :                 // boundary), as doing so effectively drops the exclusive sentinel (by
     668           1 :                 // lowering the seqnum from the max value), and extends the bounds of the
     669           1 :                 // table.
     670           1 :                 // NB: as the largest range key is always an exclusive sentinel, it is never
     671           1 :                 // updated.
     672           1 :                 if m.HasPointKeys && !m.LargestPointKey.IsExclusiveSentinel() {
     673           1 :                         m.LargestPointKey = setSeqFn(m.LargestPointKey)
     674           1 :                 }
     675           1 :                 if !m.Largest.IsExclusiveSentinel() {
     676           1 :                         m.Largest = setSeqFn(m.Largest)
     677           1 :                 }
     678             :                 // Setting smallestSeqNum == largestSeqNum triggers the setting of
     679             :                 // Properties.GlobalSeqNum when an sstable is loaded.
     680           1 :                 m.SmallestSeqNum = seqNum
     681           1 :                 m.LargestSeqNum = seqNum
     682           1 :                 // Ensure the new bounds are consistent.
     683           1 :                 if err := m.Validate(cmp, format); err != nil {
     684           0 :                         return err
     685           0 :                 }
     686           1 :                 seqNum++
     687           1 :                 return nil
     688             :         }
     689             : 
     690             :         // Shared sstables are required to be sorted by level ascending. We then
     691             :         // iterate the shared sstables in reverse, assigning the lower sequence
     692             :         // numbers to the shared sstables that will be ingested into the lower
     693             :         // (larger numbered) levels first. This ensures sequence number shadowing is
     694             :         // correct.
     695           1 :         for i := len(loadResult.sharedMeta) - 1; i >= 0; i-- {
     696           1 :                 if i-1 >= 0 && loadResult.sharedLevels[i-1] > loadResult.sharedLevels[i] {
     697           0 :                         panic(errors.AssertionFailedf("shared files %s, %s out of order", loadResult.sharedMeta[i-1], loadResult.sharedMeta[i]))
     698             :                 }
     699           1 :                 if err := updateMetadata(loadResult.sharedMeta[i]); err != nil {
     700           0 :                         return err
     701           0 :                 }
     702             :         }
     703           1 :         for i := range loadResult.localMeta {
     704           1 :                 if err := updateMetadata(loadResult.localMeta[i]); err != nil {
     705           0 :                         return err
     706           0 :                 }
     707             :         }
     708           1 :         for i := range loadResult.externalMeta {
     709           1 :                 if err := updateMetadata(loadResult.externalMeta[i]); err != nil {
     710           0 :                         return err
     711           0 :                 }
     712             :         }
     713           1 :         return nil
     714             : }
     715             : 
     716             : // Denotes an internal key range. Smallest and largest are both inclusive.
     717             : type internalKeyRange struct {
     718             :         smallest, largest InternalKey
     719             : }
     720             : 
     721             : func overlapWithIterator(
     722             :         iter internalIterator,
     723             :         rangeDelIter *keyspan.FragmentIterator,
     724             :         rkeyIter keyspan.FragmentIterator,
     725             :         keyRange internalKeyRange,
     726             :         cmp Compare,
     727           1 : ) bool {
     728           1 :         // Check overlap with point operations.
     729           1 :         //
     730           1 :         // When using levelIter, it seeks to the SST whose boundaries
     731           1 :         // contain keyRange.smallest.UserKey(S).
     732           1 :         // It then tries to find a point in that SST that is >= S.
     733           1 :         // If there's no such point it means the SST ends in a tombstone in which case
     734           1 :         // levelIter.SeekGE generates a boundary range del sentinel.
     735           1 :         // The comparison of this boundary with keyRange.largest(L) below
     736           1 :         // is subtle but maintains correctness.
     737           1 :         // 1) boundary < L,
     738           1 :         //    since boundary is also > S (initial seek),
     739           1 :         //    whatever the boundary's start key may be, we're always overlapping.
     740           1 :         // 2) boundary > L,
     741           1 :         //    overlap with boundary cannot be determined since we don't know boundary's start key.
     742           1 :         //    We require checking for overlap with rangeDelIter.
     743           1 :         // 3) boundary == L and L is not sentinel,
     744           1 :         //    means boundary < L and hence is similar to 1).
     745           1 :         // 4) boundary == L and L is sentinel,
     746           1 :         //    we'll always overlap since for any values of i,j ranges [i, k) and [j, k) always overlap.
     747           1 :         key, _ := iter.SeekGE(keyRange.smallest.UserKey, base.SeekGEFlagsNone)
     748           1 :         if key != nil {
     749           1 :                 c := sstableKeyCompare(cmp, *key, keyRange.largest)
     750           1 :                 if c <= 0 {
     751           1 :                         return true
     752           1 :                 }
     753             :         }
     754             :         // Assume overlap if iterator errored.
     755           1 :         if err := iter.Error(); err != nil {
     756           1 :                 return true
     757           1 :         }
     758             : 
     759           1 :         computeOverlapWithSpans := func(rIter keyspan.FragmentIterator) (bool, error) {
     760           1 :                 // NB: The spans surfaced by the fragment iterator are non-overlapping.
     761           1 :                 span, err := rIter.SeekLT(keyRange.smallest.UserKey)
     762           1 :                 if err != nil {
     763           0 :                         return false, err
     764           1 :                 } else if span == nil {
     765           1 :                         span, err = rIter.Next()
     766           1 :                         if err != nil {
     767           0 :                                 return false, err
     768           0 :                         }
     769             :                 }
     770           1 :                 for ; span != nil; span, err = rIter.Next() {
     771           1 :                         if span.Empty() {
     772           1 :                                 continue
     773             :                         }
     774           1 :                         key := span.SmallestKey()
     775           1 :                         c := sstableKeyCompare(cmp, key, keyRange.largest)
     776           1 :                         if c > 0 {
     777           1 :                                 // The start of the span is after the largest key in the
     778           1 :                                 // ingested table.
     779           1 :                                 return false, nil
     780           1 :                         }
     781           1 :                         if cmp(span.End, keyRange.smallest.UserKey) > 0 {
     782           1 :                                 // The end of the span is greater than the smallest in the
     783           1 :                                 // table. Note that the span end key is exclusive, thus ">0"
     784           1 :                                 // instead of ">=0".
     785           1 :                                 return true, nil
     786           1 :                         }
     787             :                 }
     788           1 :                 if err != nil {
     789           0 :                         return false, err
     790           0 :                 }
     791           1 :                 return false, nil
     792             :         }
     793             : 
     794             :         // rkeyIter is either a range key level iter, or a range key iterator
     795             :         // over a single file.
     796           1 :         if rkeyIter != nil {
     797           1 :                 // If an error occurs, assume overlap.
     798           1 :                 if overlap, err := computeOverlapWithSpans(rkeyIter); overlap || err != nil {
     799           1 :                         return true
     800           1 :                 }
     801             :         }
     802             : 
     803             :         // Check overlap with range deletions.
     804           1 :         if rangeDelIter == nil || *rangeDelIter == nil {
     805           1 :                 return false
     806           1 :         }
     807           1 :         overlap, err := computeOverlapWithSpans(*rangeDelIter)
     808           1 :         // If an error occurs, assume overlap.
     809           1 :         return overlap || err != nil
     810             : }
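
A minimal sketch of case (4) in the comment above, assuming the pebble package context and base.DefaultComparer; when the boundary surfaced by the seek and keyRange.largest are both exclusive sentinels on the same user key, sstableKeyCompare returns 0 and the c <= 0 check reports overlap:

        boundary := base.MakeRangeDeleteSentinelKey([]byte("k"))
        largest := base.MakeRangeDeleteSentinelKey([]byte("k"))
        _ = sstableKeyCompare(base.DefaultComparer.Compare, boundary, largest) // == 0, treated as overlap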
     811             : 
     812             : // ingestTargetLevel returns the target level for a file being ingested.
     813             : // If suggestSplit is true, it accounts for ingest-time splitting as part of
     814             : // its target level calculation, and if a split candidate is found, that file
     815             : // is returned as the splitFile.
     816             : func ingestTargetLevel(
     817             :         newIters tableNewIters,
     818             :         newRangeKeyIter keyspan.TableNewSpanIter,
     819             :         iterOps IterOptions,
     820             :         comparer *Comparer,
     821             :         v *version,
     822             :         baseLevel int,
     823             :         compactions map[*compaction]struct{},
     824             :         meta *fileMetadata,
     825             :         suggestSplit bool,
     826           1 : ) (targetLevel int, splitFile *fileMetadata, err error) {
     827           1 :         // Find the lowest level which does not have any files which overlap meta. We
     828           1 :         // search from L0 to L6 looking for whether there are any files in the level
     829           1 :         // which overlap meta. We want the "lowest" level (where lower means
     830           1 :         // increasing level number) in order to reduce write amplification.
     831           1 :         //
     832           1 :         // There are 2 kinds of overlap we need to check for: file boundary overlap
     833           1 :         // and data overlap. Data overlap implies file boundary overlap. Note that it
     834           1 :         // is always possible to ingest into L0.
     835           1 :         //
     836           1 :         // To place meta at level i where i > 0:
     837           1 :         // - there must not be any data overlap with levels <= i, since that will
     838           1 :         //   violate the sequence number invariant.
     839           1 :         // - no file boundary overlap with level i, since that will violate the
     840           1 :         //   invariant that files do not overlap in levels i > 0.
     841           1 :         //   - if there is only a file overlap at a given level, and no data overlap,
     842           1 :         //     we can still slot a file at that level. We return the fileMetadata with
     843           1 :         //     which we have file boundary overlap (must be only one file, as sstable
     844           1 :         //     bounds are usually tight on user keys) and the caller is expected to split
     845           1 :         //     that sstable into two virtual sstables, allowing this file to go into that
     846           1 :         //     level. Note that if we have file boundary overlap with two files, which
     847           1 :         //     should only happen on rare occasions, we treat it as data overlap and
     848           1 :         //     don't use this optimization.
     849           1 :         //
     850           1 :         // The file boundary overlap check is simpler to conceptualize. Consider the
     851           1 :         // following example, in which the ingested file lies completely before or
     852           1 :         // after the file being considered.
     853           1 :         //
     854           1 :         //   |--|           |--|  ingested file: [a,b] or [f,g]
     855           1 :         //         |-----|        existing file: [c,e]
     856           1 :         //  _____________________
     857           1 :         //   a  b  c  d  e  f  g
     858           1 :         //
     859           1 :         // In both cases the ingested file can move on to the next level.
     860           1 :         //
     861           1 :         // File boundary overlap does not necessarily imply data overlap. The check
     862           1 :         // for data overlap is a little more nuanced. Consider the following examples:
     863           1 :         //
     864           1 :         //  1. No data overlap:
     865           1 :         //
     866           1 :         //          |-|   |--|    ingested file: [cc-d] or [ee-ff]
     867           1 :         //  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
     868           1 :         //  _____________________
     869           1 :         //   a  b  c  d  e  f  g
     870           1 :         //
     871           1 :         // In this case the ingested files can "fall through" this level. The checks
     872           1 :         // continue at the next level.
     873           1 :         //
     874           1 :         //  2. Data overlap:
     875           1 :         //
     876           1 :         //            |--|        ingested file: [d-e]
     877           1 :         //  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
     878           1 :         //  _____________________
     879           1 :         //   a  b  c  d  e  f  g
     880           1 :         //
     881           1 :         // In this case the file cannot be ingested into this level as the point 'dd'
     882           1 :         // is in the way.
     883           1 :         //
     884           1 :         // It is worth noting that the check for data overlap is only approximate. In
     885           1 :         // the previous example, the ingested table [d-e] could contain only the
     886           1 :         // points 'd' and 'e', in which case the table would be eligible for
     887           1 :         // consideration at lower levels. However, such a fine-grained check would
     888           1 :         // need to be exhaustive (comparing points and ranges in both the ingested
     889           1 :         // and existing tables), and such a check is prohibitively expensive. Thus
     890           1 :         // Pebble treats any existing point that falls within the ingested table
     891           1 :         // bounds as being "data overlap".
     892           1 : 
     893           1 :         // This assertion implicitly checks that we have the current version of
     894           1 :         // the metadata.
     895           1 :         if v.L0Sublevels == nil {
     896           0 :                 return 0, nil, errors.AssertionFailedf("could not read L0 sublevels")
     897           0 :         }
     898           1 :         iterOps.CategoryAndQoS = sstable.CategoryAndQoS{
     899           1 :                 Category: "pebble-ingest",
     900           1 :                 QoSLevel: sstable.LatencySensitiveQoSLevel,
     901           1 :         }
     902           1 :         // Check for overlap over the keys of L0 by iterating over the sublevels.
     903           1 :         for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ {
     904           1 :                 iter := newLevelIter(context.Background(),
     905           1 :                         iterOps, comparer, newIters, v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), internalIterOpts{})
     906           1 : 
     907           1 :                 var rangeDelIter keyspan.FragmentIterator
     908           1 :                 // Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
     909           1 :                 // sets it up for the target file.
     910           1 :                 iter.initRangeDel(&rangeDelIter)
     911           1 : 
     912           1 :                 levelIter := keyspan.LevelIter{}
     913           1 :                 levelIter.Init(
     914           1 :                         keyspan.SpanIterOptions{}, comparer.Compare, newRangeKeyIter,
     915           1 :                         v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), manifest.KeyTypeRange,
     916           1 :                 )
     917           1 : 
     918           1 :                 kr := internalKeyRange{
     919           1 :                         smallest: meta.Smallest,
     920           1 :                         largest:  meta.Largest,
     921           1 :                 }
     922           1 :                 overlap := overlapWithIterator(iter, &rangeDelIter, &levelIter, kr, comparer.Compare)
     923           1 :                 err := iter.Close() // Closes range del iter as well.
     924           1 :                 err = firstError(err, levelIter.Close())
     925           1 :                 if err != nil {
     926           1 :                         return 0, nil, err
     927           1 :                 }
     928           1 :                 if overlap {
     929           1 :                         return targetLevel, nil, nil
     930           1 :                 }
     931             :         }
     932             : 
     933           1 :         level := baseLevel
     934           1 :         for ; level < numLevels; level++ {
     935           1 :                 levelIter := newLevelIter(context.Background(),
     936           1 :                         iterOps, comparer, newIters, v.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
     937           1 :                 var rangeDelIter keyspan.FragmentIterator
     938           1 :                 // Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
     939           1 :                 // sets it up for the target file.
     940           1 :                 levelIter.initRangeDel(&rangeDelIter)
     941           1 : 
     942           1 :                 rkeyLevelIter := &keyspan.LevelIter{}
     943           1 :                 rkeyLevelIter.Init(
     944           1 :                         keyspan.SpanIterOptions{}, comparer.Compare, newRangeKeyIter,
     945           1 :                         v.Levels[level].Iter(), manifest.Level(level), manifest.KeyTypeRange,
     946           1 :                 )
     947           1 : 
     948           1 :                 kr := internalKeyRange{
     949           1 :                         smallest: meta.Smallest,
     950           1 :                         largest:  meta.Largest,
     951           1 :                 }
     952           1 :                 overlap := overlapWithIterator(levelIter, &rangeDelIter, rkeyLevelIter, kr, comparer.Compare)
     953           1 :                 err := levelIter.Close() // Closes range del iter as well.
     954           1 :                 err = firstError(err, rkeyLevelIter.Close())
     955           1 :                 if err != nil {
     956           0 :                         return 0, nil, err
     957           0 :                 }
     958           1 :                 if overlap {
     959           1 :                         return targetLevel, splitFile, nil
     960           1 :                 }
     961             : 
     962             :                 // Check boundary overlap.
     963           1 :                 var candidateSplitFile *fileMetadata
     964           1 :                 boundaryOverlaps := v.Overlaps(level, comparer.Compare, meta.Smallest.UserKey,
     965           1 :                         meta.Largest.UserKey, meta.Largest.IsExclusiveSentinel())
     966           1 :                 if !boundaryOverlaps.Empty() {
     967           1 :                         // We are already guaranteed to not have any data overlaps with files
     968           1 :                         // in boundaryOverlaps, otherwise we'd have returned in the above if
     969           1 :                         // statements. Use this, plus boundaryOverlaps.Len() == 1 to detect for
     970           1 :                         // the case where we can slot this file into the current level despite
     971           1 :                         // a boundary overlap, by splitting one existing file into two virtual
     972           1 :                         // sstables.
     973           1 :                         if suggestSplit && boundaryOverlaps.Len() == 1 {
     974           1 :                                 iter := boundaryOverlaps.Iter()
     975           1 :                                 candidateSplitFile = iter.First()
     976           1 :                         } else {
     977           1 :                                 // We either don't want to suggest ingest-time splits (i.e.
     978           1 :                                 // !suggestSplit), or we boundary-overlapped with more than one file.
     979           1 :                                 continue
     980             :                         }
     981             :                 }
     982             : 
     983             :                 // Check boundary overlap with any ongoing compactions. We consider an
     984             :                 // overlapping compaction that's writing files to an output level as
     985             :                 // equivalent to boundary overlap with files in that output level.
     986             :                 //
     987             :                 // We cannot check for data overlap with the new SSTs compaction will produce
     988             :                 // since compaction hasn't been done yet. However, there's no need to check
     989             :                 // since all keys in them will be from levels in [c.startLevel,
     990             :                 // c.outputLevel], and all those levels have already had their data overlap
     991             :                 // tested negative (else we'd have returned earlier).
     992             :                 //
     993             :                 // An alternative approach would be to cancel these compactions and proceed
     994             :                 // with an ingest-time split on this level if necessary. However, compaction
     995             :                 // cancellation can result in significant wasted effort and is best avoided
     996             :                 // unless necessary.
     997           1 :                 overlaps := false
     998           1 :                 for c := range compactions {
     999           1 :                         if c.outputLevel == nil || level != c.outputLevel.level {
    1000           1 :                                 continue
    1001             :                         }
    1002           1 :                         if comparer.Compare(meta.Smallest.UserKey, c.largest.UserKey) <= 0 &&
    1003           1 :                                 comparer.Compare(meta.Largest.UserKey, c.smallest.UserKey) >= 0 {
    1004           1 :                                 overlaps = true
    1005           1 :                                 break
    1006             :                         }
    1007             :                 }
    1008           1 :                 if !overlaps {
    1009           1 :                         targetLevel = level
    1010           1 :                         splitFile = candidateSplitFile
    1011           1 :                 }
    1012             :         }
    1013           1 :         return targetLevel, splitFile, nil
    1014             : }
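// Editorial sketch (hedged; not part of this source file or its coverage
// data): the level-selection logic above, stripped of iterators, sublevels and
// in-flight compactions. levelState and pickTargetLevel are hypothetical names
// used only to illustrate how data overlap stops the descent while a
// boundary-only overlap with a single file can be resolved by an ingest-time
// split.

type levelState struct {
        dataOverlap      bool // an existing key or range falls inside the ingested bounds
        boundaryOverlaps int  // files whose bounds overlap the ingested bounds
}

func pickTargetLevel(levels []levelState, baseLevel int, suggestSplit bool) (target int, split bool) {
        // L0 (target == 0) is always a valid fallback.
        for level := baseLevel; level < len(levels); level++ {
                if levels[level].dataOverlap {
                        // Placing the file any deeper would violate the sequence
                        // number invariant; keep whatever we have found so far.
                        return target, split
                }
                switch {
                case levels[level].boundaryOverlaps == 0:
                        target, split = level, false
                case suggestSplit && levels[level].boundaryOverlaps == 1:
                        // Boundary-only overlap with one file: the caller may split
                        // that file into two virtual sstables and use this level.
                        target, split = level, true
                default:
                        // Unresolvable boundary overlap: skip this level but keep
                        // descending, mirroring the `continue` in the real code.
                }
        }
        return target, split
}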
    1015             : 
    1016             : // Ingest ingests a set of sstables into the DB. Ingestion of the files is
    1017             : // atomic and semantically equivalent to creating a single batch containing all
    1018             : // of the mutations in the sstables. Ingestion may require the memtable to be
    1019             : // flushed. The ingested sstable files are moved into the DB and must reside on
    1020             : // the same filesystem as the DB. Sstables can be created for ingestion using
    1021             : // sstable.Writer. On success, Ingest removes the input paths.
    1022             : //
    1023             : // Two types of sstables are accepted for ingestion: sstables present in the
    1024             : // instance's vfs.FS, which can be referenced locally, and sstables present in
    1025             : // remote.Storage, referred to as shared or foreign sstables. These
    1026             : // shared sstables can be linked through objstorageprovider.Provider, and do not
    1027             : // need to already be present on the local vfs.FS. Foreign sstables must all fit
    1028             : // in an excise span, and are destined for a level specified in SharedSSTMeta.
    1029             : //
    1030             : // All sstables *must* be Sync()'d by the caller after all bytes are written
    1031             : // and before their file handles are closed; failure to do so could violate
    1032             : // durability or lead to corrupted on-disk state. This method cannot, in a
    1033             : // platform-and-FS-agnostic way, ensure that all sstables in the input are
    1034             : // properly synced to disk. Opening new file handles and Sync()-ing them
    1035             : // does not always guarantee durability; see the discussion here on that:
    1036             : // https://github.com/cockroachdb/pebble/pull/835#issuecomment-663075379
    1037             : //
    1038             : // Ingestion loads each sstable into the lowest level of the LSM which it
    1039             : // doesn't overlap (see ingestTargetLevel). If an sstable overlaps a memtable,
    1040             : // ingestion forces the memtable to flush, and then waits for the flush to
    1041             : // occur. In some cases, such as with no foreign sstables and no excise span,
    1042             : // ingestion that gets blocked on a memtable can join the flushable queue and
    1043             : // finish even before the memtable has been flushed.
    1044             : //
    1045             : // The steps for ingestion are:
    1046             : //
    1047             : //  1. Allocate file numbers for every sstable being ingested.
    1048             : //  2. Load the metadata for all sstables being ingested.
    1049             : //  3. Sort the sstables by smallest key, verifying non overlap (for local
    1050             : //     sstables).
    1051             : //  4. Hard link (or copy) the local sstables into the DB directory.
    1052             : //  5. Allocate a sequence number to use for all of the entries in the
    1053             : //     local sstables. This is the step where overlap with memtables is
    1054             : //     determined. If there is overlap, we remember the most recent memtable
    1055             : //     that overlaps.
    1056             : //  6. Update the sequence number in the ingested local sstables. (Remote
    1057             : //     sstables get fixed sequence numbers that were determined at load time.)
    1058             : //  7. Wait for the most recent memtable that overlaps to flush (if any).
    1059             : //  8. Add the ingested sstables to the version (DB.ingestApply).
    1060             : //     8.1.  If an excise span was specified, figure out what sstables in the
    1061             : //     current version overlap with the excise span, and create new virtual
    1062             : //     sstables out of those sstables that exclude the excised span (DB.excise).
    1063             : //  9. Publish the ingestion sequence number.
    1064             : //
    1065             : // Note that if the mutable memtable overlaps with ingestion, a flush of the
    1066             : // memtable is forced, equivalent to DB.Flush. Additionally, subsequent
    1067             : // mutations that get sequence numbers larger than the ingestion sequence
    1068             : // number get queued up behind the ingestion waiting for it to complete. This
    1069             : // can produce a noticeable hiccup in performance. See
    1070             : // https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
    1071             : // this hiccup.
    1072           1 : func (d *DB) Ingest(paths []string) error {
    1073           1 :         if err := d.closed.Load(); err != nil {
    1074           1 :                 panic(err)
    1075             :         }
    1076           1 :         if d.opts.ReadOnly {
    1077           1 :                 return ErrReadOnly
    1078           1 :         }
    1079           1 :         _, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */)
    1080           1 :         return err
    1081             : }
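// Editorial sketch (hedged; not part of this source file or its coverage
// data): a minimal caller of Ingest. It assumes the sstables at the given
// paths were written with sstable.Writer, Sync()'d as the comment above
// requires, and reside on the same filesystem as the store; the directory
// name is a placeholder.

func exampleIngestUsage(paths []string) error {
        db, err := Open("demo-db", &Options{})
        if err != nil {
                return err
        }
        defer db.Close()
        // On success the input files are consumed (Ingest removes the original
        // paths), so they must not be reused afterwards.
        return db.Ingest(paths)
}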
    1082             : 
    1083             : // IngestOperationStats provides some information about where in the LSM the
    1084             : // bytes were ingested.
    1085             : type IngestOperationStats struct {
    1086             :         // Bytes is the total bytes in the ingested sstables.
    1087             :         Bytes uint64
    1088             :         // ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
    1089             :         // into L0. This value is approximate when flushable ingests are active and
    1090             :         // an ingest overlaps an entry in the flushable queue. Currently, this
    1091             :         // approximation is very rough, only including tables that overlapped the
    1092             :         // memtable. This estimate may be improved with #2112.
    1093             :         ApproxIngestedIntoL0Bytes uint64
    1094             :         // MemtableOverlappingFiles is the count of ingested sstables
    1095             :         // that overlapped keys in the memtables.
    1096             :         MemtableOverlappingFiles int
    1097             : }
    1098             : 
    1099             : // ExternalFile describes an external sstable that can be referenced through
    1100             : // objProvider and ingested as a remote file that will not be refcounted or
    1101             : // cleaned up. For use with online restore. Note that the underlying sstable
    1102             : // could contain keys outside the [Smallest,Largest) bounds; however Pebble
    1103             : // is expected to only read the keys within those bounds.
    1104             : type ExternalFile struct {
    1105             :         // Locator is the shared.Locator that can be used with objProvider to
    1106             :         // resolve a reference to this external sstable.
    1107             :         Locator remote.Locator
    1108             :         // ObjName is the unique name of this sstable on Locator.
    1109             :         ObjName string
    1110             :         // Size of the referenced proportion of the virtualized sstable. An estimate
    1111             :         // is acceptable in lieu of the backing file size.
    1112             :         Size uint64
    1113             :         // SmallestUserKey and LargestUserKey are the [smallest,largest) user key
    1114             : // bounds of the sstable. Both bounds are loose, i.e. it's possible for
    1115             :         // the sstable to not span the entirety of this range. However, multiple
    1116             :         // ExternalFiles in one ingestion must all have non-overlapping
    1117             :         // [smallest, largest) spans. Note that this Largest bound is exclusive.
    1118             :         SmallestUserKey, LargestUserKey []byte
    1119             :         // HasPointKey and HasRangeKey denote whether this file contains point keys
    1120             : // or range keys. If both fields are false, an error is returned during
    1121             :         // ingestion.
    1122             :         HasPointKey, HasRangeKey bool
    1123             :         // ContentPrefix and SyntheticPrefix denote a prefix replacement rule causing
    1124             :         // a file, in which all keys have prefix ContentPrefix, to appear whenever it
    1125             :         // is accessed as if those keys all instead have prefix SyntheticPrefix.
    1126             :         // SyntheticPrefix must be a prefix of both SmallestUserKey and LargestUserKey.
    1127             :         ContentPrefix, SyntheticPrefix []byte
    1128             : }
    1129             : 
    1130             : // IngestWithStats does the same as Ingest, and additionally returns
    1131             : // IngestOperationStats.
    1132           1 : func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) {
    1133           1 :         if err := d.closed.Load(); err != nil {
    1134           0 :                 panic(err)
    1135             :         }
    1136           1 :         if d.opts.ReadOnly {
    1137           0 :                 return IngestOperationStats{}, ErrReadOnly
    1138           0 :         }
    1139           1 :         return d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */)
    1140             : }
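// Editorial sketch (hedged; not part of this source file or its coverage
// data): reading the returned IngestOperationStats to gauge how much of an
// ingest landed in (or above) L0. The logging policy is purely illustrative.

func exampleIngestWithStats(d *DB, paths []string) error {
        stats, err := d.IngestWithStats(paths)
        if err != nil {
                return err
        }
        if stats.ApproxIngestedIntoL0Bytes > 0 {
                // Bytes ingested into L0 (or overlapping the memtable) imply
                // extra compaction work to move the data down the LSM.
                d.opts.Logger.Infof("ingest: %d of %d bytes into L0; %d tables overlapped memtables",
                        stats.ApproxIngestedIntoL0Bytes, stats.Bytes, stats.MemtableOverlappingFiles)
        }
        return nil
}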
    1141             : 
    1142             : // IngestExternalFiles does the same as IngestWithStats, and additionally
    1143             : // accepts external files (with locator info that can be resolved using
    1144             : // d.opts.SharedStorage). These files must also be non-overlapping with
    1145             : // each other, and must be resolvable through d.objProvider.
    1146           1 : func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, error) {
    1147           1 :         if err := d.closed.Load(); err != nil {
    1148           0 :                 panic(err)
    1149             :         }
    1150             : 
    1151           1 :         if d.opts.ReadOnly {
    1152           0 :                 return IngestOperationStats{}, ErrReadOnly
    1153           0 :         }
    1154           1 :         if d.opts.Experimental.RemoteStorage == nil {
    1155           0 :                 return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
    1156           0 :         }
    1157           1 :         return d.ingest(nil, ingestTargetLevel, nil /* shared */, KeyRange{}, external)
    1158             : }
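// Editorial sketch (hedged; not part of this source file or its coverage
// data): constructing an ExternalFile for IngestExternalFiles. The locator,
// object name, size and key bounds are placeholders; in practice they come
// from whatever catalog the online-restore workflow keeps about its remote
// objects.

func exampleIngestExternal(d *DB) (IngestOperationStats, error) {
        files := []ExternalFile{{
                Locator:         "restore-bucket", // must resolve via the configured remote storage
                ObjName:         "backup/000123.sst",
                Size:            64 << 20, // an estimate of the referenced bytes is acceptable
                SmallestUserKey: []byte("a"),
                LargestUserKey:  []byte("m"), // exclusive bound
                HasPointKey:     true,
        }}
        // The files must not overlap each other in [SmallestUserKey, LargestUserKey).
        return d.IngestExternalFiles(files)
}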
    1159             : 
    1160             : // IngestAndExcise does the same as IngestWithStats, and additionally accepts a
    1161             : // list of shared files to ingest that can be read from a remote.Storage through
    1162             : // a Provider. All the shared files must live within exciseSpan, and any existing
    1163             : // keys in exciseSpan are deleted by turning existing sstables into virtual
    1164             : // sstables (if not virtual already) and shrinking their spans to exclude
    1165             : // exciseSpan. See the comment at Ingest for a more complete picture of the
    1166             : // ingestion process.
    1167             : //
    1168             : // Panics if this DB instance was not instantiated with a remote.Storage and
    1169             : // shared sstables are present.
    1170             : func (d *DB) IngestAndExcise(
    1171             :         paths []string, shared []SharedSSTMeta, exciseSpan KeyRange,
    1172           1 : ) (IngestOperationStats, error) {
    1173           1 :         if err := d.closed.Load(); err != nil {
    1174           0 :                 panic(err)
    1175             :         }
    1176           1 :         if d.opts.ReadOnly {
    1177           0 :                 return IngestOperationStats{}, ErrReadOnly
    1178           0 :         }
    1179           1 :         if invariants.Enabled && d.opts.Comparer.Split != nil {
    1180           1 :                 // Excise is only supported on prefix keys.
    1181           1 :                 if d.opts.Comparer.Split(exciseSpan.Start) != len(exciseSpan.Start) {
    1182           0 :                         panic("IngestAndExcise called with suffixed start key")
    1183             :                 }
    1184           1 :                 if d.opts.Comparer.Split(exciseSpan.End) != len(exciseSpan.End) {
    1185           0 :                         panic("IngestAndExcise called with suffixed end key")
    1186             :                 }
    1187             :         }
    1188           1 :         if v := d.FormatMajorVersion(); v < FormatMinForSharedObjects {
    1189           0 :                 return IngestOperationStats{}, errors.Errorf(
    1190           0 :                         "store has format major version %d; IngestAndExcise requires at least %d",
    1191           0 :                         v, FormatMinForSharedObjects,
    1192           0 :                 )
    1193           0 :         }
    1194           1 :         return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, nil /* external */)
    1195             : }
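// Editorial sketch (hedged; not part of this source file or its coverage
// data): shaping the arguments to IngestAndExcise. The shared metadata is
// assumed to have been produced elsewhere by the donor store; only the excise
// span and the prefix-key requirement checked above are illustrated, and the
// keys are placeholders.

func exampleIngestAndExcise(d *DB, paths []string, shared []SharedSSTMeta) (IngestOperationStats, error) {
        // The excise span must use plain prefix keys (no suffixes), and every
        // shared sstable must lie entirely within it.
        exciseSpan := KeyRange{Start: []byte("table/42/"), End: []byte("table/43/")}
        return d.IngestAndExcise(paths, shared, exciseSpan)
}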
    1196             : 
    1197             : // Both DB.mu and commitPipeline.mu must be held while this is called.
    1198             : func (d *DB) newIngestedFlushableEntry(
    1199             :         meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum,
    1200           1 : ) (*flushableEntry, error) {
    1201           1 :         // Update the sequence number for all of the sstables in the
    1202           1 :         // metadata. Writing the metadata to the manifest when the
    1203           1 :         // version edit is applied is the mechanism that persists the
    1204           1 :         // sequence number. The sstables themselves are left unmodified.
    1205           1 :         // In this case, a version edit will only be written to the manifest
    1206           1 :         // when the flushable is eventually flushed. If Pebble restarts in that
    1207           1 :         // time, then we'll lose the ingest sequence number information. But this
    1208           1 :         // information will also be reconstructed on node restart.
    1209           1 :         if err := ingestUpdateSeqNum(
    1210           1 :                 d.cmp, d.opts.Comparer.FormatKey, seqNum, ingestLoadResult{localMeta: meta},
    1211           1 :         ); err != nil {
    1212           0 :                 return nil, err
    1213           0 :         }
    1214             : 
    1215           1 :         f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter)
    1216           1 : 
    1217           1 :         // NB: The logNum/seqNum are the WAL number which we're writing this entry
    1218           1 :         // to and the sequence number within the WAL which we'll write this entry
    1219           1 :         // to.
    1220           1 :         entry := d.newFlushableEntry(f, logNum, seqNum)
    1221           1 :         // The flushable entry starts off with a single reader ref, so increment
    1222           1 :         // the FileMetadata.Refs.
    1223           1 :         for _, file := range f.files {
    1224           1 :                 file.Ref()
    1225           1 :         }
    1226           1 :         entry.unrefFiles = func() []*fileBacking {
    1227           1 :                 var obsolete []*fileBacking
    1228           1 :                 for _, file := range f.files {
    1229           1 :                         if file.Unref() == 0 {
    1230           1 :                                 obsolete = append(obsolete, file.FileMetadata.FileBacking)
    1231           1 :                         }
    1232             :                 }
    1233           1 :                 return obsolete
    1234             :         }
    1235             : 
    1236           1 :         entry.flushForced = true
    1237           1 :         entry.releaseMemAccounting = func() {}
    1238           1 :         return entry, nil
    1239             : }
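// Editorial sketch (hedged; not part of this source file or its coverage
// data): the reference-counting pattern used above, in standalone form. The
// refCounted type is hypothetical; the point is that the flushable entry takes
// one reference per ingested file and, when it releases them, reports which
// files became unreferenced so their backings can be treated as obsolete.

type refCounted struct{ refs int }

func (r *refCounted) Ref()       { r.refs++ }
func (r *refCounted) Unref() int { r.refs--; return r.refs }

func refFiles(files []*refCounted) (unrefFiles func() []*refCounted) {
        for _, f := range files {
                f.Ref()
        }
        return func() []*refCounted {
                var obsolete []*refCounted
                for _, f := range files {
                        if f.Unref() == 0 {
                                obsolete = append(obsolete, f)
                        }
                }
                return obsolete
        }
}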
    1240             : 
    1241             : // Both DB.mu and commitPipeline.mu must be held while this is called. Since
    1242             : // we're holding both locks, the order in which we rotate the memtable or
    1243             : // recycle the WAL in this function is irrelevant as long as the correct log
    1244             : // numbers are assigned to the appropriate flushable.
    1245           1 : func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error {
    1246           1 :         b := d.NewBatch()
    1247           1 :         for _, m := range meta {
    1248           1 :                 b.ingestSST(m.FileNum)
    1249           1 :         }
    1250           1 :         b.setSeqNum(seqNum)
    1251           1 : 
    1252           1 :         // If the WAL is disabled, then the logNum used to create the flushable
    1253           1 :         // entry doesn't matter. We just use the logNum assigned to the current
    1254           1 :         // mutable memtable. If the WAL is enabled, then this logNum will be
    1255           1 :         // overwritten by the logNum of the log which will contain the log entry
    1256           1 :         // for the ingestedFlushable.
    1257           1 :         logNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
    1258           1 :         if !d.opts.DisableWAL {
    1259           1 :                 // We create a new WAL for the flushable instead of reusing the end of
    1260           1 :                 // the previous WAL. This simplifies the increment of the minimum
    1261           1 :                 // unflushed log number, and also simplifies WAL replay.
    1262           1 :                 logNum, _ = d.recycleWAL()
    1263           1 :                 d.mu.Unlock()
    1264           1 :                 err := d.commit.directWrite(b)
    1265           1 :                 if err != nil {
    1266           0 :                         d.opts.Logger.Fatalf("%v", err)
    1267           0 :                 }
    1268           1 :                 d.mu.Lock()
    1269             :         }
    1270             : 
    1271           1 :         entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum)
    1272           1 :         if err != nil {
    1273           0 :                 return err
    1274           0 :         }
    1275           1 :         nextSeqNum := seqNum + uint64(b.Count())
    1276           1 : 
    1277           1 :         // Set newLogNum to the logNum of the previous flushable. This value is
    1278           1 :         // irrelevant if the WAL is disabled. If the WAL is enabled, then we set
    1279           1 :         // the appropriate value below.
    1280           1 :         newLogNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
    1281           1 :         if !d.opts.DisableWAL {
    1282           1 :                 // This is the WAL num of the next mutable memtable, which comes after the
    1283           1 :                 // ingestedFlushable in the flushable queue. The mutable memtable
    1284           1 :                 // will be created below.
    1285           1 :                 newLogNum, _ = d.recycleWAL()
    1286           1 :                 if err != nil {
    1287           0 :                         return err
    1288           0 :                 }
    1289             :         }
    1290             : 
    1291           1 :         currMem := d.mu.mem.mutable
    1292           1 :         // NB: Placing ingested sstables above the current memtables
    1293           1 :         // requires rotating of the existing memtables/WAL. There is
    1294           1 :         // some concern of churning through tiny memtables due to
    1295           1 :         // ingested sstables being placed on top of them, but those
    1296           1 :         // memtables would have to be flushed anyways.
    1297           1 :         d.mu.mem.queue = append(d.mu.mem.queue, entry)
    1298           1 :         d.rotateMemtable(newLogNum, nextSeqNum, currMem)
    1299           1 :         d.updateReadStateLocked(d.opts.DebugCheck)
    1300           1 :         d.maybeScheduleFlush()
    1301           1 :         return nil
    1302             : }
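// Editorial sketch (hedged; not part of this source file or its coverage
// data): the WAL-number choices made above, in isolation. With the WAL
// disabled, the flushable entry reuses the current mutable memtable's log
// number; with the WAL enabled, a fresh log is created both for the
// ingestedFlushable entry and for the new mutable memtable rotated in behind
// it. newWAL is a hypothetical stand-in for d.recycleWAL.

func chooseFlushableLogNums(disableWAL bool, currentLogNum uint64, newWAL func() uint64) (entryLogNum, nextMemLogNum uint64) {
        entryLogNum, nextMemLogNum = currentLogNum, currentLogNum
        if !disableWAL {
                entryLogNum = newWAL()   // WAL that records the ingestSST batch
                nextMemLogNum = newWAL() // WAL for the memtable created after the entry
        }
        return entryLogNum, nextMemLogNum
}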
    1303             : 
    1304             : // See comment at Ingest() for details on how this works.
    1305             : func (d *DB) ingest(
    1306             :         paths []string,
    1307             :         targetLevelFunc ingestTargetLevelFunc,
    1308             :         shared []SharedSSTMeta,
    1309             :         exciseSpan KeyRange,
    1310             :         external []ExternalFile,
    1311           1 : ) (IngestOperationStats, error) {
    1312           1 :         if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
    1313           0 :                 panic("cannot ingest shared sstables with nil SharedStorage")
    1314             :         }
    1315           1 :         if (exciseSpan.Valid() || len(shared) > 0 || len(external) > 0) && d.FormatMajorVersion() < FormatVirtualSSTables {
    1316           0 :                 return IngestOperationStats{}, errors.New("pebble: format major version too old for excise, shared or external sstable ingestion")
    1317           0 :         }
    1318           1 :         if len(external) > 0 && d.FormatMajorVersion() < FormatSyntheticPrefixes {
    1319           0 :                 for i := range external {
    1320           0 :                         if len(external[i].SyntheticPrefix) > 0 {
    1321           0 :                                 return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic prefix ingestion")
    1322           0 :                         }
    1323             :                 }
    1324             :         }
    1325             :         // Allocate file numbers for all of the files being ingested and mark them as
    1326             :         // pending in order to prevent them from being deleted. Note that this causes
    1327             :         // the file number ordering to be out of alignment with sequence number
    1328             :         // ordering. The sorting of L0 tables by sequence number avoids relying on
    1329             :         // that (busted) invariant.
    1330           1 :         d.mu.Lock()
    1331           1 :         pendingOutputs := make([]base.FileNum, len(paths)+len(shared)+len(external))
    1332           1 :         for i := 0; i < len(paths)+len(shared)+len(external); i++ {
    1333           1 :                 pendingOutputs[i] = d.mu.versions.getNextFileNum()
    1334           1 :         }
    1335             : 
    1336           1 :         jobID := d.mu.nextJobID
    1337           1 :         d.mu.nextJobID++
    1338           1 :         d.mu.Unlock()
    1339           1 : 
    1340           1 :         // Load the metadata for all the files being ingested. This step detects
    1341           1 :         // and elides empty sstables.
    1342           1 :         loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs, d.objProvider, jobID)
    1343           1 :         if err != nil {
    1344           1 :                 return IngestOperationStats{}, err
    1345           1 :         }
    1346             : 
    1347           1 :         if loadResult.fileCount == 0 {
    1348           1 :                 // All of the sstables to be ingested were empty. Nothing to do.
    1349           1 :                 return IngestOperationStats{}, nil
    1350           1 :         }
    1351             : 
    1352             :         // Verify the sstables do not overlap.
    1353           1 :         if err := ingestSortAndVerify(d.cmp, loadResult, exciseSpan); err != nil {
    1354           1 :                 return IngestOperationStats{}, err
    1355           1 :         }
    1356             : 
    1357             :         // Hard link the sstables into the DB directory. Since the sstables aren't
    1358             :         // referenced by a version, they won't be used. If the hard linking fails
    1359             :         // (e.g. because the files reside on a different filesystem), ingestLink will
    1360             :         // fall back to copying, and if that fails we undo our work and return an
    1361             :         // error.
    1362           1 :         if err := ingestLink(jobID, d.opts, d.objProvider, loadResult, shared, external); err != nil {
    1363           0 :                 return IngestOperationStats{}, err
    1364           0 :         }
    1365             : 
    1366             :         // Make the new tables durable. We need to do this at some point before we
    1367             :         // update the MANIFEST (via logAndApply), otherwise a crash can have the
    1368             :         // tables referenced in the MANIFEST, but not present in the provider.
    1369           1 :         if err := d.objProvider.Sync(); err != nil {
    1370           1 :                 return IngestOperationStats{}, err
    1371           1 :         }
    1372             : 
    1373             :         // metaFlushableOverlaps is a map indicating which of the ingested sstables
    1374             :         // overlap some table in the flushable queue. It's used to approximate
    1375             :         // ingest-into-L0 stats when using flushable ingests.
    1376           1 :         metaFlushableOverlaps := make(map[FileNum]bool, loadResult.fileCount)
    1377           1 :         var mem *flushableEntry
    1378           1 :         var mut *memTable
    1379           1 :         // asFlushable indicates whether the sstable was ingested as a flushable.
    1380           1 :         var asFlushable bool
    1381           1 :         prepare := func(seqNum uint64) {
    1382           1 :                 // Note that d.commit.mu is held by commitPipeline when calling prepare.
    1383           1 : 
    1384           1 :                 // Determine the set of bounds we care about for the purpose of checking
    1385           1 :                 // for overlap among the flushables. If there's an excise span, we need
    1386           1 :                 // to check for overlap with its bounds as well.
    1387           1 :                 overlapBounds := make([]bounded, 0, loadResult.fileCount+1)
    1388           1 :                 for _, metas := range [3][]*fileMetadata{loadResult.localMeta, loadResult.sharedMeta, loadResult.externalMeta} {
    1389           1 :                         for _, m := range metas {
    1390           1 :                                 overlapBounds = append(overlapBounds, m)
    1391           1 :                         }
    1392             :                 }
    1393           1 :                 if exciseSpan.Valid() {
    1394           1 :                         overlapBounds = append(overlapBounds, &exciseSpan)
    1395           1 :                 }
    1396             : 
    1397           1 :                 d.mu.Lock()
    1398           1 :                 defer d.mu.Unlock()
    1399           1 : 
    1400           1 :                 // Check if any of the currently-open EventuallyFileOnlySnapshots overlap
    1401           1 :                 // in key ranges with the excise span. If so, we need to check for memtable
    1402           1 :                 // overlaps with all bounds of that EventuallyFileOnlySnapshot in addition
    1403           1 :                 // to the ingestion's own bounds too.
    1404           1 : 
    1405           1 :                 if exciseSpan.Valid() {
    1406           1 :                         for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
    1407           1 :                                 if s.efos == nil {
    1408           0 :                                         continue
    1409             :                                 }
    1410           1 :                                 if base.Visible(seqNum, s.efos.seqNum, base.InternalKeySeqNumMax) {
    1411           0 :                                         // We only worry about snapshots older than the excise. Any snapshots
    1412           0 :                                         // created after the excise should see the excised view of the LSM
    1413           0 :                                         // anyway.
    1414           0 :                                         //
    1415           0 :                                         // Since we delay publishing the excise seqnum as visible until after
    1416           0 :                                         // the apply step, this case will never be hit in practice until we
    1417           0 :                                         // make excises flushable ingests.
    1418           0 :                                         continue
    1419             :                                 }
    1420           1 :                                 if invariants.Enabled {
    1421           1 :                                         if s.efos.hasTransitioned() {
    1422           0 :                                                 panic("unexpected transitioned EFOS in snapshots list")
    1423             :                                         }
    1424             :                                 }
    1425           1 :                                 for i := range s.efos.protectedRanges {
    1426           1 :                                         if !s.efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
    1427           0 :                                                 continue
    1428             :                                         }
    1429             :                                         // Our excise conflicts with this EFOS. We need to add its protected
    1430             :                                         // ranges to our overlapBounds. Grow overlapBounds in one allocation
    1431             :                                         // if necessary.
    1432           1 :                                         prs := s.efos.protectedRanges
    1433           1 :                                         if cap(overlapBounds) < len(overlapBounds)+len(prs) {
    1434           1 :                                                 oldOverlapBounds := overlapBounds
    1435           1 :                                                 overlapBounds = make([]bounded, len(oldOverlapBounds), len(oldOverlapBounds)+len(prs))
    1436           1 :                                                 copy(overlapBounds, oldOverlapBounds)
    1437           1 :                                         }
    1438           1 :                                         for i := range prs {
    1439           1 :                                                 overlapBounds = append(overlapBounds, &prs[i])
    1440           1 :                                         }
    1441           1 :                                         break
    1442             :                                 }
    1443             :                         }
    1444             :                 }
    1445             : 
    1446             :                 // Check to see if any files overlap with any of the memtables. The queue
    1447             :                 // is ordered from oldest to newest with the mutable memtable being the
    1448             :                 // last element in the slice. We want to wait for the newest table that
    1449             :                 // overlaps.
    1450             : 
    1451           1 :                 for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
    1452           1 :                         m := d.mu.mem.queue[i]
    1453           1 :                         m.computePossibleOverlaps(func(b bounded) shouldContinue {
    1454           1 :                                 // If this is the first table to overlap a flushable, save
    1455           1 :                                 // the flushable. This ingest must be ingested or flushed
    1456           1 :                                 // after it.
    1457           1 :                                 if mem == nil {
    1458           1 :                                         mem = m
    1459           1 :                                 }
    1460             : 
    1461           1 :                                 switch v := b.(type) {
    1462           1 :                                 case *fileMetadata:
    1463           1 :                                         // NB: False positives are possible if `m` is a flushable
    1464           1 :                                         // ingest that overlaps the file `v` in bounds but doesn't
    1465           1 :                                         // contain overlapping data. This is considered acceptable
    1466           1 :                                         // because it's rare (in CockroachDB a bound overlap likely
    1467           1 :                                         // indicates a data overlap), and blocking the commit
    1468           1 :                                         // pipeline while we perform I/O to check for overlap may be
    1469           1 :                                         // more disruptive than enqueueing this ingestion on the
    1470           1 :                                         // flushable queue and switching to a new memtable.
    1471           1 :                                         metaFlushableOverlaps[v.FileNum] = true
    1472           1 :                                 case *KeyRange:
    1473             :                                         // An excise span or an EventuallyFileOnlySnapshot protected range;
    1474             :                                         // not a file.
    1475           0 :                                 default:
    1476           0 :                                         panic("unreachable")
    1477             :                                 }
    1478           1 :                                 return continueIteration
    1479             :                         }, overlapBounds...)
    1480             :                 }
    1481             : 
    1482           1 :                 if mem == nil {
    1483           1 :                         // No overlap with any of the queued flushables, so no need to queue
    1484           1 :                         // after them.
    1485           1 : 
    1486           1 :                         // New writes with higher sequence numbers may be concurrently
    1487           1 :                         // committed. We must ensure they don't flush before this ingest
    1488           1 :                         // completes. To do that, we ref the mutable memtable as a writer,
    1489           1 :                         // preventing its flushing (and the flushing of all subsequent
    1490           1 :                         // flushables in the queue). Once we've acquired the manifest lock
    1491           1 :                         // to add the ingested sstables to the LSM, we can unref as we're
    1492           1 :                         // guaranteed that the flush won't edit the LSM before this ingest.
    1493           1 :                         mut = d.mu.mem.mutable
    1494           1 :                         mut.writerRef()
    1495           1 :                         return
    1496           1 :                 }
    1497             :                 // The ingestion overlaps with some entry in the flushable queue.
    1498           1 :                 if d.FormatMajorVersion() < FormatFlushableIngest ||
    1499           1 :                         d.opts.Experimental.DisableIngestAsFlushable() ||
    1500           1 :                         len(shared) > 0 || exciseSpan.Valid() || len(external) > 0 ||
    1501           1 :                         (len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) {
    1502           1 :                         // We're not able to ingest as a flushable,
    1503           1 :                         // so we must synchronously flush.
    1504           1 :                         //
    1505           1 :                         // TODO(bilal): Currently, if any of the files being ingested are shared or
    1506           1 :                         // there's an excise span present, we cannot use flushable ingests and need
    1507           1 :                         // to wait synchronously. Either remove this caveat by fleshing out
    1508           1 :                         // flushable ingest logic to also account for these cases, or remove this
    1509           1 :                         // comment. Tracking issue: https://github.com/cockroachdb/pebble/issues/2676
    1510           1 :                         if mem.flushable == d.mu.mem.mutable {
    1511           1 :                                 err = d.makeRoomForWrite(nil)
    1512           1 :                         }
    1513             :                         // New writes with higher sequence numbers may be concurrently
    1514             :                         // committed. We must ensure they don't flush before this ingest
    1515             :                         // completes. To do that, we ref the mutable memtable as a writer,
    1516             :                         // preventing its flushing (and the flushing of all subsequent
    1517             :                         // flushables in the queue). Once we've acquired the manifest lock
    1518             :                         // to add the ingested sstables to the LSM, we can unref as we're
    1519             :                         // guaranteed that the flush won't edit the LSM before this ingest.
    1520           1 :                         mut = d.mu.mem.mutable
    1521           1 :                         mut.writerRef()
    1522           1 :                         mem.flushForced = true
    1523           1 :                         d.maybeScheduleFlush()
    1524           1 :                         return
    1525             :                 }
    1526             :                 // Since there aren't too many memtables already queued up, we can
    1527             :                 // slide the ingested sstables on top of the existing memtables.
    1528           1 :                 asFlushable = true
    1529           1 :                 err = d.handleIngestAsFlushable(loadResult.localMeta, seqNum)
    1530             :         }
    1531             : 
    1532           1 :         var ve *versionEdit
    1533           1 :         apply := func(seqNum uint64) {
    1534           1 :                 if err != nil || asFlushable {
    1535           1 :                         // An error occurred during prepare.
    1536           1 :                         if mut != nil {
    1537           0 :                                 if mut.writerUnref() {
    1538           0 :                                         d.mu.Lock()
    1539           0 :                                         d.maybeScheduleFlush()
    1540           0 :                                         d.mu.Unlock()
    1541           0 :                                 }
    1542             :                         }
    1543           1 :                         return
    1544             :                 }
    1545             : 
    1546             :                 // If there's an excise being done atomically with the same ingest, we
    1547             :                 // assign the lowest sequence number in the set of sequence numbers for this
    1548             :                 // ingestion to the excise. Note that we've already allocated fileCount+1
    1549             :                 // sequence numbers in this case.
    1550           1 :                 if exciseSpan.Valid() {
    1551           1 :                         seqNum++ // the first seqNum is reserved for the excise.
    1552           1 :                 }
    1553             :                 // Update the sequence numbers for all ingested sstables'
    1554             :                 // metadata. When the version edit is applied, the metadata is
    1555             :                 // written to the manifest, persisting the sequence number.
    1556             :                 // The sstables themselves are left unmodified.
    1557           1 :                 if err = ingestUpdateSeqNum(
    1558           1 :                         d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult,
    1559           1 :                 ); err != nil {
    1560           0 :                         if mut != nil {
    1561           0 :                                 if mut.writerUnref() {
    1562           0 :                                         d.mu.Lock()
    1563           0 :                                         d.maybeScheduleFlush()
    1564           0 :                                         d.mu.Unlock()
    1565           0 :                                 }
    1566             :                         }
    1567           0 :                         return
    1568             :                 }
    1569             : 
    1570             :                 // If we overlapped with a memtable in prepare wait for the flush to
    1571             :                 // finish.
    1572           1 :                 if mem != nil {
    1573           1 :                         <-mem.flushed
    1574           1 :                 }
    1575             : 
    1576             :                 // Assign the sstables to the correct level in the LSM and apply the
    1577             :                 // version edit.
    1578           1 :                 ve, err = d.ingestApply(jobID, loadResult, targetLevelFunc, mut, exciseSpan, seqNum)
    1579             :         }
    1580             : 
    1581             :         // Only one ingest can occur at a time; otherwise one would block waiting
    1582             :         // for the other to finish applying. That blocking would happen while
    1583             :         // holding the commit mutex, which would prevent unrelated batches from
    1584             :         // writing their changes to the WAL and memtable, causing a bigger commit
    1585             :         // hiccup during ingestion.
    1586           1 :         seqNumCount := loadResult.fileCount
    1587           1 :         if exciseSpan.Valid() {
    1588           1 :                 seqNumCount++
    1589           1 :         }
    1590           1 :         d.commit.ingestSem <- struct{}{}
    1591           1 :         d.commit.AllocateSeqNum(seqNumCount, prepare, apply)
    1592           1 :         <-d.commit.ingestSem
    1593           1 : 
    1594           1 :         if err != nil {
    1595           1 :                 if err2 := ingestCleanup(d.objProvider, loadResult.localMeta); err2 != nil {
    1596           0 :                         d.opts.Logger.Errorf("ingest cleanup failed: %v", err2)
    1597           0 :                 }
    1598           1 :         } else {
    1599           1 :                 // Since we either created a hard link to the ingested files or copied
    1600           1 :                 // them over, it is safe to remove the original paths.
    1601           1 :                 for _, path := range loadResult.localPaths {
    1602           1 :                         if err2 := d.opts.FS.Remove(path); err2 != nil {
    1603           1 :                                 d.opts.Logger.Errorf("ingest failed to remove original file: %s", err2)
    1604           1 :                         }
    1605             :                 }
    1606             :         }
    1607             : 
    1608           1 :         info := TableIngestInfo{
    1609           1 :                 JobID:     jobID,
    1610           1 :                 Err:       err,
    1611           1 :                 flushable: asFlushable,
    1612           1 :         }
    1613           1 :         if len(loadResult.localMeta) > 0 {
    1614           1 :                 info.GlobalSeqNum = loadResult.localMeta[0].SmallestSeqNum
    1615           1 :         } else if len(loadResult.sharedMeta) > 0 {
    1616           1 :                 info.GlobalSeqNum = loadResult.sharedMeta[0].SmallestSeqNum
    1617           1 :         } else {
    1618           1 :                 info.GlobalSeqNum = loadResult.externalMeta[0].SmallestSeqNum
    1619           1 :         }
    1620           1 :         var stats IngestOperationStats
    1621           1 :         if ve != nil {
    1622           1 :                 info.Tables = make([]struct {
    1623           1 :                         TableInfo
    1624           1 :                         Level int
    1625           1 :                 }, len(ve.NewFiles))
    1626           1 :                 for i := range ve.NewFiles {
    1627           1 :                         e := &ve.NewFiles[i]
    1628           1 :                         info.Tables[i].Level = e.Level
    1629           1 :                         info.Tables[i].TableInfo = e.Meta.TableInfo()
    1630           1 :                         stats.Bytes += e.Meta.Size
    1631           1 :                         if e.Level == 0 {
    1632           1 :                                 stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
    1633           1 :                         }
    1634           1 :                         if metaFlushableOverlaps[e.Meta.FileNum] {
    1635           1 :                                 stats.MemtableOverlappingFiles++
    1636           1 :                         }
    1637             :                 }
    1638           1 :         } else if asFlushable {
    1639           1 :                 // NB: If asFlushable == true, there are no shared sstables.
    1640           1 :                 info.Tables = make([]struct {
    1641           1 :                         TableInfo
    1642           1 :                         Level int
    1643           1 :                 }, len(loadResult.localMeta))
    1644           1 :                 for i, f := range loadResult.localMeta {
    1645           1 :                         info.Tables[i].Level = -1
    1646           1 :                         info.Tables[i].TableInfo = f.TableInfo()
    1647           1 :                         stats.Bytes += f.Size
    1648           1 :                         // We don't have exact stats on which files will be ingested into
    1649           1 :                         // L0, because actual ingestion into the LSM has been deferred until
    1650           1 :                         // flush time. Instead, we infer based on memtable overlap.
    1651           1 :                         //
    1652           1 :                         // TODO(jackson): If we optimistically compute data overlap (#2112)
    1653           1 :                         // before entering the commit pipeline, we can use that overlap to
    1654           1 :                         // improve our approximation by incorporating overlap with L0, not
    1655           1 :                         // just memtables.
    1656           1 :                         if metaFlushableOverlaps[f.FileNum] {
    1657           1 :                                 stats.ApproxIngestedIntoL0Bytes += f.Size
    1658           1 :                                 stats.MemtableOverlappingFiles++
    1659           1 :                         }
    1660             :                 }
    1661             :         }
    1662           1 :         d.opts.EventListener.TableIngested(info)
    1663           1 : 
    1664           1 :         return stats, err
    1665             : }
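A minimal sketch of how the TableIngestInfo and IngestOperationStats assembled
above surface through pebble's exported API, assuming the exported
EventListener.TableIngested hook and DB.IngestWithStats method behave as
described in this file; the "demo-db" and "external.sst" paths are placeholders:

        package main

        import (
                "log"

                "github.com/cockroachdb/pebble"
        )

        func main() {
                opts := &pebble.Options{}
                opts.EventListener = &pebble.EventListener{
                        // Fires once per ingestion with the info assembled in ingest().
                        TableIngested: func(info pebble.TableIngestInfo) {
                                log.Printf("ingest job %d: %d tables, err=%v",
                                        info.JobID, len(info.Tables), info.Err)
                        },
                }
                db, err := pebble.Open("demo-db", opts)
                if err != nil {
                        log.Fatal(err)
                }
                defer db.Close()

                // IngestWithStats returns the IngestOperationStats populated above,
                // including total bytes and the approximate bytes landing in L0.
                stats, err := db.IngestWithStats([]string{"external.sst"})
                if err != nil {
                        log.Printf("ingest failed: %v", err)
                        return
                }
                log.Printf("ingested %d bytes (~%d into L0)",
                        stats.Bytes, stats.ApproxIngestedIntoL0Bytes)
        }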
    1666             : 
    1667             : // excise updates ve to include a replacement of the file m with new virtual
    1668             : // sstables that exclude exciseSpan, returning a slice of newly-created files if
    1669             : // any. If the entirety of m is deleted by exciseSpan, no new sstables are added
    1670             : // and m is deleted. Note that ve is updated in-place.
    1671             : //
    1672             : // The manifest lock must be held when calling this method.
    1673             : func (d *DB) excise(
    1674             :         exciseSpan KeyRange, m *fileMetadata, ve *versionEdit, level int,
    1675           1 : ) ([]manifest.NewFileEntry, error) {
    1676           1 :         numCreatedFiles := 0
    1677           1 :         // Check if there's actually an overlap between m and exciseSpan.
    1678           1 :         if !exciseSpan.Overlaps(d.cmp, m) {
    1679           1 :                 return nil, nil
    1680           1 :         }
    1681           1 :         ve.DeletedFiles[deletedFileEntry{
    1682           1 :                 Level:   level,
    1683           1 :                 FileNum: m.FileNum,
    1684           1 :         }] = m
    1685           1 :         // Fast path: m sits entirely within the exciseSpan, so just delete it.
    1686           1 :         if exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
    1687           1 :                 return nil, nil
    1688           1 :         }
    1689           1 :         var iter internalIterator
    1690           1 :         var rangeDelIter keyspan.FragmentIterator
    1691           1 :         var rangeKeyIter keyspan.FragmentIterator
    1692           1 :         needsBacking := false
    1693           1 :         // Create a file to the left of the excise span, if necessary.
    1694           1 :         // The bounds of this file will be [m.Smallest, lastKeyBefore(exciseSpan.Start)].
    1695           1 :         //
    1696           1 :         // We create bounds that are tight on user keys, and we make the effort to find
    1697           1 :         // the last key in the original sstable that's smaller than exciseSpan.Start
    1698           1 :         // even though it requires some sstable reads. We could choose to create
    1699           1 :         // virtual sstables on loose userKey bounds, in which case we could just set
    1700           1 :         // leftFile.Largest to an exclusive sentinel at exciseSpan.Start. The biggest
    1701           1 :         // issue with that approach would be that it'd lead to lots of small virtual
    1702           1 :         // sstables in the LSM that have no guarantee on containing even a single user
    1703           1 :         // key within the file bounds. This has the potential to increase both read and
    1704           1 :         // write-amp as we will be opening up these sstables only to find no relevant
    1705           1 :         // keys in the read path, and compacting sstables on top of them instead of
    1706           1 :         // directly into the space occupied by them. We choose to incur the cost of
    1707           1 :         // calculating tight bounds at this time instead of creating more work in the
    1708           1 :         // future.
    1709           1 :         //
    1710           1 :         // TODO(bilal): Some of this work can happen without grabbing the manifest
    1711           1 :         // lock; we could grab one currentVersion, release the lock, calculate excised
    1712           1 :         // files, then grab the lock again and recalculate for just the files that
    1713           1 :         // have changed since our previous calculation. Do this optimization as part of
    1714           1 :         // https://github.com/cockroachdb/pebble/issues/2112 .
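        // For example (illustrative, not from the original source): if m spans
        // [a, z] and exciseSpan is [f, p), the left file gets the tight bounds
        // [a, e'] where e' is the largest key in m that sorts before f, rather
        // than the looser bound of an exclusive sentinel at f.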
    1715           1 :         if d.cmp(m.Smallest.UserKey, exciseSpan.Start) < 0 {
    1716           1 :                 leftFile := &fileMetadata{
    1717           1 :                         Virtual:     true,
    1718           1 :                         FileBacking: m.FileBacking,
    1719           1 :                         FileNum:     d.mu.versions.getNextFileNum(),
    1720           1 :                         // Note that these are loose bounds for smallest/largest seqnums, but they're
    1721           1 :                         // sufficient for maintaining correctness.
    1722           1 :                         SmallestSeqNum: m.SmallestSeqNum,
    1723           1 :                         LargestSeqNum:  m.LargestSeqNum,
    1724           1 :                 }
    1725           1 :                 if m.HasPointKeys && !exciseSpan.Contains(d.cmp, m.SmallestPointKey) {
    1726           1 :                         // This file will contain point keys.
    1727           1 :                         smallestPointKey := m.SmallestPointKey
    1728           1 :                         var err error
    1729           1 :                         iter, rangeDelIter, err = d.newIters.TODO(context.TODO(), m, &IterOptions{
    1730           1 :                                 CategoryAndQoS: sstable.CategoryAndQoS{
    1731           1 :                                         Category: "pebble-ingest",
    1732           1 :                                         QoSLevel: sstable.LatencySensitiveQoSLevel,
    1733           1 :                                 },
    1734           1 :                                 level: manifest.Level(level),
    1735           1 :                         }, internalIterOpts{})
    1736           1 :                         if err != nil {
    1737           0 :                                 return nil, err
    1738           0 :                         }
    1739           1 :                         var key *InternalKey
    1740           1 :                         if iter != nil {
    1741           1 :                                 defer iter.Close()
    1742           1 :                                 key, _ = iter.SeekLT(exciseSpan.Start, base.SeekLTFlagsNone)
    1743           1 :                         } else {
    1744           0 :                                 iter = emptyIter
    1745           0 :                         }
    1746           1 :                         if key != nil {
    1747           1 :                                 leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, key.Clone())
    1748           1 :                         }
    1749             :                         // Store the min of (exciseSpan.Start, rdel.End) in lastRangeDel. This
    1750             :                         // needs to be a copy if the key is owned by the range del iter.
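                        // For example (illustrative): if a RANGEDEL [c, g) straddles an
                        // exciseSpan.Start of f, lastRangeDel is clamped to f and the left
                        // file's largest point key becomes an exclusive sentinel at f.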
    1751           1 :                         var lastRangeDel []byte
    1752           1 :                         if rangeDelIter != nil {
    1753           1 :                                 defer rangeDelIter.Close()
    1754           1 :                                 if rdel, err := rangeDelIter.SeekLT(exciseSpan.Start); err != nil {
    1755           0 :                                         return nil, err
    1756           1 :                                 } else if rdel != nil {
    1757           1 :                                         lastRangeDel = append(lastRangeDel[:0], rdel.End...)
    1758           1 :                                         if d.cmp(lastRangeDel, exciseSpan.Start) > 0 {
    1759           1 :                                                 lastRangeDel = exciseSpan.Start
    1760           1 :                                         }
    1761             :                                 }
    1762           1 :                         } else {
    1763           1 :                                 rangeDelIter = emptyKeyspanIter
    1764           1 :                         }
    1765           1 :                         if lastRangeDel != nil {
    1766           1 :                                 leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, lastRangeDel))
    1767           1 :                         }
    1768             :                 }
    1769           1 :                 if m.HasRangeKeys && !exciseSpan.Contains(d.cmp, m.SmallestRangeKey) {
    1770           1 :                         // This file will contain range keys.
    1771           1 :                         var err error
    1772           1 :                         smallestRangeKey := m.SmallestRangeKey
    1773           1 :                         rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
    1774           1 :                         if err != nil {
    1775           0 :                                 return nil, err
    1776           0 :                         }
    1777             :                         // Store the min of (exciseSpan.Start, rkey.End) in lastRangeKey. This
    1778             :                         // needs to be a copy if the key is owned by the range key iter.
    1779           1 :                         var lastRangeKey []byte
    1780           1 :                         var lastRangeKeyKind InternalKeyKind
    1781           1 :                         defer rangeKeyIter.Close()
    1782           1 :                         if rkey, err := rangeKeyIter.SeekLT(exciseSpan.Start); err != nil {
    1783           0 :                                 return nil, err
    1784           1 :                         } else if rkey != nil {
    1785           1 :                                 lastRangeKey = append(lastRangeKey[:0], rkey.End...)
    1786           1 :                                 if d.cmp(lastRangeKey, exciseSpan.Start) > 0 {
    1787           0 :                                         lastRangeKey = exciseSpan.Start
    1788           0 :                                 }
    1789           1 :                                 lastRangeKeyKind = rkey.Keys[0].Kind()
    1790             :                         }
    1791           1 :                         if lastRangeKey != nil {
    1792           1 :                                 leftFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, base.MakeExclusiveSentinelKey(lastRangeKeyKind, lastRangeKey))
    1793           1 :                         }
    1794             :                 }
    1795           1 :                 if leftFile.HasRangeKeys || leftFile.HasPointKeys {
    1796           1 :                         var err error
    1797           1 :                         leftFile.Size, err = d.tableCache.estimateSize(m, leftFile.Smallest.UserKey, leftFile.Largest.UserKey)
    1798           1 :                         if err != nil {
    1799           0 :                                 return nil, err
    1800           0 :                         }
    1801           1 :                         if leftFile.Size == 0 {
    1802           1 :                                 // On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
    1803           1 :                                 // such as if the excised file only has range keys/dels and no point
    1804           1 :                                 // keys. This can cause panics in places where we divide by file sizes.
    1805           1 :                                 // Correct for it here.
    1806           1 :                                 leftFile.Size = 1
    1807           1 :                         }
    1808           1 :                         if err := leftFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
    1809           0 :                                 return nil, err
    1810           0 :                         }
    1811           1 :                         leftFile.ValidateVirtual(m)
    1812           1 :                         ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: leftFile})
    1813           1 :                         needsBacking = true
    1814           1 :                         numCreatedFiles++
    1815             :                 }
    1816             :         }
    1817             :         // Create a file to the right, if necessary.
    1818           1 :         if exciseSpan.Contains(d.cmp, m.Largest) {
    1819           1 :                 // No key exists to the right of the excise span in this file.
    1820           1 :                 if needsBacking && !m.Virtual {
    1821           1 :                         // If m is virtual, then its file backing is already known to the manifest.
    1822           1 :                         // We don't need to create another file backing. Note that there must be
    1823           1 :                         // only one CreatedBackingTables entry per backing sstable. This is
    1824           1 :                         // indicated by the VersionEdit.CreatedBackingTables invariant.
    1825           1 :                         ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
    1826           1 :                 }
    1827           1 :                 return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
    1828             :         }
    1829             :         // Create a new file, rightFile, between [firstKeyAfter(exciseSpan.End), m.Largest].
    1830             :         //
    1831             :         // See comment before the definition of leftFile for the motivation behind
    1832             :         // calculating tight user-key bounds.
    1833           1 :         rightFile := &fileMetadata{
    1834           1 :                 Virtual:     true,
    1835           1 :                 FileBacking: m.FileBacking,
    1836           1 :                 FileNum:     d.mu.versions.getNextFileNum(),
    1837           1 :                 // Note that these are loose bounds for smallest/largest seqnums, but they're
    1838           1 :                 // sufficient for maintaining correctness.
    1839           1 :                 SmallestSeqNum: m.SmallestSeqNum,
    1840           1 :                 LargestSeqNum:  m.LargestSeqNum,
    1841           1 :         }
    1842           1 :         if m.HasPointKeys && !exciseSpan.Contains(d.cmp, m.LargestPointKey) {
    1843           1 :                 // This file will contain point keys.
    1844           1 :                 largestPointKey := m.LargestPointKey
    1845           1 :                 var err error
    1846           1 :                 if iter == nil && rangeDelIter == nil {
    1847           1 :                         iter, rangeDelIter, err = d.newIters.TODO(context.TODO(), m, &IterOptions{
    1848           1 :                                 CategoryAndQoS: sstable.CategoryAndQoS{
    1849           1 :                                         Category: "pebble-ingest",
    1850           1 :                                         QoSLevel: sstable.LatencySensitiveQoSLevel,
    1851           1 :                                 },
    1852           1 :                                 level: manifest.Level(level),
    1853           1 :                         }, internalIterOpts{})
    1854           1 :                         if err != nil {
    1855           0 :                                 return nil, err
    1856           0 :                         }
    1857           1 :                         if iter != nil {
    1858           1 :                                 defer iter.Close()
    1859           1 :                         } else {
    1860           0 :                                 iter = emptyIter
    1861           0 :                         }
    1862           1 :                         if rangeDelIter != nil {
    1863           1 :                                 defer rangeDelIter.Close()
    1864           1 :                         } else {
    1865           1 :                                 rangeDelIter = emptyKeyspanIter
    1866           1 :                         }
    1867             :                 }
    1868           1 :                 key, _ := iter.SeekGE(exciseSpan.End, base.SeekGEFlagsNone)
    1869           1 :                 if key != nil {
    1870           1 :                         rightFile.ExtendPointKeyBounds(d.cmp, key.Clone(), largestPointKey)
    1871           1 :                 }
    1872             :                 // Store the max of (exciseSpan.End, rdel.Start) in firstRangeDel. This
    1873             :                 // needs to be a copy if the key is owned by the range del iter.
    1874           1 :                 var firstRangeDel []byte
    1875           1 :                 rdel, err := rangeDelIter.SeekGE(exciseSpan.End)
    1876           1 :                 if err != nil {
    1877           0 :                         return nil, err
    1878           1 :                 } else if rdel != nil {
    1879           1 :                         firstRangeDel = append(firstRangeDel[:0], rdel.Start...)
    1880           1 :                         if d.cmp(firstRangeDel, exciseSpan.End) < 0 {
    1881           1 :                                 firstRangeDel = exciseSpan.End
    1882           1 :                         }
    1883             :                 }
    1884           1 :                 if firstRangeDel != nil {
    1885           1 :                         smallestPointKey := rdel.SmallestKey()
    1886           1 :                         smallestPointKey.UserKey = firstRangeDel
    1887           1 :                         rightFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, largestPointKey)
    1888           1 :                 }
    1889             :         }
    1890           1 :         if m.HasRangeKeys && !exciseSpan.Contains(d.cmp, m.LargestRangeKey) {
    1891           1 :                 // This file will contain range keys.
    1892           1 :                 largestRangeKey := m.LargestRangeKey
    1893           1 :                 if rangeKeyIter == nil {
    1894           1 :                         var err error
    1895           1 :                         rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
    1896           1 :                         if err != nil {
    1897           0 :                                 return nil, err
    1898           0 :                         }
    1899           1 :                         defer rangeKeyIter.Close()
    1900             :                 }
    1901             :                 // Store the max of (exciseSpan.End, rkey.Start) in firstRangeKey. This
    1902             :                 // needs to be a copy if the key is owned by the range key iter.
    1903           1 :                 var firstRangeKey []byte
    1904           1 :                 rkey, err := rangeKeyIter.SeekGE(exciseSpan.End)
    1905           1 :                 if err != nil {
    1906           0 :                         return nil, err
    1907           1 :                 } else if rkey != nil {
    1908           1 :                         firstRangeKey = append(firstRangeKey[:0], rkey.Start...)
    1909           1 :                         if d.cmp(firstRangeKey, exciseSpan.End) < 0 {
    1910           1 :                                 firstRangeKey = exciseSpan.End
    1911           1 :                         }
    1912             :                 }
    1913           1 :                 if firstRangeKey != nil {
    1914           1 :                         smallestRangeKey := rkey.SmallestKey()
    1915           1 :                         smallestRangeKey.UserKey = firstRangeKey
    1916           1 :                         // We call ExtendRangeKeyBounds so any internal boundType fields are
    1917           1 :                         // set correctly. Note that this is mildly wasteful as we'll be comparing
    1918           1 :                         // rightFile.{Smallest,Largest}RangeKey with themselves, which can be
    1919           1 :                         // avoided if we exported ExtendOverallKeyBounds or so.
    1920           1 :                         rightFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, largestRangeKey)
    1921           1 :                 }
    1922             :         }
    1923           1 :         if rightFile.HasRangeKeys || rightFile.HasPointKeys {
    1924           1 :                 var err error
    1925           1 :                 rightFile.Size, err = d.tableCache.estimateSize(m, rightFile.Smallest.UserKey, rightFile.Largest.UserKey)
    1926           1 :                 if err != nil {
    1927           0 :                         return nil, err
    1928           0 :                 }
    1929           1 :                 if rightFile.Size == 0 {
    1930           1 :                         // On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
    1931           1 :                         // such as if the excised file only has range keys/dels and no point keys.
    1932           1 :                         // This can cause panics in places where we divide by file sizes. Correct
    1933           1 :                         // for it here.
    1934           1 :                         rightFile.Size = 1
    1935           1 :                 }
    1936           1 :                 rightFile.ValidateVirtual(m)
    1937           1 :                 ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: rightFile})
    1938           1 :                 needsBacking = true
    1939           1 :                 numCreatedFiles++
    1940             :         }
    1941             : 
    1942           1 :         if needsBacking && !m.Virtual {
    1943           1 :                 // If m is virtual, then its file backing is already known to the manifest.
    1944           1 :                 // We don't need to create another file backing. Note that there must be
    1945           1 :                 // only one CreatedBackingTables entry per backing sstable. This is
    1946           1 :                 // indicated by the VersionEdit.CreatedBackingTables invariant.
    1947           1 :                 ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
    1948           1 :         }
    1949             : 
    1950           1 :         if err := rightFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
    1951           0 :                 return nil, err
    1952           0 :         }
    1953           1 :         return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
    1954             : }
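The bound arithmetic above can be illustrated in isolation. Below is a minimal
sketch using plain byte-slice bounds instead of pebble's fileMetadata; the names
fileBounds and exciseRemainders are hypothetical, and unlike excise itself the
sketch uses loose bounds rather than reading the sstable to tighten them:

        package main

        import (
                "bytes"
                "fmt"
        )

        // fileBounds is an inclusive user-key range [Smallest, Largest].
        type fileBounds struct {
                Smallest, Largest []byte
        }

        // exciseRemainders returns the pieces of f that survive excising the
        // half-open span [start, end): at most one piece strictly to the left of
        // start and at most one piece at or after end.
        func exciseRemainders(f fileBounds, start, end []byte) (left, right *fileBounds) {
                if bytes.Compare(f.Smallest, start) < 0 {
                        // Loose bound: ends "just before" start rather than at the last
                        // real key below start, which is what excise computes.
                        left = &fileBounds{Smallest: f.Smallest, Largest: start}
                }
                if bytes.Compare(f.Largest, end) >= 0 {
                        right = &fileBounds{Smallest: end, Largest: f.Largest}
                }
                return left, right
        }

        func main() {
                f := fileBounds{Smallest: []byte("a"), Largest: []byte("z")}
                left, right := exciseRemainders(f, []byte("f"), []byte("p"))
                if left != nil {
                        fmt.Printf("left piece:  [%s, %s)\n", left.Smallest, left.Largest)
                }
                if right != nil {
                        fmt.Printf("right piece: [%s, %s]\n", right.Smallest, right.Largest)
                }
        }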
    1955             : 
    1956             : type ingestTargetLevelFunc func(
    1957             :         newIters tableNewIters,
    1958             :         newRangeKeyIter keyspan.TableNewSpanIter,
    1959             :         iterOps IterOptions,
    1960             :         comparer *Comparer,
    1961             :         v *version,
    1962             :         baseLevel int,
    1963             :         compactions map[*compaction]struct{},
    1964             :         meta *fileMetadata,
    1965             :         suggestSplit bool,
    1966             : ) (int, *fileMetadata, error)
    1967             : 
    1968             : type ingestSplitFile struct {
    1969             :         // ingestFile is the file being ingested.
    1970             :         ingestFile *fileMetadata
    1971             :         // splitFile is the file that needs to be split to allow ingestFile to slot
    1972             :         // into `level` level.
    1973             :         splitFile *fileMetadata
    1974             :         // The level where ingestFile will go (and where splitFile already is).
    1975             :         level int
    1976             : }
    1977             : 
    1978             : // ingestSplit splits files specified in `files` and updates ve in-place to
    1979             : // account for existing files getting split into two virtual sstables. The map
    1980             : // `replacedFiles` contains an in-progress map of all files that have been
    1981             : // replaced with new virtual sstables in this version edit so far, which is also
    1982             : // updated in-place.
    1983             : //
    1984             : // d.mu as well as the manifest lock must be held when calling this method.
    1985             : func (d *DB) ingestSplit(
    1986             :         ve *versionEdit,
    1987             :         updateMetrics func(*fileMetadata, int, []newFileEntry),
    1988             :         files []ingestSplitFile,
    1989             :         replacedFiles map[base.FileNum][]newFileEntry,
    1990           1 : ) error {
    1991           1 :         for _, s := range files {
    1992           1 :                 // replacedFiles can be thought of as a tree, where we start iterating with
    1993           1 :                 // s.splitFile and run its fileNum through replacedFiles, then find which of
    1994           1 :                 // the replaced files overlaps with s.ingestFile, which becomes the new
    1995           1 :                 // splitFile, then we check splitFile's replacements in replacedFiles again
    1996           1 :                 // for overlap with s.ingestFile, and so on until we either can't find the
    1997           1 :                 // current splitFile in replacedFiles (i.e. that's the file that now needs to
    1998           1 :                 // be split), or we don't find a file that overlaps with s.ingestFile, which
    1999           1 :                 // means a prior ingest split already produced enough room for s.ingestFile
    2000           1 :                 // to go into this level without necessitating another ingest split.
    2001           1 :                 splitFile := s.splitFile
    2002           1 :                 for splitFile != nil {
    2003           1 :                         replaced, ok := replacedFiles[splitFile.FileNum]
    2004           1 :                         if !ok {
    2005           1 :                                 break
    2006             :                         }
    2007           1 :                         updatedSplitFile := false
    2008           1 :                         for i := range replaced {
    2009           1 :                                 if replaced[i].Meta.Overlaps(d.cmp, s.ingestFile.Smallest.UserKey, s.ingestFile.Largest.UserKey, s.ingestFile.Largest.IsExclusiveSentinel()) {
    2010           1 :                                         if updatedSplitFile {
    2011           0 :                                                 // This should never happen because the earlier ingestTargetLevel
    2012           0 :                                                 // function only finds split file candidates that are guaranteed to
    2013           0 :                                                 // have no data overlap, only boundary overlap. See the comments
    2014           0 :                                                 // in that method to see the definitions of data vs boundary
    2015           0 :                                                 // overlap. That, plus the fact that files in `replaced` are
    2016           0 :                                                 // guaranteed to have file bounds that are tight on user keys
    2017           0 :                                                 // (as that's what `d.excise` produces), means that the only case
    2018           0 :                                                 // where we overlap with two or more files in `replaced` is if we
    2019           0 :                                                 // actually had data overlap all along, or if the ingestion files
    2020           0 :                                                 // were overlapping, either of which is an invariant violation.
    2021           0 :                                                 panic("updated with two files in ingestSplit")
    2022             :                                         }
    2023           1 :                                         splitFile = replaced[i].Meta
    2024           1 :                                         updatedSplitFile = true
    2025             :                                 }
    2026             :                         }
    2027           1 :                         if !updatedSplitFile {
    2028           1 :                                 // None of the replaced files overlapped with the file being ingested.
    2029           1 :                                 // This can happen if we've already excised a span overlapping with
    2030           1 :                                 // this file, or if we have consecutive ingested files that can slide
    2031           1 :                                 // within the same gap between keys in an existing file. For instance,
    2032           1 :                                 // if an existing file has keys a and g and we're ingesting b-c, d-e,
    2033           1 :                                 // the first loop iteration will split the existing file into one that
    2034           1 :                                 // ends in a and another that starts at g, and the second iteration will
    2035           1 :                                 // fall into this case and require no splitting.
    2036           1 :                                 //
    2037           1 :                                 // No splitting necessary.
    2038           1 :                                 splitFile = nil
    2039           1 :                         }
    2040             :                 }
    2041           1 :                 if splitFile == nil {
    2042           1 :                         continue
    2043             :                 }
    2044             :                 // NB: excise operates on [start, end). We're splitting at [start, end]
    2045             :                 // (assuming !s.ingestFile.Largest.IsExclusiveSentinel()). The conflation
    2046             :                 // of exclusive vs inclusive end bounds should not make a difference here
    2047             :                 // as we're guaranteed to not have any data overlap between splitFile and
    2048             :                 // s.ingestFile; we panic below if a newly added file has an endKey equal
    2049             :                 // to s.ingestFile.Largest while s.ingestFile.Largest is not an exclusive sentinel.
    2050           1 :                 added, err := d.excise(KeyRange{Start: s.ingestFile.Smallest.UserKey, End: s.ingestFile.Largest.UserKey}, splitFile, ve, s.level)
    2051           1 :                 if err != nil {
    2052           0 :                         return err
    2053           0 :                 }
    2054           1 :                 if _, ok := ve.DeletedFiles[deletedFileEntry{
    2055           1 :                         Level:   s.level,
    2056           1 :                         FileNum: splitFile.FileNum,
    2057           1 :                 }]; !ok {
    2058           0 :                         panic("did not split file that was expected to be split")
    2059             :                 }
    2060           1 :                 replacedFiles[splitFile.FileNum] = added
    2061           1 :                 for i := range added {
    2062           1 :                         if s.ingestFile.Overlaps(d.cmp, added[i].Meta.Smallest.UserKey, added[i].Meta.Largest.UserKey, added[i].Meta.Largest.IsExclusiveSentinel()) {
    2063           0 :                                 panic("ingest-time split produced a file that overlaps with ingested file")
    2064             :                         }
    2065             :                 }
    2066           1 :                 updateMetrics(splitFile, s.level, added)
    2067             :         }
    2068             :         // Flatten the version edit by removing any entries from ve.NewFiles that
    2069             :         // are also in ve.DeletedFiles.
    2070           1 :         newNewFiles := ve.NewFiles[:0]
    2071           1 :         for i := range ve.NewFiles {
    2072           1 :                 fn := ve.NewFiles[i].Meta.FileNum
    2073           1 :                 deEntry := deletedFileEntry{Level: ve.NewFiles[i].Level, FileNum: fn}
    2074           1 :                 if _, ok := ve.DeletedFiles[deEntry]; ok {
    2075           1 :                         delete(ve.DeletedFiles, deEntry)
    2076           1 :                 } else {
    2077           1 :                         newNewFiles = append(newNewFiles, ve.NewFiles[i])
    2078           1 :                 }
    2079             :         }
    2080           1 :         ve.NewFiles = newNewFiles
    2081           1 :         return nil
    2082             : }
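The replacement-chasing loop at the top of ingestSplit can be sketched on its
own. This is a toy version using integers as stand-ins for file numbers; the
maps and names below are hypothetical, and the real code decides overlap by
comparing user-key bounds rather than consulting a precomputed map:

        package main

        import "fmt"

        func main() {
                // replaced maps an excised/split file number to its replacements,
                // mirroring the replacedFiles map threaded through ingestSplit.
                replaced := map[int][]int{
                        10: {21, 22}, // file 10 was split into 21 and 22
                        22: {31},     // 22 was later split again
                }
                // overlapsIngest records which files still overlap the ingested file.
                overlapsIngest := map[int]bool{22: true, 31: false}

                splitFile := 10
                for splitFile != 0 {
                        repl, ok := replaced[splitFile]
                        if !ok {
                                // splitFile is current: it is the file that must be split.
                                break
                        }
                        next := 0
                        for _, fn := range repl {
                                if overlapsIngest[fn] {
                                        next = fn
                                }
                        }
                        // next == 0 means an earlier split already made room, so no
                        // further split is needed for this ingested file.
                        splitFile = next
                }
                fmt.Println("file to split:", splitFile) // 0: nothing left to split
        }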
    2083             : 
    2084             : func (d *DB) ingestApply(
    2085             :         jobID int,
    2086             :         lr ingestLoadResult,
    2087             :         findTargetLevel ingestTargetLevelFunc,
    2088             :         mut *memTable,
    2089             :         exciseSpan KeyRange,
    2090             :         exciseSeqNum uint64,
    2091           1 : ) (*versionEdit, error) {
    2092           1 :         d.mu.Lock()
    2093           1 :         defer d.mu.Unlock()
    2094           1 : 
    2095           1 :         ve := &versionEdit{
    2096           1 :                 NewFiles: make([]newFileEntry, lr.fileCount),
    2097           1 :         }
    2098           1 :         if exciseSpan.Valid() || (d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit()) {
    2099           1 :                 ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{}
    2100           1 :         }
    2101           1 :         metrics := make(map[int]*LevelMetrics)
    2102           1 : 
    2103           1 :         // Lock the manifest for writing before we use the current version to
    2104           1 :         // determine the target level. This prevents two concurrent ingestion jobs
    2105           1 :         // from using the same version to determine the target level, and also
    2106           1 :         // provides serialization with concurrent compaction and flush jobs.
    2107           1 :         // logAndApply unconditionally releases the manifest lock, but any earlier
    2108           1 :         // returns must unlock the manifest.
    2109           1 :         d.mu.versions.logLock()
    2110           1 : 
    2111           1 :         if mut != nil {
    2112           1 :                 // Unref the mutable memtable to allow its flush to proceed. Now that we've
    2113           1 :                 // acquired the manifest lock, we can be certain that if the mutable
    2114           1 :                 // memtable has received more recent conflicting writes, the flush won't
    2115           1 :                 // beat us to applying to the manifest, resulting in sequence number
    2116           1 :                 // inversion. Even though we call maybeScheduleFlush right now, this flush
    2117           1 :                 // will apply after our ingestion.
    2118           1 :                 if mut.writerUnref() {
    2119           1 :                         d.maybeScheduleFlush()
    2120           1 :                 }
    2121             :         }
    2122             : 
    2123           1 :         shouldIngestSplit := d.opts.Experimental.IngestSplit != nil &&
    2124           1 :                 d.opts.Experimental.IngestSplit() && d.FormatMajorVersion() >= FormatVirtualSSTables
    2125           1 :         current := d.mu.versions.currentVersion()
    2126           1 :         baseLevel := d.mu.versions.picker.getBaseLevel()
    2127           1 :         iterOps := IterOptions{logger: d.opts.Logger}
    2128           1 :         // filesToSplit is a list where each element is a pair consisting of a file
    2129           1 :         // being ingested and a file being split to make room for an ingestion into
    2130           1 :         // that level. Each ingested file will appear at most once in this list. It
    2131           1 :         // is possible for split files to appear twice in this list.
    2132           1 :         filesToSplit := make([]ingestSplitFile, 0)
    2133           1 :         checkCompactions := false
    2134           1 :         for i := 0; i < lr.fileCount; i++ {
    2135           1 :                 // Determine the lowest level in the LSM for which the sstable doesn't
    2136           1 :                 // overlap any existing files in the level.
    2137           1 :                 var m *fileMetadata
    2138           1 :                 sharedIdx := -1
    2139           1 :                 sharedLevel := -1
    2140           1 :                 externalFile := false
    2141           1 :                 if i < len(lr.localMeta) {
    2142           1 :                         // local file.
    2143           1 :                         m = lr.localMeta[i]
    2144           1 :                 } else if (i - len(lr.localMeta)) < len(lr.sharedMeta) {
    2145           1 :                         // shared file.
    2146           1 :                         sharedIdx = i - len(lr.localMeta)
    2147           1 :                         m = lr.sharedMeta[sharedIdx]
    2148           1 :                         sharedLevel = int(lr.sharedLevels[sharedIdx])
    2149           1 :                 } else {
    2150           1 :                         // external file.
    2151           1 :                         externalFile = true
    2152           1 :                         m = lr.externalMeta[i-(len(lr.localMeta)+len(lr.sharedMeta))]
    2153           1 :                 }
    2154           1 :                 f := &ve.NewFiles[i]
    2155           1 :                 var err error
    2156           1 :                 if sharedIdx >= 0 {
    2157           1 :                         f.Level = sharedLevel
    2158           1 :                         if f.Level < sharedLevelsStart {
    2159           0 :                                 panic("cannot slot a shared file higher than the highest shared level")
    2160             :                         }
    2161           1 :                         ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
    2162           1 :                 } else {
    2163           1 :                         if externalFile {
    2164           1 :                                 ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
    2165           1 :                         }
    2166           1 :                         var splitFile *fileMetadata
    2167           1 :                         if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
    2168           1 :                                 // This file fits perfectly within the excise span. We can slot it at
    2169           1 :                                 // L6, or sharedLevelsStart - 1 if we have shared files.
    2170           1 :                                 if len(lr.sharedMeta) > 0 {
    2171           1 :                                         f.Level = sharedLevelsStart - 1
    2172           1 :                                         if baseLevel > f.Level {
    2173           1 :                                                 f.Level = 0
    2174           1 :                                         }
    2175           1 :                                 } else {
    2176           1 :                                         f.Level = 6
    2177           1 :                                 }
    2178           1 :                         } else {
    2179           1 :                                 // TODO(bilal): findTargetLevel does disk IO (reading files for data
    2180           1 :                                 // overlap) even though we're holding onto d.mu. Consider unlocking
    2181           1 :                                 // d.mu while we do this. We already hold versions.logLock so we should
    2182           1 :                                 // not see any version applications while we're at this. The one
    2183           1 :                                 // complication here would be pulling out the mu.compact.inProgress
    2184           1 :                                 // check from findTargetLevel, as that requires d.mu to be held.
    2185           1 :                                 f.Level, splitFile, err = findTargetLevel(
    2186           1 :                                         d.newIters, d.tableNewRangeKeyIter, iterOps, d.opts.Comparer, current, baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit)
    2187           1 :                         }
    2188             : 
    2189           1 :                         if splitFile != nil {
    2190           1 :                                 if invariants.Enabled {
    2191           1 :                                         if lf := current.Levels[f.Level].Find(d.cmp, splitFile); lf == nil {
    2192           0 :                                                 panic("splitFile returned is not in level it should be")
    2193             :                                         }
    2194             :                                 }
    2195             :                                 // We take advantage of the fact that we won't drop the db mutex
    2196             :                                 // between now and the call to logAndApply. So, no files should
    2197             :                                 // get added to a new in-progress compaction at this point. We can
    2198             :                                 // avoid having to iterate on in-progress compactions to cancel them
    2199             :                                 // if none of the files being split have a compacting state.
    2200           1 :                                 if splitFile.IsCompacting() {
    2201           0 :                                         checkCompactions = true
    2202           0 :                                 }
    2203           1 :                                 filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitFile, level: f.Level})
    2204             :                         }
    2205             :                 }
    2206           1 :                 if err != nil {
    2207           1 :                         d.mu.versions.logUnlock()
    2208           1 :                         return nil, err
    2209           1 :                 }
    2210           1 :                 f.Meta = m
    2211           1 :                 levelMetrics := metrics[f.Level]
    2212           1 :                 if levelMetrics == nil {
    2213           1 :                         levelMetrics = &LevelMetrics{}
    2214           1 :                         metrics[f.Level] = levelMetrics
    2215           1 :                 }
    2216           1 :                 levelMetrics.NumFiles++
    2217           1 :                 levelMetrics.Size += int64(m.Size)
    2218           1 :                 levelMetrics.BytesIngested += m.Size
    2219           1 :                 levelMetrics.TablesIngested++
    2220             :         }
    2221             :         // replacedFiles maps files excised due to exciseSpan (or splitFiles returned
    2222             :         // by ingestTargetLevel) to the files that were created to replace them. This map
    2223             :         // is used to resolve references to split files in filesToSplit, as it is
    2224             :         // possible for a file that we want to split to no longer exist or have a
    2225             :         // newer fileMetadata due to a split induced by another ingestion file, or an
    2226             :         // excise.
    2227           1 :         replacedFiles := make(map[base.FileNum][]newFileEntry)
    2228           1 :         updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
    2229           1 :                 levelMetrics := metrics[level]
    2230           1 :                 if levelMetrics == nil {
    2231           1 :                         levelMetrics = &LevelMetrics{}
    2232           1 :                         metrics[level] = levelMetrics
    2233           1 :                 }
    2234           1 :                 levelMetrics.NumFiles--
    2235           1 :                 levelMetrics.Size -= int64(m.Size)
    2236           1 :                 for i := range added {
    2237           1 :                         levelMetrics.NumFiles++
    2238           1 :                         levelMetrics.Size += int64(added[i].Meta.Size)
    2239           1 :                 }
    2240             :         }
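        // Illustrative example (numbers not from the original source): excising a
        // 10 MB file into a 4 MB left piece and a 5 MB right piece leaves the
        // level's NumFiles at a net +1 and its Size at a net -1 MB, the difference
        // being the keys dropped by the excise.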
    2241           1 :         if exciseSpan.Valid() {
    2242           1 :                 // Iterate through all levels and find files that intersect with exciseSpan.
    2243           1 :                 //
    2244           1 :                 // TODO(bilal): We could drop the DB mutex here as we don't need it for
    2245           1 :                 // excises; we only need to hold the version lock which we already are
    2246           1 :                 // holding. However releasing the DB mutex could mess with the
    2247           1 :                 // ingestTargetLevel calculation that happened above, as it assumed that it
    2248           1 :                 // had a complete view of in-progress compactions that wouldn't change
    2249           1 :                 // until logAndApply is called. If we were to drop the mutex now, we could
    2250           1 :                 // schedule another in-progress compaction that would go into the chosen target
    2251           1 :                 // level and lead to file overlap within level (which would panic in
    2252           1 :                 // logAndApply). We should drop the db mutex here, do the excise, then
    2253           1 :                 // re-grab the DB mutex and rerun just the in-progress compaction check to
    2254           1 :                 // see if any new compactions are conflicting with our chosen target levels
    2255           1 :                 // for files, and if they are, we should signal those compactions to error
    2256           1 :                 // out.
    2257           1 :                 for level := range current.Levels {
    2258           1 :                         overlaps := current.Overlaps(level, d.cmp, exciseSpan.Start, exciseSpan.End, true /* exclusiveEnd */)
    2259           1 :                         iter := overlaps.Iter()
    2260           1 : 
    2261           1 :                         for m := iter.First(); m != nil; m = iter.Next() {
    2262           1 :                                 newFiles, err := d.excise(exciseSpan, m, ve, level)
    2263           1 :                                 if err != nil {
    2264           0 :                                         return nil, err
    2265           0 :                                 }
    2266             : 
    2267           1 :                                 if _, ok := ve.DeletedFiles[deletedFileEntry{
    2268           1 :                                         Level:   level,
    2269           1 :                                         FileNum: m.FileNum,
    2270           1 :                                 }]; !ok {
    2271           1 :                                         // We did not excise this file.
    2272           1 :                                         continue
    2273             :                                 }
    2274           1 :                                 replacedFiles[m.FileNum] = newFiles
    2275           1 :                                 updateLevelMetricsOnExcise(m, level, newFiles)
    2276             :                         }
    2277             :                 }
    2278             :         }
    2279           1 :         if len(filesToSplit) > 0 {
    2280           1 :                 // For the same reasons as the above call to excise, we hold the db mutex
    2281           1 :                 // while calling this method.
    2282           1 :                 if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
    2283           0 :                         return nil, err
    2284           0 :                 }
    2285             :         }
    2286           1 :         if len(filesToSplit) > 0 || exciseSpan.Valid() {
    2287           1 :                 for c := range d.mu.compact.inProgress {
    2288           1 :                         if c.versionEditApplied {
    2289           0 :                                 continue
    2290             :                         }
    2291             :                         // Check if this compaction overlaps with the excise span. Note that just
    2292             :                         // checking if the inputs individually overlap with the excise span
    2293             :                         // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
    2294             :                         // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
    2295             :                         // doing a [c,d) excise at the same time as this compaction, we will have
    2296             :                         // to error out the whole compaction as we can't guarantee it hasn't/won't
    2297             :                         // write a file overlapping with the excise span.
    2298           1 :                         if exciseSpan.OverlapsInternalKeyRange(d.cmp, c.smallest, c.largest) {
    2299           1 :                                 c.cancel.Store(true)
    2300           1 :                         }
    2301             :                         // Check if this compaction's inputs have been replaced due to an
    2302             :                         // ingest-time split. In that case, cancel the compaction as a newly picked
    2303             :                         // compaction would need to include any new files that slid in between
    2304             :                         // previously-existing files. Note that we cancel any compaction that has a
    2305             :                         // file that was ingest-split as an input, even if it started before this
    2306             :                         // ingestion.
    2307           1 :                         if checkCompactions {
    2308           0 :                                 for i := range c.inputs {
    2309           0 :                                         iter := c.inputs[i].files.Iter()
    2310           0 :                                         for f := iter.First(); f != nil; f = iter.Next() {
    2311           0 :                                                 if _, ok := replacedFiles[f.FileNum]; ok {
    2312           0 :                                                         c.cancel.Store(true)
    2313           0 :                                                         break
    2314             :                                                 }
    2315             :                                         }
    2316             :                                 }
    2317             :                         }
    2318             :                 }
    2319             :         }
    2320           1 :         if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
    2321           1 :                 return d.getInProgressCompactionInfoLocked(nil)
    2322           1 :         }); err != nil {
    2323           1 :                 return nil, err
    2324           1 :         }
    2325             : 
    2326             :         // Check for any EventuallyFileOnlySnapshots that could be watching for
    2327             :         // an excise on this span. There should be none as the
    2328             :         // computePossibleOverlaps steps should have forced these EFOS to transition
    2329             :         // to file-only snapshots by now. If we see any that conflict with this
    2330             :         // excise, panic.
    2331           1 :         if exciseSpan.Valid() {
    2332           1 :                 for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
    2333           0 :                         // Skip non-EFOS snapshots, and also skip any EFOS that were created
    2334           0 :                         // *after* the excise.
    2335           0 :                         if s.efos == nil || base.Visible(exciseSeqNum, s.efos.seqNum, base.InternalKeySeqNumMax) {
    2336           0 :                                 continue
    2337             :                         }
    2338           0 :                         efos := s.efos
    2339           0 :                         // TODO(bilal): We can make this faster by taking advantage of the sorted
    2340           0 :                         // nature of protectedRanges to do a sort.Search, or even maintaining a
    2341           0 :                         // global list of all protected ranges instead of having to peer into every
    2342           0 :                         // snapshot.
    2343           0 :                         for i := range efos.protectedRanges {
    2344           0 :                                 if efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
    2345           0 :                                         panic("unexpected excise of an EventuallyFileOnlySnapshot's bounds")
    2346             :                                 }
    2347             :                         }
    2348             :                 }
    2349             :         }
    2350             : 
    2351           1 :         d.mu.versions.metrics.Ingest.Count++
    2352           1 : 
    2353           1 :         d.updateReadStateLocked(d.opts.DebugCheck)
    2354           1 :         // updateReadStateLocked could have generated obsolete tables; schedule a
    2355           1 :         // cleanup job if necessary.
    2356           1 :         d.deleteObsoleteFiles(jobID)
    2357           1 :         d.updateTableStatsLocked(ve.NewFiles)
    2358           1 :         // The ingestion may have pushed a level over the threshold for compaction,
    2359           1 :         // so check to see if one is necessary and schedule it.
    2360           1 :         d.maybeScheduleCompaction()
    2361           1 :         var toValidate []manifest.NewFileEntry
    2362           1 :         dedup := make(map[base.DiskFileNum]struct{})
    2363           1 :         for _, entry := range ve.NewFiles {
    2364           1 :                 if _, ok := dedup[entry.Meta.FileBacking.DiskFileNum]; !ok {
    2365           1 :                         toValidate = append(toValidate, entry)
    2366           1 :                         dedup[entry.Meta.FileBacking.DiskFileNum] = struct{}{}
    2367           1 :                 }
    2368             :         }
    2369           1 :         d.maybeValidateSSTablesLocked(toValidate)
    2370           1 :         return ve, nil
    2371             : }
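
The cancellation loop earlier in this function checks the excise span against each in-progress compaction's overall [smallest, largest] bounds rather than against the compaction's inputs one by one. The standalone sketch below illustrates why, using the [a,b] and [e,f] example from the comment; the span type and overlapsInclusive helper are hypothetical simplifications for this sketch, not Pebble's KeyRange or OverlapsInternalKeyRange.

package main

import (
	"bytes"
	"fmt"
)

// span is a user-key range with an inclusive start and exclusive end,
// standing in for the excise span. Hypothetical helper for this sketch only.
type span struct{ start, end []byte }

// overlapsInclusive reports whether s overlaps the inclusive key range [lo, hi].
func (s span) overlapsInclusive(lo, hi []byte) bool {
	return bytes.Compare(s.start, hi) <= 0 && bytes.Compare(lo, s.end) < 0
}

func main() {
	excise := span{start: []byte("c"), end: []byte("d")} // excising [c,d)

	// A compaction whose inputs cover [a,b] and [e,f]. Neither input overlaps
	// [c,d), but the compaction may write its output as a single sstable
	// spanning [a,f], which would straddle the excised span.
	inputs := [][2][]byte{
		{[]byte("a"), []byte("b")},
		{[]byte("e"), []byte("f")},
	}

	perInput := false
	for _, in := range inputs {
		perInput = perInput || excise.overlapsInclusive(in[0], in[1])
	}
	// The code above instead checks the compaction's overall bounds.
	overall := excise.overlapsInclusive(inputs[0][0], inputs[1][1])

	fmt.Println(perInput) // false: the per-input check misses the conflict
	fmt.Println(overall)  // true: the overall-bounds check cancels the compaction
}
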
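Just before calling maybeValidateSSTablesLocked, the function deduplicates ve.NewFiles by each entry's backing DiskFileNum, because the validation queue itself does not deduplicate (see the doc comment below) and a backing file shared by several entries would otherwise have its block checksums verified more than once per ingest. A minimal sketch of that pattern, with hypothetical simplified types standing in for manifest.NewFileEntry and base.DiskFileNum:

package main

import "fmt"

// fileEntry is a stand-in for manifest.NewFileEntry: several entries (e.g.
// virtual sstables) may share one backing object on disk. Hypothetical
// simplified type for illustration only.
type fileEntry struct {
	name    string
	backing uint64 // stand-in for base.DiskFileNum
}

// dedupeByBacking keeps the first entry seen for each backing file, so a
// shared backing file is queued for checksum validation only once.
func dedupeByBacking(entries []fileEntry) []fileEntry {
	seen := make(map[uint64]struct{})
	var out []fileEntry
	for _, e := range entries {
		if _, ok := seen[e.backing]; ok {
			continue
		}
		seen[e.backing] = struct{}{}
		out = append(out, e)
	}
	return out
}

func main() {
	entries := []fileEntry{
		{name: "virtual-1", backing: 7},
		{name: "virtual-2", backing: 7}, // shares its backing with virtual-1
		{name: "physical-3", backing: 9},
	}
	fmt.Println(dedupeByBacking(entries)) // [{virtual-1 7} {physical-3 9}]
}
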
    2372             : 
    2373             : // maybeValidateSSTablesLocked adds the slice of newFileEntry values to the
    2374             : // pending queue of files to be validated when the feature is enabled.
    2375             : //
    2376             : // Note that if two entries with the same backing file are added, the block
    2377             : // checksums for the backing file will be validated twice.
    2378             : //
    2379             : // DB.mu must be locked when calling.
    2380           1 : func (d *DB) maybeValidateSSTablesLocked(newFiles []newFileEntry) {
    2381           1 :         // Only add to the validation queue when the feature is enabled.
    2382           1 :         if !d.opts.Experimental.ValidateOnIngest {
    2383           1 :                 return
    2384           1 :         }
    2385             : 
    2386           1 :         d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...)
    2387           1 :         if d.shouldValidateSSTablesLocked() {
    2388           1 :                 go d.validateSSTables()
    2389           1 :         }
    2390             : }
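
maybeValidateSSTablesLocked and validateSSTables (below) together form a queue-and-single-worker pattern: entries are appended to a pending queue under DB.mu, a background goroutine is started only if one is not already running, and the worker drops the mutex while it performs validation IO, re-checking the queue once it finishes a batch. A minimal sketch of the same pattern, using a hypothetical validator type rather than Pebble's DB:

package main

import (
	"fmt"
	"sync"
)

// validator is a hypothetical stand-in for the DB fields used by the
// validation path; it is not Pebble's API.
type validator struct {
	mu         sync.Mutex
	pending    []string // stand-in for the []newFileEntry queue
	validating bool
	wg         sync.WaitGroup
}

// enqueue mirrors maybeValidateSSTablesLocked: append to the pending queue
// and start a single background worker only if none is already running.
func (v *validator) enqueue(files ...string) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.pending = append(v.pending, files...)
	if !v.validating && len(v.pending) > 0 {
		v.validating = true
		v.wg.Add(1)
		go v.run()
	}
}

// run mirrors validateSSTables: grab the queued batch under the lock, then
// drop the lock while doing the (potentially slow) validation IO, and loop
// until no more work has arrived.
func (v *validator) run() {
	defer v.wg.Done()
	for {
		v.mu.Lock()
		batch := v.pending
		v.pending = nil
		if len(batch) == 0 {
			v.validating = false
			v.mu.Unlock()
			return
		}
		v.mu.Unlock()

		for _, f := range batch {
			fmt.Println("validating", f) // block-checksum IO would go here
		}
	}
}

func main() {
	v := &validator{}
	v.enqueue("000007.sst", "000009.sst")
	v.wg.Wait()
}
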
    2391             : 
    2392             : // shouldValidateSSTablesLocked returns true if SSTable validation should run.
    2393             : // DB.mu must be locked when calling.
    2394           1 : func (d *DB) shouldValidateSSTablesLocked() bool {
    2395           1 :         return !d.mu.tableValidation.validating &&
    2396           1 :                 d.closed.Load() == nil &&
    2397           1 :                 d.opts.Experimental.ValidateOnIngest &&
    2398           1 :                 len(d.mu.tableValidation.pending) > 0
    2399           1 : }
    2400             : 
    2401             : // validateSSTables runs a round of validation on the tables in the pending
    2402             : // queue.
    2403           1 : func (d *DB) validateSSTables() {
    2404           1 :         d.mu.Lock()
    2405           1 :         if !d.shouldValidateSSTablesLocked() {
    2406           1 :                 d.mu.Unlock()
    2407           1 :                 return
    2408           1 :         }
    2409             : 
    2410           1 :         pending := d.mu.tableValidation.pending
    2411           1 :         d.mu.tableValidation.pending = nil
    2412           1 :         d.mu.tableValidation.validating = true
    2413           1 :         jobID := d.mu.nextJobID
    2414           1 :         d.mu.nextJobID++
    2415           1 :         rs := d.loadReadState()
    2416           1 : 
    2417           1 :         // Drop DB.mu before performing IO.
    2418           1 :         d.mu.Unlock()
    2419           1 : 
    2420           1 :         // Validate all tables in the pending queue. This could lead to a situation
    2421           1 :         // where we are starving IO from other tasks due to having to page through
    2422           1 :         // all the blocks in all the sstables in the queue.
    2423           1 :         // TODO(travers): Add some form of pacing to avoid IO starvation.
    2424           1 : 
    2425           1 :         // If we fail to validate any files due to reasons other than uncovered
    2426           1 :         // corruption, accumulate them and re-queue them for another attempt.
    2427           1 :         var retry []manifest.NewFileEntry
    2428           1 : 
    2429           1 :         for _, f := range pending {
    2430           1 :                 // The file may have been moved or deleted since it was ingested, in
    2431           1 :                 // which case we skip.
    2432           1 :                 if !rs.current.Contains(f.Level, d.cmp, f.Meta) {
    2433           0 :                         // Assume the file was moved to a lower level. It is rare enough
    2434           0 :                         // that a table is moved or deleted between the time it was ingested
    2435           0 :                         // and the time the validation routine runs that the overall cost of
    2436           0 :                         // this inner loop is tolerably low, when amortized over all
    2437           0 :                         // ingested tables.
    2438           0 :                         found := false
    2439           0 :                         for i := f.Level + 1; i < numLevels; i++ {
    2440           0 :                                 if rs.current.Contains(i, d.cmp, f.Meta) {
    2441           0 :                                         found = true
    2442           0 :                                         break
    2443             :                                 }
    2444             :                         }
    2445           0 :                         if !found {
    2446           0 :                                 continue
    2447             :                         }
    2448             :                 }
    2449             : 
    2450           1 :                 var err error
    2451           1 :                 if f.Meta.Virtual {
    2452           0 :                         err = d.tableCache.withVirtualReader(
    2453           0 :                                 f.Meta.VirtualMeta(), func(v sstable.VirtualReader) error {
    2454           0 :                                         return v.ValidateBlockChecksumsOnBacking()
    2455           0 :                                 })
    2456           1 :                 } else {
    2457           1 :                         err = d.tableCache.withReader(
    2458           1 :                                 f.Meta.PhysicalMeta(), func(r *sstable.Reader) error {
    2459           1 :                                         return r.ValidateBlockChecksums()
    2460           1 :                                 })
    2461             :                 }
    2462             : 
    2463           1 :                 if err != nil {
    2464           1 :                         if IsCorruptionError(err) {
    2465           1 :                                 // TODO(travers): Hook into the corruption reporting pipeline, once
    2466           1 :                                 // available. See pebble#1192.
    2467           1 :                                 d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err)
    2468           1 :                         } else {
    2469           1 :                                 // If there was some other, possibly transient, error that
    2470           1 :                                 // caused table validation to fail, inform the EventListener and
    2471           1 :                                 // move on. We remember the table so that we can retry it in a
    2472           1 :                                 // subsequent table validation job.
    2473           1 :                                 //
    2474           1 :                                 // TODO(jackson): If the error is not transient, this will retry
    2475           1 :                                 // validation indefinitely. While not great, it's the same
    2476           1 :                                 // behavior as erroring flushes and compactions. We should
    2477           1 :                                 // address this as a part of #270.
    2478           1 :                                 d.opts.EventListener.BackgroundError(err)
    2479           1 :                                 retry = append(retry, f)
    2480           1 :                                 continue
    2481             :                         }
    2482             :                 }
    2483             : 
    2484           1 :                 d.opts.EventListener.TableValidated(TableValidatedInfo{
    2485           1 :                         JobID: jobID,
    2486           1 :                         Meta:  f.Meta,
    2487           1 :                 })
    2488             :         }
    2489           1 :         rs.unref()
    2490           1 :         d.mu.Lock()
    2491           1 :         defer d.mu.Unlock()
    2492           1 :         d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, retry...)
    2493           1 :         d.mu.tableValidation.validating = false
    2494           1 :         d.mu.tableValidation.cond.Broadcast()
    2495           1 :         if d.shouldValidateSSTablesLocked() {
    2496           1 :                 go d.validateSSTables()
    2497           1 :         }
    2498             : }
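
The error handling in validateSSTables splits failures two ways: corruption is fatal and never retried, while any other (possibly transient) failure is reported and the table is re-queued for the next validation pass. A minimal sketch of that split, using a hypothetical isCorruption classifier and a plain logger in place of Pebble's IsCorruptionError, Logger, and EventListener:

package main

import (
	"errors"
	"fmt"
	"log"
)

// errCorruption marks a checksum mismatch in this sketch.
var errCorruption = errors.New("checksum mismatch")

// validate is a stand-in for validating one table's block checksums; here it
// fails transiently for one hard-coded (hypothetical) file name.
func validate(name string) error {
	if name == "000009.sst" {
		return fmt.Errorf("transient: %s: read timed out", name)
	}
	return nil
}

func isCorruption(err error) bool { return errors.Is(err, errCorruption) }

func main() {
	pending := []string{"000007.sst", "000009.sst"}
	var retry []string
	for _, f := range pending {
		err := validate(f)
		switch {
		case err == nil:
			fmt.Println("validated", f)
		case isCorruption(err):
			// Corruption is never retried: surface it loudly and stop.
			log.Fatalf("encountered corruption during ingestion: %s", err)
		default:
			// Transient failures are reported and re-queued for the next
			// validation pass.
			fmt.Println("background error:", err)
			retry = append(retry, f)
		}
	}
	fmt.Println("re-queued:", retry) // re-queued: [000009.sst]
}
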

Generated by: LCOV version 1.14