Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "math"
11 : "time"
12 :
13 : "github.com/cockroachdb/crlib/crtime"
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/internal/keyspan"
18 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
19 : "github.com/cockroachdb/pebble/internal/manifest"
20 : "github.com/cockroachdb/pebble/sstable"
21 : "github.com/cockroachdb/pebble/sstable/block"
22 : "github.com/cockroachdb/redact"
23 : )
24 :
25 : // In-memory statistics about tables help inform compaction picking, but may
26 : // be expensive to calculate or load from disk. Every time a database is
27 : // opened, these statistics must be reloaded or recalculated. To minimize
28 : // impact on user activity and compactions, we load these statistics
29 : // asynchronously in the background and store loaded statistics in each
30 : // table's *TableMetadata.
31 : //
32 : // This file implements the asynchronous loading of statistics by maintaining
33 : // a list of files that require statistics, alongside their LSM levels.
34 : // Whenever new files are added to the LSM, the files are appended to
35 : // d.mu.tableStats.pending. If a stats collection job is not currently
36 : // running, one is started in a separate goroutine.
37 : //
38 : // The stats collection job grabs and clears the pending list, computes table
39 : // statistics relative to the current readState and updates the tables' file
40 : // metadata. New pending files may accumulate during a stats collection job,
41 : // so a completing job triggers a new job if necessary. Only one job runs at a
42 : // time.
43 : //
44 : // When an existing database is opened, all files lack in-memory statistics.
45 : // These files' stats are loaded incrementally whenever the pending list is
46 : // empty by scanning a current readState for files missing statistics. Once a
47 : // job completes a scan having found no files that still lack statistics, it
48 : // flips a `loadedInitial` flag. From then on, the stats
49 : // collection job only needs to load statistics for new files appended to the
50 : // pending list.
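//
// As a rough illustration of the hand-off pattern described above, here is a
// minimal sketch. It is not the actual implementation (that follows below in
// collectTableStats); the names `loading`, `pending`, `compute` and `apply`
// are hypothetical and exist only for this sketch, and error handling and the
// initial readState scan are omitted.
//
//	// caller holds d.mu
//	func (d *DB) maybeStartStatsJob() {
//		if d.loading || len(d.pending) == 0 {
//			return
//		}
//		d.loading = true
//		go func() {
//			for {
//				d.mu.Lock()
//				work := d.pending
//				d.pending = nil
//				d.mu.Unlock()
//				results := compute(work) // perform IO without holding d.mu
//				d.mu.Lock()
//				apply(results)
//				if len(d.pending) == 0 {
//					d.loading = false
//					d.mu.Unlock()
//					return
//				}
//				d.mu.Unlock() // more work arrived while computing; loop again
//			}
//		}()
//	}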
51 :
52 1 : func (d *DB) maybeCollectTableStatsLocked() {
53 1 : if d.shouldCollectTableStatsLocked() {
54 1 : go d.collectTableStats()
55 1 : }
56 : }
57 :
58 : // updateTableStatsLocked is called when new files are introduced, after the
59 : // read state has been updated. It may trigger a new stat collection.
60 : // DB.mu must be locked when calling.
61 1 : func (d *DB) updateTableStatsLocked(newTables []manifest.NewTableEntry) {
62 1 : var needStats bool
63 1 : for _, nf := range newTables {
64 1 : if !nf.Meta.StatsValid() {
65 1 : needStats = true
66 1 : break
67 : }
68 : }
69 1 : if !needStats {
70 1 : return
71 1 : }
72 :
73 1 : d.mu.tableStats.pending = append(d.mu.tableStats.pending, newTables...)
74 1 : d.maybeCollectTableStatsLocked()
75 : }
76 :
77 1 : func (d *DB) shouldCollectTableStatsLocked() bool {
78 1 : return !d.mu.tableStats.loading &&
79 1 : d.closed.Load() == nil &&
80 1 : !d.opts.DisableTableStats &&
81 1 : (len(d.mu.tableStats.pending) > 0 || !d.mu.tableStats.loadedInitial)
82 1 : }
83 :
84 : // collectTableStats runs a table stats collection job, returning true if the
85 : // invocation did the collection work, false otherwise (e.g. if another job was
86 : // already running).
87 1 : func (d *DB) collectTableStats() bool {
88 1 : const maxTableStatsPerScan = 50
89 1 :
90 1 : d.mu.Lock()
91 1 : if !d.shouldCollectTableStatsLocked() {
92 1 : d.mu.Unlock()
93 1 : return false
94 1 : }
95 :
96 1 : pending := d.mu.tableStats.pending
97 1 : d.mu.tableStats.pending = nil
98 1 : d.mu.tableStats.loading = true
99 1 : jobID := d.newJobIDLocked()
100 1 : loadedInitial := d.mu.tableStats.loadedInitial
101 1 : // Drop DB.mu before performing IO.
102 1 : d.mu.Unlock()
103 1 :
104 1 : // Every run of collectTableStats either collects stats from the pending
105 1 : // list (if non-empty) or from scanning the version (loadedInitial is
106 1 : // false). This job only runs if at least one of those conditions holds.
107 1 :
108 1 : // Grab a read state to scan for tables.
109 1 : rs := d.loadReadState()
110 1 : var collected []collectedStats
111 1 : var hints []deleteCompactionHint
112 1 : if len(pending) > 0 {
113 1 : collected, hints = d.loadNewFileStats(rs, pending)
114 1 : } else {
115 1 : var moreRemain bool
116 1 : var buf [maxTableStatsPerScan]collectedStats
117 1 : collected, hints, moreRemain = d.scanReadStateTableStats(rs, buf[:0])
118 1 : loadedInitial = !moreRemain
119 1 : }
120 1 : rs.unref()
121 1 :
122 1 : // Update the TableMetadata with the loaded stats while holding d.mu.
123 1 : d.mu.Lock()
124 1 : defer d.mu.Unlock()
125 1 : d.mu.tableStats.loading = false
126 1 : if loadedInitial && !d.mu.tableStats.loadedInitial {
127 1 : d.mu.tableStats.loadedInitial = loadedInitial
128 1 : d.opts.EventListener.TableStatsLoaded(TableStatsInfo{
129 1 : JobID: int(jobID),
130 1 : })
131 1 : }
132 :
133 1 : maybeCompact := false
134 1 : for _, c := range collected {
135 1 : c.tableMetadata.Stats = c.TableStats
136 1 : maybeCompact = maybeCompact || fileCompensation(c.tableMetadata) > 0
137 1 : sanityCheckStats(c.tableMetadata, d.opts.Logger, "collected stats")
138 1 : c.tableMetadata.StatsMarkValid()
139 1 : }
140 :
141 1 : d.mu.tableStats.cond.Broadcast()
142 1 : d.maybeCollectTableStatsLocked()
143 1 : if len(hints) > 0 && !d.opts.private.disableDeleteOnlyCompactions {
144 1 : // Verify that all of the hint tombstones' files still exist in the
145 1 : // current version. Otherwise, the tombstone itself may have been
146 1 : // compacted into L6 and more recent keys may have had their sequence
147 1 : // numbers zeroed.
148 1 : //
149 1 : // Note that it's possible that the tombstone file is being compacted
150 1 : // presently. In that case, the file will be present in v. When the
151 1 : // compaction finishes compacting the tombstone file, it will detect
152 1 : // and clear the hint.
153 1 : //
154 1 : // See DB.maybeUpdateDeleteCompactionHints.
155 1 : v := d.mu.versions.currentVersion()
156 1 : keepHints := hints[:0]
157 1 : for _, h := range hints {
158 1 : if v.Contains(h.tombstoneLevel, h.tombstoneFile) {
159 1 : keepHints = append(keepHints, h)
160 1 : }
161 : }
162 1 : d.mu.compact.deletionHints = append(d.mu.compact.deletionHints, keepHints...)
163 : }
164 1 : if maybeCompact {
165 1 : d.maybeScheduleCompaction()
166 1 : }
167 1 : return true
168 : }
169 :
170 : type collectedStats struct {
171 : *tableMetadata
172 : manifest.TableStats
173 : }
174 :
175 : func (d *DB) loadNewFileStats(
176 : rs *readState, pending []manifest.NewTableEntry,
177 1 : ) ([]collectedStats, []deleteCompactionHint) {
178 1 : var hints []deleteCompactionHint
179 1 : collected := make([]collectedStats, 0, len(pending))
180 1 : for _, nf := range pending {
181 1 : // A file's stats might have been populated by an earlier call to
182 1 : // loadNewFileStats if the file was moved.
183 1 : // NB: We're not holding d.mu which protects f.Stats, but only
184 1 : // collectTableStats updates f.Stats for active files, and we
185 1 : // ensure only one goroutine runs it at a time through
186 1 : // d.mu.tableStats.loading.
187 1 : if nf.Meta.StatsValid() {
188 1 : continue
189 : }
190 :
191 : // The file isn't guaranteed to still be live in the readState's
192 : // version. It may have been deleted or moved. Skip it if it's not in
193 : // the expected level.
194 1 : if !rs.current.Contains(nf.Level, nf.Meta) {
195 1 : continue
196 : }
197 :
198 1 : stats, newHints, err := d.loadTableStats(
199 1 : rs.current, nf.Level,
200 1 : nf.Meta,
201 1 : )
202 1 : if err != nil {
203 0 : d.opts.EventListener.BackgroundError(err)
204 0 : continue
205 : }
206 : // NB: We don't update the TableMetadata yet, because we aren't holding
207 : // DB.mu. We'll copy it to the TableMetadata after we're finished with
208 : // IO.
209 1 : collected = append(collected, collectedStats{
210 1 : tableMetadata: nf.Meta,
211 1 : TableStats: stats,
212 1 : })
213 1 : hints = append(hints, newHints...)
214 : }
215 1 : return collected, hints
216 : }
217 :
218 : // scanReadStateTableStats is run by an active stat collection job when there
219 : // are no pending new files, but there might be files that existed at Open for
220 : // which we haven't loaded table stats.
221 : func (d *DB) scanReadStateTableStats(
222 : rs *readState, fill []collectedStats,
223 1 : ) ([]collectedStats, []deleteCompactionHint, bool) {
224 1 : moreRemain := false
225 1 : var hints []deleteCompactionHint
226 1 : sizesChecked := make(map[base.DiskFileNum]struct{})
227 1 : for l, levelMetadata := range rs.current.Levels {
228 1 : for f := range levelMetadata.All() {
229 1 : // NB: We're not holding d.mu which protects f.Stats, but only the
230 1 : // active stats collection job updates f.Stats for active files,
231 1 : // and we ensure only one goroutine runs it at a time through
232 1 : // d.mu.tableStats.loading. This makes it safe to read stats validity
233 1 : // via f.StatsValid despite not holding d.mu.
234 1 : if f.StatsValid() {
235 1 : continue
236 : }
237 :
238 : // Limit how much work we do per read state. The older the read
239 : // state is, the higher the likelihood files are no longer being
240 : // used in the current version. If we've exhausted our allowance,
241 : // return true for the last return value to signal there's more
242 : // work to do.
243 1 : if len(fill) == cap(fill) {
244 1 : moreRemain = true
245 1 : return fill, hints, moreRemain
246 1 : }
247 :
248 : // If the file is remote and not SharedForeign, we should check if its size
249 : // matches. This is because checkConsistency skips over remote files.
250 : //
251 : // SharedForeign and External files are skipped as their sizes are allowed
252 : // to have a mismatch; the size stored in the FileBacking is just the part
253 : // of the file that is referenced by this Pebble instance, not the size of
254 : // the whole object.
255 1 : objMeta, err := d.objProvider.Lookup(base.FileTypeTable, f.FileBacking.DiskFileNum)
256 1 : if err != nil {
257 0 : // Set `moreRemain` so we'll try again.
258 0 : moreRemain = true
259 0 : d.opts.EventListener.BackgroundError(err)
260 0 : continue
261 : }
262 :
263 1 : shouldCheckSize := objMeta.IsRemote() &&
264 1 : !d.objProvider.IsSharedForeign(objMeta) &&
265 1 : !objMeta.IsExternal()
266 1 : if _, ok := sizesChecked[f.FileBacking.DiskFileNum]; !ok && shouldCheckSize {
267 0 : size, err := d.objProvider.Size(objMeta)
268 0 : fileSize := f.FileBacking.Size
269 0 : if err != nil {
270 0 : moreRemain = true
271 0 : d.opts.EventListener.BackgroundError(err)
272 0 : continue
273 : }
274 0 : if size != int64(fileSize) {
275 0 : err := errors.Errorf(
276 0 : "during consistency check in loadTableStats: L%d: %s: object size mismatch (%s): %d (provider) != %d (MANIFEST)",
277 0 : errors.Safe(l), f.FileNum, d.objProvider.Path(objMeta),
278 0 : errors.Safe(size), errors.Safe(fileSize))
279 0 : d.opts.EventListener.BackgroundError(err)
280 0 : d.opts.Logger.Fatalf("%s", err)
281 0 : }
282 :
283 0 : sizesChecked[f.FileBacking.DiskFileNum] = struct{}{}
284 : }
285 :
286 1 : stats, newHints, err := d.loadTableStats(
287 1 : rs.current, l, f,
288 1 : )
289 1 : if err != nil {
290 0 : // Set `moreRemain` so we'll try again.
291 0 : moreRemain = true
292 0 : d.opts.EventListener.BackgroundError(err)
293 0 : continue
294 : }
295 1 : fill = append(fill, collectedStats{
296 1 : tableMetadata: f,
297 1 : TableStats: stats,
298 1 : })
299 1 : hints = append(hints, newHints...)
300 : }
301 : }
302 1 : return fill, hints, moreRemain
303 : }
304 :
305 : func (d *DB) loadTableStats(
306 : v *version, level int, meta *tableMetadata,
307 1 : ) (manifest.TableStats, []deleteCompactionHint, error) {
308 1 : var stats manifest.TableStats
309 1 : var compactionHints []deleteCompactionHint
310 1 :
311 1 : err := d.fileCache.withReader(
312 1 : context.TODO(), block.NoReadEnv, meta, func(r *sstable.Reader, env sstable.ReadEnv) (err error) {
313 1 : props := r.Properties.CommonProperties
314 1 : if meta.Virtual {
315 1 : props = r.Properties.GetScaledProperties(meta.FileBacking.Size, meta.Size)
316 1 : }
317 1 : stats.NumEntries = props.NumEntries
318 1 : stats.NumDeletions = props.NumDeletions
319 1 : stats.NumRangeKeySets = props.NumRangeKeySets
320 1 : stats.ValueBlocksSize = props.ValueBlocksSize
321 1 : stats.CompressionType = block.CompressionFromString(props.CompressionName)
322 1 : if props.NumDataBlocks > 0 {
323 1 : stats.TombstoneDenseBlocksRatio = float64(props.NumTombstoneDenseBlocks) / float64(props.NumDataBlocks)
324 1 : }
325 :
326 1 : if props.NumPointDeletions() > 0 {
327 1 : if err = d.loadTablePointKeyStats(&props, v, level, meta, &stats); err != nil {
328 0 : return
329 0 : }
330 : }
331 1 : if props.NumRangeDeletions > 0 || props.NumRangeKeyDels > 0 {
332 1 : if compactionHints, err = d.loadTableRangeDelStats(
333 1 : r, v, level, meta, &stats, env,
334 1 : ); err != nil {
335 0 : return
336 0 : }
337 : }
338 1 : return
339 : })
340 1 : if err != nil {
341 0 : return stats, nil, err
342 0 : }
343 1 : return stats, compactionHints, nil
344 : }
345 :
346 : // loadTablePointKeyStats calculates the point key statistics for the given
347 : // table. The provided manifest.TableStats are updated.
348 : func (d *DB) loadTablePointKeyStats(
349 : props *sstable.CommonProperties,
350 : v *version,
351 : level int,
352 : meta *tableMetadata,
353 : stats *manifest.TableStats,
354 1 : ) error {
355 1 : // TODO(jackson): If the file has a wide keyspace, the average
356 1 : // value size beneath the entire file might not be representative
357 1 : // of the size of the keys beneath the point tombstones.
358 1 : // We could write the ranges of 'clusters' of point tombstones to
359 1 : // a sstable property and call averageValueSizeBeneath for each of
360 1 : // these narrower ranges to improve the estimate.
361 1 : avgValLogicalSize, compressionRatio, err := d.estimateSizesBeneath(v, level, meta, props)
362 1 :
363 1 : if err != nil {
364 0 : return err
365 0 : }
366 1 : stats.PointDeletionsBytesEstimate =
367 1 : pointDeletionsBytesEstimate(meta.Size, props, avgValLogicalSize, compressionRatio)
368 1 : return nil
369 : }
370 :
371 : // loadTableRangeDelStats calculates the range deletion and range key deletion
372 : // statistics for the given table.
373 : func (d *DB) loadTableRangeDelStats(
374 : r *sstable.Reader,
375 : v *version,
376 : level int,
377 : meta *tableMetadata,
378 : stats *manifest.TableStats,
379 : env sstable.ReadEnv,
380 1 : ) ([]deleteCompactionHint, error) {
381 1 : iter, err := newCombinedDeletionKeyspanIter(d.opts.Comparer, r, meta, env)
382 1 : if err != nil {
383 0 : return nil, err
384 0 : }
385 1 : defer iter.Close()
386 1 : var compactionHints []deleteCompactionHint
387 1 : // We iterate over the defragmented range tombstones and range key deletions,
388 1 : // which ensures we don't double count ranges deleted at different sequence
389 1 : // numbers. Also, merging abutting tombstones reduces the number of calls to
390 1 : // estimateReclaimedSizeBeneath which is costly, and improves the accuracy of
391 1 : // our overall estimate.
392 1 : s, err := iter.First()
393 1 : for ; s != nil; s, err = iter.Next() {
394 1 : start, end := s.Start, s.End
395 1 : // We only need to consider deletion size estimates for tables that contain
396 1 : // RANGEDELs.
397 1 : var maxRangeDeleteSeqNum base.SeqNum
398 1 : for _, k := range s.Keys {
399 1 : if k.Kind() == base.InternalKeyKindRangeDelete && maxRangeDeleteSeqNum < k.SeqNum() {
400 1 : maxRangeDeleteSeqNum = k.SeqNum()
401 1 : break
402 : }
403 : }
404 :
405 : // If the file is in the last level of the LSM, there is no data beneath
406 : // it. The fact that there is still a range tombstone in a bottommost file
407 : // indicates two possibilities:
408 : // 1. an open snapshot kept the tombstone around, and the data the
409 : // tombstone deletes is contained within the file itself.
410 : // 2. the file was ingested.
411 : // In the first case, we'd like to estimate disk usage within the file
412 : // itself since compacting the file will drop that covered data. In the
413 : // second case, we expect that compacting the file will NOT drop any
414 : // data and rewriting the file is a waste of write bandwidth. We can
415 : // distinguish these cases by looking at the table metadata's sequence
416 : // numbers. A file's range deletions can only delete data within the
417 : // file at lower sequence numbers. All keys in an ingested sstable adopt
418 : // the same sequence number, preventing tombstones from deleting keys
419 : // within the same file. We check here if the largest RANGEDEL sequence
420 : // number is greater than the file's smallest sequence number. If it is,
421 : // the RANGEDEL could conceivably (although inconclusively) delete data
422 : // within the same file.
423 : //
424 : // Note that this heuristic is imperfect. If a table containing a range
425 : // deletion is ingested into L5 and subsequently compacted into L6 but
426 : // an open snapshot prevents elision of covered keys in L6, the
427 : // resulting RangeDeletionsBytesEstimate will incorrectly include all
428 : // covered keys.
429 : //
430 : // TODO(jackson): We could prevent the above error in the heuristic by
431 : // computing the file's RangeDeletionsBytesEstimate during the
432 : // compaction itself. It's unclear how common this is.
433 : //
434 : // NOTE: If the span `s` wholly contains a table containing range keys,
435 : // the returned size estimate will be slightly inflated by the range key
436 : // block. However, in practice, range keys are expected to be rare, and
437 : // the size of the range key block relative to the overall size of the
438 : // table is expected to be small.
439 1 : if level == numLevels-1 && meta.SmallestSeqNum < maxRangeDeleteSeqNum {
440 1 : size, err := r.EstimateDiskUsage(start, end, env)
441 1 : if err != nil {
442 0 : return nil, err
443 0 : }
444 1 : stats.RangeDeletionsBytesEstimate += size
445 1 :
446 1 : // As the file is in the bottommost level, there is no need to collect a
447 1 : // deletion hint.
448 1 : continue
449 : }
450 :
451 : // While the size estimates for point keys should only be updated if this
452 : // span contains a range del, the sequence numbers are required for the
453 : // hint. Unconditionally descend, but conditionally update the estimates.
454 1 : hintType := compactionHintFromKeys(s.Keys)
455 1 : estimate, hintSeqNum, err := d.estimateReclaimedSizeBeneath(v, level, start, end, hintType)
456 1 : if err != nil {
457 0 : return nil, err
458 0 : }
459 1 : stats.RangeDeletionsBytesEstimate += estimate
460 1 :
461 1 : // hintSeqNum is the smallest sequence number contained in any
462 1 : // file overlapping with the hint and in a level below it.
463 1 : if hintSeqNum == math.MaxUint64 {
464 1 : continue
465 : }
466 1 : hint := deleteCompactionHint{
467 1 : hintType: hintType,
468 1 : start: make([]byte, len(start)),
469 1 : end: make([]byte, len(end)),
470 1 : tombstoneFile: meta,
471 1 : tombstoneLevel: level,
472 1 : tombstoneLargestSeqNum: s.LargestSeqNum(),
473 1 : tombstoneSmallestSeqNum: s.SmallestSeqNum(),
474 1 : fileSmallestSeqNum: hintSeqNum,
475 1 : }
476 1 : copy(hint.start, start)
477 1 : copy(hint.end, end)
478 1 : compactionHints = append(compactionHints, hint)
479 : }
480 1 : if err != nil {
481 0 : return nil, err
482 0 : }
483 1 : return compactionHints, nil
484 : }
485 :
486 : func (d *DB) estimateSizesBeneath(
487 : v *version, level int, meta *tableMetadata, fileProps *sstable.CommonProperties,
488 1 : ) (avgValueLogicalSize, compressionRatio float64, err error) {
489 1 : // Find all files in lower levels that overlap with meta,
490 1 : // summing their value sizes and entry counts.
491 1 : file := meta
492 1 : var fileSum, keySum, valSum, entryCount uint64
493 1 : // Include the file itself. This is important because in some instances, the
494 1 : // computed compression ratio is applied to the tombstones contained within
495 1 : // `meta` itself. If there are no files beneath `meta` in the LSM, we would
496 1 : // calculate a compression ratio of 0 which is not accurate for the file's
497 1 : // own tombstones.
498 1 : fileSum += file.Size
499 1 : // TODO(sumeer): The entryCount includes the tombstones, which can be small,
500 1 : // resulting in a lower than expected avgValueLogicalSize. For an example of
501 1 : // this effect see the estimate in testdata/compaction_picker_scores (search
502 1 : // for "point-deletions-bytes-estimate: 163850").
503 1 : entryCount += fileProps.NumEntries
504 1 : keySum += fileProps.RawKeySize
505 1 : valSum += fileProps.RawValueSize
506 1 :
507 1 : addPhysicalTableStats := func(r *sstable.Reader, _ sstable.ReadEnv) (err error) {
508 1 : fileSum += file.Size
509 1 : entryCount += r.Properties.NumEntries
510 1 : keySum += r.Properties.RawKeySize
511 1 : valSum += r.Properties.RawValueSize
512 1 : return nil
513 1 : }
514 1 : addVirtualTableStats := func(v *sstable.Reader, _ sstable.ReadEnv) (err error) {
515 1 : fileSum += file.Size
516 1 : entryCount += file.Stats.NumEntries
517 1 : keySum += v.Properties.RawKeySize
518 1 : valSum += v.Properties.RawValueSize
519 1 : return nil
520 1 : }
521 :
522 1 : for l := level + 1; l < numLevels; l++ {
523 1 : // NB: This is subtle. The addPhysicalTableStats and
524 1 : // addVirtualTableStats functions close over file, which is the loop
525 1 : // variable.
526 1 : for file = range v.Overlaps(l, meta.UserKeyBounds()).All() {
527 1 : var err error
528 1 : if file.Virtual {
529 1 : err = d.fileCache.withReader(context.TODO(), block.NoReadEnv, file.VirtualMeta(), addVirtualTableStats)
530 1 : } else {
531 1 : err = d.fileCache.withReader(context.TODO(), block.NoReadEnv, file.PhysicalMeta(), addPhysicalTableStats)
532 1 : }
533 1 : if err != nil {
534 0 : return 0, 0, err
535 0 : }
536 : }
537 : }
538 1 : if entryCount == 0 {
539 0 : return 0, 0, nil
540 0 : }
541 : // RawKeySize and RawValueSize are uncompressed totals. We'll need to scale
542 : // the value sum according to the data size to account for compression,
543 : // index blocks and metadata overhead. Eg:
544 : //
545 : // Compression rate × Average uncompressed value size
546 : //
547 : // ↓
548 : //
549 : // FileSize RawValueSize
550 : // ----------------------- × ------------
551 : // RawKeySize+RawValueSize NumEntries
552 : //
553 : // We return the average logical value size and the compression ratio,
554 : // leaving the scaling to the caller. This allows the caller to perform
555 : // additional compression ratio scaling if necessary.
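// For a concrete sense of the arithmetic (hypothetical, illustrative
// numbers): with fileSum = 50 MiB, keySum = 20 MiB, valSum = 80 MiB and
// entryCount = 1<<20, compressionRatio = 50/(20+80) = 0.5 and
// avgValueLogicalSize = 80 MiB / 1Mi entries = 80 bytes.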
556 1 : uncompressedSum := float64(keySum + valSum)
557 1 : compressionRatio = float64(fileSum) / uncompressedSum
558 1 : if compressionRatio > 1 {
559 1 : // We can get huge compression ratios due to the fixed overhead of files
560 1 : // containing a tiny amount of data. By setting this to 1, we are ignoring
561 1 : // that overhead, but we accept that tradeoff since the total bytes in
562 1 : // such overhead is not large.
563 1 : compressionRatio = 1
564 1 : }
565 1 : avgValueLogicalSize = (float64(valSum) / float64(entryCount))
566 1 : return avgValueLogicalSize, compressionRatio, nil
567 : }
568 :
569 : func (d *DB) estimateReclaimedSizeBeneath(
570 : v *version, level int, start, end []byte, hintType deleteCompactionHintType,
571 1 : ) (estimate uint64, hintSeqNum base.SeqNum, err error) {
572 1 : // Find all files in lower levels that overlap with the deleted range
573 1 : // [start, end).
574 1 : //
575 1 : // An overlapping file might be completely contained by the range
576 1 : // tombstone, in which case we can count the entire file size in
577 1 : // our estimate without doing any additional I/O.
578 1 : //
579 1 : // Otherwise, estimating the range for the file requires
580 1 : // additional I/O to read the file's index blocks.
581 1 : hintSeqNum = math.MaxUint64
582 1 : // TODO(jbowens): When there are multiple sub-levels in L0 and the RANGEDEL
583 1 : // is from a higher sub-level, we incorrectly skip the files in the lower
584 1 : // sub-levels when estimating this overlap.
585 1 : for l := level + 1; l < numLevels; l++ {
586 1 : for file := range v.Overlaps(l, base.UserKeyBoundsEndExclusive(start, end)).All() {
587 1 : // Determine whether we need to update size estimates and hint seqnums
588 1 : // based on the type of hint and the type of keys in this file.
589 1 : var updateEstimates, updateHints bool
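// Summarizing the cases handled below:
//
//   hint type             updateEstimates          updateHints
//   point keys only       file has point keys      file has no range keys
//   range keys only       never                    file has no point keys
//   point and range keys  always                   always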
590 1 : switch hintType {
591 1 : case deleteCompactionHintTypePointKeyOnly:
592 1 : // The range deletion byte estimates should only be updated if this
593 1 : // table contains point keys. This ends up being an overestimate in
594 1 : // the case that the table also has range keys, but such keys are expected
595 1 : // to contribute a negligible amount of the table's overall size,
596 1 : // relative to point keys.
597 1 : if file.HasPointKeys {
598 1 : updateEstimates = true
599 1 : }
600 : // As the initiating span contained only range dels, hints can only be
601 : // updated if this table does _not_ contain range keys.
602 1 : if !file.HasRangeKeys {
603 1 : updateHints = true
604 1 : }
605 1 : case deleteCompactionHintTypeRangeKeyOnly:
606 1 : // The initiating span contained only range key dels. The estimates
607 1 : // apply only to point keys, and are therefore not updated.
608 1 : updateEstimates = false
609 1 : // As the initiating span contained only range key dels, hints can
610 1 : // only be updated if this table does _not_ contain point keys.
611 1 : if !file.HasPointKeys {
612 1 : updateHints = true
613 1 : }
614 1 : case deleteCompactionHintTypePointAndRangeKey:
615 1 : // Always update the estimates and hints, as this hint type can drop a
616 1 : // file, irrespective of the mixture of keys. Similar to above, the
617 1 : // range del bytes estimates is an overestimate.
618 1 : updateEstimates, updateHints = true, true
619 0 : default:
620 0 : panic(fmt.Sprintf("pebble: unknown hint type %s", hintType))
621 : }
622 1 : startCmp := d.cmp(start, file.Smallest().UserKey)
623 1 : endCmp := d.cmp(file.Largest().UserKey, end)
624 1 : if startCmp <= 0 && (endCmp < 0 || endCmp == 0 && file.Largest().IsExclusiveSentinel()) {
625 1 : // The range fully contains the file, so skip the file cache lookup and
626 1 : // index inspection and add the full file size.
627 1 : if updateEstimates {
628 1 : estimate += file.Size
629 1 : }
630 1 : if updateHints && hintSeqNum > file.SmallestSeqNum {
631 1 : hintSeqNum = file.SmallestSeqNum
632 1 : }
633 1 : } else if d.cmp(file.Smallest().UserKey, end) <= 0 && d.cmp(start, file.Largest().UserKey) <= 0 {
634 1 : // Partial overlap.
635 1 : if hintType == deleteCompactionHintTypeRangeKeyOnly {
636 1 : // If the hint that generated this overlap contains only range keys,
637 1 : // there is no need to calculate disk usage, as the reclaimable space
638 1 : // is expected to be minimal relative to point keys.
639 1 : continue
640 : }
641 1 : var size uint64
642 1 : var err error
643 1 : if file.Virtual {
644 1 : err = d.fileCache.withReader(
645 1 : context.TODO(), block.NoReadEnv,
646 1 : file.VirtualMeta(), func(r *sstable.Reader, env sstable.ReadEnv) (err error) {
647 1 : size, err = r.EstimateDiskUsage(start, end, env)
648 1 : return err
649 1 : })
650 1 : } else {
651 1 : err = d.fileCache.withReader(
652 1 : context.TODO(), block.NoReadEnv,
653 1 : file.PhysicalMeta(), func(r *sstable.Reader, env sstable.ReadEnv) (err error) {
654 1 : size, err = r.EstimateDiskUsage(start, end, env)
655 1 : return err
656 1 : })
657 : }
658 :
659 1 : if err != nil {
660 0 : return 0, hintSeqNum, err
661 0 : }
662 1 : estimate += size
663 1 : if updateHints && hintSeqNum > file.SmallestSeqNum && d.FormatMajorVersion() >= FormatVirtualSSTables {
664 1 : // If the format major version is past Virtual SSTables, deletion-only
665 1 : // hints can also apply to partial overlaps with sstables.
666 1 : hintSeqNum = file.SmallestSeqNum
667 1 : }
668 : }
669 : }
670 : }
671 1 : return estimate, hintSeqNum, nil
672 : }
673 :
674 : var lastSanityCheckStatsLog crtime.AtomicMono
675 :
676 1 : func sanityCheckStats(meta *tableMetadata, logger Logger, info string) {
677 1 : // Values for PointDeletionsBytesEstimate and RangeDeletionsBytesEstimate that
678 1 : // exceed this value are most likely indicative of a bug.
679 1 : const maxDeletionBytesEstimate = 16 << 30 // 16 GiB
680 1 :
681 1 : if meta.Stats.PointDeletionsBytesEstimate > maxDeletionBytesEstimate ||
682 1 : meta.Stats.RangeDeletionsBytesEstimate > maxDeletionBytesEstimate {
683 1 : if v := lastSanityCheckStatsLog.Load(); v == 0 || v.Elapsed() > 30*time.Second {
684 1 : logger.Errorf("%s: table %s has extreme deletion bytes estimates: point=%d range=%d",
685 1 : info, meta.FileNum,
686 1 : redact.Safe(meta.Stats.PointDeletionsBytesEstimate),
687 1 : redact.Safe(meta.Stats.RangeDeletionsBytesEstimate),
688 1 : )
689 1 : lastSanityCheckStatsLog.Store(crtime.NowMono())
690 1 : }
691 : }
692 : }
693 :
694 : func maybeSetStatsFromProperties(
695 : meta *tableMetadata, props *sstable.CommonProperties, logger Logger,
696 1 : ) bool {
697 1 : // If a table contains range deletions or range key deletions, we defer the
698 1 : // stats collection. There are two main reasons for this:
699 1 : //
700 1 : // 1. Estimating the potential for reclaimed space due to a range deletion
701 1 : // tombstone requires scanning the LSM - a potentially expensive operation
702 1 : // that should be deferred.
703 1 : // 2. Range deletions and / or range key deletions present an opportunity to
704 1 : // compute "deletion hints", which also requires a scan of the LSM to
705 1 : // compute tables that would be eligible for deletion.
706 1 : //
707 1 : // These two tasks are deferred to the table stats collector goroutine.
708 1 : if props.NumRangeDeletions != 0 || props.NumRangeKeyDels != 0 {
709 1 : return false
710 1 : }
711 :
712 : // If a table is more than 10% point deletions without user-provided size
713 : // estimates, don't calculate the PointDeletionsBytesEstimate statistic
714 : // using our limited knowledge. The table stats collector can populate the
715 : // stats and calculate an average value size across all the tables beneath
716 : // the table in the LSM, which will be more accurate.
717 1 : if unsizedDels := (props.NumDeletions - props.NumSizedDeletions); unsizedDels > props.NumEntries/10 {
718 1 : return false
719 1 : }
720 :
721 1 : var pointEstimate uint64
722 1 : if props.NumEntries > 0 {
723 1 : // Use the file's own average key and value sizes as an estimate. This
724 1 : // doesn't require any additional IO and since the number of point
725 1 : // deletions in the file is low, the error introduced by this crude
726 1 : // estimate is expected to be small.
727 1 : commonProps := props
728 1 : avgValSize, compressionRatio := estimatePhysicalSizes(meta.Size, commonProps)
729 1 : pointEstimate = pointDeletionsBytesEstimate(meta.Size, commonProps, avgValSize, compressionRatio)
730 1 : }
731 :
732 1 : meta.Stats.NumEntries = props.NumEntries
733 1 : meta.Stats.NumDeletions = props.NumDeletions
734 1 : meta.Stats.NumRangeKeySets = props.NumRangeKeySets
735 1 : meta.Stats.PointDeletionsBytesEstimate = pointEstimate
736 1 : meta.Stats.RangeDeletionsBytesEstimate = 0
737 1 : meta.Stats.ValueBlocksSize = props.ValueBlocksSize
738 1 : meta.Stats.CompressionType = block.CompressionFromString(props.CompressionName)
739 1 : meta.StatsMarkValid()
740 1 : sanityCheckStats(meta, logger, "stats from properties")
741 1 : return true
742 : }
743 :
744 : func pointDeletionsBytesEstimate(
745 : fileSize uint64, props *sstable.CommonProperties, avgValLogicalSize, compressionRatio float64,
746 1 : ) (estimate uint64) {
747 1 : if props.NumEntries == 0 {
748 0 : return 0
749 0 : }
750 1 : numPointDels := props.NumPointDeletions()
751 1 : if numPointDels == 0 {
752 1 : return 0
753 1 : }
754 : // Estimate the potential space to reclaim using the table's own properties.
755 : // There may or may not be keys covered by any individual point tombstone.
756 : // If not, compacting the point tombstone into L6 will at least allow us to
757 : // drop the point deletion key and will reclaim the tombstone's key bytes.
758 : // If there are covered key(s), we also get to drop key and value bytes for
759 : // each covered key.
760 : //
761 : // Some point tombstones (DELSIZEDs) carry a user-provided estimate of the
762 : // uncompressed size of entries that will be elided by fully compacting the
763 : // tombstone. For these tombstones, there's no guesswork—we use the
764 : // RawPointTombstoneValueSizeHint property which is the sum of all these
765 : // tombstones' encoded values.
766 : //
767 : // For un-sized point tombstones (DELs), we estimate assuming that each
768 : // point tombstone on average covers 1 key and using average value sizes.
769 : // This is almost certainly an overestimate, but that's probably okay
770 : // because point tombstones can slow range iterations even when they don't
771 : // cover a key.
772 : //
773 : // TODO(jackson): This logic doesn't directly incorporate fixed per-key
774 : // overhead (8-byte trailer, plus at least 1 byte encoding the length of the
775 : // key and 1 byte encoding the length of the value). This overhead is
776 : // indirectly incorporated through the compression ratios, but that results
777 : // in the overhead being smeared per key-byte and value-byte, rather than
778 : // per-entry. This per-key fixed overhead can be nontrivial, especially for
779 : // dense swaths of point tombstones. Give some thought as to whether we
780 : // should directly include fixed per-key overhead in the calculations.
781 :
782 : // Below, we calculate the tombstone contributions and the shadowed keys'
783 : // contributions separately.
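// As a worked example (hypothetical, illustrative numbers): a file with
// 1,000 unsized DELs and no DELSIZEDs, RawPointTombstoneKeySize = 24,000
// (24-byte tombstone keys on average), avgValLogicalSize = 100 and
// compressionRatio = 0.5 yields tombstonesLogicalSize = 24,000,
// shadowedLogicalSize = 0 + 24,000 + 1,000*100 = 124,000, and an estimate of
// (24,000+124,000)*0.5 = 74,000 bytes.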
784 1 : var tombstonesLogicalSize float64
785 1 : var shadowedLogicalSize float64
786 1 :
787 1 : // 1. Calculate the contribution of the tombstone keys themselves.
788 1 : if props.RawPointTombstoneKeySize > 0 {
789 1 : tombstonesLogicalSize += float64(props.RawPointTombstoneKeySize)
790 1 : } else {
791 0 : // This sstable predates the existence of the RawPointTombstoneKeySize
792 0 : // property. We can use the average key size within the file itself and
793 0 : // the count of point deletions to estimate the size.
794 0 : tombstonesLogicalSize += float64(numPointDels * props.RawKeySize / props.NumEntries)
795 0 : }
796 :
797 : // 2. Calculate the contribution of the keys shadowed by tombstones.
798 : //
799 : // 2a. First account for keys shadowed by DELSIZED tombstones. The DELSIZED
800 : // tombstones encode the size of both the key and value of the shadowed KV
801 : // entries. These sizes are aggregated into a sstable property.
802 1 : shadowedLogicalSize += float64(props.RawPointTombstoneValueSize)
803 1 :
804 1 : // 2b. Calculate the contribution of the KV entries shadowed by ordinary DEL
805 1 : // keys.
806 1 : numUnsizedDels := numPointDels - props.NumSizedDeletions
807 1 : {
808 1 : // The shadowed keys have the same exact user keys as the tombstones
809 1 : // themselves, so we can use the `tombstonesLogicalSize` we computed
810 1 : // earlier as an estimate. There's a complication that
811 1 : // `tombstonesLogicalSize` may include DELSIZED keys we already
812 1 : // accounted for.
813 1 : shadowedLogicalSize += float64(tombstonesLogicalSize) / float64(numPointDels) * float64(numUnsizedDels)
814 1 :
815 1 : // Calculate the contribution of the deleted values. The caller has
816 1 : // already computed an average logical size (possibly computed across
817 1 : // many sstables).
818 1 : shadowedLogicalSize += float64(numUnsizedDels) * avgValLogicalSize
819 1 : }
820 :
821 : // Scale both tombstone and shadowed totals by logical:physical ratios to
822 : // account for compression, metadata overhead, etc.
823 : //
824 : // Physical FileSize
825 : // ----------- = -----------------------
826 : // Logical RawKeySize+RawValueSize
827 : //
828 1 : return uint64((tombstonesLogicalSize + shadowedLogicalSize) * compressionRatio)
829 : }
830 :
831 : func estimatePhysicalSizes(
832 : fileSize uint64, props *sstable.CommonProperties,
833 1 : ) (avgValLogicalSize, compressionRatio float64) {
834 1 : // RawKeySize and RawValueSize are uncompressed totals. Scale according to
835 1 : // the data size to account for compression, index blocks and metadata
836 1 : // overhead. Eg:
837 1 : //
838 1 : // Compression rate × Average uncompressed value size
839 1 : //
840 1 : // ↓
841 1 : //
842 1 : // FileSize RawValueSize
843 1 : // ----------------------- × ------------
844 1 : // RawKeySize+RawValueSize NumEntries
845 1 : //
846 1 : uncompressedSum := props.RawKeySize + props.RawValueSize
847 1 : compressionRatio = float64(fileSize) / float64(uncompressedSum)
848 1 : if compressionRatio > 1 {
849 1 : // We can get huge compression ratios due to the fixed overhead of files
850 1 : // containing a tiny amount of data. By setting this to 1, we are ignoring
851 1 : // that overhead, but we accept that tradeoff since the total bytes in
852 1 : // such overhead is not large.
853 1 : compressionRatio = 1
854 1 : }
855 1 : avgValLogicalSize = (float64(props.RawValueSize) / float64(props.NumEntries))
856 1 : return avgValLogicalSize, compressionRatio
857 : }
858 :
859 : // newCombinedDeletionKeyspanIter returns a keyspan.FragmentIterator that
860 : // returns "ranged deletion" spans for a single table, providing a combined view
861 : // of both range deletion and range key deletion spans. The
862 : // tableRangedDeletionIter is intended for use in the specific case of computing
863 : // the statistics and deleteCompactionHints for a single table.
864 : //
865 : // As an example, consider the following set of spans from the range deletion
866 : // and range key blocks of a table:
867 : //
868 : // |---------| |---------| |-------| RANGEKEYDELs
869 : // |-----------|-------------| |-----| RANGEDELs
870 : // __________________________________________________________
871 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
872 : //
873 : // The tableRangedDeletionIter produces the following set of output spans, where
874 : // '1' indicates a span containing only range deletions, '2' is a span
875 : // containing only range key deletions, and '3' is a span containing a mixture
876 : // of both range deletions and range key deletions.
877 : //
878 : // 1 3 1 3 2 1 3 2
879 : // |-----|---------|-----|---|-----| |---|-|-----|
880 : // __________________________________________________________
881 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
882 : //
883 : // Algorithm.
884 : //
885 : // The iterator first defragments the range deletion and range key blocks
886 : // separately. During this defragmentation, the range key block is also filtered
887 : // so that keys other than range key deletes are ignored. The range delete and
888 : // range key delete keyspaces are then merged.
889 : //
890 : // Note that the only fragmentation introduced by merging is from where a range
891 : // del span overlaps with a range key del span. Within the bounds of any overlap
892 : // there is guaranteed to be no further fragmentation, as the constituent spans
893 : // have already been defragmented. To the left and right of any overlap, the
894 : // same reasoning applies. For example,
895 : //
896 : // |--------| |-------| RANGEKEYDEL
897 : // |---------------------------| RANGEDEL
898 : // |----1---|----3---|----1----|---2---| Merged, fragmented spans.
899 : // __________________________________________________________
900 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
901 : //
902 : // Any fragmented abutting spans produced by the merging iter will be of
903 : // differing types (i.e. a transition from a span with homogeneous key kinds to a
904 : // heterogeneous span, or a transition from a span with exclusively range dels
905 : // to a span with exclusively range key dels). Therefore, further
906 : // defragmentation is not required.
907 : //
908 : // Each span returned by the tableRangedDeletionIter will have at most four keys,
909 : // corresponding to the largest and smallest sequence numbers encountered across
910 : // the range deletes and range keys deletes that comprised the merged spans.
911 : func newCombinedDeletionKeyspanIter(
912 : comparer *base.Comparer, r *sstable.Reader, m *tableMetadata, env sstable.ReadEnv,
913 1 : ) (keyspan.FragmentIterator, error) {
914 1 : // The range del iter and range key iter are each wrapped in their own
915 1 : // defragmenting iter. For each iter, abutting spans can always be merged.
916 1 : var equal = keyspan.DefragmentMethodFunc(func(_ base.CompareRangeSuffixes, a, b *keyspan.Span) bool { return true })
917 : // Reduce keys by maintaining a slice of at most length two, corresponding to
918 : // the largest and smallest keys in the defragmented span. This maintains the
919 : // contract that the emitted slice is sorted by (SeqNum, Kind) descending.
920 1 : reducer := func(current, incoming []keyspan.Key) []keyspan.Key {
921 1 : if len(current) == 0 && len(incoming) == 0 {
922 0 : // While this should never occur in practice, a defensive return is used
923 0 : // here to preserve correctness.
924 0 : return current
925 0 : }
926 1 : var largest, smallest keyspan.Key
927 1 : var set bool
928 1 : for _, keys := range [2][]keyspan.Key{current, incoming} {
929 1 : if len(keys) == 0 {
930 0 : continue
931 : }
932 1 : first, last := keys[0], keys[len(keys)-1]
933 1 : if !set {
934 1 : largest, smallest = first, last
935 1 : set = true
936 1 : continue
937 : }
938 1 : if first.Trailer > largest.Trailer {
939 1 : largest = first
940 1 : }
941 1 : if last.Trailer < smallest.Trailer {
942 1 : smallest = last
943 1 : }
944 : }
945 1 : if largest.Equal(comparer.CompareRangeSuffixes, smallest) {
946 1 : current = append(current[:0], largest)
947 1 : } else {
948 1 : current = append(current[:0], largest, smallest)
949 1 : }
950 1 : return current
951 : }
952 :
953 : // The separate iters for the range dels and range keys are wrapped in a
954 : // merging iter to join the keyspaces into a single keyspace. The separate
955 : // iters are only added if the particular key kind is present.
956 1 : mIter := &keyspanimpl.MergingIter{}
957 1 : var transform = keyspan.TransformerFunc(func(_ base.CompareRangeSuffixes, in keyspan.Span, out *keyspan.Span) error {
958 1 : if in.KeysOrder != keyspan.ByTrailerDesc {
959 0 : panic("pebble: combined deletion iter encountered keys in non-trailer descending order")
960 : }
961 1 : out.Start, out.End = in.Start, in.End
962 1 : out.Keys = append(out.Keys[:0], in.Keys...)
963 1 : out.KeysOrder = keyspan.ByTrailerDesc
964 1 : // NB: The order of by-trailer descending may have been violated,
965 1 : // because we've layered rangekey and rangedel iterators from the same
966 1 : // sstable into the same keyspanimpl.MergingIter. The MergingIter will
967 1 : // return the keys in the order that the child iterators were provided.
968 1 : // Sort the keys to ensure they're sorted by trailer descending.
969 1 : keyspan.SortKeysByTrailer(out.Keys)
970 1 : return nil
971 : })
972 1 : mIter.Init(comparer, transform, new(keyspanimpl.MergingBuffers))
973 1 : iter, err := r.NewRawRangeDelIter(context.TODO(), m.FragmentIterTransforms(), env)
974 1 : if err != nil {
975 0 : return nil, err
976 0 : }
977 1 : if iter != nil {
978 1 : // Assert expected bounds. In previous versions of Pebble, range
979 1 : // deletions persisted to sstables could exceed the bounds of the
980 1 : // containing files due to "split user keys." This required readers to
981 1 : // constrain the tombstones' bounds to the containing file at read time.
982 1 : // See docs/range_deletions.md for an extended discussion of the design
983 1 : // and invariants at that time.
984 1 : //
985 1 : // We've since compacted away all 'split user-keys' and in the process
986 1 : // eliminated all "untruncated range tombstones" for physical sstables.
987 1 : // We no longer need to perform truncation at read time for these
988 1 : // sstables.
989 1 : //
990 1 : // At the same time, we've also introduced the concept of "virtual
991 1 : // SSTables" where the table metadata's effective bounds can again be
992 1 : // reduced to be narrower than the contained tombstones. These virtual
993 1 : // SSTables handle truncation differently, performing it using
994 1 : // keyspan.Truncate when the sstable's range deletion iterator is
995 1 : // opened.
996 1 : //
997 1 : // Together, these mean that we should never see untruncated range
998 1 : // tombstones any more—and the merging iterator no longer accounts for
999 1 : // their existence. Since there's abundant subtlety that we're relying
1000 1 : // on, we choose to be conservative and assert that these invariants
1001 1 : // hold. We could (and previously did) choose to only validate these
1002 1 : // bounds in invariants builds, but the most likely avenue for these
1003 1 : // tombstones' existence is through a bug in a migration and old data
1004 1 : // sitting around in an old store from long ago.
1005 1 : //
1006 1 : // The table stats collector will read all files' range deletions
1007 1 : // asynchronously after Open, and provides a perfect opportunity to
1008 1 : // validate our invariants without harming user latency. We also
1009 1 : // previously performed truncation here which similarly required key
1010 1 : // comparisons, so replacing those key comparisons with assertions
1011 1 : // should be roughly similar in performance.
1012 1 : //
1013 1 : // TODO(jackson): Only use AssertBounds in invariants builds in the
1014 1 : // following release.
1015 1 : iter = keyspan.AssertBounds(
1016 1 : iter, m.PointKeyBounds.Smallest(), m.PointKeyBounds.LargestUserKey(), comparer.Compare,
1017 1 : )
1018 1 : dIter := &keyspan.DefragmentingIter{}
1019 1 : dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
1020 1 : iter = dIter
1021 1 : mIter.AddLevel(iter)
1022 1 : }
1023 :
1024 1 : iter, err = r.NewRawRangeKeyIter(context.TODO(), m.FragmentIterTransforms(), env)
1025 1 : if err != nil {
1026 0 : return nil, err
1027 0 : }
1028 1 : if iter != nil {
1029 1 : // Assert expected bounds in tests.
1030 1 : if invariants.Sometimes(50) {
1031 1 : if m.HasRangeKeys {
1032 1 : iter = keyspan.AssertBounds(
1033 1 : iter, m.RangeKeyBounds.Smallest(), m.RangeKeyBounds.LargestUserKey(), comparer.Compare,
1034 1 : )
1035 1 : }
1036 : }
1037 : // Wrap the range key iterator in a filter that elides keys other than range
1038 : // key deletions.
1039 1 : iter = keyspan.Filter(iter, func(in *keyspan.Span, buf []keyspan.Key) []keyspan.Key {
1040 1 : keys := buf[:0]
1041 1 : for _, k := range in.Keys {
1042 1 : if k.Kind() != base.InternalKeyKindRangeKeyDelete {
1043 1 : continue
1044 : }
1045 1 : keys = append(keys, k)
1046 : }
1047 1 : return keys
1048 : }, comparer.Compare)
1049 1 : dIter := &keyspan.DefragmentingIter{}
1050 1 : dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
1051 1 : iter = dIter
1052 1 : mIter.AddLevel(iter)
1053 : }
1054 :
1055 1 : return mIter, nil
1056 : }
1057 :
1058 : // rangeKeySetsAnnotator is a manifest.Annotator that annotates B-Tree nodes
1059 : // with the sum of the files' counts of range key sets. The count of range
1060 : // key sets may change once a table's stats are loaded asynchronously, so its
1061 : // values are marked as cacheable only if a file's stats have been loaded.
1062 1 : var rangeKeySetsAnnotator = manifest.SumAnnotator(func(f *manifest.TableMetadata) (uint64, bool) {
1063 1 : return f.Stats.NumRangeKeySets, f.StatsValid()
1064 1 : })
1065 :
1066 : // tombstonesAnnotator is a manifest.Annotator that annotates B-Tree nodes
1067 : // with the sum of the files' counts of tombstones (DEL, SINGLEDEL and RANGEDEL
1068 : // keys). The count of tombstones may change once a table's stats are loaded
1069 : // asynchronously, so its values are marked as cacheable only if a file's stats
1070 : // have been loaded.
1071 1 : var tombstonesAnnotator = manifest.SumAnnotator(func(f *manifest.TableMetadata) (uint64, bool) {
1072 1 : return f.Stats.NumDeletions, f.StatsValid()
1073 1 : })
1074 :
1075 : // valueBlockSizeAnnotator is a manifest.Annotator that annotates B-Tree
1076 : // nodes with the sum of the files' Properties.ValueBlocksSize. The value block
1077 : // size may change once a table's stats are loaded asynchronously, so its
1078 : // values are marked as cacheable only if a file's stats have been loaded.
1079 1 : var valueBlockSizeAnnotator = manifest.SumAnnotator(func(f *tableMetadata) (uint64, bool) {
1080 1 : return f.Stats.ValueBlocksSize, f.StatsValid()
1081 1 : })
1082 :
1083 : // pointDeletionsBytesEstimateAnnotator is a manifest.Annotator that annotates
1084 : // B-Tree nodes with the sum of the files' PointDeletionsBytesEstimate. This
1085 : // value may change once a table's stats are loaded asynchronously, so its
1086 : // values are marked as cacheable only if a file's stats have been loaded.
1087 1 : var pointDeletionsBytesEstimateAnnotator = manifest.SumAnnotator(func(f *tableMetadata) (uint64, bool) {
1088 1 : return f.Stats.PointDeletionsBytesEstimate, f.StatsValid()
1089 1 : })
1090 :
1091 : // rangeDeletionsBytesEstimateAnnotator is a manifest.Annotator that annotates
1092 : // B-Tree nodes with the sum of the files' RangeDeletionsBytesEstimate. This
1093 : // value may change once a table's stats are loaded asynchronously, so its
1094 : // values are marked as cacheable only if a file's stats have been loaded.
1095 1 : var rangeDeletionsBytesEstimateAnnotator = manifest.SumAnnotator(func(f *tableMetadata) (uint64, bool) {
1096 1 : return f.Stats.RangeDeletionsBytesEstimate, f.StatsValid()
1097 1 : })
1098 :
1099 : // compressionTypeAnnotator is a manifest.Annotator that annotates B-tree
1100 : // nodes with the compression type of the file. Its annotation type is
1101 : // compressionTypes. The compression type may change once a table's stats are
1102 : // loaded asynchronously, so its values are marked as cacheable only if a file's
1103 : // stats have been loaded.
1104 : var compressionTypeAnnotator = manifest.Annotator[compressionTypes]{
1105 : Aggregator: compressionTypeAggregator{},
1106 : }
1107 :
1108 : type compressionTypeAggregator struct{}
1109 :
1110 : type compressionTypes struct {
1111 : snappy, zstd, minlz, none, unknown uint64
1112 : }
1113 :
1114 1 : func (a compressionTypeAggregator) Zero(dst *compressionTypes) *compressionTypes {
1115 1 : if dst == nil {
1116 1 : return new(compressionTypes)
1117 1 : }
1118 0 : *dst = compressionTypes{}
1119 0 : return dst
1120 : }
1121 :
1122 : func (a compressionTypeAggregator) Accumulate(
1123 : f *tableMetadata, dst *compressionTypes,
1124 1 : ) (v *compressionTypes, cacheOK bool) {
1125 1 : switch f.Stats.CompressionType {
1126 1 : case SnappyCompression:
1127 1 : dst.snappy++
1128 1 : case ZstdCompression:
1129 1 : dst.zstd++
1130 0 : case MinlzCompression:
1131 0 : dst.minlz++
1132 1 : case NoCompression:
1133 1 : dst.none++
1134 1 : default:
1135 1 : dst.unknown++
1136 : }
1137 1 : return dst, f.StatsValid()
1138 : }
1139 :
1140 : func (a compressionTypeAggregator) Merge(
1141 : src *compressionTypes, dst *compressionTypes,
1142 1 : ) *compressionTypes {
1143 1 : dst.snappy += src.snappy
1144 1 : dst.zstd += src.zstd
1145 1 : dst.minlz += src.minlz
1146 1 : dst.none += src.none
1147 1 : dst.unknown += src.unknown
1148 1 : return dst
1149 1 : }