LCOV - code coverage report
Current view: top level - pebble - external_iterator.go (source / functions) Coverage Total Hit
Test: 2025-02-12 08:16Z 419f2391 - meta test only.lcov Lines: 0.0 % 210 0
Test Date: 2025-02-12 08:17:57 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "context"
       9              : 
      10              :         "github.com/cockroachdb/errors"
      11              :         "github.com/cockroachdb/pebble/internal/base"
      12              :         "github.com/cockroachdb/pebble/internal/keyspan"
      13              :         "github.com/cockroachdb/pebble/internal/manifest"
      14              :         "github.com/cockroachdb/pebble/sstable"
      15              :         "github.com/cockroachdb/pebble/sstable/block"
      16              : )
      17              : 
      18              : // NewExternalIter takes an input 2d array of sstable files which may overlap
      19              : // across subarrays but not within a subarray (at least as far as points are
      20              : // concerned; range keys are allowed to overlap arbitrarily even within a
      21              : // subarray), and returns an Iterator over the merged contents of the sstables.
      22              : // Input sstables may contain point keys, range keys, range deletions, etc. The
      23              : // input files slice must be sorted in reverse chronological ordering. A key in a
      24              : // file at a lower index subarray will shadow a key with an identical user key
      25              : // contained within a file at a higher index subarray. Each subarray must be
      26              : // sorted in internal key order, where lower index files contain keys that sort
      27              : // left of files with higher indexes.
      28              : //
      29              : // Input sstables must only contain keys with the zero sequence number.
      30              : //
      31              : // Iterators constructed through NewExternalIter do not support all iterator
      32              : // options, including block-property and table filters. NewExternalIter errors
      33              : // if an incompatible option is set.
      34              : func NewExternalIter(
      35              :         o *Options, iterOpts *IterOptions, files [][]sstable.ReadableFile,
      36            0 : ) (it *Iterator, err error) {
      37            0 :         return NewExternalIterWithContext(context.Background(), o, iterOpts, files)
      38            0 : }
      39              : 
      40              : // NewExternalIterWithContext is like NewExternalIter, and additionally
      41              : // accepts a context for tracing.
      42              : func NewExternalIterWithContext(
      43              :         ctx context.Context, o *Options, iterOpts *IterOptions, files [][]sstable.ReadableFile,
      44            0 : ) (it *Iterator, err error) {
      45            0 :         if iterOpts != nil {
      46            0 :                 if err := validateExternalIterOpts(iterOpts); err != nil {
      47            0 :                         return nil, err
      48            0 :                 }
      49              :         }
      50              : 
      51            0 :         ro := o.MakeReaderOptions()
      52            0 :         var readers [][]*sstable.Reader
      53            0 :         for _, levelFiles := range files {
      54            0 :                 subReaders, err := openExternalTables(ctx, levelFiles, ro)
      55            0 :                 readers = append(readers, subReaders)
      56            0 :                 if err != nil {
      57            0 :                         // Close all the opened readers.
      58            0 :                         for i := range readers {
      59            0 :                                 for j := range readers[i] {
      60            0 :                                         _ = readers[i][j].Close()
      61            0 :                                 }
      62              :                         }
      63            0 :                         return nil, err
      64              :                 }
      65              :         }
      66              : 
      67            0 :         buf := iterAllocPool.Get().(*iterAlloc)
      68            0 :         dbi := &buf.dbi
      69            0 :         *dbi = Iterator{
      70            0 :                 ctx:                 ctx,
      71            0 :                 alloc:               buf,
      72            0 :                 merge:               o.Merger.Merge,
      73            0 :                 comparer:            *o.Comparer,
      74            0 :                 readState:           nil,
      75            0 :                 keyBuf:              buf.keyBuf,
      76            0 :                 prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
      77            0 :                 boundsBuf:           buf.boundsBuf,
      78            0 :                 batch:               nil,
      79            0 :                 // Add the external iter state to the Iterator so that Close closes it,
      80            0 :                 // and SetOptions can re-construct iterators using its state.
      81            0 :                 externalIter: &externalIterState{readers: readers},
      82            0 :                 newIters: func(context.Context, *manifest.FileMetadata, *IterOptions,
      83            0 :                         internalIterOpts, iterKinds) (iterSet, error) {
      84            0 :                         // NB: External iterators are currently constructed without any
      85            0 :                         // `levelIters`. newIters should never be called. When we support
      86            0 :                         // organizing multiple non-overlapping files into a single level
      87            0 :                         // (see TODO below), we'll need to adjust this tableNewIters
      88            0 :                         // implementation to open iterators by looking up f in a map
      89            0 :                         // of readers indexed by *fileMetadata.
      90            0 :                         panic("unreachable")
      91              :                 },
      92              :                 seqNum: base.SeqNumMax,
      93              :         }
      94            0 :         dbi.externalIter.bufferPool.Init(2)
      95            0 : 
      96            0 :         if iterOpts != nil {
      97            0 :                 dbi.opts = *iterOpts
      98            0 :                 dbi.processBounds(iterOpts.LowerBound, iterOpts.UpperBound)
      99            0 :         }
     100            0 :         if err := finishInitializingExternal(ctx, dbi); err != nil {
     101            0 :                 dbi.Close()
     102            0 :                 return nil, err
     103            0 :         }
     104            0 :         return dbi, nil
     105              : }
     106              : 
     107              : // externalIterState encapsulates state that is specific to external iterators.
     108              : // An external *pebble.Iterator maintains a pointer to the externalIterState and
     109              : // calls Close when the Iterator is Closed, providing an opportunity for the
     110              : // external iterator to release resources particular to external iterators.
     111              : type externalIterState struct {
     112              :         bufferPool block.BufferPool
     113              :         readers    [][]*sstable.Reader
     114              : }
     115              : 
     116            0 : func (e *externalIterState) Close() (err error) {
     117            0 :         for _, readers := range e.readers {
     118            0 :                 for _, r := range readers {
     119            0 :                         err = firstError(err, r.Close())
     120            0 :                 }
     121              :         }
     122            0 :         e.bufferPool.Release()
     123            0 :         return err
     124              : }
     125              : 
     126            0 : func validateExternalIterOpts(iterOpts *IterOptions) error {
     127            0 :         switch {
     128            0 :         case iterOpts.PointKeyFilters != nil:
     129            0 :                 return errors.Errorf("pebble: external iterator: PointKeyFilters unsupported")
     130            0 :         case iterOpts.RangeKeyFilters != nil:
     131            0 :                 return errors.Errorf("pebble: external iterator: RangeKeyFilters unsupported")
     132            0 :         case iterOpts.OnlyReadGuaranteedDurable:
     133            0 :                 return errors.Errorf("pebble: external iterator: OnlyReadGuaranteedDurable unsupported")
     134            0 :         case iterOpts.UseL6Filters:
     135            0 :                 return errors.Errorf("pebble: external iterator: UseL6Filters unsupported")
     136              :         }
     137            0 :         return nil
     138              : }
     139              : 
     140              : func createExternalPointIter(
     141              :         ctx context.Context, it *Iterator, readEnv block.ReadEnv,
     142            0 : ) (topLevelIterator, error) {
     143            0 :         // TODO(jackson): In some instances we could generate fewer levels by using
     144            0 :         // L0Sublevels code to organize nonoverlapping files into the same level.
     145            0 :         // This would allow us to use levelIters and keep a smaller set of data and
     146            0 :         // files in-memory. However, it would also require us to identify the bounds
     147            0 :         // of all the files upfront.
     148            0 : 
     149            0 :         if !it.opts.pointKeys() {
     150            0 :                 return emptyIter, nil
     151            0 :         } else if it.pointIter != nil {
     152            0 :                 return it.pointIter, nil
     153            0 :         }
     154            0 :         mlevels := it.alloc.mlevels[:0]
     155            0 : 
     156            0 :         if len(it.externalIter.readers) > cap(mlevels) {
     157            0 :                 mlevels = make([]mergingIterLevel, 0, len(it.externalIter.readers))
     158            0 :         }
     159              :         // We set a synthetic sequence number, with lower levels having higher numbers.
     160            0 :         seqNum := 0
     161            0 :         for _, readers := range it.externalIter.readers {
     162            0 :                 seqNum += len(readers)
     163            0 :         }
     164            0 :         for _, readers := range it.externalIter.readers {
     165            0 :                 for _, r := range readers {
     166            0 :                         var (
     167            0 :                                 rangeDelIter keyspan.FragmentIterator
     168            0 :                                 pointIter    internalIterator
     169            0 :                                 err          error
     170            0 :                         )
     171            0 :                         // We could set hideObsoletePoints=true, since we are reading at
     172            0 :                         // base.SeqNumMax, but we don't bother since these sstables should
     173            0 :                         // not have obsolete points (so the performance optimization is
     174            0 :                         // unnecessary), and we don't want to bother constructing a
     175            0 :                         // BlockPropertiesFilterer that includes obsoleteKeyBlockPropertyFilter.
     176            0 :                         transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
     177            0 :                         seqNum--
     178            0 :                         pointIter, err = r.NewPointIter(
     179            0 :                                 ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
     180            0 :                                 sstable.NeverUseFilterBlock, readEnv, sstable.MakeTrivialReaderProvider(r))
     181            0 :                         if err != nil {
     182            0 :                                 return nil, err
     183            0 :                         }
     184            0 :                         rangeDelIter, err = r.NewRawRangeDelIter(ctx, sstable.FragmentIterTransforms{
     185            0 :                                 SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum),
     186            0 :                         }, readEnv)
     187            0 :                         if err != nil {
     188            0 :                                 return nil, err
     189            0 :                         }
     190            0 :                         mlevels = append(mlevels, mergingIterLevel{
     191            0 :                                 iter:         pointIter,
     192            0 :                                 rangeDelIter: rangeDelIter,
     193            0 :                         })
     194              :                 }
     195              :         }
     196              : 
     197            0 :         it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
     198            0 :         it.alloc.merging.snapshot = base.SeqNumMax
     199            0 :         if len(mlevels) <= cap(it.alloc.levelsPositioned) {
     200            0 :                 it.alloc.merging.levelsPositioned = it.alloc.levelsPositioned[:len(mlevels)]
     201            0 :         }
     202            0 :         return &it.alloc.merging, nil
     203              : }
     204              : 
     205            0 : func finishInitializingExternal(ctx context.Context, it *Iterator) error {
     206            0 :         readEnv := block.ReadEnv{
     207            0 :                 Stats: &it.stats.InternalStats,
     208            0 :                 // TODO(jackson): External iterators never provide categorized iterator
     209            0 :                 // stats today because they exist outside the context of a *DB. If the
     210            0 :                 // sstables being read are on the physical filesystem, we may still want to
     211            0 :                 // thread a CategoryStatsCollector through so that we collect their stats.
     212            0 :                 IterStats:  nil,
     213            0 :                 BufferPool: &it.externalIter.bufferPool,
     214            0 :         }
     215            0 :         pointIter, err := createExternalPointIter(ctx, it, readEnv)
     216            0 :         if err != nil {
     217            0 :                 return err
     218            0 :         }
     219            0 :         it.pointIter = pointIter
     220            0 :         it.iter = it.pointIter
     221            0 : 
     222            0 :         if it.opts.rangeKeys() {
     223            0 :                 it.rangeKeyMasking.init(it, &it.comparer)
     224            0 :                 var rangeKeyIters []keyspan.FragmentIterator
     225            0 :                 if it.rangeKey == nil {
     226            0 :                         // We could take advantage of the lack of overlaps in range keys within
     227            0 :                         // each slice in it.externalIter.readers, and generate keyspanimpl.LevelIters
     228            0 :                         // out of those. However, since range keys are expected to be sparse to
     229            0 :                         // begin with, the performance gain might not be significant enough to
     230            0 :                         // warrant it.
     231            0 :                         //
     232            0 :                         // TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not
     233            0 :                         // operate on FileMetadatas (similar to simpleLevelIter), and implements
     234            0 :                         // this optimization.
     235            0 :                         // We set a synthetic sequence number, with lower levels having higher numbers.
     236            0 :                         seqNum := 0
     237            0 :                         for _, readers := range it.externalIter.readers {
     238            0 :                                 seqNum += len(readers)
     239            0 :                         }
     240            0 :                         for _, readers := range it.externalIter.readers {
     241            0 :                                 for _, r := range readers {
     242            0 :                                         transforms := sstable.FragmentIterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
     243            0 :                                         seqNum--
     244            0 :                                         if rki, err := r.NewRawRangeKeyIter(ctx, transforms, readEnv); err != nil {
     245            0 :                                                 return err
     246            0 :                                         } else if rki != nil {
     247            0 :                                                 rangeKeyIters = append(rangeKeyIters, rki)
     248            0 :                                         }
     249              :                                 }
     250              :                         }
     251            0 :                         if len(rangeKeyIters) > 0 {
     252            0 :                                 it.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
     253            0 :                                 it.rangeKey.init(it.comparer.Compare, it.comparer.Split, &it.opts)
     254            0 :                                 it.rangeKey.rangeKeyIter = it.rangeKey.iterConfig.Init(
     255            0 :                                         &it.comparer,
     256            0 :                                         base.SeqNumMax,
     257            0 :                                         it.opts.LowerBound, it.opts.UpperBound,
     258            0 :                                         &it.hasPrefix, &it.prefixOrFullSeekKey,
     259            0 :                                         false /* internalKeys */, &it.rangeKey.internal,
     260            0 :                                 )
     261            0 :                                 for i := range rangeKeyIters {
     262            0 :                                         it.rangeKey.iterConfig.AddLevel(rangeKeyIters[i])
     263            0 :                                 }
     264              :                         }
     265              :                 }
     266            0 :                 if it.rangeKey != nil {
     267            0 :                         it.rangeKey.iiter.Init(&it.comparer, it.iter, it.rangeKey.rangeKeyIter,
     268            0 :                                 keyspan.InterleavingIterOpts{
     269            0 :                                         Mask:       &it.rangeKeyMasking,
     270            0 :                                         LowerBound: it.opts.LowerBound,
     271            0 :                                         UpperBound: it.opts.UpperBound,
     272            0 :                                 })
     273            0 :                         it.iter = &it.rangeKey.iiter
     274            0 :                 }
     275              :         }
     276            0 :         return nil
     277              : }
     278              : 
     279              : func openExternalTables(
     280              :         ctx context.Context, files []sstable.ReadableFile, readerOpts sstable.ReaderOptions,
     281            0 : ) (readers []*sstable.Reader, err error) {
     282            0 :         readers = make([]*sstable.Reader, 0, len(files))
     283            0 :         for i := range files {
     284            0 :                 readable, err := sstable.NewSimpleReadable(files[i])
     285            0 :                 if err != nil {
     286            0 :                         return readers, err
     287            0 :                 }
     288            0 :                 r, err := sstable.NewReader(ctx, readable, readerOpts)
     289            0 :                 if err != nil {
     290            0 :                         return readers, err
     291            0 :                 }
     292            0 :                 readers = append(readers, r)
     293              :         }
     294            0 :         return readers, err
     295              : }
        

Generated by: LCOV version 2.0-1