LCOV - code coverage report
Current view: top level - pebble - external_iterator.go (source / functions) Coverage Total Hit
Test: 2025-05-06 08:18Z afff98cb - tests only.lcov Lines: 90.4 % 230 208
Test Date: 2025-05-06 08:20:17 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "context"
       9              : 
      10              :         "github.com/cockroachdb/errors"
      11              :         "github.com/cockroachdb/pebble/internal/base"
      12              :         "github.com/cockroachdb/pebble/internal/keyspan"
      13              :         "github.com/cockroachdb/pebble/internal/manifest"
      14              :         "github.com/cockroachdb/pebble/sstable"
      15              :         "github.com/cockroachdb/pebble/sstable/block"
      16              : )
      17              : 
      18              : // NewExternalIter takes an input 2d array of sstable files which may overlap
      19              : // across subarrays but not within a subarray (at least as far as points are
      20              : // concerned; range keys are allowed to overlap arbitrarily even within a
      21              : // subarray), and returns an Iterator over the merged contents of the sstables.
      22              : // Input sstables may contain point keys, range keys, range deletions, etc. The
      23              : // input files slice must be sorted in reverse chronological ordering. A key in a
      24              : // file at a lower index subarray will shadow a key with an identical user key
      25              : // contained within a file at a higher index subarray. Each subarray must be
      26              : // sorted in internal key order, where lower index files contain keys that sort
      27              : // left of files with higher indexes.
      28              : //
      29              : // Input sstables must only contain keys with the zero sequence number and must
      30              : // not contain references to values in external blob files.
      31              : //
      32              : // Iterators constructed through NewExternalIter do not support all iterator
      33              : // options, including block-property and table filters. NewExternalIter errors
      34              : // if an incompatible option is set.
      35              : func NewExternalIter(
      36              :         o *Options, iterOpts *IterOptions, files [][]sstable.ReadableFile,
      37            1 : ) (it *Iterator, err error) {
      38            1 :         return NewExternalIterWithContext(context.Background(), o, iterOpts, files)
      39            1 : }
      40              : 
      41              : // NewExternalIterWithContext is like NewExternalIter, and additionally
      42              : // accepts a context for tracing.
      43              : func NewExternalIterWithContext(
      44              :         ctx context.Context, o *Options, iterOpts *IterOptions, files [][]sstable.ReadableFile,
      45            1 : ) (it *Iterator, err error) {
      46            1 :         if iterOpts != nil {
      47            1 :                 if err := validateExternalIterOpts(iterOpts); err != nil {
      48            0 :                         return nil, err
      49            0 :                 }
      50              :         }
      51              : 
      52            1 :         ro := o.MakeReaderOptions()
      53            1 :         var readers [][]*sstable.Reader
      54            1 :         for _, levelFiles := range files {
      55            1 :                 subReaders, err := openExternalTables(ctx, levelFiles, ro)
      56            1 :                 readers = append(readers, subReaders)
      57            1 :                 if err != nil {
      58            1 :                         // Close all the opened readers.
      59            1 :                         for i := range readers {
      60            1 :                                 for j := range readers[i] {
      61            1 :                                         _ = readers[i][j].Close()
      62            1 :                                 }
      63              :                         }
      64            1 :                         return nil, err
      65              :                 }
      66              :         }
      67              : 
      68            1 :         buf := iterAllocPool.Get().(*iterAlloc)
      69            1 :         dbi := &buf.dbi
      70            1 :         *dbi = Iterator{
      71            1 :                 ctx:                 ctx,
      72            1 :                 alloc:               buf,
      73            1 :                 merge:               o.Merger.Merge,
      74            1 :                 comparer:            *o.Comparer,
      75            1 :                 readState:           nil,
      76            1 :                 keyBuf:              buf.keyBuf,
      77            1 :                 prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
      78            1 :                 boundsBuf:           buf.boundsBuf,
      79            1 :                 batch:               nil,
      80            1 :                 // Add the external iter state to the Iterator so that Close closes it,
      81            1 :                 // and SetOptions can re-construct iterators using its state.
      82            1 :                 externalIter: &externalIterState{readers: readers},
      83            1 :                 newIters: func(context.Context, *manifest.TableMetadata, *IterOptions,
      84            1 :                         internalIterOpts, iterKinds) (iterSet, error) {
      85            0 :                         // NB: External iterators are currently constructed without any
      86            0 :                         // `levelIters`. newIters should never be called. When we support
      87            0 :                         // organizing multiple non-overlapping files into a single level
      88            0 :                         // (see TODO below), we'll need to adjust this tableNewIters
      89            0 :                         // implementation to open iterators by looking up f in a map
      90            0 :                         // of readers indexed by *fileMetadata.
      91            0 :                         panic("unreachable")
      92              :                 },
      93              :                 seqNum: base.SeqNumMax,
      94              :         }
      95            1 :         dbi.externalIter.bufferPool.Init(2)
      96            1 : 
      97            1 :         if iterOpts != nil {
      98            1 :                 dbi.opts = *iterOpts
      99            1 :                 dbi.processBounds(iterOpts.LowerBound, iterOpts.UpperBound)
     100            1 :         }
     101            1 :         if err := finishInitializingExternal(ctx, dbi); err != nil {
     102            1 :                 _ = dbi.Close()
     103            1 :                 return nil, err
     104            1 :         }
     105            1 :         return dbi, nil
     106              : }
     107              : 
     108              : // externalIterState encapsulates state that is specific to external iterators.
     109              : // An external *pebble.Iterator maintains a pointer to the externalIterState and
     110              : // calls Close when the Iterator is Closed, providing an opportuntity for the
     111              : // external iterator to release resources particular to external iterators.
     112              : type externalIterState struct {
     113              :         bufferPool block.BufferPool
     114              :         readers    [][]*sstable.Reader
     115              : }
     116              : 
     117            1 : func (e *externalIterState) Close() (err error) {
     118            1 :         for _, readers := range e.readers {
     119            1 :                 for _, r := range readers {
     120            1 :                         err = firstError(err, r.Close())
     121            1 :                 }
     122              :         }
     123            1 :         e.bufferPool.Release()
     124            1 :         return err
     125              : }
     126              : 
     127            1 : func validateExternalIterOpts(iterOpts *IterOptions) error {
     128            1 :         switch {
     129            0 :         case iterOpts.PointKeyFilters != nil:
     130            0 :                 return errors.Errorf("pebble: external iterator: PointKeyFilters unsupported")
     131            0 :         case iterOpts.RangeKeyFilters != nil:
     132            0 :                 return errors.Errorf("pebble: external iterator: RangeKeyFilters unsupported")
     133            0 :         case iterOpts.OnlyReadGuaranteedDurable:
     134            0 :                 return errors.Errorf("pebble: external iterator: OnlyReadGuaranteedDurable unsupported")
     135            0 :         case iterOpts.UseL6Filters:
     136            0 :                 return errors.Errorf("pebble: external iterator: UseL6Filters unsupported")
     137              :         }
     138            1 :         return nil
     139              : }
     140              : 
     141              : func createExternalPointIter(
     142              :         ctx context.Context, it *Iterator, readEnv sstable.ReadEnv,
     143            1 : ) (topLevelIterator, error) {
     144            1 :         // TODO(jackson): In some instances we could generate fewer levels by using
     145            1 :         // L0Sublevels code to organize nonoverlapping files into the same level.
     146            1 :         // This would allow us to use levelIters and keep a smaller set of data and
     147            1 :         // files in-memory. However, it would also require us to identify the bounds
     148            1 :         // of all the files upfront.
     149            1 : 
     150            1 :         if !it.opts.pointKeys() {
     151            0 :                 return emptyIter, nil
     152            1 :         } else if it.pointIter != nil {
     153            1 :                 return it.pointIter, nil
     154            1 :         }
     155            1 :         mlevels := it.alloc.mlevels[:0]
     156            1 : 
     157            1 :         if len(it.externalIter.readers) > cap(mlevels) {
     158            0 :                 mlevels = make([]mergingIterLevel, 0, len(it.externalIter.readers))
     159            0 :         }
     160              :         // We set a synthetic sequence number, with lower levels having higer numbers.
     161            1 :         seqNum := 0
     162            1 :         for _, readers := range it.externalIter.readers {
     163            1 :                 seqNum += len(readers)
     164            1 :         }
     165            1 :         for _, readers := range it.externalIter.readers {
     166            1 :                 for _, r := range readers {
     167            1 :                         var (
     168            1 :                                 rangeDelIter keyspan.FragmentIterator
     169            1 :                                 pointIter    internalIterator
     170            1 :                                 err          error
     171            1 :                         )
     172            1 :                         // We could set hideObsoletePoints=true, since we are reading at
     173            1 :                         // InternalKeySeqNumMax, but we don't bother since these sstables should
     174            1 :                         // not have obsolete points (so the performance optimization is
     175            1 :                         // unnecessary), and we don't want to bother constructing a
     176            1 :                         // BlockPropertiesFilterer that includes obsoleteKeyBlockPropertyFilter.
     177            1 :                         transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
     178            1 :                         seqNum--
     179            1 :                         pointIter, err = r.NewPointIter(ctx, sstable.IterOptions{
     180            1 :                                 Lower:                it.opts.LowerBound,
     181            1 :                                 Upper:                it.opts.UpperBound,
     182            1 :                                 Transforms:           transforms,
     183            1 :                                 FilterBlockSizeLimit: sstable.NeverUseFilterBlock,
     184            1 :                                 Env:                  readEnv,
     185            1 :                                 ReaderProvider:       sstable.MakeTrivialReaderProvider(r),
     186            1 :                         })
     187            1 :                         if err == nil {
     188            1 :                                 rangeDelIter, err = r.NewRawRangeDelIter(ctx, sstable.FragmentIterTransforms{
     189            1 :                                         SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum),
     190            1 :                                 }, readEnv)
     191            1 :                         }
     192            1 :                         if err != nil {
     193            1 :                                 if pointIter != nil {
     194            1 :                                         _ = pointIter.Close()
     195            1 :                                 }
     196            1 :                                 for i := range mlevels {
     197            1 :                                         _ = mlevels[i].iter.Close()
     198            1 :                                         if mlevels[i].rangeDelIter != nil {
     199            1 :                                                 mlevels[i].rangeDelIter.Close()
     200            1 :                                         }
     201              :                                 }
     202            1 :                                 return nil, err
     203              :                         }
     204            1 :                         mlevels = append(mlevels, mergingIterLevel{
     205            1 :                                 iter:         pointIter,
     206            1 :                                 rangeDelIter: rangeDelIter,
     207            1 :                         })
     208              :                 }
     209              :         }
     210              : 
     211            1 :         it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
     212            1 :         it.alloc.merging.snapshot = base.SeqNumMax
     213            1 :         if len(mlevels) <= cap(it.alloc.levelsPositioned) {
     214            1 :                 it.alloc.merging.levelsPositioned = it.alloc.levelsPositioned[:len(mlevels)]
     215            1 :         }
     216            1 :         return &it.alloc.merging, nil
     217              : }
     218              : 
