1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "math"
11 : "slices"
12 : "time"
13 :
14 : "github.com/cockroachdb/crlib/crtime"
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/invariants"
18 : "github.com/cockroachdb/pebble/internal/keyspan"
19 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
20 : "github.com/cockroachdb/pebble/internal/manifest"
21 : "github.com/cockroachdb/pebble/sstable"
22 : "github.com/cockroachdb/pebble/sstable/block"
23 : "github.com/cockroachdb/redact"
24 : )
25 :
26 : // In-memory statistics about tables help inform compaction picking, but may
27 : // be expensive to calculate or load from disk. Every time a database is
28 : // opened, these statistics must be reloaded or recalculated. To minimize
29 : // impact on user activity and compactions, we load these statistics
30 : // asynchronously in the background and store loaded statistics in each
31 : // table's *TableMetadata.
32 : //
33 : // This file implements the asynchronous loading of statistics by maintaining
34 : // a list of files that require statistics, alongside their LSM levels.
35 : // Whenever new files are added to the LSM, the files are appended to
36 : // d.mu.tableStats.pending. If a stats collection job is not currently
37 : // running, one is started in a separate goroutine.
38 : //
39 : // The stats collection job grabs and clears the pending list, computes table
40 : // statistics relative to the current readState and updates the tables' file
41 : // metadata. New pending files may accumulate during a stats collection job,
42 : // so a completing job triggers a new job if necessary. Only one job runs at a
43 : // time.
44 : //
45 : // When an existing database is opened, all files lack in-memory statistics.
46 : // These files' stats are loaded incrementally whenever the pending list is
47 : // empty by scanning a current readState for files missing statistics. Once a
48 : // job completes a scan without finding any remaining files without
49 : // statistics, it flips a `loadedInitial` flag. From then on, the stats
50 : // collection job only needs to load statistics for new files appended to the
51 : // pending list.
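//
// The job below follows a common "snapshot, unlock, do IO, relock, publish"
// shape. A minimal, hypothetical sketch of that pattern (the collector type
// and its load/publish helpers are illustrative only, not part of Pebble):
//
//	func (c *collector) run() {
//		c.mu.Lock()
//		work := c.pending // snapshot and clear the pending list
//		c.pending = nil
//		c.mu.Unlock()           // drop the mutex before doing IO
//		results := c.load(work) // expensive IO, no lock held
//		c.mu.Lock()
//		c.publish(results) // update in-memory metadata
//		c.loading = false
//		if len(c.pending) > 0 {
//			go c.run() // new work accumulated while we were loading
//		}
//		c.mu.Unlock()
//	}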
52 :
53 2 : func (d *DB) maybeCollectTableStatsLocked() {
54 2 : if d.shouldCollectTableStatsLocked() {
55 2 : go d.collectTableStats()
56 2 : }
57 : }
58 :
59 : // updateTableStatsLocked is called when new files are introduced, after the
60 : // read state has been updated. It may trigger a new stat collection.
61 : // DB.mu must be locked when calling.
62 2 : func (d *DB) updateTableStatsLocked(newTables []manifest.NewTableEntry) {
63 2 : var needStats bool
64 2 : for _, nf := range newTables {
65 2 : if !nf.Meta.StatsValid() {
66 2 : needStats = true
67 2 : break
68 : }
69 : }
70 2 : if !needStats {
71 2 : return
72 2 : }
73 :
74 2 : d.mu.tableStats.pending = append(d.mu.tableStats.pending, newTables...)
75 2 : d.maybeCollectTableStatsLocked()
76 : }
77 :
78 2 : func (d *DB) shouldCollectTableStatsLocked() bool {
79 2 : return !d.mu.tableStats.loading &&
80 2 : d.closed.Load() == nil &&
81 2 : !d.opts.DisableTableStats &&
82 2 : (len(d.mu.tableStats.pending) > 0 || !d.mu.tableStats.loadedInitial)
83 2 : }
84 :
85 : // collectTableStats runs a table stats collection job, returning true if the
86 : // invocation did the collection work, false otherwise (e.g. if another job was
87 : // already running).
88 2 : func (d *DB) collectTableStats() bool {
89 2 : const maxTableStatsPerScan = 50
90 2 :
91 2 : d.mu.Lock()
92 2 : if !d.shouldCollectTableStatsLocked() {
93 2 : d.mu.Unlock()
94 2 : return false
95 2 : }
96 2 : ctx := context.Background()
97 2 :
98 2 : pending := d.mu.tableStats.pending
99 2 : d.mu.tableStats.pending = nil
100 2 : d.mu.tableStats.loading = true
101 2 : jobID := d.newJobIDLocked()
102 2 : loadedInitial := d.mu.tableStats.loadedInitial
103 2 : // Drop DB.mu before performing IO.
104 2 : d.mu.Unlock()
105 2 :
106 2 : // Every run of collectTableStats either collects stats from the pending
107 2 : // list (if non-empty) or from scanning the version (loadedInitial is
108 2 : // false). This job only runs if at least one of those conditions holds.
109 2 :
110 2 : // Grab a read state to scan for tables.
111 2 : rs := d.loadReadState()
112 2 : var collected []collectedStats
113 2 : var hints []deleteCompactionHint
114 2 : if len(pending) > 0 {
115 2 : collected, hints = d.loadNewFileStats(ctx, rs, pending)
116 2 : } else {
117 2 : var moreRemain bool
118 2 : var buf [maxTableStatsPerScan]collectedStats
119 2 : collected, hints, moreRemain = d.scanReadStateTableStats(ctx, rs, buf[:0])
120 2 : loadedInitial = !moreRemain
121 2 : }
122 2 : rs.unref()
123 2 :
124 2 : // Update the TableMetadata with the loaded stats while holding d.mu.
125 2 : d.mu.Lock()
126 2 : defer d.mu.Unlock()
127 2 : d.mu.tableStats.loading = false
128 2 : if loadedInitial && !d.mu.tableStats.loadedInitial {
129 2 : d.mu.tableStats.loadedInitial = loadedInitial
130 2 : d.opts.EventListener.TableStatsLoaded(TableStatsInfo{
131 2 : JobID: int(jobID),
132 2 : })
133 2 : }
134 :
135 2 : maybeCompact := false
136 2 : for _, c := range collected {
137 2 : c.TableMetadata.Stats = c.TableStats
138 2 : maybeCompact = maybeCompact || tableTombstoneCompensation(c.TableMetadata) > 0
139 2 : sanityCheckStats(c.TableMetadata, d.opts.Logger, "collected stats")
140 2 : c.TableMetadata.StatsMarkValid()
141 2 : }
142 :
143 2 : d.mu.tableStats.cond.Broadcast()
144 2 : d.maybeCollectTableStatsLocked()
145 2 : if len(hints) > 0 && !d.opts.private.disableDeleteOnlyCompactions {
146 2 : // Verify that all of the hint tombstones' files still exist in the
147 2 : // current version. Otherwise, the tombstone itself may have been
148 2 : // compacted into L6 and more recent keys may have had their sequence
149 2 : // numbers zeroed.
150 2 : //
151 2 : // Note that it's possible that the tombstone file is being compacted
152 2 : // presently. In that case, the file will be present in v. When the
153 2 : // compaction finishes compacting the tombstone file, it will detect
154 2 : // and clear the hint.
155 2 : //
156 2 : // See DB.maybeUpdateDeleteCompactionHints.
157 2 : v := d.mu.versions.currentVersion()
158 2 : keepHints := hints[:0]
159 2 : for _, h := range hints {
160 2 : if v.Contains(h.tombstoneLevel, h.tombstoneFile) {
161 2 : keepHints = append(keepHints, h)
162 2 : }
163 : }
164 2 : d.mu.compact.deletionHints = append(d.mu.compact.deletionHints, keepHints...)
165 : }
166 2 : if maybeCompact {
167 2 : d.maybeScheduleCompaction()
168 2 : }
169 2 : return true
170 : }
171 :
172 : type collectedStats struct {
173 : *manifest.TableMetadata
174 : manifest.TableStats
175 : }
176 :
177 : func (d *DB) loadNewFileStats(
178 : ctx context.Context, rs *readState, pending []manifest.NewTableEntry,
179 2 : ) ([]collectedStats, []deleteCompactionHint) {
180 2 : var hints []deleteCompactionHint
181 2 : collected := make([]collectedStats, 0, len(pending))
182 2 : for _, nf := range pending {
183 2 : // A file's stats might have been populated by an earlier call to
184 2 : // loadNewFileStats if the file was moved.
185 2 : // NB: We're not holding d.mu which protects f.Stats, but only
186 2 : // collectTableStats updates f.Stats for active files, and we
187 2 : // ensure only one goroutine runs it at a time through
188 2 : // d.mu.tableStats.loading.
189 2 : if nf.Meta.StatsValid() {
190 2 : continue
191 : }
192 :
193 : // The file isn't guaranteed to still be live in the readState's
194 : // version. It may have been deleted or moved. Skip it if it's not in
195 : // the expected level.
196 2 : if !rs.current.Contains(nf.Level, nf.Meta) {
197 2 : continue
198 : }
199 :
200 2 : stats, newHints, err := d.loadTableStats(ctx, rs.current, nf.Level, nf.Meta)
201 2 : if err != nil {
202 1 : d.opts.EventListener.BackgroundError(err)
203 1 : continue
204 : }
205 : // NB: We don't update the TableMetadata yet, because we aren't holding
206 : // DB.mu. We'll copy it to the TableMetadata after we're finished with
207 : // IO.
208 2 : collected = append(collected, collectedStats{
209 2 : TableMetadata: nf.Meta,
210 2 : TableStats: stats,
211 2 : })
212 2 : hints = append(hints, newHints...)
213 : }
214 2 : return collected, hints
215 : }
216 :
217 : // scanReadStateTableStats is run by an active stat collection job when there
218 : // are no pending new files, but there might be files that existed at Open for
219 : // which we haven't loaded table stats.
220 : func (d *DB) scanReadStateTableStats(
221 : ctx context.Context, rs *readState, fill []collectedStats,
222 2 : ) ([]collectedStats, []deleteCompactionHint, bool) {
223 2 : moreRemain := false
224 2 : var hints []deleteCompactionHint
225 2 : sizesChecked := make(map[base.DiskFileNum]struct{})
226 2 : for l, levelMetadata := range rs.current.Levels {
227 2 : for f := range levelMetadata.All() {
228 2 : // NB: We're not holding d.mu which protects f.Stats, but only the
229 2 : // active stats collection job updates f.Stats for active files,
230 2 : // and we ensure only one goroutine runs it at a time through
231 2 : // d.mu.tableStats.loading. This makes it safe to read validity
232 2 : // through f.StatsValid despite not holding d.mu.
233 2 : if f.StatsValid() {
234 2 : continue
235 : }
236 :
237 : // Limit how much work we do per read state. The older the read
238 : // state is, the higher the likelihood files are no longer being
239 : // used in the current version. If we've exhausted our allowance,
240 : // return true for the last return value to signal there's more
241 : // work to do.
242 2 : if len(fill) == cap(fill) {
243 2 : moreRemain = true
244 2 : return fill, hints, moreRemain
245 2 : }
246 :
247 : // If the file is remote and not SharedForeign, we should check if its size
248 : // matches. This is because checkConsistency skips over remote files.
249 : //
250 : // SharedForeign and External files are skipped as their sizes are allowed
251 : // to have a mismatch; the size stored in the TableBacking is just the part
252 : // of the file that is referenced by this Pebble instance, not the size of
253 : // the whole object.
254 2 : objMeta, err := d.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
255 2 : if err != nil {
256 0 : // Set `moreRemain` so we'll try again.
257 0 : moreRemain = true
258 0 : d.opts.EventListener.BackgroundError(err)
259 0 : continue
260 : }
261 :
262 2 : shouldCheckSize := objMeta.IsRemote() &&
263 2 : !d.objProvider.IsSharedForeign(objMeta) &&
264 2 : !objMeta.IsExternal()
265 2 : if _, ok := sizesChecked[f.TableBacking.DiskFileNum]; !ok && shouldCheckSize {
266 2 : size, err := d.objProvider.Size(objMeta)
267 2 : fileSize := f.TableBacking.Size
268 2 : if err != nil {
269 0 : moreRemain = true
270 0 : d.opts.EventListener.BackgroundError(err)
271 0 : continue
272 : }
273 2 : if size != int64(fileSize) {
274 0 : err := errors.Errorf(
275 0 : "during consistency check in loadTableStats: L%d: %s: object size mismatch (%s): %d (provider) != %d (MANIFEST)",
276 0 : errors.Safe(l), f.TableNum, d.objProvider.Path(objMeta),
277 0 : errors.Safe(size), errors.Safe(fileSize))
278 0 : d.opts.EventListener.BackgroundError(err)
279 0 : d.opts.Logger.Fatalf("%s", err)
280 0 : }
281 :
282 2 : sizesChecked[f.TableBacking.DiskFileNum] = struct{}{}
283 : }
284 :
285 2 : stats, newHints, err := d.loadTableStats(ctx, rs.current, l, f)
286 2 : if err != nil {
287 0 : // Set `moreRemain` so we'll try again.
288 0 : moreRemain = true
289 0 : d.opts.EventListener.BackgroundError(err)
290 0 : continue
291 : }
292 2 : fill = append(fill, collectedStats{
293 2 : TableMetadata: f,
294 2 : TableStats: stats,
295 2 : })
296 2 : hints = append(hints, newHints...)
297 : }
298 : }
299 2 : return fill, hints, moreRemain
300 : }
301 :
302 : func (d *DB) loadTableStats(
303 : ctx context.Context, v *manifest.Version, level int, meta *manifest.TableMetadata,
304 2 : ) (manifest.TableStats, []deleteCompactionHint, error) {
305 2 : var stats manifest.TableStats
306 2 : var compactionHints []deleteCompactionHint
307 2 :
308 2 : err := d.fileCache.withReader(
309 2 : ctx, block.NoReadEnv, meta, func(r *sstable.Reader, env sstable.ReadEnv) (err error) {
310 2 : loadedProps, err := r.ReadPropertiesBlock(ctx, nil /* buffer pool */)
311 2 : if err != nil {
312 0 : return err
313 0 : }
314 2 : props := loadedProps.CommonProperties
315 2 : if meta.Virtual {
316 2 : props = loadedProps.GetScaledProperties(meta.TableBacking.Size, meta.Size)
317 2 : }
318 2 : stats.NumEntries = props.NumEntries
319 2 : stats.NumDeletions = props.NumDeletions
320 2 : stats.NumRangeKeySets = props.NumRangeKeySets
321 2 : stats.ValueBlocksSize = props.ValueBlocksSize
322 2 : stats.RawKeySize = props.RawKeySize
323 2 : stats.RawValueSize = props.RawValueSize
324 2 : stats.CompressionType = block.CompressionProfileByName(props.CompressionName)
325 2 : if props.NumDataBlocks > 0 {
326 2 : stats.TombstoneDenseBlocksRatio = float64(props.NumTombstoneDenseBlocks) / float64(props.NumDataBlocks)
327 2 : }
328 :
329 2 : if props.NumPointDeletions() > 0 {
330 2 : if err = d.loadTablePointKeyStats(ctx, &props, v, level, meta, &stats); err != nil {
331 0 : return
332 0 : }
333 : }
334 2 : if r.Attributes.Intersects(sstable.AttributeRangeDels | sstable.AttributeRangeKeyDels) {
335 2 : compactionHints, err = d.loadTableRangeDelStats(ctx, r, v, level, meta, &stats, env)
336 2 : if err != nil {
337 0 : return
338 0 : }
339 : }
340 2 : return
341 : })
342 2 : if err != nil {
343 1 : return stats, nil, err
344 1 : }
345 2 : return stats, compactionHints, nil
346 : }
347 :
348 : // loadTablePointKeyStats calculates the point key statistics for the given
349 : // table. The provided manifest.TableStats are updated.
350 : func (d *DB) loadTablePointKeyStats(
351 : ctx context.Context,
352 : props *sstable.CommonProperties,
353 : v *manifest.Version,
354 : level int,
355 : meta *manifest.TableMetadata,
356 : stats *manifest.TableStats,
357 2 : ) error {
358 2 : // TODO(jackson): If the file has a wide keyspace, the average
359 2 : // value size beneath the entire file might not be representative
360 2 : // of the size of the keys beneath the point tombstones.
361 2 : // We could write the ranges of 'clusters' of point tombstones to
362 2 : // an sstable property and call estimateSizesBeneath for each of
363 2 : // these narrower ranges to improve the estimate.
364 2 : avgValLogicalSize, compressionRatio, err := d.estimateSizesBeneath(ctx, v, level, meta, props)
365 2 : if err != nil {
366 0 : return err
367 0 : }
368 2 : stats.PointDeletionsBytesEstimate =
369 2 : pointDeletionsBytesEstimate(props, avgValLogicalSize, compressionRatio)
370 2 : return nil
371 : }
372 :
373 : // loadTableRangeDelStats calculates the range deletion and range key deletion
374 : // statistics for the given table.
375 : func (d *DB) loadTableRangeDelStats(
376 : ctx context.Context,
377 : r *sstable.Reader,
378 : v *manifest.Version,
379 : level int,
380 : meta *manifest.TableMetadata,
381 : stats *manifest.TableStats,
382 : env sstable.ReadEnv,
383 2 : ) ([]deleteCompactionHint, error) {
384 2 : iter, err := newCombinedDeletionKeyspanIter(ctx, d.opts.Comparer, r, meta, env)
385 2 : if err != nil {
386 0 : return nil, err
387 0 : }
388 2 : defer iter.Close()
389 2 : var compactionHints []deleteCompactionHint
390 2 : // We iterate over the defragmented range tombstones and range key deletions,
391 2 : // which ensures we don't double count ranges deleted at different sequence
392 2 : // numbers. Also, merging abutting tombstones reduces the number of calls to
393 2 : // estimateReclaimedSizeBeneath which is costly, and improves the accuracy of
394 2 : // our overall estimate.
395 2 : s, err := iter.First()
396 2 : for ; s != nil; s, err = iter.Next() {
397 2 : start, end := s.Start, s.End
398 2 : // We only need to consider deletion size estimates for tables that contain
399 2 : // RANGEDELs.
400 2 : var maxRangeDeleteSeqNum base.SeqNum
401 2 : for _, k := range s.Keys {
402 2 : if k.Kind() == base.InternalKeyKindRangeDelete && maxRangeDeleteSeqNum < k.SeqNum() {
403 2 : maxRangeDeleteSeqNum = k.SeqNum()
404 2 : break
405 : }
406 : }
407 :
408 : // If the file is in the last level of the LSM, there is no data beneath
409 : // it. The fact that there is still a range tombstone in a bottommost file
410 : // indicates one of two possibilities:
411 : // 1. an open snapshot kept the tombstone around, and the data the
412 : // tombstone deletes is contained within the file itself.
413 : // 2. the file was ingested.
414 : // In the first case, we'd like to estimate disk usage within the file
415 : // itself since compacting the file will drop that covered data. In the
416 : // second case, we expect that compacting the file will NOT drop any
417 : // data and rewriting the file is a waste of write bandwidth. We can
418 : // distinguish these cases by looking at the table metadata's sequence
419 : // numbers. A file's range deletions can only delete data within the
420 : // file at lower sequence numbers. All keys in an ingested sstable adopt
421 : // the same sequence number, preventing tombstones from deleting keys
422 : // within the same file. We check here if the largest RANGEDEL sequence
423 : // number is greater than the file's smallest sequence number. If it is,
424 : // the RANGEDEL could conceivably (although inconclusively) delete data
425 : // within the same file.
426 : //
427 : // Note that this heuristic is imperfect. If a table containing a range
428 : // deletion is ingested into L5 and subsequently compacted into L6 but
429 : // an open snapshot prevents elision of covered keys in L6, the
430 : // resulting RangeDeletionsBytesEstimate will incorrectly include all
431 : // covered keys.
432 : //
433 : // TODO(jackson): We could prevent the above error in the heuristic by
434 : // computing the file's RangeDeletionsBytesEstimate during the
435 : // compaction itself. It's unclear how common this is.
436 : //
437 : // NOTE: If the span `s` wholly contains a table containing range keys,
438 : // the returned size estimate will be slightly inflated by the range key
439 : // block. However, in practice, range keys are expected to be rare, and
440 : // the size of the range key block relative to the overall size of the
441 : // table is expected to be small.
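// For example, a table ingested directly into L6 has SmallestSeqNum equal to
// every key's sequence number, so the check below fails and no within-file
// estimate is taken; a flushed or compacted L6 table whose RANGEDEL was
// assigned a higher sequence number than older keys it covers passes the
// check and has its within-file usage estimated.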
442 2 : if level == numLevels-1 && meta.SmallestSeqNum < maxRangeDeleteSeqNum {
443 2 : size, err := r.EstimateDiskUsage(start, end, env)
444 2 : if err != nil {
445 0 : return nil, err
446 0 : }
447 2 : stats.RangeDeletionsBytesEstimate += size
448 2 :
449 2 : // As the file is in the bottommost level, there is no need to collect a
450 2 : // deletion hint.
451 2 : continue
452 : }
453 :
454 : // While the size estimates for point keys should only be updated if this
455 : // span contains a range del, the sequence numbers are required for the
456 : // hint. Unconditionally descend, but conditionally update the estimates.
457 2 : hintType := compactionHintFromKeys(s.Keys)
458 2 : estimate, hintSeqNum, err := d.estimateReclaimedSizeBeneath(ctx, v, level, start, end, hintType)
459 2 : if err != nil {
460 0 : return nil, err
461 0 : }
462 2 : stats.RangeDeletionsBytesEstimate += estimate
463 2 :
464 2 : // hintSeqNum is the smallest sequence number contained in any
465 2 : // file overlapping with the hint and in a level below it.
466 2 : if hintSeqNum == math.MaxUint64 {
467 2 : continue
468 : }
469 2 : compactionHints = append(compactionHints, deleteCompactionHint{
470 2 : hintType: hintType,
471 2 : start: slices.Clone(start),
472 2 : end: slices.Clone(end),
473 2 : tombstoneFile: meta,
474 2 : tombstoneLevel: level,
475 2 : tombstoneLargestSeqNum: s.LargestSeqNum(),
476 2 : tombstoneSmallestSeqNum: s.SmallestSeqNum(),
477 2 : fileSmallestSeqNum: hintSeqNum,
478 2 : })
479 : }
480 2 : if err != nil {
481 0 : return nil, err
482 0 : }
483 2 : return compactionHints, nil
484 : }
485 :
486 : func (d *DB) estimateSizesBeneath(
487 : ctx context.Context,
488 : v *manifest.Version,
489 : level int,
490 : meta *manifest.TableMetadata,
491 : fileProps *sstable.CommonProperties,
492 2 : ) (avgValueLogicalSize, compressionRatio float64, err error) {
493 2 : // Find all files in lower levels that overlap with meta,
494 2 : // summing their file sizes, entry counts and raw key/value sizes.
495 2 :
496 2 : // Include the file itself. This is important because in some instances, the
497 2 : // computed compression ratio is applied to the tombstones contained within
498 2 : // `meta` itself. If there are no files beneath `meta` in the LSM, we would
499 2 : // calculate a compression ratio of 0 which is not accurate for the file's
500 2 : // own tombstones.
501 2 : var (
502 2 : // TODO(sumeer): The entryCount includes the tombstones, which can be small,
503 2 : // resulting in a lower than expected avgValueLogicalSize. For an example of
504 2 : // this effect see the estimate in testdata/compaction_picker_scores (search
505 2 : // for "point-deletions-bytes-estimate: 163850").
506 2 : fileSum = meta.Size
507 2 : entryCount = fileProps.NumEntries
508 2 : keySum = fileProps.RawKeySize
509 2 : valSum = fileProps.RawValueSize
510 2 : )
511 2 :
512 2 : for l := level + 1; l < numLevels; l++ {
513 2 : for tableBeneath := range v.Overlaps(l, meta.UserKeyBounds()).All() {
514 2 : fileSum += tableBeneath.Size
515 2 : if tableBeneath.StatsValid() {
516 2 : entryCount += tableBeneath.Stats.NumEntries
517 2 : keySum += tableBeneath.Stats.RawKeySize
518 2 : valSum += tableBeneath.Stats.RawValueSize
519 2 : continue
520 : }
521 : // If stats aren't available, we need to read the properties block.
522 2 : err := d.fileCache.withReader(ctx, block.NoReadEnv, tableBeneath, func(v *sstable.Reader, _ sstable.ReadEnv) (err error) {
523 2 : loadedProps, err := v.ReadPropertiesBlock(ctx, nil /* buffer pool */)
524 2 : if err != nil {
525 0 : return err
526 0 : }
527 2 : props := loadedProps.CommonProperties
528 2 : if tableBeneath.Virtual {
529 2 : props = loadedProps.GetScaledProperties(tableBeneath.TableBacking.Size, tableBeneath.Size)
530 2 : }
531 :
532 2 : entryCount += props.NumEntries
533 2 : keySum += props.RawKeySize
534 2 : valSum += props.RawValueSize
535 2 : return nil
536 : })
537 2 : if err != nil {
538 0 : return 0, 0, err
539 0 : }
540 : }
541 : }
542 2 : if entryCount == 0 {
543 0 : return 0, 0, nil
544 0 : }
545 : // RawKeySize and RawValueSize are uncompressed totals. We'll need to scale
546 : // the value sum according to the data size to account for compression,
547 : // index blocks and metadata overhead. Eg:
548 : //
549 : // Compression rate × Average uncompressed value size
550 : //
551 : // ↓
552 : //
553 : //           FileSize           RawValueSize
554 : //  ----------------------- × ------------
555 : //  RawKeySize+RawValueSize    NumEntries
556 : //
557 : // We return the average logical value size plus the compression ratio,
558 : // leaving the scaling to the caller. This allows the caller to perform
559 : // additional compression ratio scaling if necessary.
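//
// For example (illustrative numbers only): with fileSum = 50<<20,
// keySum = 40<<20, valSum = 60<<20 and entryCount = 1<<20, the
// compression ratio is 50/(40+60) = 0.5 and the average logical value
// size is (60<<20)/(1<<20) = 60 bytes.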
560 2 : uncompressedSum := float64(keySum + valSum)
561 2 : compressionRatio = float64(fileSum) / uncompressedSum
562 2 : if compressionRatio > 1 {
563 2 : // We can get huge compression ratios due to the fixed overhead of files
564 2 : // containing a tiny amount of data. By setting this to 1, we are ignoring
565 2 : // that overhead, but we accept that tradeoff since the total bytes in
566 2 : // such overhead is not large.
567 2 : compressionRatio = 1
568 2 : }
569 2 : avgValueLogicalSize = (float64(valSum) / float64(entryCount))
570 2 : return avgValueLogicalSize, compressionRatio, nil
571 : }
572 :
573 : func (d *DB) estimateReclaimedSizeBeneath(
574 : ctx context.Context,
575 : v *manifest.Version,
576 : level int,
577 : start, end []byte,
578 : hintType deleteCompactionHintType,
579 2 : ) (estimate uint64, hintSeqNum base.SeqNum, err error) {
580 2 : // Find all files in lower levels that overlap with the deleted range
581 2 : // [start, end).
582 2 : //
583 2 : // An overlapping file might be completely contained by the range
584 2 : // tombstone, in which case we can count the entire file size in
585 2 : // our estimate without doing any additional I/O.
586 2 : //
587 2 : // Otherwise, estimating the range for the file requires
588 2 : // additional I/O to read the file's index blocks.
589 2 : hintSeqNum = math.MaxUint64
590 2 : // TODO(jbowens): When there are multiple sub-levels in L0 and the RANGEDEL
591 2 : // is from a higher sub-level, we incorrectly skip the files in the lower
592 2 : // sub-levels when estimating this overlap.
593 2 : for l := level + 1; l < numLevels; l++ {
594 2 : for file := range v.Overlaps(l, base.UserKeyBoundsEndExclusive(start, end)).All() {
595 2 : // Determine whether we need to update size estimates and hint seqnums
596 2 : // based on the type of hint and the type of keys in this file.
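// To summarize the cases below (derived from this switch):
//
//	hint type            updateEstimates          updateHints
//	point keys only      file has point keys      file has no range keys
//	range keys only      never                    file has no point keys
//	point and range key  always                   always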
597 2 : var updateEstimates, updateHints bool
598 2 : switch hintType {
599 2 : case deleteCompactionHintTypePointKeyOnly:
600 2 : // The range deletion byte estimates should only be updated if this
601 2 : // table contains point keys. This ends up being an overestimate in
602 2 : // the case that table also has range keys, but such keys are expected
603 2 : // to contribute a negligible amount of the table's overall size,
604 2 : // relative to point keys.
605 2 : if file.HasPointKeys {
606 2 : updateEstimates = true
607 2 : }
608 : // As the initiating span contained only range dels, hints can only be
609 : // updated if this table does _not_ contain range keys.
610 2 : if !file.HasRangeKeys {
611 2 : updateHints = true
612 2 : }
613 2 : case deleteCompactionHintTypeRangeKeyOnly:
614 2 : // The initiating span contained only range key dels. The estimates
615 2 : // apply only to point keys, and are therefore not updated.
616 2 : updateEstimates = false
617 2 : // As the initiating span contained only range key dels, hints can
618 2 : // only be updated if this table does _not_ contain point keys.
619 2 : if !file.HasPointKeys {
620 2 : updateHints = true
621 2 : }
622 2 : case deleteCompactionHintTypePointAndRangeKey:
623 2 : // Always update the estimates and hints, as this hint type can drop a
624 2 : // file, irrespective of the mixture of keys. Similar to above, the
625 2 : // range del bytes estimates is an overestimate.
626 2 : updateEstimates, updateHints = true, true
627 0 : default:
628 0 : panic(fmt.Sprintf("pebble: unknown hint type %s", hintType))
629 : }
630 2 : startCmp := d.cmp(start, file.Smallest().UserKey)
631 2 : endCmp := d.cmp(file.Largest().UserKey, end)
632 2 : if startCmp <= 0 && (endCmp < 0 || endCmp == 0 && file.Largest().IsExclusiveSentinel()) {
633 2 : // The range fully contains the file, so skip looking it up in table
634 2 : // cache/looking at its indexes and add the full file size.
635 2 : if updateEstimates {
636 2 : estimate += file.Size
637 2 : }
638 2 : if updateHints && hintSeqNum > file.SmallestSeqNum {
639 2 : hintSeqNum = file.SmallestSeqNum
640 2 : }
641 2 : } else if d.cmp(file.Smallest().UserKey, end) <= 0 && d.cmp(start, file.Largest().UserKey) <= 0 {
642 2 : // Partial overlap.
643 2 : if hintType == deleteCompactionHintTypeRangeKeyOnly {
644 2 : // If the hint that generated this overlap contains only range keys,
645 2 : // there is no need to calculate disk usage, as the reclaimable space
646 2 : // is expected to be minimal relative to point keys.
647 2 : continue
648 : }
649 2 : var size uint64
650 2 : err := d.fileCache.withReader(ctx, block.NoReadEnv, file,
651 2 : func(r *sstable.Reader, env sstable.ReadEnv) (err error) {
652 2 : size, err = r.EstimateDiskUsage(start, end, env)
653 2 : return err
654 2 : })
655 2 : if err != nil {
656 0 : return 0, hintSeqNum, err
657 0 : }
658 2 : estimate += size
659 2 : if updateHints && hintSeqNum > file.SmallestSeqNum && d.FormatMajorVersion() >= FormatVirtualSSTables {
660 2 : // If the format major version is past Virtual SSTables, deletion-only
661 2 : // hints can also apply to partial overlaps with sstables.
662 2 : hintSeqNum = file.SmallestSeqNum
663 2 : }
664 : }
665 : }
666 : }
667 2 : return estimate, hintSeqNum, nil
668 : }
669 :
670 : var lastSanityCheckStatsLog crtime.AtomicMono
671 :
672 2 : func sanityCheckStats(meta *manifest.TableMetadata, logger Logger, info string) {
673 2 : // Values for PointDeletionsBytesEstimate and RangeDeletionsBytesEstimate that
674 2 : // exceed this value are likely indicative of a bug (eg, underflow).
675 2 : const maxDeletionBytesEstimate = 1 << 50 // 1 PiB
676 2 :
677 2 : if meta.Stats.PointDeletionsBytesEstimate > maxDeletionBytesEstimate ||
678 2 : meta.Stats.RangeDeletionsBytesEstimate > maxDeletionBytesEstimate {
679 0 : if invariants.Enabled {
680 0 : panic(fmt.Sprintf("%s: table %s has extreme deletion bytes estimates: point=%d range=%d",
681 0 : info, meta.TableNum,
682 0 : redact.Safe(meta.Stats.PointDeletionsBytesEstimate),
683 0 : redact.Safe(meta.Stats.RangeDeletionsBytesEstimate),
684 0 : ))
685 : }
686 0 : if v := lastSanityCheckStatsLog.Load(); v == 0 || v.Elapsed() > 30*time.Second {
687 0 : logger.Errorf("%s: table %s has extreme deletion bytes estimates: point=%d range=%d",
688 0 : info, meta.TableNum,
689 0 : redact.Safe(meta.Stats.PointDeletionsBytesEstimate),
690 0 : redact.Safe(meta.Stats.RangeDeletionsBytesEstimate),
691 0 : )
692 0 : lastSanityCheckStatsLog.Store(crtime.NowMono())
693 0 : }
694 : }
695 : }
696 :
697 : func maybeSetStatsFromProperties(
698 : meta *manifest.TableMetadata, props *sstable.CommonProperties, logger Logger,
699 2 : ) bool {
700 2 : // If a table contains range deletions or range key deletions, we defer the
701 2 : // stats collection. There are two main reasons for this:
702 2 : //
703 2 : // 1. Estimating the potential for reclaimed space due to a range deletion
704 2 : // tombstone requires scanning the LSM - a potentially expensive operation
705 2 : // that should be deferred.
706 2 : // 2. Range deletions and / or range key deletions present an opportunity to
707 2 : // compute "deletion hints", which also requires a scan of the LSM to
708 2 : // compute tables that would be eligible for deletion.
709 2 : //
710 2 : // These two tasks are deferred to the table stats collector goroutine.
711 2 : if props.NumRangeDeletions != 0 || props.NumRangeKeyDels != 0 {
712 2 : return false
713 2 : }
714 :
715 : // If a table is more than 10% point deletions without user-provided size
716 : // estimates, don't calculate the PointDeletionsBytesEstimate statistic
717 : // using our limited knowledge. The table stats collector can populate the
718 : // stats and calculate an average value size across all the tables beneath
719 : // the table in the LSM, which will be more accurate.
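// For example (illustrative): a table with NumEntries = 1,000,
// NumDeletions = 150 and NumSizedDeletions = 0 has 150 unsized DELs, which
// exceeds 1,000/10 = 100, so stats collection is deferred to the table stats
// collector.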
720 2 : if unsizedDels := (props.NumDeletions - props.NumSizedDeletions); unsizedDels > props.NumEntries/10 {
721 2 : return false
722 2 : }
723 :
724 2 : var pointEstimate uint64
725 2 : if props.NumEntries > 0 {
726 2 : // Use the file's own average key and value sizes as an estimate. This
727 2 : // doesn't require any additional IO and since the number of point
728 2 : // deletions in the file is low, the error introduced by this crude
729 2 : // estimate is expected to be small.
730 2 : avgValSize, compressionRatio := estimatePhysicalSizes(meta.Size, props)
731 2 : pointEstimate = pointDeletionsBytesEstimate(props, avgValSize, compressionRatio)
732 2 : }
733 :
734 2 : meta.Stats.NumEntries = props.NumEntries
735 2 : meta.Stats.NumDeletions = props.NumDeletions
736 2 : meta.Stats.NumRangeKeySets = props.NumRangeKeySets
737 2 : meta.Stats.PointDeletionsBytesEstimate = pointEstimate
738 2 : meta.Stats.RangeDeletionsBytesEstimate = 0
739 2 : meta.Stats.ValueBlocksSize = props.ValueBlocksSize
740 2 : meta.Stats.RawKeySize = props.RawKeySize
741 2 : meta.Stats.RawValueSize = props.RawValueSize
742 2 : meta.Stats.CompressionType = block.CompressionProfileByName(props.CompressionName)
743 2 : meta.StatsMarkValid()
744 2 : sanityCheckStats(meta, logger, "stats from properties")
745 2 : return true
746 : }
747 :
748 : func pointDeletionsBytesEstimate(
749 : props *sstable.CommonProperties, avgValLogicalSize, compressionRatio float64,
750 2 : ) (estimate uint64) {
751 2 : if props.NumEntries == 0 {
752 0 : return 0
753 0 : }
754 2 : numPointDels := props.NumPointDeletions()
755 2 : if numPointDels == 0 {
756 2 : return 0
757 2 : }
758 : // Estimate the potential space to reclaim using the table's own properties.
759 : // There may or may not be keys covered by any individual point tombstone.
760 : // If not, compacting the point tombstone into L6 will at least allow us to
761 : // drop the point deletion key and will reclaim the tombstone's key bytes.
762 : // If there are covered key(s), we also get to drop key and value bytes for
763 : // each covered key.
764 : //
765 : // Some point tombstones (DELSIZEDs) carry a user-provided estimate of the
766 : // uncompressed size of entries that will be elided by fully compacting the
767 : // tombstone. For these tombstones, there's no guesswork—we use the
768 : // RawPointTombstoneValueSize property, which is the sum of all these
769 : // tombstones' encoded values.
770 : //
771 : // For un-sized point tombstones (DELs), we estimate by assuming that each
772 : // point tombstone covers one key on average, using average value sizes.
773 : // This is almost certainly an overestimate, but that's probably okay
774 : // because point tombstones can slow range iterations even when they don't
775 : // cover a key.
776 : //
777 : // TODO(jackson): This logic doesn't directly incorporate fixed per-key
778 : // overhead (8-byte trailer, plus at least 1 byte encoding the length of the
779 : // key and 1 byte encoding the length of the value). This overhead is
780 : // indirectly incorporated through the compression ratios, but that results
781 : // in the overhead being smeared per key-byte and value-byte, rather than
782 : // per-entry. This per-key fixed overhead can be nontrivial, especially for
783 : // dense swaths of point tombstones. Give some thought as to whether we
784 : // should directly include fixed per-key overhead in the calculations.
785 :
786 : // Below, we calculate the tombstone contributions and the shadowed keys'
787 : // contributions separately.
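// As a worked example (illustrative numbers only): with 1,000 DELs (none of
// them DELSIZED), RawPointTombstoneKeySize = 20,000,
// RawPointTombstoneValueSize = 0, avgValLogicalSize = 100 and
// compressionRatio = 0.5, the tombstone keys contribute 20,000 logical bytes,
// the shadowed keys contribute 20,000 + 1,000*100 = 120,000 logical bytes,
// and the returned estimate is uint64((20,000+120,000) * 0.5) = 70,000 bytes.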
788 2 : var tombstonesLogicalSize float64
789 2 : var shadowedLogicalSize float64
790 2 :
791 2 : // 1. Calculate the contribution of the tombstone keys themselves.
792 2 : if props.RawPointTombstoneKeySize > 0 {
793 2 : tombstonesLogicalSize += float64(props.RawPointTombstoneKeySize)
794 2 : } else {
795 0 : // This sstable predates the existence of the RawPointTombstoneKeySize
796 0 : // property. We can use the average key size within the file itself and
797 0 : // the count of point deletions to estimate the size.
798 0 : tombstonesLogicalSize += float64(numPointDels * props.RawKeySize / props.NumEntries)
799 0 : }
800 :
801 : // 2. Calculate the contribution of the keys shadowed by tombstones.
802 : //
803 : // 2a. First account for keys shadowed by DELSIZED tombstones. The DELSIZED
804 : // tombstones encode the size of both the key and value of the shadowed KV
805 : // entries. These sizes are aggregated into a sstable property.
806 2 : shadowedLogicalSize += float64(props.RawPointTombstoneValueSize)
807 2 :
808 2 : // 2b. Calculate the contribution of the KV entries shadowed by ordinary DEL
809 2 : // keys.
810 2 : numUnsizedDels := invariants.SafeSub(numPointDels, props.NumSizedDeletions)
811 2 : {
812 2 : // The shadowed keys have the same exact user keys as the tombstones
813 2 : // themselves, so we can use the `tombstonesLogicalSize` we computed
814 2 : // earlier as an estimate. `tombstonesLogicalSize` may include DELSIZED
815 2 : // keys we already accounted for above, so we scale its per-tombstone
816 2 : // average key size by the count of unsized deletions only.
817 2 : shadowedLogicalSize += float64(tombstonesLogicalSize) / float64(numPointDels) * float64(numUnsizedDels)
818 2 :
819 2 : // Calculate the contribution of the deleted values. The caller has
820 2 : // already computed an average logical size (possibly computed across
821 2 : // many sstables).
822 2 : shadowedLogicalSize += float64(numUnsizedDels) * avgValLogicalSize
823 2 : }
824 :
825 : // Scale both tombstone and shadowed totals by logical:physical ratios to
826 : // account for compression, metadata overhead, etc.
827 : //
828 : // Physical FileSize
829 : // ----------- = -----------------------
830 : // Logical RawKeySize+RawValueSize
831 : //
832 2 : return uint64((tombstonesLogicalSize + shadowedLogicalSize) * compressionRatio)
833 : }
834 :
835 : func estimatePhysicalSizes(
836 : fileSize uint64, props *sstable.CommonProperties,
837 2 : ) (avgValLogicalSize, compressionRatio float64) {
838 2 : // RawKeySize and RawValueSize are uncompressed totals. Scale according to
839 2 : // the data size to account for compression, index blocks and metadata
840 2 : // overhead. Eg:
841 2 : //
842 2 : // Compression rate × Average uncompressed value size
843 2 : //
844 2 : // ↓
845 2 : //
846 2 : //           FileSize           RawValSize
847 2 : //  ----------------------- × ----------
848 2 : //  RawKeySize+RawValueSize   NumEntries
849 2 : //
850 2 : uncompressedSum := props.RawKeySize + props.RawValueSize
851 2 : compressionRatio = float64(fileSize) / float64(uncompressedSum)
852 2 : if compressionRatio > 1 {
853 2 : // We can get huge compression ratios due to the fixed overhead of files
854 2 : // containing a tiny amount of data. By setting this to 1, we are ignoring
855 2 : // that overhead, but we accept that tradeoff since the total bytes in
856 2 : // such overhead is not large.
857 2 : compressionRatio = 1
858 2 : }
859 2 : avgValLogicalSize = (float64(props.RawValueSize) / float64(props.NumEntries))
860 2 : return avgValLogicalSize, compressionRatio
861 : }
862 :
863 : // newCombinedDeletionKeyspanIter returns a keyspan.FragmentIterator that
864 : // returns "ranged deletion" spans for a single table, providing a combined view
865 : // of both range deletion and range key deletion spans. The
866 : // tableRangedDeletionIter is intended for use in the specific case of computing
867 : // the statistics and deleteCompactionHints for a single table.
868 : //
869 : // As an example, consider the following set of spans from the range deletion
870 : // and range key blocks of a table:
871 : //
872 : //        |---------|     |---------|       |-------|   RANGEKEYDELs
873 : //  |-----------|-------------|         |-----|         RANGEDELs
874 : // __________________________________________________________
875 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
876 : //
877 : // The tableRangedDeletionIter produces the following set of output spans, where
878 : // '1' indicates a span containing only range deletions, '2' is a span
879 : // containing only range key deletions, and '3' is a span containing a mixture
880 : // of both range deletions and range key deletions.
881 : //
882 : //     1       3       1    3    2        1  3   2
883 : //  |-----|---------|-----|---|-----|   |---|-|-----|
884 : // __________________________________________________________
885 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
886 : //
887 : // Algorithm.
888 : //
889 : // The iterator first defragments the range deletion and range key blocks
890 : // separately. During this defragmentation, the range key block is also filtered
891 : // so that keys other than range key deletes are ignored. The range delete and
892 : // range key delete keyspaces are then merged.
893 : //
894 : // Note that the only fragmentation introduced by merging is from where a range
895 : // del span overlaps with a range key del span. Within the bounds of any overlap
896 : // there is guaranteed to be no further fragmentation, as the constituent spans
897 : // have already been defragmented. To the left and right of any overlap, the
898 : // same reasoning applies. For example,
899 : //
900 : //           |--------|         |-------|   RANGEKEYDEL
901 : //  |---------------------------|           RANGEDEL
902 : //  |----1---|----3---|----1----|---2---|   Merged, fragmented spans.
903 : // __________________________________________________________
904 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
905 : //
906 : // Any fragmented abutting spans produced by the merging iter will be of
907 : // differing types (i.e. a transition from a span with homogeneous key kinds to a
908 : // heterogeneous span, or a transition from a span with exclusively range dels
909 : // to a span with exclusively range key dels). Therefore, further
910 : // defragmentation is not required.
911 : //
912 : // Each span returned by the tableRangedDeletionIter will have at most four keys,
913 : // corresponding to the largest and smallest sequence numbers encountered across
914 : // the range deletes and range keys deletes that comprised the merged spans.
915 : func newCombinedDeletionKeyspanIter(
916 : ctx context.Context,
917 : comparer *base.Comparer,
918 : r *sstable.Reader,
919 : m *manifest.TableMetadata,
920 : env sstable.ReadEnv,
921 2 : ) (keyspan.FragmentIterator, error) {
922 2 : // The range del iter and range key iter are each wrapped in their own
923 2 : // defragmenting iter. For each iter, abutting spans can always be merged.
924 2 : var equal = keyspan.DefragmentMethodFunc(func(_ base.CompareRangeSuffixes, a, b *keyspan.Span) bool { return true })
925 : // Reduce keys by maintaining a slice of at most length two, corresponding to
926 : // the largest and smallest keys in the defragmented span. This maintains the
927 : // contract that the emitted slice is sorted by (SeqNum, Kind) descending.
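// For example (illustrative): reducing current = {seqnum 7, seqnum 3} with
// incoming = {seqnum 9, seqnum 5}, both ordered by trailer descending, yields
// {seqnum 9, seqnum 3}: the overall largest and smallest keys across both
// spans.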
928 2 : reducer := func(current, incoming []keyspan.Key) []keyspan.Key {
929 2 : if len(current) == 0 && len(incoming) == 0 {
930 0 : // While this should never occur in practice, a defensive return is used
931 0 : // here to preserve correctness.
932 0 : return current
933 0 : }
934 2 : var largest, smallest keyspan.Key
935 2 : var set bool
936 2 : for _, keys := range [2][]keyspan.Key{current, incoming} {
937 2 : if len(keys) == 0 {
938 0 : continue
939 : }
940 2 : first, last := keys[0], keys[len(keys)-1]
941 2 : if !set {
942 2 : largest, smallest = first, last
943 2 : set = true
944 2 : continue
945 : }
946 2 : if first.Trailer > largest.Trailer {
947 2 : largest = first
948 2 : }
949 2 : if last.Trailer < smallest.Trailer {
950 2 : smallest = last
951 2 : }
952 : }
953 2 : if largest.Equal(comparer.CompareRangeSuffixes, smallest) {
954 2 : current = append(current[:0], largest)
955 2 : } else {
956 2 : current = append(current[:0], largest, smallest)
957 2 : }
958 2 : return current
959 : }
960 :
961 : // The separate iters for the range dels and range keys are wrapped in a
962 : // merging iter to join the keyspaces into a single keyspace. The separate
963 : // iters are only added if the particular key kind is present.
964 2 : mIter := &keyspanimpl.MergingIter{}
965 2 : var transform = keyspan.TransformerFunc(func(_ base.CompareRangeSuffixes, in keyspan.Span, out *keyspan.Span) error {
966 2 : if in.KeysOrder != keyspan.ByTrailerDesc {
967 0 : return base.AssertionFailedf("combined deletion iter encountered keys in non-trailer descending order")
968 0 : }
969 2 : out.Start, out.End = in.Start, in.End
970 2 : out.Keys = append(out.Keys[:0], in.Keys...)
971 2 : out.KeysOrder = keyspan.ByTrailerDesc
972 2 : // NB: The order of by-trailer descending may have been violated,
973 2 : // because we've layered rangekey and rangedel iterators from the same
974 2 : // sstable into the same keyspanimpl.MergingIter. The MergingIter will
975 2 : // return the keys in the order that the child iterators were provided.
976 2 : // Sort the keys to ensure they're sorted by trailer descending.
977 2 : keyspan.SortKeysByTrailer(out.Keys)
978 2 : return nil
979 : })
980 2 : mIter.Init(comparer, transform, new(keyspanimpl.MergingBuffers))
981 2 : iter, err := r.NewRawRangeDelIter(ctx, m.FragmentIterTransforms(), env)
982 2 : if err != nil {
983 0 : return nil, err
984 0 : }
985 2 : if iter != nil {
986 2 : // Assert expected bounds. In previous versions of Pebble, range
987 2 : // deletions persisted to sstables could exceed the bounds of the
988 2 : // containing files due to "split user keys." This required readers to
989 2 : // constrain the tombstones' bounds to the containing file at read time.
990 2 : // See docs/range_deletions.md for an extended discussion of the design
991 2 : // and invariants at that time.
992 2 : //
993 2 : // We've since compacted away all 'split user-keys' and in the process
994 2 : // eliminated all "untruncated range tombstones" for physical sstables.
995 2 : // We no longer need to perform truncation at read time for these
996 2 : // sstables.
997 2 : //
998 2 : // At the same time, we've also introduced the concept of "virtual
999 2 : // SSTables" where the table metadata's effective bounds can again be
1000 2 : // reduced to be narrower than the contained tombstones. These virtual
1001 2 : // SSTables handle truncation differently, performing it using
1002 2 : // keyspan.Truncate when the sstable's range deletion iterator is
1003 2 : // opened.
1004 2 : //
1005 2 : // Together, these mean that we should never see untruncated range
1006 2 : // tombstones any more—and the merging iterator no longer accounts for
1007 2 : // their existence. Since there's abundant subtlety that we're relying
1008 2 : // on, we choose to be conservative and assert that these invariants
1009 2 : // hold. We could (and previously did) choose to only validate these
1010 2 : // bounds in invariants builds, but the most likely avenue for these
1011 2 : // tombstones' existence is through a bug in a migration and old data
1012 2 : // sitting around in an old store from long ago.
1013 2 : //
1014 2 : // The table stats collector will read all files' range deletions
1015 2 : // asynchronously after Open, and provides a perfect opportunity to
1016 2 : // validate our invariants without harming user latency. We also
1017 2 : // previously performed truncation here which similarly required key
1018 2 : // comparisons, so replacing those key comparisons with assertions
1019 2 : // should be roughly similar in performance.
1020 2 : //
1021 2 : // TODO(jackson): Only use AssertBounds in invariants builds in the
1022 2 : // following release.
1023 2 : iter = keyspan.AssertBounds(
1024 2 : iter, m.PointKeyBounds.Smallest(), m.PointKeyBounds.LargestUserKey(), comparer.Compare,
1025 2 : )
1026 2 : dIter := &keyspan.DefragmentingIter{}
1027 2 : dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
1028 2 : iter = dIter
1029 2 : mIter.AddLevel(iter)
1030 2 : }
1031 :
1032 2 : iter, err = r.NewRawRangeKeyIter(ctx, m.FragmentIterTransforms(), env)
1033 2 : if err != nil {
1034 0 : return nil, err
1035 0 : }
1036 2 : if iter != nil {
1037 2 : // Assert expected bounds in tests.
1038 2 : if invariants.Sometimes(50) {
1039 2 : if m.HasRangeKeys {
1040 2 : iter = keyspan.AssertBounds(
1041 2 : iter, m.RangeKeyBounds.Smallest(), m.RangeKeyBounds.LargestUserKey(), comparer.Compare,
1042 2 : )
1043 2 : }
1044 : }
1045 : // Wrap the range key iterator in a filter that elides keys other than range
1046 : // key deletions.
1047 2 : iter = keyspan.Filter(iter, func(in *keyspan.Span, buf []keyspan.Key) []keyspan.Key {
1048 2 : keys := buf[:0]
1049 2 : for _, k := range in.Keys {
1050 2 : if k.Kind() != base.InternalKeyKindRangeKeyDelete {
1051 2 : continue
1052 : }
1053 2 : keys = append(keys, k)
1054 : }
1055 2 : return keys
1056 : }, comparer.Compare)
1057 2 : dIter := &keyspan.DefragmentingIter{}
1058 2 : dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
1059 2 : iter = dIter
1060 2 : mIter.AddLevel(iter)
1061 : }
1062 :
1063 2 : return mIter, nil
1064 : }
1065 :
1066 : // rangeKeySetsAnnotator is a manifest.Annotator that annotates B-Tree nodes
1067 : // with the sum of the files' counts of range key sets. The count of range
1068 : // key sets may change once a table's stats are loaded asynchronously, so its
1069 : // values are marked as cacheable only if a file's stats have been loaded.
1070 2 : var rangeKeySetsAnnotator = manifest.SumAnnotator(func(f *manifest.TableMetadata) (uint64, bool) {
1071 2 : return f.Stats.NumRangeKeySets, f.StatsValid()
1072 2 : })
1073 :
1074 : // tombstonesAnnotator is a manifest.Annotator that annotates B-Tree nodes
1075 : // with the sum of the files' counts of tombstones (DEL, SINGLEDEL and RANGEDEL
1076 : // keys). The count of tombstones may change once a table's stats are loaded
1077 : // asynchronously, so its values are marked as cacheable only if a file's stats
1078 : // have been loaded.
1079 2 : var tombstonesAnnotator = manifest.SumAnnotator(func(f *manifest.TableMetadata) (uint64, bool) {
1080 2 : return f.Stats.NumDeletions, f.StatsValid()
1081 2 : })
1082 :
1083 : // valueBlockSizeAnnotator is a manifest.Annotator that annotates B-Tree
1084 : // nodes with the sum of the files' Properties.ValueBlocksSize. The value block
1085 : // size may change once a table's stats are loaded asynchronously, so its
1086 : // values are marked as cacheable only if a file's stats have been loaded.
1087 2 : var valueBlockSizeAnnotator = manifest.SumAnnotator(func(f *manifest.TableMetadata) (uint64, bool) {
1088 2 : return f.Stats.ValueBlocksSize, f.StatsValid()
1089 2 : })
1090 :
1091 : // pointDeletionsBytesEstimateAnnotator is a manifest.Annotator that annotates
1092 : // B-Tree nodes with the sum of the files' PointDeletionsBytesEstimate. This
1093 : // value may change once a table's stats are loaded asynchronously, so its
1094 : // values are marked as cacheable only if a file's stats have been loaded.
1095 2 : var pointDeletionsBytesEstimateAnnotator = manifest.SumAnnotator(func(f *manifest.TableMetadata) (uint64, bool) {
1096 2 : return f.Stats.PointDeletionsBytesEstimate, f.StatsValid()
1097 2 : })
1098 :
1099 : // rangeDeletionsBytesEstimateAnnotator is a manifest.Annotator that annotates
1100 : // B-Tree nodes with the sum of the files' RangeDeletionsBytesEstimate. This
1101 : // value may change once a table's stats are loaded asynchronously, so its
1102 : // values are marked as cacheable only if a file's stats have been loaded.
1103 2 : var rangeDeletionsBytesEstimateAnnotator = manifest.SumAnnotator(func(f *manifest.TableMetadata) (uint64, bool) {
1104 2 : return f.Stats.RangeDeletionsBytesEstimate, f.StatsValid()
1105 2 : })
1106 :
1107 : // compressionTypeAnnotator is a manifest.Annotator that annotates B-tree
1108 : // nodes with the compression type of the file. Its annotation type is
1109 : // compressionTypes. The compression type may change once a table's stats are
1110 : // loaded asynchronously, so its values are marked as cacheable only if a file's
1111 : // stats have been loaded.
1112 : var compressionTypeAnnotator = manifest.Annotator[compressionTypes]{
1113 : Aggregator: compressionTypeAggregator{},
1114 : }
1115 :
1116 : type compressionTypeAggregator struct{}
1117 :
1118 : type compressionTypes struct {
1119 : snappy, zstd, minlz, none, unknown uint64
1120 : }
1121 :
1122 2 : func (a compressionTypeAggregator) Zero(dst *compressionTypes) *compressionTypes {
1123 2 : if dst == nil {
1124 2 : return new(compressionTypes)
1125 2 : }
1126 0 : *dst = compressionTypes{}
1127 0 : return dst
1128 : }
1129 :
1130 : func (a compressionTypeAggregator) Accumulate(
1131 : f *manifest.TableMetadata, dst *compressionTypes,
1132 2 : ) (v *compressionTypes, cacheOK bool) {
1133 2 : switch f.Stats.CompressionType {
1134 2 : case sstable.SnappyCompression:
1135 2 : dst.snappy++
1136 1 : case sstable.ZstdCompression:
1137 1 : dst.zstd++
1138 2 : case sstable.MinLZCompression:
1139 2 : dst.minlz++
1140 2 : case sstable.NoCompression:
1141 2 : dst.none++
1142 2 : default:
1143 2 : dst.unknown++
1144 : }
1145 2 : return dst, f.StatsValid()
1146 : }
1147 :
1148 : func (a compressionTypeAggregator) Merge(
1149 : src *compressionTypes, dst *compressionTypes,
1150 2 : ) *compressionTypes {
1151 2 : dst.snappy += src.snappy
1152 2 : dst.zstd += src.zstd
1153 2 : dst.minlz += src.minlz
1154 2 : dst.none += src.none
1155 2 : dst.unknown += src.unknown
1156 2 : return dst
1157 2 : }