LCOV - code coverage report
Current view: top level - pebble - external_iterator.go (source / functions) Hit Total Coverage
Test: 2024-12-11 08:18Z ab9741a3 - tests only.lcov Lines: 164 204 80.4 %
Date: 2024-12-11 08:18:35 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             : 
      10             :         "github.com/cockroachdb/errors"
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             :         "github.com/cockroachdb/pebble/internal/keyspan"
      13             :         "github.com/cockroachdb/pebble/internal/manifest"
      14             :         "github.com/cockroachdb/pebble/sstable"
      15             : )
      16             : 
      17             : // NewExternalIter takes an input 2d array of sstable files which may overlap
      18             : // across subarrays but not within a subarray (at least as far as points are
      19             : // concerned; range keys are allowed to overlap arbitrarily even within a
      20             : // subarray), and returns an Iterator over the merged contents of the sstables.
      21             : // Input sstables may contain point keys, range keys, range deletions, etc. The
      22             : // input files slice must be sorted in reverse chronological ordering. A key in a
      23             : // file at a lower index subarray will shadow a key with an identical user key
      24             : // contained within a file at a higher index subarray. Each subarray must be
      25             : // sorted in internal key order, where lower index files contain keys that sort
      26             : // left of files with higher indexes.
      27             : //
      28             : // Input sstables must only contain keys with the zero sequence number.
      29             : //
      30             : // Iterators constructed through NewExternalIter do not support all iterator
      31             : // options, including block-property and table filters. NewExternalIter errors
      32             : // if an incompatible option is set.
      33             : func NewExternalIter(
      34             :         o *Options, iterOpts *IterOptions, files [][]sstable.ReadableFile,
      35           1 : ) (it *Iterator, err error) {
      36           1 :         return NewExternalIterWithContext(context.Background(), o, iterOpts, files)
      37           1 : }
      38             : 
      39             : // NewExternalIterWithContext is like NewExternalIter, and additionally
      40             : // accepts a context for tracing.
      41             : func NewExternalIterWithContext(
      42             :         ctx context.Context, o *Options, iterOpts *IterOptions, files [][]sstable.ReadableFile,
      43           1 : ) (it *Iterator, err error) {
      44           1 :         if iterOpts != nil {
      45           1 :                 if err := validateExternalIterOpts(iterOpts); err != nil {
      46           0 :                         return nil, err
      47           0 :                 }
      48             :         }
      49             : 
      50           1 :         var readers [][]*sstable.Reader
      51           1 : 
      52           1 :         seqNumOffset := 0
      53           1 :         for _, levelFiles := range files {
      54           1 :                 seqNumOffset += len(levelFiles)
      55           1 :         }
      56           1 :         for _, levelFiles := range files {
      57           1 :                 seqNumOffset -= len(levelFiles)
      58           1 :                 subReaders, err := openExternalTables(ctx, o, levelFiles, seqNumOffset, o.MakeReaderOptions())
      59           1 :                 readers = append(readers, subReaders)
      60           1 :                 if err != nil {
      61           0 :                         // Close all the opened readers.
      62           0 :                         for i := range readers {
      63           0 :                                 for j := range readers[i] {
      64           0 :                                         _ = readers[i][j].Close()
      65           0 :                                 }
      66             :                         }
      67           0 :                         return nil, err
      68             :                 }
      69             :         }
      70             : 
      71           1 :         buf := iterAllocPool.Get().(*iterAlloc)
      72           1 :         dbi := &buf.dbi
      73           1 :         *dbi = Iterator{
      74           1 :                 ctx:                 ctx,
      75           1 :                 alloc:               buf,
      76           1 :                 merge:               o.Merger.Merge,
      77           1 :                 comparer:            *o.Comparer,
      78           1 :                 readState:           nil,
      79           1 :                 keyBuf:              buf.keyBuf,
      80           1 :                 prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
      81           1 :                 boundsBuf:           buf.boundsBuf,
      82           1 :                 batch:               nil,
      83           1 :                 // Add the readers to the Iterator so that Close closes them, and
      84           1 :                 // SetOptions can re-construct iterators from them.
      85           1 :                 externalReaders: readers,
      86           1 :                 newIters: func(context.Context, *manifest.FileMetadata, *IterOptions,
      87           1 :                         internalIterOpts, iterKinds) (iterSet, error) {
      88           0 :                         // NB: External iterators are currently constructed without any
      89           0 :                         // `levelIters`. newIters should never be called. When we support
      90           0 :                         // organizing multiple non-overlapping files into a single level
      91           0 :                         // (see TODO below), we'll need to adjust this tableNewIters
      92           0 :                         // implementation to open iterators by looking up f in a map
      93           0 :                         // of readers indexed by *fileMetadata.
      94           0 :                         panic("unreachable")
      95             :                 },
      96             :                 seqNum: base.SeqNumMax,
      97             :         }
      98           1 :         if iterOpts != nil {
      99           1 :                 dbi.opts = *iterOpts
     100           1 :                 dbi.processBounds(iterOpts.LowerBound, iterOpts.UpperBound)
     101           1 :         }
     102           1 :         if err := finishInitializingExternal(ctx, dbi); err != nil {
     103           0 :                 dbi.Close()
     104           0 :                 return nil, err
     105           0 :         }
     106           1 :         return dbi, nil
     107             : }
     108             : 
     109           1 : func validateExternalIterOpts(iterOpts *IterOptions) error {
     110           1 :         switch {
     111           0 :         case iterOpts.PointKeyFilters != nil:
     112           0 :                 return errors.Errorf("pebble: external iterator: PointKeyFilters unsupported")
     113           0 :         case iterOpts.RangeKeyFilters != nil:
     114           0 :                 return errors.Errorf("pebble: external iterator: RangeKeyFilters unsupported")
     115           0 :         case iterOpts.OnlyReadGuaranteedDurable:
     116           0 :                 return errors.Errorf("pebble: external iterator: OnlyReadGuaranteedDurable unsupported")
     117           0 :         case iterOpts.UseL6Filters:
     118           0 :                 return errors.Errorf("pebble: external iterator: UseL6Filters unsupported")
     119             :         }
     120           1 :         return nil
     121             : }
     122             : 
     123           1 : func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterator, error) {
     124           1 :         // TODO(jackson): In some instances we could generate fewer levels by using
     125           1 :         // L0Sublevels code to organize nonoverlapping files into the same level.
     126           1 :         // This would allow us to use levelIters and keep a smaller set of data and
     127           1 :         // files in-memory. However, it would also require us to identify the bounds
     128           1 :         // of all the files upfront.
     129           1 : 
     130           1 :         if !it.opts.pointKeys() {
     131           0 :                 return emptyIter, nil
     132           1 :         } else if it.pointIter != nil {
     133           1 :                 return it.pointIter, nil
     134           1 :         }
     135           1 :         mlevels := it.alloc.mlevels[:0]
     136           1 : 
     137           1 :         // TODO(jackson): External iterators never provide categorized iterator
     138           1 :         // stats today because they exist outside the context of a *DB. If the
     139           1 :         // sstables being read are on the physical filesystem, we may still want to
     140           1 :         // thread a CategoryStatsCollector through so that we collect their stats.
     141           1 : 
     142           1 :         if len(it.externalReaders) > cap(mlevels) {
     143           0 :                 mlevels = make([]mergingIterLevel, 0, len(it.externalReaders))
     144           0 :         }
     145             :         // We set a synthetic sequence number, with lower levels having higer numbers.
     146           1 :         seqNum := 0
     147           1 :         for _, readers := range it.externalReaders {
     148           1 :                 seqNum += len(readers)
     149           1 :         }
     150           1 :         for _, readers := range it.externalReaders {
     151           1 :                 for _, r := range readers {
     152           1 :                         var (
     153           1 :                                 rangeDelIter keyspan.FragmentIterator
     154           1 :                                 pointIter    internalIterator
     155           1 :                                 err          error
     156           1 :                         )
     157           1 :                         // We could set hideObsoletePoints=true, since we are reading at
     158           1 :                         // InternalKeySeqNumMax, but we don't bother since these sstables should
     159           1 :                         // not have obsolete points (so the performance optimization is
     160           1 :                         // unnecessary), and we don't want to bother constructing a
     161           1 :                         // BlockPropertiesFilterer that includes obsoleteKeyBlockPropertyFilter.
     162           1 :                         transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
     163           1 :                         seqNum--
     164           1 :                         pointIter, err = r.NewPointIter(
     165           1 :                                 ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
     166           1 :                                 sstable.NeverUseFilterBlock,
     167           1 :                                 &it.stats.InternalStats, nil,
     168           1 :                                 sstable.MakeTrivialReaderProvider(r))
     169           1 :                         if err != nil {
     170           0 :                                 return nil, err
     171           0 :                         }
     172           1 :                         rangeDelIter, err = r.NewRawRangeDelIter(ctx, sstable.FragmentIterTransforms{
     173           1 :                                 SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum),
     174           1 :                         })
     175           1 :                         if err != nil {
     176           0 :                                 return nil, err
     177           0 :                         }
     178           1 :                         mlevels = append(mlevels, mergingIterLevel{
     179           1 :                                 iter:         pointIter,
     180           1 :                                 rangeDelIter: rangeDelIter,
     181           1 :                         })
     182             :                 }
     183             :         }
     184             : 
     185           1 :         it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
     186           1 :         it.alloc.merging.snapshot = base.SeqNumMax
     187           1 :         if len(mlevels) <= cap(it.alloc.levelsPositioned) {
     188           1 :                 it.alloc.merging.levelsPositioned = it.alloc.levelsPositioned[:len(mlevels)]
     189           1 :         }
     190           1 :         return &it.alloc.merging, nil
     191             : }
     192             : 
     193           1 : func finishInitializingExternal(ctx context.Context, it *Iterator) error {
     194           1 :         pointIter, err := createExternalPointIter(ctx, it)
     195           1 :         if err != nil {
     196           0 :                 return err
     197           0 :         }
     198           1 :         it.pointIter = pointIter
     199           1 :         it.iter = it.pointIter
     200           1 : 
     201           1 :         if it.opts.rangeKeys() {
     202           1 :                 it.rangeKeyMasking.init(it, &it.comparer)
     203           1 :                 var rangeKeyIters []keyspan.FragmentIterator
     204           1 :                 if it.rangeKey == nil {
     205           1 :                         // We could take advantage of the lack of overlaps in range keys within
     206           1 :                         // each slice in it.externalReaders, and generate keyspanimpl.LevelIters
     207           1 :                         // out of those. However, since range keys are expected to be sparse to
     208           1 :                         // begin with, the performance gain might not be significant enough to
     209           1 :                         // warrant it.
     210           1 :                         //
     211           1 :                         // TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not
     212           1 :                         // operate on FileMetadatas (similar to simpleLevelIter), and implements
     213           1 :                         // this optimization.
     214           1 :                         // We set a synthetic sequence number, with lower levels having higer numbers.
     215           1 :                         seqNum := 0
     216           1 :                         for _, readers := range it.externalReaders {
     217           1 :                                 seqNum += len(readers)
     218           1 :                         }
     219           1 :                         for _, readers := range it.externalReaders {
     220           1 :                                 for _, r := range readers {
     221           1 :                                         transforms := sstable.FragmentIterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
     222           1 :                                         seqNum--
     223           1 :                                         if rki, err := r.NewRawRangeKeyIter(ctx, transforms); err != nil {
     224           0 :                                                 return err
     225           1 :                                         } else if rki != nil {
     226           1 :                                                 rangeKeyIters = append(rangeKeyIters, rki)
     227           1 :                                         }
     228             :                                 }
     229             :                         }
     230           1 :                         if len(rangeKeyIters) > 0 {
     231           1 :                                 it.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
     232           1 :                                 it.rangeKey.init(it.comparer.Compare, it.comparer.Split, &it.opts)
     233           1 :                                 it.rangeKey.rangeKeyIter = it.rangeKey.iterConfig.Init(
     234           1 :                                         &it.comparer,
     235           1 :                                         base.SeqNumMax,
     236           1 :                                         it.opts.LowerBound, it.opts.UpperBound,
     237           1 :                                         &it.hasPrefix, &it.prefixOrFullSeekKey,
     238           1 :                                         false /* internalKeys */, &it.rangeKey.internal,
     239           1 :                                 )
     240           1 :                                 for i := range rangeKeyIters {
     241           1 :                                         it.rangeKey.iterConfig.AddLevel(rangeKeyIters[i])
     242           1 :                                 }
     243             :                         }
     244             :                 }
     245           1 :                 if it.rangeKey != nil {
     246           1 :                         it.rangeKey.iiter.Init(&it.comparer, it.iter, it.rangeKey.rangeKeyIter,
     247           1 :                                 keyspan.InterleavingIterOpts{
     248           1 :                                         Mask:       &it.rangeKeyMasking,
     249           1 :                                         LowerBound: it.opts.LowerBound,
     250           1 :                                         UpperBound: it.opts.UpperBound,
     251           1 :                                 })
     252           1 :                         it.iter = &it.rangeKey.iiter
     253           1 :                 }
     254             :         }
     255           1 :         return nil
     256             : }
     257             : 
     258             : func openExternalTables(
     259             :         ctx context.Context,
     260             :         o *Options,
     261             :         files []sstable.ReadableFile,
     262             :         seqNumOffset int,
     263             :         readerOpts sstable.ReaderOptions,
     264           1 : ) (readers []*sstable.Reader, err error) {
     265           1 :         readers = make([]*sstable.Reader, 0, len(files))
     266           1 :         for i := range files {
     267           1 :                 readable, err := sstable.NewSimpleReadable(files[i])
     268           1 :                 if err != nil {
     269           0 :                         return readers, err
     270           0 :                 }
     271           1 :                 r, err := sstable.NewReader(ctx, readable, readerOpts)
     272           1 :                 if err != nil {
     273           0 :                         return readers, err
     274           0 :                 }
     275           1 :                 readers = append(readers, r)
     276             :         }
     277           1 :         return readers, err
     278             : }

Generated by: LCOV version 1.14