LCOV - code coverage report
Current view: top level - pebble - flushable.go (source / functions) Coverage Total Hit
Test: 2025-02-12 08:16Z 419f2391 - meta test only.lcov Lines: 90.4 % 228 206
Test Date: 2025-02-12 08:17:57 Functions: - 0 0

            Line data    Source code
       1              : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2              : // of this source code is governed by a BSD-style license that can be found in
       3              : // the LICENSE file.
       4              : 
       5              : package pebble
       6              : 
       7              : import (
       8              :         "context"
       9              :         "fmt"
      10              :         "sync/atomic"
      11              :         "time"
      12              : 
      13              :         "github.com/cockroachdb/errors"
      14              :         "github.com/cockroachdb/pebble/internal/base"
      15              :         "github.com/cockroachdb/pebble/internal/invariants"
      16              :         "github.com/cockroachdb/pebble/internal/keyspan"
      17              :         "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
      18              :         "github.com/cockroachdb/pebble/internal/manifest"
      19              : )
      20              : 
// flushable defines the interface for immutable memtables.
type flushable interface {
	// newIter returns an iterator over the flushable's point keys. Callers
	// in this file pass nil options (see computePossibleOverlapsGenericImpl).
	newIter(o *IterOptions) internalIterator
	// newFlushIter returns the iterator used when writing the flushable to
	// disk. ingestedFlushable does not implement it and panics instead.
	newFlushIter(o *IterOptions) internalIterator
	// newRangeDelIter returns a fragment iterator over the flushable's range
	// deletions. Callers in this file nil-check the result before using it.
	newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
	// newRangeKeyIter returns a fragment iterator over the flushable's range
	// keys. The result may be nil (ingestedFlushable returns nil when it has
	// neither range keys nor an excise span).
	newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
	// containsRangeKeys reports whether the flushable may surface range keys.
	containsRangeKeys() bool
	// inuseBytes returns the number of inuse bytes by the flushable.
	inuseBytes() uint64
	// totalBytes returns the total number of bytes allocated by the flushable.
	totalBytes() uint64
	// readyForFlush returns true when the flushable is ready for flushing. See
	// memTable.readyForFlush for one implementation which needs to check whether
	// there are any outstanding write references.
	readyForFlush() bool
	// computePossibleOverlaps determines whether the flushable's keys overlap
	// with the bounds of any of the provided bounded items. If an item overlaps
	// or might overlap but it's not possible to determine overlap cheaply,
	// computePossibleOverlaps invokes the provided function with the object
	// that might overlap. computePossibleOverlaps must not perform any I/O and
	// implementations should invoke the provided function for items that would
	// require I/O to determine overlap.
	computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
}
      45              : 
      46              : type shouldContinue bool
      47              : 
      48              : const (
      49              :         continueIteration shouldContinue = true
      50              :         stopIteration                    = false
      51              : )
      52              : 
// bounded is implemented by anything that can report the user-key bounds it
// covers. It is the item type accepted by flushable.computePossibleOverlaps.
type bounded interface {
	// UserKeyBounds returns the user-key bounds of the item.
	UserKeyBounds() base.UserKeyBounds
}

// Compile-time checks that *fileMetadata and KeyRange satisfy bounded.
var _ bounded = (*fileMetadata)(nil)
var _ bounded = KeyRange{}
      59              : 
      60            1 : func sliceAsBounded[B bounded](s []B) []bounded {
      61            1 :         ret := make([]bounded, len(s))
      62            1 :         for i := 0; i < len(s); i++ {
      63            1 :                 ret[i] = s[i]
      64            1 :         }
      65            1 :         return ret
      66              : }
      67              : 
