LCOV - code coverage report
Current view: top level - pebble/internal/manifest - annotator.go (source / functions) Hit Total Coverage
Test: 2024-11-12 08:17Z 9f68a214 - tests + meta.lcov Lines: 201 212 94.8 %
Date: 2024-11-12 08:18:08 Functions: 0 0 -

          Line data    Source code
       1             : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
       2             : // of this source code is governed by a BSD-style license that can be found in
       3             : // the LICENSE file.
       4             : 
       5             : package manifest
       6             : 
       7             : import (
       8             :         "sort"
       9             :         "sync/atomic"
      10             : 
      11             :         "github.com/cockroachdb/pebble/internal/base"
      12             : )
      13             : 
      14             : // The Annotator type defined below is used by other packages to lazily
      15             : // compute a value over a B-Tree. Each node of the B-Tree stores one
      16             : // `annotation` per annotator, containing the result of the computation over
      17             : // the node's subtree.
      18             : //
      19             : // An annotation is marked as valid if it is up to date with the subtree's
      20             : // current state. Annotations are marked as invalid whenever a node is about to
      21             : // be mutated (in mut). Annotators may also return `false` from `Accumulate` to
      22             : // signal that a file's computation is not stable and may change in the future.
      23             : // Annotations that include these unstable values are also marked as invalid
      24             : // on the node, ensuring that future queries for the annotation will recompute
      25             : // the value.
      26             : 
      27             : // An Annotator defines a computation over a level's FileMetadata. If the
      28             : // computation is stable and uses inputs that are fixed for the lifetime of
      29             : // a FileMetadata, the LevelMetadata's internal data structures are annotated
      30             : // with the intermediary computations. This allows the computation to be
      31             : // computed incrementally as edits are applied to a level.
      32             : type Annotator[T any] struct {
      33             :         Aggregator AnnotationAggregator[T]
      34             : }
      35             : 
      36             : // An AnnotationAggregator defines how an annotation should be accumulated
      37             : // from a single FileMetadata and merged with other annotated values.
      38             : type AnnotationAggregator[T any] interface {
      39             :         // Zero returns the zero value of an annotation. This value is returned
      40             :         // when a LevelMetadata is empty. The dst argument, if non-nil, is an
      41             :         // obsolete value previously returned by this Annotator and may be
      42             :         // overwritten and reused to avoid a memory allocation.
      43             :         Zero(dst *T) *T
      44             : 
      45             :         // Accumulate computes the annotation for a single file in a level's
      46             :         // metadata. It merges the file's value into dst and returns a bool flag
      47             :         // indicating whether or not the value is stable and okay to cache as an
      48             :         // annotation. If the file's value may change over the life of the file,
      49             :         // the annotator must return false.
      50             :         //
      51             :         // Implementations may modify dst and return it to avoid an allocation.
      52             :         Accumulate(f *FileMetadata, dst *T) (v *T, cacheOK bool)
      53             : 
      54             :         // Merge combines two values src and dst, returning the result.
      55             :         // Implementations may modify dst and return it to avoid an allocation.
      56             :         Merge(src *T, dst *T) *T
      57             : }
      58             : 
      59             : // A PartialOverlapAnnotationAggregator is an extension of AnnotationAggregator
      60             : // that allows for custom accumulation of range annotations for files that only
      61             : // partially overlap with the range.
      62             : type PartialOverlapAnnotationAggregator[T any] interface {
      63             :         AnnotationAggregator[T]
      64             :         AccumulatePartialOverlap(f *FileMetadata, dst *T, bounds base.UserKeyBounds) *T
      65             : }
      66             : 
      67             : type annotation struct {
      68             :         // annotator is a pointer to the Annotator that computed this annotation.
      69             :         // NB: This is untyped to allow AnnotationAggregator to use Go generics,
      70             :         // since annotations are stored in a slice on each node and a single
      71             :         // slice cannot contain elements with different type parameters.
      72             :         annotator interface{}
      73             :         // v is contains the annotation value, the output of either
      74             :         // AnnotationAggregator.Accumulate or AnnotationAggregator.Merge.
      75             :         // NB: This is untyped for the same reason as annotator above.
      76             :         v atomic.Value
      77             :         // valid indicates whether future reads of the annotation may use the
      78             :         // value as-is. If false, v will be zeroed and recalculated.
      79             :         valid atomic.Bool
      80             : }
      81             : 
      82           2 : func (a *Annotator[T]) findExistingAnnotation(n *node) *annotation {
      83           2 :         n.annotMu.RLock()
      84           2 :         defer n.annotMu.RUnlock()
      85           2 :         for i := range n.annot {
      86           2 :                 if n.annot[i].annotator == a {
      87           2 :                         return &n.annot[i]
      88           2 :                 }
      89             :         }
      90           2 :         return nil
      91             : }
      92             : 
      93             : // findAnnotation finds this Annotator's annotation on a node, creating
      94             : // one if it doesn't already exist.
      95           2 : func (a *Annotator[T]) findAnnotation(n *node) *annotation {
      96           2 :         if a := a.findExistingAnnotation(n); a != nil {
      97           2 :                 return a
      98           2 :         }
      99           2 :         n.annotMu.Lock()
     100           2 :         defer n.annotMu.Unlock()
     101           2 : 
     102           2 :         // This node has never been annotated by a. Create a new annotation.
     103           2 :         n.annot = append(n.annot, annotation{
     104           2 :                 annotator: a,
     105           2 :                 v:         atomic.Value{},
     106           2 :         })
     107           2 :         n.annot[len(n.annot)-1].v.Store(a.Aggregator.Zero(nil))
     108           2 :         return &n.annot[len(n.annot)-1]
     109             : }
     110             : 
     111             : // nodeAnnotation computes this annotator's annotation of this node across all
     112             : // files in the node's subtree. The second return value indicates whether the
     113             : // annotation is stable and thus cacheable.
     114           2 : func (a *Annotator[T]) nodeAnnotation(n *node) (t *T, cacheOK bool) {
     115           2 :         annot := a.findAnnotation(n)
     116           2 :         // If the annotation is already marked as valid, we can return it without
     117           2 :         // recomputing anything.
     118           2 :         if annot.valid.Load() {
     119           2 :                 // The load of these two atomics should be safe, as we don't need to
     120           2 :                 // guarantee invalidations are concurrent-safe. See the comment on
     121           2 :                 // InvalidateLevelAnnotation about why.
     122           2 :                 return annot.v.Load().(*T), true
     123           2 :         }
     124             : 
     125           2 :         t = a.Aggregator.Zero(t)
     126           2 :         valid := true
     127           2 : 
     128           2 :         for i := int16(0); i <= n.count; i++ {
     129           2 :                 if !n.leaf {
     130           2 :                         v, ok := a.nodeAnnotation(n.children[i])
     131           2 :                         t = a.Aggregator.Merge(v, t)
     132           2 :                         valid = valid && ok
     133           2 :                 }
     134             : 
     135           2 :                 if i < n.count {
     136           2 :                         var ok bool
     137           2 :                         t, ok = a.Aggregator.Accumulate(n.items[i], t)
     138           2 :                         valid = valid && ok
     139           2 :                 }
     140             :         }
     141             : 
     142           2 :         if valid {
     143           2 :                 // Two valid annotations should be identical, so this is
     144           2 :                 // okay.
     145           2 :                 annot.v.Store(t)
     146           2 :                 annot.valid.Store(valid)
     147           2 :         }
     148             : 
     149           2 :         return t, valid
     150             : }
     151             : 
     152             : // accumulateRangeAnnotation computes this annotator's annotation across all
     153             : // files in the node's subtree which overlap with the range defined by bounds.
     154             : // The computed annotation is accumulated into a.scratch.
     155             : func (a *Annotator[T]) accumulateRangeAnnotation(
     156             :         n *node,
     157             :         cmp base.Compare,
     158             :         bounds base.UserKeyBounds,
     159             :         // fullyWithinLowerBound and fullyWithinUpperBound indicate whether this
     160             :         // node's subtree is already known to be within each bound.
     161             :         fullyWithinLowerBound bool,
     162             :         fullyWithinUpperBound bool,
     163             :         dst *T,
     164           1 : ) *T {
     165           1 :         // If this node's subtree is fully within the bounds, compute a regular
     166           1 :         // annotation.
     167           1 :         if fullyWithinLowerBound && fullyWithinUpperBound {
     168           1 :                 v, _ := a.nodeAnnotation(n)
     169           1 :                 dst = a.Aggregator.Merge(v, dst)
     170           1 :                 return dst
     171           1 :         }
     172             : 
     173             :         // We will accumulate annotations from each item in the end-exclusive
     174             :         // range [leftItem, rightItem).
     175           1 :         leftItem, rightItem := 0, int(n.count)
     176           1 :         if !fullyWithinLowerBound {
     177           1 :                 // leftItem is the index of the first item that overlaps the lower bound.
     178           1 :                 leftItem = sort.Search(int(n.count), func(i int) bool {
     179           1 :                         return cmp(bounds.Start, n.items[i].Largest.UserKey) <= 0
     180           1 :                 })
     181             :         }
     182           1 :         if !fullyWithinUpperBound {
     183           1 :                 // rightItem is the index of the first item that does not overlap the
     184           1 :                 // upper bound.
     185           1 :                 rightItem = sort.Search(int(n.count), func(i int) bool {
     186           1 :                         return !bounds.End.IsUpperBoundFor(cmp, n.items[i].Smallest.UserKey)
     187           1 :                 })
     188             :         }
     189             : 
     190             :         // Accumulate annotations from every item that overlaps the bounds.
     191           1 :         for i := leftItem; i < rightItem; i++ {
     192           1 :                 if i == leftItem || i == rightItem-1 {
     193           1 :                         if agg, ok := a.Aggregator.(PartialOverlapAnnotationAggregator[T]); ok {
     194           1 :                                 fb := n.items[i].UserKeyBounds()
     195           1 :                                 if cmp(bounds.Start, fb.Start) > 0 || bounds.End.CompareUpperBounds(cmp, fb.End) < 0 {
     196           1 :                                         dst = agg.AccumulatePartialOverlap(n.items[i], dst, bounds)
     197           1 :                                         continue
     198             :                                 }
     199             :                         }
     200             :                 }
     201           1 :                 v, _ := a.Aggregator.Accumulate(n.items[i], dst)
     202           1 :                 dst = v
     203             :         }
     204             : 
     205           1 :         if !n.leaf {
     206           1 :                 // We will accumulate annotations from each child in the end-inclusive
     207           1 :                 // range [leftChild, rightChild].
     208           1 :                 leftChild, rightChild := leftItem, rightItem
     209           1 :                 // If the lower bound overlaps with the child at leftItem, there is no
     210           1 :                 // need to accumulate annotations from the child to its left.
     211           1 :                 if leftItem < int(n.count) && cmp(bounds.Start, n.items[leftItem].Smallest.UserKey) >= 0 {
     212           1 :                         leftChild++
     213           1 :                 }
     214             :                 // If the upper bound spans beyond the child at rightItem, we must also
     215             :                 // accumulate annotations from the child to its right.
     216           1 :                 if rightItem < int(n.count) && bounds.End.IsUpperBoundFor(cmp, n.items[rightItem].Largest.UserKey) {
     217           0 :                         rightChild++
     218           0 :                 }
     219             : 
     220           1 :                 for i := leftChild; i <= rightChild; i++ {
     221           1 :                         dst = a.accumulateRangeAnnotation(
     222           1 :                                 n.children[i],
     223           1 :                                 cmp,
     224           1 :                                 bounds,
     225           1 :                                 // If this child is to the right of leftItem, then its entire
     226           1 :                                 // subtree is within the lower bound.
     227           1 :                                 fullyWithinLowerBound || i > leftItem,
     228           1 :                                 // If this child is to the left of rightItem, then its entire
     229           1 :                                 // subtree is within the upper bound.
     230           1 :                                 fullyWithinUpperBound || i < rightItem,
     231           1 :                                 dst,
     232           1 :                         )
     233           1 :                 }
     234             :         }
     235           1 :         return dst
     236             : }
     237             : 
     238             : // InvalidateAnnotation removes any existing cached annotations from this
     239             : // annotator from a node's subtree.
     240           1 : func (a *Annotator[T]) invalidateNodeAnnotation(n *node) {
     241           1 :         annot := a.findAnnotation(n)
     242           1 :         annot.valid.Store(false)
     243           1 :         if !n.leaf {
     244           0 :                 for i := int16(0); i <= n.count; i++ {
     245           0 :                         a.invalidateNodeAnnotation(n.children[i])
     246           0 :                 }
     247             :         }
     248             : }
     249             : 
     250             : // LevelAnnotation calculates the annotation defined by this Annotator for all
     251             : // files in the given LevelMetadata. A pointer to the Annotator is used as the
     252             : // key for pre-calculated values, so the same Annotator must be used to avoid
     253             : // duplicate computation.
     254           2 : func (a *Annotator[T]) LevelAnnotation(lm LevelMetadata) *T {
     255           2 :         if lm.Empty() {
     256           2 :                 return a.Aggregator.Zero(nil)
     257           2 :         }
     258             : 
     259           2 :         v, _ := a.nodeAnnotation(lm.tree.root)
     260           2 :         return v
     261             : }
     262             : 
     263             : // MultiLevelAnnotation calculates the annotation defined by this Annotator for
     264             : // all files across the given levels. A pointer to the Annotator is used as the
     265             : // key for pre-calculated values, so the same Annotator must be used to avoid
     266             : // duplicate computation.
     267           2 : func (a *Annotator[T]) MultiLevelAnnotation(lms []LevelMetadata) *T {
     268           2 :         aggregated := a.Aggregator.Zero(nil)
     269           2 :         for l := 0; l < len(lms); l++ {
     270           2 :                 if !lms[l].Empty() {
     271           2 :                         v := a.LevelAnnotation(lms[l])
     272           2 :                         aggregated = a.Aggregator.Merge(v, aggregated)
     273           2 :                 }
     274             :         }
     275           2 :         return aggregated
     276             : }
     277             : 
     278             : // LevelRangeAnnotation calculates the annotation defined by this Annotator for
     279             : // the files within LevelMetadata which are within the range
     280             : // [lowerBound, upperBound). A pointer to the Annotator is used as the key for
     281             : // pre-calculated values, so the same Annotator must be used to avoid duplicate
     282             : // computation.
     283           1 : func (a *Annotator[T]) LevelRangeAnnotation(lm LevelMetadata, bounds base.UserKeyBounds) *T {
     284           1 :         if lm.Empty() {
     285           0 :                 return a.Aggregator.Zero(nil)
     286           0 :         }
     287             : 
     288           1 :         var dst *T
     289           1 :         dst = a.Aggregator.Zero(dst)
     290           1 :         dst = a.accumulateRangeAnnotation(lm.tree.root, lm.tree.cmp, bounds, false, false, dst)
     291           1 :         return dst
     292             : }
     293             : 
     294             : // VersionRangeAnnotation calculates the annotation defined by this Annotator
     295             : // for all files within the given Version which are within the range
     296             : // defined by bounds.
     297           1 : func (a *Annotator[T]) VersionRangeAnnotation(v *Version, bounds base.UserKeyBounds) *T {
     298           1 :         var dst *T
     299           1 :         dst = a.Aggregator.Zero(dst)
     300           1 :         accumulateSlice := func(ls LevelSlice) {
     301           1 :                 if ls.Empty() {
     302           1 :                         return
     303           1 :                 }
     304           1 :                 dst = a.accumulateRangeAnnotation(ls.iter.r, v.cmp.Compare, bounds, false, false, dst)
     305             :         }
     306           1 :         for _, ls := range v.L0SublevelFiles {
     307           1 :                 accumulateSlice(ls)
     308           1 :         }
     309           1 :         for _, lm := range v.Levels[1:] {
     310           1 :                 accumulateSlice(lm.Slice())
     311           1 :         }
     312           1 :         return dst
     313             : }
     314             : 
     315             : // InvalidateAnnotation clears any cached annotations defined by Annotator. A
     316             : // pointer to the Annotator is used as the key for pre-calculated values, so
     317             : // the same Annotator must be used to clear the appropriate cached annotation.
     318             : // Calls to InvalidateLevelAnnotation are *not* concurrent-safe with any other
     319             : // calls to Annotator methods for the same Annotator (concurrent calls from other
     320             : // annotators are fine). Any calls to this function must have some externally-guaranteed
     321             : // mutual exclusion.
     322           1 : func (a *Annotator[T]) InvalidateLevelAnnotation(lm LevelMetadata) {
     323           1 :         if lm.Empty() {
     324           0 :                 return
     325           0 :         }
     326           1 :         a.invalidateNodeAnnotation(lm.tree.root)
     327             : }
     328             : 
     329             : // SumAggregator defines an Aggregator which sums together a uint64 value
     330             : // across files.
     331             : type SumAggregator struct {
     332             :         AccumulateFunc               func(f *FileMetadata) (v uint64, cacheOK bool)
     333             :         AccumulatePartialOverlapFunc func(f *FileMetadata, bounds base.UserKeyBounds) uint64
     334             : }
     335             : 
     336             : // Zero implements AnnotationAggregator.Zero, returning a new uint64 set to 0.
     337           2 : func (sa SumAggregator) Zero(dst *uint64) *uint64 {
     338           2 :         if dst == nil {
     339           2 :                 return new(uint64)
     340           2 :         }
     341           0 :         *dst = 0
     342           0 :         return dst
     343             : }
     344             : 
     345             : // Accumulate implements AnnotationAggregator.Accumulate, accumulating a single
     346             : // file's uint64 value.
     347           2 : func (sa SumAggregator) Accumulate(f *FileMetadata, dst *uint64) (v *uint64, cacheOK bool) {
     348           2 :         accumulated, ok := sa.AccumulateFunc(f)
     349           2 :         *dst += accumulated
     350           2 :         return dst, ok
     351           2 : }
     352             : 
     353             : // AccumulatePartialOverlap implements
     354             : // PartialOverlapAnnotationAggregator.AccumulatePartialOverlap, accumulating a
     355             : // single file's uint64 value for a file which only partially overlaps with the
     356             : // range defined by bounds.
     357             : func (sa SumAggregator) AccumulatePartialOverlap(
     358             :         f *FileMetadata, dst *uint64, bounds base.UserKeyBounds,
     359           1 : ) *uint64 {
     360           1 :         if sa.AccumulatePartialOverlapFunc == nil {
     361           1 :                 v, _ := sa.Accumulate(f, dst)
     362           1 :                 return v
     363           1 :         }
     364           1 :         *dst += sa.AccumulatePartialOverlapFunc(f, bounds)
     365           1 :         return dst
     366             : }
     367             : 
     368             : // Merge implements AnnotationAggregator.Merge by summing two uint64 values.
     369           2 : func (sa SumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
     370           2 :         *dst += *src
     371           2 :         return dst
     372           2 : }
     373             : 
     374             : // SumAnnotator takes a function that computes a uint64 value from a single
     375             : // FileMetadata and returns an Annotator that sums together the values across
     376             : // files.
     377           2 : func SumAnnotator(accumulate func(f *FileMetadata) (v uint64, cacheOK bool)) *Annotator[uint64] {
     378           2 :         return &Annotator[uint64]{
     379           2 :                 Aggregator: SumAggregator{
     380           2 :                         AccumulateFunc: accumulate,
     381           2 :                 },
     382           2 :         }
     383           2 : }
     384             : 
     385             : // NumFilesAnnotator is an Annotator which computes an annotation value
     386             : // equal to the number of files included in the annotation. Particularly, it
     387             : // can be used to efficiently calculate the number of files in a given key
     388             : // range using range annotations.
     389           1 : var NumFilesAnnotator = SumAnnotator(func(f *FileMetadata) (uint64, bool) {
     390           1 :         return 1, true
     391           1 : })
     392             : 
     393             : // PickFileAggregator implements the AnnotationAggregator interface. It defines
     394             : // an aggregator that picks a single file from a set of eligible files.
     395             : type PickFileAggregator struct {
     396             :         // Filter takes a FileMetadata and returns whether it is eligible to be
     397             :         // picked by this PickFileAggregator. The second return value indicates
     398             :         // whether this eligibility is stable and thus cacheable.
     399             :         Filter func(f *FileMetadata) (eligible bool, cacheOK bool)
     400             :         // Compare compares two instances of FileMetadata and returns true if
     401             :         // the first one should be picked over the second one. It may assume
     402             :         // that both arguments are non-nil.
     403             :         Compare func(f1 *FileMetadata, f2 *FileMetadata) bool
     404             : }
     405             : 
     406             : // Zero implements AnnotationAggregator.Zero, returning nil as the zero value.
     407           2 : func (fa PickFileAggregator) Zero(dst *FileMetadata) *FileMetadata {
     408           2 :         return nil
     409           2 : }
     410             : 
     411           2 : func (fa PickFileAggregator) mergePickedFiles(src *FileMetadata, dst *FileMetadata) *FileMetadata {
     412           2 :         switch {
     413           2 :         case src == nil:
     414           2 :                 return dst
     415           2 :         case dst == nil:
     416           2 :                 return src
     417           2 :         case fa.Compare(src, dst):
     418           2 :                 return src
     419           2 :         default:
     420           2 :                 return dst
     421             :         }
     422             : }
     423             : 
     424             : // Accumulate implements AnnotationAggregator.Accumulate, accumulating a single
     425             : // file as long as it is eligible to be picked.
     426             : func (fa PickFileAggregator) Accumulate(
     427             :         f *FileMetadata, dst *FileMetadata,
     428           2 : ) (v *FileMetadata, cacheOK bool) {
     429           2 :         eligible, ok := fa.Filter(f)
     430           2 :         if eligible {
     431           2 :                 return fa.mergePickedFiles(f, dst), ok
     432           2 :         }
     433           2 :         return dst, ok
     434             : }
     435             : 
     436             : // Merge implements AnnotationAggregator.Merge by picking a single file based
     437             : // on the output of PickFileAggregator.Compare.
     438           2 : func (fa PickFileAggregator) Merge(src *FileMetadata, dst *FileMetadata) *FileMetadata {
     439           2 :         return fa.mergePickedFiles(src, dst)
     440           2 : }

Generated by: LCOV version 1.14