Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "sort"
9 :
10 : "github.com/cockroachdb/pebble/internal/base"
11 : )
12 :
13 : // The Annotator type defined below is used by other packages to lazily
14 : // compute a value over a B-Tree. Each node of the B-Tree stores one
15 : // `annotation` per annotator, containing the result of the computation over
16 : // the node's subtree.
17 : //
18 : // An annotation is marked as valid if it's current with the current subtree
19 : // state. Annotations are marked as invalid whenever a node will be mutated
20 : // (in mut). Annotators may also return `false` from `Accumulate` to signal
21 : // that a computation for a file is not stable and may change in the future.
22 : // Annotations that include these unstable values are also marked as invalid
23 : // on the node, ensuring that future queries for the annotation will recompute
24 : // the value.
25 :
26 : // An Annotator defines a computation over a level's FileMetadata. If the
27 : // computation is stable and uses inputs that are fixed for the lifetime of
28 : // a FileMetadata, the LevelMetadata's internal data structures are annotated
29 : // with the intermediary computations. This allows the computation to be
30 : // computed incrementally as edits are applied to a level.
31 : type Annotator[T any] struct {
32 : Aggregator AnnotationAggregator[T]
33 :
34 : // scratch is used to hold the aggregated annotation value when computing
35 : // range annotations in order to avoid additional allocations.
36 : scratch *T
37 : }
38 :
39 : // An AnnotationAggregator defines how an annotation should be accumulated
40 : // from a single FileMetadata and merged with other annotated values.
41 : type AnnotationAggregator[T any] interface {
42 : // Zero returns the zero value of an annotation. This value is returned
43 : // when a LevelMetadata is empty. The dst argument, if non-nil, is an
44 : // obsolete value previously returned by this Annotator and may be
45 : // overwritten and reused to avoid a memory allocation.
46 : Zero(dst *T) *T
47 :
48 : // Accumulate computes the annotation for a single file in a level's
49 : // metadata. It merges the file's value into dst and returns a bool flag
50 : // indicating whether or not the value is stable and okay to cache as an
51 : // annotation. If the file's value may change over the life of the file,
52 : // the annotator must return false.
53 : //
54 : // Implementations may modify dst and return it to avoid an allocation.
55 : Accumulate(f *FileMetadata, dst *T) (v *T, cacheOK bool)
56 :
57 : // Merge combines two values src and dst, returning the result.
58 : // Implementations may modify dst and return it to avoid an allocation.
59 : Merge(src *T, dst *T) *T
60 : }
61 :
62 : // A PartialOverlapAnnotationAggregator is an extension of AnnotationAggregator
63 : // that allows for custom accumulation of range annotations for files that only
64 : // partially overlap with the range.
65 : type PartialOverlapAnnotationAggregator[T any] interface {
66 : AnnotationAggregator[T]
67 : AccumulatePartialOverlap(f *FileMetadata, dst *T, bounds base.UserKeyBounds) *T
68 : }
69 :
// annotation is the per-node record of one Annotator's result for the node's
// subtree. A node keeps one annotation per Annotator that has visited it.
type annotation struct {
	// annotator is a pointer to the Annotator that computed this annotation
	// and acts as the lookup key for it.
	// NB: This is untyped to allow AnnotationAggregator to use Go generics,
	// since annotations are stored in a slice on each node and a single
	// slice cannot contain elements with different type parameters.
	annotator any
	// v contains the annotation value, the output of either
	// AnnotationAggregator.Accumulate or AnnotationAggregator.Merge.
	// NB: This is untyped for the same reason as annotator above.
	v any
	// valid indicates whether future reads of the annotation may use the
	// value as-is. If false, v will be zeroed and recalculated.
	valid bool
}
84 :
85 : // findAnnotation finds this Annotator's annotation on a node, creating
86 : // one if it doesn't already exist.
87 1 : func (a *Annotator[T]) findAnnotation(n *node) *annotation {
88 1 : for i := range n.annot {
89 1 : if n.annot[i].annotator == a {
90 1 : return &n.annot[i]
91 1 : }
92 : }
93 :
94 : // This node has never been annotated by a. Create a new annotation.
95 1 : n.annot = append(n.annot, annotation{
96 1 : annotator: a,
97 1 : v: a.Aggregator.Zero(nil),
98 1 : })
99 1 : return &n.annot[len(n.annot)-1]
100 : }
101 :
102 : // nodeAnnotation computes this annotator's annotation of this node across all
103 : // files in the node's subtree. The second return value indicates whether the
104 : // annotation is stable and thus cacheable.
105 1 : func (a *Annotator[T]) nodeAnnotation(n *node) (_ *T, cacheOK bool) {
106 1 : annot := a.findAnnotation(n)
107 1 : t := annot.v.(*T)
108 1 : // If the annotation is already marked as valid, we can return it without
109 1 : // recomputing anything.
110 1 : if annot.valid {
111 1 : return t, true
112 1 : }
113 :
114 1 : t = a.Aggregator.Zero(t)
115 1 : valid := true
116 1 :
117 1 : for i := int16(0); i <= n.count; i++ {
118 1 : if !n.leaf {
119 1 : v, ok := a.nodeAnnotation(n.children[i])
120 1 : t = a.Aggregator.Merge(v, t)
121 1 : valid = valid && ok
122 1 : }
123 :
124 1 : if i < n.count {
125 1 : var ok bool
126 1 : t, ok = a.Aggregator.Accumulate(n.items[i], t)
127 1 : valid = valid && ok
128 1 : }
129 : }
130 :
131 1 : annot.v = t
132 1 : annot.valid = valid
133 1 :
134 1 : return t, annot.valid
135 : }
136 :
137 : // accumulateRangeAnnotation computes this annotator's annotation across all
138 : // files in the node's subtree which overlap with the range defined by bounds.
139 : // The computed annotation is accumulated into a.scratch.
140 : func (a *Annotator[T]) accumulateRangeAnnotation(
141 : n *node,
142 : cmp base.Compare,
143 : bounds base.UserKeyBounds,
144 : // fullyWithinLowerBound and fullyWithinUpperBound indicate whether this
145 : // node's subtree is already known to be within each bound.
146 : fullyWithinLowerBound bool,
147 : fullyWithinUpperBound bool,
148 1 : ) {
149 1 : // If this node's subtree is fully within the bounds, compute a regular
150 1 : // annotation.
151 1 : if fullyWithinLowerBound && fullyWithinUpperBound {
152 1 : v, _ := a.nodeAnnotation(n)
153 1 : a.scratch = a.Aggregator.Merge(v, a.scratch)
154 1 : return
155 1 : }
156 :
157 : // We will accumulate annotations from each item in the end-exclusive
158 : // range [leftItem, rightItem).
159 1 : leftItem, rightItem := 0, int(n.count)
160 1 : if !fullyWithinLowerBound {
161 1 : // leftItem is the index of the first item that overlaps the lower bound.
162 1 : leftItem = sort.Search(int(n.count), func(i int) bool {
163 1 : return cmp(bounds.Start, n.items[i].Largest.UserKey) <= 0
164 1 : })
165 : }
166 1 : if !fullyWithinUpperBound {
167 1 : // rightItem is the index of the first item that does not overlap the
168 1 : // upper bound.
169 1 : rightItem = sort.Search(int(n.count), func(i int) bool {
170 1 : return !bounds.End.IsUpperBoundFor(cmp, n.items[i].Smallest.UserKey)
171 1 : })
172 : }
173 :
174 : // Accumulate annotations from every item that overlaps the bounds.
175 1 : for i := leftItem; i < rightItem; i++ {
176 1 : if i == leftItem || i == rightItem-1 {
177 1 : if agg, ok := a.Aggregator.(PartialOverlapAnnotationAggregator[T]); ok {
178 1 : fb := n.items[i].UserKeyBounds()
179 1 : if cmp(bounds.Start, fb.Start) > 0 || bounds.End.CompareUpperBounds(cmp, fb.End) < 0 {
180 1 : a.scratch = agg.AccumulatePartialOverlap(n.items[i], a.scratch, bounds)
181 1 : continue
182 : }
183 : }
184 : }
185 1 : v, _ := a.Aggregator.Accumulate(n.items[i], a.scratch)
186 1 : a.scratch = v
187 : }
188 :
189 1 : if !n.leaf {
190 1 : // We will accumulate annotations from each child in the end-inclusive
191 1 : // range [leftChild, rightChild].
192 1 : leftChild, rightChild := leftItem, rightItem
193 1 : // If the lower bound overlaps with the child at leftItem, there is no
194 1 : // need to accumulate annotations from the child to its left.
195 1 : if leftItem < int(n.count) && cmp(bounds.Start, n.items[leftItem].Smallest.UserKey) >= 0 {
196 1 : leftChild++
197 1 : }
198 : // If the upper bound spans beyond the child at rightItem, we must also
199 : // accumulate annotations from the child to its right.
200 1 : if rightItem < int(n.count) && bounds.End.IsUpperBoundFor(cmp, n.items[rightItem].Largest.UserKey) {
201 0 : rightChild++
202 0 : }
203 :
204 1 : for i := leftChild; i <= rightChild; i++ {
205 1 : a.accumulateRangeAnnotation(
206 1 : n.children[i],
207 1 : cmp,
208 1 : bounds,
209 1 : // If this child is to the right of leftItem, then its entire
210 1 : // subtree is within the lower bound.
211 1 : fullyWithinLowerBound || i > leftItem,
212 1 : // If this child is to the left of rightItem, then its entire
213 1 : // subtree is within the upper bound.
214 1 : fullyWithinUpperBound || i < rightItem,
215 1 : )
216 1 : }
217 : }
218 : }
219 :
220 : // InvalidateAnnotation removes any existing cached annotations from this
221 : // annotator from a node's subtree.
222 1 : func (a *Annotator[T]) invalidateNodeAnnotation(n *node) {
223 1 : annot := a.findAnnotation(n)
224 1 : annot.valid = false
225 1 : if !n.leaf {
226 0 : for i := int16(0); i <= n.count; i++ {
227 0 : a.invalidateNodeAnnotation(n.children[i])
228 0 : }
229 : }
230 : }
231 :
232 : // LevelAnnotation calculates the annotation defined by this Annotator for all
233 : // files in the given LevelMetadata. A pointer to the Annotator is used as the
234 : // key for pre-calculated values, so the same Annotator must be used to avoid
235 : // duplicate computation. Annotation must not be called concurrently, and in
236 : // practice this is achieved by requiring callers to hold DB.mu.
237 1 : func (a *Annotator[T]) LevelAnnotation(lm LevelMetadata) *T {
238 1 : if lm.Empty() {
239 1 : return a.Aggregator.Zero(nil)
240 1 : }
241 :
242 1 : v, _ := a.nodeAnnotation(lm.tree.root)
243 1 : return v
244 : }
245 :
246 : // MultiLevelAnnotation calculates the annotation defined by this Annotator for
247 : // all files across the given levels. A pointer to the Annotator is used as the
248 : // key for pre-calculated values, so the same Annotator must be used to avoid
249 : // duplicate computation. Annotation must not be called concurrently, and in
250 : // practice this is achieved by requiring callers to hold DB.mu.
251 1 : func (a *Annotator[T]) MultiLevelAnnotation(lms []LevelMetadata) *T {
252 1 : aggregated := a.Aggregator.Zero(nil)
253 1 : for l := 0; l < len(lms); l++ {
254 1 : if !lms[l].Empty() {
255 1 : v := a.LevelAnnotation(lms[l])
256 1 : aggregated = a.Aggregator.Merge(v, aggregated)
257 1 : }
258 : }
259 1 : return aggregated
260 : }
261 :
262 : // LevelRangeAnnotation calculates the annotation defined by this Annotator for
263 : // the files within LevelMetadata which are within the range
264 : // [lowerBound, upperBound). A pointer to the Annotator is used as the key for
265 : // pre-calculated values, so the same Annotator must be used to avoid duplicate
266 : // computation. Annotation must not be called concurrently, and in practice this
267 : // is achieved by requiring callers to hold DB.mu.
268 1 : func (a *Annotator[T]) LevelRangeAnnotation(lm LevelMetadata, bounds base.UserKeyBounds) *T {
269 1 : if lm.Empty() {
270 0 : return a.Aggregator.Zero(nil)
271 0 : }
272 :
273 1 : a.scratch = a.Aggregator.Zero(a.scratch)
274 1 : a.accumulateRangeAnnotation(lm.tree.root, lm.tree.cmp, bounds, false, false)
275 1 : return a.scratch
276 : }
277 :
278 : // VersionRangeAnnotation calculates the annotation defined by this Annotator
279 : // for all files within the given Version which are within the range
280 : // defined by bounds.
281 1 : func (a *Annotator[T]) VersionRangeAnnotation(v *Version, bounds base.UserKeyBounds) *T {
282 1 : accumulateSlice := func(ls LevelSlice) {
283 1 : if ls.Empty() {
284 1 : return
285 1 : }
286 1 : a.accumulateRangeAnnotation(ls.iter.r, v.cmp.Compare, bounds, false, false)
287 : }
288 1 : a.scratch = a.Aggregator.Zero(a.scratch)
289 1 : for _, ls := range v.L0SublevelFiles {
290 1 : accumulateSlice(ls)
291 1 : }
292 1 : for _, lm := range v.Levels[1:] {
293 1 : accumulateSlice(lm.Slice())
294 1 : }
295 1 : return a.scratch
296 : }
297 :
298 : // InvalidateAnnotation clears any cached annotations defined by Annotator. A
299 : // pointer to the Annotator is used as the key for pre-calculated values, so
300 : // the same Annotator must be used to clear the appropriate cached annotation.
301 : // InvalidateAnnotation must not be called concurrently, and in practice this
302 : // is achieved by requiring callers to hold DB.mu.
303 1 : func (a *Annotator[T]) InvalidateLevelAnnotation(lm LevelMetadata) {
304 1 : if lm.Empty() {
305 0 : return
306 0 : }
307 1 : a.invalidateNodeAnnotation(lm.tree.root)
308 : }
309 :
310 : // SumAggregator defines an Aggregator which sums together a uint64 value
311 : // across files.
312 : type SumAggregator struct {
313 : AccumulateFunc func(f *FileMetadata) (v uint64, cacheOK bool)
314 : AccumulatePartialOverlapFunc func(f *FileMetadata, bounds base.UserKeyBounds) uint64
315 : }
316 :
317 : // Zero implements AnnotationAggregator.Zero, returning a new uint64 set to 0.
318 1 : func (sa SumAggregator) Zero(dst *uint64) *uint64 {
319 1 : if dst == nil {
320 1 : return new(uint64)
321 1 : }
322 1 : *dst = 0
323 1 : return dst
324 : }
325 :
326 : // Accumulate implements AnnotationAggregator.Accumulate, accumulating a single
327 : // file's uint64 value.
328 1 : func (sa SumAggregator) Accumulate(f *FileMetadata, dst *uint64) (v *uint64, cacheOK bool) {
329 1 : accumulated, ok := sa.AccumulateFunc(f)
330 1 : *dst += accumulated
331 1 : return dst, ok
332 1 : }
333 :
334 : // AccumulatePartialOverlap implements
335 : // PartialOverlapAnnotationAggregator.AccumulatePartialOverlap, accumulating a
336 : // single file's uint64 value for a file which only partially overlaps with the
337 : // range defined by bounds.
338 : func (sa SumAggregator) AccumulatePartialOverlap(
339 : f *FileMetadata, dst *uint64, bounds base.UserKeyBounds,
340 1 : ) *uint64 {
341 1 : if sa.AccumulatePartialOverlapFunc == nil {
342 1 : v, _ := sa.Accumulate(f, dst)
343 1 : return v
344 1 : }
345 1 : *dst += sa.AccumulatePartialOverlapFunc(f, bounds)
346 1 : return dst
347 : }
348 :
349 : // Merge implements AnnotationAggregator.Merge by summing two uint64 values.
350 1 : func (sa SumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
351 1 : *dst += *src
352 1 : return dst
353 1 : }
354 :
355 : // SumAnnotator takes a function that computes a uint64 value from a single
356 : // FileMetadata and returns an Annotator that sums together the values across
357 : // files.
358 1 : func SumAnnotator(accumulate func(f *FileMetadata) (v uint64, cacheOK bool)) *Annotator[uint64] {
359 1 : return &Annotator[uint64]{
360 1 : Aggregator: SumAggregator{
361 1 : AccumulateFunc: accumulate,
362 1 : },
363 1 : }
364 1 : }
365 :
366 : // NumFilesAnnotator is an Annotator which computes an annotation value
367 : // equal to the number of files included in the annotation. Particularly, it
368 : // can be used to efficiently calculate the number of files in a given key
369 : // range using range annotations.
370 1 : var NumFilesAnnotator = SumAnnotator(func(f *FileMetadata) (uint64, bool) {
371 1 : return 1, true
372 1 : })
373 :
374 : // PickFileAggregator implements the AnnotationAggregator interface. It defines
375 : // an aggregator that picks a single file from a set of eligible files.
376 : type PickFileAggregator struct {
377 : // Filter takes a FileMetadata and returns whether it is eligible to be
378 : // picked by this PickFileAggregator. The second return value indicates
379 : // whether this eligibility is stable and thus cacheable.
380 : Filter func(f *FileMetadata) (eligible bool, cacheOK bool)
381 : // Compare compares two instances of FileMetadata and returns true if
382 : // the first one should be picked over the second one. It may assume
383 : // that both arguments are non-nil.
384 : Compare func(f1 *FileMetadata, f2 *FileMetadata) bool
385 : }
386 :
387 : // Zero implements AnnotationAggregator.Zero, returning nil as the zero value.
388 1 : func (fa PickFileAggregator) Zero(dst *FileMetadata) *FileMetadata {
389 1 : return nil
390 1 : }
391 :
392 1 : func (fa PickFileAggregator) mergePickedFiles(src *FileMetadata, dst *FileMetadata) *FileMetadata {
393 1 : switch {
394 1 : case src == nil:
395 1 : return dst
396 1 : case dst == nil:
397 1 : return src
398 1 : case fa.Compare(src, dst):
399 1 : return src
400 1 : default:
401 1 : return dst
402 : }
403 : }
404 :
405 : // Accumulate implements AnnotationAggregator.Accumulate, accumulating a single
406 : // file as long as it is eligible to be picked.
407 : func (fa PickFileAggregator) Accumulate(
408 : f *FileMetadata, dst *FileMetadata,
409 1 : ) (v *FileMetadata, cacheOK bool) {
410 1 : eligible, ok := fa.Filter(f)
411 1 : if eligible {
412 1 : return fa.mergePickedFiles(f, dst), ok
413 1 : }
414 1 : return dst, ok
415 : }
416 :
417 : // Merge implements AnnotationAggregator.Merge by picking a single file based
418 : // on the output of PickFileAggregator.Compare.
419 1 : func (fa PickFileAggregator) Merge(src *FileMetadata, dst *FileMetadata) *FileMetadata {
420 1 : return fa.mergePickedFiles(src, dst)
421 1 : }
|