// flushableEntry wraps a flushable and adds additional metadata and
// functionality that is common to all flushables.
type flushableEntry struct {
	// flushable is the wrapped implementation; its methods are promoted onto
	// the entry.
	flushable
	// Channel which is closed when the flushable has been flushed.
	flushed chan struct{}
	// flushForced indicates whether a flush was forced on this memtable (either
	// manual, or due to ingestion). Protected by DB.mu.
	flushForced bool
	// delayedFlushForcedAt indicates whether a timer has been set to force a
	// flush on this memtable at some point in the future. Protected by DB.mu.
	// Holds the timestamp of when the flush will be issued.
	delayedFlushForcedAt time.Time
	// logNum corresponds to the WAL that contains the records present in the
	// receiver.
	logNum base.DiskFileNum
	// logSize is the size in bytes of the associated WAL. Protected by DB.mu.
	logSize uint64
	// The current logSeqNum at the time the memtable was created. This is
	// guaranteed to be less than or equal to any seqnum stored in the memtable.
	logSeqNum base.SeqNum
	// readerRefs tracks the read references on the flushable. The two sources of
	// reader references are DB.mu.mem.queue and readState.memtables. The memory
	// reserved by the flushable in the cache is released when the reader refs
	// drop to zero. If the flushable is referencing sstables, then the file
	// refcount is also decreased once the reader refs drops to 0. If the
	// flushable is a memTable, when the reader refs drops to zero, the writer
	// refs will already be zero because the memtable will have been flushed and
	// that only occurs once the writer refs drops to zero.
	readerRefs atomic.Int32
	// Closure to invoke to release memory accounting. It is invoked and then
	// set to nil when the last reader reference is dropped.
	releaseMemAccounting func()
	// unrefFiles, if not nil, should be invoked to decrease the ref count of
	// files which are backing the flushable. It returns the file backings
	// that became obsolete as a result.
	unrefFiles func() []*fileBacking
	// deleteFnLocked should be called if the caller is holding DB.mu.
	deleteFnLocked func(obsolete []*fileBacking)
	// deleteFn should be called if the caller is not holding DB.mu.
	deleteFn func(obsolete []*fileBacking)
}
     108              : 
     109            1 : func (e *flushableEntry) readerRef() {
     110            1 :         switch v := e.readerRefs.Add(1); {
     111            0 :         case v <= 1:
     112            0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     113              :         }
     114              : }
     115              : 
// readerUnref drops one read reference on the entry, using e.deleteFn to
// dispose of any obsolete file backings when deleteFiles is true. See
// readerUnrefHelper for what happens when the last reference is released.
//
// db.mu must not be held when this is called.
func (e *flushableEntry) readerUnref(deleteFiles bool) {
	e.readerUnrefHelper(deleteFiles, e.deleteFn)
}
     120              : 
// readerUnrefLocked drops one read reference on the entry, using
// e.deleteFnLocked to dispose of any obsolete file backings when deleteFiles
// is true. See readerUnrefHelper for what happens when the last reference is
// released.
//
// db.mu must be held when this is called.
func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
	e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
}
     125              : 
     126              : func (e *flushableEntry) readerUnrefHelper(
     127              :         deleteFiles bool, deleteFn func(obsolete []*fileBacking),
     128            1 : ) {
     129            1 :         switch v := e.readerRefs.Add(-1); {
     130            0 :         case v < 0:
     131            0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     132            1 :         case v == 0:
     133            1 :                 if e.releaseMemAccounting == nil {
     134            0 :                         panic("pebble: memtable reservation already released")
     135              :                 }
     136            1 :                 e.releaseMemAccounting()
     137            1 :                 e.releaseMemAccounting = nil
     138            1 :                 if e.unrefFiles != nil {
     139            1 :                         obsolete := e.unrefFiles()
     140            1 :                         e.unrefFiles = nil
     141            1 :                         if deleteFiles {
     142            1 :                                 deleteFn(obsolete)
     143            1 :                         }
     144              :                 }
     145              :         }
     146              : }
     147              : 
// flushableList is a list of flushable entries.
type flushableList []*flushableEntry
     149              : 
