LCOV - code coverage report
Current view: top level - pebble - scan_internal.go (source / functions) Coverage Total Hit
Test: 2025-09-06 08:17Z d86f6dab - tests only.lcov Lines: 84.4 % 816 689
Test Date: 2025-09-06 08:19:26 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "context"
       9              :         "fmt"
      10              :         "slices"
      11              :         "sync"
      12              : 
      13              :         "github.com/cockroachdb/errors"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/invariants"
      16              :         "github.com/cockroachdb/pebble/internal/keyspan"
      17              :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      18              :         "github.com/cockroachdb/pebble/internal/manifest"
      19              :         "github.com/cockroachdb/pebble/internal/treeprinter"
      20              :         "github.com/cockroachdb/pebble/objstorage"
      21              :         "github.com/cockroachdb/pebble/objstorage/remote"
      22              :         "github.com/cockroachdb/pebble/sstable"
      23              :         "github.com/cockroachdb/pebble/sstable/blob"
      24              :         "github.com/cockroachdb/pebble/sstable/block"
      25              : )
      26              : 
      27              : const (
      28              :         // In skip-shared iteration mode, keys in levels greater than
      29              :         // sharedLevelsStart (i.e. lower in the LSM) are skipped. Keys
      30              :         // in sharedLevelsStart are returned iff they are not in a
      31              :         // shared file.
      32              :         sharedLevelsStart = remote.SharedLevelsStart
      33              : 
      34              :         // In skip-external iteration mode, keys in levels greater
      35              :         // than externalSkipStart are skipped. Keys in
      36              :         // externalSkipStart are returned iff they are not in an
      37              :         // external file.
      38              :         externalSkipStart = 6
      39              : )
      40              : 
      41              : // ErrInvalidSkipSharedIteration is returned by ScanInternal if it was called
      42              : // with a shared file visitor function, and a file in a shareable level (i.e.
      43              : // level >= sharedLevelsStart) was found to not be in shared storage according
      44              : // to objstorage.Provider, or not shareable for another reason such as for
      45              : // containing keys newer than the snapshot sequence number.
      46              : var ErrInvalidSkipSharedIteration = errors.New("pebble: cannot use skip-shared iteration due to non-shareable files in lower levels")
      47              : 
      48              : // SharedSSTMeta represents an sstable on shared storage that can be ingested
      49              : // by another pebble instance. This struct must contain all fields that are
      50              : // required for a Pebble instance to ingest a foreign sstable on shared storage,
      51              : // including constructing any relevant objstorage.Provider / remoteobjcat.Catalog
      52              : // data structures, as well as creating virtual TableMetadatas.
      53              : //
      54              : // Note that the Pebble instance creating and returning a SharedSSTMeta might
      55              : // not be the one that created the underlying sstable on shared storage to begin
      56              : // with; it's possible for a Pebble instance to reshare an sstable that was
      57              : // shared to it.
      58              : type SharedSSTMeta struct {
      59              :         // Backing is the shared object underlying this SST. Can be attached to an
      60              :         // objstorage.Provider.
      61              :         Backing objstorage.RemoteObjectBackingHandle
      62              : 
      63              :         // Smallest and Largest internal keys for the overall bounds. The kind and
      64              :         // SeqNum of these will reflect what is physically present on the source Pebble
      65              :         // instance's view of the sstable; it's up to the ingesting instance to set the
      66              :         // sequence number in the trailer to match the read-time sequence numbers
      67              :         // reserved for the level this SST is being ingested into. The Kind is expected
      68              :         // to remain unchanged by the ingesting instance.
      69              :         //
      70              :         // Note that these bounds could be narrower than the bounds of the underlying
      71              :         // sstable; ScanInternal is expected to truncate sstable bounds to the user key
      72              :         // bounds passed into that method.
      73              :         Smallest, Largest InternalKey
      74              : 
      75              :         // SmallestRangeKey and LargestRangeKey are internal keys that denote the
      76              :         // range key bounds of this sstable. Must lie within [Smallest, Largest].
      77              :         SmallestRangeKey, LargestRangeKey InternalKey
      78              : 
      79              :         // SmallestPointKey and LargestPointKey are internal keys that denote the
      80              :         // point key bounds of this sstable. Must lie within [Smallest, Largest].
      81              :         SmallestPointKey, LargestPointKey InternalKey
      82              : 
      83              :         // Level denotes the level at which this file was present at read time.
      84              :         // For files visited by ScanInternal, this value will only be 5 or 6.
      85              :         Level uint8
      86              : 
      87              :         // Size contains an estimate of the size of this sstable.
      88              :         Size uint64
      89              : 
      90              :         // tableNum at time of creation in the creator instance. Only used for
      91              :         // debugging/tests.
      92              :         tableNum base.TableNum
      93              : }
      94              : 
      95            1 : func (s *SharedSSTMeta) cloneFromFileMeta(f *manifest.TableMetadata) {
      96            1 :         *s = SharedSSTMeta{
      97            1 :                 Smallest:         f.Smallest().Clone(),
      98            1 :                 Largest:          f.Largest().Clone(),
      99            1 :                 SmallestPointKey: f.PointKeyBounds.Smallest().Clone(),
     100            1 :                 LargestPointKey:  f.PointKeyBounds.Largest().Clone(),
     101            1 :                 Size:             f.Size,
     102            1 :                 tableNum:         f.TableNum,
     103            1 :         }
     104            1 :         if f.HasRangeKeys {
     105            1 :                 s.SmallestRangeKey = f.RangeKeyBounds.Smallest().Clone()
     106            1 :                 s.LargestRangeKey = f.RangeKeyBounds.Largest().Clone()
     107            1 :         }
     108              : }
     109              : 
     110              : // ScanInternal scans all internal keys within the specified bounds, truncating
     111              : // any rangedels and rangekeys to those bounds if they span past them. For use
     112              : // when an external user needs to be aware of all internal keys that make up a
     113              : // key range.
     114              : //
     115              : // Keys deleted by range deletions must not be returned or exposed by this
     116              : // method, while the range deletion deleting that key must be exposed using
     117              : // visitRangeDel. Keys that would be masked by range key masking (if an
     118              : // appropriate prefix were set) should be exposed, alongside the range key
     119              : // that would have masked it. This method also collapses all point keys into
     120              : // one InternalKey; so only one internal key at most per user key is returned
     121              : // to visitPointKey.
     122              : //
     123              : // If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
     124              : // mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
     125              : // their metadatas truncated to [lower, upper) and passed into visitSharedFile.
     126              : // ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
     127              : // sstable in L5 or L6 is found that is not in shared storage according to
     128              : // provider.IsShared, or an sstable in those levels contains a newer key than the
     129              : // snapshot sequence number (only applicable for snapshot.ScanInternal). Examples
     130              : // of when this could happen could be if Pebble started writing sstables before a
     131              : // creator ID was set (as creator IDs are necessary to enable shared storage)
     132              : // resulting in some lower level SSTs being on non-shared storage. Skip-shared
     133              : // iteration is invalid in those cases.
     134            1 : func (d *DB) ScanInternal(ctx context.Context, opts ScanInternalOptions) (err error) {
     135            1 :         var iter *scanInternalIterator
     136            1 :         iter, err = d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, &opts)
     137            1 :         if err != nil {
     138            0 :                 return err
     139            0 :         }
     140            1 :         defer func() { err = errors.CombineErrors(err, iter.Close()) }()
     141            1 :         return scanInternalImpl(ctx, iter, &opts)
     142              : }
     143              : 
     144              : // newInternalIter constructs and returns a new scanInternalIterator on this db.
     145              : // If o.skipSharedLevels is true, levels below sharedLevelsStart are *not* added
     146              : // to the internal iterator.
     147              : //
     148              : // TODO(bilal): This method has a lot of similarities with db.newIter as well as
     149              : // finishInitializingIter. Both pairs of methods should be refactored to reduce
     150              : // this duplication.
     151              : func (d *DB) newInternalIter(
     152              :         ctx context.Context, sOpts snapshotIterOpts, o *ScanInternalOptions,
     153            1 : ) (*scanInternalIterator, error) {
     154            1 :         if err := d.closed.Load(); err != nil {
     155            0 :                 panic(err)
     156              :         }
     157              :         // Grab and reference the current readState. This prevents the underlying
     158              :         // files in the associated version from being deleted if there is a current
     159              :         // compaction. The readState is unref'd by Iterator.Close().
     160            1 :         var readState *readState
     161            1 :         var vers *manifest.Version
     162            1 :         if sOpts.vers == nil {
     163            1 :                 if sOpts.readState != nil {
     164            0 :                         readState = sOpts.readState
     165            0 :                         readState.ref()
     166            0 :                         vers = readState.current
     167            1 :                 } else {
     168            1 :                         readState = d.loadReadState()
     169            1 :                         vers = readState.current
     170            1 :                 }
     171            1 :         } else {
     172            1 :                 vers = sOpts.vers
     173            1 :                 sOpts.vers.Ref()
     174            1 :         }
     175              : 
     176              :         // Determine the seqnum to read at after grabbing the read state (current and
     177              :         // memtables) above.
     178            1 :         seqNum := sOpts.seqNum
     179            1 :         if seqNum == 0 {
     180            1 :                 seqNum = d.mu.versions.visibleSeqNum.Load()
     181            1 :         }
     182              : 
     183              :         // Bundle various structures under a single umbrella in order to allocate
     184              :         // them together.
     185            1 :         buf := scanInternalIteratorIterAllocPool.Get().(*scanInternalIterAlloc)
     186            1 :         dbi := &buf.scanIter
     187            1 :         *dbi = scanInternalIterator{
     188            1 :                 ctx:             ctx,
     189            1 :                 db:              d,
     190            1 :                 comparer:        d.opts.Comparer,
     191            1 :                 merge:           d.opts.Merger.Merge,
     192            1 :                 readState:       readState,
     193            1 :                 version:         sOpts.vers,
     194            1 :                 alloc:           buf,
     195            1 :                 newIters:        d.newIters,
     196            1 :                 newIterRangeKey: d.tableNewRangeKeyIter,
     197            1 :                 seqNum:          seqNum,
     198            1 :                 mergingIter:     &buf.iterAlloc.merging,
     199            1 :         }
     200            1 :         dbi.blobValueFetcher.Init(&vers.BlobFiles, d.fileCache, block.ReadEnv{},
     201            1 :                 blob.SuggestedCachedReaders(vers.MaxReadAmp()))
     202            1 : 
     203            1 :         dbi.opts = *o
     204            1 :         dbi.opts.logger = d.opts.Logger
     205            1 :         if d.opts.private.disableLazyCombinedIteration {
     206            0 :                 dbi.opts.disableLazyCombinedIteration = true
     207            0 :         }
     208            1 :         return finishInitializingInternalIter(buf, dbi)
     209              : }
     210              : 
     211              : type internalIterOpts struct {
     212              :         // if compaction is set, sstable-level iterators will be created using
     213              :         // NewCompactionIter; these iterators have a more constrained interface
     214              :         // and are optimized for the sequential scan of a compaction.
     215              :         compaction         bool
     216              :         readEnv            sstable.ReadEnv
     217              :         boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
     218              :         // blobValueFetcher is the base.ValueFetcher to use when constructing
     219              :         // internal values to represent values stored externally in blob files.
     220              :         blobValueFetcher base.ValueFetcher
     221              : }
     222              : 
     223              : func finishInitializingInternalIter(
     224              :         buf *scanInternalIterAlloc, i *scanInternalIterator,
     225            1 : ) (*scanInternalIterator, error) {
     226            1 :         // Short-hand.
     227            1 :         var memtables flushableList
     228            1 :         if i.readState != nil {
     229            1 :                 memtables = i.readState.memtables
     230            1 :         }
     231              :         // We only need to read from memtables which contain sequence numbers older
     232              :         // than seqNum. Trim off newer memtables.
     233            1 :         for j := len(memtables) - 1; j >= 0; j-- {
     234            1 :                 if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
     235            1 :                         break
     236              :                 }
     237            1 :                 memtables = memtables[:j]
     238              :         }
     239            1 :         i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
     240            1 : 
     241            1 :         if err := i.constructPointIter(i.opts.Category, memtables, &buf.iterAlloc); err != nil {
     242            0 :                 return nil, err
     243            0 :         }
     244              : 
     245              :         // For internal iterators, we skip the lazy combined iteration optimization
     246              :         // entirely, and create the range key iterator stack directly.
     247            1 :         i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
     248            1 :         if err := i.constructRangeKeyIter(); err != nil {
     249            0 :                 return nil, err
     250            0 :         }
     251              : 
     252              :         // Wrap the point iterator (currently i.iter) with an interleaving
     253              :         // iterator that interleaves range keys pulled from
     254              :         // i.rangeKey.rangeKeyIter.
     255            1 :         i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
     256            1 :                 keyspan.InterleavingIterOpts{
     257            1 :                         LowerBound: i.opts.LowerBound,
     258            1 :                         UpperBound: i.opts.UpperBound,
     259            1 :                 })
     260            1 :         i.iter = &i.rangeKey.iiter
     261            1 : 
     262            1 :         return i, nil
     263              : }
     264              : 
     265              : type sharedByLevel []SharedSSTMeta
     266              : 
     267            1 : func (s sharedByLevel) Len() int           { return len(s) }
     268            0 : func (s sharedByLevel) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
     269            1 : func (s sharedByLevel) Less(i, j int) bool { return s[i].Level < s[j].Level }
     270              : 
     271              : type pcIterPos int
     272              : 
     273              : const (
     274              :         pcIterPosCur pcIterPos = iota
     275              :         pcIterPosNext
     276              : )
     277              : 
     278              : // pointCollapsingIterator is an internalIterator that collapses point keys and
     279              : // returns at most one point internal key for each user key. Merges and
     280              : // SingleDels are not supported and result in a panic if encountered. Point keys
     281              : // deleted by rangedels are considered shadowed and not exposed.
     282              : //
     283              : // Only used in ScanInternal to return at most one internal key per user key.
     284              : type pointCollapsingIterator struct {
     285              :         iter     keyspan.InterleavingIter
     286              :         pos      pcIterPos
     287              :         comparer *base.Comparer
     288              :         merge    base.Merge
     289              :         err      error
     290              :         seqNum   base.SeqNum
     291              :         // The current position of `iter`. Always owned by the underlying iter.
     292              :         iterKV *base.InternalKV
     293              :         // The last saved key. findNextEntry and similar methods are expected to save
     294              :         // the current value of iterKey to savedKey if they're iterating away from the
     295              :         // current key but still need to retain it. See comments in findNextEntry on
     296              :         // how this field is used.
     297              :         //
     298              :         // At the end of a positioning call:
     299              :         //  - if pos == pcIterPosNext, iterKey is pointing to the next user key owned
     300              :         //    by `iter` while savedKey is holding a copy to our current key.
     301              :         //  - If pos == pcIterPosCur, iterKey is pointing to an `iter`-owned current
     302              :         //    key, and savedKey is either undefined or pointing to a version of the
     303              :         //    current key owned by this iterator (i.e. backed by savedKeyBuf).
     304              :         savedKey    InternalKey
     305              :         savedKeyBuf []byte
     306              :         // If fixedSeqNum is non-zero, all emitted points are verified to have this
     307              :         // fixed sequence number.
     308              :         fixedSeqNum base.SeqNum
     309              : }
     310              : 
     311            1 : func (p *pointCollapsingIterator) Span() *keyspan.Span {
     312            1 :         return p.iter.Span()
     313            1 : }
     314              : 
     315              : // SeekPrefixGE implements the InternalIterator interface.
     316              : func (p *pointCollapsingIterator) SeekPrefixGE(
     317              :         prefix, key []byte, flags base.SeekGEFlags,
     318            0 : ) *base.InternalKV {
     319            0 :         p.resetKey()
     320            0 :         p.iterKV = p.iter.SeekPrefixGE(prefix, key, flags)
     321            0 :         p.pos = pcIterPosCur
     322            0 :         if p.iterKV == nil {
     323            0 :                 return nil
     324            0 :         }
     325            0 :         return p.findNextEntry()
     326              : }
     327              : 
     328              : // SeekGE implements the InternalIterator interface.
     329            1 : func (p *pointCollapsingIterator) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
     330            1 :         p.resetKey()
     331            1 :         p.iterKV = p.iter.SeekGE(key, flags)
     332            1 :         p.pos = pcIterPosCur
     333            1 :         if p.iterKV == nil {
     334            1 :                 return nil
     335            1 :         }
     336            1 :         return p.findNextEntry()
     337              : }
     338              : 
     339              : // SeekLT implements the InternalIterator interface.
     340            0 : func (p *pointCollapsingIterator) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
     341            0 :         panic("unimplemented")
     342              : }
     343              : 
     344            1 : func (p *pointCollapsingIterator) resetKey() {
     345            1 :         p.savedKey.UserKey = p.savedKeyBuf[:0]
     346            1 :         p.savedKey.Trailer = 0
     347            1 :         p.iterKV = nil
     348            1 :         p.pos = pcIterPosCur
     349            1 : }
     350              : 
     351            1 : func (p *pointCollapsingIterator) verifySeqNum(kv *base.InternalKV) *base.InternalKV {
     352            1 :         if !invariants.Enabled {
     353            0 :                 return kv
     354            0 :         }
     355            1 :         if p.fixedSeqNum == 0 || kv == nil || kv.Kind() == InternalKeyKindRangeDelete {
     356            1 :                 return kv
     357            1 :         }
     358            0 :         if kv.SeqNum() != p.fixedSeqNum {
     359            0 :                 panic(fmt.Sprintf("expected foreign point key to have seqnum %d, got %d", p.fixedSeqNum, kv.SeqNum()))
     360              :         }
     361            0 :         return kv
     362              : }
     363              : 
     364              : // findNextEntry is called to return the next key. p.iter must be positioned at the
     365              : // start of the first user key we are interested in.
     366            1 : func (p *pointCollapsingIterator) findNextEntry() *base.InternalKV {
     367            1 :         p.saveKey()
     368            1 :         // Saves a comparison in the fast path
     369            1 :         firstIteration := true
     370            1 :         for p.iterKV != nil {
     371            1 :                 // NB: p.savedKey is either the current key (iff p.iterKV == firstKey),
     372            1 :                 // or the previous key.
     373            1 :                 if !firstIteration && !p.comparer.Equal(p.iterKV.K.UserKey, p.savedKey.UserKey) {
     374            1 :                         p.saveKey()
     375            1 :                         continue
     376              :                 }
     377            1 :                 firstIteration = false
     378            1 :                 if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKV.SeqNum()) {
     379            1 :                         // All future keys for this user key must be deleted.
     380            1 :                         if p.savedKey.Kind() == InternalKeyKindSingleDelete {
     381            0 :                                 panic("cannot process singledel key in point collapsing iterator")
     382              :                         }
     383              :                         // Fast forward to the next user key.
     384            1 :                         p.saveKey()
     385            1 :                         p.iterKV = p.iter.Next()
     386            1 :                         for p.iterKV != nil && p.savedKey.SeqNum() >= p.iterKV.SeqNum() && p.comparer.Equal(p.iterKV.K.UserKey, p.savedKey.UserKey) {
     387            1 :                                 p.iterKV = p.iter.Next()
     388            1 :                         }
     389            1 :                         continue
     390              :                 }
     391            1 :                 switch p.savedKey.Kind() {
     392            1 :                 case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
     393            1 :                         // Note that we return SETs directly, even if they would otherwise get
     394            1 :                         // compacted into a Del to turn into a SetWithDelete. This is a fast
     395            1 :                         // path optimization that can break SINGLEDEL determinism. To lead to
     396            1 :                         // consistent SINGLEDEL behaviour, this iterator should *not* be used for
     397            1 :                         // a keyspace where SINGLEDELs could be in use. If this iterator observes
     398            1 :                         // a SINGLEDEL as the first internal key for a user key, it will panic.
     399            1 :                         //
     400            1 :                         // As p.value is a lazy value owned by the child iterator, we can thread
     401            1 :                         // it through without loading it into p.valueBuf.
     402            1 :                         //
     403            1 :                         // TODO(bilal): We can even avoid saving the key in this fast path if
     404            1 :                         // we are in a block where setHasSamePrefix = false in a v3 sstable,
     405            1 :                         // guaranteeing that there's only one internal key for each user key.
     406            1 :                         // Thread this logic through the sstable iterators and/or consider
     407            1 :                         // collapsing (ha) this logic into the sstable iterators that are aware
     408            1 :                         // of blocks and can determine user key changes without doing key saves
     409            1 :                         // or comparisons.
     410            1 :                         p.pos = pcIterPosCur
     411            1 :                         return p.verifySeqNum(p.iterKV)
     412            0 :                 case InternalKeyKindSingleDelete:
     413            0 :                         // Panic, as this iterator is not expected to observe single deletes.
     414            0 :                         panic("cannot process singledel key in point collapsing iterator")
     415            0 :                 case InternalKeyKindMerge:
     416            0 :                         // Panic, as this iterator is not expected to observe merges.
     417            0 :                         panic("cannot process merge key in point collapsing iterator")
     418            1 :                 case InternalKeyKindRangeDelete:
     419            1 :                         // These are interleaved by the interleaving iterator ahead of all points.
     420            1 :                         // We should pass them as-is, but also account for any points ahead of
     421            1 :                         // them.
     422            1 :                         p.pos = pcIterPosCur
     423            1 :                         return p.verifySeqNum(p.iterKV)
     424            0 :                 default:
     425            0 :                         panic(fmt.Sprintf("unexpected kind: %d", p.iterKV.Kind()))
     426              :                 }
     427              :         }
     428            0 :         p.resetKey()
     429            0 :         return nil
     430              : }
     431              : 
     432              : // First implements the InternalIterator interface.
     433            1 : func (p *pointCollapsingIterator) First() *base.InternalKV {
     434            1 :         p.resetKey()
     435            1 :         p.iterKV = p.iter.First()
     436            1 :         p.pos = pcIterPosCur
     437            1 :         if p.iterKV == nil {
     438            0 :                 return nil
     439            0 :         }
     440            1 :         return p.findNextEntry()
     441              : }
     442              : 
     443              : // Last implements the InternalIterator interface.
     444            0 : func (p *pointCollapsingIterator) Last() *base.InternalKV {
     445            0 :         panic("unimplemented")
     446              : }
     447              : 
     448            1 : func (p *pointCollapsingIterator) saveKey() {
     449            1 :         if p.iterKV == nil {
     450            1 :                 p.savedKey = InternalKey{UserKey: p.savedKeyBuf[:0]}
     451            1 :                 return
     452            1 :         }
     453            1 :         p.savedKeyBuf = append(p.savedKeyBuf[:0], p.iterKV.K.UserKey...)
     454            1 :         p.savedKey = InternalKey{UserKey: p.savedKeyBuf, Trailer: p.iterKV.K.Trailer}
     455              : }
     456              : 
     457              : // Next implements the InternalIterator interface.
     458            1 : func (p *pointCollapsingIterator) Next() *base.InternalKV {
     459            1 :         switch p.pos {
     460            1 :         case pcIterPosCur:
     461            1 :                 p.saveKey()
     462            1 :                 if p.iterKV != nil && p.iterKV.Kind() == InternalKeyKindRangeDelete {
     463            1 :                         // Step over the interleaved range delete and process the very next
     464            1 :                         // internal key, even if it's at the same user key. This is because a
     465            1 :                         // point for that user key has not been returned yet.
     466            1 :                         p.iterKV = p.iter.Next()
     467            1 :                         break
     468              :                 }
     469              :                 // Fast forward to the next user key.
     470            1 :                 kv := p.iter.Next()
     471            1 :                 // p.iterKV.SeqNum() >= key.SeqNum() is an optimization that allows us to
     472            1 :                 // use p.iterKV.SeqNum() < key.SeqNum() as a sign that the user key has
     473            1 :                 // changed, without needing to do the full key comparison.
     474            1 :                 for kv != nil && p.savedKey.SeqNum() >= kv.SeqNum() &&
     475            1 :                         p.comparer.Equal(p.savedKey.UserKey, kv.K.UserKey) {
     476            1 :                         kv = p.iter.Next()
     477            1 :                 }
     478            1 :                 if kv == nil {
     479            1 :                         // There are no keys to return.
     480            1 :                         p.resetKey()
     481            1 :                         return nil
     482            1 :                 }
     483            1 :                 p.iterKV = kv
     484            0 :         case pcIterPosNext:
     485            0 :                 p.pos = pcIterPosCur
     486              :         }
     487            1 :         if p.iterKV == nil {
     488            1 :                 p.resetKey()
     489            1 :                 return nil
     490            1 :         }
     491            1 :         return p.findNextEntry()
     492              : }
     493              : 
     494              : // NextPrefix implements the InternalIterator interface.
     495            0 : func (p *pointCollapsingIterator) NextPrefix(succKey []byte) *base.InternalKV {
     496            0 :         panic("unimplemented")
     497              : }
     498              : 
     499              : // Prev implements the InternalIterator interface.
     500            0 : func (p *pointCollapsingIterator) Prev() *base.InternalKV {
     501            0 :         panic("unimplemented")
     502              : }
     503              : 
     504              : // Error implements the InternalIterator interface.
     505            1 : func (p *pointCollapsingIterator) Error() error {
     506            1 :         if p.err != nil {
     507            0 :                 return p.err
     508            0 :         }
     509            1 :         return p.iter.Error()
     510              : }
     511              : 
     512              : // Close implements the InternalIterator interface.
     513            1 : func (p *pointCollapsingIterator) Close() error {
     514            1 :         return p.iter.Close()
     515            1 : }
     516              : 
     517              : // SetBounds implements the InternalIterator interface.
     518            0 : func (p *pointCollapsingIterator) SetBounds(lower, upper []byte) {
     519            0 :         p.resetKey()
     520            0 :         p.iter.SetBounds(lower, upper)
     521            0 : }
     522              : 
     523            0 : func (p *pointCollapsingIterator) SetContext(ctx context.Context) {
     524            0 :         p.iter.SetContext(ctx)
     525            0 : }
     526              : 
     527              : // DebugTree is part of the InternalIterator interface.
     528            0 : func (p *pointCollapsingIterator) DebugTree(tp treeprinter.Node) {
     529            0 :         n := tp.Childf("%T(%p)", p, p)
     530            0 :         p.iter.DebugTree(n)
     531            0 : }
     532              : 
     533              : // String implements the InternalIterator interface.
     534            0 : func (p *pointCollapsingIterator) String() string {
     535            0 :         return p.iter.String()
     536            0 : }
     537              : 
     538              : var _ internalIterator = &pointCollapsingIterator{}
     539              : 
     540              : // IteratorLevelKind is used to denote whether the current ScanInternal iterator
     541              : // is unknown, belongs to a flushable, or belongs to an LSM level type.
     542              : type IteratorLevelKind int8
     543              : 
     544              : const (
     545              :         // IteratorLevelUnknown indicates an unknown LSM level.
     546              :         IteratorLevelUnknown IteratorLevelKind = iota
     547              :         // IteratorLevelLSM indicates an LSM level.
     548              :         IteratorLevelLSM
     549              :         // IteratorLevelFlushable indicates a flushable (i.e. memtable).
     550              :         IteratorLevelFlushable
     551              : )
     552              : 
     553              : // IteratorLevel is used with scanInternalIterator to surface additional iterator-specific info where possible.
     554              : // Note: this is struct is only provided for point keys.
     555              : type IteratorLevel struct {
     556              :         Kind IteratorLevelKind
     557              :         // FlushableIndex indicates the position within the flushable queue of this level.
     558              :         // Only valid if kind == IteratorLevelFlushable.
     559              :         FlushableIndex int
     560              :         // The level within the LSM. Only valid if Kind == IteratorLevelLSM.
     561              :         Level int
     562              :         // Sublevel is only valid if Kind == IteratorLevelLSM and Level == 0.
     563              :         Sublevel int
     564              : }
     565              : 
