Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "math"
11 : "slices"
12 : "time"
13 :
14 : "github.com/cockroachdb/crlib/crmath"
15 : "github.com/cockroachdb/crlib/crtime"
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/base"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/internal/keyspan"
20 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
21 : "github.com/cockroachdb/pebble/internal/manifest"
22 : "github.com/cockroachdb/pebble/sstable"
23 : "github.com/cockroachdb/pebble/sstable/block"
24 : "github.com/cockroachdb/redact"
25 : )
26 :
27 : // In-memory statistics about tables help inform compaction picking, but may
28 : // be expensive to calculate or load from disk. Every time a database is
29 : // opened, these statistics must be reloaded or recalculated. To minimize
30 : // impact on user activity and compactions, we load these statistics
31 : // asynchronously in the background and store loaded statistics in each
32 : // table's *TableMetadata.
33 : //
34 : // This file implements the asynchronous loading of statistics by maintaining
35 : // a list of files that require statistics, alongside their LSM levels.
36 : // Whenever new files are added to the LSM, the files are appended to
37 : // d.mu.tableStats.pending. If a stats collection job is not currently
38 : // running, one is started in a separate goroutine.
39 : //
40 : // The stats collection job grabs and clears the pending list, computes table
41 : // statistics relative to the current readState and updates the tables' file
42 : // metadata. New pending files may accumulate during a stats collection job,
43 : // so a completing job triggers a new job if necessary. Only one job runs at a
44 : // time.
45 : //
46 : // When an existing database is opened, all files lack in-memory statistics.
47 : // These files' stats are loaded incrementally whenever the pending list is
48 : // empty by scanning a current readState for files missing statistics. Once a
49 : // job completes a scan without finding any remaining files without
50 : // statistics, it flips a `loadedInitial` flag. From then on, the stats
51 : // collection job only needs to load statistics for new files appended to the
52 : // pending list.
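//
// As a rough illustrative trace of the lifecycle above (a sketch only; the
// specific events are hypothetical, the fields are the ones defined on
// d.mu.tableStats):
//
//	Open          -> no table has stats; a job scans the readState in chunks
//	                 until no stats-less tables remain, then sets loadedInitial.
//	Flush/ingest  -> updateTableStatsLocked appends the new tables to pending
//	                 and starts a job if none is already running.
//	Job completes -> stats are copied into each TableMetadata under DB.mu, and
//	                 a follow-up job is started if pending refilled meanwhile.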
53 :
54 1 : func (d *DB) maybeCollectTableStatsLocked() {
55 1 : if d.shouldCollectTableStatsLocked() {
56 1 : go d.collectTableStats()
57 1 : }
58 : }
59 :
60 : // updateTableStatsLocked is called when new files are introduced, after the
61 : // read state has been updated. It may trigger a new stat collection.
62 : // DB.mu must be locked when calling.
63 1 : func (d *DB) updateTableStatsLocked(newTables []manifest.NewTableEntry) {
64 1 : var needStats bool
65 1 : for _, nf := range newTables {
66 1 : if _, statsValid := nf.Meta.Stats(); !statsValid {
67 1 : needStats = true
68 1 : break
69 : }
70 : }
71 1 : if !needStats {
72 1 : return
73 1 : }
74 :
75 1 : d.mu.tableStats.pending = append(d.mu.tableStats.pending, newTables...)
76 1 : d.maybeCollectTableStatsLocked()
77 : }
78 :
79 1 : func (d *DB) shouldCollectTableStatsLocked() bool {
80 1 : return !d.mu.tableStats.loading &&
81 1 : d.closed.Load() == nil &&
82 1 : !d.opts.DisableTableStats &&
83 1 : (len(d.mu.tableStats.pending) > 0 || !d.mu.tableStats.loadedInitial)
84 1 : }
85 :
86 : // collectTableStats runs a table stats collection job, returning true if the
87 : // invocation did the collection work, false otherwise (e.g. if another job was
88 : // already running).
89 1 : func (d *DB) collectTableStats() bool {
90 1 : const maxTableStatsPerScan = 50
91 1 :
92 1 : d.mu.Lock()
93 1 : if !d.shouldCollectTableStatsLocked() {
94 1 : d.mu.Unlock()
95 1 : return false
96 1 : }
97 1 : ctx := context.Background()
98 1 :
99 1 : pending := d.mu.tableStats.pending
100 1 : d.mu.tableStats.pending = nil
101 1 : d.mu.tableStats.loading = true
102 1 : jobID := d.newJobIDLocked()
103 1 : // Drop DB.mu before performing IO.
104 1 : d.mu.Unlock()
105 1 :
106 1 : // Every run of collectTableStats either collects stats from the pending
107 1 : // list (if non-empty) or from scanning the version (loadedInitial is
108 1 : // false). This job only runs if at least one of those conditions holds.
109 1 :
110 1 : // Grab a read state to scan for tables.
111 1 : rs := d.loadReadState()
112 1 : var collected []collectedStats
113 1 : var hints []deleteCompactionHint
114 1 : initialLoadCompleted := false
115 1 : if len(pending) > 0 {
116 1 : collected, hints = d.loadNewFileStats(ctx, rs, pending)
117 1 : } else {
118 1 : var moreRemain bool
119 1 : var buf [maxTableStatsPerScan]collectedStats
120 1 : collected, hints, moreRemain = d.scanReadStateTableStats(ctx, rs.current, buf[:0])
121 1 : if !moreRemain {
122 1 : // Once we're done with table stats, load blob file properties.
123 1 : moreRemain = d.scanBlobFileProperties(ctx, rs.current, maxTableStatsPerScan-len(collected))
124 1 : if !moreRemain {
125 1 : initialLoadCompleted = true
126 1 : }
127 : }
128 : }
129 1 : rs.unref()
130 1 :
131 1 : // Update the TableMetadata with the loaded stats while holding d.mu.
132 1 : d.mu.Lock()
133 1 : defer d.mu.Unlock()
134 1 : d.mu.tableStats.loading = false
135 1 : if initialLoadCompleted && !d.mu.tableStats.loadedInitial {
136 1 : d.mu.tableStats.loadedInitial = true
137 1 : d.opts.EventListener.TableStatsLoaded(TableStatsInfo{
138 1 : JobID: int(jobID),
139 1 : })
140 1 : }
141 :
142 1 : maybeCompact := false
143 1 : for _, c := range collected {
144 1 : sanityCheckStats(c.TableMetadata, &c.TableStats, d.opts.Logger, "collected stats")
145 1 : c.TableMetadata.PopulateStats(&c.TableStats)
146 1 : maybeCompact = maybeCompact || tableTombstoneCompensation(c.TableMetadata) > 0
147 1 : }
148 :
149 1 : d.mu.tableStats.cond.Broadcast()
150 1 : d.maybeCollectTableStatsLocked()
151 1 : if len(hints) > 0 && !d.opts.private.disableDeleteOnlyCompactions {
152 1 : // Verify that all of the hint tombstones' files still exist in the
153 1 : // current version. Otherwise, the tombstone itself may have been
154 1 : // compacted into L6 and more recent keys may have had their sequence
155 1 : // numbers zeroed.
156 1 : //
157 1 : // Note that it's possible that the tombstone file is being compacted
158 1 : // presently. In that case, the file will be present in v. When the
159 1 : // compaction finishes compacting the tombstone file, it will detect
160 1 : // and clear the hint.
161 1 : //
162 1 : // See DB.maybeUpdateDeleteCompactionHints.
163 1 : v := d.mu.versions.currentVersion()
164 1 : keepHints := hints[:0]
165 1 : for _, h := range hints {
166 1 : if v.Contains(h.tombstoneLevel, h.tombstoneFile) {
167 1 : keepHints = append(keepHints, h)
168 1 : }
169 : }
170 1 : d.mu.compact.deletionHints = append(d.mu.compact.deletionHints, keepHints...)
171 : }
172 1 : if maybeCompact {
173 1 : d.maybeScheduleCompaction()
174 1 : }
175 1 : return true
176 : }
177 :
178 : type collectedStats struct {
179 : *manifest.TableMetadata
180 : manifest.TableStats
181 : }
182 :
183 : func (d *DB) loadNewFileStats(
184 : ctx context.Context, rs *readState, pending []manifest.NewTableEntry,
185 1 : ) ([]collectedStats, []deleteCompactionHint) {
186 1 : var hints []deleteCompactionHint
187 1 : collected := make([]collectedStats, 0, len(pending))
188 1 : for _, nf := range pending {
189 1 : // A file's stats might have been populated by an earlier call to
190 1 : // loadNewFileStats if the file was moved.
191 1 : // NB: Only collectTableStats updates f.Stats for active files, and we
192 1 : // ensure only one goroutine runs it at a time through
193 1 : // d.mu.tableStats.loading.
194 1 : if _, ok := nf.Meta.Stats(); ok {
195 1 : continue
196 : }
197 :
198 : // The file isn't guaranteed to still be live in the readState's
199 : // version. It may have been deleted or moved. Skip it if it's not in
200 : // the expected level.
201 1 : if !rs.current.Contains(nf.Level, nf.Meta) {
202 1 : continue
203 : }
204 :
205 1 : stats, newHints, err := d.loadTableStats(ctx, rs.current, nf.Level, nf.Meta)
206 1 : if err != nil {
207 0 : d.opts.EventListener.BackgroundError(err)
208 0 : continue
209 : }
210 : // NB: We don't update the TableMetadata yet, because we aren't holding
211 : // DB.mu. We'll copy it to the TableMetadata after we're finished with
212 : // IO.
213 1 : collected = append(collected, collectedStats{
214 1 : TableMetadata: nf.Meta,
215 1 : TableStats: stats,
216 1 : })
217 1 : hints = append(hints, newHints...)
218 : }
219 1 : return collected, hints
220 : }
221 :
222 : // scanReadStateTableStats is run by an active stat collection job when there
223 : // are no pending new files, but there might be files that existed at Open for
224 : // which we haven't loaded table stats.
225 : func (d *DB) scanReadStateTableStats(
226 : ctx context.Context, version *manifest.Version, fill []collectedStats,
227 1 : ) (_ []collectedStats, _ []deleteCompactionHint, moreRemain bool) {
228 1 : var hints []deleteCompactionHint
229 1 : sizesChecked := make(map[base.DiskFileNum]struct{})
230 1 : // TODO(radu): an O(#tables) scan every time could be problematic.
231 1 : for l, levelMetadata := range version.Levels {
232 1 : for f := range levelMetadata.All() {
233 1 : // NB: Only the active stats collection job updates f.Stats for active
234 1 : // files, and we ensure only one goroutine runs it at a time through
235 1 : // d.mu.tableStats.loading.
236 1 : if _, ok := f.Stats(); ok {
237 1 : continue
238 : }
239 :
240 : // Limit how much work we do per read state. The older the read
241 : // state is, the higher the likelihood files are no longer being
242 : // used in the current version. If we've exhausted our allowance,
243 : // return true for the last return value to signal there's more
244 : // work to do.
245 1 : if len(fill) == cap(fill) {
246 1 : return fill, hints, true
247 1 : }
248 :
249 : // If the file is remote and not SharedForeign, we should check if its size
250 : // matches. This is because checkConsistency skips over remote files.
251 : //
252 : // SharedForeign and External files are skipped as their sizes are allowed
253 : // to have a mismatch; the size stored in the TableBacking is just the part
254 : // of the file that is referenced by this Pebble instance, not the size of
255 : // the whole object.
256 1 : objMeta, err := d.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
257 1 : if err != nil {
258 0 : // Set `moreRemain` so we'll try again.
259 0 : moreRemain = true
260 0 : d.opts.EventListener.BackgroundError(err)
261 0 : continue
262 : }
263 :
264 1 : shouldCheckSize := objMeta.IsRemote() &&
265 1 : !d.objProvider.IsSharedForeign(objMeta) &&
266 1 : !objMeta.IsExternal()
267 1 : if _, ok := sizesChecked[f.TableBacking.DiskFileNum]; !ok && shouldCheckSize {
268 1 : size, err := d.objProvider.Size(objMeta)
269 1 : fileSize := f.TableBacking.Size
270 1 : if err != nil {
271 0 : moreRemain = true
272 0 : d.opts.EventListener.BackgroundError(err)
273 0 : continue
274 : }
275 1 : if size != int64(fileSize) {
276 0 : err := errors.Errorf(
277 0 : "during consistency check in loadTableStats: L%d: %s: object size mismatch (%s): %d (provider) != %d (MANIFEST)",
278 0 : errors.Safe(l), f.TableNum, d.objProvider.Path(objMeta),
279 0 : errors.Safe(size), errors.Safe(fileSize))
280 0 : d.opts.EventListener.BackgroundError(err)
281 0 : d.opts.Logger.Fatalf("%s", err)
282 0 : }
283 :
284 1 : sizesChecked[f.TableBacking.DiskFileNum] = struct{}{}
285 : }
286 :
287 1 : stats, newHints, err := d.loadTableStats(ctx, version, l, f)
288 1 : if err != nil {
289 0 : // Set `moreRemain` so we'll try again.
290 0 : moreRemain = true
291 0 : d.opts.EventListener.BackgroundError(err)
292 0 : continue
293 : }
294 1 : fill = append(fill, collectedStats{
295 1 : TableMetadata: f,
296 1 : TableStats: stats,
297 1 : })
298 1 : hints = append(hints, newHints...)
299 : }
300 : }
301 1 : return fill, hints, moreRemain
302 : }
303 :
304 : // scanBlobFileProperties reads at most maxNum blob file properties for blob
305 : // files that don't have them populated. Returns true if blob files remain
306 : // whose properties still need to be loaded.
307 : func (d *DB) scanBlobFileProperties(
308 : ctx context.Context, version *manifest.Version, maxNum int,
309 1 : ) (moreRemain bool) {
310 1 : // TODO(radu): an O(#files) scan every time could be problematic.
311 1 : // We could remember the last blob file ID and scan from there.
312 1 : for f := range version.BlobFiles.All() {
313 1 : if _, propsValid := f.Physical.Properties(); propsValid {
314 1 : // Properties are already populated.
315 1 : continue
316 : }
317 1 : if maxNum == 0 {
318 1 : // We've reached the limit for this scan, and there are more files
319 1 : // remaining.
320 1 : return true
321 1 : }
322 1 : maxNum--
323 1 : v, err := d.fileCache.findOrCreateBlob(ctx, f.Physical, block.InitFileReadStats{})
324 1 : if err != nil {
325 0 : // Set moreRemain so we'll try again.
326 0 : moreRemain = true
327 0 : continue
328 : }
329 1 : blobReader := v.Value().mustBlob()
330 1 : blobProps, err := blobReader.ReadProperties(ctx)
331 1 : v.Unref()
332 1 : if err != nil {
333 0 : // Set moreRemain so we'll try again.
334 0 : moreRemain = true
335 0 : continue
336 : }
337 : // It is ok to call PopulateProperties here because this function runs as
338 : // part of a table statistics job, and at most one goroutine runs this at a
339 : // time (see d.mu.tableStats.loading).
340 1 : f.Physical.PopulateProperties(&blobProps)
341 : }
342 1 : return moreRemain
343 : }
344 :
345 : func (d *DB) loadTableStats(
346 : ctx context.Context, v *manifest.Version, level int, meta *manifest.TableMetadata,
347 1 : ) (manifest.TableStats, []deleteCompactionHint, error) {
348 1 : var compactionHints []deleteCompactionHint
349 1 : var rangeDeletionsBytesEstimate uint64
350 1 :
351 1 : backingProps, backingPropsOk := meta.TableBacking.Properties()
352 1 :
353 1 : blockReadEnv := block.ReadEnv{
354 1 : Level: base.MakeLevel(level),
355 1 : }
356 1 : // If the stats are already available (always the case other than after
357 1 : // initial startup), and there are no range deletions or range key deletions,
358 1 : // we avoid opening the table.
359 1 : if !backingPropsOk || backingProps.NumRangeDeletions > 0 || backingProps.NumRangeKeyDels > 0 {
360 1 : err := d.fileCache.withReader(
361 1 : ctx, blockReadEnv, meta, func(r *sstable.Reader, env sstable.ReadEnv) (err error) {
362 1 : if !backingPropsOk {
363 1 : loadedProps, err := r.ReadPropertiesBlock(ctx, nil /* buffer pool */)
364 1 : if err != nil {
365 0 : return err
366 0 : }
367 1 : backingProps = meta.TableBacking.PopulateProperties(&loadedProps)
368 : }
369 1 : if backingProps.NumRangeDeletions > 0 || backingProps.NumRangeKeyDels > 0 {
370 1 : compactionHints, rangeDeletionsBytesEstimate, err = d.loadTableRangeDelStats(ctx, r, v, level, meta, env)
371 1 : if err != nil {
372 0 : return err
373 0 : }
374 : }
375 1 : return err
376 : })
377 1 : if err != nil {
378 0 : return manifest.TableStats{}, nil, err
379 0 : }
380 : }
381 :
382 1 : var stats manifest.TableStats
383 1 : stats.RangeDeletionsBytesEstimate = rangeDeletionsBytesEstimate
384 1 :
385 1 : if backingProps.NumPointDeletions() > 0 {
386 1 : var err error
387 1 : stats.PointDeletionsBytesEstimate, err = d.loadTablePointKeyStats(ctx, backingProps, v, level, meta)
388 1 : if err != nil {
389 0 : return stats, nil, err
390 0 : }
391 : }
392 1 : return stats, compactionHints, nil
393 : }
394 :
395 : // loadTablePointKeyStats calculates the point key statistics for the given
396 : // table.
397 : //
398 : // The backing props are scaled as necessary if the table is virtual.
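//
// A rough worked example of the scaling (assuming, as elsewhere in this file,
// that ScaleStatistic scales a backing-wide value in proportion to the virtual
// table's estimated share of its backing): a backing-wide estimate of 40 MiB
// for a virtual table covering roughly a quarter of its backing yields a
// returned estimate of roughly 10 MiB. The numbers are hypothetical.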
399 : func (d *DB) loadTablePointKeyStats(
400 : ctx context.Context,
401 : props *manifest.TableBackingProperties,
402 : v *manifest.Version,
403 : level int,
404 : meta *manifest.TableMetadata,
405 1 : ) (pointDeletionsBytes uint64, _ error) {
406 1 : // TODO(jackson): If the file has a wide keyspace, the average
407 1 : // value size beneath the entire file might not be representative
408 1 : // of the size of the keys beneath the point tombstones.
409 1 : // We could write the ranges of 'clusters' of point tombstones to
410 1 : // a sstable property and call averageValueSizeBeneath for each of
411 1 : // these narrower ranges to improve the estimate.
412 1 : avgValLogicalSize, compressionRatio, err := d.estimateSizesBeneath(ctx, v, level, meta, props)
413 1 : if err != nil {
414 0 : return 0, err
415 0 : }
416 1 : pointDeletionsBytes = pointDeletionsBytesEstimate(props, avgValLogicalSize, compressionRatio)
417 1 : pointDeletionsBytes = meta.ScaleStatistic(pointDeletionsBytes)
418 1 : return pointDeletionsBytes, nil
419 : }
420 :
421 : // loadTableRangeDelStats calculates the range deletion and range key deletion
422 : // statistics for the given table.
423 : func (d *DB) loadTableRangeDelStats(
424 : ctx context.Context,
425 : r *sstable.Reader,
426 : v *manifest.Version,
427 : level int,
428 : meta *manifest.TableMetadata,
429 : env sstable.ReadEnv,
430 1 : ) (_ []deleteCompactionHint, rangeDeletionsBytesEstimate uint64, _ error) {
431 1 : iter, err := newCombinedDeletionKeyspanIter(ctx, d.opts.Comparer, r, meta, env)
432 1 : if err != nil {
433 0 : return nil, 0, err
434 0 : }
435 1 : defer iter.Close()
436 1 : var compactionHints []deleteCompactionHint
437 1 : // We iterate over the defragmented range tombstones and range key deletions,
438 1 : // which ensures we don't double count ranges deleted at different sequence
439 1 : // numbers. Also, merging abutting tombstones reduces the number of calls to
440 1 : // estimateReclaimedSizeBeneath which is costly, and improves the accuracy of
441 1 : // our overall estimate.
442 1 : s, err := iter.First()
443 1 : for ; s != nil; s, err = iter.Next() {
444 1 : start, end := s.Start, s.End
445 1 : // We only need to consider deletion size estimates for tables that contain
446 1 : // RANGEDELs.
447 1 : var maxRangeDeleteSeqNum base.SeqNum
448 1 : for _, k := range s.Keys {
449 1 : if k.Kind() == base.InternalKeyKindRangeDelete && maxRangeDeleteSeqNum < k.SeqNum() {
450 1 : maxRangeDeleteSeqNum = k.SeqNum()
451 1 : break
452 : }
453 : }
454 :
455 : // If the file is in the last level of the LSM, there is no data beneath
456 : // it. The fact that there is still a range tombstone in a bottommost file
457 : // indicates two possibilities:
458 : // 1. an open snapshot kept the tombstone around, and the data the
459 : // tombstone deletes is contained within the file itself.
460 : // 2. the file was ingested.
461 : // In the first case, we'd like to estimate disk usage within the file
462 : // itself since compacting the file will drop that covered data. In the
463 : // second case, we expect that compacting the file will NOT drop any
464 : // data and rewriting the file is a waste of write bandwidth. We can
465 : // distinguish these cases by looking at the table metadata's sequence
466 : // numbers. A file's range deletions can only delete data within the
467 : // file at lower sequence numbers. All keys in an ingested sstable adopt
468 : // the same sequence number, preventing tombstones from deleting keys
469 : // within the same file. We check here if the largest RANGEDEL sequence
470 : // number is greater than the file's smallest sequence number. If it is,
471 : // the RANGEDEL could conceivably (although inconclusively) delete data
472 : // within the same file.
473 : //
474 : // Note that this heuristic is imperfect. If a table containing a range
475 : // deletion is ingested into L5 and subsequently compacted into L6 but
476 : // an open snapshot prevents elision of covered keys in L6, the
477 : // resulting RangeDeletionsBytesEstimate will incorrectly include all
478 : // covered keys.
479 : //
480 : // TODO(jackson): We could prevent the above error in the heuristic by
481 : // computing the file's RangeDeletionsBytesEstimate during the
482 : // compaction itself. It's unclear how common this is.
483 : //
484 : // NOTE: If the span `s` wholly contains a table containing range keys,
485 : // the returned size estimate will be slightly inflated by the range key
486 : // block. However, in practice, range keys are expected to be rare, and
487 : // the size of the range key block relative to the overall size of the
488 : // table is expected to be small.
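// As a hypothetical illustration of the seqnum check below: an ingested
// table assigns a single seqnum, say 100, to every key, so
// maxRangeDeleteSeqNum == SmallestSeqNum and the in-file estimate is
// skipped; a flushed table whose RANGEDEL at seqnum 90 covers keys written
// earlier at seqnums 80..85 has SmallestSeqNum (80) < 90, so the tombstone
// may delete data within the file and the estimate is included.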
489 1 : if level == numLevels-1 && meta.SmallestSeqNum < maxRangeDeleteSeqNum {
490 1 : size, err := estimateDiskUsageInTableAndBlobReferences(r, start, end, env, meta)
491 1 : if err != nil {
492 0 : return nil, 0, err
493 0 : }
494 1 : rangeDeletionsBytesEstimate += size
495 1 :
496 1 : // As the file is in the bottommost level, there is no need to collect a
497 1 : // deletion hint.
498 1 : continue
499 : }
500 :
501 : // While the size estimates for point keys should only be updated if this
502 : // span contains a range del, the sequence numbers are required for the
503 : // hint. Unconditionally descend, but conditionally update the estimates.
504 1 : hintType := compactionHintFromKeys(s.Keys)
505 1 : estimate, hintSeqNum, err := d.estimateReclaimedSizeBeneath(ctx, v, level, start, end, hintType)
506 1 : if err != nil {
507 0 : return nil, 0, err
508 0 : }
509 1 : rangeDeletionsBytesEstimate += estimate
510 1 :
511 1 : // hintSeqNum is the smallest sequence number contained in any
512 1 : // file overlapping with the hint and in a level below it.
513 1 : if hintSeqNum == math.MaxUint64 {
514 1 : continue
515 : }
516 1 : compactionHints = append(compactionHints, deleteCompactionHint{
517 1 : hintType: hintType,
518 1 : start: slices.Clone(start),
519 1 : end: slices.Clone(end),
520 1 : tombstoneFile: meta,
521 1 : tombstoneLevel: level,
522 1 : tombstoneLargestSeqNum: s.LargestSeqNum(),
523 1 : tombstoneSmallestSeqNum: s.SmallestSeqNum(),
524 1 : fileSmallestSeqNum: hintSeqNum,
525 1 : })
526 : }
527 1 : if err != nil {
528 0 : return nil, 0, err
529 0 : }
530 1 : return compactionHints, rangeDeletionsBytesEstimate, nil
531 : }
532 :
533 : // estimateSizesBeneath calculates two statistics describing the data in the LSM
534 : // below the provided table metadata:
535 : //
536 : // 1. The average logical size of values: this is a pre-compression average
537 : // over non-tombstone entries. It's helpful for estimating how much data a
538 : // DEL might delete.
539 : // 2. The compression ratio of the data beneath the table.
540 : //
541 : // estimateSizesBeneath walks the LSM table metadata for all tables beneath meta
542 : // (plus the table itself), computing the above statistics.
543 : func (d *DB) estimateSizesBeneath(
544 : ctx context.Context,
545 : v *manifest.Version,
546 : level int,
547 : meta *manifest.TableMetadata,
548 : fileProps *manifest.TableBackingProperties,
549 1 : ) (avgValueLogicalSize, compressionRatio float64, err error) {
550 1 : // Find all files in lower levels that overlap with meta,
551 1 : // summing their value sizes and entry counts.
552 1 :
553 1 : // Include the file itself. This is important because in some instances, the
554 1 : // computed compression ratio is applied to the tombstones contained within
555 1 : // `meta` itself. If there are no files beneath `meta` in the LSM, we would
556 1 : // calculate a compression ratio of 0 which is not accurate for the file's
557 1 : // own tombstones.
558 1 : var (
559 1 : fileSum = meta.Size + meta.EstimatedReferenceSize()
560 1 : entryCount = fileProps.NumEntries
561 1 : deletionCount = fileProps.NumDeletions
562 1 : keySum = fileProps.RawKeySize
563 1 : valSum = fileProps.RawValueSize
564 1 : )
565 1 :
566 1 : for l := level + 1; l < numLevels; l++ {
567 1 : for tableBeneath := range v.Overlaps(l, meta.UserKeyBounds()).All() {
568 1 : fileSum += tableBeneath.Size + tableBeneath.EstimatedReferenceSize()
569 1 :
570 1 : backingProps, ok := tableBeneath.TableBacking.Properties()
571 1 : if !ok {
572 1 : // If properties aren't available, we need to read the properties block.
573 1 : err := d.fileCache.withReader(ctx, block.NoReadEnv, tableBeneath, func(v *sstable.Reader, _ sstable.ReadEnv) (err error) {
574 1 : loadedProps, err := v.ReadPropertiesBlock(ctx, nil /* buffer pool */)
575 1 : if err != nil {
576 0 : return err
577 0 : }
578 : // It is ok to call PopulateProperties here because this function runs as part of
579 : // a table statistics job, and at most one goroutine runs this at a
580 : // time (see d.mu.tableStats.loading).
581 1 : backingProps = tableBeneath.TableBacking.PopulateProperties(&loadedProps)
582 1 : return nil
583 : })
584 1 : if err != nil {
585 0 : return 0, 0, err
586 0 : }
587 : }
588 :
589 1 : entryCount += tableBeneath.ScaleStatistic(backingProps.NumEntries)
590 1 : deletionCount += tableBeneath.ScaleStatistic(backingProps.NumDeletions)
591 1 : keySum += tableBeneath.ScaleStatistic(backingProps.RawKeySize)
592 1 : valSum += tableBeneath.ScaleStatistic(backingProps.RawValueSize)
593 1 : continue
594 : }
595 : }
596 1 : if entryCount == 0 {
597 0 : return 0, 0, nil
598 0 : }
599 : // RawKeySize and RawValueSize are uncompressed totals. We'll need to scale
600 : // the value sum according to the data size to account for compression,
601 : // index blocks and metadata overhead. Eg:
602 : //
603 : // Compression rate × Average uncompressed value size
604 : //
605 : // ↓
606 : //
607 : //        FileSize                     RawValueSize
608 : // ----------------------- × -------------------------
609 : // RawKeySize+RawValueSize   NumEntries - NumDeletions
610 : //
611 : // We return the average logical value size plus the compression ratio,
612 : // leaving the scaling to the caller. This allows the caller to perform
613 : // additional compression ratio scaling if necessary.
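// A hypothetical worked example of the two return values: with
// fileSum = 10 MiB, keySum = 10 MiB, valSum = 30 MiB, entryCount = 1e6 and
// deletionCount = 0, compressionRatio = 10/(10+30) = 0.25 and
// avgValueLogicalSize = 30 MiB / 1e6, or about 31 bytes.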
614 1 : uncompressedSum := float64(keySum + valSum)
615 1 : compressionRatio = float64(fileSum) / uncompressedSum
616 1 : if compressionRatio > 1 {
617 1 : // We can get huge compression ratios due to the fixed overhead of files
618 1 : // containing a tiny amount of data. By setting this to 1, we are ignoring
619 1 : // that overhead, but we accept that tradeoff since the total bytes in
620 1 : // such overhead is not large.
621 1 : compressionRatio = 1
622 1 : }
623 : // When calculating the average value size, we subtract the number of
624 : // deletions from the total number of entries.
625 1 : avgValueLogicalSize = float64(valSum) / float64(max(1, invariants.SafeSub(entryCount, deletionCount)))
626 1 : return avgValueLogicalSize, compressionRatio, nil
627 : }
628 :
629 : func (d *DB) estimateReclaimedSizeBeneath(
630 : ctx context.Context,
631 : v *manifest.Version,
632 : level int,
633 : start, end []byte,
634 : hintType deleteCompactionHintType,
635 1 : ) (estimate uint64, hintSeqNum base.SeqNum, err error) {
636 1 : // Find all files in lower levels that overlap with the deleted range
637 1 : // [start, end).
638 1 : //
639 1 : // An overlapping file might be completely contained by the range
640 1 : // tombstone, in which case we can count the entire file size in
641 1 : // our estimate without doing any additional I/O.
642 1 : //
643 1 : // Otherwise, estimating the range for the file requires
644 1 : // additional I/O to read the file's index blocks.
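// As a hypothetical example of the two cases handled below: a deleted range
// [c, p) fully contains a lower-level file spanning [d, f], so that file's
// entire size (plus its estimated blob reference size) is added with no
// additional I/O; a file spanning [m, t] only partially overlaps, so its
// contribution is estimated by reading the file's index blocks.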
645 1 : hintSeqNum = math.MaxUint64
646 1 : // TODO(jbowens): When there are multiple sub-levels in L0 and the RANGEDEL
647 1 : // is from a higher sub-level, we incorrectly skip the files in the lower
648 1 : // sub-levels when estimating this overlap.
649 1 : for l := level + 1; l < numLevels; l++ {
650 1 : for file := range v.Overlaps(l, base.UserKeyBoundsEndExclusive(start, end)).All() {
651 1 : // Determine whether we need to update size estimates and hint seqnums
652 1 : // based on the type of hint and the type of keys in this file.
653 1 : var updateEstimates, updateHints bool
654 1 : switch hintType {
655 1 : case deleteCompactionHintTypePointKeyOnly:
656 1 : // The range deletion byte estimates should only be updated if this
657 1 : // table contains point keys. This ends up being an overestimate in
658 1 : // the case that table also has range keys, but such keys are expected
659 1 : // to contribute a negligible amount of the table's overall size,
660 1 : // relative to point keys.
661 1 : if file.HasPointKeys {
662 1 : updateEstimates = true
663 1 : }
664 : // As the initiating span contained only range dels, hints can only be
665 : // updated if this table does _not_ contain range keys.
666 1 : if !file.HasRangeKeys {
667 1 : updateHints = true
668 1 : }
669 1 : case deleteCompactionHintTypeRangeKeyOnly:
670 1 : // The initiating span contained only range key dels. The estimates
671 1 : // apply only to point keys, and are therefore not updated.
672 1 : updateEstimates = false
673 1 : // As the initiating span contained only range key dels, hints can
674 1 : // only be updated if this table does _not_ contain point keys.
675 1 : if !file.HasPointKeys {
676 1 : updateHints = true
677 1 : }
678 1 : case deleteCompactionHintTypePointAndRangeKey:
679 1 : // Always update the estimates and hints, as this hint type can drop a
680 1 : // file, irrespective of the mixture of keys. Similar to above, the
681 1 : // range del bytes estimates is an overestimate.
682 1 : updateEstimates, updateHints = true, true
683 0 : default:
684 0 : panic(fmt.Sprintf("pebble: unknown hint type %s", hintType))
685 : }
686 1 : startCmp := d.cmp(start, file.Smallest().UserKey)
687 1 : endCmp := d.cmp(file.Largest().UserKey, end)
688 1 : if startCmp <= 0 && (endCmp < 0 || endCmp == 0 && file.Largest().IsExclusiveSentinel()) {
689 1 : // The range fully contains the file, so skip looking it up in table
690 1 : // cache/looking at its indexes and add the full file size.
691 1 : if updateEstimates {
692 1 : estimate += file.Size + file.EstimatedReferenceSize()
693 1 : }
694 1 : if updateHints && hintSeqNum > file.SmallestSeqNum {
695 1 : hintSeqNum = file.SmallestSeqNum
696 1 : }
697 1 : } else if d.cmp(file.Smallest().UserKey, end) <= 0 && d.cmp(start, file.Largest().UserKey) <= 0 {
698 1 : // Partial overlap.
699 1 : if hintType == deleteCompactionHintTypeRangeKeyOnly {
700 1 : // If the hint that generated this overlap contains only range keys,
701 1 : // there is no need to calculate disk usage, as the reclaimable space
702 1 : // is expected to be minimal relative to point keys.
703 1 : continue
704 : }
705 1 : var size uint64
706 1 : err := d.fileCache.withReader(ctx, block.NoReadEnv, file,
707 1 : func(r *sstable.Reader, env sstable.ReadEnv) (err error) {
708 1 : size, err = estimateDiskUsageInTableAndBlobReferences(r, start, end, env, file)
709 1 : return err
710 1 : })
711 1 : if err != nil {
712 0 : return 0, hintSeqNum, err
713 0 : }
714 1 : estimate += size
715 1 : if updateHints && hintSeqNum > file.SmallestSeqNum && d.FormatMajorVersion() >= FormatVirtualSSTables {
716 1 : // If the format major version is at least FormatVirtualSSTables, deletion-only
717 1 : // hints can also apply to partial overlaps with sstables.
718 1 : hintSeqNum = file.SmallestSeqNum
719 1 : }
720 : }
721 : }
722 : }
723 1 : return estimate, hintSeqNum, nil
724 : }
725 :
726 : var lastSanityCheckStatsLog crtime.AtomicMono
727 :
728 : func sanityCheckStats(
729 : meta *manifest.TableMetadata, stats *manifest.TableStats, logger Logger, info string,
730 1 : ) {
731 1 : // Values for PointDeletionsBytesEstimate and RangeDeletionsBytesEstimate that
732 1 : // exceed this value are likely indicative of a bug (eg, underflow).
733 1 : const maxDeletionBytesEstimate = 1 << 50 // 1 PiB
734 1 :
735 1 : if stats.PointDeletionsBytesEstimate > maxDeletionBytesEstimate ||
736 1 : stats.RangeDeletionsBytesEstimate > maxDeletionBytesEstimate {
737 0 : if invariants.Enabled {
738 0 : panic(fmt.Sprintf("%s: table %s has extreme deletion bytes estimates: point=%d range=%d",
739 0 : info, meta.TableNum,
740 0 : redact.Safe(stats.PointDeletionsBytesEstimate),
741 0 : redact.Safe(stats.RangeDeletionsBytesEstimate),
742 0 : ))
743 : }
744 0 : if v := lastSanityCheckStatsLog.Load(); v == 0 || v.Elapsed() > 30*time.Second {
745 0 : logger.Errorf("%s: table %s has extreme deletion bytes estimates: point=%d range=%d",
746 0 : info, meta.TableNum,
747 0 : redact.Safe(stats.PointDeletionsBytesEstimate),
748 0 : redact.Safe(stats.RangeDeletionsBytesEstimate),
749 0 : )
750 0 : lastSanityCheckStatsLog.Store(crtime.NowMono())
751 0 : }
752 : }
753 : }
754 :
755 : // estimateDiskUsageInTableAndBlobReferences estimates the disk usage within a
756 : // sstable and its referenced values. The size of blob files is computed using
757 : // linear interpolation.
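//
// A hypothetical worked example of the interpolation: if EstimateDiskUsage
// reports 2 MiB of a 10 MiB table within [start, end) and the table's blob
// references are estimated at 5 MiB, the scaled reference size is
// 5 MiB * 2/10 = 1 MiB, for a combined estimate of 3 MiB.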
758 : func estimateDiskUsageInTableAndBlobReferences(
759 : r *sstable.Reader, start, end []byte, env sstable.ReadEnv, meta *manifest.TableMetadata,
760 1 : ) (uint64, error) {
761 1 : size, err := r.EstimateDiskUsage(start, end, env, meta.IterTransforms())
762 1 : if err != nil {
763 0 : return 0, err
764 0 : }
765 :
766 1 : estimatedTableSize := max(size, 1)
767 1 : originalTableSize := max(meta.Size, 1)
768 1 : referenceSize := crmath.ScaleUint64(meta.EstimatedReferenceSize(),
769 1 : estimatedTableSize, originalTableSize)
770 1 : return size + referenceSize, nil
771 : }
772 :
773 : // maybeSetStatsFromProperties sets the table backing properties and attempts to
774 : // set the table stats from the properties, for a table that was created by an
775 : // ingestion or compaction.
776 1 : func maybeSetStatsFromProperties(meta *manifest.TableMetadata, props *sstable.Properties) bool {
777 1 : meta.TableBacking.PopulateProperties(props)
778 1 : if invariants.Enabled && meta.Virtual {
779 0 : panic("table expected to be physical")
780 : }
781 : // If a table contains any deletions, we defer the stats collection. There
782 : // are two main reasons for this:
783 : //
784 : // 1. Estimating the potential for reclaimed space due to a deletion
785 : // requires scanning the LSM - a potentially expensive operation that
786 : // should be deferred.
787 : // 2. Range deletions present an opportunity to compute "deletion hints",
788 : // which also requires a scan of the LSM to compute tables that would be
789 : // eligible for deletion.
790 : //
791 : // These two tasks are deferred to the table stats collector goroutine.
792 : //
793 : // Note that even if the point deletions are sized (DELSIZEDs), an accurate
794 : // compression ratio is necessary to calculate an accurate estimate of the
795 : // physical disk space they reclaim. To do that, we need to scan the LSM
796 : // beneath the file.
797 1 : if props.NumDeletions != 0 || props.NumRangeKeyDels != 0 {
798 1 : return false
799 1 : }
800 1 : meta.PopulateStats(new(manifest.TableStats))
801 1 : return true
802 : }
803 :
804 : // pointDeletionsBytesEstimate returns an estimate of the total disk space that
805 : // may be reclaimed by compacting the physical table's point deletions.
806 : // The result should be scaled accordingly for virtual tables.
807 : func pointDeletionsBytesEstimate(
808 : props *manifest.TableBackingProperties, avgValLogicalSize, compressionRatio float64,
809 1 : ) (estimate uint64) {
810 1 : if props.NumEntries == 0 {
811 0 : return 0
812 0 : }
813 1 : numPointDels := props.NumPointDeletions()
814 1 : if numPointDels == 0 {
815 0 : return 0
816 0 : }
817 : // Estimate the potential space to reclaim using the table's own properties.
818 : // There may or may not be keys covered by any individual point tombstone.
819 : // If not, compacting the point tombstone into L6 will at least allow us to
820 : // drop the point deletion key and will reclaim the tombstone's key bytes.
821 : // If there are covered key(s), we also get to drop key and value bytes for
822 : // each covered key.
823 : //
824 : // Some point tombstones (DELSIZEDs) carry a user-provided estimate of the
825 : // uncompressed size of entries that will be elided by fully compacting the
826 : // tombstone. For these tombstones, there's no guesswork: we use the
827 : // RawPointTombstoneValueSize property, which is the sum of all these
828 : // tombstones' encoded values.
829 : //
830 : // For un-sized point tombstones (DELs), we estimate assuming that each
831 : // point tombstone on average covers 1 key and using average value sizes.
832 : // This is almost certainly an overestimate, but that's probably okay
833 : // because point tombstones can slow range iterations even when they don't
834 : // cover a key.
835 : //
836 : // TODO(jackson): This logic doesn't directly incorporate fixed per-key
837 : // overhead (8-byte trailer, plus at least 1 byte encoding the length of the
838 : // key and 1 byte encoding the length of the value). This overhead is
839 : // indirectly incorporated through the compression ratios, but that results
840 : // in the overhead being smeared per key-byte and value-byte, rather than
841 : // per-entry. This per-key fixed overhead can be nontrivial, especially for
842 : // dense swaths of point tombstones. Give some thought as to whether we
843 : // should directly include fixed per-key overhead in the calculations.
844 :
845 : // Below, we calculate the tombstone contributions and the shadowed keys'
846 : // contributions separately.
847 1 : var tombstonesLogicalSize float64
848 1 : var shadowedLogicalSize float64
849 1 :
850 1 : // 1. Calculate the contribution of the tombstone keys themselves.
851 1 : if props.RawPointTombstoneKeySize > 0 {
852 1 : tombstonesLogicalSize += float64(props.RawPointTombstoneKeySize)
853 1 : } else {
854 0 : // This sstable predates the existence of the RawPointTombstoneKeySize
855 0 : // property. We can use the average key size within the file itself and
856 0 : // the count of point deletions to estimate the size.
857 0 : tombstonesLogicalSize += float64(numPointDels * props.RawKeySize / props.NumEntries)
858 0 : }
859 :
860 : // 2. Calculate the contribution of the keys shadowed by tombstones.
861 : //
862 : // 2a. First account for keys shadowed by DELSIZED tombstones. The DELSIZED
863 : // tombstones encode the size of both the key and value of the shadowed KV
864 : // entries. These sizes are aggregated into a sstable property.
865 1 : shadowedLogicalSize += float64(props.RawPointTombstoneValueSize)
866 1 :
867 1 : // 2b. Calculate the contribution of the KV entries shadowed by ordinary DEL
868 1 : // keys.
869 1 : numUnsizedDels := invariants.SafeSub(numPointDels, props.NumSizedDeletions)
870 1 : {
871 1 : // The shadowed keys have the same exact user keys as the tombstones
872 1 : // themselves, so we can use the `tombstonesLogicalSize` we computed
873 1 : // earlier as an estimate. One complication: `tombstonesLogicalSize` may
874 1 : // include DELSIZED keys we already accounted for above, which is why we
875 1 : // scale by the fraction of point tombstones that are unsized.
876 1 : shadowedLogicalSize += tombstonesLogicalSize / float64(numPointDels) * float64(numUnsizedDels)
877 1 :
878 1 : // Calculate the contribution of the deleted values. The caller has
879 1 : // already computed an average logical size (possibly computed across
880 1 : // many sstables).
881 1 : shadowedLogicalSize += float64(numUnsizedDels) * avgValLogicalSize
882 1 : }
883 :
884 : // Scale both tombstone and shadowed totals by logical:physical ratios to
885 : // account for compression, metadata overhead, etc.
886 : //
887 : //   Physical            FileSize
888 : //  ----------- = -----------------------
889 : //   Logical      RawKeySize+RawValueSize
890 : //
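// A hypothetical end-to-end example: 1,000 unsized DELs (no DELSIZEDs) with
// RawPointTombstoneKeySize = 20,000 bytes, avgValLogicalSize = 100 bytes and
// compressionRatio = 0.5 give tombstonesLogicalSize = 20,000,
// shadowedLogicalSize = 20,000 + 100,000, and an estimate of
// 0.5 * 140,000 = 70,000 bytes.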
891 1 : return uint64((tombstonesLogicalSize + shadowedLogicalSize) * compressionRatio)
892 : }
893 :
894 : // newCombinedDeletionKeyspanIter returns a keyspan.FragmentIterator that
895 : // returns "ranged deletion" spans for a single table, providing a combined view
896 : // of both range deletion and range key deletion spans. The
897 : // tableRangedDeletionIter is intended for use in the specific case of computing
898 : // the statistics and deleteCompactionHints for a single table.
899 : //
900 : // As an example, consider the following set of spans from the range deletion
901 : // and range key blocks of a table:
902 : //
903 : // |---------| |---------| |-------| RANGEKEYDELs
904 : // |-----------|-------------| |-----| RANGEDELs
905 : // __________________________________________________________
906 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
907 : //
908 : // The tableRangedDeletionIter produces the following set of output spans, where
909 : // '1' indicates a span containing only range deletions, '2' is a span
910 : // containing only range key deletions, and '3' is a span containing a mixture
911 : // of both range deletions and range key deletions.
912 : //
913 : // 1 3 1 3 2 1 3 2
914 : // |-----|---------|-----|---|-----| |---|-|-----|
915 : // __________________________________________________________
916 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
917 : //
918 : // Algorithm.
919 : //
920 : // The iterator first defragments the range deletion and range key blocks
921 : // separately. During this defragmentation, the range key block is also filtered
922 : // so that keys other than range key deletes are ignored. The range delete and
923 : // range key delete keyspaces are then merged.
924 : //
925 : // Note that the only fragmentation introduced by merging is from where a range
926 : // del span overlaps with a range key del span. Within the bounds of any overlap
927 : // there is guaranteed to be no further fragmentation, as the constituent spans
928 : // have already been defragmented. To the left and right of any overlap, the
929 : // same reasoning applies. For example,
930 : //
931 : // |--------| |-------| RANGEKEYDEL
932 : // |---------------------------| RANGEDEL
933 : // |----1---|----3---|----1----|---2---| Merged, fragmented spans.
934 : // __________________________________________________________
935 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
936 : //
937 : // Any fragmented abutting spans produced by the merging iter will be of
938 : // differing types (i.e. a transition from a span with homogenous key kinds to a
939 : // heterogeneous span, or a transition from a span with exclusively range dels
940 : // to a span with exclusively range key dels). Therefore, further
941 : // defragmentation is not required.
942 : //
943 : // Each span returned by the tableRangedDeletionIter will have at most four keys,
944 : // corresponding to the largest and smallest sequence numbers encountered across
945 : // the range deletes and range keys deletes that comprised the merged spans.
946 : func newCombinedDeletionKeyspanIter(
947 : ctx context.Context,
948 : comparer *base.Comparer,
949 : r *sstable.Reader,
950 : m *manifest.TableMetadata,
951 : env sstable.ReadEnv,
952 1 : ) (keyspan.FragmentIterator, error) {
953 1 : // The range del iter and range key iter are each wrapped in their own
954 1 : // defragmenting iter. For each iter, abutting spans can always be merged.
955 1 : var equal = keyspan.DefragmentMethodFunc(func(_ base.CompareRangeSuffixes, a, b *keyspan.Span) bool { return true })
956 : // Reduce keys by maintaining a slice of at most length two, corresponding to
957 : // the largest and smallest keys in the defragmented span. This maintains the
958 : // contract that the emitted slice is sorted by (SeqNum, Kind) descending.
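// For example (hypothetical trailers), reducing a current slice with
// trailers {12, 7} against an incoming slice with trailers {10, 9} keeps
// the pair {12, 7}: 12 is the largest of the first keys and 7 the smallest
// of the last keys.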
959 1 : reducer := func(current, incoming []keyspan.Key) []keyspan.Key {
960 1 : if len(current) == 0 && len(incoming) == 0 {
961 0 : // While this should never occur in practice, a defensive return is used
962 0 : // here to preserve correctness.
963 0 : return current
964 0 : }
965 1 : var largest, smallest keyspan.Key
966 1 : var set bool
967 1 : for _, keys := range [2][]keyspan.Key{current, incoming} {
968 1 : if len(keys) == 0 {
969 0 : continue
970 : }
971 1 : first, last := keys[0], keys[len(keys)-1]
972 1 : if !set {
973 1 : largest, smallest = first, last
974 1 : set = true
975 1 : continue
976 : }
977 1 : if first.Trailer > largest.Trailer {
978 1 : largest = first
979 1 : }
980 1 : if last.Trailer < smallest.Trailer {
981 1 : smallest = last
982 1 : }
983 : }
984 1 : if largest.Equal(comparer.CompareRangeSuffixes, smallest) {
985 1 : current = append(current[:0], largest)
986 1 : } else {
987 1 : current = append(current[:0], largest, smallest)
988 1 : }
989 1 : return current
990 : }
991 :
992 : // The separate iters for the range dels and range keys are wrapped in a
993 : // merging iter to join the keyspaces into a single keyspace. The separate
994 : // iters are only added if the particular key kind is present.
995 1 : mIter := &keyspanimpl.MergingIter{}
996 1 : var transform = keyspan.TransformerFunc(func(_ base.CompareRangeSuffixes, in keyspan.Span, out *keyspan.Span) error {
997 1 : if in.KeysOrder != keyspan.ByTrailerDesc {
998 0 : return base.AssertionFailedf("combined deletion iter encountered keys in non-trailer descending order")
999 0 : }
1000 1 : out.Start, out.End = in.Start, in.End
1001 1 : out.Keys = append(out.Keys[:0], in.Keys...)
1002 1 : out.KeysOrder = keyspan.ByTrailerDesc
1003 1 : // NB: The order of by-trailer descending may have been violated,
1004 1 : // because we've layered rangekey and rangedel iterators from the same
1005 1 : // sstable into the same keyspanimpl.MergingIter. The MergingIter will
1006 1 : // return the keys in the order that the child iterators were provided.
1007 1 : // Sort the keys to ensure they're sorted by trailer descending.
1008 1 : keyspan.SortKeysByTrailer(out.Keys)
1009 1 : return nil
1010 : })
1011 1 : mIter.Init(comparer, transform, new(keyspanimpl.MergingBuffers))
1012 1 : iter, err := r.NewRawRangeDelIter(ctx, m.FragmentIterTransforms(), env)
1013 1 : if err != nil {
1014 0 : return nil, err
1015 0 : }
1016 1 : if iter != nil {
1017 1 : // Assert expected bounds. In previous versions of Pebble, range
1018 1 : // deletions persisted to sstables could exceed the bounds of the
1019 1 : // containing files due to "split user keys." This required readers to
1020 1 : // constrain the tombstones' bounds to the containing file at read time.
1021 1 : // See docs/range_deletions.md for an extended discussion of the design
1022 1 : // and invariants at that time.
1023 1 : //
1024 1 : // We've since compacted away all 'split user-keys' and in the process
1025 1 : // eliminated all "untruncated range tombstones" for physical sstables.
1026 1 : // We no longer need to perform truncation at read time for these
1027 1 : // sstables.
1028 1 : //
1029 1 : // At the same time, we've also introduced the concept of "virtual
1030 1 : // SSTables" where the table metadata's effective bounds can again be
1031 1 : // reduced to be narrower than the contained tombstones. These virtual
1032 1 : // SSTables handle truncation differently, performing it using
1033 1 : // keyspan.Truncate when the sstable's range deletion iterator is
1034 1 : // opened.
1035 1 : //
1036 1 : // Together, these mean that we should never see untruncated range
1037 1 : // tombstones any more—and the merging iterator no longer accounts for
1038 1 : // their existence. Since there's abundant subtlety that we're relying
1039 1 : // on, we choose to be conservative and assert that these invariants
1040 1 : // hold. We could (and previously did) choose to only validate these
1041 1 : // bounds in invariants builds, but the most likely avenue for these
1042 1 : // tombstones' existence is through a bug in a migration and old data
1043 1 : // sitting around in an old store from long ago.
1044 1 : //
1045 1 : // The table stats collector will read all files' range deletions
1046 1 : // asynchronously after Open, and provides a perfect opportunity to
1047 1 : // validate our invariants without harming user latency. We also
1048 1 : // previously performed truncation here which similarly required key
1049 1 : // comparisons, so replacing those key comparisons with assertions
1050 1 : // should be roughly similar in performance.
1051 1 : //
1052 1 : // TODO(jackson): Only use AssertBounds in invariants builds in the
1053 1 : // following release.
1054 1 : iter = keyspan.AssertBounds(
1055 1 : iter, m.PointKeyBounds.Smallest(), m.PointKeyBounds.LargestUserKey(), comparer.Compare,
1056 1 : )
1057 1 : dIter := &keyspan.DefragmentingIter{}
1058 1 : dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
1059 1 : iter = dIter
1060 1 : mIter.AddLevel(iter)
1061 1 : }
1062 :
1063 1 : iter, err = r.NewRawRangeKeyIter(ctx, m.FragmentIterTransforms(), env)
1064 1 : if err != nil {
1065 0 : return nil, err
1066 0 : }
1067 1 : if iter != nil {
1068 1 : // Assert expected bounds in tests.
1069 1 : if invariants.Sometimes(50) {
1070 1 : if m.HasRangeKeys {
1071 1 : iter = keyspan.AssertBounds(
1072 1 : iter, m.RangeKeyBounds.Smallest(), m.RangeKeyBounds.LargestUserKey(), comparer.Compare,
1073 1 : )
1074 1 : }
1075 : }
1076 : // Wrap the range key iterator in a filter that elides keys other than range
1077 : // key deletions.
1078 1 : iter = keyspan.Filter(iter, func(in *keyspan.Span, buf []keyspan.Key) []keyspan.Key {
1079 1 : keys := buf[:0]
1080 1 : for _, k := range in.Keys {
1081 1 : if k.Kind() != base.InternalKeyKindRangeKeyDelete {
1082 1 : continue
1083 : }
1084 1 : keys = append(keys, k)
1085 : }
1086 1 : return keys
1087 : }, comparer.Compare)
1088 1 : dIter := &keyspan.DefragmentingIter{}
1089 1 : dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
1090 1 : iter = dIter
1091 1 : mIter.AddLevel(iter)
1092 : }
1093 :
1094 1 : return mIter, nil
1095 : }
1096 :
1097 : type deletionBytes struct {
1098 : // PointDels contains a sum of TableStats.PointDeletionsBytesEstimate.
1099 : PointDels uint64
1100 : // RangeDels contains a sum of TableStats.RangeDeletionsBytesEstimate.
1101 : RangeDels uint64
1102 : }
1103 :
1104 : var deletionBytesAnnotator = manifest.MakeTableAnnotator[deletionBytes](
1105 : manifest.NewTableAnnotationIdx(),
1106 : manifest.TableAnnotatorFuncs[deletionBytes]{
1107 1 : Merge: func(dst *deletionBytes, src deletionBytes) {
1108 1 : dst.PointDels += src.PointDels
1109 1 : dst.RangeDels += src.RangeDels
1110 1 : },
1111 1 : Table: func(t *manifest.TableMetadata) (v deletionBytes, cacheOK bool) {
1112 1 : if stats, ok := t.Stats(); ok {
1113 1 : return deletionBytes{
1114 1 : PointDels: stats.PointDeletionsBytesEstimate,
1115 1 : RangeDels: stats.RangeDeletionsBytesEstimate,
1116 1 : }, true
1117 1 : }
1118 1 : return deletionBytes{}, false
1119 : },
1120 : },
1121 : )
1122 :
1123 : // aggregatedTableProps are properties derived from TableBackingProperties that
1124 : // are aggregated for metrics.
1125 : type aggregatedTableProps struct {
1126 : // NumRangeKeySets is the sum of the tables' counts of range key fragments.
1127 : NumRangeKeySets uint64
1128 : // NumDeletions is the sum of the tables' counts of tombstones (DEL, SINGLEDEL
1129 : // and RANGEDEL keys).
1130 : NumDeletions uint64
1131 : // ValueBlocksSize is the sum of the tables' Properties.ValueBlocksSize.
1132 : ValueBlocksSize uint64
1133 :
1134 : CompressionMetrics CompressionMetrics
1135 : }
1136 :
1137 : var tablePropsAnnotator = manifest.MakeTableAnnotator[aggregatedTableProps](
1138 : manifest.NewTableAnnotationIdx(),
1139 : manifest.TableAnnotatorFuncs[aggregatedTableProps]{
1140 0 : Merge: func(dst *aggregatedTableProps, src aggregatedTableProps) {
1141 0 : dst.NumRangeKeySets += src.NumRangeKeySets
1142 0 : dst.NumDeletions += src.NumDeletions
1143 0 : dst.ValueBlocksSize += src.ValueBlocksSize
1144 0 : dst.CompressionMetrics.MergeWith(&src.CompressionMetrics)
1145 0 : },
1146 0 : Table: func(t *manifest.TableMetadata) (v aggregatedTableProps, cacheOK bool) {
1147 0 : props, propsValid := t.TableBacking.Properties()
1148 0 : if propsValid {
1149 0 : v.NumRangeKeySets = props.NumRangeKeySets
1150 0 : v.NumDeletions = props.NumDeletions
1151 0 : v.ValueBlocksSize = props.ValueBlocksSize
1152 0 : }
1153 0 : if !propsValid || props.CompressionStats.IsEmpty() {
1154 0 : v.CompressionMetrics.CompressedBytesWithoutStats = t.ScaleStatistic(t.Size)
1155 0 : } else {
1156 0 : compressionStats := props.CompressionStats
1157 0 : if t.Virtual {
1158 0 : // Scale the compression stats for virtual tables.
1159 0 : compressionStats = compressionStats.Scale(t.Size, t.TableBacking.Size)
1160 0 : }
1161 0 : v.CompressionMetrics.Add(&compressionStats)
1162 : }
1163 0 : return v, propsValid
1164 : },
1165 : })
1166 :
1167 : // blobCompressionStatsAnnotator is a manifest.BlobFileAnnotator that annotates
1168 : // B-tree nodes with the aggregated compression metrics for blob files. Its
1169 : // annotation type is CompressionMetrics. A blob file's compression stats may
1170 : // change once its properties are loaded asynchronously, so a file's value is
1171 : // marked as cacheable only if its properties have been loaded. Blob files
1172 : // whose properties are unavailable contribute their physical size to
1173 : // CompressedBytesWithoutStats.
1174 : var blobCompressionStatsAnnotator = manifest.MakeBlobFileAnnotator[CompressionMetrics](
1175 : manifest.NewBlobAnnotationIdx(),
1176 : manifest.BlobFileAnnotatorFuncs[CompressionMetrics]{
1177 0 : Merge: func(dst *CompressionMetrics, src CompressionMetrics) {
1178 0 : dst.MergeWith(&src)
1179 0 : },
1180 0 : BlobFile: func(f manifest.BlobFileMetadata) (v CompressionMetrics, cacheOK bool) {
1181 0 : props, propsValid := f.Physical.Properties()
1182 0 : if !propsValid || props.CompressionStats.IsEmpty() {
1183 0 : v.CompressedBytesWithoutStats += f.Physical.Size
1184 0 : return v, propsValid
1185 0 : }
1186 0 : compressionStats := props.CompressionStats
1187 0 : v.Add(&compressionStats)
1188 0 : return v, true
1189 : },
1190 : },
1191 : )
|