LCOV - code coverage report
Current view: top level - pebble - flushable.go (source / functions) Hit Total Coverage
Test: 2024-07-07 08:15Z fad89cfb - meta test only.lcov Lines: 168 190 88.4 %
Date: 2024-07-07 08:16:21 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "sync/atomic"
      11             :         "time"
      12             : 
      13             :         "github.com/cockroachdb/errors"
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/invariants"
      16             :         "github.com/cockroachdb/pebble/internal/keyspan"
      17             :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      18             :         "github.com/cockroachdb/pebble/internal/manifest"
      19             : )
      20             : 
      21             : // flushable defines the interface for immutable memtables.
      22             : type flushable interface {
      23             :         newIter(o *IterOptions) internalIterator
      24             :         newFlushIter(o *IterOptions) internalIterator
      25             :         newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
      26             :         newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
      27             :         containsRangeKeys() bool
      28             :         // inuseBytes returns the number of inuse bytes by the flushable.
      29             :         inuseBytes() uint64
      30             :         // totalBytes returns the total number of bytes allocated by the flushable.
      31             :         totalBytes() uint64
      32             :         // readyForFlush returns true when the flushable is ready for flushing. See
      33             :         // memTable.readyForFlush for one implementation which needs to check whether
      34             :         // there are any outstanding write references.
      35             :         readyForFlush() bool
      36             :         // computePossibleOverlaps determines whether the flushable's keys overlap
      37             :         // with the bounds of any of the provided bounded items. If an item overlaps
      38             :         // or might overlap but it's not possible to determine overlap cheaply,
      39             :         // computePossibleOverlaps invokes the provided function with the object
      40             :         // that might overlap. computePossibleOverlaps must not perform any I/O and
      41             :         // implementations should invoke the provided function for items that would
      42             :         // require I/O to determine overlap.
      43             :         computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
      44             : }
      45             : 
      46             : type shouldContinue bool
      47             : 
      48             : const (
      49             :         continueIteration shouldContinue = true
      50             :         stopIteration                    = false
      51             : )
      52             : 
      53             : type bounded interface {
      54             :         UserKeyBounds() base.UserKeyBounds
      55             : }
      56             : 
      57             : var _ bounded = (*fileMetadata)(nil)
      58             : var _ bounded = KeyRange{}
      59             : 
      60           1 : func sliceAsBounded[B bounded](s []B) []bounded {
      61           1 :         ret := make([]bounded, len(s))
      62           1 :         for i := 0; i < len(s); i++ {
      63           1 :                 ret[i] = s[i]
      64           1 :         }
      65           1 :         return ret
      66             : }
      67             : 
      68             : // flushableEntry wraps a flushable and adds additional metadata and
      69             : // functionality that is common to all flushables.
      70             : type flushableEntry struct {
      71             :         flushable
      72             :         // Channel which is closed when the flushable has been flushed.
      73             :         flushed chan struct{}
      74             :         // flushForced indicates whether a flush was forced on this memtable (either
      75             :         // manual, or due to ingestion). Protected by DB.mu.
      76             :         flushForced bool
      77             :         // delayedFlushForcedAt indicates whether a timer has been set to force a
      78             :         // flush on this memtable at some point in the future. Protected by DB.mu.
      79             :         // Holds the timestamp of when the flush will be issued.
      80             :         delayedFlushForcedAt time.Time
      81             :         // logNum corresponds to the WAL that contains the records present in the
      82             :         // receiver.
      83             :         logNum base.DiskFileNum
      84             :         // logSize is the size in bytes of the associated WAL. Protected by DB.mu.
      85             :         logSize uint64
      86             :         // The current logSeqNum at the time the memtable was created. This is
      87             :         // guaranteed to be less than or equal to any seqnum stored in the memtable.
      88             :         logSeqNum base.SeqNum
      89             :         // readerRefs tracks the read references on the flushable. The two sources of
      90             :         // reader references are DB.mu.mem.queue and readState.memtables. The memory
      91             :         // reserved by the flushable in the cache is released when the reader refs
      92             :         // drop to zero. If the flushable is referencing sstables, then the file
      93             :         // refount is also decreased once the reader refs drops to 0. If the
      94             :         // flushable is a memTable, when the reader refs drops to zero, the writer
      95             :         // refs will already be zero because the memtable will have been flushed and
      96             :         // that only occurs once the writer refs drops to zero.
      97             :         readerRefs atomic.Int32
      98             :         // Closure to invoke to release memory accounting.
      99             :         releaseMemAccounting func()
     100             :         // unrefFiles, if not nil, should be invoked to decrease the ref count of
     101             :         // files which are backing the flushable.
     102             :         unrefFiles func() []*fileBacking
     103             :         // deleteFnLocked should be called if the caller is holding DB.mu.
     104             :         deleteFnLocked func(obsolete []*fileBacking)
     105             :         // deleteFn should be called if the caller is not holding DB.mu.
     106             :         deleteFn func(obsolete []*fileBacking)
     107             : }
     108             : 
     109           1 : func (e *flushableEntry) readerRef() {
     110           1 :         switch v := e.readerRefs.Add(1); {
     111           0 :         case v <= 1:
     112           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     113             :         }
     114             : }
     115             : 
     116             : // db.mu must not be held when this is called.
     117           1 : func (e *flushableEntry) readerUnref(deleteFiles bool) {
     118           1 :         e.readerUnrefHelper(deleteFiles, e.deleteFn)
     119           1 : }
     120             : 
     121             : // db.mu must be held when this is called.
     122           1 : func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
     123           1 :         e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
     124           1 : }
     125             : 
     126             : func (e *flushableEntry) readerUnrefHelper(
     127             :         deleteFiles bool, deleteFn func(obsolete []*fileBacking),
     128           1 : ) {
     129           1 :         switch v := e.readerRefs.Add(-1); {
     130           0 :         case v < 0:
     131           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     132           1 :         case v == 0:
     133           1 :                 if e.releaseMemAccounting == nil {
     134           0 :                         panic("pebble: memtable reservation already released")
     135             :                 }
     136           1 :                 e.releaseMemAccounting()
     137           1 :                 e.releaseMemAccounting = nil
     138           1 :                 if e.unrefFiles != nil {
     139           1 :                         obsolete := e.unrefFiles()
     140           1 :                         e.unrefFiles = nil
     141           1 :                         if deleteFiles {
     142           1 :                                 deleteFn(obsolete)
     143           1 :                         }
     144             :                 }
     145             :         }
     146             : }
     147             : 
     148             : type flushableList []*flushableEntry
     149             : 
     150             : // ingestedFlushable is the implementation of the flushable interface for the
     151             : // ingesting sstables which are added to the flushable list.
     152             : type ingestedFlushable struct {
     153             :         // files are non-overlapping and ordered (according to their bounds).
     154             :         files            []physicalMeta
     155             :         comparer         *Comparer
     156             :         newIters         tableNewIters
     157             :         newRangeKeyIters keyspanimpl.TableNewSpanIter
     158             : 
     159             :         // Since the level slice is immutable, we construct and set it once. It
     160             :         // should be safe to read from slice in future reads.
     161             :         slice manifest.LevelSlice
     162             :         // hasRangeKeys is set on ingestedFlushable construction.
     163             :         hasRangeKeys bool
     164             :         // exciseSpan is populated if an excise operation should be performed during
     165             :         // flush.
     166             :         exciseSpan KeyRange
     167             : }
     168             : 
     169             : func newIngestedFlushable(
     170             :         files []*fileMetadata,
     171             :         comparer *Comparer,
     172             :         newIters tableNewIters,
     173             :         newRangeKeyIters keyspanimpl.TableNewSpanIter,
     174             :         exciseSpan KeyRange,
     175           1 : ) *ingestedFlushable {
     176           1 :         if invariants.Enabled {
     177           1 :                 for i := 1; i < len(files); i++ {
     178           1 :                         prev := files[i-1].UserKeyBounds()
     179           1 :                         this := files[i].UserKeyBounds()
     180           1 :                         if prev.End.IsUpperBoundFor(comparer.Compare, this.Start) {
     181           0 :                                 panic(errors.AssertionFailedf("ingested flushable files overlap: %s %s", prev, this))
     182             :                         }
     183             :                 }
     184             :         }
     185           1 :         var physicalFiles []physicalMeta
     186           1 :         var hasRangeKeys bool
     187           1 :         for _, f := range files {
     188           1 :                 if f.HasRangeKeys {
     189           1 :                         hasRangeKeys = true
     190           1 :                 }
     191           1 :                 physicalFiles = append(physicalFiles, f.PhysicalMeta())
     192             :         }
     193             : 
     194           1 :         ret := &ingestedFlushable{
     195           1 :                 files:            physicalFiles,
     196           1 :                 comparer:         comparer,
     197           1 :                 newIters:         newIters,
     198           1 :                 newRangeKeyIters: newRangeKeyIters,
     199           1 :                 // slice is immutable and can be set once and used many times.
     200           1 :                 slice:        manifest.NewLevelSliceKeySorted(comparer.Compare, files),
     201           1 :                 hasRangeKeys: hasRangeKeys,
     202           1 :                 exciseSpan:   exciseSpan,
     203           1 :         }
     204           1 : 
     205           1 :         return ret
     206             : }
     207             : 
     208             : // TODO(sumeer): ingestedFlushable iters also need to plumb context for
     209             : // tracing.
     210             : 
     211             : // newIter is part of the flushable interface.
     212           1 : func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
     213           1 :         var opts IterOptions
     214           1 :         if o != nil {
     215           1 :                 opts = *o
     216           1 :         }
     217             :         // TODO(bananabrick): The manifest.Level in newLevelIter is only used for
     218             :         // logging. Update the manifest.Level encoding to account for levels which
     219             :         // aren't truly levels in the lsm. Right now, the encoding only supports
     220             :         // L0 sublevels, and the rest of the levels in the lsm.
     221           1 :         return newLevelIter(
     222           1 :                 context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.Level(0),
     223           1 :                 internalIterOpts{},
     224           1 :         )
     225             : }
     226             : 
     227             : // newFlushIter is part of the flushable interface.
     228           0 : func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
     229           0 :         // newFlushIter is only used for writing memtables to disk as sstables.
     230           0 :         // Since ingested sstables are already present on disk, they don't need to
     231           0 :         // make use of a flush iter.
     232           0 :         panic("pebble: not implemented")
     233             : }
     234             : 
     235             : func (s *ingestedFlushable) constructRangeDelIter(
     236             :         file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
     237           1 : ) (keyspan.FragmentIterator, error) {
     238           1 :         iters, err := s.newIters(context.Background(), file, nil, internalIterOpts{}, iterRangeDeletions)
     239           1 :         if err != nil {
     240           0 :                 return nil, err
     241           0 :         }
     242           1 :         return iters.RangeDeletion(), nil
     243             : }
     244             : 
     245             : // newRangeDelIter is part of the flushable interface.
     246             : // TODO(bananabrick): Using a level iter instead of a keyspan level iter to
     247             : // surface range deletes is more efficient.
     248             : //
     249             : // TODO(sumeer): *IterOptions are being ignored, so the index block load for
     250             : // the point iterator in constructRangeDeIter is not tracked.
     251           1 : func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
     252           1 :         return keyspanimpl.NewLevelIter(
     253           1 :                 keyspan.SpanIterOptions{}, s.comparer.Compare,
     254           1 :                 s.constructRangeDelIter, s.slice.Iter(), manifest.Level(0),
     255           1 :                 manifest.KeyTypePoint,
     256           1 :         )
     257           1 : }
     258             : 
     259             : // newRangeKeyIter is part of the flushable interface.
     260           1 : func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
     261           1 :         if !s.containsRangeKeys() {
     262           1 :                 return nil
     263           1 :         }
     264             : 
     265           1 :         return keyspanimpl.NewLevelIter(
     266           1 :                 keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
     267           1 :                 s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange,
     268           1 :         )
     269             : }
     270             : 
     271             : // containsRangeKeys is part of the flushable interface.
     272           1 : func (s *ingestedFlushable) containsRangeKeys() bool {
     273           1 :         return s.hasRangeKeys
     274           1 : }
     275             : 
     276             : // inuseBytes is part of the flushable interface.
     277           0 : func (s *ingestedFlushable) inuseBytes() uint64 {
     278           0 :         // inuseBytes is only used when memtables are flushed to disk as sstables.
     279           0 :         panic("pebble: not implemented")
     280             : }
     281             : 
     282             : // totalBytes is part of the flushable interface.
     283           1 : func (s *ingestedFlushable) totalBytes() uint64 {
     284           1 :         // We don't allocate additional bytes for the ingestedFlushable.
     285           1 :         return 0
     286           1 : }
     287             : 
     288             : // readyForFlush is part of the flushable interface.
     289           1 : func (s *ingestedFlushable) readyForFlush() bool {
     290           1 :         // ingestedFlushable should always be ready to flush. However, note that
     291           1 :         // memtables before the ingested sstables in the memtable queue must be
     292           1 :         // flushed before an ingestedFlushable can be flushed. This is because the
     293           1 :         // ingested sstables need an updated view of the Version to
     294           1 :         // determine where to place the files in the lsm.
     295           1 :         return true
     296           1 : }
     297             : 
     298             : // computePossibleOverlaps is part of the flushable interface.
     299             : func (s *ingestedFlushable) computePossibleOverlaps(
     300             :         fn func(bounded) shouldContinue, bounded ...bounded,
     301           1 : ) {
     302           1 :         for _, b := range bounded {
     303           1 :                 if s.anyFileOverlaps(b.UserKeyBounds()) {
     304           1 :                         // Some file overlaps in key boundaries. The file doesn't necessarily
     305           1 :                         // contain any keys within the key range, but we would need to perform I/O
     306           1 :                         // to know for sure. The flushable interface dictates that we're not
     307           1 :                         // permitted to perform I/O here, so err towards assuming overlap.
     308           1 :                         if !fn(b) {
     309           1 :                                 return
     310           1 :                         }
     311             :                 }
     312             :         }
     313             : }
     314             : 
     315             : // anyFileBoundsOverlap returns true if there is at least a file in s.files with
     316             : // bounds that overlap the given bounds.
     317           1 : func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool {
     318           1 :         // Note that s.files are non-overlapping and sorted.
     319           1 :         for _, f := range s.files {
     320           1 :                 fileBounds := f.UserKeyBounds()
     321           1 :                 if !fileBounds.End.IsUpperBoundFor(s.comparer.Compare, bounds.Start) {
     322           1 :                         // The file ends before the bounds start. Go to the next file.
     323           1 :                         continue
     324             :                 }
     325           1 :                 if !bounds.End.IsUpperBoundFor(s.comparer.Compare, fileBounds.Start) {
     326           1 :                         // The file starts after the bounds end. There is no overlap, and
     327           1 :                         // further files will not overlap either (the files are sorted).
     328           1 :                         return false
     329           1 :                 }
     330             :                 // There is overlap. Note that UserKeyBounds.Overlaps() performs exactly the
     331             :                 // checks above.
     332           1 :                 return true
     333             :         }
     334           1 :         return false
     335             : }
     336             : 
     337             : // computePossibleOverlapsGenericImpl is an implementation of the flushable
     338             : // interface's computePossibleOverlaps function for flushable implementations
     339             : // with only in-memory state that do not have special requirements and should
     340             : // read through the ordinary flushable iterators.
     341             : //
     342             : // This function must only be used with implementations that are infallible (eg,
     343             : // memtable iterators) and will panic if an error is encountered.
     344             : func computePossibleOverlapsGenericImpl[F flushable](
     345             :         f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
     346           1 : ) {
     347           1 :         iter := f.newIter(nil)
     348           1 :         rangeDelIter := f.newRangeDelIter(nil)
     349           1 :         rangeKeyIter := f.newRangeKeyIter(nil)
     350           1 :         for _, b := range bounded {
     351           1 :                 overlap, err := determineOverlapAllIters(cmp, b.UserKeyBounds(), iter, rangeDelIter, rangeKeyIter)
     352           1 :                 if invariants.Enabled && err != nil {
     353           0 :                         panic(errors.AssertionFailedf("expected iterator to be infallible: %v", err))
     354             :                 }
     355           1 :                 if overlap {
     356           1 :                         if !fn(b) {
     357           1 :                                 break
     358             :                         }
     359             :                 }
     360             :         }
     361             : 
     362           1 :         if iter != nil {
     363           1 :                 if err := iter.Close(); err != nil {
     364           0 :                         // This implementation must be used in circumstances where
     365           0 :                         // reading through the iterator is infallible.
     366           0 :                         panic(err)
     367             :                 }
     368             :         }
     369           1 :         if rangeDelIter != nil {
     370           1 :                 rangeDelIter.Close()
     371           1 :         }
     372           1 :         if rangeKeyIter != nil {
     373           1 :                 rangeKeyIter.Close()
     374           1 :         }
     375             : }
     376             : 
     377             : // determineOverlapAllIters checks for overlap in a point iterator, range
     378             : // deletion iterator and range key iterator.
     379             : func determineOverlapAllIters(
     380             :         cmp base.Compare,
     381             :         bounds base.UserKeyBounds,
     382             :         pointIter base.InternalIterator,
     383             :         rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     384           1 : ) (bool, error) {
     385           1 :         if pointIter != nil {
     386           1 :                 if pointOverlap, err := determineOverlapPointIterator(cmp, bounds, pointIter); pointOverlap || err != nil {
     387           1 :                         return pointOverlap, err
     388           1 :                 }
     389             :         }
     390           1 :         if rangeDelIter != nil {
     391           1 :                 if rangeDelOverlap, err := determineOverlapKeyspanIterator(cmp, bounds, rangeDelIter); rangeDelOverlap || err != nil {
     392           1 :                         return rangeDelOverlap, err
     393           1 :                 }
     394             :         }
     395           1 :         if rangeKeyIter != nil {
     396           1 :                 return determineOverlapKeyspanIterator(cmp, bounds, rangeKeyIter)
     397           1 :         }
     398           1 :         return false, nil
     399             : }
     400             : 
     401             : func determineOverlapPointIterator(
     402             :         cmp base.Compare, bounds base.UserKeyBounds, iter internalIterator,
     403           1 : ) (bool, error) {
     404           1 :         kv := iter.SeekGE(bounds.Start, base.SeekGEFlagsNone)
     405           1 :         if kv == nil {
     406           1 :                 return false, iter.Error()
     407           1 :         }
     408           1 :         return bounds.End.IsUpperBoundForInternalKey(cmp, kv.K), nil
     409             : }
     410             : 
     411             : func determineOverlapKeyspanIterator(
     412             :         cmp base.Compare, bounds base.UserKeyBounds, iter keyspan.FragmentIterator,
     413           1 : ) (bool, error) {
     414           1 :         // NB: The spans surfaced by the fragment iterator are non-overlapping.
     415           1 :         span, err := iter.SeekGE(bounds.Start)
     416           1 :         if err != nil {
     417           0 :                 return false, err
     418           0 :         }
     419           1 :         for ; span != nil; span, err = iter.Next() {
     420           1 :                 if !bounds.End.IsUpperBoundFor(cmp, span.Start) {
     421           1 :                         // The span starts after our bounds.
     422           1 :                         return false, nil
     423           1 :                 }
     424           1 :                 if !span.Empty() {
     425           1 :                         return true, nil
     426           1 :                 }
     427             :         }
     428           1 :         return false, err
     429             : }

Generated by: LCOV version 1.14