// scanInternalIterator is an iterator that returns all internal keys,
// including tombstones. For instance, an InternalKeyKindDelete is returned as
// an InternalKeyKindDelete instead of the iterator skipping over to the next
// key. Internal keys within a user key are collapsed; e.g. if there are two
// SETs, the one with the higher sequence number is returned. This is useful
// when an external user of Pebble needs to observe and rebuild Pebble's
// history of internal keys, such as in node-to-node replication. For use with
// {db,snapshot}.ScanInternal().
//
// scanInternalIterator is expected to ignore point keys deleted by range
// deletions, and range keys shadowed by a range key unset or delete; however,
// it *must* return the range delete as well as the range key unset/delete
// that did the shadowing.
type scanInternalIterator struct {
	ctx              context.Context
	db               *DB
	opts             ScanInternalOptions
	comparer         *base.Comparer
	merge            Merge
	iter             internalIterator
	readState        *readState
	version          *manifest.Version
	rangeKey         *iteratorRangeKeyState
	pointKeyIter     internalIterator
	iterKV           *base.InternalKV
	alloc            *scanInternalIterAlloc
	newIters         tableNewIters
	newIterRangeKey  keyspanimpl.TableNewSpanIter
	seqNum           base.SeqNum
	iterLevels       []IteratorLevel
	mergingIter      *mergingIter
	blobValueFetcher blob.ValueFetcher

	// boundsBuf holds two buffers used to store the lower and upper bounds.
	// Whenever the InternalIterator's bounds change, the new bounds are copied
	// into boundsBuf[boundsBufIdx]. The two bounds share a slice to reduce
	// allocations. opts.LowerBound and opts.UpperBound point into this slice.
	boundsBuf    [2][]byte
	boundsBufIdx int
}
     605              : 
     606              : // truncateExternalFile truncates an External file's [SmallestUserKey,
     607              : // LargestUserKey] fields to [lower, upper). A ExternalFile is
     608              : // produced that is suitable for external consumption by other Pebble
     609              : // instances.
     610              : //
     611              : // truncateSharedFile reads the file to try to create the smallest
     612              : // possible bounds.  Here, we blindly truncate them. This may mean we
     613              : // include this SST in iterations it isn't really needed in. Since we
     614              : // don't expect External files to be long-lived in the pebble
     615              : // instance, We think this is OK.
     616              : //
     617              : // TODO(ssd) 2024-01-26: Potentially de-duplicate with
     618              : // truncateSharedFile.
     619              : func (d *DB) truncateExternalFile(
     620              :         ctx context.Context,
     621              :         lower, upper []byte,
     622              :         level int,
     623              :         file *manifest.TableMetadata,
     624              :         objMeta objstorage.ObjectMetadata,
     625            1 : ) (*ExternalFile, error) {
     626            1 :         cmp := d.cmp
     627            1 :         sst := &ExternalFile{
     628            1 :                 Level:           uint8(level),
     629            1 :                 ObjName:         objMeta.Remote.CustomObjectName,
     630            1 :                 Locator:         objMeta.Remote.Locator,
     631            1 :                 HasPointKey:     file.HasPointKeys,
     632            1 :                 HasRangeKey:     file.HasRangeKeys,
     633            1 :                 Size:            file.Size,
     634            1 :                 SyntheticPrefix: slices.Clone(file.SyntheticPrefixAndSuffix.Prefix()),
     635            1 :                 SyntheticSuffix: slices.Clone(file.SyntheticPrefixAndSuffix.Suffix()),
     636            1 :         }
     637            1 : 
     638            1 :         needsLowerTruncate := cmp(lower, file.Smallest().UserKey) > 0
     639            1 :         if needsLowerTruncate {
     640            1 :                 sst.StartKey = slices.Clone(lower)
     641            1 :         } else {
     642            1 :                 sst.StartKey = slices.Clone(file.Smallest().UserKey)
     643            1 :         }
     644              : 
     645            1 :         cmpUpper := cmp(upper, file.Largest().UserKey)
     646            1 :         needsUpperTruncate := cmpUpper < 0
     647            1 :         if needsUpperTruncate {
     648            0 :                 sst.EndKey = slices.Clone(upper)
     649            0 :                 sst.EndKeyIsInclusive = false
     650            1 :         } else {
     651            1 :                 sst.EndKey = slices.Clone(file.Largest().UserKey)
     652            1 :                 sst.EndKeyIsInclusive = !file.Largest().IsExclusiveSentinel()
     653            1 :         }
     654              : 
     655            1 :         if cmp(sst.StartKey, sst.EndKey) > 0 {
     656            0 :                 return nil, base.AssertionFailedf("pebble: invalid external file bounds after truncation [%q, %q)", sst.StartKey, sst.EndKey)
     657            0 :         }
     658              : 
     659            1 :         if cmp(sst.StartKey, sst.EndKey) == 0 && !sst.EndKeyIsInclusive {
     660            0 :                 return nil, base.AssertionFailedf("pebble: invalid external file bounds after truncation [%q, %q)", sst.StartKey, sst.EndKey)
     661            0 :         }
     662              : 
     663            1 :         return sst, nil
     664              : }
     665              : 
     666              : // truncateSharedFile truncates a shared file's [Smallest, Largest] fields to
     667              : // [lower, upper), potentially opening iterators on the file to find keys within
     668              : // the requested bounds. A SharedSSTMeta is produced that is suitable for
     669              : // external consumption by other Pebble instances. If shouldSkip is true, this
     670              : // file does not contain any keys in [lower, upper) and can be skipped.
     671              : //
     672              : // TODO(bilal): If opening iterators and doing reads in this method is too
     673              : // inefficient, consider producing non-tight file bounds instead.
     674              : func (d *DB) truncateSharedFile(
     675              :         ctx context.Context,
     676              :         lower, upper []byte,
     677              :         level int,
     678              :         file *manifest.TableMetadata,
     679              :         objMeta objstorage.ObjectMetadata,
     680            1 : ) (sst *SharedSSTMeta, shouldSkip bool, err error) {
     681            1 :         cmp := d.cmp
     682            1 :         sst = &SharedSSTMeta{}
     683            1 :         sst.cloneFromFileMeta(file)
     684            1 :         sst.Level = uint8(level)
     685            1 :         sst.Backing, err = d.objProvider.RemoteObjectBacking(&objMeta)
     686            1 :         if err != nil {
     687            0 :                 return nil, false, err
     688            0 :         }
     689            1 :         needsLowerTruncate := cmp(lower, file.Smallest().UserKey) > 0
     690            1 :         needsUpperTruncate := cmp(upper, file.Largest().UserKey) < 0 || (cmp(upper, file.Largest().UserKey) == 0 && !file.Largest().IsExclusiveSentinel())
     691            1 :         // Fast path: file is entirely within [lower, upper).
     692            1 :         if !needsLowerTruncate && !needsUpperTruncate {
     693            1 :                 return sst, false, nil
     694            1 :         }
     695              : 
     696              :         // We will need to truncate file bounds in at least one direction. Open all
     697              :         // relevant iterators.
     698            1 :         iters, err := d.newIters(ctx, file, &IterOptions{
     699            1 :                 LowerBound: lower,
     700            1 :                 UpperBound: upper,
     701            1 :                 layer:      manifest.Level(level),
     702            1 :         }, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
     703            1 :         if err != nil {
     704            0 :                 return nil, false, err
     705            0 :         }
     706            1 :         defer func() { _ = iters.CloseAll() }()
     707            1 :         iter := iters.point
     708            1 :         rangeDelIter := iters.rangeDeletion
     709            1 :         rangeKeyIter := iters.rangeKey
     710            1 :         if rangeDelIter != nil {
     711            1 :                 rangeDelIter = keyspan.Truncate(cmp, rangeDelIter, base.UserKeyBoundsEndExclusive(lower, upper))
     712            1 :         }
     713            1 :         if rangeKeyIter != nil {
     714            1 :                 rangeKeyIter = keyspan.Truncate(cmp, rangeKeyIter, base.UserKeyBoundsEndExclusive(lower, upper))
     715            1 :         }
     716              :         // Check if we need to truncate on the left side. This means finding a new
     717              :         // LargestPointKey and LargestRangeKey that is >= lower.
     718            1 :         if needsLowerTruncate {
     719            1 :                 sst.SmallestPointKey.UserKey = sst.SmallestPointKey.UserKey[:0]
     720            1 :                 sst.SmallestPointKey.Trailer = 0
     721            1 :                 kv := iter.SeekGE(lower, base.SeekGEFlagsNone)
     722            1 :                 foundPointKey := kv != nil
     723            1 :                 if kv != nil {
     724            1 :                         sst.SmallestPointKey.CopyFrom(kv.K)
     725            1 :                 }
     726            1 :                 if rangeDelIter != nil {
     727            1 :                         if span, err := rangeDelIter.SeekGE(lower); err != nil {
     728            0 :                                 return nil, false, err
     729            1 :                         } else if span != nil && (len(sst.SmallestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.SmallestKey(), sst.SmallestPointKey) < 0) {
     730            1 :                                 sst.SmallestPointKey.CopyFrom(span.SmallestKey())
     731            1 :                                 foundPointKey = true
     732            1 :                         }
     733              :                 }
     734            1 :                 if !foundPointKey {
     735            1 :                         // There are no point keys in the span we're interested in.
     736            1 :                         sst.SmallestPointKey = InternalKey{}
     737            1 :                         sst.LargestPointKey = InternalKey{}
     738            1 :                 }
     739            1 :                 sst.SmallestRangeKey.UserKey = sst.SmallestRangeKey.UserKey[:0]
     740            1 :                 sst.SmallestRangeKey.Trailer = 0
     741            1 :                 if rangeKeyIter != nil {
     742            1 :                         span, err := rangeKeyIter.SeekGE(lower)
     743            1 :                         switch {
     744            0 :                         case err != nil:
     745            0 :                                 return nil, false, err
     746            1 :                         case span != nil:
     747            1 :                                 sst.SmallestRangeKey.CopyFrom(span.SmallestKey())
     748            1 :                         default:
     749            1 :                                 // There are no range keys in the span we're interested in.
     750            1 :                                 sst.SmallestRangeKey = InternalKey{}
     751            1 :                                 sst.LargestRangeKey = InternalKey{}
     752              :                         }
     753              :                 }
     754              :         }
     755              :         // Check if we need to truncate on the right side. This means finding a new
     756              :         // LargestPointKey and LargestRangeKey that is < upper.
     757            1 :         if needsUpperTruncate {
     758            1 :                 sst.LargestPointKey.UserKey = sst.LargestPointKey.UserKey[:0]
     759            1 :                 sst.LargestPointKey.Trailer = 0
     760            1 :                 kv := iter.SeekLT(upper, base.SeekLTFlagsNone)
     761            1 :                 foundPointKey := kv != nil
     762            1 :                 if kv != nil {
     763            1 :                         sst.LargestPointKey.CopyFrom(kv.K)
     764            1 :                 }
     765            1 :                 if rangeDelIter != nil {
     766            1 :                         if span, err := rangeDelIter.SeekLT(upper); err != nil {
     767            0 :                                 return nil, false, err
     768            1 :                         } else if span != nil && (len(sst.LargestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.LargestKey(), sst.LargestPointKey) > 0) {
     769            1 :                                 sst.LargestPointKey.CopyFrom(span.LargestKey())
     770            1 :                                 foundPointKey = true
     771            1 :                         }
     772              :                 }
     773            1 :                 if !foundPointKey {
     774            1 :                         // There are no point keys in the span we're interested in.
     775            1 :                         sst.SmallestPointKey = InternalKey{}
     776            1 :                         sst.LargestPointKey = InternalKey{}
     777            1 :                 }
     778            1 :                 sst.LargestRangeKey.UserKey = sst.LargestRangeKey.UserKey[:0]
     779            1 :                 sst.LargestRangeKey.Trailer = 0
     780            1 :                 if rangeKeyIter != nil {
     781            1 :                         span, err := rangeKeyIter.SeekLT(upper)
     782            1 :                         switch {
     783            0 :                         case err != nil:
     784            0 :                                 return nil, false, err
     785            1 :                         case span != nil:
     786            1 :                                 sst.LargestRangeKey.CopyFrom(span.LargestKey())
     787            1 :                         default:
     788            1 :                                 // There are no range keys in the span we're interested in.
     789            1 :                                 sst.SmallestRangeKey = InternalKey{}
     790            1 :                                 sst.LargestRangeKey = InternalKey{}
     791              :                         }
     792              :                 }
     793              :         }
     794              :         // Set overall bounds based on {Smallest,Largest}{Point,Range}Key.
     795            1 :         switch {
     796            1 :         case len(sst.SmallestRangeKey.UserKey) == 0:
     797            1 :                 sst.Smallest = sst.SmallestPointKey
     798            1 :         case len(sst.SmallestPointKey.UserKey) == 0:
     799            1 :                 sst.Smallest = sst.SmallestRangeKey
     800            1 :         default:
     801            1 :                 sst.Smallest = sst.SmallestPointKey
     802            1 :                 if base.InternalCompare(cmp, sst.SmallestRangeKey, sst.SmallestPointKey) < 0 {
     803            1 :                         sst.Smallest = sst.SmallestRangeKey
     804            1 :                 }
     805              :         }
     806            1 :         switch {
     807            1 :         case len(sst.LargestRangeKey.UserKey) == 0:
     808            1 :                 sst.Largest = sst.LargestPointKey
     809            1 :         case len(sst.LargestPointKey.UserKey) == 0:
     810            1 :                 sst.Largest = sst.LargestRangeKey
     811            1 :         default:
     812            1 :                 sst.Largest = sst.LargestPointKey
     813            1 :                 if base.InternalCompare(cmp, sst.LargestRangeKey, sst.LargestPointKey) > 0 {
     814            1 :                         sst.Largest = sst.LargestRangeKey
     815            1 :                 }
     816              :         }
     817              :         // On rare occasion, a file might overlap with [lower, upper) but not actually
     818              :         // have any keys within those bounds. Skip such files.
     819            1 :         if len(sst.Smallest.UserKey) == 0 {
     820            1 :                 return nil, true, nil
     821            1 :         }
     822            1 :         sst.Size, err = d.fileCache.estimateSize(file, sst.Smallest.UserKey, sst.Largest.UserKey)
     823            1 :         if err != nil {
     824            0 :                 return nil, false, err
     825            0 :         }
     826              :         // On occasion, estimateSize gives us a low estimate, i.e. a 0 file size. This
     827              :         // can cause panics in places where we divide by file sizes. Correct for it
     828              :         // here.
     829            1 :         if sst.Size == 0 {
     830            1 :                 sst.Size = 1
     831            1 :         }
     832            1 :         return sst, false, nil
     833              : }
     834              : 
     835              : func scanInternalImpl(
     836              :         ctx context.Context, iter *scanInternalIterator, opts *ScanInternalOptions,
     837            1 : ) error {
     838            1 :         if opts.VisitSharedFile != nil && (opts.LowerBound == nil || opts.UpperBound == nil) {
     839            0 :                 panic("lower and upper bounds must be specified in skip-shared iteration mode")
     840              :         }
     841            1 :         if opts.VisitSharedFile != nil && opts.VisitExternalFile != nil {
     842            0 :                 return base.AssertionFailedf("cannot provide both a shared-file and external-file visitor")
     843            0 :         }
     844              : 
     845              :         // Before starting iteration, check if any files in levels sharedLevelsStart
     846              :         // and below are *not* shared. Error out if that is the case, as skip-shared
     847              :         // iteration will not produce a consistent point-in-time view of this range
     848              :         // of keys. For files that are shared, call visitSharedFile with a truncated
     849              :         // version of that file.
     850            1 :         cmp := iter.comparer.Compare
     851            1 :         provider := iter.db.ObjProvider()
     852            1 :         seqNum := iter.seqNum
     853            1 :         current := iter.version
     854            1 :         if current == nil {
     855            1 :                 current = iter.readState.current
     856            1 :         }
     857              : 
     858            1 :         if opts.VisitSharedFile != nil || opts.VisitExternalFile != nil {
     859            1 :                 if provider == nil {
     860            0 :                         panic("expected non-nil Provider in skip-shared iteration mode")
     861              :                 }
     862              : 
     863            1 :                 firstLevelWithRemote := opts.skipLevelForOpts()
     864            1 :                 for level := firstLevelWithRemote; level < numLevels; level++ {
     865            1 :                         files := current.Levels[level].Iter()
     866            1 :                         for f := files.SeekGE(cmp, opts.LowerBound); f != nil && cmp(f.Smallest().UserKey, opts.UpperBound) < 0; f = files.Next() {
     867            1 :                                 if cmp(opts.LowerBound, f.Largest().UserKey) == 0 && f.Largest().IsExclusiveSentinel() {
     868            0 :                                         continue
     869              :                                 }
     870              : 
     871            1 :                                 var objMeta objstorage.ObjectMetadata
     872            1 :                                 var err error
     873            1 :                                 objMeta, err = provider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
     874            1 :                                 if err != nil {
     875            0 :                                         return err
     876            0 :                                 }
     877              : 
     878              :                                 // We allow a mix of files at the first level.
     879            1 :                                 if level != firstLevelWithRemote {
     880            1 :                                         if !objMeta.IsShared() && !objMeta.IsExternal() {
     881            0 :                                                 return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared or external", objMeta.DiskFileNum)
     882            0 :                                         }
     883              :                                 }
     884              : 
     885            1 :                                 if objMeta.IsShared() && opts.VisitSharedFile == nil {
     886            0 :                                         return errors.Wrapf(ErrInvalidSkipSharedIteration, "shared file is present but no shared file visitor is defined")
     887            0 :                                 }
     888              : 
     889            1 :                                 if objMeta.IsExternal() && opts.VisitExternalFile == nil {
     890            1 :                                         return errors.Wrapf(ErrInvalidSkipSharedIteration, "external file is present but no external file visitor is defined")
     891            1 :                                 }
     892              : 
     893            1 :                                 if !base.Visible(f.LargestSeqNum, seqNum, base.SeqNumMax) {
     894            1 :                                         return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s contains keys newer than snapshot", objMeta.DiskFileNum)
     895            1 :                                 }
     896              : 
     897            1 :                                 if level != firstLevelWithRemote && (!objMeta.IsShared() && !objMeta.IsExternal()) {
     898            0 :                                         return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared or external", objMeta.DiskFileNum)
     899            0 :                                 }
     900              : 
     901            1 :                                 if objMeta.IsShared() {
     902            1 :                                         var sst *SharedSSTMeta
     903            1 :                                         var skip bool
     904            1 :                                         sst, skip, err = iter.db.truncateSharedFile(ctx, opts.LowerBound, opts.UpperBound, level, f, objMeta)
     905            1 :                                         if err != nil {
     906            0 :                                                 return err
     907            0 :                                         }
     908            1 :                                         if skip {
     909            1 :                                                 continue
     910              :                                         }
     911            1 :                                         if err = opts.VisitSharedFile(sst); err != nil {
     912            0 :                                                 return err
     913            0 :                                         }
     914            1 :                                 } else if objMeta.IsExternal() {
     915            1 :                                         sst, err := iter.db.truncateExternalFile(ctx, opts.LowerBound, opts.UpperBound, level, f, objMeta)
     916            1 :                                         if err != nil {
     917            0 :                                                 return err
     918            0 :                                         }
     919            1 :                                         if err := opts.VisitExternalFile(sst); err != nil {
     920            0 :                                                 return err
     921            0 :                                         }
     922              :                                 }
     923              : 
     924              :                         }
     925              :                 }
     926              :         }
     927              : 
     928            1 :         for valid := iter.seekGE(opts.LowerBound); valid && iter.error() == nil; valid = iter.next() {
     929            1 :                 key := iter.unsafeKey()
     930            1 : 
     931            1 :                 if opts.RateLimitFunc != nil {
     932            0 :                         if err := opts.RateLimitFunc(key, iter.lazyValue()); err != nil {
     933            0 :                                 return err
     934            0 :                         }
     935              :                 }
     936              : 
     937            1 :                 switch key.Kind() {
     938            1 :                 case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet:
     939            1 :                         if opts.VisitRangeKey != nil {
     940            1 :                                 span := iter.unsafeSpan()
     941            1 :                                 // NB: The caller isn't interested in the sequence numbers of these
     942            1 :                                 // range keys. Rather, the caller wants them to be in trailer order
     943            1 :                                 // _after_ zeroing of sequence numbers. Copy span.Keys, sort it, and then
     944            1 :                                 // call visitRangeKey.
     945            1 :                                 keysCopy := make([]keyspan.Key, len(span.Keys))
     946            1 :                                 for i := range span.Keys {
     947            1 :                                         keysCopy[i].CopyFrom(span.Keys[i])
     948            1 :                                         keysCopy[i].Trailer = base.MakeTrailer(0, span.Keys[i].Kind())
     949            1 :                                 }
     950            1 :                                 keyspan.SortKeysByTrailer(keysCopy)
     951            1 :                                 if err := opts.VisitRangeKey(span.Start, span.End, keysCopy); err != nil {
     952            0 :                                         return err
     953            0 :                                 }
     954              :                         }
     955            1 :                 case InternalKeyKindRangeDelete:
     956            1 :                         if opts.VisitRangeDel != nil {
     957            1 :                                 rangeDel := iter.unsafeRangeDel()
     958            1 :                                 if err := opts.VisitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil {
     959            0 :                                         return err
     960            0 :                                 }
     961              :                         }
     962            1 :                 default:
     963            1 :                         if opts.VisitPointKey != nil {
     964            1 :                                 var info IteratorLevel
     965            1 :                                 if len(iter.mergingIter.heap.items) > 0 {
     966            1 :                                         mergingIterIdx := iter.mergingIter.heap.items[0].index
     967            1 :                                         info = iter.iterLevels[mergingIterIdx]
     968            1 :                                 } else {
     969            0 :                                         info = IteratorLevel{Kind: IteratorLevelUnknown}
     970            0 :                                 }
     971            1 :                                 val := iter.lazyValue()
     972            1 :                                 if err := opts.VisitPointKey(key, val, info); err != nil {
     973            0 :                                         return err
     974            0 :                                 }
     975              :                         }
     976              :                 }
     977              :         }
     978              : 
     979            1 :         return nil
     980              : }
     981              : 
     982            1 : func (opts *ScanInternalOptions) skipLevelForOpts() int {
     983            1 :         if opts.VisitSharedFile != nil {
     984            1 :                 return sharedLevelsStart
     985            1 :         }
     986            1 :         if opts.VisitExternalFile != nil {
     987            1 :                 return externalSkipStart
     988            1 :         }
     989            1 :         return numLevels
     990              : }
     991              : 
     992              : // constructPointIter constructs a merging iterator and sets i.iter to it.
     993              : func (i *scanInternalIterator) constructPointIter(
     994              :         category block.Category, memtables flushableList, buf *iterAlloc,
     995            1 : ) error {
     996            1 :         // Merging levels and levels from iterAlloc.
     997            1 :         mlevels := buf.mlevels[:0]
     998            1 :         levels := buf.levels[:0]
     999            1 : 
    1000            1 :         // We compute the number of levels needed ahead of time and reallocate a slice if
    1001            1 :         // the array from the iterAlloc isn't large enough. Doing this allocation once
    1002            1 :         // should improve the performance.
    1003            1 :         numMergingLevels := len(memtables)
    1004            1 :         numLevelIters := 0
    1005            1 : 
    1006            1 :         current := i.version
    1007            1 :         if current == nil {
    1008            1 :                 current = i.readState.current
    1009            1 :         }
    1010            1 :         numMergingLevels += len(current.L0SublevelFiles)
    1011            1 :         numLevelIters += len(current.L0SublevelFiles)
    1012            1 : 
    1013            1 :         skipStart := i.opts.skipLevelForOpts()
    1014            1 :         for level := 1; level < len(current.Levels); level++ {
    1015            1 :                 if current.Levels[level].Empty() {
    1016            1 :                         continue
    1017              :                 }
    1018            1 :                 if level > skipStart {
    1019            1 :                         continue
    1020              :                 }
    1021            1 :                 numMergingLevels++
    1022            1 :                 numLevelIters++
    1023              :         }
    1024              : 
    1025            1 :         if numMergingLevels > cap(mlevels) {
    1026            0 :                 mlevels = make([]mergingIterLevel, 0, numMergingLevels)
    1027            0 :         }
    1028            1 :         if numLevelIters > cap(levels) {
    1029            0 :                 levels = make([]levelIter, 0, numLevelIters)
    1030            0 :         }
    1031              :         // TODO(bilal): Push these into the iterAlloc buf.
    1032            1 :         var rangeDelMiter keyspanimpl.MergingIter
    1033            1 :         rangeDelIters := make([]keyspan.FragmentIterator, 0, numMergingLevels)
    1034            1 :         rangeDelLevels := make([]keyspanimpl.LevelIter, 0, numLevelIters)
    1035            1 : 
    1036            1 :         i.iterLevels = make([]IteratorLevel, numMergingLevels)
    1037            1 :         mlevelsIndex := 0
    1038            1 : 
    1039            1 :         // Next are the memtables.
    1040            1 :         for j := len(memtables) - 1; j >= 0; j-- {
    1041            1 :                 mem := memtables[j]
    1042            1 :                 mlevels = append(mlevels, mergingIterLevel{
    1043            1 :                         iter: mem.newIter(&i.opts.IterOptions),
    1044            1 :                 })
    1045            1 :                 i.iterLevels[mlevelsIndex] = IteratorLevel{
    1046            1 :                         Kind:           IteratorLevelFlushable,
    1047            1 :                         FlushableIndex: j,
    1048            1 :                 }
    1049            1 :                 mlevelsIndex++
    1050            1 :                 if rdi := mem.newRangeDelIter(&i.opts.IterOptions); rdi != nil {
    1051            1 :                         rangeDelIters = append(rangeDelIters, rdi)
    1052            1 :                 }
    1053              :         }
    1054              : 
    1055              :         // Next are the file levels: L0 sub-levels followed by lower levels.
    1056            1 :         levelsIndex := len(levels)
    1057            1 :         mlevels = mlevels[:numMergingLevels]
    1058            1 :         levels = levels[:numLevelIters]
    1059            1 :         rangeDelLevels = rangeDelLevels[:numLevelIters]
    1060            1 :         i.opts.IterOptions.snapshotForHideObsoletePoints = i.seqNum
    1061            1 :         i.opts.IterOptions.Category = category
    1062            1 : 
    1063            1 :         internalOpts := internalIterOpts{
    1064            1 :                 blobValueFetcher: &i.blobValueFetcher,
    1065            1 :         }
    1066            1 : 
    1067            1 :         addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Layer) {
    1068            1 :                 li := &levels[levelsIndex]
    1069            1 :                 rli := &rangeDelLevels[levelsIndex]
    1070            1 : 
    1071            1 :                 li.init(i.ctx, i.opts.IterOptions, i.comparer, i.newIters, files, level, internalOpts)
    1072            1 :                 mlevels[mlevelsIndex].iter = li
    1073            1 :                 rli.Init(i.ctx, keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters},
    1074            1 :                         i.comparer.Compare, tableNewRangeDelIter(i.newIters), files, level,
    1075            1 :                         manifest.KeyTypePoint)
    1076            1 :                 rangeDelIters = append(rangeDelIters, rli)
    1077            1 : 
    1078            1 :                 levelsIndex++
    1079            1 :                 mlevelsIndex++
    1080            1 :         }
    1081              : 
    1082            1 :         for j := len(current.L0SublevelFiles) - 1; j >= 0; j-- {
    1083            1 :                 i.iterLevels[mlevelsIndex] = IteratorLevel{
    1084            1 :                         Kind:     IteratorLevelLSM,
    1085            1 :                         Level:    0,
    1086            1 :                         Sublevel: j,
    1087            1 :                 }
    1088            1 :                 addLevelIterForFiles(current.L0SublevelFiles[j].Iter(), manifest.L0Sublevel(j))
    1089            1 :         }
    1090              :         // Add level iterators for the non-empty non-L0 levels.
    1091            1 :         for level := 1; level < numLevels; level++ {
    1092            1 :                 if current.Levels[level].Empty() {
    1093            1 :                         continue
    1094              :                 }
    1095              : 
    1096            1 :                 if level > skipStart {
    1097            1 :                         continue
    1098              :                 }
    1099            1 :                 i.iterLevels[mlevelsIndex] = IteratorLevel{Kind: IteratorLevelLSM, Level: level}
    1100            1 :                 levIter := current.Levels[level].Iter()
    1101            1 :                 if level == skipStart {
    1102            1 :                         nonRemoteFiles := make([]*manifest.TableMetadata, 0)
    1103            1 :                         for f := levIter.First(); f != nil; f = levIter.Next() {
    1104            1 :                                 meta, err := i.db.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
    1105            1 :                                 if err != nil {
    1106            0 :                                         return err
    1107            0 :                                 }
    1108            1 :                                 if (meta.IsShared() && i.opts.VisitSharedFile != nil) ||
    1109            1 :                                         (meta.IsExternal() && i.opts.VisitExternalFile != nil) {
    1110            1 :                                         // Skip this file.
    1111            1 :                                         continue
    1112              :                                 }
    1113            1 :                                 nonRemoteFiles = append(nonRemoteFiles, f)
    1114              :                         }
    1115            1 :                         levSlice := manifest.NewLevelSliceKeySorted(i.db.cmp, nonRemoteFiles)
    1116            1 :                         levIter = levSlice.Iter()
    1117              :                 }
    1118              : 
    1119            1 :                 addLevelIterForFiles(levIter, manifest.Level(level))
    1120              :         }
    1121              : 
    1122            1 :         buf.merging.init(&i.opts.IterOptions, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...)
    1123            1 :         buf.merging.snapshot = i.seqNum
    1124            1 :         rangeDelMiter.Init(i.comparer, keyspan.VisibleTransform(i.seqNum), new(keyspanimpl.MergingBuffers), rangeDelIters...)
    1125            1 : 
    1126            1 :         if i.opts.IncludeObsoleteKeys {
    1127            1 :                 iiter := &keyspan.InterleavingIter{}
    1128            1 :                 iiter.Init(i.comparer, &buf.merging, &rangeDelMiter,
    1129            1 :                         keyspan.InterleavingIterOpts{
    1130            1 :                                 LowerBound: i.opts.LowerBound,
    1131            1 :                                 UpperBound: i.opts.UpperBound,
    1132            1 :                         })
    1133            1 :                 i.pointKeyIter = iiter
    1134            1 :         } else {
    1135            1 :                 pcIter := &pointCollapsingIterator{
    1136            1 :                         comparer: i.comparer,
    1137            1 :                         merge:    i.merge,
    1138            1 :                         seqNum:   i.seqNum,
    1139            1 :                 }
    1140            1 :                 pcIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, keyspan.InterleavingIterOpts{
    1141            1 :                         LowerBound: i.opts.LowerBound,
    1142            1 :                         UpperBound: i.opts.UpperBound,
    1143            1 :                 })
    1144            1 :                 i.pointKeyIter = pcIter
    1145            1 :         }
    1146            1 :         i.iter = i.pointKeyIter
    1147            1 :         return nil
    1148              : }
    1149              : 
    1150              : // constructRangeKeyIter constructs the range-key iterator stack, populating
    1151              : // i.rangeKey.rangeKeyIter with the resulting iterator. This is similar to
    1152              : // Iterator.constructRangeKeyIter, except it doesn't handle batches and ensures
    1153              : // iterConfig does *not* elide unsets/deletes.
    1154            1 : func (i *scanInternalIterator) constructRangeKeyIter() error {
    1155            1 :         // We want the bounded iter from iterConfig, but not the collapsing of
    1156            1 :         // RangeKeyUnsets and RangeKeyDels.
    1157            1 :         i.rangeKey.rangeKeyIter = i.rangeKey.iterConfig.Init(
    1158            1 :                 i.comparer, i.seqNum, i.opts.LowerBound, i.opts.UpperBound,
    1159            1 :                 nil /* hasPrefix */, nil /* prefix */, true, /* internalKeys */
    1160            1 :                 &i.rangeKey.rangeKeyBuffers.internal)
    1161            1 : 
    1162            1 :         // Next are the flushables: memtables and large batches.
    1163            1 :         if i.readState != nil {
    1164            1 :                 for j := len(i.readState.memtables) - 1; j >= 0; j-- {
    1165            1 :                         mem := i.readState.memtables[j]
    1166            1 :                         // We only need to read from memtables which contain sequence numbers older
    1167            1 :                         // than seqNum.
    1168            1 :                         if logSeqNum := mem.logSeqNum; logSeqNum >= i.seqNum {
    1169            1 :                                 continue
    1170              :                         }
    1171            1 :                         if rki := mem.newRangeKeyIter(&i.opts.IterOptions); rki != nil {
    1172            1 :                                 i.rangeKey.iterConfig.AddLevel(rki)
    1173            1 :                         }
    1174              :                 }
    1175              :         }
    1176              : 
    1177            1 :         current := i.version
    1178            1 :         if current == nil {
    1179            1 :                 current = i.readState.current
    1180            1 :         }
    1181              :         // Next are the file levels: L0 sub-levels followed by lower levels.
    1182              :         //
    1183              :         // Add file-specific iterators for L0 files containing range keys. This is less
    1184              :         // efficient than using levelIters for sublevels of L0 files containing
    1185              :         // range keys, but range keys are expected to be sparse anyway, reducing the
    1186              :         // cost benefit of maintaining a separate L0Sublevels instance for range key
    1187              :         // files and then using it here.
    1188              :         //
    1189              :         // NB: We iterate L0's files in reverse order. They're sorted by
    1190              :         // LargestSeqNum ascending, and we need to add them to the merging iterator
    1191              :         // in LargestSeqNum descending to preserve the merging iterator's invariants
    1192              :         // around Key InternalKeyTrailer order.
    1193            1 :         iter := current.RangeKeyLevels[0].Iter()
    1194            1 :         for f := iter.Last(); f != nil; f = iter.Prev() {
    1195            1 :                 spanIter, err := i.newIterRangeKey(i.ctx, f, i.opts.SpanIterOptions())
    1196            1 :                 if err != nil {
    1197            0 :                         return err
    1198            0 :                 }
    1199            1 :                 i.rangeKey.iterConfig.AddLevel(spanIter)
    1200              :         }
    1201              :         // Add level iterators for the non-empty non-L0 levels.
    1202            1 :         skipStart := i.opts.skipLevelForOpts()
    1203            1 :         for level := 1; level < len(current.RangeKeyLevels); level++ {
    1204            1 :                 if current.RangeKeyLevels[level].Empty() {
    1205            1 :                         continue
    1206              :                 }
    1207            1 :                 if level > skipStart {
    1208            1 :                         continue
    1209              :                 }
    1210            1 :                 li := i.rangeKey.iterConfig.NewLevelIter()
    1211            1 :                 spanIterOpts := i.opts.SpanIterOptions()
    1212            1 :                 levIter := current.RangeKeyLevels[level].Iter()
    1213            1 :                 if level == skipStart {
    1214            1 :                         nonRemoteFiles := make([]*manifest.TableMetadata, 0)
    1215            1 :                         for f := levIter.First(); f != nil; f = levIter.Next() {
    1216            1 :                                 meta, err := i.db.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
    1217            1 :                                 if err != nil {
    1218            0 :                                         return err
    1219            0 :                                 }
    1220            1 :                                 if (meta.IsShared() && i.opts.VisitSharedFile != nil) ||
    1221            1 :                                         (meta.IsExternal() && i.opts.VisitExternalFile != nil) {
    1222            1 :                                         // Skip this file.
    1223            1 :                                         continue
    1224              :                                 }
    1225            0 :                                 nonRemoteFiles = append(nonRemoteFiles, f)
    1226              :                         }
    1227            1 :                         levSlice := manifest.NewLevelSliceKeySorted(i.db.cmp, nonRemoteFiles)
    1228            1 :                         levIter = levSlice.Iter()
    1229              :                 }
    1230            1 :                 li.Init(i.ctx, spanIterOpts, i.comparer.Compare, i.newIterRangeKey, levIter,
    1231            1 :                         manifest.Level(level), manifest.KeyTypeRange)
    1232            1 :                 i.rangeKey.iterConfig.AddLevel(li)
    1233              :         }
    1234            1 :         return nil
    1235              : }
    1236              : 
    1237              : // seekGE seeks this iterator to the first key that's greater than or equal
    1238              : // to the specified user key.
    1239            1 : func (i *scanInternalIterator) seekGE(key []byte) bool {
    1240            1 :         i.iterKV = i.iter.SeekGE(key, base.SeekGEFlagsNone)
    1241            1 :         return i.iterKV != nil
    1242            1 : }
    1243              : 
    1244              : // unsafeKey returns the unsafe InternalKey at the current position. The value
    1245              : // is nil if the iterator is invalid or exhausted.
    1246            1 : func (i *scanInternalIterator) unsafeKey() *InternalKey {
    1247            1 :         return &i.iterKV.K
    1248            1 : }
    1249              : 
    1250              : // lazyValue returns a value pointer to the value at the current iterator
    1251              : // position. Behaviour undefined if unsafeKey() returns a Range key or Rangedel
    1252              : // kind key.
    1253            1 : func (i *scanInternalIterator) lazyValue() LazyValue {
    1254            1 :         return i.iterKV.LazyValue()
    1255            1 : }
    1256              : 
    1257              : // unsafeRangeDel returns a range key span. Behaviour undefined if UnsafeKey returns
    1258              : // a non-rangedel kind.
    1259            1 : func (i *scanInternalIterator) unsafeRangeDel() *keyspan.Span {
    1260            1 :         type spanInternalIterator interface {
    1261            1 :                 Span() *keyspan.Span
    1262            1 :         }
    1263            1 :         return i.pointKeyIter.(spanInternalIterator).Span()
    1264            1 : }
    1265              : 
    1266              : // unsafeSpan returns a range key span. Behaviour undefined if UnsafeKey returns
    1267              : // a non-rangekey type.
    1268            1 : func (i *scanInternalIterator) unsafeSpan() *keyspan.Span {
    1269            1 :         return i.rangeKey.iiter.Span()
    1270            1 : }
    1271              : 
    1272              : // next advances the iterator in the forward direction, and returns the
    1273              : // iterator's new validity state.
    1274            1 : func (i *scanInternalIterator) next() bool {
    1275            1 :         i.iterKV = i.iter.Next()
    1276            1 :         return i.iterKV != nil
    1277            1 : }
    1278              : 
    1279              : // error returns an error from the internal iterator, if there's any.
    1280            1 : func (i *scanInternalIterator) error() error {
    1281            1 :         return i.iter.Error()
    1282            1 : }
    1283              : 
    1284              : // Close closes this iterator, and releases any pooled objects.
    1285            1 : func (i *scanInternalIterator) Close() error {
    1286            1 :         err := i.iter.Close()
    1287            1 :         err = errors.CombineErrors(err, i.blobValueFetcher.Close())
    1288            1 :         if i.readState != nil {
    1289            1 :                 i.readState.unref()
    1290            1 :         }
    1291            1 :         if i.version != nil {
    1292            1 :                 i.version.Unref()
    1293            1 :         }
    1294            1 :         if i.rangeKey != nil {
    1295            1 :                 i.rangeKey.PrepareForReuse()
    1296            1 :                 *i.rangeKey = iteratorRangeKeyState{
    1297            1 :                         rangeKeyBuffers: i.rangeKey.rangeKeyBuffers,
    1298            1 :                 }
    1299            1 :                 iterRangeKeyStateAllocPool.Put(i.rangeKey)
    1300            1 :                 i.rangeKey = nil
    1301            1 :         }
    1302            1 :         if alloc := i.alloc; alloc != nil {
    1303            1 :                 for j := range i.boundsBuf {
    1304            1 :                         if cap(i.boundsBuf[j]) >= maxKeyBufCacheSize {
    1305            0 :                                 alloc.iterAlloc.boundsBuf[j] = nil
    1306            1 :                         } else {
    1307            1 :                                 alloc.iterAlloc.boundsBuf[j] = i.boundsBuf[j]
    1308            1 :                         }
    1309              :                 }
    1310            1 :                 keyBuf := alloc.iterAlloc.keyBuf[:0]
    1311            1 :                 boundsBuf := alloc.iterAlloc.boundsBuf
    1312            1 :                 prefixOrFullSeekKey := alloc.iterAlloc.prefixOrFullSeekKey[:0]
    1313            1 :                 *alloc = scanInternalIterAlloc{}
    1314            1 :                 alloc.iterAlloc.keyBuf = keyBuf
    1315            1 :                 alloc.iterAlloc.boundsBuf = boundsBuf
    1316            1 :                 alloc.iterAlloc.prefixOrFullSeekKey = prefixOrFullSeekKey
    1317            1 :                 scanInternalIteratorIterAllocPool.Put(alloc)
    1318              :         }
    1319            1 :         return err
    1320              : }
    1321              : 
    1322            1 : func (i *scanInternalIterator) initializeBoundBufs(lower, upper []byte) {
    1323            1 :         buf := i.boundsBuf[i.boundsBufIdx][:0]
    1324            1 :         if lower != nil {
    1325            1 :                 buf = append(buf, lower...)
    1326            1 :                 i.opts.LowerBound = buf
    1327            1 :         } else {
    1328            1 :                 i.opts.LowerBound = nil
    1329            1 :         }
    1330            1 :         if upper != nil {
    1331            1 :                 buf = append(buf, upper...)
    1332            1 :                 i.opts.UpperBound = buf[len(buf)-len(upper):]
    1333            1 :         } else {
    1334            1 :                 i.opts.UpperBound = nil
    1335            1 :         }
    1336            1 :         i.boundsBuf[i.boundsBufIdx] = buf
    1337            1 :         i.boundsBufIdx = 1 - i.boundsBufIdx
    1338              : }
    1339              : 
    1340              : // scanInternalIterAlloc is a wrapper around iterAlloc that includes a
    1341              : // scanInternalIterator.
    1342              : type scanInternalIterAlloc struct {
    1343              :         iterAlloc iterAlloc
    1344              :         scanIter  scanInternalIterator
    1345              : }
    1346              : 
    1347              : var scanInternalIteratorIterAllocPool = sync.Pool{
    1348            1 :         New: func() interface{} {
    1349            1 :                 return &scanInternalIterAlloc{}
    1350            1 :         },
    1351              : }
        

Generated by: LCOV version 2.0-1