LCOV - code coverage report
Current view: top level - pebble - external_iterator.go (source / functions) Hit Total Coverage
Test: 2024-05-02 08:15Z 902b6d09 - meta test only.lcov Lines: 0 215 0.0 %
Date: 2024-05-02 08:16:32 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/keyspan"
      13             :         "github.com/cockroachdb/pebble/internal/manifest"
      14             :         "github.com/cockroachdb/pebble/sstable"
      15             : )
      16             : 
      17             : // ExternalIterOption provides an interface to specify open-time options to
      18             : // NewExternalIter and NewExternalIterWithContext.
      19             : type ExternalIterOption interface {
      20             :         // iterApply is called on the new Iterator while it is being opened, after
      21             :         // the caller's IterOptions are copied in, in order to set internal parameters.
      22             :         iterApply(*Iterator)
      23             :         // readerOptions returns the sstable reader options contributed by this iter option.
      24             :         readerOptions() []sstable.ReaderOption
      25             : }
      26             : 
      27             : type externalIterReaderOptions struct {
      28             :         opts []sstable.ReaderOption // handed back as-is by readerOptions; iterApply ignores it
      29             : }
      30             : 
      31           0 : func (e *externalIterReaderOptions) iterApply(iterator *Iterator) {
      32           0 :         // Do nothing: this option only contributes reader options, not iterator state.
      33           0 : }
      34             : 
      35           0 : func (e *externalIterReaderOptions) readerOptions() []sstable.ReaderOption {
      36           0 :         return e.opts // the slice captured at construction, not a copy
      37           0 : }
      38             : 
      39             : // ExternalIterReaderOptions returns an ExternalIterOption that carries the given
      40             : // sstable.ReaderOption values, to be applied on sstable readers in NewExternalIter.
      41           0 : func ExternalIterReaderOptions(opts ...sstable.ReaderOption) ExternalIterOption {
      42           0 :         return &externalIterReaderOptions{opts: opts}
      43           0 : }
      44             : 
      45             : // NewExternalIter takes an input 2d array of sstable files which may overlap
      46             : // across subarrays but not within a subarray (at least as far as points are
      47             : // concerned; range keys are allowed to overlap arbitrarily even within a
      48             : // subarray), and returns an Iterator over the merged contents of the sstables.
      49             : // Input sstables may contain point keys, range keys, range deletions, etc. The
      50             : // input files slice must be sorted in reverse chronological ordering. A key in a
      51             : // file at a lower index subarray will shadow a key with an identical user key
      52             : // contained within a file at a higher index subarray. Each subarray must be
      53             : // sorted in internal key order, where lower index files contain keys that sort
      54             : // left of files with higher indexes.
      55             : //
      56             : // Input sstables must only contain keys with the zero sequence number.
      57             : //
      58             : // Iterators constructed through NewExternalIter do not support all iterator
      59             : // options, including block-property and table filters. NewExternalIter errors
      60             : // if an incompatible option is set.
      61             : func NewExternalIter(
      62             :         o *Options,
      63             :         iterOpts *IterOptions,
      64             :         files [][]sstable.ReadableFile,
      65             :         extraOpts ...ExternalIterOption,
      66           0 : ) (it *Iterator, err error) {
      67           0 :         return NewExternalIterWithContext(context.Background(), o, iterOpts, files, extraOpts...) // same behavior, with a background context
      68           0 : }
      69             : 
      70             : // NewExternalIterWithContext is like NewExternalIter, and additionally
      71             : // accepts a context for tracing. If opening a file fails, readers opened so far are closed.
      72             : func NewExternalIterWithContext(
      73             :         ctx context.Context,
      74             :         o *Options,
      75             :         iterOpts *IterOptions,
      76             :         files [][]sstable.ReadableFile,
      77             :         extraOpts ...ExternalIterOption,
      78           0 : ) (it *Iterator, err error) {
      79           0 :         if iterOpts != nil {
      80           0 :                 if err := validateExternalIterOpts(iterOpts); err != nil {
      81           0 :                         return nil, err
      82           0 :                 }
      83             :         }
      84             : 
      85           0 :         var readers [][]*sstable.Reader
      86           0 : 
      87           0 :         seqNumOffset := 0
      88           0 :         var extraReaderOpts []sstable.ReaderOption
      89           0 :         for i := range extraOpts {
      90           0 :                 extraReaderOpts = append(extraReaderOpts, extraOpts[i].readerOptions()...)
      91           0 :         }
      92           0 :         for _, levelFiles := range files {
      93           0 :                 seqNumOffset += len(levelFiles) // start from the total number of files
      94           0 :         }
      95           0 :         for _, levelFiles := range files {
      96           0 :                 seqNumOffset -= len(levelFiles) // NOTE(review): openExternalTables never reads this value
      97           0 :                 subReaders, err := openExternalTables(o, levelFiles, seqNumOffset, o.MakeReaderOptions(), extraReaderOpts...)
      98           0 :                 readers = append(readers, subReaders) // appended before the error check so a partially opened level is cleaned up too
      99           0 :                 if err != nil {
     100           0 :                         // Close all the opened readers, including those of the level that failed.
     101           0 :                         for i := range readers {
     102           0 :                                 for j := range readers[i] {
     103           0 :                                         _ = readers[i][j].Close()
     104           0 :                                 }
     105             :                         }
     106           0 :                         return nil, err
     107             :                 }
     108             :         }
     109             : 
     110           0 :         buf := iterAllocPool.Get().(*iterAlloc)
     111           0 :         dbi := &buf.dbi
     112           0 :         *dbi = Iterator{
     113           0 :                 ctx:                 ctx,
     114           0 :                 alloc:               buf,
     115           0 :                 merge:               o.Merger.Merge, // o and o.Merger must be non-nil
     116           0 :                 comparer:            *o.Comparer, // o.Comparer must be non-nil; the Comparer is copied by value
     117           0 :                 readState:           nil,
     118           0 :                 keyBuf:              buf.keyBuf,
     119           0 :                 prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
     120           0 :                 boundsBuf:           buf.boundsBuf,
     121           0 :                 batch:               nil,
     122           0 :                 // Add the readers to the Iterator so that Close closes them, and
     123           0 :                 // SetOptions can re-construct iterators from them.
     124           0 :                 externalReaders: readers,
     125           0 :                 newIters: func(context.Context, *manifest.FileMetadata, *IterOptions,
     126           0 :                         internalIterOpts, iterKinds) (iterSet, error) {
     127           0 :                         // NB: External iterators are currently constructed without any
     128           0 :                         // `levelIters`. newIters should never be called. When we support
     129           0 :                         // organizing multiple non-overlapping files into a single level
     130           0 :                         // (see TODO below), we'll need to adjust this tableNewIters
     131           0 :                         // implementation to open iterators by looking up f in a map
     132           0 :                         // of readers indexed by *fileMetadata.
     133           0 :                         panic("unreachable")
     134             :                 },
     135             :                 seqNum: base.InternalKeySeqNumMax,
     136             :         }
     137           0 :         if iterOpts != nil {
     138           0 :                 dbi.opts = *iterOpts
     139           0 :                 dbi.processBounds(iterOpts.LowerBound, iterOpts.UpperBound)
     140           0 :         }
     141           0 :         for i := range extraOpts {
     142           0 :                 extraOpts[i].iterApply(dbi) // applied after the caller's IterOptions are copied in
     143           0 :         }
     144           0 :         if err := finishInitializingExternal(ctx, dbi); err != nil {
     145           0 :                 dbi.Close()
     146           0 :                 return nil, err
     147           0 :         }
     148           0 :         return dbi, nil
     149             : }
     150             : 
     151           0 : func validateExternalIterOpts(iterOpts *IterOptions) error {
     152           0 :         switch { // iterOpts must be non-nil; only the first unsupported option found is reported
     153           0 :         case iterOpts.TableFilter != nil:
     154           0 :                 return errors.Errorf("pebble: external iterator: TableFilter unsupported")
     155           0 :         case iterOpts.PointKeyFilters != nil:
     156           0 :                 return errors.Errorf("pebble: external iterator: PointKeyFilters unsupported")
     157           0 :         case iterOpts.RangeKeyFilters != nil:
     158           0 :                 return errors.Errorf("pebble: external iterator: RangeKeyFilters unsupported")
     159           0 :         case iterOpts.OnlyReadGuaranteedDurable:
     160           0 :                 return errors.Errorf("pebble: external iterator: OnlyReadGuaranteedDurable unsupported")
     161           0 :         case iterOpts.UseL6Filters:
     162           0 :                 return errors.Errorf("pebble: external iterator: UseL6Filters unsupported")
     163             :         }
     164           0 :         return nil // none of the unsupported options is set
     165             : }
     166             : 
     167           0 : func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterator, error) {
     168           0 :         // TODO(jackson): In some instances we could generate fewer levels by using
     169           0 :         // L0Sublevels code to organize nonoverlapping files into the same level.
     170           0 :         // This would allow us to use levelIters and keep a smaller set of data and
     171           0 :         // files in-memory. However, it would also require us to identify the bounds
     172           0 :         // of all the files upfront.
     173           0 : 
     174           0 :         if !it.opts.pointKeys() {
     175           0 :                 return emptyIter, nil
     176           0 :         } else if it.pointIter != nil {
     177           0 :                 return it.pointIter, nil // reuse the point iterator already attached to it
     178           0 :         }
     179           0 :         mlevels := it.alloc.mlevels[:0]
     180           0 : 
     181           0 :         if len(it.externalReaders) > cap(mlevels) { // NOTE(review): compares the number of subarrays, not the total reader count; append below grows the slice if needed
     182           0 :                 mlevels = make([]mergingIterLevel, 0, len(it.externalReaders))
     183           0 :         }
     184             :         // We set a synthetic sequence number, with lower levels having higher numbers.
     185           0 :         seqNum := 0
     186           0 :         for _, readers := range it.externalReaders {
     187           0 :                 seqNum += len(readers)
     188           0 :         }
     189           0 :         for _, readers := range it.externalReaders {
     190           0 :                 for _, r := range readers {
     191           0 :                         var (
     192           0 :                                 rangeDelIter keyspan.FragmentIterator
     193           0 :                                 pointIter    internalIterator
     194           0 :                                 err          error
     195           0 :                         )
     196           0 :                         // We could set hideObsoletePoints=true, since we are reading at
     197           0 :                         // InternalKeySeqNumMax, but we don't bother since these sstables should
     198           0 :                         // not have obsolete points (so the performance optimization is
     199           0 :                         // unnecessary), and we don't want to bother constructing a
     200           0 :                         // BlockPropertiesFilterer that includes obsoleteKeyBlockPropertyFilter.
     201           0 :                         transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
     202           0 :                         seqNum-- // each reader gets its own number; the first reader gets the highest
     203           0 :                         pointIter, err = r.NewIterWithBlockPropertyFiltersAndContextEtc(
     204           0 :                                 ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
     205           0 :                                 false, /* useFilterBlock */
     206           0 :                                 &it.stats.InternalStats, it.opts.CategoryAndQoS, nil,
     207           0 :                                 sstable.TrivialReaderProvider{Reader: r})
     208           0 :                         if err != nil {
     209           0 :                                 return nil, err // NOTE(review): levels already appended to mlevels are not closed here — confirm the caller's cleanup covers them
     210           0 :                         }
     211           0 :                         rangeDelIter, err = r.NewRawRangeDelIter(transforms)
     212           0 :                         if err != nil {
     213           0 :                                 return nil, err // NOTE(review): pointIter from above is not closed on this path — confirm
     214           0 :                         }
     215           0 :                         mlevels = append(mlevels, mergingIterLevel{
     216           0 :                                 iter:         pointIter,
     217           0 :                                 rangeDelIter: rangeDelIter,
     218           0 :                         })
     219             :                 }
     220             :         }
     221             : 
     222           0 :         it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
     223           0 :         it.alloc.merging.snapshot = base.InternalKeySeqNumMax
     224           0 :         if len(mlevels) <= cap(it.alloc.levelsPositioned) { // use the preallocated slice only when it is large enough
     225           0 :                 it.alloc.merging.levelsPositioned = it.alloc.levelsPositioned[:len(mlevels)]
     226           0 :         }
     227           0 :         return &it.alloc.merging, nil
     228             : }
     229             : 
     230           0 : func finishInitializingExternal(ctx context.Context, it *Iterator) error {
     231           0 :         pointIter, err := createExternalPointIter(ctx, it)
     232           0 :         if err != nil {
     233           0 :                 return err
     234           0 :         }
     235           0 :         it.pointIter = pointIter
     236           0 :         it.iter = it.pointIter // replaced below by the interleaving iterator when range keys are present
     237           0 : 
     238           0 :         if it.opts.rangeKeys() {
     239           0 :                 it.rangeKeyMasking.init(it, it.comparer.Compare, it.comparer.Split)
     240           0 :                 var rangeKeyIters []keyspan.FragmentIterator
     241           0 :                 if it.rangeKey == nil {
     242           0 :                         // We could take advantage of the lack of overlaps in range keys within
     243           0 :                         // each slice in it.externalReaders, and generate keyspanimpl.LevelIters
     244           0 :                         // out of those. However, since range keys are expected to be sparse to
     245           0 :                         // begin with, the performance gain might not be significant enough to
     246           0 :                         // warrant it.
     247           0 :                         //
     248           0 :                         // TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not
     249           0 :                         // operate on FileMetadatas (similar to simpleLevelIter), and implements
     250           0 :                         // this optimization.
     251           0 :                         // We set a synthetic sequence number, with lower levels having higher numbers.
     252           0 :                         seqNum := 0
     253           0 :                         for _, readers := range it.externalReaders {
     254           0 :                                 seqNum += len(readers)
     255           0 :                         }
     256           0 :                         for _, readers := range it.externalReaders {
     257           0 :                                 for _, r := range readers {
     258           0 :                                         transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
     259           0 :                                         seqNum--
     260           0 :                                         if rki, err := r.NewRawRangeKeyIter(transforms); err != nil {
     261           0 :                                                 return err // NOTE(review): iterators already collected in rangeKeyIters are not closed on this path — confirm
     262           0 :                                         } else if rki != nil { // a nil iterator returned without an error is skipped
     263           0 :                                                 rangeKeyIters = append(rangeKeyIters, rki)
     264           0 :                                         }
     265             :                                 }
     266             :                         }
     267           0 :                         if len(rangeKeyIters) > 0 {
     268           0 :                                 it.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
     269           0 :                                 it.rangeKey.init(it.comparer.Compare, it.comparer.Split, &it.opts)
     270           0 :                                 it.rangeKey.rangeKeyIter = it.rangeKey.iterConfig.Init(
     271           0 :                                         &it.comparer,
     272           0 :                                         base.InternalKeySeqNumMax,
     273           0 :                                         it.opts.LowerBound, it.opts.UpperBound,
     274           0 :                                         &it.hasPrefix, &it.prefixOrFullSeekKey,
     275           0 :                                         false /* internalKeys */, &it.rangeKey.internal,
     276           0 :                                 )
     277           0 :                                 for i := range rangeKeyIters {
     278           0 :                                         it.rangeKey.iterConfig.AddLevel(rangeKeyIters[i])
     279           0 :                                 }
     280             :                         }
     281             :                 }
     282           0 :                 if it.rangeKey != nil {
     283           0 :                         it.rangeKey.iiter.Init(&it.comparer, it.iter, it.rangeKey.rangeKeyIter,
     284           0 :                                 keyspan.InterleavingIterOpts{
     285           0 :                                         Mask:       &it.rangeKeyMasking,
     286           0 :                                         LowerBound: it.opts.LowerBound,
     287           0 :                                         UpperBound: it.opts.UpperBound,
     288           0 :                                 })
     289           0 :                         it.iter = &it.rangeKey.iiter
     290           0 :                 }
     291             :         }
     292           0 :         return nil
     293             : }
     294             : 
     295             : func openExternalTables(
     296             :         o *Options, // NOTE(review): never read in this function
     297             :         files []sstable.ReadableFile,
     298             :         seqNumOffset int, // NOTE(review): never read in this function
     299             :         readerOpts sstable.ReaderOptions,
     300             :         extraReaderOpts ...sstable.ReaderOption,
     301           0 : ) (readers []*sstable.Reader, err error) {
     302           0 :         readers = make([]*sstable.Reader, 0, len(files))
     303           0 :         for i := range files {
     304           0 :                 readable, err := sstable.NewSimpleReadable(files[i]) // this err shadows the named result
     305           0 :                 if err != nil {
     306           0 :                         return readers, err // readers opened so far are returned alongside the error
     307           0 :                 }
     308           0 :                 r, err := sstable.NewReader(readable, readerOpts, extraReaderOpts...)
     309           0 :                 if err != nil {
     310           0 :                         return readers, err // NOTE(review): readable is not closed here — presumably NewReader owns it on failure; confirm
     311           0 :                 }
     312           0 :                 readers = append(readers, r)
     313             :         }
     314           0 :         return readers, err // the named err is never assigned (the loop shadows it), so this is nil
     315             : }

Generated by: LCOV version 1.14