Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "math"
10 :
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/keyspan"
13 : "github.com/cockroachdb/pebble/internal/manifest"
14 : "github.com/cockroachdb/pebble/sstable"
15 : )
16 :
17 : // In-memory statistics about tables help inform compaction picking, but may
18 : // be expensive to calculate or load from disk. Every time a database is
19 : // opened, these statistics must be reloaded or recalculated. To minimize
20 : // impact on user activity and compactions, we load these statistics
21 : // asynchronously in the background and store loaded statistics in each
22 : // table's *FileMetadata.
23 : //
24 : // This file implements the asynchronous loading of statistics by maintaining
25 : // a list of files that require statistics, alongside their LSM levels.
26 : // Whenever new files are added to the LSM, the files are appended to
27 : // d.mu.tableStats.pending. If a stats collection job is not currently
28 : // running, one is started in a separate goroutine.
29 : //
30 : // The stats collection job grabs and clears the pending list, computes table
31 : // statistics relative to the current readState and updates the tables' file
32 : // metadata. New pending files may accumulate during a stats collection job,
33 : // so a completing job triggers a new job if necessary. Only one job runs at a
34 : // time.
35 : //
36 : // When an existing database is opened, all files lack in-memory statistics.
37 : // These files' stats are loaded incrementally whenever the pending list is
38 : // empty by scanning a current readState for files missing statistics. Once a
39 : // job completes a scan without finding any remaining files without
40 : // statistics, it flips a `loadedInitial` flag. From then on, the stats
41 : // collection job only needs to load statistics for new files appended to the
42 : // pending list.
43 :
44 1 : func (d *DB) maybeCollectTableStatsLocked() {
45 1 : if d.shouldCollectTableStatsLocked() {
46 1 : go d.collectTableStats()
47 1 : }
48 : }
49 :
50 : // updateTableStatsLocked is called when new files are introduced, after the
51 : // read state has been updated. It may trigger a new stat collection.
52 : // DB.mu must be locked when calling.
53 1 : func (d *DB) updateTableStatsLocked(newFiles []manifest.NewFileEntry) {
54 1 : var needStats bool
55 1 : for _, nf := range newFiles {
56 1 : if !nf.Meta.StatsValid() {
57 1 : needStats = true
58 1 : break
59 : }
60 : }
61 1 : if !needStats {
62 1 : return
63 1 : }
64 :
65 1 : d.mu.tableStats.pending = append(d.mu.tableStats.pending, newFiles...)
66 1 : d.maybeCollectTableStatsLocked()
67 : }
68 :
69 1 : func (d *DB) shouldCollectTableStatsLocked() bool {
70 1 : return !d.mu.tableStats.loading &&
71 1 : d.closed.Load() == nil &&
72 1 : !d.opts.private.disableTableStats &&
73 1 : (len(d.mu.tableStats.pending) > 0 || !d.mu.tableStats.loadedInitial)
74 1 : }
75 :
76 : // collectTableStats runs a table stats collection job, returning true if the
77 : // invocation did the collection work, false otherwise (e.g. if another job was
78 : // already running).
79 1 : func (d *DB) collectTableStats() bool {
80 1 : const maxTableStatsPerScan = 50
81 1 :
82 1 : d.mu.Lock()
83 1 : if !d.shouldCollectTableStatsLocked() {
84 1 : d.mu.Unlock()
85 1 : return false
86 1 : }
87 :
88 1 : pending := d.mu.tableStats.pending
89 1 : d.mu.tableStats.pending = nil
90 1 : d.mu.tableStats.loading = true
91 1 : jobID := d.mu.nextJobID
92 1 : d.mu.nextJobID++
93 1 : loadedInitial := d.mu.tableStats.loadedInitial
94 1 : // Drop DB.mu before performing IO.
95 1 : d.mu.Unlock()
96 1 :
97 1 : // Every run of collectTableStats either collects stats from the pending
98 1 : // list (if non-empty) or from scanning the version (loadedInitial is
99 1 : // false). This job only runs if at least one of those conditions holds.
100 1 :
101 1 : // Grab a read state to scan for tables.
102 1 : rs := d.loadReadState()
103 1 : var collected []collectedStats
104 1 : var hints []deleteCompactionHint
105 1 : if len(pending) > 0 {
106 1 : collected, hints = d.loadNewFileStats(rs, pending)
107 1 : } else {
108 1 : var moreRemain bool
109 1 : var buf [maxTableStatsPerScan]collectedStats
110 1 : collected, hints, moreRemain = d.scanReadStateTableStats(rs, buf[:0])
111 1 : loadedInitial = !moreRemain
112 1 : }
113 1 : rs.unref()
114 1 :
115 1 : // Update the FileMetadata with the loaded stats while holding d.mu.
116 1 : d.mu.Lock()
117 1 : defer d.mu.Unlock()
118 1 : d.mu.tableStats.loading = false
119 1 : if loadedInitial && !d.mu.tableStats.loadedInitial {
120 1 : d.mu.tableStats.loadedInitial = loadedInitial
121 1 : d.opts.EventListener.TableStatsLoaded(TableStatsInfo{
122 1 : JobID: jobID,
123 1 : })
124 1 : }
125 :
126 1 : maybeCompact := false
127 1 : for _, c := range collected {
128 1 : c.fileMetadata.Stats = c.TableStats
129 1 : maybeCompact = maybeCompact || fileCompensation(c.fileMetadata) > 0
130 1 : c.fileMetadata.StatsMarkValid()
131 1 : }
132 1 : d.mu.tableStats.cond.Broadcast()
133 1 : d.maybeCollectTableStatsLocked()
134 1 : if len(hints) > 0 && !d.opts.private.disableDeleteOnlyCompactions {
135 1 : // Verify that all of the hint tombstones' files still exist in the
136 1 : // current version. Otherwise, the tombstone itself may have been
137 1 : // compacted into L6 and more recent keys may have had their sequence
138 1 : // numbers zeroed.
139 1 : //
140 1 : // Note that it's possible that the tombstone file is being compacted
141 1 : // presently. In that case, the file will be present in v. When the
142 1 : // compaction finishes compacting the tombstone file, it will detect
143 1 : // and clear the hint.
144 1 : //
145 1 : // See DB.maybeUpdateDeleteCompactionHints.
146 1 : v := d.mu.versions.currentVersion()
147 1 : keepHints := hints[:0]
148 1 : for _, h := range hints {
149 1 : if v.Contains(h.tombstoneLevel, d.cmp, h.tombstoneFile) {
150 1 : keepHints = append(keepHints, h)
151 1 : }
152 : }
153 1 : d.mu.compact.deletionHints = append(d.mu.compact.deletionHints, keepHints...)
154 : }
155 1 : if maybeCompact {
156 1 : d.maybeScheduleCompaction()
157 1 : }
158 1 : return true
159 : }
160 :
161 : type collectedStats struct {
162 : *fileMetadata
163 : manifest.TableStats
164 : }
165 :
166 : func (d *DB) loadNewFileStats(
167 : rs *readState, pending []manifest.NewFileEntry,
168 1 : ) ([]collectedStats, []deleteCompactionHint) {
169 1 : var hints []deleteCompactionHint
170 1 : collected := make([]collectedStats, 0, len(pending))
171 1 : for _, nf := range pending {
172 1 : // A file's stats might have been populated by an earlier call to
173 1 : // loadNewFileStats if the file was moved.
174 1 : // NB: We're not holding d.mu which protects f.Stats, but only
175 1 : // collectTableStats updates f.Stats for active files, and we
176 1 : // ensure only one goroutine runs it at a time through
177 1 : // d.mu.tableStats.loading.
178 1 : if nf.Meta.StatsValid() {
179 1 : continue
180 : }
181 :
182 : // The file isn't guaranteed to still be live in the readState's
183 : // version. It may have been deleted or moved. Skip it if it's not in
184 : // the expected level.
185 1 : if !rs.current.Contains(nf.Level, d.cmp, nf.Meta) {
186 1 : continue
187 : }
188 :
189 1 : stats, newHints, err := d.loadTableStats(
190 1 : rs.current, nf.Level,
191 1 : nf.Meta,
192 1 : )
193 1 : if err != nil {
194 0 : d.opts.EventListener.BackgroundError(err)
195 0 : continue
196 : }
197 : // NB: We don't update the FileMetadata yet, because we aren't
198 : // holding DB.mu. We'll copy it to the FileMetadata after we're
199 : // finished with IO.
200 1 : collected = append(collected, collectedStats{
201 1 : fileMetadata: nf.Meta,
202 1 : TableStats: stats,
203 1 : })
204 1 : hints = append(hints, newHints...)
205 : }
206 1 : return collected, hints
207 : }
208 :
209 : // scanReadStateTableStats is run by an active stat collection job when there
210 : // are no pending new files, but there might be files that existed at Open for
211 : // which we haven't loaded table stats.
212 : func (d *DB) scanReadStateTableStats(
213 : rs *readState, fill []collectedStats,
214 1 : ) ([]collectedStats, []deleteCompactionHint, bool) {
215 1 : moreRemain := false
216 1 : var hints []deleteCompactionHint
217 1 : for l, levelMetadata := range rs.current.Levels {
218 1 : iter := levelMetadata.Iter()
219 1 : for f := iter.First(); f != nil; f = iter.Next() {
220 1 : // NB: We're not holding d.mu which protects f.Stats, but only the
221 1 : // active stats collection job updates f.Stats for active files,
222 1 : // and we ensure only one goroutine runs it at a time through
223 1 : // d.mu.tableStats.loading. This makes it safe to read validity
224 1 : // through f.Stats.ValidLocked despite not holding d.mu.
225 1 : if f.StatsValid() {
226 1 : continue
227 : }
228 :
229 : // Limit how much work we do per read state. The older the read
230 : // state is, the higher the likelihood files are no longer being
231 : // used in the current version. If we've exhausted our allowance,
232 : // return true for the last return value to signal there's more
233 : // work to do.
234 1 : if len(fill) == cap(fill) {
235 1 : moreRemain = true
236 1 : return fill, hints, moreRemain
237 1 : }
238 :
239 1 : stats, newHints, err := d.loadTableStats(
240 1 : rs.current, l, f,
241 1 : )
242 1 : if err != nil {
243 0 : // Set `moreRemain` so we'll try again.
244 0 : moreRemain = true
245 0 : d.opts.EventListener.BackgroundError(err)
246 0 : continue
247 : }
248 1 : fill = append(fill, collectedStats{
249 1 : fileMetadata: f,
250 1 : TableStats: stats,
251 1 : })
252 1 : hints = append(hints, newHints...)
253 : }
254 : }
255 1 : return fill, hints, moreRemain
256 : }
257 :
258 : func (d *DB) loadTableStats(
259 : v *version, level int, meta *fileMetadata,
260 1 : ) (manifest.TableStats, []deleteCompactionHint, error) {
261 1 : var stats manifest.TableStats
262 1 : var compactionHints []deleteCompactionHint
263 1 : err := d.tableCache.withCommonReader(
264 1 : meta, func(r sstable.CommonReader) (err error) {
265 1 : props := r.CommonProperties()
266 1 : stats.NumEntries = props.NumEntries
267 1 : stats.NumDeletions = props.NumDeletions
268 1 : if props.NumPointDeletions() > 0 {
269 1 : if err = d.loadTablePointKeyStats(props, v, level, meta, &stats); err != nil {
270 0 : return
271 0 : }
272 : }
273 1 : if props.NumRangeDeletions > 0 || props.NumRangeKeyDels > 0 {
274 1 : if compactionHints, err = d.loadTableRangeDelStats(
275 1 : r, v, level, meta, &stats,
276 1 : ); err != nil {
277 0 : return
278 0 : }
279 : }
280 : // TODO(travers): Once we have real-world data, consider collecting
281 : // additional stats that may provide improved heuristics for compaction
282 : // picking.
283 1 : stats.NumRangeKeySets = props.NumRangeKeySets
284 1 : stats.ValueBlocksSize = props.ValueBlocksSize
285 1 : return
286 : })
287 1 : if err != nil {
288 0 : return stats, nil, err
289 0 : }
290 1 : return stats, compactionHints, nil
291 : }
292 :
293 : // loadTablePointKeyStats calculates the point key statistics for the given
294 : // table. The provided manifest.TableStats are updated.
295 : func (d *DB) loadTablePointKeyStats(
296 : props *sstable.CommonProperties,
297 : v *version,
298 : level int,
299 : meta *fileMetadata,
300 : stats *manifest.TableStats,
301 1 : ) error {
302 1 : // TODO(jackson): If the file has a wide keyspace, the average
303 1 : // value size beneath the entire file might not be representative
304 1 : // of the size of the keys beneath the point tombstones.
305 1 : // We could write the ranges of 'clusters' of point tombstones to
306 1 : // a sstable property and call averageValueSizeBeneath for each of
307 1 : // these narrower ranges to improve the estimate.
308 1 : avgValLogicalSize, compressionRatio, err := d.estimateSizesBeneath(v, level, meta, props)
309 1 : if err != nil {
310 0 : return err
311 0 : }
312 1 : stats.PointDeletionsBytesEstimate =
313 1 : pointDeletionsBytesEstimate(meta.Size, props, avgValLogicalSize, compressionRatio)
314 1 : return nil
315 : }
316 :
317 : // loadTableRangeDelStats calculates the range deletion and range key deletion
318 : // statistics for the given table.
319 : func (d *DB) loadTableRangeDelStats(
320 : r sstable.CommonReader, v *version, level int, meta *fileMetadata, stats *manifest.TableStats,
321 1 : ) ([]deleteCompactionHint, error) {
322 1 : iter, err := newCombinedDeletionKeyspanIter(d.opts.Comparer, r, meta)
323 1 : if err != nil {
324 0 : return nil, err
325 0 : }
326 1 : defer iter.Close()
327 1 : var compactionHints []deleteCompactionHint
328 1 : // We iterate over the defragmented range tombstones and range key deletions,
329 1 : // which ensures we don't double count ranges deleted at different sequence
330 1 : // numbers. Also, merging abutting tombstones reduces the number of calls to
331 1 : // estimateReclaimedSizeBeneath which is costly, and improves the accuracy of
332 1 : // our overall estimate.
333 1 : for s := iter.First(); s != nil; s = iter.Next() {
334 1 : start, end := s.Start, s.End
335 1 : // We only need to consider deletion size estimates for tables that contain
336 1 : // RANGEDELs.
337 1 : var maxRangeDeleteSeqNum uint64
338 1 : for _, k := range s.Keys {
339 1 : if k.Kind() == base.InternalKeyKindRangeDelete && maxRangeDeleteSeqNum < k.SeqNum() {
340 1 : maxRangeDeleteSeqNum = k.SeqNum()
341 1 : break
342 : }
343 : }
344 :
345 : // If the file is in the last level of the LSM, there is no data beneath
346 : // it. The fact that there is still a range tombstone in a bottommost file
347 : // indicates two possibilites:
348 : // 1. an open snapshot kept the tombstone around, and the data the
349 : // tombstone deletes is contained within the file itself.
350 : // 2. the file was ingested.
351 : // In the first case, we'd like to estimate disk usage within the file
352 : // itself since compacting the file will drop that covered data. In the
353 : // second case, we expect that compacting the file will NOT drop any
354 : // data and rewriting the file is a waste of write bandwidth. We can
355 : // distinguish these cases by looking at the file metadata's sequence
356 : // numbers. A file's range deletions can only delete data within the
357 : // file at lower sequence numbers. All keys in an ingested sstable adopt
358 : // the same sequence number, preventing tombstones from deleting keys
359 : // within the same file. We check here if the largest RANGEDEL sequence
360 : // number is greater than the file's smallest sequence number. If it is,
361 : // the RANGEDEL could conceivably (although inconclusively) delete data
362 : // within the same file.
363 : //
364 : // Note that this heuristic is imperfect. If a table containing a range
365 : // deletion is ingested into L5 and subsequently compacted into L6 but
366 : // an open snapshot prevents elision of covered keys in L6, the
367 : // resulting RangeDeletionsBytesEstimate will incorrectly include all
368 : // covered keys.
369 : //
370 : // TODO(jackson): We could prevent the above error in the heuristic by
371 : // computing the file's RangeDeletionsBytesEstimate during the
372 : // compaction itself. It's unclear how common this is.
373 : //
374 : // NOTE: If the span `s` wholly contains a table containing range keys,
375 : // the returned size estimate will be slightly inflated by the range key
376 : // block. However, in practice, range keys are expected to be rare, and
377 : // the size of the range key block relative to the overall size of the
378 : // table is expected to be small.
379 1 : if level == numLevels-1 && meta.SmallestSeqNum < maxRangeDeleteSeqNum {
380 1 : size, err := r.EstimateDiskUsage(start, end)
381 1 : if err != nil {
382 0 : return nil, err
383 0 : }
384 1 : stats.RangeDeletionsBytesEstimate += size
385 1 :
386 1 : // As the file is in the bottommost level, there is no need to collect a
387 1 : // deletion hint.
388 1 : continue
389 : }
390 :
391 : // While the size estimates for point keys should only be updated if this
392 : // span contains a range del, the sequence numbers are required for the
393 : // hint. Unconditionally descend, but conditionally update the estimates.
394 1 : hintType := compactionHintFromKeys(s.Keys)
395 1 : estimate, hintSeqNum, err := d.estimateReclaimedSizeBeneath(v, level, start, end, hintType)
396 1 : if err != nil {
397 0 : return nil, err
398 0 : }
399 1 : stats.RangeDeletionsBytesEstimate += estimate
400 1 :
401 1 : // If any files were completely contained with the range,
402 1 : // hintSeqNum is the smallest sequence number contained in any
403 1 : // such file.
404 1 : if hintSeqNum == math.MaxUint64 {
405 1 : continue
406 : }
407 1 : hint := deleteCompactionHint{
408 1 : hintType: hintType,
409 1 : start: make([]byte, len(start)),
410 1 : end: make([]byte, len(end)),
411 1 : tombstoneFile: meta,
412 1 : tombstoneLevel: level,
413 1 : tombstoneLargestSeqNum: s.LargestSeqNum(),
414 1 : tombstoneSmallestSeqNum: s.SmallestSeqNum(),
415 1 : fileSmallestSeqNum: hintSeqNum,
416 1 : }
417 1 : copy(hint.start, start)
418 1 : copy(hint.end, end)
419 1 : compactionHints = append(compactionHints, hint)
420 : }
421 1 : return compactionHints, err
422 : }
423 :
424 : func (d *DB) estimateSizesBeneath(
425 : v *version, level int, meta *fileMetadata, fileProps *sstable.CommonProperties,
426 1 : ) (avgValueLogicalSize, compressionRatio float64, err error) {
427 1 : // Find all files in lower levels that overlap with meta,
428 1 : // summing their value sizes and entry counts.
429 1 : file := meta
430 1 : var fileSum, keySum, valSum, entryCount uint64
431 1 : // Include the file itself. This is important because in some instances, the
432 1 : // computed compression ratio is applied to the tombstones contained within
433 1 : // `meta` itself. If there are no files beneath `meta` in the LSM, we would
434 1 : // calculate a compression ratio of 0 which is not accurate for the file's
435 1 : // own tombstones.
436 1 : fileSum += file.Size
437 1 : entryCount += fileProps.NumEntries
438 1 : keySum += fileProps.RawKeySize
439 1 : valSum += fileProps.RawValueSize
440 1 :
441 1 : addPhysicalTableStats := func(r *sstable.Reader) (err error) {
442 1 : fileSum += file.Size
443 1 : entryCount += r.Properties.NumEntries
444 1 : keySum += r.Properties.RawKeySize
445 1 : valSum += r.Properties.RawValueSize
446 1 : return nil
447 1 : }
448 1 : addVirtualTableStats := func(v sstable.VirtualReader) (err error) {
449 0 : fileSum += file.Size
450 0 : entryCount += file.Stats.NumEntries
451 0 : keySum += v.Properties.RawKeySize
452 0 : valSum += v.Properties.RawValueSize
453 0 : return nil
454 0 : }
455 :
456 1 : for l := level + 1; l < numLevels; l++ {
457 1 : overlaps := v.Overlaps(l, d.cmp, meta.Smallest.UserKey,
458 1 : meta.Largest.UserKey, meta.Largest.IsExclusiveSentinel())
459 1 : iter := overlaps.Iter()
460 1 : for file = iter.First(); file != nil; file = iter.Next() {
461 1 : var err error
462 1 : if file.Virtual {
463 0 : err = d.tableCache.withVirtualReader(file.VirtualMeta(), addVirtualTableStats)
464 1 : } else {
465 1 : err = d.tableCache.withReader(file.PhysicalMeta(), addPhysicalTableStats)
466 1 : }
467 1 : if err != nil {
468 0 : return 0, 0, err
469 0 : }
470 : }
471 : }
472 1 : if entryCount == 0 {
473 0 : return 0, 0, nil
474 0 : }
475 : // RawKeySize and RawValueSize are uncompressed totals. We'll need to scale
476 : // the value sum according to the data size to account for compression,
477 : // index blocks and metadata overhead. Eg:
478 : //
479 : // Compression rate × Average uncompressed value size
480 : //
481 : // ↓
482 : //
483 : // FileSize RawValueSize
484 : // ----------------------- × ------------
485 : // RawKeySize+RawValueSize NumEntries
486 : //
487 : // We return the average logical value size plus the compression ratio,
488 : // leaving the scaling to the caller. This allows the caller to perform
489 : // additional compression ratio scaling if necessary.
490 1 : uncompressedSum := float64(keySum + valSum)
491 1 : compressionRatio = float64(fileSum) / uncompressedSum
492 1 : avgValueLogicalSize = (float64(valSum) / float64(entryCount))
493 1 : return avgValueLogicalSize, compressionRatio, nil
494 : }
495 :
496 : func (d *DB) estimateReclaimedSizeBeneath(
497 : v *version, level int, start, end []byte, hintType deleteCompactionHintType,
498 1 : ) (estimate uint64, hintSeqNum uint64, err error) {
499 1 : // Find all files in lower levels that overlap with the deleted range
500 1 : // [start, end).
501 1 : //
502 1 : // An overlapping file might be completely contained by the range
503 1 : // tombstone, in which case we can count the entire file size in
504 1 : // our estimate without doing any additional I/O.
505 1 : //
506 1 : // Otherwise, estimating the range for the file requires
507 1 : // additional I/O to read the file's index blocks.
508 1 : hintSeqNum = math.MaxUint64
509 1 : for l := level + 1; l < numLevels; l++ {
510 1 : overlaps := v.Overlaps(l, d.cmp, start, end, true /* exclusiveEnd */)
511 1 : iter := overlaps.Iter()
512 1 : for file := iter.First(); file != nil; file = iter.Next() {
513 1 : startCmp := d.cmp(start, file.Smallest.UserKey)
514 1 : endCmp := d.cmp(file.Largest.UserKey, end)
515 1 : if startCmp <= 0 && (endCmp < 0 || endCmp == 0 && file.Largest.IsExclusiveSentinel()) {
516 1 : // The range fully contains the file, so skip looking it up in table
517 1 : // cache/looking at its indexes and add the full file size. Whether the
518 1 : // disk estimate and hint seqnums are updated depends on a) the type of
519 1 : // hint that requested the estimate and b) the keys contained in this
520 1 : // current file.
521 1 : var updateEstimates, updateHints bool
522 1 : switch hintType {
523 1 : case deleteCompactionHintTypePointKeyOnly:
524 1 : // The range deletion byte estimates should only be updated if this
525 1 : // table contains point keys. This ends up being an overestimate in
526 1 : // the case that table also has range keys, but such keys are expected
527 1 : // to contribute a negligible amount of the table's overall size,
528 1 : // relative to point keys.
529 1 : if file.HasPointKeys {
530 1 : updateEstimates = true
531 1 : }
532 : // As the initiating span contained only range dels, hints can only be
533 : // updated if this table does _not_ contain range keys.
534 1 : if !file.HasRangeKeys {
535 1 : updateHints = true
536 1 : }
537 1 : case deleteCompactionHintTypeRangeKeyOnly:
538 1 : // The initiating span contained only range key dels. The estimates
539 1 : // apply only to point keys, and are therefore not updated.
540 1 : updateEstimates = false
541 1 : // As the initiating span contained only range key dels, hints can
542 1 : // only be updated if this table does _not_ contain point keys.
543 1 : if !file.HasPointKeys {
544 1 : updateHints = true
545 1 : }
546 1 : case deleteCompactionHintTypePointAndRangeKey:
547 1 : // Always update the estimates and hints, as this hint type can drop a
548 1 : // file, irrespective of the mixture of keys. Similar to above, the
549 1 : // range del bytes estimates is an overestimate.
550 1 : updateEstimates, updateHints = true, true
551 0 : default:
552 0 : panic(fmt.Sprintf("pebble: unknown hint type %s", hintType))
553 : }
554 1 : if updateEstimates {
555 1 : estimate += file.Size
556 1 : }
557 1 : if updateHints && hintSeqNum > file.SmallestSeqNum {
558 1 : hintSeqNum = file.SmallestSeqNum
559 1 : }
560 1 : } else if d.cmp(file.Smallest.UserKey, end) <= 0 && d.cmp(start, file.Largest.UserKey) <= 0 {
561 1 : // Partial overlap.
562 1 : if hintType == deleteCompactionHintTypeRangeKeyOnly {
563 1 : // If the hint that generated this overlap contains only range keys,
564 1 : // there is no need to calculate disk usage, as the reclaimable space
565 1 : // is expected to be minimal relative to point keys.
566 1 : continue
567 : }
568 1 : var size uint64
569 1 : var err error
570 1 : if file.Virtual {
571 1 : err = d.tableCache.withVirtualReader(
572 1 : file.VirtualMeta(), func(r sstable.VirtualReader) (err error) {
573 1 : size, err = r.EstimateDiskUsage(start, end)
574 1 : return err
575 1 : })
576 1 : } else {
577 1 : err = d.tableCache.withReader(
578 1 : file.PhysicalMeta(), func(r *sstable.Reader) (err error) {
579 1 : size, err = r.EstimateDiskUsage(start, end)
580 1 : return err
581 1 : })
582 : }
583 :
584 1 : if err != nil {
585 0 : return 0, hintSeqNum, err
586 0 : }
587 1 : estimate += size
588 : }
589 : }
590 : }
591 1 : return estimate, hintSeqNum, nil
592 : }
593 :
594 1 : func maybeSetStatsFromProperties(meta physicalMeta, props *sstable.Properties) bool {
595 1 : // If a table contains range deletions or range key deletions, we defer the
596 1 : // stats collection. There are two main reasons for this:
597 1 : //
598 1 : // 1. Estimating the potential for reclaimed space due to a range deletion
599 1 : // tombstone requires scanning the LSM - a potentially expensive operation
600 1 : // that should be deferred.
601 1 : // 2. Range deletions and / or range key deletions present an opportunity to
602 1 : // compute "deletion hints", which also requires a scan of the LSM to
603 1 : // compute tables that would be eligible for deletion.
604 1 : //
605 1 : // These two tasks are deferred to the table stats collector goroutine.
606 1 : if props.NumRangeDeletions != 0 || props.NumRangeKeyDels != 0 {
607 1 : return false
608 1 : }
609 :
610 : // If a table is more than 10% point deletions without user-provided size
611 : // estimates, don't calculate the PointDeletionsBytesEstimate statistic
612 : // using our limited knowledge. The table stats collector can populate the
613 : // stats and calculate an average of value size of all the tables beneath
614 : // the table in the LSM, which will be more accurate.
615 1 : if unsizedDels := (props.NumDeletions - props.NumSizedDeletions); unsizedDels > props.NumEntries/10 {
616 1 : return false
617 1 : }
618 :
619 1 : var pointEstimate uint64
620 1 : if props.NumEntries > 0 {
621 1 : // Use the file's own average key and value sizes as an estimate. This
622 1 : // doesn't require any additional IO and since the number of point
623 1 : // deletions in the file is low, the error introduced by this crude
624 1 : // estimate is expected to be small.
625 1 : commonProps := &props.CommonProperties
626 1 : avgValSize, compressionRatio := estimatePhysicalSizes(meta.Size, commonProps)
627 1 : pointEstimate = pointDeletionsBytesEstimate(meta.Size, commonProps, avgValSize, compressionRatio)
628 1 : }
629 :
630 1 : meta.Stats.NumEntries = props.NumEntries
631 1 : meta.Stats.NumDeletions = props.NumDeletions
632 1 : meta.Stats.NumRangeKeySets = props.NumRangeKeySets
633 1 : meta.Stats.PointDeletionsBytesEstimate = pointEstimate
634 1 : meta.Stats.RangeDeletionsBytesEstimate = 0
635 1 : meta.Stats.ValueBlocksSize = props.ValueBlocksSize
636 1 : meta.StatsMarkValid()
637 1 : return true
638 : }
639 :
640 : func pointDeletionsBytesEstimate(
641 : fileSize uint64, props *sstable.CommonProperties, avgValLogicalSize, compressionRatio float64,
642 1 : ) (estimate uint64) {
643 1 : if props.NumEntries == 0 {
644 0 : return 0
645 0 : }
646 1 : numPointDels := props.NumPointDeletions()
647 1 : if numPointDels == 0 {
648 1 : return 0
649 1 : }
650 : // Estimate the potential space to reclaim using the table's own properties.
651 : // There may or may not be keys covered by any individual point tombstone.
652 : // If not, compacting the point tombstone into L6 will at least allow us to
653 : // drop the point deletion key and will reclaim the tombstone's key bytes.
654 : // If there are covered key(s), we also get to drop key and value bytes for
655 : // each covered key.
656 : //
657 : // Some point tombstones (DELSIZEDs) carry a user-provided estimate of the
658 : // uncompressed size of entries that will be elided by fully compacting the
659 : // tombstone. For these tombstones, there's no guesswork—we use the
660 : // RawPointTombstoneValueSizeHint property which is the sum of all these
661 : // tombstones' encoded values.
662 : //
663 : // For un-sized point tombstones (DELs), we estimate assuming that each
664 : // point tombstone on average covers 1 key and using average value sizes.
665 : // This is almost certainly an overestimate, but that's probably okay
666 : // because point tombstones can slow range iterations even when they don't
667 : // cover a key.
668 : //
669 : // TODO(jackson): This logic doesn't directly incorporate fixed per-key
670 : // overhead (8-byte trailer, plus at least 1 byte encoding the length of the
671 : // key and 1 byte encoding the length of the value). This overhead is
672 : // indirectly incorporated through the compression ratios, but that results
673 : // in the overhead being smeared per key-byte and value-byte, rather than
674 : // per-entry. This per-key fixed overhead can be nontrivial, especially for
675 : // dense swaths of point tombstones. Give some thought as to whether we
676 : // should directly include fixed per-key overhead in the calculations.
677 :
678 : // Below, we calculate the tombstone contributions and the shadowed keys'
679 : // contributions separately.
680 1 : var tombstonesLogicalSize float64
681 1 : var shadowedLogicalSize float64
682 1 :
683 1 : // 1. Calculate the contribution of the tombstone keys themselves.
684 1 : if props.RawPointTombstoneKeySize > 0 {
685 1 : tombstonesLogicalSize += float64(props.RawPointTombstoneKeySize)
686 1 : } else {
687 1 : // This sstable predates the existence of the RawPointTombstoneKeySize
688 1 : // property. We can use the average key size within the file itself and
689 1 : // the count of point deletions to estimate the size.
690 1 : tombstonesLogicalSize += float64(numPointDels * props.RawKeySize / props.NumEntries)
691 1 : }
692 :
693 : // 2. Calculate the contribution of the keys shadowed by tombstones.
694 : //
695 : // 2a. First account for keys shadowed by DELSIZED tombstones. THE DELSIZED
696 : // tombstones encode the size of both the key and value of the shadowed KV
697 : // entries. These sizes are aggregated into a sstable property.
698 1 : shadowedLogicalSize += float64(props.RawPointTombstoneValueSize)
699 1 :
700 1 : // 2b. Calculate the contribution of the KV entries shadowed by ordinary DEL
701 1 : // keys.
702 1 : numUnsizedDels := numPointDels - props.NumSizedDeletions
703 1 : {
704 1 : // The shadowed keys have the same exact user keys as the tombstones
705 1 : // themselves, so we can use the `tombstonesLogicalSize` we computed
706 1 : // earlier as an estimate. There's a complication that
707 1 : // `tombstonesLogicalSize` may include DELSIZED keys we already
708 1 : // accounted for.
709 1 : shadowedLogicalSize += float64(tombstonesLogicalSize) / float64(numPointDels) * float64(numUnsizedDels)
710 1 :
711 1 : // Calculate the contribution of the deleted values. The caller has
712 1 : // already computed an average logical size (possibly computed across
713 1 : // many sstables).
714 1 : shadowedLogicalSize += float64(numUnsizedDels) * avgValLogicalSize
715 1 : }
716 :
717 : // Scale both tombstone and shadowed totals by logical:physical ratios to
718 : // account for compression, metadata overhead, etc.
719 : //
720 : // Physical FileSize
721 : // ----------- = -----------------------
722 : // Logical RawKeySize+RawValueSize
723 : //
724 1 : return uint64((tombstonesLogicalSize + shadowedLogicalSize) * compressionRatio)
725 : }
726 :
727 : func estimatePhysicalSizes(
728 : fileSize uint64, props *sstable.CommonProperties,
729 1 : ) (avgValLogicalSize, compressionRatio float64) {
730 1 : // RawKeySize and RawValueSize are uncompressed totals. Scale according to
731 1 : // the data size to account for compression, index blocks and metadata
732 1 : // overhead. Eg:
733 1 : //
734 1 : // Compression rate × Average uncompressed value size
735 1 : //
736 1 : // ↓
737 1 : //
738 1 : // FileSize RawValSize
739 1 : // ----------------------- × ----------
740 1 : // RawKeySize+RawValueSize NumEntries
741 1 : //
742 1 : uncompressedSum := props.RawKeySize + props.RawValueSize
743 1 : compressionRatio = float64(fileSize) / float64(uncompressedSum)
744 1 : avgValLogicalSize = (float64(props.RawValueSize) / float64(props.NumEntries))
745 1 : return avgValLogicalSize, compressionRatio
746 1 : }
747 :
748 : // newCombinedDeletionKeyspanIter returns a keyspan.FragmentIterator that
749 : // returns "ranged deletion" spans for a single table, providing a combined view
750 : // of both range deletion and range key deletion spans. The
751 : // tableRangedDeletionIter is intended for use in the specific case of computing
752 : // the statistics and deleteCompactionHints for a single table.
753 : //
754 : // As an example, consider the following set of spans from the range deletion
755 : // and range key blocks of a table:
756 : //
757 : // |---------| |---------| |-------| RANGEKEYDELs
758 : // |-----------|-------------| |-----| RANGEDELs
759 : // __________________________________________________________
760 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
761 : //
762 : // The tableRangedDeletionIter produces the following set of output spans, where
763 : // '1' indicates a span containing only range deletions, '2' is a span
764 : // containing only range key deletions, and '3' is a span containing a mixture
765 : // of both range deletions and range key deletions.
766 : //
767 : // 1 3 1 3 2 1 3 2
768 : // |-----|---------|-----|---|-----| |---|-|-----|
769 : // __________________________________________________________
770 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
771 : //
772 : // Algorithm.
773 : //
774 : // The iterator first defragments the range deletion and range key blocks
775 : // separately. During this defragmentation, the range key block is also filtered
776 : // so that keys other than range key deletes are ignored. The range delete and
777 : // range key delete keyspaces are then merged.
778 : //
779 : // Note that the only fragmentation introduced by merging is from where a range
780 : // del span overlaps with a range key del span. Within the bounds of any overlap
781 : // there is guaranteed to be no further fragmentation, as the constituent spans
782 : // have already been defragmented. To the left and right of any overlap, the
783 : // same reasoning applies. For example,
784 : //
785 : // |--------| |-------| RANGEKEYDEL
786 : // |---------------------------| RANGEDEL
787 : // |----1---|----3---|----1----|---2---| Merged, fragmented spans.
788 : // __________________________________________________________
789 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
790 : //
791 : // Any fragmented abutting spans produced by the merging iter will be of
792 : // differing types (i.e. a transition from a span with homogenous key kinds to a
793 : // heterogeneous span, or a transition from a span with exclusively range dels
794 : // to a span with exclusively range key dels). Therefore, further
795 : // defragmentation is not required.
796 : //
797 : // Each span returned by the tableRangeDeletionIter will have at most four keys,
798 : // corresponding to the largest and smallest sequence numbers encountered across
799 : // the range deletes and range keys deletes that comprised the merged spans.
800 : func newCombinedDeletionKeyspanIter(
801 : comparer *base.Comparer, cr sstable.CommonReader, m *fileMetadata,
802 1 : ) (keyspan.FragmentIterator, error) {
803 1 : // The range del iter and range key iter are each wrapped in their own
804 1 : // defragmenting iter. For each iter, abutting spans can always be merged.
805 1 : var equal = keyspan.DefragmentMethodFunc(func(_ base.Equal, a, b *keyspan.Span) bool { return true })
806 : // Reduce keys by maintaining a slice of at most length two, corresponding to
807 : // the largest and smallest keys in the defragmented span. This maintains the
808 : // contract that the emitted slice is sorted by (SeqNum, Kind) descending.
809 1 : reducer := func(current, incoming []keyspan.Key) []keyspan.Key {
810 1 : if len(current) == 0 && len(incoming) == 0 {
811 0 : // While this should never occur in practice, a defensive return is used
812 0 : // here to preserve correctness.
813 0 : return current
814 0 : }
815 1 : var largest, smallest keyspan.Key
816 1 : var set bool
817 1 : for _, keys := range [2][]keyspan.Key{current, incoming} {
818 1 : if len(keys) == 0 {
819 0 : continue
820 : }
821 1 : first, last := keys[0], keys[len(keys)-1]
822 1 : if !set {
823 1 : largest, smallest = first, last
824 1 : set = true
825 1 : continue
826 : }
827 1 : if first.Trailer > largest.Trailer {
828 1 : largest = first
829 1 : }
830 1 : if last.Trailer < smallest.Trailer {
831 1 : smallest = last
832 1 : }
833 : }
834 1 : if largest.Equal(comparer.Equal, smallest) {
835 1 : current = append(current[:0], largest)
836 1 : } else {
837 1 : current = append(current[:0], largest, smallest)
838 1 : }
839 1 : return current
840 : }
841 :
842 : // The separate iters for the range dels and range keys are wrapped in a
843 : // merging iter to join the keyspaces into a single keyspace. The separate
844 : // iters are only added if the particular key kind is present.
845 1 : mIter := &keyspan.MergingIter{}
846 1 : var transform = keyspan.TransformerFunc(func(cmp base.Compare, in keyspan.Span, out *keyspan.Span) error {
847 1 : if in.KeysOrder != keyspan.ByTrailerDesc {
848 0 : panic("pebble: combined deletion iter encountered keys in non-trailer descending order")
849 : }
850 1 : out.Start, out.End = in.Start, in.End
851 1 : out.Keys = append(out.Keys[:0], in.Keys...)
852 1 : out.KeysOrder = keyspan.ByTrailerDesc
853 1 : // NB: The order of by-trailer descending may have been violated,
854 1 : // because we've layered rangekey and rangedel iterators from the same
855 1 : // sstable into the same keyspan.MergingIter. The MergingIter will
856 1 : // return the keys in the order that the child iterators were provided.
857 1 : // Sort the keys to ensure they're sorted by trailer descending.
858 1 : keyspan.SortKeysByTrailer(&out.Keys)
859 1 : return nil
860 : })
861 1 : mIter.Init(comparer.Compare, transform, new(keyspan.MergingBuffers))
862 1 :
863 1 : iter, err := cr.NewRawRangeDelIter()
864 1 : if err != nil {
865 0 : return nil, err
866 0 : }
867 1 : if iter != nil {
868 1 : dIter := &keyspan.DefragmentingIter{}
869 1 : dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
870 1 : iter = dIter
871 1 : // Truncate tombstones to the containing file's bounds if necessary.
872 1 : // See docs/range_deletions.md for why this is necessary.
873 1 : iter = keyspan.Truncate(
874 1 : comparer.Compare, iter, m.Smallest.UserKey, m.Largest.UserKey,
875 1 : nil, nil, false, /* panicOnUpperTruncate */
876 1 : )
877 1 : mIter.AddLevel(iter)
878 1 : }
879 :
880 1 : iter, err = cr.NewRawRangeKeyIter()
881 1 : if err != nil {
882 0 : return nil, err
883 0 : }
884 1 : if iter != nil {
885 1 : // Wrap the range key iterator in a filter that elides keys other than range
886 1 : // key deletions.
887 1 : iter = keyspan.Filter(iter, func(in *keyspan.Span, out *keyspan.Span) (keep bool) {
888 1 : out.Start, out.End = in.Start, in.End
889 1 : out.Keys = out.Keys[:0]
890 1 : for _, k := range in.Keys {
891 1 : if k.Kind() != base.InternalKeyKindRangeKeyDelete {
892 1 : continue
893 : }
894 1 : out.Keys = append(out.Keys, k)
895 : }
896 1 : return len(out.Keys) > 0
897 : })
898 1 : dIter := &keyspan.DefragmentingIter{}
899 1 : dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
900 1 : iter = dIter
901 1 : mIter.AddLevel(iter)
902 : }
903 :
904 1 : return mIter, nil
905 : }
906 :
907 : // rangeKeySetsAnnotator implements manifest.Annotator, annotating B-Tree nodes
908 : // with the sum of the files' counts of range key fragments. Its annotation type
909 : // is a *uint64. The count of range key sets may change once a table's stats are
910 : // loaded asynchronously, so its values are marked as cacheable only if a file's
911 : // stats have been loaded.
912 : type rangeKeySetsAnnotator struct{}
913 :
914 : var _ manifest.Annotator = rangeKeySetsAnnotator{}
915 :
916 1 : func (a rangeKeySetsAnnotator) Zero(dst interface{}) interface{} {
917 1 : if dst == nil {
918 1 : return new(uint64)
919 1 : }
920 0 : v := dst.(*uint64)
921 0 : *v = 0
922 0 : return v
923 : }
924 :
925 : func (a rangeKeySetsAnnotator) Accumulate(
926 : f *fileMetadata, dst interface{},
927 1 : ) (v interface{}, cacheOK bool) {
928 1 : vptr := dst.(*uint64)
929 1 : *vptr = *vptr + f.Stats.NumRangeKeySets
930 1 : return vptr, f.StatsValid()
931 1 : }
932 :
933 0 : func (a rangeKeySetsAnnotator) Merge(src interface{}, dst interface{}) interface{} {
934 0 : srcV := src.(*uint64)
935 0 : dstV := dst.(*uint64)
936 0 : *dstV = *dstV + *srcV
937 0 : return dstV
938 0 : }
939 :
940 : // countRangeKeySetFragments counts the number of RANGEKEYSET keys across all
941 : // files of the LSM. It only counts keys in files for which table stats have
942 : // been loaded. It uses a b-tree annotator to cache intermediate values between
943 : // calculations when possible.
944 1 : func countRangeKeySetFragments(v *version) (count uint64) {
945 1 : for l := 0; l < numLevels; l++ {
946 1 : if v.RangeKeyLevels[l].Empty() {
947 1 : continue
948 : }
949 1 : count += *v.RangeKeyLevels[l].Annotation(rangeKeySetsAnnotator{}).(*uint64)
950 : }
951 1 : return count
952 : }
953 :
954 : // tombstonesAnnotator implements manifest.Annotator, annotating B-Tree nodes
955 : // with the sum of the files' counts of tombstones (DEL, SINGLEDEL and RANGEDELk
956 : // eys). Its annotation type is a *uint64. The count of tombstones may change
957 : // once a table's stats are loaded asynchronously, so its values are marked as
958 : // cacheable only if a file's stats have been loaded.
959 : type tombstonesAnnotator struct{}
960 :
961 : var _ manifest.Annotator = tombstonesAnnotator{}
962 :
963 1 : func (a tombstonesAnnotator) Zero(dst interface{}) interface{} {
964 1 : if dst == nil {
965 1 : return new(uint64)
966 1 : }
967 1 : v := dst.(*uint64)
968 1 : *v = 0
969 1 : return v
970 : }
971 :
972 : func (a tombstonesAnnotator) Accumulate(
973 : f *fileMetadata, dst interface{},
974 1 : ) (v interface{}, cacheOK bool) {
975 1 : vptr := dst.(*uint64)
976 1 : *vptr = *vptr + f.Stats.NumDeletions
977 1 : return vptr, f.StatsValid()
978 1 : }
979 :
980 1 : func (a tombstonesAnnotator) Merge(src interface{}, dst interface{}) interface{} {
981 1 : srcV := src.(*uint64)
982 1 : dstV := dst.(*uint64)
983 1 : *dstV = *dstV + *srcV
984 1 : return dstV
985 1 : }
986 :
987 : // countTombstones counts the number of tombstone (DEL, SINGLEDEL and RANGEDEL)
988 : // internal keys across all files of the LSM. It only counts keys in files for
989 : // which table stats have been loaded. It uses a b-tree annotator to cache
990 : // intermediate values between calculations when possible.
991 1 : func countTombstones(v *version) (count uint64) {
992 1 : for l := 0; l < numLevels; l++ {
993 1 : if v.Levels[l].Empty() {
994 1 : continue
995 : }
996 1 : count += *v.Levels[l].Annotation(tombstonesAnnotator{}).(*uint64)
997 : }
998 1 : return count
999 : }
1000 :
1001 : // valueBlocksSizeAnnotator implements manifest.Annotator, annotating B-Tree
1002 : // nodes with the sum of the files' Properties.ValueBlocksSize. Its annotation
1003 : // type is a *uint64. The value block size may change once a table's stats are
1004 : // loaded asynchronously, so its values are marked as cacheable only if a
1005 : // file's stats have been loaded.
1006 : type valueBlocksSizeAnnotator struct{}
1007 :
1008 : var _ manifest.Annotator = valueBlocksSizeAnnotator{}
1009 :
1010 1 : func (a valueBlocksSizeAnnotator) Zero(dst interface{}) interface{} {
1011 1 : if dst == nil {
1012 1 : return new(uint64)
1013 1 : }
1014 1 : v := dst.(*uint64)
1015 1 : *v = 0
1016 1 : return v
1017 : }
1018 :
1019 : func (a valueBlocksSizeAnnotator) Accumulate(
1020 : f *fileMetadata, dst interface{},
1021 1 : ) (v interface{}, cacheOK bool) {
1022 1 : vptr := dst.(*uint64)
1023 1 : *vptr = *vptr + f.Stats.ValueBlocksSize
1024 1 : return vptr, f.StatsValid()
1025 1 : }
1026 :
1027 1 : func (a valueBlocksSizeAnnotator) Merge(src interface{}, dst interface{}) interface{} {
1028 1 : srcV := src.(*uint64)
1029 1 : dstV := dst.(*uint64)
1030 1 : *dstV = *dstV + *srcV
1031 1 : return dstV
1032 1 : }
1033 :
1034 : // valueBlocksSizeForLevel returns the Properties.ValueBlocksSize across all
1035 : // files for a level of the LSM. It only includes the size for files for which
1036 : // table stats have been loaded. It uses a b-tree annotator to cache
1037 : // intermediate values between calculations when possible. It must not be
1038 : // called concurrently.
1039 : //
1040 : // REQUIRES: 0 <= level <= numLevels.
1041 1 : func valueBlocksSizeForLevel(v *version, level int) (count uint64) {
1042 1 : if v.Levels[level].Empty() {
1043 1 : return 0
1044 1 : }
1045 1 : return *v.Levels[level].Annotation(valueBlocksSizeAnnotator{}).(*uint64)
1046 : }
|