Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package manifest
6 :
7 : import (
8 : "sort"
9 :
10 : "github.com/cockroachdb/pebble/internal/base"
11 : )
12 :
13 : // The Annotator type defined below is used by other packages to lazily
14 : // compute a value over a B-Tree. Each node of the B-Tree stores one
15 : // `annotation` per annotator, containing the result of the computation over
16 : // the node's subtree.
17 : //
18 : // An annotation is marked as valid if it's current with the current subtree
19 : // state. Annotations are marked as invalid whenever a node will be mutated
20 : // (in mut). Annotators may also return `false` from `Accumulate` to signal
21 : // that a computation for a file is not stable and may change in the future.
22 : // Annotations that include these unstable values are also marked as invalid
23 : // on the node, ensuring that future queries for the annotation will recompute
24 : // the value.
25 :
// An Annotator defines a computation over a level's FileMetadata. If the
// computation is stable and uses inputs that are fixed for the lifetime of
// a FileMetadata, the LevelMetadata's internal data structures are annotated
// with the intermediary computations. This allows the computation to be
// computed incrementally as edits are applied to a level.
//
// Cached annotations are looked up on each node by comparing against the
// Annotator's pointer, so an Annotator should be created once and shared by
// pointer; a copy would not find values cached by the original.
type Annotator[T any] struct {
	// Aggregator supplies the Zero, Accumulate and Merge operations that
	// define the value computed by this Annotator.
	Aggregator AnnotationAggregator[T]

	// scratch is used to hold the aggregated annotation value when computing
	// range annotations in order to avoid additional allocations. It is
	// reset at the start of each LevelRangeAnnotation call on a non-empty
	// level and is also the value that call returns.
	scratch *T
}
38 :
// An AnnotationAggregator defines how an annotation should be accumulated
// from a single FileMetadata and merged with other annotated values.
type AnnotationAggregator[T any] interface {
	// Zero returns the zero value of an annotation. This value is returned
	// when a LevelMetadata is empty. The dst argument, if non-nil, is an
	// obsolete value previously returned by this Annotator and may be
	// overwritten and reused to avoid a memory allocation.
	Zero(dst *T) *T

	// Accumulate computes the annotation for a single file in a level's
	// metadata. It merges the file's value into dst and returns a bool flag
	// indicating whether or not the value is stable and okay to cache as an
	// annotation. If the file's value may change over the life of the file,
	// the annotator must return false.
	//
	// Implementations may modify dst and return it to avoid an allocation.
	Accumulate(f *FileMetadata, dst *T) (v *T, cacheOK bool)

	// Merge combines two values src and dst, returning the result.
	// Implementations may modify dst and return it to avoid an allocation.
	//
	// NB: Annotator.nodeAnnotation passes a child node's cached annotation
	// as src, so implementations should treat the value src points to as
	// read-only.
	Merge(src *T, dst *T) *T
}
61 :
// annotation is a single cached per-node value, tagged with the Annotator
// that produced it. Each node stores one annotation per Annotator that has
// visited it.
type annotation struct {
	// annotator is a pointer to the Annotator that computed this annotation.
	// NB: This is untyped to allow AnnotationAggregator to use Go generics,
	// since annotations are stored in a slice on each node and a single
	// slice cannot contain elements with different type parameters.
	annotator any
	// v contains the annotation value, the output of either
	// AnnotationAggregator.Accumulate or AnnotationAggregator.Merge.
	// NB: This is untyped for the same reason as annotator above.
	v any
	// valid indicates whether future reads of the annotation may use the
	// value as-is. If false, v will be zeroed and recalculated.
	valid bool
}
76 :
77 : // findAnnotation finds this Annotator's annotation on a node, creating
78 : // one if it doesn't already exist.
79 1 : func (a *Annotator[T]) findAnnotation(n *node) *annotation {
80 1 : for i := range n.annot {
81 1 : if n.annot[i].annotator == a {
82 1 : return &n.annot[i]
83 1 : }
84 : }
85 :
86 : // This node has never been annotated by a. Create a new annotation.
87 1 : n.annot = append(n.annot, annotation{
88 1 : annotator: a,
89 1 : v: a.Aggregator.Zero(nil),
90 1 : })
91 1 : return &n.annot[len(n.annot)-1]
92 : }
93 :
94 : // nodeAnnotation computes this annotator's annotation of this node across all
95 : // files in the node's subtree. The second return value indicates whether the
96 : // annotation is stable and thus cacheable.
97 1 : func (a *Annotator[T]) nodeAnnotation(n *node) (_ *T, cacheOK bool) {
98 1 : annot := a.findAnnotation(n)
99 1 : t := annot.v.(*T)
100 1 : // If the annotation is already marked as valid, we can return it without
101 1 : // recomputing anything.
102 1 : if annot.valid {
103 1 : return t, true
104 1 : }
105 :
106 1 : t = a.Aggregator.Zero(t)
107 1 : valid := true
108 1 :
109 1 : for i := int16(0); i <= n.count; i++ {
110 1 : if !n.leaf {
111 1 : v, ok := a.nodeAnnotation(n.children[i])
112 1 : t = a.Aggregator.Merge(v, t)
113 1 : valid = valid && ok
114 1 : }
115 :
116 1 : if i < n.count {
117 1 : var ok bool
118 1 : t, ok = a.Aggregator.Accumulate(n.items[i], t)
119 1 : valid = valid && ok
120 1 : }
121 : }
122 :
123 1 : annot.v = t
124 1 : annot.valid = valid
125 1 :
126 1 : return t, annot.valid
127 : }
128 :
// accumulateRangeAnnotation computes this annotator's annotation across all
// files in the node's subtree which overlap with the range defined by bounds.
// The computed annotation is accumulated into a.scratch; the function never
// resets a.scratch itself, so the top-level caller is responsible for
// zeroing it first (LevelRangeAnnotation does so).
//
// Subtrees already known to lie within both bounds are folded in through
// nodeAnnotation, which can reuse (and refresh) per-node cached annotations.
// Items visited individually are accumulated straight into a.scratch and
// their cacheOK results are discarded.
func (a *Annotator[T]) accumulateRangeAnnotation(
	n *node,
	cmp base.Compare,
	bounds base.UserKeyBounds,
	// fullyWithinLowerBound and fullyWithinUpperBound indicate whether this
	// node's subtree is already known to be within each bound.
	fullyWithinLowerBound bool,
	fullyWithinUpperBound bool,
) {
	// If this node's subtree is fully within the bounds, compute a regular
	// annotation.
	if fullyWithinLowerBound && fullyWithinUpperBound {
		v, _ := a.nodeAnnotation(n)
		a.scratch = a.Aggregator.Merge(v, a.scratch)
		return
	}

	// We will accumulate annotations from each item in the end-exclusive
	// range [leftItem, rightItem).
	//
	// NOTE(review): both binary searches below presumably rely on n.items
	// being ordered by key so that each predicate flips from false to true
	// exactly once — confirm against the B-Tree's ordering invariants.
	leftItem, rightItem := 0, int(n.count)
	if !fullyWithinLowerBound {
		// leftItem is the index of the first item that overlaps the lower bound.
		leftItem = sort.Search(int(n.count), func(i int) bool {
			return cmp(bounds.Start, n.items[i].Largest.UserKey) <= 0
		})
	}
	if !fullyWithinUpperBound {
		// rightItem is the index of the first item that does not overlap the
		// upper bound.
		rightItem = sort.Search(int(n.count), func(i int) bool {
			return !bounds.End.IsUpperBoundFor(cmp, n.items[i].Smallest.UserKey)
		})
	}

	// Accumulate annotations from every item that overlaps the bounds.
	for i := leftItem; i < rightItem; i++ {
		v, _ := a.Aggregator.Accumulate(n.items[i], a.scratch)
		a.scratch = v
	}

	if !n.leaf {
		// We will accumulate annotations from each child in the end-inclusive
		// range [leftChild, rightChild].
		leftChild, rightChild := leftItem, rightItem
		// If the lower bound falls at or after the smallest key of the item
		// at leftItem, skip the child at index leftItem (the one visited
		// just before that item): there is no need to accumulate
		// annotations from it.
		if leftItem < int(n.count) && cmp(bounds.Start, n.items[leftItem].Smallest.UserKey) >= 0 {
			leftChild++
		}
		// If the upper bound spans beyond the child at rightItem, we must also
		// accumulate annotations from the child to its right.
		//
		// NOTE(review): when rightItem < n.count, sort.Search has just
		// established that the item's Smallest key is NOT within the upper
		// bound, so this condition can only hold if Largest sorts before
		// Smallest. The branch therefore looks unreachable — confirm before
		// relying on or removing it.
		if rightItem < int(n.count) && bounds.End.IsUpperBoundFor(cmp, n.items[rightItem].Largest.UserKey) {
			rightChild++
		}

		for i := leftChild; i <= rightChild; i++ {
			a.accumulateRangeAnnotation(
				n.children[i],
				cmp,
				bounds,
				// If this child is to the right of leftItem, then its entire
				// subtree is within the lower bound.
				fullyWithinLowerBound || i > leftItem,
				// If this child is to the left of rightItem, then its entire
				// subtree is within the upper bound.
				fullyWithinUpperBound || i < rightItem,
			)
		}
	}
}
202 :
203 : // InvalidateAnnotation removes any existing cached annotations from this
204 : // annotator from a node's subtree.
205 1 : func (a *Annotator[T]) invalidateNodeAnnotation(n *node) {
206 1 : annot := a.findAnnotation(n)
207 1 : annot.valid = false
208 1 : if !n.leaf {
209 0 : for i := int16(0); i <= n.count; i++ {
210 0 : a.invalidateNodeAnnotation(n.children[i])
211 0 : }
212 : }
213 : }
214 :
215 : // LevelAnnotation calculates the annotation defined by this Annotator for all
216 : // files in the given LevelMetadata. A pointer to the Annotator is used as the
217 : // key for pre-calculated values, so the same Annotator must be used to avoid
218 : // duplicate computation. Annotation must not be called concurrently, and in
219 : // practice this is achieved by requiring callers to hold DB.mu.
220 1 : func (a *Annotator[T]) LevelAnnotation(lm LevelMetadata) *T {
221 1 : if lm.Empty() {
222 1 : return a.Aggregator.Zero(nil)
223 1 : }
224 :
225 1 : v, _ := a.nodeAnnotation(lm.tree.root)
226 1 : return v
227 : }
228 :
229 : // MultiLevelAnnotation calculates the annotation defined by this Annotator for
230 : // all files across the given levels. A pointer to the Annotator is used as the
231 : // key for pre-calculated values, so the same Annotator must be used to avoid
232 : // duplicate computation. Annotation must not be called concurrently, and in
233 : // practice this is achieved by requiring callers to hold DB.mu.
234 1 : func (a *Annotator[T]) MultiLevelAnnotation(lms []LevelMetadata) *T {
235 1 : aggregated := a.Aggregator.Zero(nil)
236 1 : for l := 0; l < len(lms); l++ {
237 1 : if !lms[l].Empty() {
238 1 : v := a.LevelAnnotation(lms[l])
239 1 : aggregated = a.Aggregator.Merge(v, aggregated)
240 1 : }
241 : }
242 1 : return aggregated
243 : }
244 :
245 : // LevelRangeAnnotation calculates the annotation defined by this Annotator for
246 : // the files within LevelMetadata which are within the range
247 : // [lowerBound, upperBound). A pointer to the Annotator is used as the key for
248 : // pre-calculated values, so the same Annotator must be used to avoid duplicate
249 : // computation. Annotation must not be called concurrently, and in practice this
250 : // is achieved by requiring callers to hold DB.mu.
251 1 : func (a *Annotator[T]) LevelRangeAnnotation(lm LevelMetadata, bounds base.UserKeyBounds) *T {
252 1 : if lm.Empty() {
253 0 : return a.Aggregator.Zero(nil)
254 0 : }
255 :
256 1 : a.scratch = a.Aggregator.Zero(a.scratch)
257 1 : a.accumulateRangeAnnotation(lm.tree.root, lm.tree.cmp, bounds, false, false)
258 1 : return a.scratch
259 : }
260 :
261 : // InvalidateAnnotation clears any cached annotations defined by Annotator. A
262 : // pointer to the Annotator is used as the key for pre-calculated values, so
263 : // the same Annotator must be used to clear the appropriate cached annotation.
264 : // InvalidateAnnotation must not be called concurrently, and in practice this
265 : // is achieved by requiring callers to hold DB.mu.
266 1 : func (a *Annotator[T]) InvalidateLevelAnnotation(lm LevelMetadata) {
267 1 : if lm.Empty() {
268 0 : return
269 0 : }
270 1 : a.invalidateNodeAnnotation(lm.tree.root)
271 : }
272 :
273 : // sumAggregator defines an Aggregator which sums together a uint64 value
274 : // across files.
275 : type sumAggregator struct {
276 : accumulateFunc func(f *FileMetadata) (v uint64, cacheOK bool)
277 : }
278 :
279 1 : func (sa sumAggregator) Zero(dst *uint64) *uint64 {
280 1 : if dst == nil {
281 1 : return new(uint64)
282 1 : }
283 1 : *dst = 0
284 1 : return dst
285 : }
286 :
287 1 : func (sa sumAggregator) Accumulate(f *FileMetadata, dst *uint64) (v *uint64, cacheOK bool) {
288 1 : accumulated, ok := sa.accumulateFunc(f)
289 1 : *dst += accumulated
290 1 : return dst, ok
291 1 : }
292 :
293 1 : func (sa sumAggregator) Merge(src *uint64, dst *uint64) *uint64 {
294 1 : *dst += *src
295 1 : return dst
296 1 : }
297 :
298 : // SumAnnotator takes a function that computes a uint64 value from a single
299 : // FileMetadata and returns an Annotator that sums together the values across
300 : // files.
301 1 : func SumAnnotator(accumulate func(f *FileMetadata) (v uint64, cacheOK bool)) *Annotator[uint64] {
302 1 : return &Annotator[uint64]{
303 1 : Aggregator: sumAggregator{
304 1 : accumulateFunc: accumulate,
305 1 : },
306 1 : }
307 1 : }
308 :
309 : // NumFilesAnnotator is an Annotator which computes an annotation value
310 : // equal to the number of files included in the annotation. Particularly, it
311 : // can be used to efficiently calculate the number of files in a given key
312 : // range using range annotations.
313 1 : var NumFilesAnnotator = SumAnnotator(func(f *FileMetadata) (uint64, bool) {
314 1 : return 1, true
315 1 : })
316 :
317 : // PickFileAggregator implements the AnnotationAggregator interface. It defines
318 : // an aggregator that picks a single file from a set of eligible files.
319 : type PickFileAggregator struct {
320 : // Filter takes a FileMetadata and returns whether it is eligible to be
321 : // picked by this PickFileAggregator. The second return value indicates
322 : // whether this eligibility is stable and thus cacheable.
323 : Filter func(f *FileMetadata) (eligible bool, cacheOK bool)
324 : // Compare compares two instances of FileMetadata and returns true if
325 : // the first one should be picked over the second one. It may assume
326 : // that both arguments are non-nil.
327 : Compare func(f1 *FileMetadata, f2 *FileMetadata) bool
328 : }
329 :
330 : // Zero implements AnnotationAggregator.Zero, returning nil as the zero value.
331 1 : func (fa PickFileAggregator) Zero(dst *FileMetadata) *FileMetadata {
332 1 : return nil
333 1 : }
334 :
335 1 : func (fa PickFileAggregator) mergePickedFiles(src *FileMetadata, dst *FileMetadata) *FileMetadata {
336 1 : switch {
337 1 : case src == nil:
338 1 : return dst
339 1 : case dst == nil:
340 1 : return src
341 1 : case fa.Compare(src, dst):
342 1 : return src
343 1 : default:
344 1 : return dst
345 : }
346 : }
347 :
348 : // Accumulate implements AnnotationAggregator.Accumulate, accumulating a single
349 : // file as long as it is eligible to be picked.
350 : func (fa PickFileAggregator) Accumulate(
351 : f *FileMetadata, dst *FileMetadata,
352 1 : ) (v *FileMetadata, cacheOK bool) {
353 1 : eligible, ok := fa.Filter(f)
354 1 : if eligible {
355 1 : return fa.mergePickedFiles(f, dst), ok
356 1 : }
357 1 : return dst, ok
358 : }
359 :
360 : // Merge implements AnnotationAggregator.Merge by picking a single file based
361 : // on the output of PickFileAggregator.Compare.
362 1 : func (fa PickFileAggregator) Merge(src *FileMetadata, dst *FileMetadata) *FileMetadata {
363 1 : return fa.mergePickedFiles(src, dst)
364 1 : }
|