Line data Source code
1 : // Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "fmt"
9 : "math"
10 :
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/keyspan"
13 : "github.com/cockroachdb/pebble/internal/manifest"
14 : "github.com/cockroachdb/pebble/sstable"
15 : )
16 :
17 : // In-memory statistics about tables help inform compaction picking, but may
18 : // be expensive to calculate or load from disk. Every time a database is
19 : // opened, these statistics must be reloaded or recalculated. To minimize
20 : // impact on user activity and compactions, we load these statistics
21 : // asynchronously in the background and store loaded statistics in each
22 : // table's *FileMetadata.
23 : //
24 : // This file implements the asynchronous loading of statistics by maintaining
25 : // a list of files that require statistics, alongside their LSM levels.
26 : // Whenever new files are added to the LSM, the files are appended to
27 : // d.mu.tableStats.pending. If a stats collection job is not currently
28 : // running, one is started in a separate goroutine.
29 : //
30 : // The stats collection job grabs and clears the pending list, computes table
31 : // statistics relative to the current readState and updates the tables' file
32 : // metadata. New pending files may accumulate during a stats collection job,
33 : // so a completing job triggers a new job if necessary. Only one job runs at a
34 : // time.
35 : //
36 : // When an existing database is opened, all files lack in-memory statistics.
37 : // These files' stats are loaded incrementally whenever the pending list is
38 : // empty by scanning a current readState for files missing statistics. Once a
39 : // job completes a scan without finding any remaining files without
40 : // statistics, it flips a `loadedInitial` flag. From then on, the stats
41 : // collection job only needs to load statistics for new files appended to the
42 : // pending list.
43 :
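// The bookkeeping described above lives in the DB.mu.tableStats struct. As a
// rough sketch (the authoritative definition is elsewhere in the package; only
// the fields used by this file are shown):
//
//	tableStats struct {
//		// cond is broadcast whenever a collection pass finishes.
//		cond sync.Cond
//		// loading is true while a stats collection goroutine is running.
//		loading bool
//		// loadedInitial is set once every file present at Open has stats.
//		loadedInitial bool
//		// pending holds newly added files awaiting stats collection.
//		pending []manifest.NewFileEntry
//	}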
44 1 : func (d *DB) maybeCollectTableStatsLocked() {
45 1 : if d.shouldCollectTableStatsLocked() {
46 1 : go d.collectTableStats()
47 1 : }
48 : }
49 :
50 : // updateTableStatsLocked is called when new files are introduced, after the
51 : // read state has been updated. It may trigger a new stat collection.
52 : // DB.mu must be locked when calling.
53 1 : func (d *DB) updateTableStatsLocked(newFiles []manifest.NewFileEntry) {
54 1 : var needStats bool
55 1 : for _, nf := range newFiles {
56 1 : if !nf.Meta.StatsValid() {
57 1 : needStats = true
58 1 : break
59 : }
60 : }
61 1 : if !needStats {
62 1 : return
63 1 : }
64 :
65 1 : d.mu.tableStats.pending = append(d.mu.tableStats.pending, newFiles...)
66 1 : d.maybeCollectTableStatsLocked()
67 : }
68 :
69 1 : func (d *DB) shouldCollectTableStatsLocked() bool {
70 1 : return !d.mu.tableStats.loading &&
71 1 : d.closed.Load() == nil &&
72 1 : !d.opts.private.disableTableStats &&
73 1 : (len(d.mu.tableStats.pending) > 0 || !d.mu.tableStats.loadedInitial)
74 1 : }
75 :
76 : // collectTableStats runs a table stats collection job, returning true if the
77 : // invocation did the collection work, false otherwise (e.g. if another job was
78 : // already running).
79 1 : func (d *DB) collectTableStats() bool {
80 1 : const maxTableStatsPerScan = 50
81 1 :
82 1 : d.mu.Lock()
83 1 : if !d.shouldCollectTableStatsLocked() {
84 1 : d.mu.Unlock()
85 1 : return false
86 1 : }
87 :
88 1 : pending := d.mu.tableStats.pending
89 1 : d.mu.tableStats.pending = nil
90 1 : d.mu.tableStats.loading = true
91 1 : jobID := d.mu.nextJobID
92 1 : d.mu.nextJobID++
93 1 : loadedInitial := d.mu.tableStats.loadedInitial
94 1 : // Drop DB.mu before performing IO.
95 1 : d.mu.Unlock()
96 1 :
97 1 : // Every run of collectTableStats either collects stats from the pending
98 1 : // list (if non-empty) or from scanning the version (loadedInitial is
99 1 : // false). This job only runs if at least one of those conditions holds.
100 1 :
101 1 : // Grab a read state to scan for tables.
102 1 : rs := d.loadReadState()
103 1 : var collected []collectedStats
104 1 : var hints []deleteCompactionHint
105 1 : if len(pending) > 0 {
106 1 : collected, hints = d.loadNewFileStats(rs, pending)
107 1 : } else {
108 1 : var moreRemain bool
109 1 : var buf [maxTableStatsPerScan]collectedStats
110 1 : collected, hints, moreRemain = d.scanReadStateTableStats(rs, buf[:0])
111 1 : loadedInitial = !moreRemain
112 1 : }
113 1 : rs.unref()
114 1 :
115 1 : // Update the FileMetadata with the loaded stats while holding d.mu.
116 1 : d.mu.Lock()
117 1 : defer d.mu.Unlock()
118 1 : d.mu.tableStats.loading = false
119 1 : if loadedInitial && !d.mu.tableStats.loadedInitial {
120 1 : d.mu.tableStats.loadedInitial = loadedInitial
121 1 : d.opts.EventListener.TableStatsLoaded(TableStatsInfo{
122 1 : JobID: jobID,
123 1 : })
124 1 : }
125 :
126 1 : maybeCompact := false
127 1 : for _, c := range collected {
128 1 : c.fileMetadata.Stats = c.TableStats
129 1 : maybeCompact = maybeCompact || fileCompensation(c.fileMetadata) > 0
130 1 : c.fileMetadata.StatsMarkValid()
131 1 : }
132 1 : d.mu.tableStats.cond.Broadcast()
133 1 : d.maybeCollectTableStatsLocked()
134 1 : if len(hints) > 0 && !d.opts.private.disableDeleteOnlyCompactions {
135 1 : // Verify that all of the hint tombstones' files still exist in the
136 1 : // current version. Otherwise, the tombstone itself may have been
137 1 : // compacted into L6 and more recent keys may have had their sequence
138 1 : // numbers zeroed.
139 1 : //
140 1 : // Note that it's possible that the tombstone file is being compacted
141 1 : // presently. In that case, the file will be present in v. When the
142 1 : // compaction finishes compacting the tombstone file, it will detect
143 1 : // and clear the hint.
144 1 : //
145 1 : // See DB.maybeUpdateDeleteCompactionHints.
146 1 : v := d.mu.versions.currentVersion()
147 1 : keepHints := hints[:0]
148 1 : for _, h := range hints {
149 1 : if v.Contains(h.tombstoneLevel, d.cmp, h.tombstoneFile) {
150 1 : keepHints = append(keepHints, h)
151 1 : }
152 : }
153 1 : d.mu.compact.deletionHints = append(d.mu.compact.deletionHints, keepHints...)
154 : }
155 1 : if maybeCompact {
156 1 : d.maybeScheduleCompaction()
157 1 : }
158 1 : return true
159 : }
160 :
161 : type collectedStats struct {
162 : *fileMetadata
163 : manifest.TableStats
164 : }
165 :
166 : func (d *DB) loadNewFileStats(
167 : rs *readState, pending []manifest.NewFileEntry,
168 1 : ) ([]collectedStats, []deleteCompactionHint) {
169 1 : var hints []deleteCompactionHint
170 1 : collected := make([]collectedStats, 0, len(pending))
171 1 : for _, nf := range pending {
172 1 : // A file's stats might have been populated by an earlier call to
173 1 : // loadNewFileStats if the file was moved.
174 1 : // NB: We're not holding d.mu which protects f.Stats, but only
175 1 : // collectTableStats updates f.Stats for active files, and we
176 1 : // ensure only one goroutine runs it at a time through
177 1 : // d.mu.tableStats.loading.
178 1 : if nf.Meta.StatsValid() {
179 1 : continue
180 : }
181 :
182 : // The file isn't guaranteed to still be live in the readState's
183 : // version. It may have been deleted or moved. Skip it if it's not in
184 : // the expected level.
185 1 : if !rs.current.Contains(nf.Level, d.cmp, nf.Meta) {
186 1 : continue
187 : }
188 :
189 1 : if nf.Meta.Virtual {
190 1 : // Stats cannot yet be collected for virtual sstables; skip.
191 1 : continue
192 : }
193 :
194 1 : stats, newHints, err := d.loadTableStats(
195 1 : rs.current, nf.Level,
196 1 : nf.Meta.PhysicalMeta(),
197 1 : )
198 1 : if err != nil {
199 0 : d.opts.EventListener.BackgroundError(err)
200 0 : continue
201 : }
202 : // NB: We don't update the FileMetadata yet, because we aren't
203 : // holding DB.mu. We'll copy it to the FileMetadata after we're
204 : // finished with IO.
205 1 : collected = append(collected, collectedStats{
206 1 : fileMetadata: nf.Meta,
207 1 : TableStats: stats,
208 1 : })
209 1 : hints = append(hints, newHints...)
210 : }
211 1 : return collected, hints
212 : }
213 :
214 : // scanReadStateTableStats is run by an active stat collection job when there
215 : // are no pending new files, but there might be files that existed at Open for
216 : // which we haven't loaded table stats.
217 : func (d *DB) scanReadStateTableStats(
218 : rs *readState, fill []collectedStats,
219 1 : ) ([]collectedStats, []deleteCompactionHint, bool) {
220 1 : moreRemain := false
221 1 : var hints []deleteCompactionHint
222 1 : for l, levelMetadata := range rs.current.Levels {
223 1 : iter := levelMetadata.Iter()
224 1 : for f := iter.First(); f != nil; f = iter.Next() {
225 1 : if f.Virtual {
226 0 : // TODO(bananabrick): Support stats collection for virtual
227 0 : // sstables.
228 0 : continue
229 : }
230 :
231 : // NB: We're not holding d.mu which protects f.Stats, but only the
232 : // active stats collection job updates f.Stats for active files,
233 : // and we ensure only one goroutine runs it at a time through
234 : // d.mu.tableStats.loading. This makes it safe to read validity
235 : // through f.StatsValid() despite not holding d.mu.
236 1 : if f.StatsValid() {
237 1 : continue
238 : }
239 : // TODO(bilal): Remove this guard when table stats collection is
240 : // implemented for virtual sstables.
241 1 : if f.Virtual {
242 0 : continue
243 : }
244 :
245 : // Limit how much work we do per read state. The older the read
246 : // state is, the higher the likelihood files are no longer being
247 : // used in the current version. If we've exhausted our allowance,
248 : // return true for the last return value to signal there's more
249 : // work to do.
250 1 : if len(fill) == cap(fill) {
251 0 : moreRemain = true
252 0 : return fill, hints, moreRemain
253 0 : }
254 :
255 1 : stats, newHints, err := d.loadTableStats(
256 1 : rs.current, l, f.PhysicalMeta(),
257 1 : )
258 1 : if err != nil {
259 0 : // Set `moreRemain` so we'll try again.
260 0 : moreRemain = true
261 0 : d.opts.EventListener.BackgroundError(err)
262 0 : continue
263 : }
264 1 : fill = append(fill, collectedStats{
265 1 : fileMetadata: f,
266 1 : TableStats: stats,
267 1 : })
268 1 : hints = append(hints, newHints...)
269 : }
270 : }
271 1 : return fill, hints, moreRemain
272 : }
273 :
274 : // loadTableStats currently only supports stats collection for physical
275 : // sstables.
276 : //
277 : // TODO(bananabrick): Support stats collection for virtual sstables.
278 : func (d *DB) loadTableStats(
279 : v *version, level int, meta physicalMeta,
280 1 : ) (manifest.TableStats, []deleteCompactionHint, error) {
281 1 : var stats manifest.TableStats
282 1 : var compactionHints []deleteCompactionHint
283 1 : err := d.tableCache.withReader(
284 1 : meta, func(r *sstable.Reader) (err error) {
285 1 : stats.NumEntries = r.Properties.NumEntries
286 1 : stats.NumDeletions = r.Properties.NumDeletions
287 1 : if r.Properties.NumPointDeletions() > 0 {
288 1 : if err = d.loadTablePointKeyStats(r, v, level, meta, &stats); err != nil {
289 0 : return
290 0 : }
291 : }
292 1 : if r.Properties.NumRangeDeletions > 0 || r.Properties.NumRangeKeyDels > 0 {
293 1 : if compactionHints, err = d.loadTableRangeDelStats(
294 1 : r, v, level, meta, &stats,
295 1 : ); err != nil {
296 0 : return
297 0 : }
298 : }
299 : // TODO(travers): Once we have real-world data, consider collecting
300 : // additional stats that may provide improved heuristics for compaction
301 : // picking.
302 1 : stats.NumRangeKeySets = r.Properties.NumRangeKeySets
303 1 : stats.ValueBlocksSize = r.Properties.ValueBlocksSize
304 1 : return
305 : })
306 1 : if err != nil {
307 0 : return stats, nil, err
308 0 : }
309 1 : return stats, compactionHints, nil
310 : }
311 :
312 : // loadTablePointKeyStats calculates the point key statistics for the given
313 : // table. The provided manifest.TableStats are updated.
314 : func (d *DB) loadTablePointKeyStats(
315 : r *sstable.Reader, v *version, level int, meta physicalMeta, stats *manifest.TableStats,
316 1 : ) error {
317 1 : // TODO(jackson): If the file has a wide keyspace, the average
318 1 : // value size beneath the entire file might not be representative
319 1 : // of the size of the keys beneath the point tombstones.
320 1 : // We could write the ranges of 'clusters' of point tombstones to
321 1 : // a sstable property and call averageValueSizeBeneath for each of
322 1 : // these narrower ranges to improve the estimate.
323 1 : avgValLogicalSize, compressionRatio, err := d.estimateSizesBeneath(v, level, meta)
324 1 : if err != nil {
325 0 : return err
326 0 : }
327 1 : stats.PointDeletionsBytesEstimate =
328 1 : pointDeletionsBytesEstimate(meta.Size, &r.Properties, avgValLogicalSize, compressionRatio)
329 1 : return nil
330 : }
331 :
332 : // loadTableRangeDelStats calculates the range deletion and range key deletion
333 : // statistics for the given table.
334 : func (d *DB) loadTableRangeDelStats(
335 : r *sstable.Reader, v *version, level int, meta physicalMeta, stats *manifest.TableStats,
336 1 : ) ([]deleteCompactionHint, error) {
337 1 : iter, err := newCombinedDeletionKeyspanIter(d.opts.Comparer, r, meta.FileMetadata)
338 1 : if err != nil {
339 0 : return nil, err
340 0 : }
341 1 : defer iter.Close()
342 1 : var compactionHints []deleteCompactionHint
343 1 : // We iterate over the defragmented range tombstones and range key deletions,
344 1 : // which ensures we don't double count ranges deleted at different sequence
345 1 : // numbers. Also, merging abutting tombstones reduces the number of calls to
346 1 : // estimateReclaimedSizeBeneath which is costly, and improves the accuracy of
347 1 : // our overall estimate.
348 1 : for s := iter.First(); s != nil; s = iter.Next() {
349 1 : start, end := s.Start, s.End
350 1 : // We only need to consider deletion size estimates for tables that contain
351 1 : // RANGEDELs.
352 1 : var maxRangeDeleteSeqNum uint64
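// NB: s.Keys is sorted by trailer (sequence number, then kind) descending,
// so the first RANGEDEL the loop below encounters carries the largest
// RANGEDEL sequence number; the loop can stop at the first match.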
353 1 : for _, k := range s.Keys {
354 1 : if k.Kind() == base.InternalKeyKindRangeDelete && maxRangeDeleteSeqNum < k.SeqNum() {
355 1 : maxRangeDeleteSeqNum = k.SeqNum()
356 1 : break
357 : }
358 : }
359 :
360 : // If the file is in the last level of the LSM, there is no data beneath
361 : // it. The fact that there is still a range tombstone in a bottommost file
362 : // indicates two possibilities:
363 : // 1. an open snapshot kept the tombstone around, and the data the
364 : // tombstone deletes is contained within the file itself.
365 : // 2. the file was ingested.
366 : // In the first case, we'd like to estimate disk usage within the file
367 : // itself since compacting the file will drop that covered data. In the
368 : // second case, we expect that compacting the file will NOT drop any
369 : // data and rewriting the file is a waste of write bandwidth. We can
370 : // distinguish these cases by looking at the file metadata's sequence
371 : // numbers. A file's range deletions can only delete data within the
372 : // file at lower sequence numbers. All keys in an ingested sstable adopt
373 : // the same sequence number, preventing tombstones from deleting keys
374 : // within the same file. We check here if the largest RANGEDEL sequence
375 : // number is greater than the file's smallest sequence number. If it is,
376 : // the RANGEDEL could conceivably (although inconclusively) delete data
377 : // within the same file.
378 : //
379 : // Note that this heuristic is imperfect. If a table containing a range
380 : // deletion is ingested into L5 and subsequently compacted into L6 but
381 : // an open snapshot prevents elision of covered keys in L6, the
382 : // resulting RangeDeletionsBytesEstimate will incorrectly include all
383 : // covered keys.
384 : //
385 : // TODO(jackson): We could prevent the above error in the heuristic by
386 : // computing the file's RangeDeletionsBytesEstimate during the
387 : // compaction itself. It's unclear how common this is.
388 : //
389 : // NOTE: If the span `s` wholly contains a table containing range keys,
390 : // the returned size estimate will be slightly inflated by the range key
391 : // block. However, in practice, range keys are expected to be rare, and
392 : // the size of the range key block relative to the overall size of the
393 : // table is expected to be small.
394 1 : if level == numLevels-1 && meta.SmallestSeqNum < maxRangeDeleteSeqNum {
395 1 : size, err := r.EstimateDiskUsage(start, end)
396 1 : if err != nil {
397 0 : return nil, err
398 0 : }
399 1 : stats.RangeDeletionsBytesEstimate += size
400 1 :
401 1 : // As the file is in the bottommost level, there is no need to collect a
402 1 : // deletion hint.
403 1 : continue
404 : }
405 :
406 : // While the size estimates for point keys should only be updated if this
407 : // span contains a range del, the sequence numbers are required for the
408 : // hint. Unconditionally descend, but conditionally update the estimates.
409 1 : hintType := compactionHintFromKeys(s.Keys)
410 1 : estimate, hintSeqNum, err := d.estimateReclaimedSizeBeneath(v, level, start, end, hintType)
411 1 : if err != nil {
412 0 : return nil, err
413 0 : }
414 1 : stats.RangeDeletionsBytesEstimate += estimate
415 1 :
416 1 : // If any files were completely contained within the range, hintSeqNum
417 1 : // is the smallest sequence number contained in any such file; otherwise
418 1 : // it remains math.MaxUint64 and no hint is generated.
419 1 : if hintSeqNum == math.MaxUint64 {
420 1 : continue
421 : }
422 1 : hint := deleteCompactionHint{
423 1 : hintType: hintType,
424 1 : start: make([]byte, len(start)),
425 1 : end: make([]byte, len(end)),
426 1 : tombstoneFile: meta.FileMetadata,
427 1 : tombstoneLevel: level,
428 1 : tombstoneLargestSeqNum: s.LargestSeqNum(),
429 1 : tombstoneSmallestSeqNum: s.SmallestSeqNum(),
430 1 : fileSmallestSeqNum: hintSeqNum,
431 1 : }
432 1 : copy(hint.start, start)
433 1 : copy(hint.end, end)
434 1 : compactionHints = append(compactionHints, hint)
435 : }
436 1 : return compactionHints, err
437 : }
438 :
439 : func (d *DB) estimateSizesBeneath(
440 : v *version, level int, meta physicalMeta,
441 1 : ) (avgValueLogicalSize, compressionRatio float64, err error) {
442 1 : // Find all files in lower levels that overlap with meta,
443 1 : // summing their value sizes and entry counts.
444 1 : file := meta.FileMetadata
445 1 : var fileSum, keySum, valSum, entryCount uint64
446 1 :
447 1 : addPhysicalTableStats := func(r *sstable.Reader) (err error) {
448 1 : fileSum += file.Size
449 1 : entryCount += r.Properties.NumEntries
450 1 : keySum += r.Properties.RawKeySize
451 1 : valSum += r.Properties.RawValueSize
452 1 : return nil
453 1 : }
454 1 : addVirtualTableStats := func(v sstable.VirtualReader) (err error) {
455 0 : fileSum += file.Size
456 0 : entryCount += file.Stats.NumEntries
457 0 : keySum += v.Properties.RawKeySize
458 0 : valSum += v.Properties.RawValueSize
459 0 : return nil
460 0 : }
461 :
462 : // Include the file itself. This is important because in some instances, the
463 : // computed compression ratio is applied to the tombstones contained within
464 : // `meta` itself. If there are no files beneath `meta` in the LSM, we would
465 : // calculate a compression ratio of 0 which is not accurate for the file's
466 : // own tombstones.
467 1 : if err = d.tableCache.withReader(meta, addPhysicalTableStats); err != nil {
468 0 : return 0, 0, err
469 0 : }
470 :
471 1 : for l := level + 1; l < numLevels; l++ {
472 1 : overlaps := v.Overlaps(l, d.cmp, meta.Smallest.UserKey,
473 1 : meta.Largest.UserKey, meta.Largest.IsExclusiveSentinel())
474 1 : iter := overlaps.Iter()
475 1 : for file = iter.First(); file != nil; file = iter.Next() {
476 1 : var err error
477 1 : if file.Virtual {
478 0 : err = d.tableCache.withVirtualReader(file.VirtualMeta(), addVirtualTableStats)
479 1 : } else {
480 1 : err = d.tableCache.withReader(file.PhysicalMeta(), addPhysicalTableStats)
481 1 : }
482 1 : if err != nil {
483 0 : return 0, 0, err
484 0 : }
485 : }
486 : }
487 1 : if entryCount == 0 {
488 0 : return 0, 0, nil
489 0 : }
490 : // RawKeySize and RawValueSize are uncompressed totals. We'll need to scale
491 : // the value sum according to the data size to account for compression,
492 : // index blocks and metadata overhead. Eg:
493 : //
494 : // Compression rate × Average uncompressed value size
495 : //
496 : // ↓
497 : //
498 : // FileSize RawValueSize
499 : // ----------------------- × ------------
500 : // RawKeySize+RawValueSize NumEntries
501 : //
502 : // We return the average logical value size plus the compression ratio,
503 : // leaving the scaling to the caller. This allows the caller to perform
504 : // additional compression ratio scaling if necessary.
505 1 : uncompressedSum := float64(keySum + valSum)
506 1 : compressionRatio = float64(fileSum) / uncompressedSum
507 1 : avgValueLogicalSize = (float64(valSum) / float64(entryCount))
508 1 : return avgValueLogicalSize, compressionRatio, nil
509 : }
510 :
511 : func (d *DB) estimateReclaimedSizeBeneath(
512 : v *version, level int, start, end []byte, hintType deleteCompactionHintType,
513 1 : ) (estimate uint64, hintSeqNum uint64, err error) {
514 1 : // Find all files in lower levels that overlap with the deleted range
515 1 : // [start, end).
516 1 : //
517 1 : // An overlapping file might be completely contained by the range
518 1 : // tombstone, in which case we can count the entire file size in
519 1 : // our estimate without doing any additional I/O.
520 1 : //
521 1 : // Otherwise, estimating the range for the file requires
522 1 : // additional I/O to read the file's index blocks.
523 1 : hintSeqNum = math.MaxUint64
524 1 : for l := level + 1; l < numLevels; l++ {
525 1 : overlaps := v.Overlaps(l, d.cmp, start, end, true /* exclusiveEnd */)
526 1 : iter := overlaps.Iter()
527 1 : for file := iter.First(); file != nil; file = iter.Next() {
528 1 : startCmp := d.cmp(start, file.Smallest.UserKey)
529 1 : endCmp := d.cmp(file.Largest.UserKey, end)
530 1 : if startCmp <= 0 && (endCmp < 0 || endCmp == 0 && file.Largest.IsExclusiveSentinel()) {
531 1 : // The range fully contains the file, so skip looking it up in the table
532 1 : // cache or reading its index blocks, and add the full file size. Whether the
533 1 : // disk estimate and hint seqnums are updated depends on a) the type of
534 1 : // hint that requested the estimate and b) the keys contained in this
535 1 : // current file.
536 1 : var updateEstimates, updateHints bool
537 1 : switch hintType {
538 1 : case deleteCompactionHintTypePointKeyOnly:
539 1 : // The range deletion byte estimates should only be updated if this
540 1 : // table contains point keys. This ends up being an overestimate in
541 1 : // the case that the table also has range keys, but such keys are expected
542 1 : // to contribute a negligible amount of the table's overall size,
543 1 : // relative to point keys.
544 1 : if file.HasPointKeys {
545 1 : updateEstimates = true
546 1 : }
547 : // As the initiating span contained only range dels, hints can only be
548 : // updated if this table does _not_ contain range keys.
549 1 : if !file.HasRangeKeys {
550 1 : updateHints = true
551 1 : }
552 1 : case deleteCompactionHintTypeRangeKeyOnly:
553 1 : // The initiating span contained only range key dels. The estimates
554 1 : // apply only to point keys, and are therefore not updated.
555 1 : updateEstimates = false
556 1 : // As the initiating span contained only range key dels, hints can
557 1 : // only be updated if this table does _not_ contain point keys.
558 1 : if !file.HasPointKeys {
559 1 : updateHints = true
560 1 : }
561 1 : case deleteCompactionHintTypePointAndRangeKey:
562 1 : // Always update the estimates and hints, as this hint type can drop a
563 1 : // file, irrespective of the mixture of keys. Similar to above, the
564 1 : // range del bytes estimates is an overestimate.
565 1 : updateEstimates, updateHints = true, true
566 0 : default:
567 0 : panic(fmt.Sprintf("pebble: unknown hint type %s", hintType))
568 : }
569 1 : if updateEstimates {
570 1 : estimate += file.Size
571 1 : }
572 1 : if updateHints && hintSeqNum > file.SmallestSeqNum {
573 1 : hintSeqNum = file.SmallestSeqNum
574 1 : }
575 1 : } else if d.cmp(file.Smallest.UserKey, end) <= 0 && d.cmp(start, file.Largest.UserKey) <= 0 {
576 1 : // Partial overlap.
577 1 : if hintType == deleteCompactionHintTypeRangeKeyOnly {
578 1 : // If the hint that generated this overlap contains only range keys,
579 1 : // there is no need to calculate disk usage, as the reclaimable space
580 1 : // is expected to be minimal relative to point keys.
581 1 : continue
582 : }
583 1 : var size uint64
584 1 : var err error
585 1 : if file.Virtual {
586 1 : err = d.tableCache.withVirtualReader(
587 1 : file.VirtualMeta(), func(r sstable.VirtualReader) (err error) {
588 1 : size, err = r.EstimateDiskUsage(start, end)
589 1 : return err
590 1 : })
591 1 : } else {
592 1 : err = d.tableCache.withReader(
593 1 : file.PhysicalMeta(), func(r *sstable.Reader) (err error) {
594 1 : size, err = r.EstimateDiskUsage(start, end)
595 1 : return err
596 1 : })
597 : }
598 :
599 1 : if err != nil {
600 0 : return 0, hintSeqNum, err
601 0 : }
602 1 : estimate += size
603 : }
604 : }
605 : }
606 1 : return estimate, hintSeqNum, nil
607 : }
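// For reference, the fully-contained-file cases above reduce to the following
// behavior (derived from the switch statement; partially overlapping files
// instead have their overlapping size estimated from their index blocks,
// except for range-key-only hints, which skip that I/O entirely):
//
//	hint type              adds file.Size to estimate   lowers hintSeqNum
//	point keys only        if file has point keys       if file has no range keys
//	range keys only        never                        if file has no point keys
//	point and range keys   always                       always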
608 :
609 1 : func maybeSetStatsFromProperties(meta physicalMeta, props *sstable.Properties) bool {
610 1 : // If a table contains range deletions or range key deletions, we defer the
611 1 : // stats collection. There are two main reasons for this:
612 1 : //
613 1 : // 1. Estimating the potential for reclaimed space due to a range deletion
614 1 : // tombstone requires scanning the LSM - a potentially expensive operation
615 1 : // that should be deferred.
616 1 : // 2. Range deletions and / or range key deletions present an opportunity to
617 1 : // compute "deletion hints", which also requires a scan of the LSM to
618 1 : // compute tables that would be eligible for deletion.
619 1 : //
620 1 : // These two tasks are deferred to the table stats collector goroutine.
621 1 : if props.NumRangeDeletions != 0 || props.NumRangeKeyDels != 0 {
622 1 : return false
623 1 : }
624 :
625 : // If a table is more than 10% point deletions without user-provided size
626 : // estimates, don't calculate the PointDeletionsBytesEstimate statistic
627 : // using our limited knowledge. The table stats collector can populate the
628 : // stats and calculate an average value size across all the tables beneath
629 : // the table in the LSM, which will be more accurate.
630 1 : if unsizedDels := (props.NumDeletions - props.NumSizedDeletions); unsizedDels > props.NumEntries/10 {
631 1 : return false
632 1 : }
633 :
634 1 : var pointEstimate uint64
635 1 : if props.NumEntries > 0 {
636 1 : // Use the file's own average key and value sizes as an estimate. This
637 1 : // doesn't require any additional IO and since the number of point
638 1 : // deletions in the file is low, the error introduced by this crude
639 1 : // estimate is expected to be small.
640 1 : avgValSize, compressionRatio := estimatePhysicalSizes(meta.Size, props)
641 1 : pointEstimate = pointDeletionsBytesEstimate(meta.Size, props, avgValSize, compressionRatio)
642 1 : }
643 :
644 1 : meta.Stats.NumEntries = props.NumEntries
645 1 : meta.Stats.NumDeletions = props.NumDeletions
646 1 : meta.Stats.NumRangeKeySets = props.NumRangeKeySets
647 1 : meta.Stats.PointDeletionsBytesEstimate = pointEstimate
648 1 : meta.Stats.RangeDeletionsBytesEstimate = 0
649 1 : meta.Stats.ValueBlocksSize = props.ValueBlocksSize
650 1 : meta.StatsMarkValid()
651 1 : return true
652 : }
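// As a purely illustrative example of the 10% cutoff above: a table with
// 1,000,000 entries and 200,000 unsized DELs (20%) returns false here and
// defers to the stats collector, which can average value sizes across the
// tables beneath it in the LSM, while a table with 50,000 unsized DELs (5%)
// gets its PointDeletionsBytesEstimate computed inline from its own
// properties.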
653 :
654 : func pointDeletionsBytesEstimate(
655 : fileSize uint64, props *sstable.Properties, avgValLogicalSize, compressionRatio float64,
656 1 : ) (estimate uint64) {
657 1 : if props.NumEntries == 0 {
658 0 : return 0
659 0 : }
660 1 : numPointDels := props.NumPointDeletions()
661 1 : if numPointDels == 0 {
662 1 : return 0
663 1 : }
664 : // Estimate the potential space to reclaim using the table's own properties.
665 : // There may or may not be keys covered by any individual point tombstone.
666 : // If not, compacting the point tombstone into L6 will at least allow us to
667 : // drop the point deletion key and will reclaim the tombstone's key bytes.
668 : // If there are covered key(s), we also get to drop key and value bytes for
669 : // each covered key.
670 : //
671 : // Some point tombstones (DELSIZEDs) carry a user-provided estimate of the
672 : // uncompressed size of entries that will be elided by fully compacting the
673 : // tombstone. For these tombstones there's no guesswork: we use the
674 : // RawPointTombstoneValueSize property, which is the sum of all these
675 : // tombstones' encoded values.
676 : //
677 : // For un-sized point tombstones (DELs), we estimate assuming that each
678 : // point tombstone on average covers 1 key and using average value sizes.
679 : // This is almost certainly an overestimate, but that's probably okay
680 : // because point tombstones can slow range iterations even when they don't
681 : // cover a key.
682 : //
683 : // TODO(jackson): This logic doesn't directly incorporate fixed per-key
684 : // overhead (8-byte trailer, plus at least 1 byte encoding the length of the
685 : // key and 1 byte encoding the length of the value). This overhead is
686 : // indirectly incorporated through the compression ratios, but that results
687 : // in the overhead being smeared per key-byte and value-byte, rather than
688 : // per-entry. This per-key fixed overhead can be nontrivial, especially for
689 : // dense swaths of point tombstones. Give some thought as to whether we
690 : // should directly include fixed per-key overhead in the calculations.
691 :
692 : // Below, we calculate the tombstone contributions and the shadowed keys'
693 : // contributions separately.
694 1 : var tombstonesLogicalSize float64
695 1 : var shadowedLogicalSize float64
696 1 :
697 1 : // 1. Calculate the contribution of the tombstone keys themselves.
698 1 : if props.RawPointTombstoneKeySize > 0 {
699 1 : tombstonesLogicalSize += float64(props.RawPointTombstoneKeySize)
700 1 : } else {
701 1 : // This sstable predates the existence of the RawPointTombstoneKeySize
702 1 : // property. We can use the average key size within the file itself and
703 1 : // the count of point deletions to estimate the size.
704 1 : tombstonesLogicalSize += float64(numPointDels * props.RawKeySize / props.NumEntries)
705 1 : }
706 :
707 : // 2. Calculate the contribution of the keys shadowed by tombstones.
708 : //
709 : // 2a. First account for keys shadowed by DELSIZED tombstones. The DELSIZED
710 : // tombstones encode the size of both the key and value of the shadowed KV
711 : // entries. These sizes are aggregated into a sstable property.
712 1 : shadowedLogicalSize += float64(props.RawPointTombstoneValueSize)
713 1 :
714 1 : // 2b. Calculate the contribution of the KV entries shadowed by ordinary DEL
715 1 : // keys.
716 1 : numUnsizedDels := numPointDels - props.NumSizedDeletions
717 1 : {
718 1 : // The shadowed keys have the same exact user keys as the tombstones
719 1 : // themselves, so we can use the `tombstonesLogicalSize` we computed
720 1 : // earlier as an estimate. There's a complication that
721 1 : // `tombstonesLogicalSize` may include DELSIZED keys we already accounted
722 1 : // for; prorating by numUnsizedDels/numPointDels corrects for that.
723 1 : shadowedLogicalSize += float64(tombstonesLogicalSize) / float64(numPointDels) * float64(numUnsizedDels)
724 1 :
725 1 : // Calculate the contribution of the deleted values. The caller has
726 1 : // already computed an average logical size (possibly computed across
727 1 : // many sstables).
728 1 : shadowedLogicalSize += float64(numUnsizedDels) * avgValLogicalSize
729 1 : }
730 :
731 : // Scale both tombstone and shadowed totals by logical:physical ratios to
732 : // account for compression, metadata overhead, etc.
733 : //
734 : // Physical FileSize
735 : // ----------- = -----------------------
736 : // Logical RawKeySize+RawValueSize
737 : //
738 1 : return uint64((tombstonesLogicalSize + shadowedLogicalSize) * compressionRatio)
739 : }
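// examplePointDeletionsBytesEstimate is an illustrative sketch only; it is
// not part of the package and is never called. It walks the arithmetic above
// with hypothetical properties: a 1,000,000-byte physical file holding
// 100,000 entries, 2,000,000 bytes of raw keys, 6,000,000 bytes of raw values
// and 1,000 unsized DELs.
//
//	compressionRatio      = 1,000,000 / (2,000,000 + 6,000,000)  = 0.125
//	avgValLogicalSize     = 6,000,000 / 100,000                  = 60
//	tombstonesLogicalSize = 1,000 × 2,000,000 / 100,000          = 20,000
//	shadowedLogicalSize   = 20,000 + 1,000 × 60                  = 80,000
//	estimate              = (20,000 + 80,000) × 0.125            = 12,500
func examplePointDeletionsBytesEstimate() uint64 {
	const fileSize = 1_000_000
	props := &sstable.Properties{
		NumEntries:   100_000,
		NumDeletions: 1_000, // all unsized DELs; no DELSIZEDs, no RANGEDELs
		RawKeySize:   2_000_000,
		RawValueSize: 6_000_000,
	}
	avgValLogicalSize, compressionRatio := estimatePhysicalSizes(fileSize, props)
	// Returns 12,500: the tombstone keys plus one assumed covered key and value
	// per DEL, scaled by the file's physical:logical size ratio.
	return pointDeletionsBytesEstimate(fileSize, props, avgValLogicalSize, compressionRatio)
}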
740 :
741 : func estimatePhysicalSizes(
742 : fileSize uint64, props *sstable.Properties,
743 1 : ) (avgValLogicalSize, compressionRatio float64) {
744 1 : // RawKeySize and RawValueSize are uncompressed totals. Scale according to
745 1 : // the data size to account for compression, index blocks and metadata
746 1 : // overhead. Eg:
747 1 : //
748 1 : // Compression rate × Average uncompressed value size
749 1 : //
750 1 : // ↓
751 1 : //
752 1 : // FileSize RawValSize
753 1 : // ----------------------- × ----------
754 1 : // RawKeySize+RawValueSize NumEntries
755 1 : //
756 1 : uncompressedSum := props.RawKeySize + props.RawValueSize
757 1 : compressionRatio = float64(fileSize) / float64(uncompressedSum)
758 1 : avgValLogicalSize = (float64(props.RawValueSize) / float64(props.NumEntries))
759 1 : return avgValLogicalSize, compressionRatio
760 1 : }
761 :
762 : // newCombinedDeletionKeyspanIter returns a keyspan.FragmentIterator that
763 : // returns "ranged deletion" spans for a single table, providing a combined view
764 : // of both range deletion and range key deletion spans. The
765 : // tableRangedDeletionIter is intended for use in the specific case of computing
766 : // the statistics and deleteCompactionHints for a single table.
767 : //
768 : // As an example, consider the following set of spans from the range deletion
769 : // and range key blocks of a table:
770 : //
771 : // |---------| |---------| |-------| RANGEKEYDELs
772 : // |-----------|-------------| |-----| RANGEDELs
773 : // __________________________________________________________
774 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
775 : //
776 : // The tableRangedDeletionIter produces the following set of output spans, where
777 : // '1' indicates a span containing only range deletions, '2' is a span
778 : // containing only range key deletions, and '3' is a span containing a mixture
779 : // of both range deletions and range key deletions.
780 : //
781 : // 1 3 1 3 2 1 3 2
782 : // |-----|---------|-----|---|-----| |---|-|-----|
783 : // __________________________________________________________
784 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
785 : //
786 : // Algorithm.
787 : //
788 : // The iterator first defragments the range deletion and range key blocks
789 : // separately. During this defragmentation, the range key block is also filtered
790 : // so that keys other than range key deletes are ignored. The range delete and
791 : // range key delete keyspaces are then merged.
792 : //
793 : // Note that the only fragmentation introduced by merging is from where a range
794 : // del span overlaps with a range key del span. Within the bounds of any overlap
795 : // there is guaranteed to be no further fragmentation, as the constituent spans
796 : // have already been defragmented. To the left and right of any overlap, the
797 : // same reasoning applies. For example,
798 : //
799 : // |--------| |-------| RANGEKEYDEL
800 : // |---------------------------| RANGEDEL
801 : // |----1---|----3---|----1----|---2---| Merged, fragmented spans.
802 : // __________________________________________________________
803 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
804 : //
805 : // Any fragmented abutting spans produced by the merging iter will be of
806 : // differing types (i.e. a transition from a span with homogeneous key kinds to a
807 : // heterogeneous span, or a transition from a span with exclusively range dels
808 : // to a span with exclusively range key dels). Therefore, further
809 : // defragmentation is not required.
810 : //
811 : // Each span returned by the tableRangedDeletionIter will have at most four keys,
812 : // corresponding to the largest and smallest sequence numbers encountered across
813 : // the range deletes and range keys deletes that comprised the merged spans.
814 : func newCombinedDeletionKeyspanIter(
815 : comparer *base.Comparer, r *sstable.Reader, m *fileMetadata,
816 1 : ) (keyspan.FragmentIterator, error) {
817 1 : // The range del iter and range key iter are each wrapped in their own
818 1 : // defragmenting iter. For each iter, abutting spans can always be merged.
819 1 : var equal = keyspan.DefragmentMethodFunc(func(_ base.Equal, a, b *keyspan.Span) bool { return true })
820 : // Reduce keys by maintaining a slice of at most length two, corresponding to
821 : // the largest and smallest keys in the defragmented span. This maintains the
822 : // contract that the emitted slice is sorted by (SeqNum, Kind) descending.
823 1 : reducer := func(current, incoming []keyspan.Key) []keyspan.Key {
824 1 : if len(current) == 0 && len(incoming) == 0 {
825 0 : // While this should never occur in practice, a defensive return is used
826 0 : // here to preserve correctness.
827 0 : return current
828 0 : }
829 1 : var largest, smallest keyspan.Key
830 1 : var set bool
831 1 : for _, keys := range [2][]keyspan.Key{current, incoming} {
832 1 : if len(keys) == 0 {
833 0 : continue
834 : }
835 1 : first, last := keys[0], keys[len(keys)-1]
836 1 : if !set {
837 1 : largest, smallest = first, last
838 1 : set = true
839 1 : continue
840 : }
841 1 : if first.Trailer > largest.Trailer {
842 1 : largest = first
843 1 : }
844 1 : if last.Trailer < smallest.Trailer {
845 1 : smallest = last
846 1 : }
847 : }
848 1 : if largest.Equal(comparer.Equal, smallest) {
849 1 : current = append(current[:0], largest)
850 1 : } else {
851 1 : current = append(current[:0], largest, smallest)
852 1 : }
853 1 : return current
854 : }
855 :
856 : // The separate iters for the range dels and range keys are wrapped in a
857 : // merging iter to join the keyspaces into a single keyspace. The separate
858 : // iters are only added if the particular key kind is present.
859 1 : mIter := &keyspan.MergingIter{}
860 1 : var transform = keyspan.TransformerFunc(func(cmp base.Compare, in keyspan.Span, out *keyspan.Span) error {
861 1 : if in.KeysOrder != keyspan.ByTrailerDesc {
862 0 : panic("pebble: combined deletion iter encountered keys in non-trailer descending order")
863 : }
864 1 : out.Start, out.End = in.Start, in.End
865 1 : out.Keys = append(out.Keys[:0], in.Keys...)
866 1 : out.KeysOrder = keyspan.ByTrailerDesc
867 1 : // NB: The order of by-trailer descending may have been violated,
868 1 : // because we've layered rangekey and rangedel iterators from the same
869 1 : // sstable into the same keyspan.MergingIter. The MergingIter will
870 1 : // return the keys in the order that the child iterators were provided.
871 1 : // Sort the keys to ensure they're sorted by trailer descending.
872 1 : keyspan.SortKeysByTrailer(&out.Keys)
873 1 : return nil
874 : })
875 1 : mIter.Init(comparer.Compare, transform, new(keyspan.MergingBuffers))
876 1 :
877 1 : iter, err := r.NewRawRangeDelIter()
878 1 : if err != nil {
879 0 : return nil, err
880 0 : }
881 1 : if iter != nil {
882 1 : dIter := &keyspan.DefragmentingIter{}
883 1 : dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
884 1 : iter = dIter
885 1 : // Truncate tombstones to the containing file's bounds if necessary.
886 1 : // See docs/range_deletions.md for why this is necessary.
887 1 : iter = keyspan.Truncate(
888 1 : comparer.Compare, iter, m.Smallest.UserKey, m.Largest.UserKey,
889 1 : nil, nil, false, /* panicOnUpperTruncate */
890 1 : )
891 1 : mIter.AddLevel(iter)
892 1 : }
893 :
894 1 : iter, err = r.NewRawRangeKeyIter()
895 1 : if err != nil {
896 0 : return nil, err
897 0 : }
898 1 : if iter != nil {
899 1 : // Wrap the range key iterator in a filter that elides keys other than range
900 1 : // key deletions.
901 1 : iter = keyspan.Filter(iter, func(in *keyspan.Span, out *keyspan.Span) (keep bool) {
902 1 : out.Start, out.End = in.Start, in.End
903 1 : out.Keys = out.Keys[:0]
904 1 : for _, k := range in.Keys {
905 1 : if k.Kind() != base.InternalKeyKindRangeKeyDelete {
906 1 : continue
907 : }
908 1 : out.Keys = append(out.Keys, k)
909 : }
910 1 : return len(out.Keys) > 0
911 : })
912 1 : dIter := &keyspan.DefragmentingIter{}
913 1 : dIter.Init(comparer, iter, equal, reducer, new(keyspan.DefragmentingBuffers))
914 1 : iter = dIter
915 1 : mIter.AddLevel(iter)
916 : }
917 :
918 1 : return mIter, nil
919 : }
920 :
921 : // rangeKeySetsAnnotator implements manifest.Annotator, annotating B-Tree nodes
922 : // with the sum of the files' counts of range key fragments. Its annotation type
923 : // is a *uint64. The count of range key sets may change once a table's stats are
924 : // loaded asynchronously, so its values are marked as cacheable only if a file's
925 : // stats have been loaded.
926 : type rangeKeySetsAnnotator struct{}
927 :
928 : var _ manifest.Annotator = rangeKeySetsAnnotator{}
929 :
930 1 : func (a rangeKeySetsAnnotator) Zero(dst interface{}) interface{} {
931 1 : if dst == nil {
932 1 : return new(uint64)
933 1 : }
934 0 : v := dst.(*uint64)
935 0 : *v = 0
936 0 : return v
937 : }
938 :
939 : func (a rangeKeySetsAnnotator) Accumulate(
940 : f *fileMetadata, dst interface{},
941 1 : ) (v interface{}, cacheOK bool) {
942 1 : vptr := dst.(*uint64)
943 1 : *vptr = *vptr + f.Stats.NumRangeKeySets
944 1 : return vptr, f.StatsValid()
945 1 : }
946 :
947 0 : func (a rangeKeySetsAnnotator) Merge(src interface{}, dst interface{}) interface{} {
948 0 : srcV := src.(*uint64)
949 0 : dstV := dst.(*uint64)
950 0 : *dstV = *dstV + *srcV
951 0 : return dstV
952 0 : }
953 :
954 : // countRangeKeySetFragments counts the number of RANGEKEYSET keys across all
955 : // files of the LSM. It only counts keys in files for which table stats have
956 : // been loaded. It uses a b-tree annotator to cache intermediate values between
957 : // calculations when possible.
958 1 : func countRangeKeySetFragments(v *version) (count uint64) {
959 1 : for l := 0; l < numLevels; l++ {
960 1 : if v.RangeKeyLevels[l].Empty() {
961 1 : continue
962 : }
963 1 : count += *v.RangeKeyLevels[l].Annotation(rangeKeySetsAnnotator{}).(*uint64)
964 : }
965 1 : return count
966 : }
967 :
968 : // tombstonesAnnotator implements manifest.Annotator, annotating B-Tree nodes
969 : // with the sum of the files' counts of tombstones (DEL, SINGLEDEL and RANGEDEL
970 : // keys). Its annotation type is a *uint64. The count of tombstones may change
971 : // once a table's stats are loaded asynchronously, so its values are marked as
972 : // cacheable only if a file's stats have been loaded.
973 : type tombstonesAnnotator struct{}
974 :
975 : var _ manifest.Annotator = tombstonesAnnotator{}
976 :
977 1 : func (a tombstonesAnnotator) Zero(dst interface{}) interface{} {
978 1 : if dst == nil {
979 1 : return new(uint64)
980 1 : }
981 1 : v := dst.(*uint64)
982 1 : *v = 0
983 1 : return v
984 : }
985 :
986 : func (a tombstonesAnnotator) Accumulate(
987 : f *fileMetadata, dst interface{},
988 1 : ) (v interface{}, cacheOK bool) {
989 1 : vptr := dst.(*uint64)
990 1 : *vptr = *vptr + f.Stats.NumDeletions
991 1 : return vptr, f.StatsValid()
992 1 : }
993 :
994 1 : func (a tombstonesAnnotator) Merge(src interface{}, dst interface{}) interface{} {
995 1 : srcV := src.(*uint64)
996 1 : dstV := dst.(*uint64)
997 1 : *dstV = *dstV + *srcV
998 1 : return dstV
999 1 : }
1000 :
1001 : // countTombstones counts the number of tombstone (DEL, SINGLEDEL and RANGEDEL)
1002 : // internal keys across all files of the LSM. It only counts keys in files for
1003 : // which table stats have been loaded. It uses a b-tree annotator to cache
1004 : // intermediate values between calculations when possible.
1005 1 : func countTombstones(v *version) (count uint64) {
1006 1 : for l := 0; l < numLevels; l++ {
1007 1 : if v.Levels[l].Empty() {
1008 1 : continue
1009 : }
1010 1 : count += *v.Levels[l].Annotation(tombstonesAnnotator{}).(*uint64)
1011 : }
1012 1 : return count
1013 : }
1014 :
1015 : // valueBlocksSizeAnnotator implements manifest.Annotator, annotating B-Tree
1016 : // nodes with the sum of the files' Properties.ValueBlocksSize. Its annotation
1017 : // type is a *uint64. The value block size may change once a table's stats are
1018 : // loaded asynchronously, so its values are marked as cacheable only if a
1019 : // file's stats have been loaded.
1020 : type valueBlocksSizeAnnotator struct{}
1021 :
1022 : var _ manifest.Annotator = valueBlocksSizeAnnotator{}
1023 :
1024 1 : func (a valueBlocksSizeAnnotator) Zero(dst interface{}) interface{} {
1025 1 : if dst == nil {
1026 1 : return new(uint64)
1027 1 : }
1028 1 : v := dst.(*uint64)
1029 1 : *v = 0
1030 1 : return v
1031 : }
1032 :
1033 : func (a valueBlocksSizeAnnotator) Accumulate(
1034 : f *fileMetadata, dst interface{},
1035 1 : ) (v interface{}, cacheOK bool) {
1036 1 : vptr := dst.(*uint64)
1037 1 : *vptr = *vptr + f.Stats.ValueBlocksSize
1038 1 : return vptr, f.StatsValid()
1039 1 : }
1040 :
1041 1 : func (a valueBlocksSizeAnnotator) Merge(src interface{}, dst interface{}) interface{} {
1042 1 : srcV := src.(*uint64)
1043 1 : dstV := dst.(*uint64)
1044 1 : *dstV = *dstV + *srcV
1045 1 : return dstV
1046 1 : }
1047 :
1048 : // valueBlocksSizeForLevel returns the Properties.ValueBlocksSize across all
1049 : // files for a level of the LSM. It only includes the size for files for which
1050 : // table stats have been loaded. It uses a b-tree annotator to cache
1051 : // intermediate values between calculations when possible. It must not be
1052 : // called concurrently.
1053 : //
1054 : // REQUIRES: 0 <= level < numLevels.
1055 1 : func valueBlocksSizeForLevel(v *version, level int) (count uint64) {
1056 1 : if v.Levels[level].Empty() {
1057 1 : return 0
1058 1 : }
1059 1 : return *v.Levels[level].Annotation(valueBlocksSizeAnnotator{}).(*uint64)
1060 : }