LCOV - code coverage report
Current view: top level - pebble - flushable.go (source / functions) Hit Total Coverage
Test: 2024-08-28 08:16Z 34e929e1 - meta test only.lcov Lines: 170 192 88.5 %
Date: 2024-08-28 08:17:02 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
import (
	"context"
	"fmt"
	"sort"
	"sync/atomic"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/invariants"
	"github.com/cockroachdb/pebble/internal/keyspan"
	"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
	"github.com/cockroachdb/pebble/internal/manifest"
)
      20             : 
      21             : // flushable defines the interface for immutable memtables.
      22             : type flushable interface {
      23             :         newIter(o *IterOptions) internalIterator
      24             :         newFlushIter(o *IterOptions) internalIterator
      25             :         newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
      26             :         newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
      27             :         containsRangeKeys() bool
      28             :         // inuseBytes returns the number of inuse bytes by the flushable.
      29             :         inuseBytes() uint64
      30             :         // totalBytes returns the total number of bytes allocated by the flushable.
      31             :         totalBytes() uint64
      32             :         // readyForFlush returns true when the flushable is ready for flushing. See
      33             :         // memTable.readyForFlush for one implementation which needs to check whether
      34             :         // there are any outstanding write references.
      35             :         readyForFlush() bool
      36             :         // computePossibleOverlaps determines whether the flushable's keys overlap
      37             :         // with the bounds of any of the provided bounded items. If an item overlaps
      38             :         // or might overlap but it's not possible to determine overlap cheaply,
      39             :         // computePossibleOverlaps invokes the provided function with the object
      40             :         // that might overlap. computePossibleOverlaps must not perform any I/O and
      41             :         // implementations should invoke the provided function for items that would
      42             :         // require I/O to determine overlap.
      43             :         computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
      44             : }
      45             : 
      46             : type shouldContinue bool
      47             : 
      48             : const (
      49             :         continueIteration shouldContinue = true
      50             :         stopIteration                    = false
      51             : )
      52             : 
      53             : type bounded interface {
      54             :         UserKeyBounds() base.UserKeyBounds
      55             : }
      56             : 
      57             : var _ bounded = (*fileMetadata)(nil)
      58             : var _ bounded = KeyRange{}
      59             : 
      60           1 : func sliceAsBounded[B bounded](s []B) []bounded {
      61           1 :         ret := make([]bounded, len(s))
      62           1 :         for i := 0; i < len(s); i++ {
      63           1 :                 ret[i] = s[i]
      64           1 :         }
      65           1 :         return ret
      66             : }
      67             : 
