LCOV - code coverage report
Current view: top level - pebble - flushable.go (source / functions) Hit Total Coverage
Test: 2024-01-20 08:15Z 5b092519 - tests + meta.lcov Lines: 127 145 87.6 %
Date: 2024-01-20 08:16:47 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package pebble
       6             : 
       7             : import (
       8             :         "context"
       9             :         "fmt"
      10             :         "io"
      11             :         "sync/atomic"
      12             :         "time"
      13             : 
      14             :         "github.com/cockroachdb/pebble/internal/base"
      15             :         "github.com/cockroachdb/pebble/internal/keyspan"
      16             :         "github.com/cockroachdb/pebble/internal/manifest"
      17             : )
      18             : 
      19             : // flushable defines the interface for immutable memtables.
      20             : type flushable interface {
      21             :         newIter(o *IterOptions) internalIterator
      22             :         newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator
      23             :         newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
      24             :         newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
      25             :         containsRangeKeys() bool
      26             :         // inuseBytes returns the number of inuse bytes by the flushable.
      27             :         inuseBytes() uint64
      28             :         // totalBytes returns the total number of bytes allocated by the flushable.
      29             :         totalBytes() uint64
      30             :         // readyForFlush returns true when the flushable is ready for flushing. See
      31             :         // memTable.readyForFlush for one implementation which needs to check whether
      32             :         // there are any outstanding write references.
      33             :         readyForFlush() bool
      34             :         // computePossibleOverlaps determines whether the flushable's keys overlap
      35             :         // with the bounds of any of the provided bounded items. If an item overlaps
      36             :         // or might overlap but it's not possible to determine overlap cheaply,
      37             :         // computePossibleOverlaps invokes the provided function with the object
      38             :         // that might overlap. computePossibleOverlaps must not perform any I/O and
      39             :         // implementations should invoke the provided function for items that would
      40             :         // require I/O to determine overlap.
      41             :         computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
      42             : }
      43             : 
      44             : type shouldContinue bool
      45             : 
      46             : const (
      47             :         continueIteration shouldContinue = true
      48             :         stopIteration                    = false
      49             : )
      50             : 
      51             : type bounded interface {
      52             :         // InternalKeyBounds returns a start key and an end key. Both bounds are
      53             :         // inclusive.
      54             :         InternalKeyBounds() (InternalKey, InternalKey)
      55             : }
      56             : 
      57           2 : func sliceAsBounded[B bounded](s []B) []bounded {
      58           2 :         ret := make([]bounded, len(s))
      59           2 :         for i := 0; i < len(s); i++ {
      60           2 :                 ret[i] = s[i]
      61           2 :         }
      62           2 :         return ret
      63             : }
      64             : 
      65             : // flushableEntry wraps a flushable and adds additional metadata and
      66             : // functionality that is common to all flushables.
      67             : type flushableEntry struct {
      68             :         flushable
      69             :         // Channel which is closed when the flushable has been flushed.
      70             :         flushed chan struct{}
      71             :         // flushForced indicates whether a flush was forced on this memtable (either
      72             :         // manual, or due to ingestion). Protected by DB.mu.
      73             :         flushForced bool
      74             :         // delayedFlushForcedAt indicates whether a timer has been set to force a
      75             :         // flush on this memtable at some point in the future. Protected by DB.mu.
      76             :         // Holds the timestamp of when the flush will be issued.
      77             :         delayedFlushForcedAt time.Time
      78             :         // logNum corresponds to the WAL that contains the records present in the
      79             :         // receiver.
      80             :         logNum base.DiskFileNum
      81             :         // logSize is the size in bytes of the associated WAL. Protected by DB.mu.
      82             :         logSize uint64
      83             :         // The current logSeqNum at the time the memtable was created. This is
      84             :         // guaranteed to be less than or equal to any seqnum stored in the memtable.
      85             :         logSeqNum uint64
      86             :         // readerRefs tracks the read references on the flushable. The two sources of
      87             :         // reader references are DB.mu.mem.queue and readState.memtables. The memory
      88             :         // reserved by the flushable in the cache is released when the reader refs
      89             :         // drop to zero. If the flushable is referencing sstables, then the file
      90             :         // refount is also decreased once the reader refs drops to 0. If the
      91             :         // flushable is a memTable, when the reader refs drops to zero, the writer
      92             :         // refs will already be zero because the memtable will have been flushed and
      93             :         // that only occurs once the writer refs drops to zero.
      94             :         readerRefs atomic.Int32
      95             :         // Closure to invoke to release memory accounting.
      96             :         releaseMemAccounting func()
      97             :         // unrefFiles, if not nil, should be invoked to decrease the ref count of
      98             :         // files which are backing the flushable.
      99             :         unrefFiles func() []*fileBacking
     100             :         // deleteFnLocked should be called if the caller is holding DB.mu.
     101             :         deleteFnLocked func(obsolete []*fileBacking)
     102             :         // deleteFn should be called if the caller is not holding DB.mu.
     103             :         deleteFn func(obsolete []*fileBacking)
     104             : }
     105             : 
     106           2 : func (e *flushableEntry) readerRef() {
     107           2 :         switch v := e.readerRefs.Add(1); {
     108           0 :         case v <= 1:
     109           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     110             :         }
     111             : }
     112             : 
     113             : // db.mu must not be held when this is called.
     114           2 : func (e *flushableEntry) readerUnref(deleteFiles bool) {
     115           2 :         e.readerUnrefHelper(deleteFiles, e.deleteFn)
     116           2 : }
     117             : 
     118             : // db.mu must be held when this is called.
     119           2 : func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
     120           2 :         e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
     121           2 : }
     122             : 
     123             : func (e *flushableEntry) readerUnrefHelper(
     124             :         deleteFiles bool, deleteFn func(obsolete []*fileBacking),
     125           2 : ) {
     126           2 :         switch v := e.readerRefs.Add(-1); {
     127           0 :         case v < 0:
     128           0 :                 panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
     129           2 :         case v == 0:
     130           2 :                 if e.releaseMemAccounting == nil {
     131           0 :                         panic("pebble: memtable reservation already released")
     132             :                 }
     133           2 :                 e.releaseMemAccounting()
     134           2 :                 e.releaseMemAccounting = nil
     135           2 :                 if e.unrefFiles != nil {
     136           2 :                         obsolete := e.unrefFiles()
     137           2 :                         e.unrefFiles = nil
     138           2 :                         if deleteFiles {
     139           2 :                                 deleteFn(obsolete)
     140           2 :                         }
     141             :                 }
     142             :         }
     143             : }
     144             : 
     145             : type flushableList []*flushableEntry
     146             : 
     147             : // ingestedFlushable is the implementation of the flushable interface for the
     148             : // ingesting sstables which are added to the flushable list.
     149             : type ingestedFlushable struct {
     150             :         files            []physicalMeta
     151             :         comparer         *Comparer
     152             :         newIters         tableNewIters
     153             :         newRangeKeyIters keyspan.TableNewSpanIter
     154             : 
     155             :         // Since the level slice is immutable, we construct and set it once. It
     156             :         // should be safe to read from slice in future reads.
     157             :         slice manifest.LevelSlice
     158             :         // hasRangeKeys is set on ingestedFlushable construction.
     159             :         hasRangeKeys bool
     160             : }
     161             : 
     162             : func newIngestedFlushable(
     163             :         files []*fileMetadata,
     164             :         comparer *Comparer,
     165             :         newIters tableNewIters,
     166             :         newRangeKeyIters keyspan.TableNewSpanIter,
     167           2 : ) *ingestedFlushable {
     168           2 :         var physicalFiles []physicalMeta
     169           2 :         var hasRangeKeys bool
     170           2 :         for _, f := range files {
     171           2 :                 if f.HasRangeKeys {
     172           2 :                         hasRangeKeys = true
     173           2 :                 }
     174           2 :                 physicalFiles = append(physicalFiles, f.PhysicalMeta())
     175             :         }
     176             : 
     177           2 :         ret := &ingestedFlushable{
     178           2 :                 files:            physicalFiles,
     179           2 :                 comparer:         comparer,
     180           2 :                 newIters:         newIters,
     181           2 :                 newRangeKeyIters: newRangeKeyIters,
     182           2 :                 // slice is immutable and can be set once and used many times.
     183           2 :                 slice:        manifest.NewLevelSliceKeySorted(comparer.Compare, files),
     184           2 :                 hasRangeKeys: hasRangeKeys,
     185           2 :         }
     186           2 : 
     187           2 :         return ret
     188             : }
     189             : 
     190             : // TODO(sumeer): ingestedFlushable iters also need to plumb context for
     191             : // tracing.
     192             : 
     193             : // newIter is part of the flushable interface.
     194           2 : func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
     195           2 :         var opts IterOptions
     196           2 :         if o != nil {
     197           2 :                 opts = *o
     198           2 :         }
     199             :         // TODO(bananabrick): The manifest.Level in newLevelIter is only used for
     200             :         // logging. Update the manifest.Level encoding to account for levels which
     201             :         // aren't truly levels in the lsm. Right now, the encoding only supports
     202             :         // L0 sublevels, and the rest of the levels in the lsm.
     203           2 :         return newLevelIter(
     204           2 :                 context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.Level(0),
     205           2 :                 internalIterOpts{},
     206           2 :         )
     207             : }
     208             : 
     209             : // newFlushIter is part of the flushable interface.
     210           0 : func (s *ingestedFlushable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
     211           0 :         // newFlushIter is only used for writing memtables to disk as sstables.
     212           0 :         // Since ingested sstables are already present on disk, they don't need to
     213           0 :         // make use of a flush iter.
     214           0 :         panic("pebble: not implemented")
     215             : }
     216             : 
     217             : func (s *ingestedFlushable) constructRangeDelIter(
     218             :         file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
     219           2 : ) (keyspan.FragmentIterator, error) {
     220           2 :         // Note that the keyspan level iter expects a non-nil iterator to be
     221           2 :         // returned even if there is an error. So, we return the emptyKeyspanIter.
     222           2 :         iter, rangeDelIter, err := s.newIters(context.Background(), file, nil, internalIterOpts{})
     223           2 :         if err != nil {
     224           0 :                 return emptyKeyspanIter, err
     225           0 :         }
     226           2 :         iter.Close()
     227           2 :         if rangeDelIter == nil {
     228           2 :                 return emptyKeyspanIter, nil
     229           2 :         }
     230           2 :         return rangeDelIter, nil
     231             : }
     232             : 
     233             : // newRangeDelIter is part of the flushable interface.
     234             : // TODO(bananabrick): Using a level iter instead of a keyspan level iter to
     235             : // surface range deletes is more efficient.
     236             : //
     237             : // TODO(sumeer): *IterOptions are being ignored, so the index block load for
     238             : // the point iterator in constructRangeDeIter is not tracked.
     239           2 : func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
     240           2 :         return keyspan.NewLevelIter(
     241           2 :                 keyspan.SpanIterOptions{}, s.comparer.Compare,
     242           2 :                 s.constructRangeDelIter, s.slice.Iter(), manifest.Level(0),
     243           2 :                 manifest.KeyTypePoint,
     244           2 :         )
     245           2 : }
     246             : 
     247             : // newRangeKeyIter is part of the flushable interface.
     248           2 : func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
     249           2 :         if !s.containsRangeKeys() {
     250           2 :                 return nil
     251           2 :         }
     252             : 
     253           2 :         return keyspan.NewLevelIter(
     254           2 :                 keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
     255           2 :                 s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange,
     256           2 :         )
     257             : }
     258             : 
     259             : // containsRangeKeys is part of the flushable interface.
     260           2 : func (s *ingestedFlushable) containsRangeKeys() bool {
     261           2 :         return s.hasRangeKeys
     262           2 : }
     263             : 
     264             : // inuseBytes is part of the flushable interface.
     265           0 : func (s *ingestedFlushable) inuseBytes() uint64 {
     266           0 :         // inuseBytes is only used when memtables are flushed to disk as sstables.
     267           0 :         panic("pebble: not implemented")
     268             : }
     269             : 
     270             : // totalBytes is part of the flushable interface.
     271           2 : func (s *ingestedFlushable) totalBytes() uint64 {
     272           2 :         // We don't allocate additional bytes for the ingestedFlushable.
     273           2 :         return 0
     274           2 : }
     275             : 
     276             : // readyForFlush is part of the flushable interface.
     277           2 : func (s *ingestedFlushable) readyForFlush() bool {
     278           2 :         // ingestedFlushable should always be ready to flush. However, note that
     279           2 :         // memtables before the ingested sstables in the memtable queue must be
     280           2 :         // flushed before an ingestedFlushable can be flushed. This is because the
     281           2 :         // ingested sstables need an updated view of the Version to
     282           2 :         // determine where to place the files in the lsm.
     283           2 :         return true
     284           2 : }
     285             : 
     286             : // computePossibleOverlaps is part of the flushable interface.
     287             : func (s *ingestedFlushable) computePossibleOverlaps(
     288             :         fn func(bounded) shouldContinue, bounded ...bounded,
     289           2 : ) {
     290           2 :         for i := range bounded {
     291           2 :                 smallest, largest := bounded[i].InternalKeyBounds()
     292           2 :                 for j := 0; j < len(s.files); j++ {
     293           2 :                         if sstableKeyCompare(s.comparer.Compare, s.files[j].Largest, smallest) >= 0 {
     294           2 :                                 // This file's largest key is larger than smallest. Either the
     295           2 :                                 // file overlaps the bounds, or it lies strictly after the
     296           2 :                                 // bounds. Either way we can stop iterating since the files are
     297           2 :                                 // sorted. But first, determine if there's overlap and call fn
     298           2 :                                 // if necessary.
     299           2 :                                 if sstableKeyCompare(s.comparer.Compare, s.files[j].Smallest, largest) <= 0 {
     300           2 :                                         // The file overlaps in key boundaries. The file doesn't necessarily
     301           2 :                                         // contain any keys within the key range, but we would need to
     302           2 :                                         // perform I/O to know for sure. The flushable interface dictates
     303           2 :                                         // that we're not permitted to perform I/O here, so err towards
     304           2 :                                         // assuming overlap.
     305           2 :                                         if !fn(bounded[i]) {
     306           1 :                                                 return
     307           1 :                                         }
     308             :                                 }
     309           2 :                                 break
     310             :                         }
     311             :                 }
     312             :         }
     313             : }
     314             : 
     315             : // computePossibleOverlapsGenericImpl is an implemention of the flushable
     316             : // interface's computePossibleOverlaps function for flushable implementations
     317             : // with only in-memory state that do not have special requirements and should
     318             : // read through the ordinary flushable iterators.
     319             : //
     320             : // This function must only be used with implementations that are infallible (eg,
     321             : // memtable iterators) and will panic if an error is encountered.
     322             : func computePossibleOverlapsGenericImpl[F flushable](
     323             :         f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
     324           2 : ) {
     325           2 :         iter := f.newIter(nil)
     326           2 :         rangeDelIter := f.newRangeDelIter(nil)
     327           2 :         rkeyIter := f.newRangeKeyIter(nil)
     328           2 :         for _, b := range bounded {
     329           2 :                 s, l := b.InternalKeyBounds()
     330           2 :                 kr := internalKeyRange{s, l}
     331           2 :                 if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, cmp) {
     332           2 :                         if !fn(b) {
     333           2 :                                 break
     334             :                         }
     335             :                 }
     336             :         }
     337             : 
     338           2 :         for _, c := range [3]io.Closer{iter, rangeDelIter, rkeyIter} {
     339           2 :                 if c != nil {
     340           2 :                         if err := c.Close(); err != nil {
     341           0 :                                 // This implementation must be used in circumstances where
     342           0 :                                 // reading through the iterator is infallible.
     343           0 :                                 panic(err)
     344             :                         }
     345             :                 }
     346             :         }
     347             : }

Generated by: LCOV version 1.14