// finishInitializingExternal builds the internal iterator stack of an
// external iterator. It installs the point iterator returned by
// createExternalPointIter in it.pointIter and it.iter. When range keys are
// requested, it also sets up range-key masking, builds the range-key state if
// it.rangeKey is nil, and, if range-key state exists, wraps the point iterator
// in an interleaving iterator assigned to it.iter.
//
// If opening a range-key iterator fails, the range-key iterators opened so far
// are closed and the error is returned; it.pointIter has already been
// installed at that point and is not closed here.
func finishInitializingExternal(ctx context.Context, it *Iterator) error {
	// Read environment shared by every sstable iterator opened below; block
	// buffers come from the external iterator's own buffer pool.
	readEnv := sstable.ReadEnv{
		Block: block.ReadEnv{
			Stats: &it.stats.InternalStats,
			// TODO(jackson): External iterators never provide categorized iterator
			// stats today because they exist outside the context of a *DB. If the
			// sstables being read are on the physical filesystem, we may still want to
			// thread a CategoryStatsCollector through so that we collect their stats.
			IterStats:  nil,
			BufferPool: &it.externalIter.bufferPool,
		},
	}
	pointIter, err := createExternalPointIter(ctx, it, readEnv)
	if err != nil {
		return err
	}
	it.pointIter = pointIter
	it.iter = it.pointIter

	if it.opts.rangeKeys() {
		it.rangeKeyMasking.init(it, &it.comparer)
		var rangeKeyIters []keyspan.FragmentIterator
		// When it.rangeKey is already set, the existing range-key state is kept
		// and only re-interleaved with the new point iterator below.
		if it.rangeKey == nil {
			// We could take advantage of the lack of overlaps in range keys within
			// each slice in it.externalReaders, and generate keyspanimpl.LevelIters
			// out of those. However, since range keys are expected to be sparse to
			// begin with, the performance gain might not be significant enough to
			// warrant it.
			//
			// TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not
			// operate on TableMetadatas (similar to simpleLevelIter), and implements
			// this optimization.
			// We set a synthetic sequence number, with lower levels having higher numbers.
			seqNum := 0
			for _, readers := range it.externalIter.readers {
				seqNum += len(readers)
			}
			for _, readers := range it.externalIter.readers {
				for _, r := range readers {
					transforms := sstable.FragmentIterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
					seqNum--
					rki, err := r.NewRawRangeKeyIter(ctx, transforms, readEnv)
					if err != nil {
						// Close the range-key iterators collected so far.
						for _, iter := range rangeKeyIters {
							iter.Close()
						}
						return err
					}
					// A nil iterator is skipped (presumably the file has no range
					// keys — confirm against sstable.Reader.NewRawRangeKeyIter).
					if rki != nil {
						rangeKeyIters = append(rangeKeyIters, rki)
					}
				}
			}
			// Range-key state is only allocated when at least one file yielded
			// a range-key iterator; otherwise it.rangeKey stays nil.
			if len(rangeKeyIters) > 0 {
				it.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
				it.rangeKey.init(it.comparer.Compare, it.comparer.Split, &it.opts)
				it.rangeKey.rangeKeyIter = it.rangeKey.iterConfig.Init(
					&it.comparer,
					base.SeqNumMax,
					it.opts.LowerBound, it.opts.UpperBound,
					&it.hasPrefix, &it.prefixOrFullSeekKey,
					false /* internalKeys */, &it.rangeKey.internal,
				)
				for i := range rangeKeyIters {
					it.rangeKey.iterConfig.AddLevel(rangeKeyIters[i])
				}
			}
		}
		// Interleave range keys with point keys, masking points as configured
		// by rangeKeyMasking, and make the interleaving iterator the top-level
		// iterator.
		if it.rangeKey != nil {
			it.rangeKey.iiter.Init(&it.comparer, it.iter, it.rangeKey.rangeKeyIter,
				keyspan.InterleavingIterOpts{
					Mask:       &it.rangeKeyMasking,
					LowerBound: it.opts.LowerBound,
					UpperBound: it.opts.UpperBound,
				})
			it.iter = &it.rangeKey.iiter
		}
	}
	return nil
}
     299              : 
     300              : func openExternalTables(
     301              :         ctx context.Context, files []sstable.ReadableFile, readerOpts sstable.ReaderOptions,
     302            1 : ) (readers []*sstable.Reader, err error) {
     303            1 :         readers = make([]*sstable.Reader, 0, len(files))
     304            1 :         for i := range files {
     305            1 :                 readable, err := sstable.NewSimpleReadable(files[i])
     306            1 :                 if err != nil {
     307            0 :                         return readers, err
     308            0 :                 }
     309            1 :                 r, err := sstable.NewReader(ctx, readable, readerOpts)
     310            1 :                 if err != nil {
     311            1 :                         return readers, err
     312            1 :                 }
     313            1 :                 if r.Attributes.Has(sstable.AttributeBlobValues) {
     314            1 :                         return readers, errors.Newf("pebble: NewExternalIter does not support blob references")
     315            1 :                 }
     316            1 :                 readers = append(readers, r)
     317              :         }
     318            1 :         return readers, err
     319              : }
        

Generated by: LCOV version 2.0-1