// flushableEntry wraps a flushable and adds additional metadata and
// functionality that is common to all flushables.
type flushableEntry struct {
	// flushable is the wrapped implementation; its methods are promoted onto
	// the entry.
	flushable
	// flushed is a channel which is closed when the flushable has been flushed.
	flushed chan struct{}
	// flushForced indicates whether a flush was forced on this memtable (either
	// manual, or due to ingestion). Protected by DB.mu.
	flushForced bool
	// delayedFlushForcedAt indicates whether a timer has been set to force a
	// flush on this memtable at some point in the future. Protected by DB.mu.
	// Holds the timestamp of when the flush will be issued.
	delayedFlushForcedAt time.Time
	// logNum corresponds to the WAL that contains the records present in the
	// receiver.
	logNum base.DiskFileNum
	// logSize is the size in bytes of the associated WAL. Protected by DB.mu.
	logSize uint64
	// logSeqNum is the current logSeqNum at the time the memtable was created.
	// This is guaranteed to be less than or equal to any seqnum stored in the
	// memtable.
	logSeqNum base.SeqNum
	// readerRefs tracks the read references on the flushable. The two sources of
	// reader references are DB.mu.mem.queue and readState.memtables. The memory
	// reserved by the flushable in the cache is released when the reader refs
	// drop to zero. If the flushable is referencing sstables, then the file
	// refcount is also decreased once the reader refs drops to 0. If the
	// flushable is a memTable, when the reader refs drops to zero, the writer
	// refs will already be zero because the memtable will have been flushed and
	// that only occurs once the writer refs drops to zero.
	readerRefs atomic.Int32
	// releaseMemAccounting is the closure to invoke to release memory
	// accounting. readerUnrefHelper calls it when readerRefs drops to zero and
	// then sets it to nil; a second drop to zero panics.
	releaseMemAccounting func()
	// unrefFiles, if not nil, should be invoked to decrease the ref count of
	// files which are backing the flushable. readerUnrefHelper calls it when
	// readerRefs drops to zero, sets it to nil, and hands the returned
	// backings to the delete function if the caller asked for file deletion.
	unrefFiles func() []*fileBacking
	// deleteFnLocked receives obsolete file backings (presumably to delete
	// them); it should be called if the caller is holding DB.mu.
	deleteFnLocked func(obsolete []*fileBacking)
	// deleteFn receives obsolete file backings (presumably to delete them); it
	// should be called if the caller is not holding DB.mu.
	deleteFn func(obsolete []*fileBacking)
}
     108             : 
     109           1 : func (e *flushableEntry) readerRef() {
     110           1 :         switch v := e.readerRefs.Add(1); {
     111           0 :         case v <= 1:
     112           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     113             :         }
     114             : }
     115             : 
     116             : // db.mu must not be held when this is called.
     117           1 : func (e *flushableEntry) readerUnref(deleteFiles bool) {
     118           1 :         e.readerUnrefHelper(deleteFiles, e.deleteFn)
     119           1 : }
     120             : 
     121             : // db.mu must be held when this is called.
     122           1 : func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
     123           1 :         e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
     124           1 : }
     125             : 
     126             : func (e *flushableEntry) readerUnrefHelper(
     127             :         deleteFiles bool, deleteFn func(obsolete []*fileBacking),
     128           1 : ) {
     129           1 :         switch v := e.readerRefs.Add(-1); {
     130           0 :         case v < 0:
     131           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     132           1 :         case v == 0:
     133           1 :                 if e.releaseMemAccounting == nil {
     134           0 :                         panic("pebble: memtable reservation already released")
     135             :                 }
     136           1 :                 e.releaseMemAccounting()
     137           1 :                 e.releaseMemAccounting = nil
     138           1 :                 if e.unrefFiles != nil {
     139           1 :                         obsolete := e.unrefFiles()
     140           1 :                         e.unrefFiles = nil
     141           1 :                         if deleteFiles {
     142           1 :                                 deleteFn(obsolete)
     143           1 :                         }
     144             :                 }
     145             :         }
     146             : }
     147             : 
// flushableList is a slice of flushable entries.
type flushableList []*flushableEntry
     149             : 
