Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "fmt"
11 : "iter"
12 : "math"
13 : "slices"
14 : "sort"
15 : "strings"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/humanize"
20 : "github.com/cockroachdb/pebble/internal/invariants"
21 : "github.com/cockroachdb/pebble/internal/manifest"
22 : "github.com/cockroachdb/pebble/internal/problemspans"
23 : )
24 :
25 : // The minimum number of files for an intra-L0 compaction. This matches the
26 : // RocksDB heuristic.
27 : const minIntraL0Count = 4
28 :
29 : type compactionEnv struct {
30 : // diskAvailBytes holds a statistic on the number of bytes available on
31 : // disk, as reported by the filesystem. It's used to be more restrictive in
32 : // expanding compactions if available disk space is limited.
33 : //
34 : // The cached value (d.diskAvailBytes) is updated whenever a file is deleted
35 : // and whenever a compaction or flush completes. Since file removal is the
36 : // primary means of reclaiming space, there is a rough bound on the
37 : // statistic's staleness when available bytes is growing. Compactions and
38 : // flushes are longer, slower operations and provide a much looser bound
39 : // when available bytes is decreasing.
40 : diskAvailBytes uint64
41 : earliestUnflushedSeqNum base.SeqNum
42 : earliestSnapshotSeqNum base.SeqNum
43 : inProgressCompactions []compactionInfo
44 : readCompactionEnv readCompactionEnv
45 : // problemSpans is checked by the compaction picker to avoid compactions that
46 : // overlap an active "problem span". It can be nil when there are no problem
47 : // spans.
48 : problemSpans *problemspans.ByLevel
49 : }
50 :
51 : type compactionPickerMetrics struct {
52 : levels [numLevels]struct {
53 : score float64
54 : fillFactor float64
55 : compensatedFillFactor float64
56 : }
57 : }
58 :
59 : type compactionPicker interface {
60 : getMetrics([]compactionInfo) compactionPickerMetrics
61 : getBaseLevel() int
62 : estimatedCompactionDebt() uint64
63 : pickHighPrioritySpaceCompaction(env compactionEnv) pickedCompaction
64 : pickAutoScore(env compactionEnv) (pc pickedCompaction)
65 : pickAutoNonScore(env compactionEnv) (pc pickedCompaction)
66 : forceBaseLevel1()
67 : }
68 :
69 : // A pickedCompaction describes a potential compaction that the compaction
70 : // picker has selected, based on its heuristics. When a compaction begins to
71 : // execute, it is converted into a compaction struct by ConstructCompaction.
72 : type pickedCompaction interface {
73 : // ManualID returns the ID of the manual compaction, or 0 if the picked
74 : // compaction is not a result of a manual compaction.
75 : ManualID() uint64
76 : // ConstructCompaction creates a compaction from the picked compaction.
77 : ConstructCompaction(*DB, CompactionGrantHandle) compaction
78 : // WaitingCompaction returns a WaitingCompaction description of this
79 : // compaction for consumption by the compaction scheduler.
80 : WaitingCompaction() WaitingCompaction
81 : }
82 :
83 : // readCompactionEnv is used to hold data required to perform read compactions.
84 : type readCompactionEnv struct {
85 : rescheduleReadCompaction *bool
86 : readCompactions *readCompactionQueue
87 : flushing bool
88 : }
89 :
90 : // Information about in-progress compactions provided to the compaction picker.
91 : // These are used to constrain the new compactions that will be picked.
92 : type compactionInfo struct {
93 : // versionEditApplied is true if this compaction's version edit has already
94 : // been committed. The compaction may still be in-progress deleting newly
95 : // obsolete files.
96 : versionEditApplied bool
97 : // kind indicates the kind of compaction.
98 : kind compactionKind
99 : inputs []compactionLevel
100 : outputLevel int
101 : // bounds may be nil if the compaction does not involve sstables
102 : // (specifically, a blob file rewrite).
103 : bounds *base.UserKeyBounds
104 : }
105 :
106 0 : func (info compactionInfo) String() string {
107 0 : var buf bytes.Buffer
108 0 : var largest int
109 0 : for i, in := range info.inputs {
110 0 : if i > 0 {
111 0 : fmt.Fprintf(&buf, " -> ")
112 0 : }
113 0 : fmt.Fprintf(&buf, "L%d", in.level)
114 0 : for f := range in.files.All() {
115 0 : fmt.Fprintf(&buf, " %s", f.TableNum)
116 0 : }
117 0 : if largest < in.level {
118 0 : largest = in.level
119 0 : }
120 : }
121 0 : if largest != info.outputLevel || len(info.inputs) == 1 {
122 0 : fmt.Fprintf(&buf, " -> L%d", info.outputLevel)
123 0 : }
124 0 : return buf.String()
125 : }
126 :
127 : // sublevelInfo is used to tag a LevelSlice for an L0 sublevel with the
128 : // sublevel.
129 : type sublevelInfo struct {
130 : manifest.LevelSlice
131 : sublevel manifest.Layer
132 : }
133 :
134 1 : func (cl sublevelInfo) Clone() sublevelInfo {
135 1 : return sublevelInfo{
136 1 : sublevel: cl.sublevel,
137 1 : LevelSlice: cl.LevelSlice,
138 1 : }
139 1 : }
140 0 : func (cl sublevelInfo) String() string {
141 0 : return fmt.Sprintf(`Sublevel %s; Levels %s`, cl.sublevel, cl.LevelSlice)
142 0 : }
143 :
144 : // generateSublevelInfo will generate the level slices for each of the sublevels
145 : // from the level slice for all of L0.
146 1 : func generateSublevelInfo(cmp base.Compare, levelFiles manifest.LevelSlice) []sublevelInfo {
147 1 : sublevelMap := make(map[uint64][]*manifest.TableMetadata)
148 1 : for f := range levelFiles.All() {
149 1 : sublevelMap[uint64(f.SubLevel)] = append(sublevelMap[uint64(f.SubLevel)], f)
150 1 : }
151 :
152 1 : var sublevels []int
153 1 : for level := range sublevelMap {
154 1 : sublevels = append(sublevels, int(level))
155 1 : }
156 1 : sort.Ints(sublevels)
157 1 :
158 1 : var levelSlices []sublevelInfo
159 1 : for _, sublevel := range sublevels {
160 1 : metas := sublevelMap[uint64(sublevel)]
161 1 : levelSlices = append(
162 1 : levelSlices,
163 1 : sublevelInfo{
164 1 : manifest.NewLevelSliceKeySorted(cmp, metas),
165 1 : manifest.L0Sublevel(sublevel),
166 1 : },
167 1 : )
168 1 : }
169 1 : return levelSlices
170 : }
171 :
172 : // pickedCompactionMetrics holds metrics related to the compaction picking process
173 : type pickedCompactionMetrics struct {
174 : // scores contains candidateLevelInfo.scores.
175 : scores []float64
176 : singleLevelOverlappingRatio float64
177 : multiLevelOverlappingRatio float64
178 : }
179 :
180 : // pickedTableCompaction contains information about a compaction of sstables
181 : // that has already been chosen, and is being constructed. Compaction
182 : // construction info lives in this struct, and is copied over into the
183 : // compaction struct in constructCompaction.
184 : type pickedTableCompaction struct {
185 : // score of the chosen compaction (candidateLevelInfo.score).
186 : score float64
187 : // kind indicates the kind of compaction.
188 : kind compactionKind
189 : // manualID > 0 iff this is a manual compaction. It exists solely for
190 : // internal bookkeeping.
191 : manualID uint64
192 : // startLevel is the level that is being compacted. Inputs from startLevel
193 : // and outputLevel will be merged to produce a set of outputLevel files.
194 : startLevel *compactionLevel
195 : // outputLevel is the level that files are being produced in. outputLevel is
196 : // equal to startLevel+1 except when:
197 : // - if startLevel is 0, the output level equals compactionPicker.baseLevel().
198 : // - in multilevel compaction, the output level is the lowest level involved in
199 : // the compaction
200 : outputLevel *compactionLevel
201 : // inputs contain levels involved in the compaction in ascending order
202 : inputs []compactionLevel
203 : // LBase at the time of compaction picking. Might be uninitialized for
204 : // intra-L0 compactions.
205 : baseLevel int
206 : // L0-specific compaction info. Set to a non-nil value for all compactions
207 : // where startLevel == 0 that were generated by L0Sublevels.
208 : lcf *manifest.L0CompactionFiles
209 : // maxOutputFileSize is the maximum size of an individual table created
210 : // during compaction.
211 : maxOutputFileSize uint64
212 : // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
213 : // single output table with the tables in the grandparent level.
214 : maxOverlapBytes uint64
215 : // maxReadCompactionBytes is the maximum number of bytes a read compaction is
216 : // allowed to overlap with in its output level. If the overlap is greater than
217 : // maxReadCompactionBytes, then we don't proceed with the compaction.
218 : maxReadCompactionBytes uint64
219 :
220 : // The boundaries of the input data.
221 : bounds base.UserKeyBounds
222 : version *manifest.Version
223 : l0Organizer *manifest.L0Organizer
224 : pickerMetrics pickedCompactionMetrics
225 : }
226 :
227 : // Assert that *pickedTableCompaction implements pickedCompaction.
228 : var _ pickedCompaction = (*pickedTableCompaction)(nil)
229 :
230 : // ManualID returns the ID of the manual compaction, or 0 if the picked
231 : // compaction is not a result of a manual compaction.
232 1 : func (pc *pickedTableCompaction) ManualID() uint64 { return pc.manualID }
233 :
234 : // Kind returns the kind of compaction.
235 0 : func (pc *pickedTableCompaction) Kind() compactionKind { return pc.kind }
236 :
237 : // Score returns the score of the level at the time the compaction was picked.
238 0 : func (pc *pickedTableCompaction) Score() float64 { return pc.score }
239 :
240 : // ConstructCompaction creates a compaction struct from the
241 : // pickedTableCompaction.
242 : func (pc *pickedTableCompaction) ConstructCompaction(
243 : d *DB, grantHandle CompactionGrantHandle,
244 1 : ) compaction {
245 1 : return newCompaction(
246 1 : pc,
247 1 : d.opts,
248 1 : d.timeNow(),
249 1 : d.ObjProvider(),
250 1 : grantHandle,
251 1 : d.TableFormat(),
252 1 : d.determineCompactionValueSeparation)
253 1 : }
254 :
255 : // WaitingCompaction returns a WaitingCompaction description of this compaction
256 : // for consumption by the compaction scheduler.
257 1 : func (pc *pickedTableCompaction) WaitingCompaction() WaitingCompaction {
258 1 : if pc.manualID > 0 {
259 1 : return WaitingCompaction{Priority: manualCompactionPriority, Score: pc.score}
260 1 : }
261 1 : entry, ok := scheduledCompactionMap[pc.kind]
262 1 : if !ok {
263 0 : panic(errors.AssertionFailedf("unexpected compactionKind %s", pc.kind))
264 : }
265 1 : return WaitingCompaction{
266 1 : Optional: entry.optional,
267 1 : Priority: entry.priority,
268 1 : Score: pc.score,
269 1 : }
270 : }
271 :
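// defaultOutputLevel returns the default output level for a compaction out of
// startLevel: startLevel+1, redirected to the base level for L0 inputs and
// clamped to the bottommost level. For example, with baseLevel=4 (illustrative
// numbers): defaultOutputLevel(0, 4) == 4, defaultOutputLevel(5, 4) == 6, and
// defaultOutputLevel(6, 4) == 6.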
272 1 : func defaultOutputLevel(startLevel, baseLevel int) int {
273 1 : outputLevel := startLevel + 1
274 1 : if startLevel == 0 {
275 1 : outputLevel = baseLevel
276 1 : }
277 1 : if outputLevel >= numLevels-1 {
278 1 : outputLevel = numLevels - 1
279 1 : }
280 1 : return outputLevel
281 : }
282 :
283 : func newPickedTableCompaction(
284 : opts *Options,
285 : cur *manifest.Version,
286 : l0Organizer *manifest.L0Organizer,
287 : startLevel, outputLevel, baseLevel int,
288 1 : ) *pickedTableCompaction {
289 1 : if outputLevel > 0 && baseLevel == 0 {
290 0 : panic("base level cannot be 0")
291 : }
292 1 : if startLevel > 0 && startLevel < baseLevel {
293 0 : panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
294 0 : startLevel, baseLevel))
295 : }
296 :
297 1 : targetFileSize := opts.TargetFileSize(outputLevel, baseLevel)
298 1 : pc := &pickedTableCompaction{
299 1 : version: cur,
300 1 : l0Organizer: l0Organizer,
301 1 : baseLevel: baseLevel,
302 1 : inputs: []compactionLevel{{level: startLevel}, {level: outputLevel}},
303 1 : maxOutputFileSize: uint64(targetFileSize),
304 1 : maxOverlapBytes: maxGrandparentOverlapBytes(targetFileSize),
305 1 : maxReadCompactionBytes: maxReadCompactionBytes(targetFileSize),
306 1 : }
307 1 : pc.startLevel = &pc.inputs[0]
308 1 : pc.outputLevel = &pc.inputs[1]
309 1 : return pc
310 : }
311 :
312 : // adjustedOutputLevel is the output level used for the purpose of
313 : // determining the target output file size, overlap bytes, and expanded
314 : // bytes, taking into account the base level.
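//
// For example, with baseLevel=3 (illustrative numbers), output levels L3..L6
// map to adjusted levels 1..4; e.g. adjustedOutputLevel(4, 3) == 2.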
315 0 : func adjustedOutputLevel(outputLevel int, baseLevel int) int {
316 0 : if outputLevel == 0 {
317 0 : return 0
318 0 : }
319 0 : if baseLevel == 0 {
320 0 : panic("base level cannot be 0")
321 : }
322 : // Output level is in the range [baseLevel, numLevels). For the purpose of
323 : // determining the target output file size, overlap bytes, and expanded
324 : // bytes, we want to adjust the range to [1, numLevels).
325 0 : return 1 + outputLevel - baseLevel
326 : }
327 :
328 : func newPickedCompactionFromL0(
329 : lcf *manifest.L0CompactionFiles,
330 : opts *Options,
331 : vers *manifest.Version,
332 : l0Organizer *manifest.L0Organizer,
333 : baseLevel int,
334 : isBase bool,
335 1 : ) *pickedTableCompaction {
336 1 : outputLevel := baseLevel
337 1 : if !isBase {
338 1 : outputLevel = 0 // Intra L0
339 1 : }
340 :
341 1 : pc := newPickedTableCompaction(opts, vers, l0Organizer, 0, outputLevel, baseLevel)
342 1 : pc.lcf = lcf
343 1 :
344 1 : // Manually build the compaction as opposed to calling
345 1 : // pickAutoHelper. This is because L0Sublevels has already added
346 1 : // any overlapping L0 SSTables that need to be added, and
347 1 : // because compactions built by L0SSTables do not necessarily
348 1 : // pick contiguous sequences of files in pc.version.Levels[0].
349 1 : pc.startLevel.files = manifest.NewLevelSliceSeqSorted(lcf.Files)
350 1 : return pc
351 : }
352 :
353 0 : func (pc *pickedTableCompaction) String() string {
354 0 : var builder strings.Builder
355 0 : builder.WriteString(fmt.Sprintf(`Score=%f, `, pc.score))
356 0 : builder.WriteString(fmt.Sprintf(`Kind=%s, `, pc.kind))
357 0 : builder.WriteString(fmt.Sprintf(`AdjustedOutputLevel=%d, `, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel)))
358 0 : builder.WriteString(fmt.Sprintf(`maxOutputFileSize=%d, `, pc.maxOutputFileSize))
359 0 : builder.WriteString(fmt.Sprintf(`maxReadCompactionBytes=%d, `, pc.maxReadCompactionBytes))
360 0 : builder.WriteString(fmt.Sprintf(`bounds=%s, `, pc.bounds))
361 0 : builder.WriteString(fmt.Sprintf(`version=%s, `, pc.version))
362 0 : builder.WriteString(fmt.Sprintf(`inputs=%s, `, pc.inputs))
363 0 : builder.WriteString(fmt.Sprintf(`startlevel=%s, `, pc.startLevel))
364 0 : builder.WriteString(fmt.Sprintf(`outputLevel=%s, `, pc.outputLevel))
365 0 : builder.WriteString(fmt.Sprintf(`l0SublevelInfo=%s, `, pc.startLevel.l0SublevelInfo))
366 0 : builder.WriteString(fmt.Sprintf(`lcf=%s`, pc.lcf))
367 0 : return builder.String()
368 0 : }
369 :
370 : // clone creates a deep copy of the pickedTableCompaction.
371 1 : func (pc *pickedTableCompaction) clone() *pickedTableCompaction {
372 1 :
373 1 : // Quickly copy over fields that do not require special deep copy care, and
374 1 : // set all fields that will require a deep copy to nil.
375 1 : newPC := &pickedTableCompaction{
376 1 : score: pc.score,
377 1 : kind: pc.kind,
378 1 : baseLevel: pc.baseLevel,
379 1 : maxOutputFileSize: pc.maxOutputFileSize,
380 1 : maxOverlapBytes: pc.maxOverlapBytes,
381 1 : maxReadCompactionBytes: pc.maxReadCompactionBytes,
382 1 : bounds: pc.bounds.Clone(),
383 1 :
384 1 : // TODO(msbutler): properly clone picker metrics
385 1 : pickerMetrics: pc.pickerMetrics,
386 1 :
387 1 : // Both copies see the same manifest, therefore, it's ok for them to share
388 1 : // the same pc.version and pc.l0Organizer.
389 1 : version: pc.version,
390 1 : l0Organizer: pc.l0Organizer,
391 1 : }
392 1 :
393 1 : newPC.inputs = make([]compactionLevel, len(pc.inputs))
394 1 : for i := range pc.inputs {
395 1 : newPC.inputs[i] = pc.inputs[i].Clone()
396 1 : if i == 0 {
397 1 : newPC.startLevel = &newPC.inputs[i]
398 1 : } else if i == len(pc.inputs)-1 {
399 1 : newPC.outputLevel = &newPC.inputs[i]
400 1 : }
401 : }
402 :
403 1 : if len(pc.startLevel.l0SublevelInfo) > 0 {
404 1 : newPC.startLevel.l0SublevelInfo = make([]sublevelInfo, len(pc.startLevel.l0SublevelInfo))
405 1 : for i := range pc.startLevel.l0SublevelInfo {
406 1 : newPC.startLevel.l0SublevelInfo[i] = pc.startLevel.l0SublevelInfo[i].Clone()
407 1 : }
408 : }
409 1 : if pc.lcf != nil {
410 1 : newPC.lcf = pc.lcf.Clone()
411 1 : }
412 1 : return newPC
413 : }
414 :
415 : // setupInputs returns true if a compaction has been set up using the provided inputLevel and
416 : // pc.outputLevel. It returns false if a concurrent compaction is occurring on the start or
417 : // output level files. Note that inputLevel is not necessarily pc.startLevel. In multiLevel
418 : // compactions, inputs are set by calling setupInputs once for each adjacent pair of levels.
419 : // This will preserve level invariants when expanding the compaction. pc.bounds will be updated
420 : // to reflect the key range of the inputs.
421 : func (pc *pickedTableCompaction) setupInputs(
422 : opts *Options,
423 : diskAvailBytes uint64,
424 : inProgressCompactions []compactionInfo,
425 : inputLevel *compactionLevel,
426 : problemSpans *problemspans.ByLevel,
427 1 : ) bool {
428 1 : cmp := opts.Comparer.Compare
429 1 : if !canCompactTables(inputLevel.files, inputLevel.level, problemSpans) {
430 1 : return false
431 1 : }
432 1 : pc.bounds = manifest.ExtendKeyRange(cmp, pc.bounds, inputLevel.files.All())
433 1 :
434 1 : // Setup output files and attempt to grow the inputLevel files with
435 1 : // the expanded key range. No need to do this for intra-L0 compactions;
436 1 : // outputLevel.files is left empty for those.
437 1 : if inputLevel.level != pc.outputLevel.level {
438 1 : // Determine the sstables in the output level which overlap with the compaction
439 1 : // key range.
440 1 : pc.outputLevel.files = pc.version.Overlaps(pc.outputLevel.level, pc.bounds)
441 1 : if !canCompactTables(pc.outputLevel.files, pc.outputLevel.level, problemSpans) {
442 1 : return false
443 1 : }
444 1 : pc.bounds = manifest.ExtendKeyRange(cmp, pc.bounds, pc.outputLevel.files.All())
445 1 :
446 1 : // maxExpandedBytes is the maximum size of an expanded compaction. If
447 1 : // growing a compaction results in a larger size, the original compaction
448 1 : // is used instead.
449 1 : targetFileSize := opts.TargetFileSize(pc.outputLevel.level, pc.baseLevel)
450 1 : maxExpandedBytes := expandedCompactionByteSizeLimit(opts, targetFileSize, diskAvailBytes)
451 1 :
452 1 : // Grow the sstables in inputLevel.level as long as it doesn't affect the number
453 1 : // of sstables included from pc.outputLevel.level.
454 1 : if pc.lcf != nil && inputLevel.level == 0 {
455 1 : pc.growL0ForBase(cmp, maxExpandedBytes)
456 1 : } else if pc.grow(cmp, pc.bounds, maxExpandedBytes, inputLevel, problemSpans) {
457 1 : // inputLevel was expanded, adjust key range if necessary.
458 1 : pc.bounds = manifest.ExtendKeyRange(cmp, pc.bounds, inputLevel.files.All())
459 1 : }
460 : }
461 :
462 1 : if inputLevel.level == 0 {
463 1 : // If L0 is involved, it should always be the startLevel of the compaction.
464 1 : pc.startLevel.l0SublevelInfo = generateSublevelInfo(cmp, pc.startLevel.files)
465 1 : }
466 :
467 1 : return !outputKeyRangeAlreadyCompacting(cmp, inProgressCompactions, pc)
468 : }
469 :
470 : // grow grows the number of inputs at inputLevel without changing the number of
471 : // pc.outputLevel files in the compaction, and returns whether the inputs grew.
472 : // bounds is the union of the key ranges of the inputs gathered so far.
473 : func (pc *pickedTableCompaction) grow(
474 : cmp base.Compare,
475 : bounds base.UserKeyBounds,
476 : maxExpandedBytes uint64,
477 : inputLevel *compactionLevel,
478 : problemSpans *problemspans.ByLevel,
479 1 : ) bool {
480 1 : if pc.outputLevel.files.Empty() {
481 1 : return false
482 1 : }
483 1 : expandedInputLevel := pc.version.Overlaps(inputLevel.level, bounds)
484 1 : if !canCompactTables(expandedInputLevel, inputLevel.level, problemSpans) {
485 1 : return false
486 1 : }
487 1 : if expandedInputLevel.Len() <= inputLevel.files.Len() {
488 1 : return false
489 1 : }
490 1 : if expandedInputLevel.AggregateSizeSum()+pc.outputLevel.files.AggregateSizeSum() >= maxExpandedBytes {
491 1 : return false
492 1 : }
493 : // Check that expanding the input level does not change the number of overlapping files in the output level.
494 : // We need to include the outputLevel iter because without it, in a multiLevel scenario,
495 : // expandedInputLevel's key range may not fully cover all files currently in pc.outputLevel,
496 : // since pc.outputLevel was created using the entire key range, which includes higher levels.
497 1 : expandedOutputLevel := pc.version.Overlaps(pc.outputLevel.level,
498 1 : manifest.KeyRange(cmp, expandedInputLevel.All(), pc.outputLevel.files.All()))
499 1 : if expandedOutputLevel.Len() != pc.outputLevel.files.Len() {
500 1 : return false
501 1 : }
502 1 : if !canCompactTables(expandedOutputLevel, pc.outputLevel.level, problemSpans) {
503 0 : return false
504 0 : }
505 1 : inputLevel.files = expandedInputLevel
506 1 : return true
507 : }
508 :
509 : // Similar logic as pc.grow. Additional L0 files are optionally added to the
510 : // compaction at this step. Note that the bounds passed in are not the bounds
511 : // of the compaction, but rather the smallest and largest internal keys that
512 : // the compaction cannot include from L0 without pulling in more Lbase
513 : // files. Consider this example:
514 : //
515 : // L0: c-d e+f g-h
516 : // Lbase: a-b e+f i-j
517 : //
518 : // a b c d e f g h i j
519 : //
520 : // The e-f files have already been chosen in the compaction. As pulling
521 : // in more LBase files is undesirable, the logic below will pass in
522 : // smallest = b and largest = i to ExtendL0ForBaseCompactionTo, which
523 : // will expand the compaction to include c-d and g-h from L0. The
524 : // bounds passed in are exclusive; the compaction cannot be expanded
525 : // to include files that "touch" it.
526 1 : func (pc *pickedTableCompaction) growL0ForBase(cmp base.Compare, maxExpandedBytes uint64) bool {
527 1 : if invariants.Enabled {
528 1 : if pc.startLevel.level != 0 {
529 0 : panic(fmt.Sprintf("pc.startLevel.level is %d, expected 0", pc.startLevel.level))
530 : }
531 : }
532 :
533 1 : if pc.outputLevel.files.Empty() {
534 1 : // If there are no overlapping files in the output level, we do not
535 1 : // attempt to expand the compaction to encourage move compactions.
536 1 : return false
537 1 : }
538 :
539 1 : smallestBaseKey := base.InvalidInternalKey
540 1 : largestBaseKey := base.InvalidInternalKey
541 1 : // NB: We use Reslice to access the underlying level's files, but
542 1 : // we discard the returned slice. The pc.outputLevel.files slice
543 1 : // is not modified.
544 1 : _ = pc.outputLevel.files.Reslice(func(start, end *manifest.LevelIterator) {
545 1 : if sm := start.Prev(); sm != nil {
546 1 : smallestBaseKey = sm.Largest()
547 1 : }
548 1 : if la := end.Next(); la != nil {
549 1 : largestBaseKey = la.Smallest()
550 1 : }
551 : })
552 1 : oldLcf := pc.lcf.Clone()
553 1 : if !pc.l0Organizer.ExtendL0ForBaseCompactionTo(smallestBaseKey, largestBaseKey, pc.lcf) {
554 1 : return false
555 1 : }
556 :
557 1 : var newStartLevelFiles []*manifest.TableMetadata
558 1 : iter := pc.version.Levels[0].Iter()
559 1 : var sizeSum uint64
560 1 : for j, f := 0, iter.First(); f != nil; j, f = j+1, iter.Next() {
561 1 : if pc.lcf.FilesIncluded[f.L0Index] {
562 1 : newStartLevelFiles = append(newStartLevelFiles, f)
563 1 : sizeSum += f.Size
564 1 : }
565 : }
566 :
567 1 : if sizeSum+pc.outputLevel.files.AggregateSizeSum() >= maxExpandedBytes {
568 1 : *pc.lcf = *oldLcf
569 1 : return false
570 1 : }
571 :
572 1 : pc.startLevel.files = manifest.NewLevelSliceSeqSorted(newStartLevelFiles)
573 1 : pc.bounds = manifest.ExtendKeyRange(cmp, pc.bounds,
574 1 : pc.startLevel.files.All(), pc.outputLevel.files.All())
575 1 : return true
576 : }
577 :
578 : // estimatedInputSize returns an estimate of the size of the compaction's
579 : // inputs, including the estimated physical size of input tables' blob
580 : // references.
581 1 : func (pc *pickedTableCompaction) estimatedInputSize() uint64 {
582 1 : var bytesToCompact uint64
583 1 : for i := range pc.inputs {
584 1 : bytesToCompact += pc.inputs[i].files.AggregateSizeSum()
585 1 : }
586 1 : return bytesToCompact
587 : }
588 :
589 : // setupMultiLevelCandidate returns true if it successfully added another level
590 : // to the compaction.
591 : // Note that adding a new level will never change the startLevel inputs, but we
592 : // will attempt to expand the inputs of the intermediate level to the output key range,
593 : // if size constraints allow it.
594 : // For example, consider the following LSM structure, with the initial compaction
595 : // from L1->L2:
596 : // startLevel: L1 [a-b]
597 : // outputLevel: L2 [a-c]
598 : // L1: |a-b | d--e
599 : // L2: |a---c| d----f
600 : // L3: a---------e
601 : //
602 : // When adding L3, we'll expand L2 to include d-f via a call to setupInputs with
603 : // startLevel=L2. L1 will not be expanded.
604 : // startLevel: L1 [a-b]
605 : // intermediateLevel: L2 [a-c, d-f]
606 : // outputLevel: L3 [a-e]
607 : // L1: |a-b | d--e
608 : // L2: |a---c d----f|
609 : // L3: |a---------e |
610 1 : func (pc *pickedTableCompaction) setupMultiLevelCandidate(opts *Options, env compactionEnv) bool {
611 1 : pc.inputs = append(pc.inputs, compactionLevel{level: pc.outputLevel.level + 1})
612 1 :
613 1 : // Recalibrate startLevel and outputLevel:
614 1 : // - startLevel and outputLevel pointers may be obsolete after appending to pc.inputs.
615 1 : // - push outputLevel to extraLevels and move the new level to outputLevel
616 1 : pc.startLevel = &pc.inputs[0]
617 1 : pc.outputLevel = &pc.inputs[2]
618 1 : return pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, &pc.inputs[1], nil /* TODO(radu) */)
619 1 : }
620 :
621 : // canCompactTables returns true if the tables in the level slice are not
622 : // compacting already and don't intersect any problem spans.
623 : func canCompactTables(
624 : inputs manifest.LevelSlice, level int, problemSpans *problemspans.ByLevel,
625 1 : ) bool {
626 1 : for f := range inputs.All() {
627 1 : if f.IsCompacting() {
628 1 : return false
629 1 : }
630 1 : if problemSpans != nil && problemSpans.Overlaps(level, f.UserKeyBounds()) {
631 0 : return false
632 0 : }
633 : }
634 1 : return true
635 : }
636 :
637 : // newCompactionPickerByScore creates a compactionPickerByScore associated with
638 : // the newest version. The picker is used under logLock (until a new version is
639 : // installed).
640 : func newCompactionPickerByScore(
641 : v *manifest.Version,
642 : lvs *latestVersionState,
643 : opts *Options,
644 : inProgressCompactions []compactionInfo,
645 1 : ) *compactionPickerByScore {
646 1 : p := &compactionPickerByScore{
647 1 : opts: opts,
648 1 : vers: v,
649 1 : latestVersionState: lvs,
650 1 : }
651 1 : p.initLevelMaxBytes(inProgressCompactions)
652 1 : return p
653 1 : }
654 :
655 : // Information about a candidate compaction level that has been identified by
656 : // the compaction picker.
657 : type candidateLevelInfo struct {
658 : // The fill factor of the level, calculated using uncompensated file sizes and
659 : // without any adjustments. A factor > 1 means that the level has more data
660 : // than the ideal size for that level.
661 : //
662 : // For L0, the fill factor is calculated based on the number of sublevels
663 : // (see calculateL0FillFactor).
664 : //
665 : // For L1+, the fill factor is the ratio between the total uncompensated file
666 : // size and the ideal size of the level (based on the total size of the DB).
667 : fillFactor float64
668 :
669 : // The score of the level, used to rank levels.
670 : //
671 : // If the level doesn't require compaction, the score is 0. Otherwise:
672 : // - for L6 the score is equal to the fillFactor;
673 : // - for L0-L5:
674 : // - if the fillFactor is < 1: the score is equal to the fillFactor;
675 : // - if the fillFactor is >= 1: the score is the ratio between the
676 : // fillFactor and the next level's fillFactor.
677 : score float64
678 :
679 : // The fill factor of the level after accounting for level size compensation.
680 : //
681 : // For L0, the compensatedFillFactor is equal to the fillFactor as we don't
682 : // account for level size compensation in L0.
683 : //
684 : // For L1+, the compensatedFillFactor takes into account the estimated
685 : // savings in the lower levels because of deletions.
686 : //
687 : // The compensated fill factor is used to determine if the level should be
688 : // compacted (see calculateLevelScores).
689 : compensatedFillFactor float64
690 :
691 : level int
692 : // The level to compact to.
693 : outputLevel int
694 : // The file in level that will be compacted. Additional files may be
695 : // picked by the compaction, and a pickedCompaction created for the
696 : // compaction.
697 : file manifest.LevelFile
698 : }
699 :
700 1 : func (c *candidateLevelInfo) shouldCompact() bool {
701 1 : return c.score > 0
702 1 : }
703 :
704 1 : func tableTombstoneCompensation(t *manifest.TableMetadata) uint64 {
705 1 : if stats, ok := t.Stats(); ok {
706 1 : return stats.PointDeletionsBytesEstimate + stats.RangeDeletionsBytesEstimate
707 1 : }
708 1 : return 0
709 : }
710 :
711 : // tableCompensatedSize returns t's size, including an estimate of the physical
712 : // size of its external references, and inflated according to compaction
713 : // priorities.
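//
// For example (hypothetical numbers), a 10 MB table with 2 MB of estimated
// blob-reference bytes and tombstones estimated to reclaim 4 MB (point plus
// range deletions) has a compensated size of 16 MB.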
714 1 : func tableCompensatedSize(t *manifest.TableMetadata) uint64 {
715 1 : // Add in the estimate of disk space that may be reclaimed by compacting the
716 1 : // table's tombstones.
717 1 : return t.Size + t.EstimatedReferenceSize() + tableTombstoneCompensation(t)
718 1 : }
719 :
720 : // totalCompensatedSize computes the compensated size over a table metadata
721 : // iterator. Note that this function is linear in the files available to the
722 : // iterator. Use the compensatedSizeAnnotator if querying the total
723 : // compensated size of a level.
724 1 : func totalCompensatedSize(iter iter.Seq[*manifest.TableMetadata]) uint64 {
725 1 : var sz uint64
726 1 : for f := range iter {
727 1 : sz += tableCompensatedSize(f)
728 1 : }
729 1 : return sz
730 : }
731 :
732 : // compactionPickerByScore holds the state and logic for picking a compaction. A
733 : // compaction picker is associated with a single version. A new compaction
734 : // picker is created and initialized every time a new version is installed.
735 : type compactionPickerByScore struct {
736 : opts *Options
737 : vers *manifest.Version
738 : // Unlike vers, which is immutable and the latest version when this picker
739 : // is created, latestVersionState represents the mutable state of the latest
740 : // version. This means that at some point in the future a
741 : // compactionPickerByScore created in the past will have mutually
742 : // inconsistent state in vers and latestVersionState. This is not a problem
743 : // since (a) a new picker is created in UpdateVersionLocked when a new
744 : // version is installed, and (b) only the latest picker is used for picking
745 : // compactions. This is ensured by holding versionSet.logLock for both (a)
746 : // and (b).
747 : latestVersionState *latestVersionState
748 : // The level to target for L0 compactions. Levels L1 to baseLevel must be
749 : // empty.
750 : baseLevel int
751 : // levelMaxBytes holds the dynamically adjusted max bytes setting for each
752 : // level.
753 : levelMaxBytes [numLevels]int64
754 : dbSizeBytes uint64
755 : }
756 :
757 : var _ compactionPicker = &compactionPickerByScore{}
758 :
759 0 : func (p *compactionPickerByScore) getMetrics(inProgress []compactionInfo) compactionPickerMetrics {
760 0 : var m compactionPickerMetrics
761 0 : for _, info := range p.calculateLevelScores(inProgress) {
762 0 : m.levels[info.level].score = info.score
763 0 : m.levels[info.level].fillFactor = info.fillFactor
764 0 : m.levels[info.level].compensatedFillFactor = info.compensatedFillFactor
765 0 : }
766 0 : return m
767 : }
768 :
769 1 : func (p *compactionPickerByScore) getBaseLevel() int {
770 1 : if p == nil {
771 0 : return 1
772 0 : }
773 1 : return p.baseLevel
774 : }
775 :
776 : // estimatedCompactionDebt estimates the number of bytes which need to be
777 : // compacted before the LSM tree becomes stable.
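//
// As an illustrative example: if L0 holds 1 GB and Lbase holds 4 GB, those
// 5 GB are counted as debt up front (compacting L0 into Lbase rewrites both).
// Each lower level that is pushed over its target size then contributes its
// overage plus levelRatio times that overage, where levelRatio is the size
// ratio between the next level and the current one.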
778 1 : func (p *compactionPickerByScore) estimatedCompactionDebt() uint64 {
779 1 : if p == nil {
780 0 : return 0
781 0 : }
782 :
783 : // We assume that all the bytes in L0 need to be compacted to Lbase. This is
784 : // unlike the RocksDB logic that figures out whether L0 needs compaction.
785 1 : bytesAddedToNextLevel := p.vers.Levels[0].AggregateSize()
786 1 : lbaseSize := p.vers.Levels[p.baseLevel].AggregateSize()
787 1 :
788 1 : var compactionDebt uint64
789 1 : if bytesAddedToNextLevel > 0 && lbaseSize > 0 {
790 1 : // We only incur compaction debt if both L0 and Lbase contain data. If L0
791 1 : // is empty, no compaction is necessary. If Lbase is empty, a move-based
792 1 : // compaction from L0 would occur.
793 1 : compactionDebt += bytesAddedToNextLevel + lbaseSize
794 1 : }
795 :
796 : // loop invariant: At the beginning of the loop, bytesAddedToNextLevel is the
797 : // bytes added to `level` in the loop.
798 1 : for level := p.baseLevel; level < numLevels-1; level++ {
799 1 : levelSize := p.vers.Levels[level].AggregateSize() + bytesAddedToNextLevel
800 1 : nextLevelSize := p.vers.Levels[level+1].AggregateSize()
801 1 : if levelSize > uint64(p.levelMaxBytes[level]) {
802 1 : bytesAddedToNextLevel = levelSize - uint64(p.levelMaxBytes[level])
803 1 : if nextLevelSize > 0 {
804 1 : // We only incur compaction debt if the next level contains data. If the
805 1 : // next level is empty, a move-based compaction would be used.
806 1 : levelRatio := float64(nextLevelSize) / float64(levelSize)
807 1 : // The current level contributes bytesAddedToNextLevel to compactions.
808 1 : // The next level contributes levelRatio * bytesAddedToNextLevel.
809 1 : compactionDebt += uint64(float64(bytesAddedToNextLevel) * (levelRatio + 1))
810 1 : }
811 1 : } else {
812 1 : // We're not moving any bytes to the next level.
813 1 : bytesAddedToNextLevel = 0
814 1 : }
815 : }
816 1 : return compactionDebt
817 : }
818 :
819 1 : func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []compactionInfo) {
820 1 : // The levelMaxBytes calculations here differ from RocksDB in two ways:
821 1 : //
822 1 : // 1. The use of dbSize vs maxLevelSize. RocksDB uses the size of the maximum
823 1 : // level in L1-L6, rather than determining the size of the bottom level
824 1 : // based on the total amount of data in the dB. The RocksDB calculation is
825 1 : // problematic if L0 contains a significant fraction of data, or if the
826 1 : // level sizes are roughly equal and thus there is a significant fraction
827 1 : // of data outside of the largest level.
828 1 : //
829 1 : // 2. Not adjusting the size of Lbase based on L0. RocksDB computes
830 1 : // baseBytesMax as the maximum of the configured LBaseMaxBytes and the
831 1 : // size of L0. This is problematic because baseBytesMax is used to compute
832 1 : // the max size of lower levels. A very large baseBytesMax will result in
833 1 : // an overly large value for the size of lower levels which will caused
834 1 : // those levels not to be compacted even when they should be
835 1 : // compacted. This often results in "inverted" LSM shapes where Ln is
836 1 : // larger than Ln+1.
837 1 :
838 1 : // Determine the first non-empty level and the total DB size.
839 1 : firstNonEmptyLevel := -1
840 1 : var dbSize uint64
841 1 : for level := 1; level < numLevels; level++ {
842 1 : if p.vers.Levels[level].AggregateSize() > 0 {
843 1 : if firstNonEmptyLevel == -1 {
844 1 : firstNonEmptyLevel = level
845 1 : }
846 1 : dbSize += p.vers.Levels[level].AggregateSize()
847 : }
848 : }
849 1 : for _, c := range inProgressCompactions {
850 1 : if c.outputLevel == 0 || c.outputLevel == -1 {
851 1 : continue
852 : }
853 1 : if c.inputs[0].level == 0 && (firstNonEmptyLevel == -1 || c.outputLevel < firstNonEmptyLevel) {
854 1 : firstNonEmptyLevel = c.outputLevel
855 1 : }
856 : }
857 :
858 : // Initialize the max-bytes setting for each level to "infinity" which will
859 : // disallow compaction for that level. We'll fill in the actual value below
860 : // for levels we want to allow compactions from.
861 1 : for level := 0; level < numLevels; level++ {
862 1 : p.levelMaxBytes[level] = math.MaxInt64
863 1 : }
864 :
865 1 : dbSizeBelowL0 := dbSize
866 1 : dbSize += p.vers.Levels[0].AggregateSize()
867 1 : p.dbSizeBytes = dbSize
868 1 : if dbSizeBelowL0 == 0 {
869 1 : // No levels for L1 and up contain any data. Target L0 compactions for the
870 1 : // last level or to the level to which there is an ongoing L0 compaction.
871 1 : p.baseLevel = numLevels - 1
872 1 : if firstNonEmptyLevel >= 0 {
873 1 : p.baseLevel = firstNonEmptyLevel
874 1 : }
875 1 : return
876 : }
877 :
878 1 : bottomLevelSize := dbSize - dbSize/uint64(p.opts.Experimental.LevelMultiplier)
879 1 :
880 1 : curLevelSize := bottomLevelSize
881 1 : for level := numLevels - 2; level >= firstNonEmptyLevel; level-- {
882 1 : curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
883 1 : }
884 :
885 : // Compute base level (where L0 data is compacted to).
886 1 : baseBytesMax := uint64(p.opts.LBaseMaxBytes)
887 1 : p.baseLevel = firstNonEmptyLevel
888 1 : for p.baseLevel > 1 && curLevelSize > baseBytesMax {
889 1 : p.baseLevel--
890 1 : curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
891 1 : }
892 :
893 1 : smoothedLevelMultiplier := 1.0
894 1 : if p.baseLevel < numLevels-1 {
895 1 : smoothedLevelMultiplier = math.Pow(
896 1 : float64(bottomLevelSize)/float64(baseBytesMax),
897 1 : 1.0/float64(numLevels-p.baseLevel-1))
898 1 : }
899 :
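// An illustrative example of the resulting targets: with dbSize = 100 GB, a
// level multiplier of 10, baseBytesMax = 64 MB, and baseLevel = 2, we get
// bottomLevelSize = 90 GB and smoothedLevelMultiplier = (90 GB / 64 MB)^(1/4)
// ~= 6.2, yielding targets of roughly 64 MB (L2), 394 MB (L3), 2.4 GB (L4),
// 15 GB (L5), and 90 GB (L6).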
900 1 : levelSize := float64(baseBytesMax)
901 1 : for level := p.baseLevel; level < numLevels; level++ {
902 1 : if level > p.baseLevel && levelSize > 0 {
903 1 : levelSize *= smoothedLevelMultiplier
904 1 : }
905 : // Round the result since test cases use small target level sizes, which
906 : // can be impacted by floating-point imprecision + integer truncation.
907 1 : roundedLevelSize := math.Round(levelSize)
908 1 : if roundedLevelSize > float64(math.MaxInt64) {
909 0 : p.levelMaxBytes[level] = math.MaxInt64
910 1 : } else {
911 1 : p.levelMaxBytes[level] = int64(roundedLevelSize)
912 1 : }
913 : }
914 : }
915 :
916 : type levelSizeAdjust struct {
917 : incomingActualBytes uint64
918 : outgoingActualBytes uint64
919 : outgoingCompensatedBytes uint64
920 : }
921 :
922 1 : func (a levelSizeAdjust) compensated() uint64 {
923 1 : return a.incomingActualBytes - a.outgoingCompensatedBytes
924 1 : }
925 :
926 1 : func (a levelSizeAdjust) actual() uint64 {
927 1 : return a.incomingActualBytes - a.outgoingActualBytes
928 1 : }
929 :
930 1 : func calculateSizeAdjust(inProgressCompactions []compactionInfo) [numLevels]levelSizeAdjust {
931 1 : // Compute size adjustments for each level based on the in-progress
932 1 : // compactions. We sum the file sizes of all files leaving and entering each
933 1 : // level in in-progress compactions. For outgoing files, we also sum a
934 1 : // separate sum of 'compensated file sizes', which are inflated according
935 1 : // to deletion estimates.
936 1 : //
937 1 : // When we adjust a level's size according to these values during score
938 1 : // calculation, we subtract the compensated size of start level inputs to
939 1 : // account for the fact that score calculation uses compensated sizes.
940 1 : //
941 1 : // Since compensated file sizes may be compensated because they reclaim
942 1 : // space from the output level's files, we only add the real file size to
943 1 : // the output level.
944 1 : //
945 1 : // This is slightly different from RocksDB's behavior, which simply elides
946 1 : // compacting files from the level size calculation.
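//
// As a hypothetical example: an in-progress L3->L4 compaction whose L3 input
// has an actual size of 10 MB and a compensated size of 15 MB adds 15 MB to
// sizeAdjust[3].outgoingCompensatedBytes, 10 MB to
// sizeAdjust[3].outgoingActualBytes, and 10 MB to
// sizeAdjust[4].incomingActualBytes; the L4 input contributes nothing, since
// it is already in the output level.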
947 1 : var sizeAdjust [numLevels]levelSizeAdjust
948 1 : for i := range inProgressCompactions {
949 1 : c := &inProgressCompactions[i]
950 1 : // If this compaction's version edit has already been applied, there's
951 1 : // no need to adjust: The LSM we'll examine will already reflect the
952 1 : // new LSM state.
953 1 : if c.versionEditApplied {
954 1 : continue
955 : }
956 :
957 1 : for _, input := range c.inputs {
958 1 : actualSize := input.files.AggregateSizeSum()
959 1 : compensatedSize := totalCompensatedSize(input.files.All())
960 1 :
961 1 : if input.level != c.outputLevel {
962 1 : sizeAdjust[input.level].outgoingCompensatedBytes += compensatedSize
963 1 : sizeAdjust[input.level].outgoingActualBytes += actualSize
964 1 : if c.outputLevel != -1 {
965 1 : sizeAdjust[c.outputLevel].incomingActualBytes += actualSize
966 1 : }
967 : }
968 : }
969 : }
970 1 : return sizeAdjust
971 : }
972 :
973 : // calculateLevelScores calculates the candidateLevelInfo for all levels and
974 : // returns them in decreasing score order.
975 : func (p *compactionPickerByScore) calculateLevelScores(
976 : inProgressCompactions []compactionInfo,
977 1 : ) [numLevels]candidateLevelInfo {
978 1 : var scores [numLevels]candidateLevelInfo
979 1 : for i := range scores {
980 1 : scores[i].level = i
981 1 : scores[i].outputLevel = i + 1
982 1 : }
983 1 : l0FillFactor := calculateL0FillFactor(p.vers, p.latestVersionState.l0Organizer, p.opts, inProgressCompactions)
984 1 : scores[0] = candidateLevelInfo{
985 1 : outputLevel: p.baseLevel,
986 1 : fillFactor: l0FillFactor,
987 1 : compensatedFillFactor: l0FillFactor, // No compensation for L0.
988 1 : }
989 1 : sizeAdjust := calculateSizeAdjust(inProgressCompactions)
990 1 : for level := 1; level < numLevels; level++ {
991 1 : // Actual file size.
992 1 : compensatedLevelSize := p.vers.Levels[level].AggregateSize()
993 1 : // Deletions.
994 1 : delBytes := deletionBytesAnnotator.LevelAnnotation(p.vers.Levels[level])
995 1 : compensatedLevelSize += delBytes.PointDels + delBytes.RangeDels
996 1 : // Adjustments for in-progress compactions.
997 1 : compensatedLevelSize += sizeAdjust[level].compensated()
998 1 : scores[level].compensatedFillFactor = float64(compensatedLevelSize) / float64(p.levelMaxBytes[level])
999 1 : scores[level].fillFactor = float64(p.vers.Levels[level].AggregateSize()+sizeAdjust[level].actual()) / float64(p.levelMaxBytes[level])
1000 1 : }
1001 :
1002 : // Adjust each level's fill factor by the fill factor of the next level to get
1003 : // an (uncompensated) score; and each level's compensated fill factor by the
1004 : // fill factor of the next level to get a compensated score.
1005 : //
1006 : // The compensated score is used to determine if the level should be compacted
1007 : // at all. The (uncompensated) score is used as the value used to rank levels.
1008 : //
1009 : // If the next level has a high fill factor, and is thus a priority for
1010 : // compaction, this reduces the priority for compacting the current level. If
1011 : // the next level has a low fill factor (i.e. it is below its target size),
1012 : // this increases the priority for compacting the current level.
1013 : //
1014 : // The effect of this adjustment is to help prioritize compactions in lower
1015 : // levels. The following example shows the scores and the fill factors. In this
1016 : // scenario, L0 has 68 sublevels. L3 (a.k.a. Lbase) is significantly above its
1017 : // target size. The original score prioritizes compactions from those two
1018 : // levels, but doing so ends up causing a future problem: data piles up in the
1019 : // higher levels, starving L5->L6 compactions, and to a lesser degree starving
1020 : // L4->L5 compactions.
1021 : //
1022 : // Note that in the example shown there is no level size compensation so the
1023 : // compensatedFillFactor and fillFactor are the same for each level.
1024 : //
1025 : // score fillFactor compensatedFillFactor size max-size
1026 : // L0 3.2 68.0 68.0 2.2 G -
1027 : // L3 3.2 21.1 21.1 1.3 G 64 M
1028 : // L4 3.4 6.7 6.7 3.1 G 467 M
1029 : // L5 3.4 2.0 2.0 6.6 G 3.3 G
1030 : // L6 0 0.6 0.6 14 G 24 G
1031 : //
1032 : // TODO(radu): the way compensation works needs some rethinking. For example,
1033 : // if compacting L5 can free up a lot of space in L6, the score of L5 should
1034 : // go *up* with the fill factor of L6, not the other way around.
1035 1 : for level := 0; level < numLevels; level++ {
1036 1 : if level > 0 && level < p.baseLevel {
1037 1 : continue
1038 : }
1039 1 : const compensatedFillFactorThreshold = 1.0
1040 1 : if scores[level].compensatedFillFactor < compensatedFillFactorThreshold {
1041 1 : // No need to compact this level; score stays 0.
1042 1 : continue
1043 : }
1044 1 : score := scores[level].fillFactor
1045 1 : compensatedScore := scores[level].compensatedFillFactor
1046 1 : if level < numLevels-1 {
1047 1 : nextLevel := scores[level].outputLevel
1048 1 : // Avoid absurdly large scores by placing a floor on the factor that we'll
1049 1 : // adjust a level by. The value of 0.01 was chosen somewhat arbitrarily.
1050 1 : denominator := max(0.01, scores[nextLevel].fillFactor)
1051 1 : score /= denominator
1052 1 : compensatedScore /= denominator
1053 1 : }
1054 :
1055 : // We calculated a compensated score above by dividing the
1056 : // compensatedFillFactor by the next level's fill factor. Previous
1057 : // versions of Pebble had a default behavior of only considering levels
1058 : // with a compensatedScore >= 1.0 eligible for compaction. This wasn't a
1059 : // principled decision and has been experimentally observed to limit
1060 : // productive compactions that would reclaim disk space, but were
1061 : // prohibited because the output level's fill factor was > 1.0.
1062 : //
1063 : // We allow the use of old behavior through the
1064 : // UseDeprecatedCompensatedScore option, which if true, only considers
1065 : // the level eligible for compaction iff both compensatedFillFactor and
1066 : // compensatedScore are >= 1.0.
1067 : //
1068 : // Otherwise, only L0 requires the compensatedScore to be >= 1.0; all
1069 : // other levels only require the compensatedFillFactor to be >= 1.0. L0
1070 : // is treated exceptionally out of concern that a large, LBase that's
1071 : // being compacted into the next level may prevent L0->Lbase compactions
1072 : // and attempting to pick L0 compactions may result in intra-L0
1073 : // compactions.
1074 1 : const compensatedScoreThreshold = 1.0
1075 1 : if (level == 0 || p.opts.Experimental.UseDeprecatedCompensatedScore()) &&
1076 1 : compensatedScore < compensatedScoreThreshold {
1077 1 : // No need to compact this level; score stays 0.
1078 1 : continue
1079 : }
1080 1 : scores[level].score = score
1081 : }
1082 : // Sort by score (decreasing) and break ties by level (increasing).
1083 1 : slices.SortFunc(scores[:], func(a, b candidateLevelInfo) int {
1084 1 : if a.score != b.score {
1085 1 : return cmp.Compare(b.score, a.score)
1086 1 : }
1087 1 : return cmp.Compare(a.level, b.level)
1088 : })
1089 1 : return scores
1090 : }
1091 :
1092 : // calculateL0FillFactor calculates a float value representing the relative
1093 : // priority of compacting L0. A value less than 1 indicates that L0 does not
1094 : // need any compactions.
1095 : //
1096 : // L0 is special in that files within L0 may overlap one another, so a different
1097 : // set of heuristics that take into account read amplification apply.
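//
// As a worked example (assuming the options' defaults of
// L0CompactionThreshold=4 and L0CompactionFileThreshold=500): with 10
// sublevels remaining after ongoing compactions, the sublevel-based score is
// 2*10/4 = 5.0; with 600 non-compacting files, the file-count score is
// 600/500 = 1.2; the returned value is the larger of the two, 5.0.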
1098 : func calculateL0FillFactor(
1099 : vers *manifest.Version,
1100 : l0Organizer *manifest.L0Organizer,
1101 : opts *Options,
1102 : inProgressCompactions []compactionInfo,
1103 1 : ) float64 {
1104 1 : // Use the sublevel count to calculate the score. The base vs intra-L0
1105 1 : // compaction determination happens in pickAuto, not here.
1106 1 : score := float64(2*l0Organizer.MaxDepthAfterOngoingCompactions()) /
1107 1 : float64(opts.L0CompactionThreshold)
1108 1 :
1109 1 : // Also calculate a score based on the file count but use it only if it
1110 1 : // produces a higher score than the sublevel-based one. This heuristic is
1111 1 : // designed to accommodate cases where L0 is accumulating non-overlapping
1112 1 : // files in L0. Letting too many non-overlapping files accumulate in few
1113 1 : // sublevels is undesirable, because:
1114 1 : // 1) we can produce a massive backlog to compact once files do overlap.
1115 1 : // 2) constructing L0 sublevels has a runtime that grows superlinearly with
1116 1 : // the number of files in L0 and must be done while holding D.mu.
1117 1 : noncompactingFiles := vers.Levels[0].Len()
1118 1 : for _, c := range inProgressCompactions {
1119 1 : for _, cl := range c.inputs {
1120 1 : if cl.level == 0 {
1121 1 : noncompactingFiles -= cl.files.Len()
1122 1 : }
1123 : }
1124 : }
1125 1 : fileScore := float64(noncompactingFiles) / float64(opts.L0CompactionFileThreshold)
1126 1 : if score < fileScore {
1127 1 : score = fileScore
1128 1 : }
1129 1 : return score
1130 : }
1131 :
1132 : // pickCompactionSeedFile picks a file from `level` in the `vers` to build a
1133 : // compaction around. Currently, this function implements a heuristic similar to
1134 : // RocksDB's kMinOverlappingRatio, seeking to minimize write amplification. This
1135 : // function is linear with respect to the number of files in `level` and
1136 : // `outputLevel`.
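//
// For example (hypothetical sizes): a candidate with a compensated size of
// 10 MB overlapping 30 MB in the output level has a scaled ratio of
// 30*1024/10 = 3072, while one overlapping 5 MB scores 512; the latter is
// preferred as the seed file.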
1137 : func pickCompactionSeedFile(
1138 : vers *manifest.Version,
1139 : virtualBackings *manifest.VirtualBackings,
1140 : opts *Options,
1141 : level, outputLevel int,
1142 : earliestSnapshotSeqNum base.SeqNum,
1143 : problemSpans *problemspans.ByLevel,
1144 1 : ) (manifest.LevelFile, bool) {
1145 1 : // Select the file within the level to compact. We want to minimize write
1146 1 : // amplification, but also ensure that (a) deletes are propagated to the
1147 1 : // bottom level in a timely fashion, and (b) virtual sstables that are
1148 1 : // pinning backing sstables where most of the data is garbage are compacted
1149 1 : // away. Doing (a) and (b) reclaims disk space. A table's smallest sequence
1150 1 : // number provides a measure of its age. The ratio of overlapping-bytes /
1151 1 : // table-size gives an indication of write amplification (a smaller ratio is
1152 1 : // preferrable).
1153 1 : //
1154 1 : // The current heuristic is based off the RocksDB kMinOverlappingRatio
1155 1 : // heuristic. It chooses the file with the minimum overlapping ratio with
1156 1 : // the target level, which minimizes write amplification.
1157 1 : //
1158 1 : // The heuristic uses a "compensated size" for the denominator, which is the
1159 1 : // file size inflated by (a) an estimate of the space that may be reclaimed
1160 1 : // through compaction, and (b) a fraction of the amount of garbage in the
1161 1 : // backing sstable pinned by this (virtual) sstable.
1162 1 : //
1163 1 : // TODO(peter): For concurrent compactions, we may want to try harder to
1164 1 : // pick a seed file whose resulting compaction bounds do not overlap with
1165 1 : // an in-progress compaction.
1166 1 :
1167 1 : cmp := opts.Comparer.Compare
1168 1 : startIter := vers.Levels[level].Iter()
1169 1 : outputIter := vers.Levels[outputLevel].Iter()
1170 1 :
1171 1 : var file manifest.LevelFile
1172 1 : smallestRatio := uint64(math.MaxUint64)
1173 1 :
1174 1 : outputFile := outputIter.First()
1175 1 :
1176 1 : for f := startIter.First(); f != nil; f = startIter.Next() {
1177 1 : var overlappingBytes uint64
1178 1 : if f.IsCompacting() {
1179 1 : // Move on if this file is already being compacted. We'll likely
1180 1 : // still need to move past the overlapping output files regardless,
1181 1 : // but in cases where all start-level files are compacting we won't.
1182 1 : continue
1183 : }
1184 1 : if problemSpans != nil && problemSpans.Overlaps(level, f.UserKeyBounds()) {
1185 0 : // File touches problem span which temporarily disallows auto compactions.
1186 0 : continue
1187 : }
1188 :
1189 : // Trim any output-level files smaller than f.
1190 1 : for outputFile != nil && sstableKeyCompare(cmp, outputFile.Largest(), f.Smallest()) < 0 {
1191 1 : outputFile = outputIter.Next()
1192 1 : }
1193 :
1194 1 : skip := false
1195 1 : for outputFile != nil && sstableKeyCompare(cmp, outputFile.Smallest(), f.Largest()) <= 0 {
1196 1 : overlappingBytes += outputFile.Size
1197 1 : if outputFile.IsCompacting() {
1198 1 : // If one of the overlapping files is compacting, we're not going to be
1199 1 : // able to compact f anyway, so skip it.
1200 1 : skip = true
1201 1 : break
1202 : }
1203 1 : if problemSpans != nil && problemSpans.Overlaps(outputLevel, outputFile.UserKeyBounds()) {
1204 0 : // Overlapping file touches problem span which temporarily disallows auto compactions.
1205 0 : skip = true
1206 0 : break
1207 : }
1208 :
1209 : // For files in the bottommost level of the LSM, the
1210 : // Stats.RangeDeletionsBytesEstimate field is set to the estimate
1211 : // of bytes /within/ the file itself that may be dropped by
1212 : // recompacting the file. These bytes from obsolete keys would not
1213 : // need to be rewritten if we compacted `f` into `outputFile`, so
1214 : // they don't contribute to write amplification. Subtracting them
1215 : // out of the overlapping bytes helps prioritize these compactions
1216 : // that are cheaper than their file sizes suggest.
1217 1 : if outputLevel == numLevels-1 && outputFile.LargestSeqNum < earliestSnapshotSeqNum {
1218 1 : if stats, ok := outputFile.Stats(); ok {
1219 1 : overlappingBytes -= stats.RangeDeletionsBytesEstimate
1220 1 : }
1221 : }
1222 :
1223 : // If the file in the next level extends beyond f's largest key,
1224 : // break out and don't advance outputIter because f's successor
1225 : // might also overlap.
1226 : //
1227 : // Note, we stop as soon as we encounter an output-level file with a
1228 : // largest key beyond the input-level file's largest bound. We
1229 : // perform a simple user key comparison here using sstableKeyCompare
1230 : // which handles the potential for exclusive largest key bounds.
1231 : // There's some subtlety when the bounds are equal (eg, equal and
1232 : // inclusive, or equal and exclusive). Current Pebble doesn't split
1233 : // user keys across sstables within a level (and in format versions
1234 : // FormatSplitUserKeysMarkedCompacted and later we guarantee no
1235 : // split user keys exist within the entire LSM). In that case, we're
1236 : // assured that neither the input level nor the output level's next
1237 : // file shares the same user key, so compaction expansion will not
1238 : // include them in any compaction compacting `f`.
1239 : //
1240 : // NB: If we /did/ allow split user keys, or we're running on an
1241 : // old database with an earlier format major version where there are
1242 : // existing split user keys, this logic would be incorrect. Consider
1243 : // L1: [a#120,a#100] [a#80,a#60]
1244 : // L2: [a#55,a#45] [a#35,a#25] [a#15,a#5]
1245 : // While considering the first file in L1, [a#120,a#100], we'd skip
1246 : // past all of the files in L2. When considering the second file in
1247 : // L1, we'd improperly conclude that the second file overlaps
1248 : // nothing in the second level and is cheap to compact, when in
1249 : // reality we'd need to expand the compaction to include all 5
1250 : // files.
1251 1 : if sstableKeyCompare(cmp, outputFile.Largest(), f.Largest()) > 0 {
1252 1 : break
1253 : }
1254 1 : outputFile = outputIter.Next()
1255 : }
1256 1 : if skip {
1257 1 : continue
1258 : }
1259 :
1260 1 : compSz := tableCompensatedSize(f) + responsibleForGarbageBytes(virtualBackings, f)
1261 1 : scaledRatio := overlappingBytes * 1024 / compSz
1262 1 : if scaledRatio < smallestRatio {
1263 1 : smallestRatio = scaledRatio
1264 1 : file = startIter.Take()
1265 1 : }
1266 : }
1267 1 : return file, file.TableMetadata != nil
1268 : }
1269 :
1270 : // responsibleForGarbageBytes returns the amount of garbage in the backing
1271 : // sstable that we consider the responsibility of this virtual sstable. For
1272 : // non-virtual sstables, this is of course 0. For virtual sstables, we equally
1273 : // distribute the responsibility of the garbage across all the virtual
1274 : // sstables that are referencing the same backing sstable. One could
1275 : // alternatively distribute this in proportion to the virtual sst sizes, but
1276 : // it isn't clear that more sophisticated heuristics are worth it, given that
1277 : // the garbage cannot be reclaimed until all the referencing virtual sstables
1278 : // are compacted.
1279 : func responsibleForGarbageBytes(
1280 : virtualBackings *manifest.VirtualBackings, m *manifest.TableMetadata,
1281 1 : ) uint64 {
1282 1 : if !m.Virtual {
1283 1 : return 0
1284 1 : }
1285 1 : useCount, virtualizedSize := virtualBackings.Usage(m.TableBacking.DiskFileNum)
1286 1 : // Since virtualizedSize is the sum of the estimated size of all virtual
1287 1 : // ssts, we allow for the possibility that virtualizedSize could exceed
1288 1 : // m.TableBacking.Size.
1289 1 : totalGarbage := int64(m.TableBacking.Size) - int64(virtualizedSize)
1290 1 : if totalGarbage <= 0 {
1291 1 : return 0
1292 1 : }
1293 1 : if useCount == 0 {
1294 0 : // This cannot happen if m exists in the latest version. The call to
1295 0 : // ResponsibleForGarbageBytes during compaction picking ensures that m
1296 0 : // responsibleForGarbageBytes during compaction picking ensures that m
1297 0 : panic(errors.AssertionFailedf("%s has zero useCount", m.String()))
1298 : }
1299 1 : return uint64(totalGarbage) / uint64(useCount)
1300 : }
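// A worked sketch of the garbage split above, with hypothetical numbers and a
// hypothetical exampleGarbageShare helper: a 100 MiB backing whose four
// virtual sstables have estimated sizes summing to 60 MiB carries 40 MiB of
// garbage, so each virtual sstable is charged 10 MiB.
func exampleGarbageShare(backingSize, virtualizedSize uint64, useCount int) uint64 {
	totalGarbage := int64(backingSize) - int64(virtualizedSize)
	if totalGarbage <= 0 || useCount == 0 {
		// Estimated virtual sizes may exceed the backing size; the real code
		// additionally asserts that useCount is nonzero.
		return 0
	}
	return uint64(totalGarbage) / uint64(useCount)
}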
1301 :
1302 1 : func (p *compactionPickerByScore) getCompactionConcurrency() int {
1303 1 : lower, upper := p.opts.CompactionConcurrencyRange()
1304 1 : if lower >= upper {
1305 1 : return upper
1306 1 : }
1307 : // Compaction concurrency is controlled by L0 read-amp. We allow one
1308 : // additional compaction per L0CompactionConcurrency sublevels, as well as
1309 : // one additional compaction per CompactionDebtConcurrency bytes of
1310 : // compaction debt. Compaction concurrency is tied to L0 sublevels as that
1311 : // signal is independent of the database size. We tack on the compaction
1312 : // debt as a second signal to prevent compaction concurrency from dropping
1313 : // significantly right after a base compaction finishes, and before those
1314 : // bytes have been compacted further down the LSM.
1315 : //
1316 : // Let n be the number of in-progress compactions.
1317 : //
1318 : // If l0ReadAmp >= ccSignal1, we can run another compaction, where
1319 : // ccSignal1 = n * p.opts.Experimental.L0CompactionConcurrency
1320 : // Rearranging,
1321 : // n <= l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency.
1322 : // So we can run up to
1323 : // l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency extra compactions.
1324 1 : l0ReadAmpCompactions := 0
1325 1 : if p.opts.Experimental.L0CompactionConcurrency > 0 {
1326 1 : l0ReadAmp := p.latestVersionState.l0Organizer.MaxDepthAfterOngoingCompactions()
1327 1 : l0ReadAmpCompactions = (l0ReadAmp / p.opts.Experimental.L0CompactionConcurrency)
1328 1 : }
1329 : // If compactionDebt >= ccSignal2, we can run another compaction, where
1330 : // ccSignal2 = uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
1331 : // Rearranging,
1332 : // n <= compactionDebt / p.opts.Experimental.CompactionDebtConcurrency
1333 : // So we can run up to
1334 : // compactionDebt / p.opts.Experimental.CompactionDebtConcurrency extra
1335 : // compactions.
1336 1 : compactionDebtCompactions := 0
1337 1 : if p.opts.Experimental.CompactionDebtConcurrency > 0 {
1338 1 : compactionDebt := p.estimatedCompactionDebt()
1339 1 : compactionDebtCompactions = int(compactionDebt / p.opts.Experimental.CompactionDebtConcurrency)
1340 1 : }
1341 :
1342 1 : compactableGarbageCompactions := 0
1343 1 : garbageFractionLimit := p.opts.Experimental.CompactionGarbageFractionForMaxConcurrency()
1344 1 : if garbageFractionLimit > 0 && p.dbSizeBytes > 0 {
1345 1 : delBytes := deletionBytesAnnotator.MultiLevelAnnotation(p.vers.Levels[:])
1346 1 : compactableGarbageBytes := delBytes.PointDels + delBytes.RangeDels
1347 1 : garbageFraction := float64(compactableGarbageBytes) / float64(p.dbSizeBytes)
1348 1 : compactableGarbageCompactions =
1349 1 : int((garbageFraction / garbageFractionLimit) * float64(upper-lower))
1350 1 : }
1351 :
1352 1 : extraCompactions := max(l0ReadAmpCompactions, compactionDebtCompactions, compactableGarbageCompactions, 0)
1353 1 :
1354 1 : return min(lower+extraCompactions, upper)
1355 : }
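// A sketch of the concurrency arithmetic above, using a hypothetical
// exampleConcurrency helper and made-up settings: with lower=1, upper=4,
// L0CompactionConcurrency=10 and an L0 read-amp of 25, the L0 signal allows
// 25/10 = 2 extra compactions; with CompactionDebtConcurrency=1GiB and 3GiB
// of debt, the debt signal allows 3, so min(1+3, 4) = 4 compactions may run.
// The garbage-fraction signal is omitted from this sketch.
func exampleConcurrency(lower, upper, l0ReadAmp, l0Concurrency int, debt, debtConcurrency uint64) int {
	extra := 0
	if l0Concurrency > 0 {
		extra = l0ReadAmp / l0Concurrency
	}
	if debtConcurrency > 0 {
		if debtExtra := int(debt / debtConcurrency); debtExtra > extra {
			extra = debtExtra
		}
	}
	return min(lower+extra, upper)
}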
1356 :
1357 : // TODO(sumeer): remove unless someone actually finds this useful.
1358 : func (p *compactionPickerByScore) logCompactionForTesting(
1359 : env compactionEnv, scores [numLevels]candidateLevelInfo, pc *pickedTableCompaction,
1360 0 : ) {
1361 0 : var buf bytes.Buffer
1362 0 : for i := 0; i < numLevels; i++ {
1363 0 : if i != 0 && i < p.baseLevel {
1364 0 : continue
1365 : }
1366 :
1367 0 : var info *candidateLevelInfo
1368 0 : for j := range scores {
1369 0 : if scores[j].level == i {
1370 0 : info = &scores[j]
1371 0 : break
1372 : }
1373 : }
1374 :
1375 0 : marker := " "
1376 0 : if pc.startLevel.level == info.level {
1377 0 : marker = "*"
1378 0 : }
1379 0 : fmt.Fprintf(&buf, " %sL%d: score:%5.1f fillFactor:%5.1f compensatedFillFactor:%5.1f %8s %8s",
1380 0 : marker, info.level, info.score, info.fillFactor, info.compensatedFillFactor,
1381 0 : humanize.Bytes.Int64(int64(totalCompensatedSize(
1382 0 : p.vers.Levels[info.level].All(),
1383 0 : ))),
1384 0 : humanize.Bytes.Int64(p.levelMaxBytes[info.level]),
1385 0 : )
1386 0 :
1387 0 : count := 0
1388 0 : for i := range env.inProgressCompactions {
1389 0 : c := &env.inProgressCompactions[i]
1390 0 : if c.inputs[0].level != info.level {
1391 0 : continue
1392 : }
1393 0 : count++
1394 0 : if count == 1 {
1395 0 : fmt.Fprintf(&buf, " [")
1396 0 : } else {
1397 0 : fmt.Fprintf(&buf, " ")
1398 0 : }
1399 0 : fmt.Fprintf(&buf, "L%d->L%d", c.inputs[0].level, c.outputLevel)
1400 : }
1401 0 : if count > 0 {
1402 0 : fmt.Fprintf(&buf, "]")
1403 0 : }
1404 0 : fmt.Fprintf(&buf, "\n")
1405 : }
1406 0 : p.opts.Logger.Infof("pickAuto: L%d->L%d\n%s",
1407 0 : pc.startLevel.level, pc.outputLevel.level, buf.String())
1408 : }
1409 :
1410 : // pickHighPrioritySpaceCompaction checks for a high-priority space reclamation
1411 : // compaction. Under some circumstances, we want to pursue a compaction for the
1412 : // purpose of reclaiming disk space even when there are eligible default
1413 : // compactions.
1414 : func (p *compactionPickerByScore) pickHighPrioritySpaceCompaction(
1415 : env compactionEnv,
1416 1 : ) pickedCompaction {
1417 1 : if pc := p.pickBlobFileRewriteCompactionHighPriority(env); pc != nil {
1418 1 : return pc
1419 1 : }
1420 : // NB: We can't just return the above result because the above func returns
1421 : // a *pickedBlobFileCompaction, not a pickedCompaction. We need to return an
1422 : // untyped nil.
1423 1 : return nil
1424 : }
1425 :
1426 : // pickAutoScore picks the best score-based compaction, if any.
1427 : //
1428 : // On each call, pickAutoScore computes per-level size adjustments based on
1429 : // in-progress compactions, and computes a per-level score. The levels are
1430 : // iterated over in decreasing score order trying to find a valid compaction
1431 : // anchored at that level.
1432 : //
1433 : // If a score-based compaction cannot be found, pickAutoScore returns nil.
1434 : // Non-score-based compactions (e.g. elision-only) are picked separately by pickAutoNonScore.
1435 1 : func (p *compactionPickerByScore) pickAutoScore(env compactionEnv) pickedCompaction {
1436 1 : scores := p.calculateLevelScores(env.inProgressCompactions)
1437 1 :
1438 1 : // Check for a score-based compaction. candidateLevelInfos are first sorted
1439 1 : // by whether they should be compacted, so if we find a level which shouldn't
1440 1 : // be compacted, we can break early.
1441 1 : for i := range scores {
1442 1 : info := &scores[i]
1443 1 : if !info.shouldCompact() {
1444 1 : break
1445 : }
1446 1 : if info.level == numLevels-1 {
1447 1 : continue
1448 : }
1449 :
1450 1 : if info.level == 0 {
1451 1 : ptc := pickL0(env, p.opts, p.vers, p.latestVersionState.l0Organizer, p.baseLevel)
1452 1 : if ptc != nil {
1453 1 : p.addScoresToPickedCompactionMetrics(ptc, scores)
1454 1 : ptc.score = info.score
1455 1 : if false {
1456 0 : p.logCompactionForTesting(env, scores, ptc)
1457 0 : }
1458 1 : return ptc
1459 : }
1460 1 : continue
1461 : }
1462 :
1463 : // info.level > 0
1464 1 : var ok bool
1465 1 : info.file, ok = pickCompactionSeedFile(p.vers, &p.latestVersionState.virtualBackings, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum, env.problemSpans)
1466 1 : if !ok {
1467 1 : continue
1468 : }
1469 :
1470 1 : pc := pickAutoLPositive(env, p.opts, p.vers, p.latestVersionState.l0Organizer, *info, p.baseLevel)
1471 1 : if pc != nil {
1472 1 : p.addScoresToPickedCompactionMetrics(pc, scores)
1473 1 : pc.score = info.score
1474 1 : if false {
1475 0 : p.logCompactionForTesting(env, scores, pc)
1476 0 : }
1477 1 : return pc
1478 : }
1479 : }
1480 1 : return nil
1481 : }
1482 :
1483 : // pickAutoNonScore picks the best non-score-based compaction, if any.
1484 1 : func (p *compactionPickerByScore) pickAutoNonScore(env compactionEnv) (pc pickedCompaction) {
1485 1 : // Check for files which contain excessive point tombstones that could slow
1486 1 : // down reads. Unlike elision-only compactions, these compactions may select
1487 1 : // a file at any level rather than only the lowest level.
1488 1 : if pc := p.pickTombstoneDensityCompaction(env); pc != nil {
1489 1 : return pc
1490 1 : }
1491 :
1492 : // Check for L6 files with tombstones that may be elided. These files may
1493 : // exist if a snapshot prevented the elision of a tombstone or because of
1494 : // a move compaction. These are low-priority compactions because they
1495 : // don't help us keep up with writes, just reclaim disk space.
1496 1 : if pc := p.pickElisionOnlyCompaction(env); pc != nil {
1497 1 : return pc
1498 1 : }
1499 :
1500 : // Check for virtual SST rewrites. These compactions materialize virtual tables
1501 : // to reclaim space in backing files with low utilization.
1502 1 : if pc := p.pickVirtualRewriteCompaction(env); pc != nil {
1503 1 : return pc
1504 1 : }
1505 :
1506 : // Check for blob file rewrites. These are low-priority compactions because
1507 : // they don't help us keep up with writes, just reclaim disk space.
1508 1 : if pc := p.pickBlobFileRewriteCompactionLowPriority(env); pc != nil {
1509 1 : return pc
1510 1 : }
1511 :
1512 1 : if pc := p.pickReadTriggeredCompaction(env); pc != nil {
1513 0 : return pc
1514 0 : }
1515 :
1516 : // NB: This should only be run if a read compaction wasn't
1517 : // scheduled.
1518 : //
1519 : // We won't be scheduling a read compaction right now, and in
1520 : // read heavy workloads, compactions won't be scheduled frequently
1521 : // because flushes aren't frequent. So we need to signal to the
1522 : // iterator to schedule a compaction when it adds compactions to
1523 : // the read compaction queue.
1524 : //
1525 : // We need the nil check here because some tests don't set that
1526 : // variable, and without the check they would fail. Since there's a
1527 : // chance that one of those tests wouldn't want extra compactions
1528 : // to be scheduled, the check lives here instead of requiring those
1529 : // tests to set rescheduleReadCompaction.
1530 1 : if env.readCompactionEnv.rescheduleReadCompaction != nil {
1531 1 : *env.readCompactionEnv.rescheduleReadCompaction = true
1532 1 : }
1533 :
1534 : // At the lowest possible compaction-picking priority, look for files marked
1535 : // for compaction. Pebble will mark files for compaction if they have atomic
1536 : // compaction units that span multiple files. While current Pebble code does
1537 : // not construct such sstables, RocksDB and earlier versions of Pebble may
1538 : // have created them. These split user keys form sets of files that must be
1539 : // compacted together for correctness (referred to as "atomic compaction
1540 : // units" within the code). Rewrite them in-place.
1541 : //
1542 : // It's also possible that a file may have been marked for compaction by
1543 : // even earlier versions of Pebble code, since TableMetadata's
1544 : // MarkedForCompaction field is persisted in the manifest. That's okay. We
1545 : // previously would've ignored the designation, whereas now we'll re-compact
1546 : // the file in place.
1547 1 : if p.vers.MarkedForCompaction.Count() > 0 {
1548 0 : if pc := p.pickRewriteCompaction(env); pc != nil {
1549 0 : return pc
1550 0 : }
1551 : }
1552 :
1553 1 : return nil
1554 : }
1555 :
1556 : func (p *compactionPickerByScore) addScoresToPickedCompactionMetrics(
1557 : pc *pickedTableCompaction, candInfo [numLevels]candidateLevelInfo,
1558 1 : ) {
1559 1 :
1560 1 : // candInfo is sorted by score, not by compaction level.
1561 1 : infoByLevel := [numLevels]candidateLevelInfo{}
1562 1 : for i := range candInfo {
1563 1 : level := candInfo[i].level
1564 1 : infoByLevel[level] = candInfo[i]
1565 1 : }
1566 : // Gather the compaction scores for the levels participating in the compaction.
1567 1 : pc.pickerMetrics.scores = make([]float64, len(pc.inputs))
1568 1 : inputIdx := 0
1569 1 : for i := range infoByLevel {
1570 1 : if pc.inputs[inputIdx].level == infoByLevel[i].level {
1571 1 : pc.pickerMetrics.scores[inputIdx] = infoByLevel[i].score
1572 1 : inputIdx++
1573 1 : }
1574 1 : if inputIdx == len(pc.inputs) {
1575 1 : break
1576 : }
1577 : }
1578 : }
1579 :
1580 : // elisionOnlyAnnotator is a manifest.TableAnnotator that annotates B-Tree
1581 : // nodes with the *manifest.TableMetadata of a file meeting the obsolete keys criteria
1582 : // for an elision-only compaction within the subtree. If multiple files meet
1583 : // the criteria, it chooses whichever file has the lowest LargestSeqNum. The
1584 : // lowest LargestSeqNum file will be the first eligible for an elision-only
1585 : // compaction once snapshots less than or equal to its LargestSeqNum are closed.
1586 : var elisionOnlyAnnotator = manifest.MakePickFileAnnotator(
1587 : manifest.NewTableAnnotationIdx(),
1588 : manifest.PickFileAnnotatorFuncs{
1589 1 : Filter: func(f *manifest.TableMetadata) (eligible bool, cacheOK bool) {
1590 1 : if f.IsCompacting() {
1591 1 : return false, true
1592 1 : }
1593 :
1594 1 : backingProps, backingPropsValid := f.TableBacking.Properties()
1595 1 : stats, statsValid := f.Stats()
1596 1 : if !backingPropsValid || !statsValid {
1597 1 : return false, false
1598 1 : }
1599 :
1600 : // Bottommost files are large and not worthwhile to compact just
1601 : // to remove a few tombstones. Consider a file eligible only if
1602 : // either its own range deletions delete at least 10% of its data or
1603 : // its deletion tombstones make up at least 10% of its entries.
1604 : //
1605 : // TODO(jackson): This does not account for duplicate user keys
1606 : // which may be collapsed. Ideally, we would have 'obsolete keys'
1607 : // statistics that would include tombstones, the keys that are
1608 : // dropped by tombstones and duplicated user keys. See #847.
1609 : //
1610 : // Note that tables that contain exclusively range keys (i.e. no point keys,
1611 : // `NumEntries` and `RangeDeletionsBytesEstimate` are both zero) are excluded
1612 : // from elision-only compactions.
1613 : // TODO(travers): Consider an alternative heuristic for elision of range-keys.
1614 1 : eligible = stats.RangeDeletionsBytesEstimate*10 >= f.Size || backingProps.NumDeletions*10 > backingProps.NumEntries
1615 1 : return eligible, true
1616 : },
1617 1 : Compare: func(f1 *manifest.TableMetadata, f2 *manifest.TableMetadata) bool {
1618 1 : return f1.LargestSeqNum < f2.LargestSeqNum
1619 1 : },
1620 : },
1621 : )
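// A sketch of the eligibility predicate above (exampleElisionEligible is a
// hypothetical standalone mirror, not Pebble API). With made-up numbers: a
// 100 MiB file whose range deletions are estimated to delete 12 MiB passes
// (12*10 >= 100), as does a file with 2,000 point tombstones among 15,000
// entries (2000*10 > 15000); a file failing both 10% thresholds is skipped.
func exampleElisionEligible(rangeDelBytes, fileSize, numDeletions, numEntries uint64) bool {
	return rangeDelBytes*10 >= fileSize || numDeletions*10 > numEntries
}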
1622 :
1623 : // pickedCompactionFromCandidateFile creates a pickedCompaction from a *manifest.TableMetadata
1624 : // with various checks to ensure that the file still exists in the expected level
1625 : // and isn't already being compacted.
1626 : func (p *compactionPickerByScore) pickedCompactionFromCandidateFile(
1627 : candidate *manifest.TableMetadata,
1628 : env compactionEnv,
1629 : startLevel int,
1630 : outputLevel int,
1631 : kind compactionKind,
1632 1 : ) *pickedTableCompaction {
1633 1 : if candidate == nil || candidate.IsCompacting() {
1634 1 : return nil
1635 1 : }
1636 :
1637 1 : var inputs manifest.LevelSlice
1638 1 : if startLevel == 0 && outputLevel > 0 {
1639 1 : // Overlapping L0 files must also be compacted alongside the candidate.
1640 1 : // Some compactions attempt to rewrite a file in place (e.g. virtual rewrite)
1641 1 : // so we only do this for L0->Lbase compactions.
1642 1 : inputs = p.vers.Overlaps(0, candidate.UserKeyBounds())
1643 1 : } else {
1644 1 : inputs = p.vers.Levels[startLevel].Find(p.opts.Comparer.Compare, candidate)
1645 1 : }
1646 1 : if invariants.Enabled {
1647 1 : found := false
1648 1 : for f := range inputs.All() {
1649 1 : if f.TableNum == candidate.TableNum {
1650 1 : found = true
1651 1 : }
1652 : }
1653 1 : if !found {
1654 0 : panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.TableNum, startLevel))
1655 : }
1656 : }
1657 :
1658 1 : pc := newPickedTableCompaction(p.opts, p.vers, p.latestVersionState.l0Organizer,
1659 1 : startLevel, outputLevel, p.baseLevel)
1660 1 : pc.kind = kind
1661 1 : pc.startLevel.files = inputs
1662 1 :
1663 1 : if !pc.setupInputs(p.opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, env.problemSpans) {
1664 1 : return nil
1665 1 : }
1666 1 : return pc
1667 : }
1668 :
1669 : // pickElisionOnlyCompaction looks for compactions of sstables in the
1670 : // bottommost level containing obsolete records that may now be dropped.
1671 : func (p *compactionPickerByScore) pickElisionOnlyCompaction(
1672 : env compactionEnv,
1673 1 : ) (pc *pickedTableCompaction) {
1674 1 : if p.opts.private.disableElisionOnlyCompactions {
1675 1 : return nil
1676 1 : }
1677 1 : candidate := elisionOnlyAnnotator.LevelAnnotation(p.vers.Levels[numLevels-1])
1678 1 : if candidate == nil {
1679 1 : return nil
1680 1 : }
1681 1 : if candidate.LargestSeqNum >= env.earliestSnapshotSeqNum {
1682 1 : return nil
1683 1 : }
1684 1 : return p.pickedCompactionFromCandidateFile(candidate, env, numLevels-1, numLevels-1, compactionKindElisionOnly)
1685 : }
1686 :
1687 : // pickRewriteCompaction attempts to construct a compaction that
1688 : // rewrites a file marked for compaction. pickRewriteCompaction will
1689 : // pull in adjacent files in the file's atomic compaction unit if
1690 : // necessary. A rewrite compaction outputs files to the same level as
1691 : // the input level.
1692 : func (p *compactionPickerByScore) pickRewriteCompaction(
1693 : env compactionEnv,
1694 0 : ) (pc *pickedTableCompaction) {
1695 0 : for candidate, level := range p.vers.MarkedForCompaction.Ascending() {
1696 0 : if pc := p.pickedCompactionFromCandidateFile(candidate, env, level, level, compactionKindRewrite); pc != nil {
1697 0 : return pc
1698 0 : }
1699 : }
1700 0 : return nil
1701 : }
1702 :
1703 : // pickVirtualRewriteCompaction looks for backing tables that have a low percentage
1704 : // of referenced data and materializes their virtual sstables.
1705 : func (p *compactionPickerByScore) pickVirtualRewriteCompaction(
1706 : env compactionEnv,
1707 1 : ) *pickedTableCompaction {
1708 1 :
1709 1 : for _, c := range env.inProgressCompactions {
1710 1 : // Allow only one virtual rewrite compaction at a time.
1711 1 : if c.kind == compactionKindVirtualRewrite {
1712 1 : return nil
1713 1 : }
1714 : }
1715 :
1716 : // We'll pick one virtual table at a time to materialize. This works with our
1717 : // compaction system, which currently doesn't support outputting to multiple levels
1718 : // or selecting files that aren't contiguous in a level. Successfully materializing
1719 : // one of the backing's virtual tables will also make the backing more likely to be
1720 : // picked again, since the space amp will increase.
1721 1 : referencedDataPct, _, vtablesByLevel := p.latestVersionState.virtualBackings.ReplacementCandidate()
1722 1 :
1723 1 : if 1-referencedDataPct < p.opts.Experimental.VirtualTableRewriteUnreferencedFraction() {
1724 1 : return nil
1725 1 : }
1726 :
1727 1 : for level, tables := range vtablesByLevel {
1728 1 : for _, vt := range tables {
1729 1 : if vt.IsCompacting() {
1730 1 : continue
1731 : }
1732 1 : if pc := p.pickedCompactionFromCandidateFile(vt, env, level, level, compactionKindVirtualRewrite); pc != nil {
1733 1 : return pc
1734 1 : }
1735 : }
1736 : }
1737 :
1738 1 : return nil
1739 : }
1740 :
1741 : // pickBlobFileRewriteCompactionHighPriority picks a compaction that rewrites a
1742 : // blob file to reclaim disk space if the heuristics for high-priority blob file
1743 : // rewrites are met.
1744 : func (p *compactionPickerByScore) pickBlobFileRewriteCompactionHighPriority(
1745 : env compactionEnv,
1746 1 : ) (pc *pickedBlobFileCompaction) {
1747 1 : policy := p.opts.Experimental.ValueSeparationPolicy()
1748 1 : if policy.GarbageRatioHighPriority >= 1.0 {
1749 1 : // High-priority blob file rewrite compactions are disabled.
1750 1 : return nil
1751 1 : }
1752 1 : aggregateStats, heuristicStats := p.latestVersionState.blobFiles.Stats()
1753 1 : if heuristicStats.CountFilesEligible == 0 && heuristicStats.CountFilesTooRecent == 0 {
1754 1 : // No blob files with any garbage to rewrite.
1755 1 : return nil
1756 1 : }
1757 :
1758 1 : garbagePct := float64(aggregateStats.ValueSize-aggregateStats.ReferencedValueSize) /
1759 1 : float64(aggregateStats.ValueSize)
1760 1 : if garbagePct <= policy.GarbageRatioHighPriority {
1761 1 : // Not enough garbage to warrant a rewrite compaction.
1762 1 : return nil
1763 1 : }
1764 1 : pc = p.pickBlobFileRewriteCandidate(env)
1765 1 : if pc != nil {
1766 1 : pc.highPriority = true
1767 1 : }
1768 1 : return pc
1769 : }
1770 :
1771 : // pickBlobFileRewriteCompactionLowPriority picks a compaction that rewrites a
1772 : // blob file to reclaim disk space if the heuristics for low-priority blob file
1773 : // rewrites are met.
1774 : func (p *compactionPickerByScore) pickBlobFileRewriteCompactionLowPriority(
1775 : env compactionEnv,
1776 1 : ) (pc *pickedBlobFileCompaction) {
1777 1 : aggregateStats, heuristicStats := p.latestVersionState.blobFiles.Stats()
1778 1 : if heuristicStats.CountFilesEligible == 0 && heuristicStats.CountFilesTooRecent == 0 {
1779 1 : // No blob files with any garbage to rewrite.
1780 1 : return nil
1781 1 : }
1782 1 : policy := p.opts.Experimental.ValueSeparationPolicy()
1783 1 : if policy.GarbageRatioLowPriority >= 1.0 {
1784 1 : // Blob file rewrite compactions are disabled.
1785 1 : return nil
1786 1 : }
1787 :
1788 : // We want to use ReferencedValueSize here instead of ReferencedBackingValueSize
1789 : // to get the estimate of live data in blob files. We'll check below to see if there
1790 : // are actually any candidates with garbage to reclaim.
1791 1 : garbagePct := float64(aggregateStats.ValueSize-aggregateStats.ReferencedValueSize) /
1792 1 : float64(aggregateStats.ValueSize)
1793 1 : if garbagePct <= policy.GarbageRatioLowPriority {
1794 1 : // Not enough garbage to warrant a rewrite compaction.
1795 1 : return nil
1796 1 : }
1797 1 : return p.pickBlobFileRewriteCandidate(env)
1798 : }
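// Both priorities above gate on the same garbage fraction. A hypothetical
// sketch (exampleBlobGarbagePct is not Pebble API): with 10 GiB of blob
// values of which 7 GiB are still referenced, the fraction is 0.3, so a
// GarbageRatioLowPriority of 0.25 permits a rewrite while a
// GarbageRatioHighPriority of 0.5 does not.
func exampleBlobGarbagePct(valueSize, referencedValueSize uint64) float64 {
	if valueSize == 0 {
		return 0
	}
	return float64(valueSize-referencedValueSize) / float64(valueSize)
}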
1799 :
1800 : func (p *compactionPickerByScore) pickBlobFileRewriteCandidate(
1801 : env compactionEnv,
1802 1 : ) (pc *pickedBlobFileCompaction) {
1803 1 : // Check if there is an ongoing blob file rewrite compaction. If there is,
1804 1 : // don't schedule a new one.
1805 1 : for _, c := range env.inProgressCompactions {
1806 1 : if c.kind == compactionKindBlobFileRewrite {
1807 1 : return nil
1808 1 : }
1809 : }
1810 1 : candidate, ok := p.latestVersionState.blobFiles.ReplacementCandidate()
1811 1 : if !ok {
1812 1 : // None meet the heuristic.
1813 1 : return nil
1814 1 : }
1815 1 : return &pickedBlobFileCompaction{
1816 1 : vers: p.vers,
1817 1 : file: candidate,
1818 1 : referencingTables: p.latestVersionState.blobFiles.ReferencingTables(candidate.FileID),
1819 1 : }
1820 : }
1821 :
1822 : // pickTombstoneDensityCompaction looks for a compaction that eliminates
1823 : // regions of extremely high point tombstone density. For each level, it picks
1824 : // a file where the ratio of tombstone-dense blocks is at least
1825 : // options.Experimental.MinTombstoneDenseRatio, prioritizing compaction of
1826 : // files with higher ratios of tombstone-dense blocks.
1827 : func (p *compactionPickerByScore) pickTombstoneDensityCompaction(
1828 : env compactionEnv,
1829 1 : ) (pc *pickedTableCompaction) {
1830 1 : if p.opts.Experimental.TombstoneDenseCompactionThreshold <= 0 {
1831 0 : // Tombstone density compactions are disabled.
1832 0 : return nil
1833 0 : }
1834 :
1835 1 : var candidate *manifest.TableMetadata
1836 1 : var candidateTombstoneDenseBlocksRatio float64
1837 1 : var level int
1838 1 : // If a candidate file has a very high overlapping ratio, point tombstones
1839 1 : // in it are likely sparse in keyspace even if the sstable itself is tombstone
1840 1 : // dense. These tombstones likely wouldn't be slow to iterate over, so we exclude
1841 1 : // these files from tombstone density compactions. The threshold of 40.0 is
1842 1 : // chosen somewhat arbitrarily, after some observations around excessively large
1843 1 : // tombstone density compactions.
1844 1 : const maxOverlappingRatio = 40.0
1845 1 : // NB: we don't consider the lowest level because elision-only compactions
1846 1 : // handle that case.
1847 1 : lastNonEmptyLevel := numLevels - 1
1848 1 : for l := numLevels - 2; l >= 0; l-- {
1849 1 : iter := p.vers.Levels[l].Iter()
1850 1 : for f := iter.First(); f != nil; f = iter.Next() {
1851 1 : if f.IsCompacting() || f.Size == 0 {
1852 1 : continue
1853 : }
1854 1 : props, propsValid := f.TableBacking.Properties()
1855 1 : if !propsValid {
1856 1 : continue
1857 : }
1858 1 : if props.TombstoneDenseBlocksRatio < p.opts.Experimental.TombstoneDenseCompactionThreshold {
1859 1 : continue
1860 : }
1861 1 : overlaps := p.vers.Overlaps(lastNonEmptyLevel, f.UserKeyBounds())
1862 1 : if float64(overlaps.AggregateSizeSum())/float64(f.Size) > maxOverlappingRatio {
1863 0 : continue
1864 : }
1865 1 : if candidate == nil || candidateTombstoneDenseBlocksRatio < props.TombstoneDenseBlocksRatio {
1866 1 : candidate = f
1867 1 : candidateTombstoneDenseBlocksRatio = props.TombstoneDenseBlocksRatio
1868 1 : level = l
1869 1 : }
1870 : }
1871 : // We prefer lower-level (e.g. L5) candidates over higher-level (e.g. L4) ones.
1872 1 : if candidate != nil {
1873 1 : break
1874 : }
1875 1 : if !p.vers.Levels[l].Empty() {
1876 1 : lastNonEmptyLevel = l
1877 1 : }
1878 : }
1879 :
1880 1 : return p.pickedCompactionFromCandidateFile(candidate, env, level, defaultOutputLevel(level, p.baseLevel), compactionKindTombstoneDensity)
1881 : }
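// A sketch of the overlap filter above (exampleTombstoneDensitySkip is a
// hypothetical mirror of the maxOverlappingRatio check): a 64 MiB candidate
// overlapping 3 GiB in the last non-empty level has a ratio of 48 > 40, so
// it is skipped even if its tombstone-dense block ratio passes the threshold.
func exampleTombstoneDensitySkip(overlapBytes, fileSize uint64) bool {
	const maxOverlappingRatio = 40.0
	return float64(overlapBytes)/float64(fileSize) > maxOverlappingRatio
}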
1882 :
1883 : // pickAutoLPositive picks an automatic compaction for the candidate
1884 : // file in a positive-numbered level. This function must not be used for
1885 : // L0.
1886 : func pickAutoLPositive(
1887 : env compactionEnv,
1888 : opts *Options,
1889 : vers *manifest.Version,
1890 : l0Organizer *manifest.L0Organizer,
1891 : cInfo candidateLevelInfo,
1892 : baseLevel int,
1893 1 : ) (pc *pickedTableCompaction) {
1894 1 : if cInfo.level == 0 {
1895 0 : panic("pebble: pickAutoLPositive called for L0")
1896 : }
1897 :
1898 1 : pc = newPickedTableCompaction(opts, vers, l0Organizer, cInfo.level, defaultOutputLevel(cInfo.level, baseLevel), baseLevel)
1899 1 : if pc.outputLevel.level != cInfo.outputLevel {
1900 0 : panic("pebble: compaction picked unexpected output level")
1901 : }
1902 1 : pc.startLevel.files = cInfo.file.Slice()
1903 1 :
1904 1 : if !pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, env.problemSpans) {
1905 1 : return nil
1906 1 : }
1907 1 : return pc.maybeAddLevel(opts, env)
1908 : }
1909 :
1910 : // maybeAddLevel maybe adds a level to the picked compaction.
1911 : // Multilevel compactions are only allowed if the max compaction concurrency
1912 : // is greater than 1, and there are no in-progress multi-level compactions.
1913 : func (pc *pickedTableCompaction) maybeAddLevel(
1914 : opts *Options, env compactionEnv,
1915 1 : ) *pickedTableCompaction {
1916 1 : pc.pickerMetrics.singleLevelOverlappingRatio = pc.overlappingRatio()
1917 1 : if pc.outputLevel.level == numLevels-1 {
1918 1 : // Don't add a level if the current output level is in L6.
1919 1 : return pc
1920 1 : }
1921 : // We allow at most one in-progress multiLevel compaction at any time.
1922 1 : for _, c := range env.inProgressCompactions {
1923 1 : if len(c.inputs) > 2 {
1924 1 : return pc
1925 1 : }
1926 : }
1927 1 : _, upper := opts.CompactionConcurrencyRange()
1928 1 : if upper == 1 {
1929 1 : // If the maximum compaction concurrency is 1, avoid picking a multi-level compaction
1930 1 : // as they could block compactions from L0.
1931 1 : return pc
1932 1 : }
1933 1 : if !opts.Experimental.MultiLevelCompactionHeuristic().allowL0() && pc.startLevel.level == 0 {
1934 1 : return pc
1935 1 : }
1936 1 : targetFileSize := opts.TargetFileSize(pc.outputLevel.level, pc.baseLevel)
1937 1 : if pc.estimatedInputSize() > expandedCompactionByteSizeLimit(opts, targetFileSize, env.diskAvailBytes) {
1938 0 : // Don't add a level if the current compaction exceeds the compaction size limit
1939 0 : return pc
1940 0 : }
1941 1 : return opts.Experimental.MultiLevelCompactionHeuristic().pick(pc, opts, env)
1942 : }
1943 :
1944 : // MultiLevelHeuristic evaluates whether to add files from the next level into the compaction.
1945 : type MultiLevelHeuristic interface {
1946 : // pick returns the preferred compaction.
1947 : pick(pc *pickedTableCompaction, opts *Options, env compactionEnv) *pickedTableCompaction
1948 :
1949 : // allowL0 reports whether the heuristic allows L0 to be involved in a multilevel compaction.
1950 : allowL0() bool
1951 :
1952 : // String implements fmt.Stringer.
1953 : String() string
1954 : }
1955 :
1956 : // NoMultiLevel will never add an additional level to the compaction.
1957 : type NoMultiLevel struct{}
1958 :
1959 : var _ MultiLevelHeuristic = (*NoMultiLevel)(nil)
1960 :
1961 1 : func OptionNoMultiLevel() MultiLevelHeuristic {
1962 1 : return NoMultiLevel{}
1963 1 : }
1964 :
1965 : func (nml NoMultiLevel) pick(
1966 : pc *pickedTableCompaction, opts *Options, env compactionEnv,
1967 1 : ) *pickedTableCompaction {
1968 1 : return pc
1969 1 : }
1970 :
1971 1 : func (nml NoMultiLevel) allowL0() bool { return false }
1972 1 : func (nml NoMultiLevel) String() string { return "none" }
1973 :
1974 1 : func (pc *pickedTableCompaction) predictedWriteAmp() float64 {
1975 1 : var bytesToCompact uint64
1976 1 : var higherLevelBytes uint64
1977 1 : for i := range pc.inputs {
1978 1 : levelSize := pc.inputs[i].files.AggregateSizeSum()
1979 1 : bytesToCompact += levelSize
1980 1 : if i != len(pc.inputs)-1 {
1981 1 : higherLevelBytes += levelSize
1982 1 : }
1983 : }
1984 1 : return float64(bytesToCompact) / float64(higherLevelBytes)
1985 : }
1986 :
1987 1 : func (pc *pickedTableCompaction) overlappingRatio() float64 {
1988 1 : var higherLevelBytes uint64
1989 1 : var lowestLevelBytes uint64
1990 1 : for i := range pc.inputs {
1991 1 : levelSize := pc.inputs[i].files.AggregateSizeSum()
1992 1 : if i == len(pc.inputs)-1 {
1993 1 : lowestLevelBytes += levelSize
1994 1 : continue
1995 : }
1996 1 : higherLevelBytes += levelSize
1997 : }
1998 1 : return float64(lowestLevelBytes) / float64(higherLevelBytes)
1999 : }
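// A worked sketch of predictedWriteAmp with hypothetical level sizes
// (exampleWriteAmp is not Pebble API): inputs of 10 MiB (start level) and
// 40 MiB (output level) give (10+40)/10 = 5.0, and adding a third level of
// 160 MiB gives (10+40+160)/(10+40) = 4.2. The corresponding overlapping
// ratios would be 40/10 = 4 and 160/50 = 3.2.
func exampleWriteAmp(levelBytes []uint64) float64 {
	var total, higher uint64
	for i, sz := range levelBytes {
		total += sz
		if i != len(levelBytes)-1 {
			higher += sz
		}
	}
	if higher == 0 {
		return 0
	}
	return float64(total) / float64(higher)
}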
2000 :
2001 : // WriteAmpHeuristic defines a multilevel compaction heuristic which will add
2002 : // an additional level to the picked compaction if the multilevel compaction's
2003 : // predicted write amp is at most the single-level write amp plus AddPropensity.
2004 : type WriteAmpHeuristic struct {
2005 : // AddPropensity is a constant that affects the propensity to conduct multilevel
2006 : // compactions. If positive, a multilevel compaction may get picked even if
2007 : // the single level compaction has lower write amp, and vice versa.
2008 : AddPropensity float64
2009 :
2010 : // AllowL0, if true, allows L0 to be involved in a multilevel compaction.
2011 : AllowL0 bool
2012 : }
2013 :
2014 : var _ MultiLevelHeuristic = (*WriteAmpHeuristic)(nil)
2015 :
2016 : // Default write amp heuristic with no propensity towards multi-level
2017 : // and no multilevel compactions involving L0.
2018 : var defaultWriteAmpHeuristic = &WriteAmpHeuristic{}
2019 :
2020 1 : func OptionWriteAmpHeuristic() MultiLevelHeuristic {
2021 1 : return defaultWriteAmpHeuristic
2022 1 : }
2023 :
2024 : // TODO(msbutler): microbenchmark the extent to which multilevel compaction
2025 : // picking slows down the compaction picking process. This should be as fast as
2026 : // possible since compaction picking holds d.mu, which prevents WAL rotations,
2027 : // in-progress flushes and compactions from completing, etc. Consider ways to
2028 : // deduplicate work, given that setupInputs has already been called.
2029 : func (wa WriteAmpHeuristic) pick(
2030 : pcOrig *pickedTableCompaction, opts *Options, env compactionEnv,
2031 1 : ) *pickedTableCompaction {
2032 1 : pcMulti := pcOrig.clone()
2033 1 : if !pcMulti.setupMultiLevelCandidate(opts, env) {
2034 1 : return pcOrig
2035 1 : }
2036 : // We consider the addition of a level as an "expansion" of the compaction.
2037 : // If pcMulti is past the expanded compaction byte size limit already,
2038 : // we don't consider it.
2039 1 : targetFileSize := opts.TargetFileSize(pcMulti.outputLevel.level, pcMulti.baseLevel)
2040 1 : if pcMulti.estimatedInputSize() >= expandedCompactionByteSizeLimit(opts, targetFileSize, env.diskAvailBytes) {
2041 0 : return pcOrig
2042 0 : }
2043 1 : picked := pcOrig
2044 1 : if pcMulti.predictedWriteAmp() <= pcOrig.predictedWriteAmp()+wa.AddPropensity {
2045 1 : picked = pcMulti
2046 1 : }
2047 : // Regardless of what compaction was picked, log the multilevelOverlapping ratio.
2048 1 : picked.pickerMetrics.multiLevelOverlappingRatio = pcMulti.overlappingRatio()
2049 1 : return picked
2050 : }
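// A sketch of the comparison above (examplePreferMultiLevel is hypothetical):
// with the figures from the write-amp sketch earlier (single-level 5.0,
// multilevel 4.2) and AddPropensity 0, the multilevel compaction wins since
// 4.2 <= 5.0 + 0; a negative AddPropensity would demand a strictly larger
// improvement before expanding the compaction.
func examplePreferMultiLevel(singleWA, multiWA, addPropensity float64) bool {
	return multiWA <= singleWA+addPropensity
}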
2051 :
2052 1 : func (wa WriteAmpHeuristic) allowL0() bool {
2053 1 : return wa.AllowL0
2054 1 : }
2055 :
2056 : // String implements fmt.Stringer.
2057 1 : func (wa WriteAmpHeuristic) String() string {
2058 1 : return fmt.Sprintf("wamp(%.2f, %t)", wa.AddPropensity, wa.AllowL0)
2059 1 : }
2060 :
2061 : // pickL0 is a helper function that picks compactions originating from L0,
2062 : // using information about sublevels to generate a compaction.
2063 : func pickL0(
2064 : env compactionEnv,
2065 : opts *Options,
2066 : vers *manifest.Version,
2067 : l0Organizer *manifest.L0Organizer,
2068 : baseLevel int,
2069 1 : ) *pickedTableCompaction {
2070 1 : // It is important to pass information about Lbase files to L0Sublevels
2071 1 : // so it can pick a compaction that does not conflict with an Lbase => Lbase+1
2072 1 : // compaction. Without this, we observed reduced concurrency of L0=>Lbase
2073 1 : // compactions, and increased read amplification in L0.
2074 1 : //
2075 1 : // TODO(bilal) Remove the minCompactionDepth parameter once fixing it at 1
2076 1 : // has been shown to not cause a performance regression.
2077 1 : lcf := l0Organizer.PickBaseCompaction(opts.Logger, 1, vers.Levels[baseLevel].Slice(), baseLevel, env.problemSpans)
2078 1 : if lcf != nil {
2079 1 : pc := newPickedCompactionFromL0(lcf, opts, vers, l0Organizer, baseLevel, true)
2080 1 : if pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, env.problemSpans) {
2081 1 : if pc.startLevel.files.Empty() {
2082 0 : opts.Logger.Errorf("%v", base.AssertionFailedf("empty compaction chosen"))
2083 0 : }
2084 1 : return pc.maybeAddLevel(opts, env)
2085 : }
2086 : // TODO(radu): investigate why this happens.
2087 : // opts.Logger.Errorf("%v", base.AssertionFailedf("setupInputs failed"))
2088 : }
2089 :
2090 : // Couldn't choose a base compaction. Try choosing an intra-L0
2091 : // compaction. Note that we pass in minIntraL0Count here as opposed to
2092 : // 1, since choosing a single-sublevel intra-L0 compaction is
2093 : // counterproductive.
2094 1 : lcf = l0Organizer.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count, env.problemSpans)
2095 1 : if lcf != nil {
2096 1 : pc := newPickedCompactionFromL0(lcf, opts, vers, l0Organizer, baseLevel, false)
2097 1 : if pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, env.problemSpans) {
2098 1 : if pc.startLevel.files.Empty() {
2099 0 : opts.Logger.Fatalf("empty compaction chosen")
2100 0 : }
2101 : // A single-file intra-L0 compaction is unproductive.
2102 1 : if iter := pc.startLevel.files.Iter(); iter.First() != nil && iter.Next() != nil {
2103 1 : pc.bounds = manifest.KeyRange(opts.Comparer.Compare, pc.startLevel.files.All())
2104 1 : return pc
2105 1 : }
2106 0 : } else {
2107 0 : // TODO(radu): investigate why this happens.
2108 0 : // opts.Logger.Errorf("%v", base.AssertionFailedf("setupInputs failed"))
2109 0 : }
2110 : }
2111 1 : return nil
2112 : }
2113 :
2114 : func newPickedManualCompaction(
2115 : vers *manifest.Version,
2116 : l0Organizer *manifest.L0Organizer,
2117 : opts *Options,
2118 : env compactionEnv,
2119 : baseLevel int,
2120 : manual *manualCompaction,
2121 1 : ) (pc *pickedTableCompaction, retryLater bool) {
2122 1 : outputLevel := manual.level + 1
2123 1 : if manual.level == 0 {
2124 1 : outputLevel = baseLevel
2125 1 : } else if manual.level < baseLevel {
2126 1 : // The start level for a compaction must be >= Lbase. A manual
2127 1 : // compaction could have been created adhering to that condition, and
2128 1 : // then an automatic compaction came in and compacted all of the
2129 1 : // sstables in Lbase to Lbase+1 which caused Lbase to change. Simply
2130 1 : // ignore this manual compaction as there is nothing to do (manual.level
2131 1 : // points to an empty level).
2132 1 : return nil, false
2133 1 : }
2134 : // This conflictsWithInProgress call is necessary for the manual compaction to
2135 : // be retried when it conflicts with an ongoing automatic compaction. Without
2136 : // it, the compaction is dropped due to pc.setupInputs returning false since
2137 : // the input/output range is already being compacted, and the manual
2138 : // compaction ends with a non-compacted LSM.
2139 1 : if conflictsWithInProgress(manual, outputLevel, env.inProgressCompactions, opts.Comparer.Compare) {
2140 1 : return nil, true
2141 1 : }
2142 1 : pc = newPickedTableCompaction(opts, vers, l0Organizer, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
2143 1 : pc.manualID = manual.id
2144 1 : manual.outputLevel = pc.outputLevel.level
2145 1 : pc.startLevel.files = vers.Overlaps(manual.level, base.UserKeyBoundsInclusive(manual.start, manual.end))
2146 1 : if pc.startLevel.files.Empty() {
2147 1 : // Nothing to do
2148 1 : return nil, false
2149 1 : }
2150 : // We use nil problemSpans because we don't want problem spans to prevent
2151 : // manual compactions.
2152 1 : if !pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, nil /* problemSpans */) {
2153 1 : // setupInputs returned false indicating there's a conflicting
2154 1 : // concurrent compaction.
2155 1 : return nil, true
2156 1 : }
2157 1 : if pc = pc.maybeAddLevel(opts, env); pc == nil {
2158 0 : return nil, false
2159 0 : }
2160 1 : if pc.outputLevel.level != outputLevel {
2161 1 : if len(pc.inputs) > 2 {
2162 1 : // Multilevel compactions relax this invariant.
2163 1 : } else {
2164 0 : panic("pebble: compaction picked unexpected output level")
2165 : }
2166 : }
2167 1 : return pc, false
2168 : }
2169 :
2170 : // pickDownloadCompaction picks a download compaction for the given file,
2171 : // performed either as a copy compaction of the backing file or as a
2172 : // rewrite compaction, depending on the specified kind.
2173 : func pickDownloadCompaction(
2174 : vers *manifest.Version,
2175 : l0Organizer *manifest.L0Organizer,
2176 : opts *Options,
2177 : env compactionEnv,
2178 : baseLevel int,
2179 : kind compactionKind,
2180 : level int,
2181 : file *manifest.TableMetadata,
2182 1 : ) (pc *pickedTableCompaction) {
2183 1 : // Check if the file is compacting already.
2184 1 : if file.CompactionState == manifest.CompactionStateCompacting {
2185 0 : return nil
2186 0 : }
2187 1 : if kind != compactionKindCopy && kind != compactionKindRewrite {
2188 0 : panic("invalid download/rewrite compaction kind")
2189 : }
2190 1 : pc = newPickedTableCompaction(opts, vers, l0Organizer, level, level, baseLevel)
2191 1 : pc.kind = kind
2192 1 : pc.startLevel.files = manifest.NewLevelSliceKeySorted(opts.Comparer.Compare, []*manifest.TableMetadata{file})
2193 1 : if !pc.setupInputs(opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, nil /* problemSpans */) {
2194 0 : // setupInputs returned false indicating there's a conflicting
2195 0 : // concurrent compaction.
2196 0 : return nil
2197 0 : }
2198 1 : if pc.outputLevel.level != level {
2199 0 : panic("pebble: download compaction picked unexpected output level")
2200 : }
2201 1 : return pc
2202 : }
2203 :
2204 : func (p *compactionPickerByScore) pickReadTriggeredCompaction(
2205 : env compactionEnv,
2206 1 : ) (pc *pickedTableCompaction) {
2207 1 : // If a flush is in progress or expected to happen soon, more writes are taking place.
2208 1 : // We would soon be scheduling more write-focused compactions. In this case, skip read
2209 1 : // compactions as they are lower priority.
2210 1 : if env.readCompactionEnv.flushing || env.readCompactionEnv.readCompactions == nil {
2211 1 : return nil
2212 1 : }
2213 1 : for env.readCompactionEnv.readCompactions.size > 0 {
2214 0 : rc := env.readCompactionEnv.readCompactions.remove()
2215 0 : if pc = pickReadTriggeredCompactionHelper(p, rc, env); pc != nil {
2216 0 : break
2217 : }
2218 : }
2219 1 : return pc
2220 : }
2221 :
2222 : func pickReadTriggeredCompactionHelper(
2223 : p *compactionPickerByScore, rc *readCompaction, env compactionEnv,
2224 0 : ) (pc *pickedTableCompaction) {
2225 0 : overlapSlice := p.vers.Overlaps(rc.level, base.UserKeyBoundsInclusive(rc.start, rc.end))
2226 0 : var fileMatches bool
2227 0 : for f := range overlapSlice.All() {
2228 0 : if f.TableNum == rc.tableNum {
2229 0 : fileMatches = true
2230 0 : break
2231 : }
2232 : }
2233 0 : if !fileMatches {
2234 0 : return nil
2235 0 : }
2236 :
2237 0 : pc = newPickedTableCompaction(p.opts, p.vers, p.latestVersionState.l0Organizer,
2238 0 : rc.level, defaultOutputLevel(rc.level, p.baseLevel), p.baseLevel)
2239 0 :
2240 0 : pc.startLevel.files = overlapSlice
2241 0 : if !pc.setupInputs(p.opts, env.diskAvailBytes, env.inProgressCompactions, pc.startLevel, env.problemSpans) {
2242 0 : return nil
2243 0 : }
2244 0 : pc.kind = compactionKindRead
2245 0 :
2246 0 : // Prevent read compactions which are too wide.
2247 0 : outputOverlaps := pc.version.Overlaps(pc.outputLevel.level, pc.bounds)
2248 0 : if outputOverlaps.AggregateSizeSum() > pc.maxReadCompactionBytes {
2249 0 : return nil
2250 0 : }
2251 :
2252 : // Prevent compactions which start with a small seed file X, but overlap
2253 : // with over allowedCompactionWidth * X file sizes in the output layer.
2254 0 : const allowedCompactionWidth = 35
2255 0 : if outputOverlaps.AggregateSizeSum() > overlapSlice.AggregateSizeSum()*allowedCompactionWidth {
2256 0 : return nil
2257 0 : }
2258 :
2259 0 : return pc
2260 : }
2261 :
2262 0 : func (p *compactionPickerByScore) forceBaseLevel1() {
2263 0 : p.baseLevel = 1
2264 0 : }
2265 :
2266 : // outputKeyRangeAlreadyCompacting checks if the picked compaction's key range
2267 : // overlaps the output of an in-progress compaction in the same output level.
2268 : func outputKeyRangeAlreadyCompacting(
2269 : cmp base.Compare, inProgressCompactions []compactionInfo, pc *pickedTableCompaction,
2270 1 : ) bool {
2271 1 : // Look for active compactions outputting to the same region of the key
2272 1 : // space in the same output level. Two potential compactions may conflict
2273 1 : // without sharing input files if there are no files in the output level
2274 1 : // that overlap with the intersection of the compactions' key spaces.
2275 1 : //
2276 1 : // Consider an active L0->Lbase compaction compacting two L0 files one
2277 1 : // [a-f] and the other [t-z] into Lbase.
2278 1 : //
2279 1 : // L0
2280 1 : // ↦ 000100 ↤ ↦ 000101 ↤
2281 1 : // L1
2282 1 : // ↦ 000004 ↤
2283 1 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
2284 1 : //
2285 1 : // If a new file 000102 [j-p] is flushed while the existing compaction is
2286 1 : // still ongoing, the new file would not be in any compacting sublevel
2287 1 : // intervals and would not overlap with any Lbase files that are also
2288 1 : // compacting. However, this compaction cannot be picked because the
2289 1 : // compaction's output key space [j-p] would overlap the existing
2290 1 : // compaction's output key space [a-z].
2291 1 : //
2292 1 : // L0
2293 1 : // ↦ 000100* ↤ ↦ 000102 ↤ ↦ 000101* ↤
2294 1 : // L1
2295 1 : // ↦ 000004* ↤
2296 1 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
2297 1 : //
2298 1 : // * - currently compacting
2299 1 : if pc.outputLevel != nil && pc.outputLevel.level != 0 {
2300 1 : for _, c := range inProgressCompactions {
2301 1 : if pc.outputLevel.level != c.outputLevel {
2302 1 : continue
2303 : }
2304 1 : if !c.bounds.Overlaps(cmp, &pc.bounds) {
2305 1 : continue
2306 : }
2307 : // The picked compaction and the in-progress compaction c are
2308 : // outputting to the same region of the key space of the same
2309 : // level.
2310 1 : return true
2311 : }
2312 : }
2313 1 : return false
2314 : }
2315 :
2316 : // conflictsWithInProgress checks if any in-progress compaction overlaps the manual compaction's key space at its input or output level.
2317 : func conflictsWithInProgress(
2318 : manual *manualCompaction, outputLevel int, inProgressCompactions []compactionInfo, cmp Compare,
2319 1 : ) bool {
2320 1 : for _, c := range inProgressCompactions {
2321 1 : if (c.outputLevel == manual.level || c.outputLevel == outputLevel) &&
2322 1 : areUserKeysOverlapping(manual.start, manual.end, c.bounds.Start, c.bounds.End.Key, cmp) {
2323 1 : return true
2324 1 : }
2325 1 : for _, in := range c.inputs {
2326 1 : if in.files.Empty() {
2327 1 : continue
2328 : }
2329 1 : iter := in.files.Iter()
2330 1 : smallest := iter.First().Smallest().UserKey
2331 1 : largest := iter.Last().Largest().UserKey
2332 1 : if (in.level == manual.level || in.level == outputLevel) &&
2333 1 : areUserKeysOverlapping(manual.start, manual.end, smallest, largest, cmp) {
2334 1 : return true
2335 1 : }
2336 : }
2337 : }
2338 1 : return false
2339 : }
2340 :
2341 1 : func areUserKeysOverlapping(x1, x2, y1, y2 []byte, cmp Compare) bool {
2342 1 : return cmp(x1, y2) <= 0 && cmp(y1, x2) <= 0
2343 1 : }
|