LCOV - code coverage report
Current view: top level - pebble - flushable.go (source / functions) Hit Total Coverage
Test: 2024-04-18 08:16Z e0c6d173 - meta test only.lcov Lines: 136 155 87.7 %
Date: 2024-04-18 08:17:43 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "io"
      11             :         "sync/atomic"
      12             :         "time"
      13             : 
      14             :         "github.com/cockroachdb/errors"
      15             :         "github.com/cockroachdb/pebble/internal/base"
      16             :         "github.com/cockroachdb/pebble/internal/invariants"
      17             :         "github.com/cockroachdb/pebble/internal/keyspan"
      18             :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      19             :         "github.com/cockroachdb/pebble/internal/manifest"
      20             : )
      21             : 
      22             : // flushable defines the interface for immutable memtables.
      23             : type flushable interface {
      24             :         newIter(o *IterOptions) internalIterator
      25             :         newFlushIter(o *IterOptions) internalIterator
      26             :         newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
      27             :         newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
      28             :         containsRangeKeys() bool
      29             :         // inuseBytes returns the number of inuse bytes by the flushable.
      30             :         inuseBytes() uint64
      31             :         // totalBytes returns the total number of bytes allocated by the flushable.
      32             :         totalBytes() uint64
      33             :         // readyForFlush returns true when the flushable is ready for flushing. See
      34             :         // memTable.readyForFlush for one implementation which needs to check whether
      35             :         // there are any outstanding write references.
      36             :         readyForFlush() bool
      37             :         // computePossibleOverlaps determines whether the flushable's keys overlap
      38             :         // with the bounds of any of the provided bounded items. If an item overlaps
      39             :         // or might overlap but it's not possible to determine overlap cheaply,
      40             :         // computePossibleOverlaps invokes the provided function with the object
      41             :         // that might overlap. computePossibleOverlaps must not perform any I/O and
      42             :         // implementations should invoke the provided function for items that would
      43             :         // require I/O to determine overlap.
      44             :         computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
      45             : }
      46             : 
      47             : type shouldContinue bool
      48             : 
      49             : const (
      50             :         continueIteration shouldContinue = true
      51             :         stopIteration                    = false
      52             : )
      53             : 
      54             : type bounded interface {
      55             :         UserKeyBounds() base.UserKeyBounds
      56             : }
      57             : 
      58             : var _ bounded = (*fileMetadata)(nil)
      59             : var _ bounded = KeyRange{}
      60             : 
      61           1 : func sliceAsBounded[B bounded](s []B) []bounded {
      62           1 :         ret := make([]bounded, len(s))
      63           1 :         for i := 0; i < len(s); i++ {
      64           1 :                 ret[i] = s[i]
      65           1 :         }
      66           1 :         return ret
      67             : }
      68             : 
      69             : // flushableEntry wraps a flushable and adds additional metadata and
      70             : // functionality that is common to all flushables.
      71             : type flushableEntry struct {
      72             :         flushable
      73             :         // Channel which is closed when the flushable has been flushed.
      74             :         flushed chan struct{}
      75             :         // flushForced indicates whether a flush was forced on this memtable (either
      76             :         // manual, or due to ingestion). Protected by DB.mu.
      77             :         flushForced bool
      78             :         // delayedFlushForcedAt indicates whether a timer has been set to force a
      79             :         // flush on this memtable at some point in the future. Protected by DB.mu.
      80             :         // Holds the timestamp of when the flush will be issued.
      81             :         delayedFlushForcedAt time.Time
      82             :         // logNum corresponds to the WAL that contains the records present in the
      83             :         // receiver.
      84             :         logNum base.DiskFileNum
      85             :         // logSize is the size in bytes of the associated WAL. Protected by DB.mu.
      86             :         logSize uint64
      87             :         // The current logSeqNum at the time the memtable was created. This is
      88             :         // guaranteed to be less than or equal to any seqnum stored in the memtable.
      89             :         logSeqNum uint64
      90             :         // readerRefs tracks the read references on the flushable. The two sources of
      91             :         // reader references are DB.mu.mem.queue and readState.memtables. The memory
      92             :         // reserved by the flushable in the cache is released when the reader refs
      93             :         // drop to zero. If the flushable is referencing sstables, then the file
      94             :         // refount is also decreased once the reader refs drops to 0. If the
      95             :         // flushable is a memTable, when the reader refs drops to zero, the writer
      96             :         // refs will already be zero because the memtable will have been flushed and
      97             :         // that only occurs once the writer refs drops to zero.
      98             :         readerRefs atomic.Int32
      99             :         // Closure to invoke to release memory accounting.
     100             :         releaseMemAccounting func()
     101             :         // unrefFiles, if not nil, should be invoked to decrease the ref count of
     102             :         // files which are backing the flushable.
     103             :         unrefFiles func() []*fileBacking
     104             :         // deleteFnLocked should be called if the caller is holding DB.mu.
     105             :         deleteFnLocked func(obsolete []*fileBacking)
     106             :         // deleteFn should be called if the caller is not holding DB.mu.
     107             :         deleteFn func(obsolete []*fileBacking)
     108             : }
     109             : 
     110           1 : func (e *flushableEntry) readerRef() {
     111           1 :         switch v := e.readerRefs.Add(1); {
     112           0 :         case v <= 1:
     113           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     114             :         }
     115             : }
     116             : 
     117             : // db.mu must not be held when this is called.
     118           1 : func (e *flushableEntry) readerUnref(deleteFiles bool) {
     119           1 :         e.readerUnrefHelper(deleteFiles, e.deleteFn)
     120           1 : }
     121             : 
     122             : // db.mu must be held when this is called.
     123           1 : func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
     124           1 :         e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
     125           1 : }
     126             : 
     127             : func (e *flushableEntry) readerUnrefHelper(
     128             :         deleteFiles bool, deleteFn func(obsolete []*fileBacking),
     129           1 : ) {
     130           1 :         switch v := e.readerRefs.Add(-1); {
     131           0 :         case v < 0:
     132           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     133           1 :         case v == 0:
     134           1 :                 if e.releaseMemAccounting == nil {
     135           0 :                         panic("pebble: memtable reservation already released")
     136             :                 }
     137           1 :                 e.releaseMemAccounting()
     138           1 :                 e.releaseMemAccounting = nil
     139           1 :                 if e.unrefFiles != nil {
     140           1 :                         obsolete := e.unrefFiles()
     141           1 :                         e.unrefFiles = nil
     142           1 :                         if deleteFiles {
     143           1 :                                 deleteFn(obsolete)
     144           1 :                         }
     145             :                 }
     146             :         }
     147             : }
     148             : 
     149             : type flushableList []*flushableEntry
     150             : 
     151             : // ingestedFlushable is the implementation of the flushable interface for the
     152             : // ingesting sstables which are added to the flushable list.
     153             : type ingestedFlushable struct {
     154             :         // files are non-overlapping and ordered (according to their bounds).
     155             :         files            []physicalMeta
     156             :         comparer         *Comparer
     157             :         newIters         tableNewIters
     158             :         newRangeKeyIters keyspanimpl.TableNewSpanIter
     159             : 
     160             :         // Since the level slice is immutable, we construct and set it once. It
     161             :         // should be safe to read from slice in future reads.
     162             :         slice manifest.LevelSlice
     163             :         // hasRangeKeys is set on ingestedFlushable construction.
     164             :         hasRangeKeys bool
     165             :         // exciseSpan is populated if an excise operation should be performed during
     166             :         // flush.
     167             :         exciseSpan KeyRange
     168             : }
     169             : 
     170             : func newIngestedFlushable(
     171             :         files []*fileMetadata,
     172             :         comparer *Comparer,
     173             :         newIters tableNewIters,
     174             :         newRangeKeyIters keyspanimpl.TableNewSpanIter,
     175             :         exciseSpan KeyRange,
     176           1 : ) *ingestedFlushable {
     177           1 :         if invariants.Enabled {
     178           1 :                 for i := 1; i < len(files); i++ {
     179           1 :                         prev := files[i-1].UserKeyBounds()
     180           1 :                         this := files[i].UserKeyBounds()
     181           1 :                         if prev.End.IsUpperBoundFor(comparer.Compare, this.Start) {
     182           0 :                                 panic(errors.AssertionFailedf("ingested flushable files overlap: %s %s", prev, this))
     183             :                         }
     184             :                 }
     185             :         }
     186           1 :         var physicalFiles []physicalMeta
     187           1 :         var hasRangeKeys bool
     188           1 :         for _, f := range files {
     189           1 :                 if f.HasRangeKeys {
     190           1 :                         hasRangeKeys = true
     191           1 :                 }
     192           1 :                 physicalFiles = append(physicalFiles, f.PhysicalMeta())
     193             :         }
     194             : 
     195           1 :         ret := &ingestedFlushable{
     196           1 :                 files:            physicalFiles,
     197           1 :                 comparer:         comparer,
     198           1 :                 newIters:         newIters,
     199           1 :                 newRangeKeyIters: newRangeKeyIters,
     200           1 :                 // slice is immutable and can be set once and used many times.
     201           1 :                 slice:        manifest.NewLevelSliceKeySorted(comparer.Compare, files),
     202           1 :                 hasRangeKeys: hasRangeKeys,
     203           1 :                 exciseSpan:   exciseSpan,
     204           1 :         }
     205           1 : 
     206           1 :         return ret
     207             : }
     208             : 
     209             : // TODO(sumeer): ingestedFlushable iters also need to plumb context for
     210             : // tracing.
     211             : 
     212             : // newIter is part of the flushable interface.
     213           1 : func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
     214           1 :         var opts IterOptions
     215           1 :         if o != nil {
     216           1 :                 opts = *o
     217           1 :         }
     218             :         // TODO(bananabrick): The manifest.Level in newLevelIter is only used for
     219             :         // logging. Update the manifest.Level encoding to account for levels which
     220             :         // aren't truly levels in the lsm. Right now, the encoding only supports
     221             :         // L0 sublevels, and the rest of the levels in the lsm.
     222           1 :         return newLevelIter(
     223           1 :                 context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.Level(0),
     224           1 :                 internalIterOpts{},
     225           1 :         )
     226             : }
     227             : 
     228             : // newFlushIter is part of the flushable interface.
     229           0 : func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
     230           0 :         // newFlushIter is only used for writing memtables to disk as sstables.
     231           0 :         // Since ingested sstables are already present on disk, they don't need to
     232           0 :         // make use of a flush iter.
     233           0 :         panic("pebble: not implemented")
     234             : }
     235             : 
     236             : func (s *ingestedFlushable) constructRangeDelIter(
     237             :         file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
     238           1 : ) (keyspan.FragmentIterator, error) {
     239           1 :         // Note that the keyspan level iter expects a non-nil iterator to be
     240           1 :         // returned even if there is an error. So, we return the emptyKeyspanIter.
     241           1 :         iter, rangeDelIter, err := s.newIters.TODO(context.Background(), file, nil, internalIterOpts{})
     242           1 :         if err != nil {
     243           0 :                 return emptyKeyspanIter, err
     244           0 :         }
     245           1 :         iter.Close()
     246           1 :         if rangeDelIter == nil {
     247           1 :                 return emptyKeyspanIter, nil
     248           1 :         }
     249           1 :         return rangeDelIter, nil
     250             : }
     251             : 
     252             : // newRangeDelIter is part of the flushable interface.
     253             : // TODO(bananabrick): Using a level iter instead of a keyspan level iter to
     254             : // surface range deletes is more efficient.
     255             : //
     256             : // TODO(sumeer): *IterOptions are being ignored, so the index block load for
     257             : // the point iterator in constructRangeDeIter is not tracked.
     258           1 : func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
     259           1 :         return keyspanimpl.NewLevelIter(
     260           1 :                 keyspan.SpanIterOptions{}, s.comparer.Compare,
     261           1 :                 s.constructRangeDelIter, s.slice.Iter(), manifest.Level(0),
     262           1 :                 manifest.KeyTypePoint,
     263           1 :         )
     264           1 : }
     265             : 
     266             : // newRangeKeyIter is part of the flushable interface.
     267           1 : func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
     268           1 :         if !s.containsRangeKeys() {
     269           1 :                 return nil
     270           1 :         }
     271             : 
     272           1 :         return keyspanimpl.NewLevelIter(
     273           1 :                 keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
     274           1 :                 s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange,
     275           1 :         )
     276             : }
     277             : 
     278             : // containsRangeKeys is part of the flushable interface.
     279           1 : func (s *ingestedFlushable) containsRangeKeys() bool {
     280           1 :         return s.hasRangeKeys
     281           1 : }
     282             : 
     283             : // inuseBytes is part of the flushable interface.
     284           0 : func (s *ingestedFlushable) inuseBytes() uint64 {
     285           0 :         // inuseBytes is only used when memtables are flushed to disk as sstables.
     286           0 :         panic("pebble: not implemented")
     287             : }
     288             : 
     289             : // totalBytes is part of the flushable interface.
     290           1 : func (s *ingestedFlushable) totalBytes() uint64 {
     291           1 :         // We don't allocate additional bytes for the ingestedFlushable.
     292           1 :         return 0
     293           1 : }
     294             : 
     295             : // readyForFlush is part of the flushable interface.
     296           1 : func (s *ingestedFlushable) readyForFlush() bool {
     297           1 :         // ingestedFlushable should always be ready to flush. However, note that
     298           1 :         // memtables before the ingested sstables in the memtable queue must be
     299           1 :         // flushed before an ingestedFlushable can be flushed. This is because the
     300           1 :         // ingested sstables need an updated view of the Version to
     301           1 :         // determine where to place the files in the lsm.
     302           1 :         return true
     303           1 : }
     304             : 
     305             : // computePossibleOverlaps is part of the flushable interface.
     306             : func (s *ingestedFlushable) computePossibleOverlaps(
     307             :         fn func(bounded) shouldContinue, bounded ...bounded,
     308           1 : ) {
     309           1 :         for _, b := range bounded {
     310           1 :                 bounds := b.UserKeyBounds()
     311           1 :                 if s.anyFileOverlaps(b, &bounds) {
     312           1 :                         // Some file overlaps in key boundaries. The file doesn't necessarily
     313           1 :                         // contain any keys within the key range, but we would need to perform I/O
     314           1 :                         // to know for sure. The flushable interface dictates that we're not
     315           1 :                         // permitted to perform I/O here, so err towards assuming overlap.
     316           1 :                         if !fn(b) {
     317           1 :                                 return
     318           1 :                         }
     319             :                 }
     320             :         }
     321             : }
     322             : 
     323             : // anyFileBoundsOverlap returns true if there is at least a file in s.files with
     324             : // bounds that overlap the given bounds.
     325           1 : func (s *ingestedFlushable) anyFileOverlaps(b bounded, bounds *base.UserKeyBounds) bool {
     326           1 :         // Note that s.files are non-overlapping and sorted.
     327           1 :         for _, f := range s.files {
     328           1 :                 fileBounds := f.UserKeyBounds()
     329           1 :                 if !fileBounds.End.IsUpperBoundFor(s.comparer.Compare, bounds.Start) {
     330           1 :                         // The file ends before the bounds start. Go to the next file.
     331           1 :                         continue
     332             :                 }
     333           1 :                 if !bounds.End.IsUpperBoundFor(s.comparer.Compare, fileBounds.Start) {
     334           1 :                         // The file starts after the bounds end. There is no overlap, and
     335           1 :                         // further files will not overlap either (the files are sorted).
     336           1 :                         return false
     337           1 :                 }
     338             :                 // There is overlap. Note that UserKeyBounds.Overlaps() performs exactly the
     339             :                 // checks above.
     340           1 :                 return true
     341             :         }
     342           1 :         return false
     343             : }
     344             : 
     345             : // computePossibleOverlapsGenericImpl is an implementation of the flushable
     346             : // interface's computePossibleOverlaps function for flushable implementations
     347             : // with only in-memory state that do not have special requirements and should
     348             : // read through the ordinary flushable iterators.
     349             : //
     350             : // This function must only be used with implementations that are infallible (eg,
     351             : // memtable iterators) and will panic if an error is encountered.
     352             : func computePossibleOverlapsGenericImpl[F flushable](
     353             :         f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
     354           1 : ) {
     355           1 :         iter := f.newIter(nil)
     356           1 :         rangeDelIter := f.newRangeDelIter(nil)
     357           1 :         rkeyIter := f.newRangeKeyIter(nil)
     358           1 :         for _, b := range bounded {
     359           1 :                 if overlapWithIterator(iter, &rangeDelIter, rkeyIter, b.UserKeyBounds(), cmp) {
     360           1 :                         if !fn(b) {
     361           1 :                                 break
     362             :                         }
     363             :                 }
     364             :         }
     365             : 
     366           1 :         for _, c := range [3]io.Closer{iter, rangeDelIter, rkeyIter} {
     367           1 :                 if c != nil {
     368           1 :                         if err := c.Close(); err != nil {
     369           0 :                                 // This implementation must be used in circumstances where
     370           0 :                                 // reading through the iterator is infallible.
     371           0 :                                 panic(err)
     372             :                         }
     373             :                 }
     374             :         }
     375             : }

Generated by: LCOV version 1.14