LCOV - code coverage report
Current view: top level - pebble - flushable.go (source / functions) Hit Total Coverage
Test: 2024-10-04 08:16Z 6fa80f28 - tests + meta.lcov Lines: 206 228 90.4 %
Date: 2024-10-04 08:17:44 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "sync/atomic"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/invariants"
      16             :         "github.com/cockroachdb/pebble/internal/keyspan"
      17             :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      18             :         "github.com/cockroachdb/pebble/internal/manifest"
      19             : )
      20             : 
      21             : // flushable defines the interface for immutable memtables.
      22             : type flushable interface {
      23             :         newIter(o *IterOptions) internalIterator
      24             :         newFlushIter(o *IterOptions) internalIterator
      25             :         newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
      26             :         newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
      27             :         containsRangeKeys() bool
      28             :         // inuseBytes returns the number of bytes in use by the flushable.
      29             :         inuseBytes() uint64
      30             :         // totalBytes returns the total number of bytes allocated by the flushable.
      31             :         totalBytes() uint64
      32             :         // readyForFlush returns true when the flushable is ready for flushing. See
      33             :         // memTable.readyForFlush for one implementation which needs to check whether
      34             :         // there are any outstanding write references.
      35             :         readyForFlush() bool
      36             :         // computePossibleOverlaps determines whether the flushable's keys overlap
      37             :         // with the bounds of any of the provided bounded items. If an item overlaps
      38             :         // or might overlap but it's not possible to determine overlap cheaply,
      39             :         // computePossibleOverlaps invokes the provided function with the object
      40             :         // that might overlap. computePossibleOverlaps must not perform any I/O and
      41             :         // implementations should invoke the provided function for items that would
      42             :         // require I/O to determine overlap.
      43             :         computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
      44             : }
      45             : 
      46             : type shouldContinue bool
      47             : 
      48             : const (
      49             :         continueIteration shouldContinue = true
      50             :         stopIteration                    = false
      51             : )
      52             : 
      53             : type bounded interface {
      54             :         UserKeyBounds() base.UserKeyBounds
      55             : }
      56             : 
      57             : var _ bounded = (*fileMetadata)(nil)
      58             : var _ bounded = KeyRange{}
      59             : 
      60           2 : func sliceAsBounded[B bounded](s []B) []bounded {
      61           2 :         ret := make([]bounded, len(s))
      62           2 :         for i := 0; i < len(s); i++ {
      63           2 :                 ret[i] = s[i]
      64           2 :         }
      65           2 :         return ret
      66             : }
      67             : 
      68             : // flushableEntry wraps a flushable and adds additional metadata and
      69             : // functionality that is common to all flushables.
      70             : type flushableEntry struct {
      71             :         flushable
      72             :         // Channel which is closed when the flushable has been flushed.
      73             :         flushed chan struct{}
      74             :         // flushForced indicates whether a flush was forced on this memtable (either
      75             :         // manual, or due to ingestion). Protected by DB.mu.
      76             :         flushForced bool
      77             :         // delayedFlushForcedAt indicates whether a timer has been set to force a
      78             :         // flush on this memtable at some point in the future. Protected by DB.mu.
      79             :         // Holds the timestamp of when the flush will be issued.
      80             :         delayedFlushForcedAt time.Time
      81             :         // logNum corresponds to the WAL that contains the records present in the
      82             :         // receiver.
      83             :         logNum base.DiskFileNum
      84             :         // logSize is the size in bytes of the associated WAL. Protected by DB.mu.
      85             :         logSize uint64
      86             :         // The current logSeqNum at the time the memtable was created. This is
      87             :         // guaranteed to be less than or equal to any seqnum stored in the memtable.
      88             :         logSeqNum base.SeqNum
      89             :         // readerRefs tracks the read references on the flushable. The two sources of
      90             :         // reader references are DB.mu.mem.queue and readState.memtables. The memory
      91             :         // reserved by the flushable in the cache is released when the reader refs
      92             :         // drop to zero. If the flushable is referencing sstables, then the file
      93             :         // refcount is also decreased once the reader refs drop to 0. If the
      94             :         // flushable is a memTable, when the reader refs drop to zero, the writer
      95             :         // refs will already be zero because the memtable will have been flushed and
      96             :         // that only occurs once the writer refs drop to zero.
      97             :         readerRefs atomic.Int32
      98             :         // Closure to invoke to release memory accounting.
      99             :         releaseMemAccounting func()
     100             :         // unrefFiles, if not nil, should be invoked to decrease the ref count of
     101             :         // files which are backing the flushable.
     102             :         unrefFiles func() []*fileBacking
     103             :         // deleteFnLocked should be called if the caller is holding DB.mu.
     104             :         deleteFnLocked func(obsolete []*fileBacking)
     105             :         // deleteFn should be called if the caller is not holding DB.mu.
     106             :         deleteFn func(obsolete []*fileBacking)
     107             : }
     108             : 
     109           2 : func (e *flushableEntry) readerRef() {
     110           2 :         switch v := e.readerRefs.Add(1); {
     111           0 :         case v <= 1:
     112           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     113             :         }
     114             : }
     115             : 
     116             : // db.mu must not be held when this is called.
     117           2 : func (e *flushableEntry) readerUnref(deleteFiles bool) {
     118           2 :         e.readerUnrefHelper(deleteFiles, e.deleteFn)
     119           2 : }
     120             : 
     121             : // db.mu must be held when this is called.
     122           2 : func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
     123           2 :         e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
     124           2 : }
     125             : 
     126             : func (e *flushableEntry) readerUnrefHelper(
     127             :         deleteFiles bool, deleteFn func(obsolete []*fileBacking),
     128           2 : ) {
     129           2 :         switch v := e.readerRefs.Add(-1); {
     130           0 :         case v < 0:
     131           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     132           2 :         case v == 0:
     133           2 :                 if e.releaseMemAccounting == nil {
     134           0 :                         panic("pebble: memtable reservation already released")
     135             :                 }
     136           2 :                 e.releaseMemAccounting()
     137           2 :                 e.releaseMemAccounting = nil
     138           2 :                 if e.unrefFiles != nil {
     139           2 :                         obsolete := e.unrefFiles()
     140           2 :                         e.unrefFiles = nil
     141           2 :                         if deleteFiles {
     142           2 :                                 deleteFn(obsolete)
     143           2 :                         }
     144             :                 }
     145             :         }
     146             : }
     147             : 
     148             : type flushableList []*flushableEntry
     149             : 
     150             : // ingestedFlushable is the implementation of the flushable interface for the
     151             : // ingesting sstables which are added to the flushable list.
     152             : type ingestedFlushable struct {
     153             :         // files are non-overlapping and ordered (according to their bounds).
     154             :         files            []physicalMeta
     155             :         comparer         *Comparer
     156             :         newIters         tableNewIters
     157             :         newRangeKeyIters keyspanimpl.TableNewSpanIter
     158             : 
     159             :         // Since the level slice is immutable, we construct and set it once. It
     160             :         // should be safe to read from slice in future reads.
     161             :         slice manifest.LevelSlice
     162             :         // hasRangeKeys is set on ingestedFlushable construction.
     163             :         hasRangeKeys bool
     164             :         // exciseSpan is populated if an excise operation should be performed during
     165             :         // flush.
     166             :         exciseSpan   KeyRange
     167             :         exciseSeqNum base.SeqNum
     168             : }
     169             : 
     170             : func newIngestedFlushable(
     171             :         files []*fileMetadata,
     172             :         comparer *Comparer,
     173             :         newIters tableNewIters,
     174             :         newRangeKeyIters keyspanimpl.TableNewSpanIter,
     175             :         exciseSpan KeyRange,
     176             :         seqNum base.SeqNum,
     177           2 : ) *ingestedFlushable {
     178           2 :         if invariants.Enabled {
     179           2 :                 for i := 1; i < len(files); i++ {
     180           2 :                         prev := files[i-1].UserKeyBounds()
     181           2 :                         this := files[i].UserKeyBounds()
     182           2 :                         if prev.End.IsUpperBoundFor(comparer.Compare, this.Start) {
     183           0 :                                 panic(errors.AssertionFailedf("ingested flushable files overlap: %s %s", prev, this))
     184             :                         }
     185             :                 }
     186             :         }
     187           2 :         var physicalFiles []physicalMeta
     188           2 :         var hasRangeKeys bool
     189           2 :         for _, f := range files {
     190           2 :                 if f.HasRangeKeys {
     191           2 :                         hasRangeKeys = true
     192           2 :                 }
     193           2 :                 physicalFiles = append(physicalFiles, f.PhysicalMeta())
     194             :         }
     195             : 
     196           2 :         ret := &ingestedFlushable{
     197           2 :                 files:            physicalFiles,
     198           2 :                 comparer:         comparer,
     199           2 :                 newIters:         newIters,
     200           2 :                 newRangeKeyIters: newRangeKeyIters,
     201           2 :                 // slice is immutable and can be set once and used many times.
     202           2 :                 slice:        manifest.NewLevelSliceKeySorted(comparer.Compare, files),
     203           2 :                 hasRangeKeys: hasRangeKeys,
     204           2 :                 exciseSpan:   exciseSpan,
     205           2 :                 exciseSeqNum: seqNum,
     206           2 :         }
     207           2 : 
     208           2 :         return ret
     209             : }
     210             : 
     211             : // TODO(sumeer): ingestedFlushable iters also need to plumb context for
     212             : // tracing.
     213             : 
     214             : // newIter is part of the flushable interface.
     215           2 : func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
     216           2 :         var opts IterOptions
     217           2 :         if o != nil {
     218           2 :                 opts = *o
     219           2 :         }
     220           2 :         return newLevelIter(
     221           2 :                 context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.FlushableIngestsLayer(),
     222           2 :                 internalIterOpts{},
     223           2 :         )
     224             : }
     225             : 
     226             : // newFlushIter is part of the flushable interface.
     227           0 : func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
     228           0 :         // newFlushIter is only used for writing memtables to disk as sstables.
     229           0 :         // Since ingested sstables are already present on disk, they don't need to
     230           0 :         // make use of a flush iter.
     231           0 :         panic("pebble: not implemented")
     232             : }
     233             : 
     234             : func (s *ingestedFlushable) constructRangeDelIter(
     235             :         ctx context.Context, file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
     236           2 : ) (keyspan.FragmentIterator, error) {
     237           2 :         iters, err := s.newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
     238           2 :         if err != nil {
     239           0 :                 return nil, err
     240           0 :         }
     241           2 :         return iters.RangeDeletion(), nil
     242             : }
     243             : 
     244             : // newRangeDelIter is part of the flushable interface.
     245             : // TODO(bananabrick): Using a level iter instead of a keyspan level iter to
     246             : // surface range deletes is more efficient.
     247             : //
     248             : // TODO(sumeer): *IterOptions are being ignored, so the index block load for
     249             : // the point iterator in constructRangeDelIter is not tracked.
     250           2 : func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
     251           2 :         liter := keyspanimpl.NewLevelIter(
     252           2 :                 context.TODO(),
     253           2 :                 keyspan.SpanIterOptions{}, s.comparer.Compare,
     254           2 :                 s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestsLayer(),
     255           2 :                 manifest.KeyTypePoint,
     256           2 :         )
     257           2 :         if !s.exciseSpan.Valid() {
     258           2 :                 return liter
     259           2 :         }
     260             :         // We have an excise span to weave into the rangedel iterators.
     261             :         //
     262             :         // TODO(bilal): should this be pooled?
     263           2 :         miter := &keyspanimpl.MergingIter{}
     264           2 :         rdel := keyspan.Span{
     265           2 :                 Start: s.exciseSpan.Start,
     266           2 :                 End:   s.exciseSpan.End,
     267           2 :                 Keys:  []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeDelete)}},
     268           2 :         }
     269           2 :         rdelIter := keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rdel})
     270           2 :         miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rdelIter)
     271           2 :         return miter
     272             : }
     273             : 
     274             : // newRangeKeyIter is part of the flushable interface.
     275           2 : func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
     276           2 :         var rkeydelIter keyspan.FragmentIterator
     277           2 :         if s.exciseSpan.Valid() {
     278           2 :                 // We have an excise span to weave into the rangekey iterators.
     279           2 :                 rkeydel := keyspan.Span{
     280           2 :                         Start: s.exciseSpan.Start,
     281           2 :                         End:   s.exciseSpan.End,
     282           2 :                         Keys:  []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeKeyDelete)}},
     283           2 :                 }
     284           2 :                 rkeydelIter = keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rkeydel})
     285           2 :         }
     286             : 
     287           2 :         if !s.hasRangeKeys {
     288           2 :                 if rkeydelIter == nil {
     289           2 :                         // NB: we have to return the nil literal as opposed to the nil
     290           2 :                         // value of rkeydelIter, otherwise callers of this function will
     291           2 :                         // have the return value fail == nil checks.
     292           2 :                         return nil
     293           2 :                 }
     294           1 :                 return rkeydelIter
     295             :         }
     296             : 
     297           2 :         liter := keyspanimpl.NewLevelIter(
     298           2 :                 context.TODO(),
     299           2 :                 keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
     300           2 :                 s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypeRange,
     301           2 :         )
     302           2 :         if rkeydelIter == nil {
     303           2 :                 return liter
     304           2 :         }
     305             :         // TODO(bilal): should this be pooled?
     306           2 :         miter := &keyspanimpl.MergingIter{}
     307           2 :         miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rkeydelIter)
     308           2 :         return miter
     309             : }
     310             : 
     311             : // containsRangeKeys is part of the flushable interface.
     312           2 : func (s *ingestedFlushable) containsRangeKeys() bool {
     313           2 :         return s.hasRangeKeys || s.exciseSpan.Valid()
     314           2 : }
     315             : 
     316             : // inuseBytes is part of the flushable interface.
     317           0 : func (s *ingestedFlushable) inuseBytes() uint64 {
     318           0 :         // inuseBytes is only used when memtables are flushed to disk as sstables.
     319           0 :         panic("pebble: not implemented")
     320             : }
     321             : 
     322             : // totalBytes is part of the flushable interface.
     323           2 : func (s *ingestedFlushable) totalBytes() uint64 {
     324           2 :         // We don't allocate additional bytes for the ingestedFlushable.
     325           2 :         return 0
     326           2 : }
     327             : 
     328             : // readyForFlush is part of the flushable interface.
     329           2 : func (s *ingestedFlushable) readyForFlush() bool {
     330           2 :         // ingestedFlushable should always be ready to flush. However, note that
     331           2 :         // memtables before the ingested sstables in the memtable queue must be
     332           2 :         // flushed before an ingestedFlushable can be flushed. This is because the
     333           2 :         // ingested sstables need an updated view of the Version to
     334           2 :         // determine where to place the files in the lsm.
     335           2 :         return true
     336           2 : }
     337             : 
     338             : // computePossibleOverlaps is part of the flushable interface.
     339             : func (s *ingestedFlushable) computePossibleOverlaps(
     340             :         fn func(bounded) shouldContinue, bounded ...bounded,
     341           2 : ) {
     342           2 :         for _, b := range bounded {
     343           2 :                 if s.anyFileOverlaps(b.UserKeyBounds()) {
     344           2 :                         // Some file overlaps in key boundaries. The file doesn't necessarily
     345           2 :                         // contain any keys within the key range, but we would need to perform I/O
     346           2 :                         // to know for sure. The flushable interface dictates that we're not
     347           2 :                         // permitted to perform I/O here, so err towards assuming overlap.
     348           2 :                         if !fn(b) {
     349           2 :                                 return
     350           2 :                         }
     351             :                 }
     352             :         }
     353             : }
     354             : 
     355             : // anyFileOverlaps returns true if at least one file in s.files, or the excise
     356             : // span when it is valid, has bounds that overlap the given bounds.
     357           2 : func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool {
     358           2 :         // Note that s.files are non-overlapping and sorted.
     359           2 :         for _, f := range s.files {
     360           2 :                 fileBounds := f.UserKeyBounds()
     361           2 :                 if !fileBounds.End.IsUpperBoundFor(s.comparer.Compare, bounds.Start) {
     362           2 :                         // The file ends before the bounds start. Go to the next file.
     363           2 :                         continue
     364             :                 }
     365           2 :                 if !bounds.End.IsUpperBoundFor(s.comparer.Compare, fileBounds.Start) {
     366           2 :                         // The file starts after the bounds end. There is no overlap, and
     367           2 :                         // further files will not overlap either (the files are sorted).
     368           2 :                         break
     369             :                 }
     370             :                 // There is overlap. Note that UserKeyBounds.Overlaps() performs exactly the
     371             :                 // checks above.
     372           2 :                 return true
     373             :         }
     374           2 :         if s.exciseSpan.Valid() {
     375           2 :                 uk := s.exciseSpan.UserKeyBounds()
     376           2 :                 return uk.Overlaps(s.comparer.Compare, &bounds)
     377           2 :         }
     378           2 :         return false
     379             : }
     380             : 
     381             : // computePossibleOverlapsGenericImpl is an implementation of the flushable
     382             : // interface's computePossibleOverlaps function for flushable implementations
     383             : // with only in-memory state that do not have special requirements and should
     384             : // read through the ordinary flushable iterators.
     385             : //
     386             : // This function must only be used with implementations that are infallible (eg,
     387             : // memtable iterators) and will panic if an error is encountered.
     388             : func computePossibleOverlapsGenericImpl[F flushable](
     389             :         f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
     390           2 : ) {
     391           2 :         iter := f.newIter(nil)
     392           2 :         rangeDelIter := f.newRangeDelIter(nil)
     393           2 :         rangeKeyIter := f.newRangeKeyIter(nil)
     394           2 :         for _, b := range bounded {
     395           2 :                 overlap, err := determineOverlapAllIters(cmp, b.UserKeyBounds(), iter, rangeDelIter, rangeKeyIter)
     396           2 :                 if invariants.Enabled && err != nil {
     397           0 :                         panic(errors.AssertionFailedf("expected iterator to be infallible: %v", err))
     398             :                 }
     399           2 :                 if overlap {
     400           2 :                         if !fn(b) {
     401           2 :                                 break
     402             :                         }
     403             :                 }
     404             :         }
     405             : 
     406           2 :         if iter != nil {
     407           2 :                 if err := iter.Close(); err != nil {
     408           0 :                         // This implementation must be used in circumstances where
     409           0 :                         // reading through the iterator is infallible.
     410           0 :                         panic(err)
     411             :                 }
     412             :         }
     413           2 :         if rangeDelIter != nil {
     414           2 :                 rangeDelIter.Close()
     415           2 :         }
     416           2 :         if rangeKeyIter != nil {
     417           2 :                 rangeKeyIter.Close()
     418           2 :         }
     419             : }
     420             : 
     421             : // determineOverlapAllIters checks for overlap in a point iterator, range
     422             : // deletion iterator and range key iterator.
     423             : func determineOverlapAllIters(
     424             :         cmp base.Compare,
     425             :         bounds base.UserKeyBounds,
     426             :         pointIter base.InternalIterator,
     427             :         rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     428           2 : ) (bool, error) {
     429           2 :         if pointIter != nil {
     430           2 :                 if pointOverlap, err := determineOverlapPointIterator(cmp, bounds, pointIter); pointOverlap || err != nil {
     431           2 :                         return pointOverlap, err
     432           2 :                 }
     433             :         }
     434           2 :         if rangeDelIter != nil {
     435           2 :                 if rangeDelOverlap, err := determineOverlapKeyspanIterator(cmp, bounds, rangeDelIter); rangeDelOverlap || err != nil {
     436           2 :                         return rangeDelOverlap, err
     437           2 :                 }
     438             :         }
     439           2 :         if rangeKeyIter != nil {
     440           2 :                 return determineOverlapKeyspanIterator(cmp, bounds, rangeKeyIter)
     441           2 :         }
     442           2 :         return false, nil
     443             : }
     444             : 
     445             : func determineOverlapPointIterator(
     446             :         cmp base.Compare, bounds base.UserKeyBounds, iter internalIterator,
     447           2 : ) (bool, error) {
     448           2 :         kv := iter.SeekGE(bounds.Start, base.SeekGEFlagsNone)
     449           2 :         if kv == nil {
     450           2 :                 return false, iter.Error()
     451           2 :         }
     452           2 :         return bounds.End.IsUpperBoundForInternalKey(cmp, kv.K), nil
     453             : }
     454             : 
     455             : func determineOverlapKeyspanIterator(
     456             :         cmp base.Compare, bounds base.UserKeyBounds, iter keyspan.FragmentIterator,
     457           2 : ) (bool, error) {
     458           2 :         // NB: The spans surfaced by the fragment iterator are non-overlapping.
     459           2 :         span, err := iter.SeekGE(bounds.Start)
     460           2 :         if err != nil {
     461           0 :                 return false, err
     462           0 :         }
     463           2 :         for ; span != nil; span, err = iter.Next() {
     464           2 :                 if !bounds.End.IsUpperBoundFor(cmp, span.Start) {
     465           2 :                         // The span starts after our bounds.
     466           2 :                         return false, nil
     467           2 :                 }
     468           2 :                 if !span.Empty() {
     469           2 :                         return true, nil
     470           2 :                 }
     471             :         }
     472           2 :         return false, err
     473             : }

Generated by: LCOV version 1.14