// ingestedFlushable is the implementation of the flushable interface for the
// ingesting sstables which are added to the flushable list.
type ingestedFlushable struct {
	// files are non-overlapping and ordered (according to their bounds).
	files            []physicalMeta
	comparer         *Comparer
	newIters         tableNewIters
	newRangeKeyIters keyspanimpl.TableNewSpanIter
	// comparer supplies the user-key comparison used by the iterators and
	// overlap checks; newIters and newRangeKeyIters are the per-file iterator
	// constructors used by newIter/newRangeDelIter and newRangeKeyIter.

	// Since the level slice is immutable, we construct and set it once. It
	// should be safe to read from slice in future reads.
	slice manifest.LevelSlice
	// hasRangeKeys is set on ingestedFlushable construction; it is true if any
	// of the ingested files has range keys.
	hasRangeKeys bool
	// exciseSpan is populated if an excise operation should be performed during
	// flush.
	exciseSpan KeyRange
}
     168             : 
     169             : func newIngestedFlushable(
     170             :         files []*fileMetadata,
     171             :         comparer *Comparer,
     172             :         newIters tableNewIters,
     173             :         newRangeKeyIters keyspanimpl.TableNewSpanIter,
     174             :         exciseSpan KeyRange,
     175           1 : ) *ingestedFlushable {
     176           1 :         if invariants.Enabled {
     177           1 :                 for i := 1; i < len(files); i++ {
     178           1 :                         prev := files[i-1].UserKeyBounds()
     179           1 :                         this := files[i].UserKeyBounds()
     180           1 :                         if prev.End.IsUpperBoundFor(comparer.Compare, this.Start) {
     181           0 :                                 panic(errors.AssertionFailedf("ingested flushable files overlap: %s %s", prev, this))
     182             :                         }
     183             :                 }
     184             :         }
     185           1 :         var physicalFiles []physicalMeta
     186           1 :         var hasRangeKeys bool
     187           1 :         for _, f := range files {
     188           1 :                 if f.HasRangeKeys {
     189           1 :                         hasRangeKeys = true
     190           1 :                 }
     191           1 :                 physicalFiles = append(physicalFiles, f.PhysicalMeta())
     192             :         }
     193             : 
     194           1 :         ret := &ingestedFlushable{
     195           1 :                 files:            physicalFiles,
     196           1 :                 comparer:         comparer,
     197           1 :                 newIters:         newIters,
     198           1 :                 newRangeKeyIters: newRangeKeyIters,
     199           1 :                 // slice is immutable and can be set once and used many times.
     200           1 :                 slice:        manifest.NewLevelSliceKeySorted(comparer.Compare, files),
     201           1 :                 hasRangeKeys: hasRangeKeys,
     202           1 :                 exciseSpan:   exciseSpan,
     203           1 :         }
     204           1 : 
     205           1 :         return ret
     206             : }
     207             : 
     208             : // TODO(sumeer): ingestedFlushable iters also need to plumb context for
     209             : // tracing.
     210             : 
     211             : // newIter is part of the flushable interface.
     212           1 : func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
     213           1 :         var opts IterOptions
     214           1 :         if o != nil {
     215           1 :                 opts = *o
     216           1 :         }
     217           1 :         return newLevelIter(
     218           1 :                 context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.FlushableIngestsLayer(),
     219           1 :                 internalIterOpts{},
     220           1 :         )
     221             : }
     222             : 
     223             : // newFlushIter is part of the flushable interface.
     224           0 : func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
     225           0 :         // newFlushIter is only used for writing memtables to disk as sstables.
     226           0 :         // Since ingested sstables are already present on disk, they don't need to
     227           0 :         // make use of a flush iter.
     228           0 :         panic("pebble: not implemented")
     229             : }
     230             : 
     231             : func (s *ingestedFlushable) constructRangeDelIter(
     232             :         ctx context.Context, file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
     233           1 : ) (keyspan.FragmentIterator, error) {
     234           1 :         iters, err := s.newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
     235           1 :         if err != nil {
     236           0 :                 return nil, err
     237           0 :         }
     238           1 :         return iters.RangeDeletion(), nil
     239             : }
     240             : 
     241             : // newRangeDelIter is part of the flushable interface.
     242             : // TODO(bananabrick): Using a level iter instead of a keyspan level iter to
     243             : // surface range deletes is more efficient.
     244             : //
     245             : // TODO(sumeer): *IterOptions are being ignored, so the index block load for
     246             : // the point iterator in constructRangeDeIter is not tracked.
     247           1 : func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
     248           1 :         return keyspanimpl.NewLevelIter(
     249           1 :                 context.TODO(),
     250           1 :                 keyspan.SpanIterOptions{}, s.comparer.Compare,
     251           1 :                 s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestsLayer(),
     252           1 :                 manifest.KeyTypePoint,
     253           1 :         )
     254           1 : }
     255             : 
     256             : // newRangeKeyIter is part of the flushable interface.
     257           1 : func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
     258           1 :         if !s.containsRangeKeys() {
     259           1 :                 return nil
     260           1 :         }
     261             : 
     262           1 :         return keyspanimpl.NewLevelIter(
     263           1 :                 context.TODO(),
     264           1 :                 keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
     265           1 :                 s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypeRange,
     266           1 :         )
     267             : }
     268             : 
     269             : // containsRangeKeys is part of the flushable interface.
     270           1 : func (s *ingestedFlushable) containsRangeKeys() bool {
     271           1 :         return s.hasRangeKeys
     272           1 : }
     273             : 
     274             : // inuseBytes is part of the flushable interface.
     275           0 : func (s *ingestedFlushable) inuseBytes() uint64 {
     276           0 :         // inuseBytes is only used when memtables are flushed to disk as sstables.
     277           0 :         panic("pebble: not implemented")
     278             : }
     279             : 
     280             : // totalBytes is part of the flushable interface.
     281           1 : func (s *ingestedFlushable) totalBytes() uint64 {
     282           1 :         // We don't allocate additional bytes for the ingestedFlushable.
     283           1 :         return 0
     284           1 : }
     285             : 
     286             : // readyForFlush is part of the flushable interface.
     287           1 : func (s *ingestedFlushable) readyForFlush() bool {
     288           1 :         // ingestedFlushable should always be ready to flush. However, note that
     289           1 :         // memtables before the ingested sstables in the memtable queue must be
     290           1 :         // flushed before an ingestedFlushable can be flushed. This is because the
     291           1 :         // ingested sstables need an updated view of the Version to
     292           1 :         // determine where to place the files in the lsm.
     293           1 :         return true
     294           1 : }
     295             : 
     296             : // computePossibleOverlaps is part of the flushable interface.
     297             : func (s *ingestedFlushable) computePossibleOverlaps(
     298             :         fn func(bounded) shouldContinue, bounded ...bounded,
     299           1 : ) {
     300           1 :         for _, b := range bounded {
     301           1 :                 if s.anyFileOverlaps(b.UserKeyBounds()) {
     302           1 :                         // Some file overlaps in key boundaries. The file doesn't necessarily
     303           1 :                         // contain any keys within the key range, but we would need to perform I/O
     304           1 :                         // to know for sure. The flushable interface dictates that we're not
     305           1 :                         // permitted to perform I/O here, so err towards assuming overlap.
     306           1 :                         if !fn(b) {
     307           1 :                                 return
     308           1 :                         }
     309             :                 }
     310             :         }
     311             : }
     312             : 
     313             : // anyFileBoundsOverlap returns true if there is at least a file in s.files with
     314             : // bounds that overlap the given bounds.
     315           1 : func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool {
     316           1 :         // Note that s.files are non-overlapping and sorted.
     317           1 :         for _, f := range s.files {
     318           1 :                 fileBounds := f.UserKeyBounds()
     319           1 :                 if !fileBounds.End.IsUpperBoundFor(s.comparer.Compare, bounds.Start) {
     320           1 :                         // The file ends before the bounds start. Go to the next file.
     321           1 :                         continue
     322             :                 }
     323           1 :                 if !bounds.End.IsUpperBoundFor(s.comparer.Compare, fileBounds.Start) {
     324           1 :                         // The file starts after the bounds end. There is no overlap, and
     325           1 :                         // further files will not overlap either (the files are sorted).
     326           1 :                         return false
     327           1 :                 }
     328             :                 // There is overlap. Note that UserKeyBounds.Overlaps() performs exactly the
     329             :                 // checks above.
     330           1 :                 return true
     331             :         }
     332           1 :         return false
     333             : }
     334             : 
     335             : // computePossibleOverlapsGenericImpl is an implementation of the flushable
     336             : // interface's computePossibleOverlaps function for flushable implementations
     337             : // with only in-memory state that do not have special requirements and should
     338             : // read through the ordinary flushable iterators.
     339             : //
     340             : // This function must only be used with implementations that are infallible (eg,
     341             : // memtable iterators) and will panic if an error is encountered.
     342             : func computePossibleOverlapsGenericImpl[F flushable](
     343             :         f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
     344           1 : ) {
     345           1 :         iter := f.newIter(nil)
     346           1 :         rangeDelIter := f.newRangeDelIter(nil)
     347           1 :         rangeKeyIter := f.newRangeKeyIter(nil)
     348           1 :         for _, b := range bounded {
     349           1 :                 overlap, err := determineOverlapAllIters(cmp, b.UserKeyBounds(), iter, rangeDelIter, rangeKeyIter)
     350           1 :                 if invariants.Enabled && err != nil {
     351           0 :                         panic(errors.AssertionFailedf("expected iterator to be infallible: %v", err))
     352             :                 }
     353           1 :                 if overlap {
     354           1 :                         if !fn(b) {
     355           1 :                                 break
     356             :                         }
     357             :                 }
     358             :         }
     359             : 
     360           1 :         if iter != nil {
     361           1 :                 if err := iter.Close(); err != nil {
     362           0 :                         // This implementation must be used in circumstances where
     363           0 :                         // reading through the iterator is infallible.
     364           0 :                         panic(err)
     365             :                 }
     366             :         }
     367           1 :         if rangeDelIter != nil {
     368           1 :                 rangeDelIter.Close()
     369           1 :         }
     370           1 :         if rangeKeyIter != nil {
     371           1 :                 rangeKeyIter.Close()
     372           1 :         }
     373             : }
     374             : 
     375             : // determineOverlapAllIters checks for overlap in a point iterator, range
     376             : // deletion iterator and range key iterator.
     377             : func determineOverlapAllIters(
     378             :         cmp base.Compare,
     379             :         bounds base.UserKeyBounds,
     380             :         pointIter base.InternalIterator,
     381             :         rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     382           1 : ) (bool, error) {
     383           1 :         if pointIter != nil {
     384           1 :                 if pointOverlap, err := determineOverlapPointIterator(cmp, bounds, pointIter); pointOverlap || err != nil {
     385           1 :                         return pointOverlap, err
     386           1 :                 }
     387             :         }
     388           1 :         if rangeDelIter != nil {
     389           1 :                 if rangeDelOverlap, err := determineOverlapKeyspanIterator(cmp, bounds, rangeDelIter); rangeDelOverlap || err != nil {
     390           1 :                         return rangeDelOverlap, err
     391           1 :                 }
     392             :         }
     393           1 :         if rangeKeyIter != nil {
     394           1 :                 return determineOverlapKeyspanIterator(cmp, bounds, rangeKeyIter)
     395           1 :         }
     396           1 :         return false, nil
     397             : }
     398             : 
     399             : func determineOverlapPointIterator(
     400             :         cmp base.Compare, bounds base.UserKeyBounds, iter internalIterator,
     401           1 : ) (bool, error) {
     402           1 :         kv := iter.SeekGE(bounds.Start, base.SeekGEFlagsNone)
     403           1 :         if kv == nil {
     404           1 :                 return false, iter.Error()
     405           1 :         }
     406           1 :         return bounds.End.IsUpperBoundForInternalKey(cmp, kv.K), nil
     407             : }
     408             : 
     409             : func determineOverlapKeyspanIterator(
     410             :         cmp base.Compare, bounds base.UserKeyBounds, iter keyspan.FragmentIterator,
     411           1 : ) (bool, error) {
     412           1 :         // NB: The spans surfaced by the fragment iterator are non-overlapping.
     413           1 :         span, err := iter.SeekGE(bounds.Start)
     414           1 :         if err != nil {
     415           0 :                 return false, err
     416           0 :         }
     417           1 :         for ; span != nil; span, err = iter.Next() {
     418           1 :                 if !bounds.End.IsUpperBoundFor(cmp, span.Start) {
     419           1 :                         // The span starts after our bounds.
     420           1 :                         return false, nil
     421           1 :                 }
     422           1 :                 if !span.Empty() {
     423           1 :                         return true, nil
     424           1 :                 }
     425             :         }
     426           1 :         return false, err
     427             : }

Generated by: LCOV version 1.14