LCOV - code coverage report
Current view: top level - pebble - flushable.go (source / functions) Coverage Total Hit
Test: 2025-02-24 08:17Z 6949b900 - tests only.lcov Lines: 90.4 % 229 207
Test Date: 2025-02-24 08:18:26 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "context"
       9              :         "fmt"
      10              :         "sync/atomic"
      11              :         "time"
      12              : 
      13              :         "github.com/cockroachdb/errors"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/invariants"
      16              :         "github.com/cockroachdb/pebble/internal/keyspan"
      17              :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      18              :         "github.com/cockroachdb/pebble/internal/manifest"
      19              : )
      20              : 
      21              : // flushable defines the interface for immutable memtables.
      22              : type flushable interface {
      23              :         newIter(o *IterOptions) internalIterator
      24              :         newFlushIter(o *IterOptions) internalIterator
      25              :         newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
      26              :         newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
      27              :         containsRangeKeys() bool
      28              :         // inuseBytes returns the number of bytes in use by the flushable.
      29              :         inuseBytes() uint64
      30              :         // totalBytes returns the total number of bytes allocated by the flushable.
      31              :         totalBytes() uint64
      32              :         // readyForFlush returns true when the flushable is ready for flushing. See
      33              :         // memTable.readyForFlush for one implementation which needs to check whether
      34              :         // there are any outstanding write references.
      35              :         readyForFlush() bool
      36              :         // computePossibleOverlaps determines whether the flushable's keys overlap
      37              :         // with the bounds of any of the provided bounded items. If an item overlaps
      38              :         // or might overlap but it's not possible to determine overlap cheaply,
      39              :         // computePossibleOverlaps invokes the provided function with the object
      40              :         // that might overlap. computePossibleOverlaps must not perform any I/O and
      41              :         // implementations should invoke the provided function for items that would
      42              :         // require I/O to determine overlap.
      43              :         computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
      44              : }
      45              : 
      46              : type shouldContinue bool
      47              : 
      48              : const (
      49              :         continueIteration shouldContinue = true
      50              :         stopIteration                    = false
      51              : )
      52              : 
      53              : type bounded interface {
      54              :         UserKeyBounds() base.UserKeyBounds
      55              : }
      56              : 
      57              : var _ bounded = (*tableMetadata)(nil)
      58              : var _ bounded = KeyRange{}
      59              : 
      60            1 : func sliceAsBounded[B bounded](s []B) []bounded {
      61            1 :         ret := make([]bounded, len(s))
      62            1 :         for i := 0; i < len(s); i++ {
      63            1 :                 ret[i] = s[i]
      64            1 :         }
      65            1 :         return ret
      66              : }
      67              : 
      68              : // flushableEntry wraps a flushable and adds additional metadata and
      69              : // functionality that is common to all flushables.
      70              : type flushableEntry struct {
      71              :         flushable
      72              :         // Channel which is closed when the flushable has been flushed.
      73              :         flushed chan struct{}
      74              :         // flushForced indicates whether a flush was forced on this memtable (either
      75              :         // manual, or due to ingestion). Protected by DB.mu.
      76              :         flushForced bool
      77              :         // delayedFlushForcedAt indicates whether a timer has been set to force a
      78              :         // flush on this memtable at some point in the future. Protected by DB.mu.
      79              :         // Holds the timestamp of when the flush will be issued.
      80              :         delayedFlushForcedAt time.Time
      81              :         // logNum corresponds to the WAL that contains the records present in the
      82              :         // receiver.
      83              :         logNum base.DiskFileNum
      84              :         // logSize is the size in bytes of the associated WAL. Protected by DB.mu.
      85              :         logSize uint64
      86              :         // The current logSeqNum at the time the memtable was created. This is
      87              :         // guaranteed to be less than or equal to any seqnum stored in the memtable.
      88              :         logSeqNum base.SeqNum
      89              :         // readerRefs tracks the read references on the flushable. The two sources of
      90              :         // reader references are DB.mu.mem.queue and readState.memtables. The memory
      91              :         // reserved by the flushable in the cache is released when the reader refs
      92              :         // drop to zero. If the flushable is referencing sstables, then the file
      93              :         // refcount is also decreased once the reader refs drop to 0. If the
      94              :         // flushable is a memTable, when the reader refs drop to zero, the writer
      95              :         // refs will already be zero because the memtable will have been flushed and
      96              :         // that only occurs once the writer refs drop to zero.
      97              :         readerRefs atomic.Int32
      98              :         // Closure to invoke to release memory accounting.
      99              :         releaseMemAccounting func()
     100              :         // unrefFiles, if not nil, should be invoked to decrease the ref count of
     101              :         // files which are backing the flushable.
     102              :         unrefFiles func(*manifest.ObsoleteFiles)
     103              :         // deleteFnLocked should be called if the caller is holding DB.mu.
     104              :         deleteFnLocked func(manifest.ObsoleteFiles)
     105              :         // deleteFn should be called if the caller is not holding DB.mu.
     106              :         deleteFn func(manifest.ObsoleteFiles)
     107              : }
     108              : 
     109            1 : func (e *flushableEntry) readerRef() {
     110            1 :         switch v := e.readerRefs.Add(1); {
     111            0 :         case v <= 1:
     112            0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     113              :         }
     114              : }
     115              : 
     116              : // db.mu must not be held when this is called.
     117            1 : func (e *flushableEntry) readerUnref(deleteFiles bool) {
     118            1 :         e.readerUnrefHelper(deleteFiles, e.deleteFn)
     119            1 : }
     120              : 
     121              : // db.mu must be held when this is called.
     122            1 : func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
     123            1 :         e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
     124            1 : }
     125              : 
     126              : func (e *flushableEntry) readerUnrefHelper(
     127              :         deleteFiles bool, deleteFn func(manifest.ObsoleteFiles),
     128            1 : ) {
     129            1 :         switch v := e.readerRefs.Add(-1); {
     130            0 :         case v < 0:
     131            0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     132            1 :         case v == 0:
     133            1 :                 if e.releaseMemAccounting == nil {
     134            0 :                         panic("pebble: memtable reservation already released")
     135              :                 }
     136            1 :                 e.releaseMemAccounting()
     137            1 :                 e.releaseMemAccounting = nil
     138            1 :                 if e.unrefFiles != nil {
     139            1 :                         var obsolete manifest.ObsoleteFiles
     140            1 :                         e.unrefFiles(&obsolete)
     141            1 :                         e.unrefFiles = nil
     142            1 :                         if deleteFiles {
     143            1 :                                 deleteFn(obsolete)
     144            1 :                         }
     145              :                 }
     146              :         }
     147              : }
     148              : 
     149              : type flushableList []*flushableEntry
     150              : 
     151              : // ingestedFlushable is the implementation of the flushable interface for the
     152              : // ingesting sstables which are added to the flushable list.
     153              : type ingestedFlushable struct {
     154              :         // files are non-overlapping and ordered (according to their bounds).
     155              :         files            []physicalMeta
     156              :         comparer         *Comparer
     157              :         newIters         tableNewIters
     158              :         newRangeKeyIters keyspanimpl.TableNewSpanIter
     159              : 
     160              :         // Since the level slice is immutable, we construct and set it once. It
     161              :         // should be safe to read from slice in future reads.
     162              :         slice manifest.LevelSlice
     163              :         // hasRangeKeys is set on ingestedFlushable construction.
     164              :         hasRangeKeys bool
     165              :         // exciseSpan is populated if an excise operation should be performed during
     166              :         // flush.
     167              :         exciseSpan   KeyRange
     168              :         exciseSeqNum base.SeqNum
     169              : }
     170              : 
     171              : func newIngestedFlushable(
     172              :         files []*tableMetadata,
     173              :         comparer *Comparer,
     174              :         newIters tableNewIters,
     175              :         newRangeKeyIters keyspanimpl.TableNewSpanIter,
     176              :         exciseSpan KeyRange,
     177              :         seqNum base.SeqNum,
     178            1 : ) *ingestedFlushable {
     179            1 :         if invariants.Enabled {
     180            1 :                 for i := 1; i < len(files); i++ {
     181            1 :                         prev := files[i-1].UserKeyBounds()
     182            1 :                         this := files[i].UserKeyBounds()
     183            1 :                         if prev.End.IsUpperBoundFor(comparer.Compare, this.Start) {
     184            0 :                                 panic(errors.AssertionFailedf("ingested flushable files overlap: %s %s", prev, this))
     185              :                         }
     186              :                 }
     187              :         }
     188            1 :         var physicalFiles []physicalMeta
     189            1 :         var hasRangeKeys bool
     190            1 :         for _, f := range files {
     191            1 :                 if f.HasRangeKeys {
     192            1 :                         hasRangeKeys = true
     193            1 :                 }
     194            1 :                 physicalFiles = append(physicalFiles, f.PhysicalMeta())
     195              :         }
     196              : 
     197            1 :         ret := &ingestedFlushable{
     198            1 :                 files:            physicalFiles,
     199            1 :                 comparer:         comparer,
     200            1 :                 newIters:         newIters,
     201            1 :                 newRangeKeyIters: newRangeKeyIters,
     202            1 :                 // slice is immutable and can be set once and used many times.
     203            1 :                 slice:        manifest.NewLevelSliceKeySorted(comparer.Compare, files),
     204            1 :                 hasRangeKeys: hasRangeKeys,
     205            1 :                 exciseSpan:   exciseSpan,
     206            1 :                 exciseSeqNum: seqNum,
     207            1 :         }
     208            1 : 
     209            1 :         return ret
     210              : }
     211              : 
     212              : // TODO(sumeer): ingestedFlushable iters also need to plumb context for
     213              : // tracing.
     214              : 
     215              : // newIter is part of the flushable interface.
     216            1 : func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
     217            1 :         var opts IterOptions
     218            1 :         if o != nil {
     219            1 :                 opts = *o
     220            1 :         }
     221            1 :         return newLevelIter(
     222            1 :                 context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.FlushableIngestsLayer(),
     223            1 :                 internalIterOpts{},
     224            1 :         )
     225              : }
     226              : 
     227              : // newFlushIter is part of the flushable interface.
     228            0 : func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
     229            0 :         // newFlushIter is only used for writing memtables to disk as sstables.
     230            0 :         // Since ingested sstables are already present on disk, they don't need to
     231            0 :         // make use of a flush iter.
     232            0 :         panic("pebble: not implemented")
     233              : }
     234              : 
     235              : func (s *ingestedFlushable) constructRangeDelIter(
     236              :         ctx context.Context, file *manifest.TableMetadata, _ keyspan.SpanIterOptions,
     237            1 : ) (keyspan.FragmentIterator, error) {
     238            1 :         iters, err := s.newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
     239            1 :         if err != nil {
     240            0 :                 return nil, err
     241            0 :         }
     242            1 :         return iters.RangeDeletion(), nil
     243              : }
     244              : 
     245              : // newRangeDelIter is part of the flushable interface.
     246              : // TODO(bananabrick): Using a level iter instead of a keyspan level iter to
     247              : // surface range deletes is more efficient.
     248              : //
     249              : // TODO(sumeer): *IterOptions are being ignored, so the index block load for
     250              :         // the point iterator in constructRangeDelIter is not tracked.
     251            1 : func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
     252            1 :         liter := keyspanimpl.NewLevelIter(
     253            1 :                 context.TODO(),
     254            1 :                 keyspan.SpanIterOptions{}, s.comparer.Compare,
     255            1 :                 s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestsLayer(),
     256            1 :                 manifest.KeyTypePoint,
     257            1 :         )
     258            1 :         if !s.exciseSpan.Valid() {
     259            1 :                 return liter
     260            1 :         }
     261              :         // We have an excise span to weave into the rangedel iterators.
     262              :         //
     263              :         // TODO(bilal): should this be pooled?
     264            1 :         miter := &keyspanimpl.MergingIter{}
     265            1 :         rdel := keyspan.Span{
     266            1 :                 Start: s.exciseSpan.Start,
     267            1 :                 End:   s.exciseSpan.End,
     268            1 :                 Keys:  []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeDelete)}},
     269            1 :         }
     270            1 :         rdelIter := keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rdel})
     271            1 :         miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rdelIter)
     272            1 :         return miter
     273              : }
     274              : 
     275              : // newRangeKeyIter is part of the flushable interface.
     276            1 : func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
     277            1 :         var rkeydelIter keyspan.FragmentIterator
     278            1 :         if s.exciseSpan.Valid() {
     279            1 :                 // We have an excise span to weave into the rangekey iterators.
     280            1 :                 rkeydel := keyspan.Span{
     281            1 :                         Start: s.exciseSpan.Start,
     282            1 :                         End:   s.exciseSpan.End,
     283            1 :                         Keys:  []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeKeyDelete)}},
     284            1 :                 }
     285            1 :                 rkeydelIter = keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rkeydel})
     286            1 :         }
     287              : 
     288            1 :         if !s.hasRangeKeys {
     289            1 :                 if rkeydelIter == nil {
     290            1 :                         // NB: we have to return the nil literal as opposed to the nil
     291            1 :                         // value of rkeydelIter, otherwise callers of this function will
     292            1 :                         // have the return value fail == nil checks.
     293            1 :                         return nil
     294            1 :                 }
     295            1 :                 return rkeydelIter
     296              :         }
     297              : 
     298            1 :         liter := keyspanimpl.NewLevelIter(
     299            1 :                 context.TODO(),
     300            1 :                 keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
     301            1 :                 s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypeRange,
     302            1 :         )
     303            1 :         if rkeydelIter == nil {
     304            1 :                 return liter
     305            1 :         }
     306              :         // TODO(bilal): should this be pooled?
     307            1 :         miter := &keyspanimpl.MergingIter{}
     308            1 :         miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rkeydelIter)
     309            1 :         return miter
     310              : }
     311              : 
     312              : // containsRangeKeys is part of the flushable interface.
     313            1 : func (s *ingestedFlushable) containsRangeKeys() bool {
     314            1 :         return s.hasRangeKeys || s.exciseSpan.Valid()
     315            1 : }
     316              : 
     317              : // inuseBytes is part of the flushable interface.
     318            0 : func (s *ingestedFlushable) inuseBytes() uint64 {
     319            0 :         // inuseBytes is only used when memtables are flushed to disk as sstables.
     320            0 :         panic("pebble: not implemented")
     321              : }
     322              : 
     323              : // totalBytes is part of the flushable interface.
     324            1 : func (s *ingestedFlushable) totalBytes() uint64 {
     325            1 :         // We don't allocate additional bytes for the ingestedFlushable.
     326            1 :         return 0
     327            1 : }
     328              : 
     329              : // readyForFlush is part of the flushable interface.
     330            1 : func (s *ingestedFlushable) readyForFlush() bool {
     331            1 :         // ingestedFlushable should always be ready to flush. However, note that
     332            1 :         // memtables before the ingested sstables in the memtable queue must be
     333            1 :         // flushed before an ingestedFlushable can be flushed. This is because the
     334            1 :         // ingested sstables need an updated view of the Version to
     335            1 :         // determine where to place the files in the lsm.
     336            1 :         return true
     337            1 : }
     338              : 
     339              : // computePossibleOverlaps is part of the flushable interface.
     340              : func (s *ingestedFlushable) computePossibleOverlaps(
     341              :         fn func(bounded) shouldContinue, bounded ...bounded,
     342            1 : ) {
     343            1 :         for _, b := range bounded {
     344            1 :                 if s.anyFileOverlaps(b.UserKeyBounds()) {
     345            1 :                         // Some file overlaps in key boundaries. The file doesn't necessarily
     346            1 :                         // contain any keys within the key range, but we would need to perform I/O
     347            1 :                         // to know for sure. The flushable interface dictates that we're not
     348            1 :                         // permitted to perform I/O here, so err towards assuming overlap.
     349            1 :                         if !fn(b) {
     350            1 :                                 return
     351            1 :                         }
     352              :                 }
     353              :         }
     354              : }
     355              : 
     356              : // anyFileOverlaps returns true if at least one file in s.files has bounds that
     357              : // overlap the given bounds, or if a valid excise span overlaps them.
     358            1 : func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool {
     359            1 :         // Note that s.files are non-overlapping and sorted.
     360            1 :         for _, f := range s.files {
     361            1 :                 fileBounds := f.UserKeyBounds()
     362            1 :                 if !fileBounds.End.IsUpperBoundFor(s.comparer.Compare, bounds.Start) {
     363            1 :                         // The file ends before the bounds start. Go to the next file.
     364            1 :                         continue
     365              :                 }
     366            1 :                 if !bounds.End.IsUpperBoundFor(s.comparer.Compare, fileBounds.Start) {
     367            1 :                         // The file starts after the bounds end. There is no overlap, and
     368            1 :                         // further files will not overlap either (the files are sorted).
     369            1 :                         break
     370              :                 }
     371              :                 // There is overlap. Note that UserKeyBounds.Overlaps() performs exactly the
     372              :                 // checks above.
     373            1 :                 return true
     374              :         }
     375            1 :         if s.exciseSpan.Valid() {
     376            1 :                 uk := s.exciseSpan.UserKeyBounds()
     377            1 :                 return uk.Overlaps(s.comparer.Compare, &bounds)
     378            1 :         }
     379            1 :         return false
     380              : }
     381              : 
     382              : // computePossibleOverlapsGenericImpl is an implementation of the flushable
     383              : // interface's computePossibleOverlaps function for flushable implementations
     384              : // with only in-memory state that do not have special requirements and should
     385              : // read through the ordinary flushable iterators.
     386              : //
     387              : // This function must only be used with implementations that are infallible (eg,
     388              : // memtable iterators) and will panic if an error is encountered.
     389              : func computePossibleOverlapsGenericImpl[F flushable](
     390              :         f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
     391            1 : ) {
     392            1 :         iter := f.newIter(nil)
     393            1 :         rangeDelIter := f.newRangeDelIter(nil)
     394            1 :         rangeKeyIter := f.newRangeKeyIter(nil)
     395            1 :         for _, b := range bounded {
     396            1 :                 overlap, err := determineOverlapAllIters(cmp, b.UserKeyBounds(), iter, rangeDelIter, rangeKeyIter)
     397            1 :                 if invariants.Enabled && err != nil {
     398            0 :                         panic(errors.AssertionFailedf("expected iterator to be infallible: %v", err))
     399              :                 }
     400            1 :                 if overlap {
     401            1 :                         if !fn(b) {
     402            1 :                                 break
     403              :                         }
     404              :                 }
     405              :         }
     406              : 
     407            1 :         if iter != nil {
     408            1 :                 if err := iter.Close(); err != nil {
     409            0 :                         // This implementation must be used in circumstances where
     410            0 :                         // reading through the iterator is infallible.
     411            0 :                         panic(err)
     412              :                 }
     413              :         }
     414            1 :         if rangeDelIter != nil {
     415            1 :                 rangeDelIter.Close()
     416            1 :         }
     417            1 :         if rangeKeyIter != nil {
     418            1 :                 rangeKeyIter.Close()
     419            1 :         }
     420              : }
     421              : 
     422              : // determineOverlapAllIters checks for overlap in a point iterator, range
     423              : // deletion iterator and range key iterator.
     424              : func determineOverlapAllIters(
     425              :         cmp base.Compare,
     426              :         bounds base.UserKeyBounds,
     427              :         pointIter base.InternalIterator,
     428              :         rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     429            1 : ) (bool, error) {
     430            1 :         if pointIter != nil {
     431            1 :                 if pointOverlap, err := determineOverlapPointIterator(cmp, bounds, pointIter); pointOverlap || err != nil {
     432            1 :                         return pointOverlap, err
     433            1 :                 }
     434              :         }
     435            1 :         if rangeDelIter != nil {
     436            1 :                 if rangeDelOverlap, err := determineOverlapKeyspanIterator(cmp, bounds, rangeDelIter); rangeDelOverlap || err != nil {
     437            1 :                         return rangeDelOverlap, err
     438            1 :                 }
     439              :         }
     440            1 :         if rangeKeyIter != nil {
     441            1 :                 return determineOverlapKeyspanIterator(cmp, bounds, rangeKeyIter)
     442            1 :         }
     443            1 :         return false, nil
     444              : }
     445              : 
     446              : func determineOverlapPointIterator(
     447              :         cmp base.Compare, bounds base.UserKeyBounds, iter internalIterator,
     448            1 : ) (bool, error) {
     449            1 :         kv := iter.SeekGE(bounds.Start, base.SeekGEFlagsNone)
     450            1 :         if kv == nil {
     451            1 :                 return false, iter.Error()
     452            1 :         }
     453            1 :         return bounds.End.IsUpperBoundForInternalKey(cmp, kv.K), nil
     454              : }
     455              : 
     456              : func determineOverlapKeyspanIterator(
     457              :         cmp base.Compare, bounds base.UserKeyBounds, iter keyspan.FragmentIterator,
     458            1 : ) (bool, error) {
     459            1 :         // NB: The spans surfaced by the fragment iterator are non-overlapping.
     460            1 :         span, err := iter.SeekGE(bounds.Start)
     461            1 :         if err != nil {
     462            0 :                 return false, err
     463            0 :         }
     464            1 :         for ; span != nil; span, err = iter.Next() {
     465            1 :                 if !bounds.End.IsUpperBoundFor(cmp, span.Start) {
     466            1 :                         // The span starts after our bounds.
     467            1 :                         return false, nil
     468            1 :                 }
     469            1 :                 if !span.Empty() {
     470            1 :                         return true, nil
     471            1 :                 }
     472              :         }
     473            1 :         return false, err
     474              : }
        

Generated by: LCOV version 2.0-1