// ingestedFlushable is the implementation of the flushable interface for the
// ingesting sstables which are added to the flushable list.
type ingestedFlushable struct {
	// files are non-overlapping and ordered (according to their bounds).
	files []physicalMeta
	// comparer supplies the key comparison used by the iterators and by the
	// overlap checks.
	comparer *Comparer
	// newIters opens iterators over a single file; it backs both the point
	// iterator and the range deletion iterator.
	newIters tableNewIters
	// newRangeKeyIters opens a range key iterator over a single file.
	newRangeKeyIters keyspanimpl.TableNewSpanIter

	// Since the level slice is immutable, we construct and set it once. It
	// should be safe to read from slice in future reads.
	slice manifest.LevelSlice
	// hasRangeKeys is set on ingestedFlushable construction.
	hasRangeKeys bool
	// exciseSpan is populated if an excise operation should be performed during
	// flush.
	exciseSpan KeyRange
	// exciseSeqNum is the sequence number given to the synthetic range
	// deletion and range key deletion spans that cover exciseSpan.
	exciseSeqNum base.SeqNum
}
     169              : 
     170              : func newIngestedFlushable(
     171              :         files []*fileMetadata,
     172              :         comparer *Comparer,
     173              :         newIters tableNewIters,
     174              :         newRangeKeyIters keyspanimpl.TableNewSpanIter,
     175              :         exciseSpan KeyRange,
     176              :         seqNum base.SeqNum,
     177            1 : ) *ingestedFlushable {
     178            1 :         if invariants.Enabled {
     179            1 :                 for i := 1; i < len(files); i++ {
     180            1 :                         prev := files[i-1].UserKeyBounds()
     181            1 :                         this := files[i].UserKeyBounds()
     182            1 :                         if prev.End.IsUpperBoundFor(comparer.Compare, this.Start) {
     183            0 :                                 panic(errors.AssertionFailedf("ingested flushable files overlap: %s %s", prev, this))
     184              :                         }
     185              :                 }
     186              :         }
     187            1 :         var physicalFiles []physicalMeta
     188            1 :         var hasRangeKeys bool
     189            1 :         for _, f := range files {
     190            1 :                 if f.HasRangeKeys {
     191            1 :                         hasRangeKeys = true
     192            1 :                 }
     193            1 :                 physicalFiles = append(physicalFiles, f.PhysicalMeta())
     194              :         }
     195              : 
     196            1 :         ret := &ingestedFlushable{
     197            1 :                 files:            physicalFiles,
     198            1 :                 comparer:         comparer,
     199            1 :                 newIters:         newIters,
     200            1 :                 newRangeKeyIters: newRangeKeyIters,
     201            1 :                 // slice is immutable and can be set once and used many times.
     202            1 :                 slice:        manifest.NewLevelSliceKeySorted(comparer.Compare, files),
     203            1 :                 hasRangeKeys: hasRangeKeys,
     204            1 :                 exciseSpan:   exciseSpan,
     205            1 :                 exciseSeqNum: seqNum,
     206            1 :         }
     207            1 : 
     208            1 :         return ret
     209              : }
     210              : 
     211              : // TODO(sumeer): ingestedFlushable iters also need to plumb context for
     212              : // tracing.
     213              : 
     214              : // newIter is part of the flushable interface.
     215            1 : func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
     216            1 :         var opts IterOptions
     217            1 :         if o != nil {
     218            1 :                 opts = *o
     219            1 :         }
     220            1 :         return newLevelIter(
     221            1 :                 context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.FlushableIngestsLayer(),
     222            1 :                 internalIterOpts{},
     223            1 :         )
     224              : }
     225              : 
// newFlushIter is part of the flushable interface. It is not implemented for
// ingestedFlushable and always panics.
func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
	// newFlushIter is only used for writing memtables to disk as sstables.
	// Since ingested sstables are already present on disk, they don't need to
	// make use of a flush iter.
	panic("pebble: not implemented")
}
     233              : 
     234              : func (s *ingestedFlushable) constructRangeDelIter(
     235              :         ctx context.Context, file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
     236            1 : ) (keyspan.FragmentIterator, error) {
     237            1 :         iters, err := s.newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
     238            1 :         if err != nil {
     239            0 :                 return nil, err
     240            0 :         }
     241            1 :         return iters.RangeDeletion(), nil
     242              : }
     243              : 
     244              : // newRangeDelIter is part of the flushable interface.
     245              : // TODO(bananabrick): Using a level iter instead of a keyspan level iter to
     246              : // surface range deletes is more efficient.
     247              : //
     248              : // TODO(sumeer): *IterOptions are being ignored, so the index block load for
     249              : // the point iterator in constructRangeDeIter is not tracked.
     250            1 : func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
     251            1 :         liter := keyspanimpl.NewLevelIter(
     252            1 :                 context.TODO(),
     253            1 :                 keyspan.SpanIterOptions{}, s.comparer.Compare,
     254            1 :                 s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestsLayer(),
     255            1 :                 manifest.KeyTypePoint,
     256            1 :         )
     257            1 :         if !s.exciseSpan.Valid() {
     258            1 :                 return liter
     259            1 :         }
     260              :         // We have an excise span to weave into the rangedel iterators.
     261              :         //
     262              :         // TODO(bilal): should this be pooled?
     263            1 :         miter := &keyspanimpl.MergingIter{}
     264            1 :         rdel := keyspan.Span{
     265            1 :                 Start: s.exciseSpan.Start,
     266            1 :                 End:   s.exciseSpan.End,
     267            1 :                 Keys:  []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeDelete)}},
     268            1 :         }
     269            1 :         rdelIter := keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rdel})
     270            1 :         miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rdelIter)
     271            1 :         return miter
     272              : }
     273              : 
     274              : // newRangeKeyIter is part of the flushable interface.
     275            1 : func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
     276            1 :         var rkeydelIter keyspan.FragmentIterator
     277            1 :         if s.exciseSpan.Valid() {
     278            1 :                 // We have an excise span to weave into the rangekey iterators.
     279            1 :                 rkeydel := keyspan.Span{
     280            1 :                         Start: s.exciseSpan.Start,
     281            1 :                         End:   s.exciseSpan.End,
     282            1 :                         Keys:  []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeKeyDelete)}},
     283            1 :                 }
     284            1 :                 rkeydelIter = keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rkeydel})
     285            1 :         }
     286              : 
     287            1 :         if !s.hasRangeKeys {
     288            1 :                 if rkeydelIter == nil {
     289            1 :                         // NB: we have to return the nil literal as opposed to the nil
     290            1 :                         // value of rkeydelIter, otherwise callers of this function will
     291            1 :                         // have the return value fail == nil checks.
     292            1 :                         return nil
     293            1 :                 }
     294            1 :                 return rkeydelIter
     295              :         }
     296              : 
     297            1 :         liter := keyspanimpl.NewLevelIter(
     298            1 :                 context.TODO(),
     299            1 :                 keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
     300            1 :                 s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypeRange,
     301            1 :         )
     302            1 :         if rkeydelIter == nil {
     303            1 :                 return liter
     304            1 :         }
     305              :         // TODO(bilal): should this be pooled?
     306            1 :         miter := &keyspanimpl.MergingIter{}
     307            1 :         miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rkeydelIter)
     308            1 :         return miter
     309              : }
     310              : 
     311              : // containsRangeKeys is part of the flushable interface.
     312            1 : func (s *ingestedFlushable) containsRangeKeys() bool {
     313            1 :         return s.hasRangeKeys || s.exciseSpan.Valid()
     314            1 : }
     315              : 
