Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "sort"
9 : "sync/atomic"
10 :
11 : "github.com/cockroachdb/pebble/internal/base"
12 : )
13 :
// The Annotator type defined below is used by other packages to lazily
// compute a value over a B-Tree. Each node of the B-Tree stores one
// `annotation` per annotator, containing the result of the computation over
// the node's subtree.
//
// An annotation is marked as valid while it is up to date with the current
// state of its subtree. Annotations are invalidated whenever a node is about
// to be mutated (in mut). Annotators may also return `false` from
// `Accumulate` to signal that a computation for a file is not stable and may
// change in the future. Annotations that include these unstable values are
// never marked valid on the node, ensuring that future queries for the
// annotation will recompute the value.
26 :
// An Annotator defines a computation over a level's FileMetadata. If the
// computation is stable and uses inputs that are fixed for the lifetime of
// a FileMetadata, the LevelMetadata's internal data structures are annotated
// with the intermediate computations. This allows the computation to be
// computed incrementally as edits are applied to a level.
//
// Cached annotations are keyed by the *Annotator pointer, so the same
// Annotator value must be reused across queries for caching to take effect.
type Annotator[T any] struct {
	// Aggregator defines how per-file values are computed and combined.
	Aggregator AnnotationAggregator[T]
}
35 :
// An AnnotationAggregator defines how an annotation should be accumulated
// from a single FileMetadata and merged with other annotated values.
type AnnotationAggregator[T any] interface {
	// Zero returns the zero value of an annotation. This value is returned
	// when a LevelMetadata is empty. The dst argument, if non-nil, is an
	// obsolete value previously returned by this Annotator and may be
	// overwritten and reused to avoid a memory allocation.
	//
	// NOTE(review): LevelAnnotation can return a pointer that is also cached
	// on a B-tree node, so such a value is not necessarily obsolete; reusing
	// it as dst here would overwrite the cached annotation. Confirm callers
	// only pass values they own.
	Zero(dst *T) *T

	// Accumulate computes the annotation for a single file in a level's
	// metadata. It merges the file's value into dst and returns a bool flag
	// indicating whether or not the value is stable and okay to cache as an
	// annotation. If the file's value may change over the life of the file,
	// the annotator must return false.
	//
	// Implementations may modify dst and return it to avoid an allocation.
	Accumulate(f *FileMetadata, dst *T) (v *T, cacheOK bool)

	// Merge combines two values src and dst, returning the result.
	// Implementations may modify dst and return it to avoid an allocation,
	// but must not modify src: src may be an annotation cached on a node.
	Merge(src *T, dst *T) *T
}
58 :
// A PartialOverlapAnnotationAggregator is an extension of AnnotationAggregator
// that allows for custom accumulation of range annotations for files that only
// partially overlap with the range.
type PartialOverlapAnnotationAggregator[T any] interface {
	AnnotationAggregator[T]
	// AccumulatePartialOverlap merges into dst the contribution of f, a file
	// that overlaps bounds but extends beyond it on at least one side, and
	// returns the result. Range annotations call it instead of Accumulate for
	// such files. Implementations may modify dst and return it. Range
	// annotations are not cached, so there is no cacheOK result.
	AccumulatePartialOverlap(f *FileMetadata, dst *T, bounds base.UserKeyBounds) *T
}
66 :
// annotation is one Annotator's cached result for a single B-tree node.
type annotation struct {
	// annotator identifies the Annotator that computed this annotation. It
	// is stored untyped (rather than as *Annotator[T]) because each node
	// keeps all of its annotations in one slice, and a single slice cannot
	// hold elements whose types differ only by type parameter.
	annotator any
	// v holds the annotation value (a *T), the output of either
	// AnnotationAggregator.Accumulate or AnnotationAggregator.Merge. It is
	// untyped for the same reason as annotator.
	v atomic.Value
	// valid reports whether readers may use v as-is. While false, the value
	// is recomputed from the node's subtree on the next query.
	valid atomic.Bool
}
81 :
82 2 : func (a *Annotator[T]) findExistingAnnotation(n *node) *annotation {
83 2 : n.annotMu.RLock()
84 2 : defer n.annotMu.RUnlock()
85 2 : for i := range n.annot {
86 2 : if n.annot[i].annotator == a {
87 2 : return &n.annot[i]
88 2 : }
89 : }
90 2 : return nil
91 : }
92 :
93 : // findAnnotation finds this Annotator's annotation on a node, creating
94 : // one if it doesn't already exist.
95 2 : func (a *Annotator[T]) findAnnotation(n *node) *annotation {
96 2 : if a := a.findExistingAnnotation(n); a != nil {
97 2 : return a
98 2 : }
99 2 : n.annotMu.Lock()
100 2 : defer n.annotMu.Unlock()
101 2 :
102 2 : // This node has never been annotated by a. Create a new annotation.
103 2 : n.annot = append(n.annot, annotation{
104 2 : annotator: a,
105 2 : v: atomic.Value{},
106 2 : })
107 2 : n.annot[len(n.annot)-1].v.Store(a.Aggregator.Zero(nil))
108 2 : return &n.annot[len(n.annot)-1]
109 : }
110 :
111 : // nodeAnnotation computes this annotator's annotation of this node across all
112 : // files in the node's subtree. The second return value indicates whether the
113 : // annotation is stable and thus cacheable.
114 2 : func (a *Annotator[T]) nodeAnnotation(n *node) (t *T, cacheOK bool) {
115 2 : annot := a.findAnnotation(n)
116 2 : // If the annotation is already marked as valid, we can return it without
117 2 : // recomputing anything.
118 2 : if annot.valid.Load() {
119 2 : // The load of these two atomics should be safe, as we don't need to
120 2 : // guarantee invalidations are concurrent-safe. See the comment on
121 2 : // InvalidateLevelAnnotation about why.
122 2 : return annot.v.Load().(*T), true
123 2 : }
124 :
125 2 : t = a.Aggregator.Zero(t)
126 2 : valid := true
127 2 :
128 2 : for i := int16(0); i <= n.count; i++ {
129 2 : if !n.leaf {
130 2 : v, ok := a.nodeAnnotation(n.children[i])
131 2 : t = a.Aggregator.Merge(v, t)
132 2 : valid = valid && ok
133 2 : }
134 :
135 2 : if i < n.count {
136 2 : var ok bool
137 2 : t, ok = a.Aggregator.Accumulate(n.items[i], t)
138 2 : valid = valid && ok
139 2 : }
140 : }
141 :
142 2 : if valid {
143 2 : // Two valid annotations should be identical, so this is
144 2 : // okay.
145 2 : annot.v.Store(t)
146 2 : annot.valid.Store(valid)
147 2 : }
148 :
149 2 : return t, valid
150 : }
151 :
// accumulateRangeAnnotation accumulates this annotator's annotation across all
// files in n's subtree that overlap the range defined by bounds. The result is
// merged into dst, which is returned; callers must use the returned pointer
// because aggregators may return a different value than the one passed in.
//
// Subtrees known to lie entirely within bounds are folded in via
// nodeAnnotation, which reads and populates the per-node cache. Files that
// only partially overlap bounds are passed to AccumulatePartialOverlap when the
// aggregator implements PartialOverlapAnnotationAggregator, and to Accumulate
// otherwise. This function itself never caches its result.
//
// NOTE(review): the binary searches below assume n's items are sorted by key
// and do not overlap one another (as in a non-L0 level or an L0 sublevel);
// confirm this holds for any new caller.
func (a *Annotator[T]) accumulateRangeAnnotation(
	n *node,
	cmp base.Compare,
	bounds base.UserKeyBounds,
	// fullyWithinLowerBound and fullyWithinUpperBound indicate whether this
	// node's subtree is already known to be within each bound.
	fullyWithinLowerBound bool,
	fullyWithinUpperBound bool,
	dst *T,
) *T {
	// If this node's subtree is fully within the bounds, compute a regular
	// (cacheable) annotation. Its cacheOK result is not needed here:
	// nodeAnnotation has already decided whether to cache it.
	if fullyWithinLowerBound && fullyWithinUpperBound {
		v, _ := a.nodeAnnotation(n)
		dst = a.Aggregator.Merge(v, dst)
		return dst
	}

	// We will accumulate annotations from each item in the end-exclusive
	// range [leftItem, rightItem).
	leftItem, rightItem := 0, int(n.count)
	if !fullyWithinLowerBound {
		// leftItem is the index of the first item whose largest key is at or
		// after the lower bound, i.e. the first item that overlaps it.
		leftItem = sort.Search(int(n.count), func(i int) bool {
			return cmp(bounds.Start, n.items[i].Largest.UserKey) <= 0
		})
	}
	if !fullyWithinUpperBound {
		// rightItem is the index of the first item whose smallest key is not
		// within the upper bound, i.e. the first item that does not overlap it.
		rightItem = sort.Search(int(n.count), func(i int) bool {
			return !bounds.End.IsUpperBoundFor(cmp, n.items[i].Smallest.UserKey)
		})
	}

	// Accumulate annotations from every item that overlaps the bounds.
	for i := leftItem; i < rightItem; i++ {
		// Only the first and last overlapping items can extend beyond the
		// bounds, so only they are checked for a partial overlap.
		if i == leftItem || i == rightItem-1 {
			if agg, ok := a.Aggregator.(PartialOverlapAnnotationAggregator[T]); ok {
				fb := n.items[i].UserKeyBounds()
				// The file starts before bounds.Start or ends after bounds.End.
				if cmp(bounds.Start, fb.Start) > 0 || bounds.End.CompareUpperBounds(cmp, fb.End) < 0 {
					dst = agg.AccumulatePartialOverlap(n.items[i], dst, bounds)
					continue
				}
			}
		}
		// cacheOK is ignored: range annotations are not cached.
		v, _ := a.Aggregator.Accumulate(n.items[i], dst)
		dst = v
	}

	if !n.leaf {
		// We will accumulate annotations from each child in the end-inclusive
		// range [leftChild, rightChild]. children[i] sits between items[i-1]
		// and items[i] (the same interleaving nodeAnnotation uses).
		leftChild, rightChild := leftItem, rightItem
		// If the lower bound is at or after the smallest key of the item at
		// leftItem, the child to its left lies entirely before the lower
		// bound and can be skipped.
		if leftItem < int(n.count) && cmp(bounds.Start, n.items[leftItem].Smallest.UserKey) >= 0 {
			leftChild++
		}
		// If the upper bound spans beyond the item at rightItem, we must also
		// accumulate annotations from the child to its right.
		// NOTE(review): rightItem is the first item whose smallest key is
		// outside the upper bound, so for files with Smallest <= Largest this
		// condition appears unreachable; confirm before relying on it.
		if rightItem < int(n.count) && bounds.End.IsUpperBoundFor(cmp, n.items[rightItem].Largest.UserKey) {
			rightChild++
		}

		for i := leftChild; i <= rightChild; i++ {
			dst = a.accumulateRangeAnnotation(
				n.children[i],
				cmp,
				bounds,
				// If this child is to the right of leftItem, then its entire
				// subtree is within the lower bound.
				fullyWithinLowerBound || i > leftItem,
				// If this child is to the left of rightItem, then its entire
				// subtree is within the upper bound.
				fullyWithinUpperBound || i < rightItem,
				dst,
			)
		}
	}
	return dst
}
237 :
238 : // InvalidateAnnotation removes any existing cached annotations from this
239 : // annotator from a node's subtree.
240 1 : func (a *Annotator[T]) invalidateNodeAnnotation(n *node) {
241 1 : annot := a.findAnnotation(n)
242 1 : annot.valid.Store(false)
243 1 : if !n.leaf {
244 0 : for i := int16(0); i <= n.count; i++ {
245 0 : a.invalidateNodeAnnotation(n.children[i])
246 0 : }
247 : }
248 : }
249 :
250 : // LevelAnnotation calculates the annotation defined by this Annotator for all
251 : // files in the given LevelMetadata. A pointer to the Annotator is used as the
252 : // key for pre-calculated values, so the same Annotator must be used to avoid
253 : // duplicate computation.
254 2 : func (a *Annotator[T]) LevelAnnotation(lm LevelMetadata) *T {
255 2 : if lm.Empty() {
256 2 : return a.Aggregator.Zero(nil)
257 2 : }
258 :
259 2 : v, _ := a.nodeAnnotation(lm.tree.root)
260 2 : return v
261 : }
262 :
263 : // MultiLevelAnnotation calculates the annotation defined by this Annotator for
264 : // all files across the given levels. A pointer to the Annotator is used as the
265 : // key for pre-calculated values, so the same Annotator must be used to avoid
266 : // duplicate computation.
267 2 : func (a *Annotator[T]) MultiLevelAnnotation(lms []LevelMetadata) *T {
268 2 : aggregated := a.Aggregator.Zero(nil)
269 2 : for l := 0; l < len(lms); l++ {
270 2 : if !lms[l].Empty() {
271 2 : v := a.LevelAnnotation(lms[l])
272 2 : aggregated = a.Aggregator.Merge(v, aggregated)
273 2 : }
274 : }
275 2 : return aggregated
276 : }
277 :
278 : // LevelRangeAnnotation calculates the annotation defined by this Annotator for
279 : // the files within LevelMetadata which are within the range
280 : // [lowerBound, upperBound). A pointer to the Annotator is used as the key for
281 : // pre-calculated values, so the same Annotator must be used to avoid duplicate
282 : // computation.
283 1 : func (a *Annotator[T]) LevelRangeAnnotation(lm LevelMetadata, bounds base.UserKeyBounds) *T {
284 1 : if lm.Empty() {
285 0 : return a.Aggregator.Zero(nil)
286 0 : }
287 :
288 1 : var dst *T
289 1 : dst = a.Aggregator.Zero(dst)
290 1 : dst = a.accumulateRangeAnnotation(lm.tree.root, lm.tree.cmp, bounds, false, false, dst)
291 1 : return dst
292 : }
293 :
294 : // VersionRangeAnnotation calculates the annotation defined by this Annotator
295 : // for all files within the given Version which are within the range
296 : // defined by bounds.
297 1 : func (a *Annotator[T]) VersionRangeAnnotation(v *Version, bounds base.UserKeyBounds) *T {
298 1 : var dst *T
299 1 : dst = a.Aggregator.Zero(dst)
300 1 : accumulateSlice := func(ls LevelSlice) {
301 1 : if ls.Empty() {
302 1 : return
303 1 : }
304 1 : dst = a.accumulateRangeAnnotation(ls.iter.r, v.cmp.Compare, bounds, false, false, dst)
305 : }
306 1 : for _, ls := range v.L0SublevelFiles {
307 1 : accumulateSlice(ls)
308 1 : }
309 1 : for _, lm := range v.Levels[1:] {
310 1 : accumulateSlice(lm.Slice())
311 1 : }
312 1 : return dst
313 : }
314 :
315 : // InvalidateAnnotation clears any cached annotations defined by Annotator. A
316 : // pointer to the Annotator is used as the key for pre-calculated values, so
317 : // the same Annotator must be used to clear the appropriate cached annotation.
318 : // Calls to InvalidateLevelAnnotation are *not* concurrent-safe with any other
319 : // calls to Annotator methods for the same Annotator (concurrent calls from other
320 : // annotators are fine). Any calls to this function must have some externally-guaranteed
321 : // mutual exclusion.
322 1 : func (a *Annotator[T]) InvalidateLevelAnnotation(lm LevelMetadata) {
323 1 : if lm.Empty() {
324 0 : return
325 0 : }
326 1 : a.invalidateNodeAnnotation(lm.tree.root)
327 : }
328 :
// SumAggregator defines an Aggregator which sums together a uint64 value
// across files.
type SumAggregator struct {
	// AccumulateFunc returns a file's value and whether that value is
	// stable (and thus cacheable) for the lifetime of the file.
	AccumulateFunc func(f *FileMetadata) (v uint64, cacheOK bool)
	// AccumulatePartialOverlapFunc, if non-nil, returns the value to add for
	// a file that only partially overlaps bounds in a range annotation. If
	// nil, the file's full AccumulateFunc value is used instead.
	AccumulatePartialOverlapFunc func(f *FileMetadata, bounds base.UserKeyBounds) uint64
}
335 :
336 : // Zero implements AnnotationAggregator.Zero, returning a new uint64 set to 0.
337 2 : func (sa SumAggregator) Zero(dst *uint64) *uint64 {
338 2 : if dst == nil {
339 2 : return new(uint64)
340 2 : }
341 0 : *dst = 0
342 0 : return dst
343 : }
344 :
345 : // Accumulate implements AnnotationAggregator.Accumulate, accumulating a single
346 : // file's uint64 value.
347 2 : func (sa SumAggregator) Accumulate(f *FileMetadata, dst *uint64) (v *uint64, cacheOK bool) {
348 2 : accumulated, ok := sa.AccumulateFunc(f)
349 2 : *dst += accumulated
350 2 : return dst, ok
351 2 : }
352 :
353 : // AccumulatePartialOverlap implements
354 : // PartialOverlapAnnotationAggregator.AccumulatePartialOverlap, accumulating a
355 : // single file's uint64 value for a file which only partially overlaps with the
356 : // range defined by bounds.
357 : func (sa SumAggregator) AccumulatePartialOverlap(
358 : f *FileMetadata, dst *uint64, bounds base.UserKeyBounds,
359 1 : ) *uint64 {
360 1 : if sa.AccumulatePartialOverlapFunc == nil {
361 1 : v, _ := sa.Accumulate(f, dst)
362 1 : return v
363 1 : }
364 1 : *dst += sa.AccumulatePartialOverlapFunc(f, bounds)
365 1 : return dst
366 : }
367 :
368 : // Merge implements AnnotationAggregator.Merge by summing two uint64 values.
369 2 : func (sa SumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
370 2 : *dst += *src
371 2 : return dst
372 2 : }
373 :
374 : // SumAnnotator takes a function that computes a uint64 value from a single
375 : // FileMetadata and returns an Annotator that sums together the values across
376 : // files.
377 2 : func SumAnnotator(accumulate func(f *FileMetadata) (v uint64, cacheOK bool)) *Annotator[uint64] {
378 2 : return &Annotator[uint64]{
379 2 : Aggregator: SumAggregator{
380 2 : AccumulateFunc: accumulate,
381 2 : },
382 2 : }
383 2 : }
384 :
385 : // NumFilesAnnotator is an Annotator which computes an annotation value
386 : // equal to the number of files included in the annotation. Particularly, it
387 : // can be used to efficiently calculate the number of files in a given key
388 : // range using range annotations.
389 1 : var NumFilesAnnotator = SumAnnotator(func(f *FileMetadata) (uint64, bool) {
390 1 : return 1, true
391 1 : })
392 :
// PickFileAggregator implements the AnnotationAggregator interface. It defines
// an aggregator that picks a single file from a set of eligible files. Its
// annotation value is the picked *FileMetadata, or nil if no eligible file
// has been seen.
type PickFileAggregator struct {
	// Filter takes a FileMetadata and returns whether it is eligible to be
	// picked by this PickFileAggregator. The second return value indicates
	// whether this eligibility is stable and thus cacheable.
	Filter func(f *FileMetadata) (eligible bool, cacheOK bool)
	// Compare compares two instances of FileMetadata and returns true if
	// the first one should be picked over the second one. It may assume
	// that both arguments are non-nil. When it returns false (including for
	// ties), the second argument is kept.
	Compare func(f1 *FileMetadata, f2 *FileMetadata) bool
}
405 :
406 : // Zero implements AnnotationAggregator.Zero, returning nil as the zero value.
407 2 : func (fa PickFileAggregator) Zero(dst *FileMetadata) *FileMetadata {
408 2 : return nil
409 2 : }
410 :
411 2 : func (fa PickFileAggregator) mergePickedFiles(src *FileMetadata, dst *FileMetadata) *FileMetadata {
412 2 : switch {
413 2 : case src == nil:
414 2 : return dst
415 2 : case dst == nil:
416 2 : return src
417 2 : case fa.Compare(src, dst):
418 2 : return src
419 2 : default:
420 2 : return dst
421 : }
422 : }
423 :
424 : // Accumulate implements AnnotationAggregator.Accumulate, accumulating a single
425 : // file as long as it is eligible to be picked.
426 : func (fa PickFileAggregator) Accumulate(
427 : f *FileMetadata, dst *FileMetadata,
428 2 : ) (v *FileMetadata, cacheOK bool) {
429 2 : eligible, ok := fa.Filter(f)
430 2 : if eligible {
431 2 : return fa.mergePickedFiles(f, dst), ok
432 2 : }
433 2 : return dst, ok
434 : }
435 :
436 : // Merge implements AnnotationAggregator.Merge by picking a single file based
437 : // on the output of PickFileAggregator.Compare.
438 2 : func (fa PickFileAggregator) Merge(src *FileMetadata, dst *FileMetadata) *FileMetadata {
439 2 : return fa.mergePickedFiles(src, dst)
440 2 : }
|