// inuseBytes is part of the flushable interface. It is not implemented for
// ingestedFlushable and always panics.
func (s *ingestedFlushable) inuseBytes() uint64 {
	// inuseBytes is only used when memtables are flushed to disk as sstables.
	panic("pebble: not implemented")
}
     321              : 
// totalBytes is part of the flushable interface. It always returns 0 for an
// ingestedFlushable.
func (s *ingestedFlushable) totalBytes() uint64 {
	// We don't allocate additional bytes for the ingestedFlushable.
	return 0
}
     327              : 
// readyForFlush is part of the flushable interface. It always returns true
// for an ingestedFlushable.
func (s *ingestedFlushable) readyForFlush() bool {
	// ingestedFlushable should always be ready to flush. However, note that
	// memtables before the ingested sstables in the memtable queue must be
	// flushed before an ingestedFlushable can be flushed. This is because the
	// ingested sstables need an updated view of the Version to
	// determine where to place the files in the lsm.
	return true
}
     337              : 
     338              : // computePossibleOverlaps is part of the flushable interface.
     339              : func (s *ingestedFlushable) computePossibleOverlaps(
     340              :         fn func(bounded) shouldContinue, bounded ...bounded,
     341            1 : ) {
     342            1 :         for _, b := range bounded {
     343            1 :                 if s.anyFileOverlaps(b.UserKeyBounds()) {
     344            1 :                         // Some file overlaps in key boundaries. The file doesn't necessarily
     345            1 :                         // contain any keys within the key range, but we would need to perform I/O
     346            1 :                         // to know for sure. The flushable interface dictates that we're not
     347            1 :                         // permitted to perform I/O here, so err towards assuming overlap.
     348            1 :                         if !fn(b) {
     349            1 :                                 return
     350            1 :                         }
     351              :                 }
     352              :         }
     353              : }
     354              : 
     355              : // anyFileBoundsOverlap returns true if there is at least a file in s.files with
     356              : // bounds that overlap the given bounds.
     357            1 : func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool {
     358            1 :         // Note that s.files are non-overlapping and sorted.
     359            1 :         for _, f := range s.files {
     360            1 :                 fileBounds := f.UserKeyBounds()
     361            1 :                 if !fileBounds.End.IsUpperBoundFor(s.comparer.Compare, bounds.Start) {
     362            1 :                         // The file ends before the bounds start. Go to the next file.
     363            1 :                         continue
     364              :                 }
     365            1 :                 if !bounds.End.IsUpperBoundFor(s.comparer.Compare, fileBounds.Start) {
     366            1 :                         // The file starts after the bounds end. There is no overlap, and
     367            1 :                         // further files will not overlap either (the files are sorted).
     368            1 :                         break
     369              :                 }
     370              :                 // There is overlap. Note that UserKeyBounds.Overlaps() performs exactly the
     371              :                 // checks above.
     372            1 :                 return true
     373              :         }
     374            1 :         if s.exciseSpan.Valid() {
     375            1 :                 uk := s.exciseSpan.UserKeyBounds()
     376            1 :                 return uk.Overlaps(s.comparer.Compare, &bounds)
     377            1 :         }
     378            1 :         return false
     379              : }
     380              : 
     381              : // computePossibleOverlapsGenericImpl is an implementation of the flushable
     382              : // interface's computePossibleOverlaps function for flushable implementations
     383              : // with only in-memory state that do not have special requirements and should
     384              : // read through the ordinary flushable iterators.
     385              : //
     386              : // This function must only be used with implementations that are infallible (eg,
     387              : // memtable iterators) and will panic if an error is encountered.
     388              : func computePossibleOverlapsGenericImpl[F flushable](
     389              :         f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
     390            1 : ) {
     391            1 :         iter := f.newIter(nil)
     392            1 :         rangeDelIter := f.newRangeDelIter(nil)
     393            1 :         rangeKeyIter := f.newRangeKeyIter(nil)
     394            1 :         for _, b := range bounded {
     395            1 :                 overlap, err := determineOverlapAllIters(cmp, b.UserKeyBounds(), iter, rangeDelIter, rangeKeyIter)
     396            1 :                 if invariants.Enabled && err != nil {
     397            0 :                         panic(errors.AssertionFailedf("expected iterator to be infallible: %v", err))
     398              :                 }
     399            1 :                 if overlap {
     400            1 :                         if !fn(b) {
     401            1 :                                 break
     402              :                         }
     403              :                 }
     404              :         }
     405              : 
     406            1 :         if iter != nil {
     407            1 :                 if err := iter.Close(); err != nil {
     408            0 :                         // This implementation must be used in circumstances where
     409            0 :                         // reading through the iterator is infallible.
     410            0 :                         panic(err)
     411              :                 }
     412              :         }
     413            1 :         if rangeDelIter != nil {
     414            1 :                 rangeDelIter.Close()
     415            1 :         }
     416            1 :         if rangeKeyIter != nil {
     417            1 :                 rangeKeyIter.Close()
     418            1 :         }
     419              : }
     420              : 
     421              : // determineOverlapAllIters checks for overlap in a point iterator, range
     422              : // deletion iterator and range key iterator.
     423              : func determineOverlapAllIters(
     424              :         cmp base.Compare,
     425              :         bounds base.UserKeyBounds,
     426              :         pointIter base.InternalIterator,
     427              :         rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
     428            1 : ) (bool, error) {
     429            1 :         if pointIter != nil {
     430            1 :                 if pointOverlap, err := determineOverlapPointIterator(cmp, bounds, pointIter); pointOverlap || err != nil {
     431            1 :                         return pointOverlap, err
     432            1 :                 }
     433              :         }
     434            1 :         if rangeDelIter != nil {
     435            1 :                 if rangeDelOverlap, err := determineOverlapKeyspanIterator(cmp, bounds, rangeDelIter); rangeDelOverlap || err != nil {
     436            1 :                         return rangeDelOverlap, err
     437            1 :                 }
     438              :         }
     439            1 :         if rangeKeyIter != nil {
     440            1 :                 return determineOverlapKeyspanIterator(cmp, bounds, rangeKeyIter)
     441            1 :         }
     442            1 :         return false, nil
     443              : }
     444              : 
     445              : func determineOverlapPointIterator(
     446              :         cmp base.Compare, bounds base.UserKeyBounds, iter internalIterator,
     447            1 : ) (bool, error) {
     448            1 :         kv := iter.SeekGE(bounds.Start, base.SeekGEFlagsNone)
     449            1 :         if kv == nil {
     450            1 :                 return false, iter.Error()
     451            1 :         }
     452            1 :         return bounds.End.IsUpperBoundForInternalKey(cmp, kv.K), nil
     453              : }
     454              : 
     455              : func determineOverlapKeyspanIterator(
     456              :         cmp base.Compare, bounds base.UserKeyBounds, iter keyspan.FragmentIterator,
     457            1 : ) (bool, error) {
     458            1 :         // NB: The spans surfaced by the fragment iterator are non-overlapping.
     459            1 :         span, err := iter.SeekGE(bounds.Start)
     460            1 :         if err != nil {
     461            0 :                 return false, err
     462            0 :         }
     463            1 :         for ; span != nil; span, err = iter.Next() {
     464            1 :                 if !bounds.End.IsUpperBoundFor(cmp, span.Start) {
     465            1 :                         // The span starts after our bounds.
     466            1 :                         return false, nil
     467            1 :                 }
     468            1 :                 if !span.Empty() {
     469            1 :                         return true, nil
     470            1 :                 }
     471              :         }
     472            1 :         return false, err
     473              : }
        

Generated by: LCOV version 2.0-1