1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "math"
11 : "sort"
12 : "strings"
13 :
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/humanize"
16 : "github.com/cockroachdb/pebble/internal/manifest"
17 : )
18 :
19 : // The minimum count for an intra-L0 compaction. This matches the RocksDB
20 : // heuristic.
21 : const minIntraL0Count = 4
22 :
23 : type compactionEnv struct {
24 : // diskAvailBytes holds a statistic on the number of bytes available on
25 : // disk, as reported by the filesystem. It's used to be more restrictive in
26 : // expanding compactions if available disk space is limited.
27 : //
28 : // The cached value (d.diskAvailBytes) is updated whenever a file is deleted
29 : // and whenever a compaction or flush completes. Since file removal is the
30 : // primary means of reclaiming space, there is a rough bound on the
31 : // statistic's staleness when available bytes are growing. Compactions and
32 : // flushes are longer, slower operations and provide a much looser bound
33 : // when available bytes are decreasing.
34 : diskAvailBytes uint64
35 : earliestUnflushedSeqNum uint64
36 : earliestSnapshotSeqNum uint64
37 : inProgressCompactions []compactionInfo
38 : readCompactionEnv readCompactionEnv
39 : }
40 :
41 : type compactionPicker interface {
42 : getScores([]compactionInfo) [numLevels]float64
43 : getBaseLevel() int
44 : estimatedCompactionDebt(l0ExtraSize uint64) uint64
45 : pickAuto(env compactionEnv) (pc *pickedCompaction)
46 : pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)
47 : pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction)
48 : pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
49 : forceBaseLevel1()
50 : }
51 :
52 : // readCompactionEnv holds the data required to perform read compactions.
53 : type readCompactionEnv struct {
54 : rescheduleReadCompaction *bool
55 : readCompactions *readCompactionQueue
56 : flushing bool
57 : }
58 :
59 : // Information about in-progress compactions provided to the compaction picker.
60 : // These are used to constrain the new compactions that will be picked.
61 : type compactionInfo struct {
62 : // versionEditApplied is true if this compaction's version edit has already
63 : // been committed. The compaction may still be in-progress deleting newly
64 : // obsolete files.
65 : versionEditApplied bool
66 : inputs []compactionLevel
67 : outputLevel int
68 : smallest InternalKey
69 : largest InternalKey
70 : }
71 :
72 0 : func (info compactionInfo) String() string {
73 0 : var buf bytes.Buffer
74 0 : var largest int
75 0 : for i, in := range info.inputs {
76 0 : if i > 0 {
77 0 : fmt.Fprintf(&buf, " -> ")
78 0 : }
79 0 : fmt.Fprintf(&buf, "L%d", in.level)
80 0 : in.files.Each(func(m *fileMetadata) {
81 0 : fmt.Fprintf(&buf, " %s", m.FileNum)
82 0 : })
83 0 : if largest < in.level {
84 0 : largest = in.level
85 0 : }
86 : }
87 0 : if largest != info.outputLevel || len(info.inputs) == 1 {
88 0 : fmt.Fprintf(&buf, " -> L%d", info.outputLevel)
89 0 : }
90 0 : return buf.String()
91 : }
92 :
93 : type sortCompactionLevelsDecreasingScore []candidateLevelInfo
94 :
95 1 : func (s sortCompactionLevelsDecreasingScore) Len() int {
96 1 : return len(s)
97 1 : }
98 1 : func (s sortCompactionLevelsDecreasingScore) Less(i, j int) bool {
99 1 : if s[i].score != s[j].score {
100 1 : return s[i].score > s[j].score
101 1 : }
102 1 : return s[i].level < s[j].level
103 : }
104 1 : func (s sortCompactionLevelsDecreasingScore) Swap(i, j int) {
105 1 : s[i], s[j] = s[j], s[i]
106 1 : }
107 :
108 : // sublevelInfo is used to tag a LevelSlice for an L0 sublevel with the
109 : // sublevel.
110 : type sublevelInfo struct {
111 : manifest.LevelSlice
112 : sublevel manifest.Level
113 : }
114 :
115 0 : func (cl sublevelInfo) Clone() sublevelInfo {
116 0 : return sublevelInfo{
117 0 : sublevel: cl.sublevel,
118 0 : LevelSlice: cl.LevelSlice.Reslice(func(start, end *manifest.LevelIterator) {}),
119 : }
120 : }
121 0 : func (cl sublevelInfo) String() string {
122 0 : return fmt.Sprintf(`Sublevel %s; Levels %s`, cl.sublevel, cl.LevelSlice)
123 0 : }
124 :
125 : // generateSublevelInfo generates the level slices for each of the sublevels
126 : // from the level slice for all of L0.
127 1 : func generateSublevelInfo(cmp base.Compare, levelFiles manifest.LevelSlice) []sublevelInfo {
128 1 : sublevelMap := make(map[uint64][]*fileMetadata)
129 1 : it := levelFiles.Iter()
130 1 : for f := it.First(); f != nil; f = it.Next() {
131 1 : sublevelMap[uint64(f.SubLevel)] = append(sublevelMap[uint64(f.SubLevel)], f)
132 1 : }
133 :
134 1 : var sublevels []int
135 1 : for level := range sublevelMap {
136 1 : sublevels = append(sublevels, int(level))
137 1 : }
138 1 : sort.Ints(sublevels)
139 1 :
140 1 : var levelSlices []sublevelInfo
141 1 : for _, sublevel := range sublevels {
142 1 : metas := sublevelMap[uint64(sublevel)]
143 1 : levelSlices = append(
144 1 : levelSlices,
145 1 : sublevelInfo{
146 1 : manifest.NewLevelSliceKeySorted(cmp, metas),
147 1 : manifest.L0Sublevel(sublevel),
148 1 : },
149 1 : )
150 1 : }
151 1 : return levelSlices
152 : }
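// For illustration, a hedged sketch of how the picker consumes this result
// (mirroring the call in setupInputs below):
//
//	sublevels := generateSublevelInfo(pc.cmp, pc.startLevel.files)
//	for _, sl := range sublevels {
//		// sl.sublevel names the L0 sublevel; sl.LevelSlice holds its
//		// files, key-sorted within the sublevel.
//	}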
153 :
154 : // compactionPickerMetrics holds metrics related to the compaction picking process.
155 : type compactionPickerMetrics struct {
156 : scores []float64
157 : singleLevelOverlappingRatio float64
158 : multiLevelOverlappingRatio float64
159 : }
160 :
161 : // pickedCompaction contains information about a compaction that has already
162 : // been chosen, and is being constructed. Compaction construction info lives in
163 : // this struct, and is copied over into the compaction struct when that's
164 : // created.
165 : type pickedCompaction struct {
166 : cmp Compare
167 : // score of the chosen compaction. Taken from candidateLevelInfo.
168 : score float64
169 : // kind indicates the kind of compaction.
170 : kind compactionKind
171 : // startLevel is the level that is being compacted. Inputs from startLevel
172 : // and outputLevel will be merged to produce a set of outputLevel files.
173 : startLevel *compactionLevel
174 : // outputLevel is the level that files are being produced in. outputLevel is
175 : // equal to startLevel+1, except when:
176 : //	- startLevel is 0, in which case the output level equals compactionPicker.getBaseLevel();
177 : //	- in multilevel compactions, the output level is the lowest level involved in
178 : // the compaction.
179 : outputLevel *compactionLevel
180 : // extraLevels contains additional levels in between the input and output
181 : // levels that get compacted in multi-level compactions.
182 : extraLevels []*compactionLevel
183 : // adjustedOutputLevel is the output level used for the purpose of
184 : // determining the target output file size, overlap bytes, and expanded
185 : // bytes, taking into account the base level.
186 : adjustedOutputLevel int
187 : inputs []compactionLevel
188 : // L0-specific compaction info. Set to a non-nil value for all compactions
189 : // where startLevel == 0 that were generated by L0Sublevels.
190 : lcf *manifest.L0CompactionFiles
191 : // l0SublevelInfo is used for compactions out of L0. It is nil for all
192 : // other compactions.
193 : l0SublevelInfo []sublevelInfo
194 : // maxOutputFileSize is the maximum size of an individual table created
195 : // during compaction.
196 : maxOutputFileSize uint64
197 : // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
198 : // single output table with the tables in the grandparent level.
199 : maxOverlapBytes uint64
200 : // maxReadCompactionBytes is the maximum number of bytes a read compaction
201 : // is allowed to overlap with in its output level. If the overlap is
202 : // greater than maxReadCompactionBytes, we don't proceed with the compaction.
203 : maxReadCompactionBytes uint64
204 : // The boundaries of the input data.
205 : smallest InternalKey
206 : largest InternalKey
207 : version *version
208 : pickerMetrics compactionPickerMetrics
209 : }
210 :
211 1 : func defaultOutputLevel(startLevel, baseLevel int) int {
212 1 : outputLevel := startLevel + 1
213 1 : if startLevel == 0 {
214 1 : outputLevel = baseLevel
215 1 : }
216 1 : if outputLevel >= numLevels-1 {
217 1 : outputLevel = numLevels - 1
218 1 : }
219 1 : return outputLevel
220 : }
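// For example, with baseLevel = 3: startLevel 0 yields output level 3,
// startLevel 3 yields 4, and startLevel 6 is clamped to numLevels-1 (6).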
221 :
222 : func newPickedCompaction(
223 : opts *Options, cur *version, startLevel, outputLevel, baseLevel int,
224 1 : ) *pickedCompaction {
225 1 : if startLevel > 0 && startLevel < baseLevel {
226 0 : panic(fmt.Sprintf("invalid compaction: start level %d should not be empty (base level %d)",
227 0 : startLevel, baseLevel))
228 : }
229 :
230 1 : adjustedOutputLevel := outputLevel
231 1 : if adjustedOutputLevel > 0 {
232 1 : // Output level is in the range [baseLevel, numLevels). For the purpose of
233 1 : // determining the target output file size, overlap bytes, and expanded
234 1 : // bytes, we want to adjust the range to [1,numLevels].
235 1 : adjustedOutputLevel = 1 + outputLevel - baseLevel
236 1 : }
237 :
238 1 : pc := &pickedCompaction{
239 1 : cmp: opts.Comparer.Compare,
240 1 : version: cur,
241 1 : inputs: []compactionLevel{{level: startLevel}, {level: outputLevel}},
242 1 : adjustedOutputLevel: adjustedOutputLevel,
243 1 : maxOutputFileSize: uint64(opts.Level(adjustedOutputLevel).TargetFileSize),
244 1 : maxOverlapBytes: maxGrandparentOverlapBytes(opts, adjustedOutputLevel),
245 1 : maxReadCompactionBytes: maxReadCompactionBytes(opts, adjustedOutputLevel),
246 1 : }
247 1 : pc.startLevel = &pc.inputs[0]
248 1 : pc.outputLevel = &pc.inputs[1]
249 1 : return pc
250 : }
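// A worked example of the adjustment above: with baseLevel = 4, an Lbase
// output (outputLevel = 4) maps to adjustedOutputLevel = 1 + 4 - 4 = 1, so
// it uses the L1 target file size; outputLevel = 6 maps to 1 + 6 - 4 = 3.
// An intra-L0 compaction (outputLevel = 0) keeps adjustedOutputLevel = 0.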
251 :
252 : func newPickedCompactionFromL0(
253 : lcf *manifest.L0CompactionFiles, opts *Options, vers *version, baseLevel int, isBase bool,
254 1 : ) *pickedCompaction {
255 1 : outputLevel := baseLevel
256 1 : if !isBase {
257 1 : outputLevel = 0 // Intra L0
258 1 : }
259 :
260 1 : pc := newPickedCompaction(opts, vers, 0, outputLevel, baseLevel)
261 1 : pc.lcf = lcf
262 1 : pc.outputLevel.level = outputLevel
263 1 :
264 1 : // Manually build the compaction as opposed to calling
265 1 : // pickAutoHelper. This is because L0Sublevels has already added
266 1 : // any overlapping L0 SSTables that need to be added, and
267 1 : // because compactions built by L0Sublevels do not necessarily
268 1 : // pick contiguous sequences of files in pc.version.Levels[0].
269 1 : files := make([]*manifest.FileMetadata, 0, len(lcf.Files))
270 1 : iter := vers.Levels[0].Iter()
271 1 : for f := iter.First(); f != nil; f = iter.Next() {
272 1 : if lcf.FilesIncluded[f.L0Index] {
273 1 : files = append(files, f)
274 1 : }
275 : }
276 1 : pc.startLevel.files = manifest.NewLevelSliceSeqSorted(files)
277 1 : return pc
278 : }
279 :
280 0 : func (pc *pickedCompaction) String() string {
281 0 : var builder strings.Builder
282 0 : builder.WriteString(fmt.Sprintf(`Score=%f, `, pc.score))
283 0 : builder.WriteString(fmt.Sprintf(`Kind=%s, `, pc.kind))
284 0 : builder.WriteString(fmt.Sprintf(`AdjustedOutputLevel=%d, `, pc.adjustedOutputLevel))
285 0 : builder.WriteString(fmt.Sprintf(`maxOutputFileSize=%d, `, pc.maxOutputFileSize))
286 0 : builder.WriteString(fmt.Sprintf(`maxReadCompactionBytes=%d, `, pc.maxReadCompactionBytes))
287 0 : builder.WriteString(fmt.Sprintf(`smallest=%s, `, pc.smallest))
288 0 : builder.WriteString(fmt.Sprintf(`largest=%s, `, pc.largest))
289 0 : builder.WriteString(fmt.Sprintf(`version=%s, `, pc.version))
290 0 : builder.WriteString(fmt.Sprintf(`inputs=%s, `, pc.inputs))
291 0 : builder.WriteString(fmt.Sprintf(`startlevel=%s, `, pc.startLevel))
292 0 : builder.WriteString(fmt.Sprintf(`outputLevel=%s, `, pc.outputLevel))
293 0 : builder.WriteString(fmt.Sprintf(`extraLevels=%s, `, pc.extraLevels))
294 0 : builder.WriteString(fmt.Sprintf(`l0SublevelInfo=%s, `, pc.l0SublevelInfo))
295 0 : builder.WriteString(fmt.Sprintf(`lcf=%s`, pc.lcf))
296 0 : return builder.String()
297 0 : }
298 :
299 : // clone creates a deep copy of the pickedCompaction.
300 1 : func (pc *pickedCompaction) clone() *pickedCompaction {
301 1 :
302 1 : // Quickly copy over fields that do not require special deep copy care, and
303 1 : // set all fields that will require a deep copy to nil.
304 1 : newPC := &pickedCompaction{
305 1 : cmp: pc.cmp,
306 1 : score: pc.score,
307 1 : kind: pc.kind,
308 1 : adjustedOutputLevel: pc.adjustedOutputLevel,
309 1 : maxOutputFileSize: pc.maxOutputFileSize,
310 1 : maxOverlapBytes: pc.maxOverlapBytes,
311 1 : maxReadCompactionBytes: pc.maxReadCompactionBytes,
312 1 : smallest: pc.smallest.Clone(),
313 1 : largest: pc.largest.Clone(),
314 1 :
315 1 : // TODO(msbutler): properly clone picker metrics
316 1 : pickerMetrics: pc.pickerMetrics,
317 1 :
318 1 : // Both copies see the same manifest; therefore, it's ok for them to
319 1 : // share the same pc.version.
320 1 : version: pc.version,
321 1 : }
322 1 :
323 1 : newPC.inputs = make([]compactionLevel, len(pc.inputs))
324 1 : newPC.extraLevels = make([]*compactionLevel, 0, len(pc.extraLevels))
325 1 : for i := range pc.inputs {
326 1 : newPC.inputs[i] = pc.inputs[i].Clone()
327 1 : if i == 0 {
328 1 : newPC.startLevel = &newPC.inputs[i]
329 1 : } else if i == len(pc.inputs)-1 {
330 1 : newPC.outputLevel = &newPC.inputs[i]
331 1 : } else {
332 0 : newPC.extraLevels = append(newPC.extraLevels, &newPC.inputs[i])
333 0 : }
334 : }
335 :
336 1 : newPC.l0SublevelInfo = make([]sublevelInfo, len(pc.l0SublevelInfo))
337 1 : for i := range pc.l0SublevelInfo {
338 0 : newPC.l0SublevelInfo[i] = pc.l0SublevelInfo[i].Clone()
339 0 : }
340 1 : if pc.lcf != nil {
341 0 : newPC.lcf = pc.lcf.Clone()
342 0 : }
343 1 : return newPC
344 : }
345 :
346 : // maybeExpandBounds is a helper function for setupInputs which ensures the
347 : // pickedCompaction's smallest and largest internal keys are updated iff
348 : // the candidate keys expand the key span. This avoids a bug for multi-level
349 : // compactions: during the second call to setupInputs, the picked compaction's
350 : // smallest and largest keys should not decrease the key span.
351 1 : func (pc *pickedCompaction) maybeExpandBounds(smallest InternalKey, largest InternalKey) {
352 1 : emptyKey := InternalKey{}
353 1 : if base.InternalCompare(pc.cmp, smallest, emptyKey) == 0 {
354 1 : if base.InternalCompare(pc.cmp, largest, emptyKey) != 0 {
355 0 : panic("either both candidate keys are empty or neither are empty")
356 : }
357 1 : return
358 : }
359 1 : if base.InternalCompare(pc.cmp, pc.smallest, emptyKey) == 0 {
360 1 : if base.InternalCompare(pc.cmp, pc.largest, emptyKey) != 0 {
361 0 : panic("either both pc keys are empty or neither are empty")
362 : }
363 1 : pc.smallest = smallest
364 1 : pc.largest = largest
365 1 : return
366 : }
367 1 : if base.InternalCompare(pc.cmp, pc.smallest, smallest) >= 0 {
368 1 : pc.smallest = smallest
369 1 : }
370 1 : if base.InternalCompare(pc.cmp, pc.largest, largest) <= 0 {
371 1 : pc.largest = largest
372 1 : }
373 : }
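// For example, if pc currently spans [c, f] and the candidate bounds are
// [a, d], pc widens to [a, f]; candidate bounds [d, e] leave pc unchanged.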
374 :
375 : // setupInputs returns true if a compaction has been set up. It returns false if
376 : // a concurrent compaction is occurring on the start or output level files.
377 : func (pc *pickedCompaction) setupInputs(
378 : opts *Options, diskAvailBytes uint64, startLevel *compactionLevel,
379 1 : ) bool {
380 1 : // maxExpandedBytes is the maximum size of an expanded compaction. If
381 1 : // growing a compaction results in a larger size, the original compaction
382 1 : // is used instead.
383 1 : maxExpandedBytes := expandedCompactionByteSizeLimit(
384 1 : opts, pc.adjustedOutputLevel, diskAvailBytes,
385 1 : )
386 1 :
387 1 : // Expand the initial inputs to a clean cut.
388 1 : var isCompacting bool
389 1 : startLevel.files, isCompacting = expandToAtomicUnit(pc.cmp, startLevel.files, false /* disableIsCompacting */)
390 1 : if isCompacting {
391 1 : return false
392 1 : }
393 1 : pc.maybeExpandBounds(manifest.KeyRange(pc.cmp, startLevel.files.Iter()))
394 1 :
395 1 : // Determine the sstables in the output level which overlap with the input
396 1 : // sstables, and then expand those tables to a clean cut. No need to do
397 1 : // this for intra-L0 compactions; outputLevel.files is left empty for those.
398 1 : if startLevel.level != pc.outputLevel.level {
399 1 : pc.outputLevel.files = pc.version.Overlaps(pc.outputLevel.level, pc.cmp, pc.smallest.UserKey,
400 1 : pc.largest.UserKey, pc.largest.IsExclusiveSentinel())
401 1 : pc.outputLevel.files, isCompacting = expandToAtomicUnit(pc.cmp, pc.outputLevel.files,
402 1 : false /* disableIsCompacting */)
403 1 : if isCompacting {
404 1 : return false
405 1 : }
406 1 : pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
407 1 : startLevel.files.Iter(), pc.outputLevel.files.Iter()))
408 : }
409 :
410 : // Grow the sstables in startLevel.level as long as it doesn't affect the number
411 : // of sstables included from pc.outputLevel.level.
412 1 : if pc.lcf != nil && startLevel.level == 0 && pc.outputLevel.level != 0 {
413 1 : // Call the L0-specific compaction extension method. Similar logic as
414 1 : // pc.grow. Additional L0 files are optionally added to the compaction at
415 1 : // this step. Note that the bounds passed in are not the bounds of the
416 1 : // compaction, but rather the smallest and largest internal keys that
417 1 : // the compaction cannot include from L0 without pulling in more Lbase
418 1 : // files. Consider this example:
419 1 : //
420 1 : // L0: c-d e+f g-h
421 1 : // Lbase: a-b e+f i-j
422 1 : // a b c d e f g h i j
423 1 : //
424 1 : // The e-f files have already been chosen in the compaction. As pulling
425 1 : // in more LBase files is undesirable, the logic below will pass in
426 1 : // smallest = b and largest = i to ExtendL0ForBaseCompactionTo, which
427 1 : // will expand the compaction to include c-d and g-h from L0. The
428 1 : // bounds passed in are exclusive; the compaction cannot be expanded
429 1 : // to include files that "touch" it.
430 1 : smallestBaseKey := base.InvalidInternalKey
431 1 : largestBaseKey := base.InvalidInternalKey
432 1 : if pc.outputLevel.files.Empty() {
433 1 : baseIter := pc.version.Levels[pc.outputLevel.level].Iter()
434 1 : if sm := baseIter.SeekLT(pc.cmp, pc.smallest.UserKey); sm != nil {
435 1 : smallestBaseKey = sm.Largest
436 1 : }
437 1 : if la := baseIter.SeekGE(pc.cmp, pc.largest.UserKey); la != nil {
438 1 : largestBaseKey = la.Smallest
439 1 : }
440 1 : } else {
441 1 : // NB: We use Reslice to access the underlying level's files, but
442 1 : // we discard the returned slice. The pc.outputLevel.files slice
443 1 : // is not modified.
444 1 : _ = pc.outputLevel.files.Reslice(func(start, end *manifest.LevelIterator) {
445 1 : if sm := start.Prev(); sm != nil {
446 1 : smallestBaseKey = sm.Largest
447 1 : }
448 1 : if la := end.Next(); la != nil {
449 1 : largestBaseKey = la.Smallest
450 1 : }
451 : })
452 : }
453 1 : oldLcf := pc.lcf.Clone()
454 1 : if pc.version.L0Sublevels.ExtendL0ForBaseCompactionTo(smallestBaseKey, largestBaseKey, pc.lcf) {
455 1 : var newStartLevelFiles []*fileMetadata
456 1 : iter := pc.version.Levels[0].Iter()
457 1 : var sizeSum uint64
458 1 : for j, f := 0, iter.First(); f != nil; j, f = j+1, iter.Next() {
459 1 : if pc.lcf.FilesIncluded[f.L0Index] {
460 1 : newStartLevelFiles = append(newStartLevelFiles, f)
461 1 : sizeSum += f.Size
462 1 : }
463 : }
464 1 : if sizeSum+pc.outputLevel.files.SizeSum() < maxExpandedBytes {
465 1 : startLevel.files = manifest.NewLevelSliceSeqSorted(newStartLevelFiles)
466 1 : pc.smallest, pc.largest = manifest.KeyRange(pc.cmp,
467 1 : startLevel.files.Iter(), pc.outputLevel.files.Iter())
468 1 : } else {
469 1 : *pc.lcf = *oldLcf
470 1 : }
471 : }
472 1 : } else if pc.grow(pc.smallest, pc.largest, maxExpandedBytes, startLevel) {
473 1 : pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
474 1 : startLevel.files.Iter(), pc.outputLevel.files.Iter()))
475 1 : }
476 :
477 1 : if pc.startLevel.level == 0 {
478 1 : // We don't change the input files for the compaction beyond this point.
479 1 : pc.l0SublevelInfo = generateSublevelInfo(pc.cmp, pc.startLevel.files)
480 1 : }
481 :
482 1 : return true
483 : }
484 :
485 : // grow grows the number of inputs at startLevel without changing the number
486 : // of files in pc.outputLevel, and returns whether the inputs grew. sm and
487 : // la are the smallest and largest InternalKeys in all of the inputs.
488 : func (pc *pickedCompaction) grow(
489 : sm, la InternalKey, maxExpandedBytes uint64, startLevel *compactionLevel,
490 1 : ) bool {
491 1 : if pc.outputLevel.files.Empty() {
492 1 : return false
493 1 : }
494 1 : grow0 := pc.version.Overlaps(startLevel.level, pc.cmp, sm.UserKey,
495 1 : la.UserKey, la.IsExclusiveSentinel())
496 1 : grow0, isCompacting := expandToAtomicUnit(pc.cmp, grow0, false /* disableIsCompacting */)
497 1 : if isCompacting {
498 1 : return false
499 1 : }
500 1 : if grow0.Len() <= startLevel.files.Len() {
501 1 : return false
502 1 : }
503 1 : if grow0.SizeSum()+pc.outputLevel.files.SizeSum() >= maxExpandedBytes {
504 1 : return false
505 1 : }
506 : // We need to include the outputLevel iter because without it, in a multiLevel scenario,
507 : // sm1 and la1 could shift the output level keyspace when pc.outputLevel.files is set to grow1.
508 1 : sm1, la1 := manifest.KeyRange(pc.cmp, grow0.Iter(), pc.outputLevel.files.Iter())
509 1 : grow1 := pc.version.Overlaps(pc.outputLevel.level, pc.cmp, sm1.UserKey,
510 1 : la1.UserKey, la1.IsExclusiveSentinel())
511 1 : grow1, isCompacting = expandToAtomicUnit(pc.cmp, grow1, false /* disableIsCompacting */)
512 1 : if isCompacting {
513 1 : return false
514 1 : }
515 1 : if grow1.Len() != pc.outputLevel.files.Len() {
516 1 : return false
517 1 : }
518 1 : startLevel.files = grow0
519 1 : pc.outputLevel.files = grow1
520 1 : return true
521 : }
522 :
523 1 : func (pc *pickedCompaction) compactionSize() uint64 {
524 1 : var bytesToCompact uint64
525 1 : for i := range pc.inputs {
526 1 : bytesToCompact += pc.inputs[i].files.SizeSum()
527 1 : }
528 1 : return bytesToCompact
529 : }
530 :
531 : // setupMultiLevelCandidate returns true if it successfully added another level
532 : // to the compaction.
533 1 : func (pc *pickedCompaction) setupMultiLevelCandidate(opts *Options, diskAvailBytes uint64) bool {
534 1 : pc.inputs = append(pc.inputs, compactionLevel{level: pc.outputLevel.level + 1})
535 1 :
536 1 : // Recalibrate startLevel and outputLevel:
537 1 : // - startLevel and outputLevel pointers may be obsolete after appending to pc.inputs.
538 1 : // - push outputLevel to extraLevels and move the new level to outputLevel
539 1 : pc.startLevel = &pc.inputs[0]
540 1 : pc.extraLevels = []*compactionLevel{&pc.inputs[1]}
541 1 : pc.outputLevel = &pc.inputs[2]
542 1 :
543 1 : pc.adjustedOutputLevel++
544 1 : return pc.setupInputs(opts, diskAvailBytes, pc.extraLevels[len(pc.extraLevels)-1])
545 1 : }
546 :
547 : // expandToAtomicUnit expands the provided level slice within its level both
548 : // forwards and backwards to its "atomic compaction unit" boundaries, if
549 : // necessary.
550 : //
551 : // While picking compaction inputs, this is required to maintain the invariant
552 : // that the versions of keys at level+1 are older than the versions of keys at
553 : // level. Tables are added to the right of the current slice tables such that
554 : // the rightmost table has a "clean cut". A clean cut is either a change in
555 : // user keys, or when the largest key in the left sstable is a range tombstone
556 : // sentinel key (InternalKeyRangeDeleteSentinel).
557 : //
558 : // In addition to maintaining the seqnum invariant, expandToAtomicUnit is used
559 : // to provide clean boundaries for range tombstone truncation during
560 : // compaction. In order to achieve these clean boundaries, expandToAtomicUnit
561 : // needs to find a "clean cut" on the left edge of the compaction as well.
562 : // This is necessary in order for "atomic compaction units" to always be
563 : // compacted as a unit. Failure to do this leads to a subtle bug with
564 : // truncation of range tombstones to atomic compaction unit boundaries.
565 : // Consider the scenario:
566 : //
567 : // L3:
568 : // 12:[a#2,15-b#1,1]
569 : // 13:[b#0,15-d#72057594037927935,15]
570 : //
571 : // These sstables contain a range tombstone [a-d)#2 which spans the two
572 : // sstables. The two sstables need to always be kept together. Compacting
573 : // sstable 13 independently of sstable 12 would result in:
574 : //
575 : // L3:
576 : // 12:[a#2,15-b#1,1]
577 : // L4:
578 : // 14:[b#0,15-d#72057594037927935,15]
579 : //
580 : // This state is still ok, but when sstable 12 is next compacted, its range
581 : // tombstones will be truncated at "b" (the largest key in its atomic
582 : // compaction unit). In the scenario here, that could result in b#1 becoming
583 : // visible when it should be deleted.
584 : //
585 : // isCompacting is returned true for any atomic units that contain files that
586 : // have in-progress compactions, i.e. FileMetadata.Compacting == true. If
587 : // disableIsCompacting is true, isCompacting always returns false. This helps
588 : // avoid spurious races from being detected when this method is used outside
589 : // of compaction picking code.
590 : //
591 : // TODO(jackson): Compactions and flushes no longer split a user key between two
592 : // sstables. We could perform a migration, re-compacting any sstables with split
593 : // user keys, which would allow us to remove atomic compaction unit expansion
594 : // code.
595 : func expandToAtomicUnit(
596 : cmp Compare, inputs manifest.LevelSlice, disableIsCompacting bool,
597 1 : ) (slice manifest.LevelSlice, isCompacting bool) {
598 1 : // NB: Inputs for L0 can't be expanded and *version.Overlaps guarantees
599 1 : // that we get a 'clean cut.' For L0, Overlaps will return a slice without
600 1 : // access to the rest of the L0 files, so it's OK to try to reslice.
601 1 : if inputs.Empty() {
602 1 : // Nothing to expand.
603 1 : return inputs, false
604 1 : }
605 :
606 : // TODO(jackson): Update to avoid use of LevelIterator.Current(). The
607 : // Reslice interface will require some tweaking, because we currently rely
608 : // on Reslice having already positioned the LevelIterator appropriately.
609 :
610 1 : inputs = inputs.Reslice(func(start, end *manifest.LevelIterator) {
611 1 : iter := start.Clone()
612 1 : iter.Prev()
613 1 : for cur, prev := start.Current(), iter.Current(); prev != nil; cur, prev = start.Prev(), iter.Prev() {
614 1 : if cur.IsCompacting() {
615 1 : isCompacting = true
616 1 : }
617 1 : if cmp(prev.Largest.UserKey, cur.Smallest.UserKey) < 0 {
618 1 : break
619 : }
620 1 : if prev.Largest.IsExclusiveSentinel() {
621 1 : // The table prev has a largest key indicating that the user key
622 1 : // prev.largest.UserKey doesn't actually exist in the table.
623 1 : break
624 : }
625 : // prev.Largest.UserKey == cur.Smallest.UserKey, so we need to
626 : // include prev in the compaction.
627 : }
628 :
629 1 : iter = end.Clone()
630 1 : iter.Next()
631 1 : for cur, next := end.Current(), iter.Current(); next != nil; cur, next = end.Next(), iter.Next() {
632 1 : if cur.IsCompacting() {
633 1 : isCompacting = true
634 1 : }
635 1 : if cmp(cur.Largest.UserKey, next.Smallest.UserKey) < 0 {
636 1 : break
637 : }
638 1 : if cur.Largest.IsExclusiveSentinel() {
639 1 : // The table cur has a largest key indicating that the user key
640 1 : // cur.largest.UserKey doesn't actually exist in the table.
641 1 : break
642 : }
643 : // cur.Largest.UserKey == next.Smallest.UserKey, so we need to
644 : // include next in the compaction.
645 : }
646 : })
647 1 : inputIter := inputs.Iter()
648 1 : isCompacting = !disableIsCompacting &&
649 1 : (isCompacting || inputIter.First().IsCompacting() || inputIter.Last().IsCompacting())
650 1 : return inputs, isCompacting
651 : }
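// A minimal sketch of the per-boundary "clean cut" test applied in both
// loops above, written as a hypothetical standalone helper (not part of
// Pebble's API), where left and right are neighboring tables in a level:
func cleanCutBetween(cmp Compare, left, right *fileMetadata) bool {
	// The cut is clean if the user keys change across the boundary, or if
	// left's largest key is an exclusive sentinel (e.g. a range deletion
	// end key), meaning left.Largest.UserKey isn't actually in left.
	return cmp(left.Largest.UserKey, right.Smallest.UserKey) < 0 ||
		left.Largest.IsExclusiveSentinel()
}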
652 :
653 : func newCompactionPicker(
654 : v *version, opts *Options, inProgressCompactions []compactionInfo,
655 1 : ) compactionPicker {
656 1 : p := &compactionPickerByScore{
657 1 : opts: opts,
658 1 : vers: v,
659 1 : }
660 1 : p.initLevelMaxBytes(inProgressCompactions)
661 1 : return p
662 1 : }
663 :
664 : // Information about a candidate compaction level that has been identified by
665 : // the compaction picker.
666 : type candidateLevelInfo struct {
667 : // The score of the level to be compacted, with compensated file sizes and
668 : // adjustments.
669 : score float64
670 : // The original score of the level to be compacted, before adjusting
671 : // according to other levels' sizes.
672 : origScore float64
673 : // The raw score of the level to be compacted, calculated using
674 : // uncompensated file sizes and without any adjustments.
675 : rawScore float64
676 : level int
677 : // The level to compact to.
678 : outputLevel int
679 : // The file in level that will be compacted. Additional files may be
680 : // picked by the compaction, and a pickedCompaction created for the
681 : // compaction.
682 : file manifest.LevelFile
683 : }
684 :
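// fileCompensation returns an estimate of the bytes that may be reclaimed
// by compacting f's point and range tombstones. It is the amount by which
// compensatedSize inflates f's size.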
685 1 : func fileCompensation(f *fileMetadata) uint64 {
686 1 : return uint64(f.Stats.PointDeletionsBytesEstimate) + f.Stats.RangeDeletionsBytesEstimate
687 1 : }
688 :
689 : // compensatedSize returns f's file size, inflated according to compaction
690 : // priorities.
691 1 : func compensatedSize(f *fileMetadata) uint64 {
692 1 : // Add in the estimate of disk space that may be reclaimed by compacting the
693 1 : // file's tombstones.
694 1 : return f.Size + fileCompensation(f)
695 1 : }
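// For example (hypothetical numbers): a 10 MiB table estimated to reclaim
// 4 MiB via point deletions and 2 MiB via range deletions has a
// compensated size of 16 MiB, making it a more attractive compaction input
// than its raw size alone suggests.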
696 :
697 : // compensatedSizeAnnotator implements manifest.Annotator, annotating B-Tree
698 : // nodes with the sum of the files' compensated sizes. Its annotation type is
699 : // a *uint64. Compensated sizes may change once a table's stats are loaded
700 : // asynchronously, so its values are marked as cacheable only if a file's
701 : // stats have been loaded.
702 : type compensatedSizeAnnotator struct {
703 : }
704 :
705 : var _ manifest.Annotator = compensatedSizeAnnotator{}
706 :
707 1 : func (a compensatedSizeAnnotator) Zero(dst interface{}) interface{} {
708 1 : if dst == nil {
709 1 : return new(uint64)
710 1 : }
711 1 : v := dst.(*uint64)
712 1 : *v = 0
713 1 : return v
714 : }
715 :
716 : func (a compensatedSizeAnnotator) Accumulate(
717 : f *fileMetadata, dst interface{},
718 1 : ) (v interface{}, cacheOK bool) {
719 1 : vptr := dst.(*uint64)
720 1 : *vptr = *vptr + compensatedSize(f)
721 1 : return vptr, f.StatsValid()
722 1 : }
723 :
724 1 : func (a compensatedSizeAnnotator) Merge(src interface{}, dst interface{}) interface{} {
725 1 : srcV := src.(*uint64)
726 1 : dstV := dst.(*uint64)
727 1 : *dstV = *dstV + *srcV
728 1 : return dstV
729 1 : }
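// A hedged sketch of the aggregation the annotator performs, equivalent to
// folding Accumulate over files and Merge over subtree partial sums
// (hypothetical standalone illustration, not how the B-Tree invokes it):
func exampleCompensatedSum(files []*fileMetadata) uint64 {
	var a compensatedSizeAnnotator
	v := a.Zero(nil)
	for _, f := range files {
		v, _ = a.Accumulate(f, v)
	}
	return *v.(*uint64)
}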
730 :
731 : // totalCompensatedSize computes the compensated size over a file metadata
732 : // iterator. Note that this function is linear in the files available to the
733 : // iterator. Use the compensatedSizeAnnotator if querying the total
734 : // compensated size of a level.
735 1 : func totalCompensatedSize(iter manifest.LevelIterator) uint64 {
736 1 : var sz uint64
737 1 : for f := iter.First(); f != nil; f = iter.Next() {
738 1 : sz += compensatedSize(f)
739 1 : }
740 1 : return sz
741 : }
742 :
743 : // compactionPickerByScore holds the state and logic for picking a compaction. A
744 : // compaction picker is associated with a single version. A new compaction
745 : // picker is created and initialized every time a new version is installed.
746 : type compactionPickerByScore struct {
747 : opts *Options
748 : vers *version
749 : // The level to target for L0 compactions. Levels L1 to baseLevel must be
750 : // empty.
751 : baseLevel int
752 : // levelMaxBytes holds the dynamically adjusted max bytes setting for each
753 : // level.
754 : levelMaxBytes [numLevels]int64
755 : }
756 :
757 : var _ compactionPicker = &compactionPickerByScore{}
758 :
759 0 : func (p *compactionPickerByScore) getScores(inProgress []compactionInfo) [numLevels]float64 {
760 0 : var scores [numLevels]float64
761 0 : for _, info := range p.calculateLevelScores(inProgress) {
762 0 : scores[info.level] = info.score
763 0 : }
764 0 : return scores
765 : }
766 :
767 1 : func (p *compactionPickerByScore) getBaseLevel() int {
768 1 : if p == nil {
769 0 : return 1
770 0 : }
771 1 : return p.baseLevel
772 : }
773 :
774 : // estimatedCompactionDebt estimates the number of bytes which need to be
775 : // compacted before the LSM tree becomes stable.
776 1 : func (p *compactionPickerByScore) estimatedCompactionDebt(l0ExtraSize uint64) uint64 {
777 1 : if p == nil {
778 0 : return 0
779 0 : }
780 :
781 : // We assume that all the bytes in L0 need to be compacted to Lbase. This is
782 : // unlike the RocksDB logic that figures out whether L0 needs compaction.
783 1 : bytesAddedToNextLevel := l0ExtraSize + p.vers.Levels[0].Size()
784 1 : lbaseSize := p.vers.Levels[p.baseLevel].Size()
785 1 :
786 1 : var compactionDebt uint64
787 1 : if bytesAddedToNextLevel > 0 && lbaseSize > 0 {
788 1 : // We only incur compaction debt if both L0 and Lbase contain data. If L0
789 1 : // is empty, no compaction is necessary. If Lbase is empty, a move-based
790 1 : // compaction from L0 would occur.
791 1 : compactionDebt += bytesAddedToNextLevel + lbaseSize
792 1 : }
793 :
794 : // loop invariant: At the beginning of the loop, bytesAddedToNextLevel is the
795 : // bytes added to `level` in the loop.
796 1 : for level := p.baseLevel; level < numLevels-1; level++ {
797 1 : levelSize := p.vers.Levels[level].Size() + bytesAddedToNextLevel
798 1 : nextLevelSize := p.vers.Levels[level+1].Size()
799 1 : if levelSize > uint64(p.levelMaxBytes[level]) {
800 1 : bytesAddedToNextLevel = levelSize - uint64(p.levelMaxBytes[level])
801 1 : if nextLevelSize > 0 {
802 1 : // We only incur compaction debt if the next level contains data. If the
803 1 : // next level is empty, a move-based compaction would be used.
804 1 : levelRatio := float64(nextLevelSize) / float64(levelSize)
805 1 : // The current level contributes bytesAddedToNextLevel to compactions.
806 1 : // The next level contributes levelRatio * bytesAddedToNextLevel.
807 1 : compactionDebt += uint64(float64(bytesAddedToNextLevel) * (levelRatio + 1))
808 1 : }
809 1 : } else {
810 1 : // We're not moving any bytes to the next level.
811 1 : bytesAddedToNextLevel = 0
812 1 : }
813 : }
814 1 : return compactionDebt
815 : }
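// A minimal sketch of one step of the debt recurrence above, as a
// hypothetical helper: it returns the debt contributed by a level and the
// bytes carried into the next level.
func debtStep(levelSize, levelMaxBytes, nextLevelSize uint64) (debt, carried uint64) {
	if levelSize <= levelMaxBytes {
		return 0, 0 // nothing spills to the next level
	}
	carried = levelSize - levelMaxBytes
	if nextLevelSize == 0 {
		return 0, carried // an empty next level implies a move-based compaction: no debt
	}
	// The current level contributes carried bytes; the next level
	// contributes levelRatio * carried.
	levelRatio := float64(nextLevelSize) / float64(levelSize)
	return uint64(float64(carried) * (levelRatio + 1)), carried
}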
816 :
817 1 : func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []compactionInfo) {
818 1 : // The levelMaxBytes calculations here differ from RocksDB in two ways:
819 1 : //
820 1 : // 1. The use of dbSize vs maxLevelSize. RocksDB uses the size of the maximum
821 1 : // level in L1-L6, rather than determining the size of the bottom level
822 1 : // based on the total amount of data in the dB. The RocksDB calculation is
823 1 : // problematic if L0 contains a significant fraction of data, or if the
824 1 : // level sizes are roughly equal and thus there is a significant fraction
825 1 : // of data outside of the largest level.
826 1 : //
827 1 : // 2. Not adjusting the size of Lbase based on L0. RocksDB computes
828 1 : // baseBytesMax as the maximum of the configured LBaseMaxBytes and the
829 1 : // size of L0. This is problematic because baseBytesMax is used to compute
830 1 : // the max size of lower levels. A very large baseBytesMax will result in
831 1 : // an overly large value for the size of lower levels, which will cause
832 1 : // those levels not to be compacted even when they should be
833 1 : // compacted. This often results in "inverted" LSM shapes where Ln is
834 1 : // larger than Ln+1.
835 1 :
836 1 : // Determine the first non-empty level and the total DB size.
837 1 : firstNonEmptyLevel := -1
838 1 : var dbSize uint64
839 1 : for level := 1; level < numLevels; level++ {
840 1 : if p.vers.Levels[level].Size() > 0 {
841 1 : if firstNonEmptyLevel == -1 {
842 1 : firstNonEmptyLevel = level
843 1 : }
844 1 : dbSize += p.vers.Levels[level].Size()
845 : }
846 : }
847 1 : for _, c := range inProgressCompactions {
848 1 : if c.outputLevel == 0 || c.outputLevel == -1 {
849 1 : continue
850 : }
851 1 : if c.inputs[0].level == 0 && (firstNonEmptyLevel == -1 || c.outputLevel < firstNonEmptyLevel) {
852 1 : firstNonEmptyLevel = c.outputLevel
853 1 : }
854 : }
855 :
856 : // Initialize the max-bytes setting for each level to "infinity" which will
857 : // disallow compaction for that level. We'll fill in the actual value below
858 : // for levels we want to allow compactions from.
859 1 : for level := 0; level < numLevels; level++ {
860 1 : p.levelMaxBytes[level] = math.MaxInt64
861 1 : }
862 :
863 1 : if dbSize == 0 {
864 1 : // None of the levels L1 and up contain any data. Target L0 compactions
865 1 : // at the last level, or at the output level of an ongoing L0 compaction.
866 1 : p.baseLevel = numLevels - 1
867 1 : if firstNonEmptyLevel >= 0 {
868 1 : p.baseLevel = firstNonEmptyLevel
869 1 : }
870 1 : return
871 : }
872 :
873 1 : dbSize += p.vers.Levels[0].Size()
874 1 : bottomLevelSize := dbSize - dbSize/uint64(p.opts.Experimental.LevelMultiplier)
875 1 :
876 1 : curLevelSize := bottomLevelSize
877 1 : for level := numLevels - 2; level >= firstNonEmptyLevel; level-- {
878 1 : curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
879 1 : }
880 :
881 : // Compute base level (where L0 data is compacted to).
882 1 : baseBytesMax := uint64(p.opts.LBaseMaxBytes)
883 1 : p.baseLevel = firstNonEmptyLevel
884 1 : for p.baseLevel > 1 && curLevelSize > baseBytesMax {
885 1 : p.baseLevel--
886 1 : curLevelSize = uint64(float64(curLevelSize) / float64(p.opts.Experimental.LevelMultiplier))
887 1 : }
888 :
889 1 : smoothedLevelMultiplier := 1.0
890 1 : if p.baseLevel < numLevels-1 {
891 1 : smoothedLevelMultiplier = math.Pow(
892 1 : float64(bottomLevelSize)/float64(baseBytesMax),
893 1 : 1.0/float64(numLevels-p.baseLevel-1))
894 1 : }
895 :
896 1 : levelSize := float64(baseBytesMax)
897 1 : for level := p.baseLevel; level < numLevels; level++ {
898 1 : if level > p.baseLevel && levelSize > 0 {
899 1 : levelSize *= smoothedLevelMultiplier
900 1 : }
901 : // Round the result since test cases use small target level sizes, which
902 : // can be impacted by floating-point imprecision + integer truncation.
903 1 : roundedLevelSize := math.Round(levelSize)
904 1 : if roundedLevelSize > float64(math.MaxInt64) {
905 0 : p.levelMaxBytes[level] = math.MaxInt64
906 1 : } else {
907 1 : p.levelMaxBytes[level] = int64(roundedLevelSize)
908 1 : }
909 : }
910 : }
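// A hedged worked example of the sizing above: with LBaseMaxBytes = 64 MiB,
// baseLevel = 3, and bottomLevelSize = 64 GiB, the smoothed multiplier is
// (64 GiB / 64 MiB)^(1/(7-3-1)) = 1024^(1/3) ≈ 10.08, giving level targets
// of roughly L3: 64 MiB, L4: 645 MiB, L5: 6.3 GiB, L6: 64 GiB.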
911 :
912 : type levelSizeAdjust struct {
913 : incomingActualBytes uint64
914 : outgoingActualBytes uint64
915 : outgoingCompensatedBytes uint64
916 : }
917 :
918 1 : func (a levelSizeAdjust) compensated() uint64 {
919 1 : return a.incomingActualBytes - a.outgoingCompensatedBytes
920 1 : }
921 :
922 1 : func (a levelSizeAdjust) actual() uint64 {
923 1 : return a.incomingActualBytes - a.outgoingActualBytes
924 1 : }
925 :
926 1 : func calculateSizeAdjust(inProgressCompactions []compactionInfo) [numLevels]levelSizeAdjust {
927 1 : // Compute size adjustments for each level based on the in-progress
928 1 : // compactions. We sum the file sizes of all files leaving and entering each
929 1 : // level in in-progress compactions. For outgoing files, we also sum a
930 1 : // separate sum of 'compensated file sizes', which are inflated according
931 1 : // to deletion estimates.
932 1 : //
933 1 : // When we adjust a level's size according to these values during score
934 1 : // calculation, we subtract the compensated size of start level inputs to
935 1 : // account for the fact that score calculation uses compensated sizes.
936 1 : //
937 1 : // Since compensated file sizes may be compensated because they reclaim
938 1 : // space from the output level's files, we only add the real file size to
939 1 : // the output level.
940 1 : //
941 1 : // This is slightly different from RocksDB's behavior, which simply elides
942 1 : // compacting files from the level size calculation.
943 1 : var sizeAdjust [numLevels]levelSizeAdjust
944 1 : for i := range inProgressCompactions {
945 1 : c := &inProgressCompactions[i]
946 1 : // If this compaction's version edit has already been applied, there's
947 1 : // no need to adjust: The LSM we'll examine will already reflect the
948 1 : // new LSM state.
949 1 : if c.versionEditApplied {
950 1 : continue
951 : }
952 :
953 1 : for _, input := range c.inputs {
954 1 : actualSize := input.files.SizeSum()
955 1 : compensatedSize := totalCompensatedSize(input.files.Iter())
956 1 :
957 1 : if input.level != c.outputLevel {
958 1 : sizeAdjust[input.level].outgoingCompensatedBytes += compensatedSize
959 1 : sizeAdjust[input.level].outgoingActualBytes += actualSize
960 1 : if c.outputLevel != -1 {
961 1 : sizeAdjust[c.outputLevel].incomingActualBytes += actualSize
962 1 : }
963 : }
964 : }
965 : }
966 1 : return sizeAdjust
967 : }
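// A hedged worked example: an in-progress L3->L4 compaction whose inputs
// have actual size 10 MiB and compensated size 13 MiB yields, for L3,
// outgoingActualBytes = 10 MiB and outgoingCompensatedBytes = 13 MiB, and
// for L4, incomingActualBytes = 10 MiB. Note that compensated() and
// actual() rely on uint64 modular arithmetic: when outgoing exceeds
// incoming, the wrapped difference acts as a subtraction once it is added
// to a level's size during score calculation.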
968 :
969 1 : func levelCompensatedSize(lm manifest.LevelMetadata) uint64 {
970 1 : return *lm.Annotation(compensatedSizeAnnotator{}).(*uint64)
971 1 : }
972 :
973 : func (p *compactionPickerByScore) calculateLevelScores(
974 : inProgressCompactions []compactionInfo,
975 1 : ) [numLevels]candidateLevelInfo {
976 1 : var scores [numLevels]candidateLevelInfo
977 1 : for i := range scores {
978 1 : scores[i].level = i
979 1 : scores[i].outputLevel = i + 1
980 1 : }
981 1 : scores[0] = candidateLevelInfo{
982 1 : outputLevel: p.baseLevel,
983 1 : score: calculateL0Score(p.vers, p.opts, inProgressCompactions),
984 1 : }
985 1 : sizeAdjust := calculateSizeAdjust(inProgressCompactions)
986 1 : for level := 1; level < numLevels; level++ {
987 1 : compensatedLevelSize := levelCompensatedSize(p.vers.Levels[level]) + sizeAdjust[level].compensated()
988 1 : scores[level].score = float64(compensatedLevelSize) / float64(p.levelMaxBytes[level])
989 1 : scores[level].origScore = scores[level].score
990 1 :
991 1 : // In addition to the compensated score, we calculate a separate score
992 1 : // that uses actual file sizes, not compensated sizes. This is used
993 1 : // during score smoothing down below to prevent excessive
994 1 : // prioritization of reclaiming disk space.
995 1 : scores[level].rawScore = float64(p.vers.Levels[level].Size()+sizeAdjust[level].actual()) / float64(p.levelMaxBytes[level])
996 1 : }
997 :
998 : // Adjust each level's score by the score of the next level. If the next
999 : // level has a high score, and is thus a priority for compaction, this
1000 : // reduces the priority for compacting the current level. If the next level
1001 : // has a low score (i.e. it is below its target size), this increases the
1002 : // priority for compacting the current level.
1003 : //
1004 : // The effect of this adjustment is to help prioritize compactions in lower
1005 : // levels. The following shows the new score and original score. In this
1006 : // scenario, L0 has 68 sublevels. L3 (a.k.a. Lbase) is significantly above
1007 : // its target size. The original score prioritizes compactions from those two
1008 : // levels, but doing so ends up causing a future problem: data piles up in
1009 : // the higher levels, starving L5->L6 compactions, and to a lesser degree
1010 : // starving L4->L5 compactions.
1011 : //
1012 : // adjusted original
1013 : // score score size max-size
1014 : // L0 3.2 68.0 2.2 G -
1015 : // L3 3.2 21.1 1.3 G 64 M
1016 : // L4 3.4 6.7 3.1 G 467 M
1017 : // L5 3.4 2.0 6.6 G 3.3 G
1018 : // L6 0.6 0.6 14 G 24 G
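// Reading one row from the table (assuming raw ≈ original score here):
// L3's adjusted score 3.2 is its original 21.1 divided by L4's raw score
// 6.7, and L0's 3.2 is 68.0 divided by L3's 21.1, the first level at or
// above the base level.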
1019 1 : var prevLevel int
1020 1 : for level := p.baseLevel; level < numLevels; level++ {
1021 1 : if scores[prevLevel].score >= 1 {
1022 1 : // Avoid absurdly large scores by placing a floor on the score that we'll
1023 1 : // adjust a level by. The value of 0.01 was chosen somewhat arbitrarily
1024 1 : const minScore = 0.01
1025 1 : if scores[level].rawScore >= minScore {
1026 1 : scores[prevLevel].score /= scores[level].rawScore
1027 1 : } else {
1028 1 : scores[prevLevel].score /= minScore
1029 1 : }
1030 : }
1031 1 : prevLevel = level
1032 : }
1033 :
1034 1 : sort.Sort(sortCompactionLevelsDecreasingScore(scores[:]))
1035 1 : return scores
1036 : }
1037 :
1038 : // calculateL0Score calculates a float score representing the relative priority
1039 : // of compacting L0. Level L0 is special in that files within L0 may overlap one
1040 : // another, so a different set of heuristics that take into account
1041 : // read amplification apply.
1042 : func calculateL0Score(
1043 : vers *version, opts *Options, inProgressCompactions []compactionInfo,
1044 1 : ) float64 {
1045 1 : // Use the sublevel count to calculate the score. The base vs intra-L0
1046 1 : // compaction determination happens in pickAuto, not here.
1047 1 : score := float64(2*vers.L0Sublevels.MaxDepthAfterOngoingCompactions()) /
1048 1 : float64(opts.L0CompactionThreshold)
1049 1 :
1050 1 : // Also calculate a score based on the file count but use it only if it
1051 1 : // produces a higher score than the sublevel-based one. This heuristic is
1052 1 : // designed to accommodate cases where L0 is accumulating non-overlapping
1053 1 : // files. Letting too many non-overlapping files accumulate in few
1054 1 : // sublevels is undesirable, because:
1055 1 : // 1) we can produce a massive backlog to compact once files do overlap.
1056 1 : // 2) constructing L0 sublevels has a runtime that grows superlinearly with
1057 1 : // the number of files in L0 and must be done while holding D.mu.
1058 1 : noncompactingFiles := vers.Levels[0].Len()
1059 1 : for _, c := range inProgressCompactions {
1060 1 : for _, cl := range c.inputs {
1061 1 : if cl.level == 0 {
1062 1 : noncompactingFiles -= cl.files.Len()
1063 1 : }
1064 : }
1065 : }
1066 1 : fileScore := float64(noncompactingFiles) / float64(opts.L0CompactionFileThreshold)
1067 1 : if score < fileScore {
1068 1 : score = fileScore
1069 1 : }
1070 1 : return score
1071 : }
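// For example, with L0CompactionThreshold = 4 and a max depth of 6
// sublevels after ongoing compactions, the sublevel score is 2*6/4 = 3.0;
// with L0CompactionFileThreshold = 500 and 400 non-compacting files, the
// file-count score is 0.8, so the higher sublevel score (3.0) is returned.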
1072 :
1073 : // pickCompactionSeedFile picks a file from `level` in the `vers` to build a
1074 : // compaction around. Currently, this function implements a heuristic similar to
1075 : // RocksDB's kMinOverlappingRatio, seeking to minimize write amplification. This
1076 : // function is linear with respect to the number of files in `level` and
1077 : // `outputLevel`.
1078 : func pickCompactionSeedFile(
1079 : vers *version, opts *Options, level, outputLevel int, earliestSnapshotSeqNum uint64,
1080 1 : ) (manifest.LevelFile, bool) {
1081 1 : // Select the file within the level to compact. We want to minimize write
1082 1 : // amplification, but also ensure that deletes are propagated to the
1083 1 : // bottom level in a timely fashion so as to reclaim disk space. A table's
1084 1 : // smallest sequence number provides a measure of its age. The ratio of
1085 1 : // overlapping-bytes / table-size gives an indication of write
1086 1 : // amplification (a smaller ratio is preferable).
1087 1 : //
1088 1 : // The current heuristic is based on the RocksDB kMinOverlappingRatio
1089 1 : // heuristic. It chooses the file with the minimum overlapping ratio with
1090 1 : // the target level, which minimizes write amplification.
1091 1 : //
1092 1 : // It uses a "compensated size" for the denominator, which is the file
1093 1 : // size but artificially inflated by an estimate of the space that may be
1094 1 : // reclaimed through compaction. Currently, we only compensate for range
1095 1 : // deletions and only with a rough estimate of the reclaimable bytes. This
1096 1 : // differs from RocksDB which only compensates for point tombstones and
1097 1 : // only if they exceed the number of non-deletion entries in the table.
1098 1 : //
1099 1 : // TODO(peter): For concurrent compactions, we may want to try harder to
1100 1 : // pick a seed file whose resulting compaction bounds do not overlap with
1101 1 : // an in-progress compaction.
1102 1 :
1103 1 : cmp := opts.Comparer.Compare
1104 1 : startIter := vers.Levels[level].Iter()
1105 1 : outputIter := vers.Levels[outputLevel].Iter()
1106 1 :
1107 1 : var file manifest.LevelFile
1108 1 : smallestRatio := uint64(math.MaxUint64)
1109 1 :
1110 1 : outputFile := outputIter.First()
1111 1 :
1112 1 : for f := startIter.First(); f != nil; f = startIter.Next() {
1113 1 : var overlappingBytes uint64
1114 1 : compacting := f.IsCompacting()
1115 1 : if compacting {
1116 1 : // Move on if this file is already being compacted. We'll likely
1117 1 : // still need to move past the overlapping output files regardless,
1118 1 : // but in cases where all start-level files are compacting we won't.
1119 1 : continue
1120 : }
1121 :
1122 : // Trim any output-level files smaller than f.
1123 1 : for outputFile != nil && sstableKeyCompare(cmp, outputFile.Largest, f.Smallest) < 0 {
1124 1 : outputFile = outputIter.Next()
1125 1 : }
1126 :
1127 1 : for outputFile != nil && sstableKeyCompare(cmp, outputFile.Smallest, f.Largest) <= 0 && !compacting {
1128 1 : overlappingBytes += outputFile.Size
1129 1 : compacting = compacting || outputFile.IsCompacting()
1130 1 :
1131 1 : // For files in the bottommost level of the LSM, the
1132 1 : // Stats.RangeDeletionsBytesEstimate field is set to the estimate
1133 1 : // of bytes /within/ the file itself that may be dropped by
1134 1 : // recompacting the file. These bytes from obsolete keys would not
1135 1 : // need to be rewritten if we compacted `f` into `outputFile`, so
1136 1 : // they don't contribute to write amplification. Subtracting them
1137 1 : // out of the overlapping bytes helps prioritize these compactions
1138 1 : // that are cheaper than their file sizes suggest.
1139 1 : if outputLevel == numLevels-1 && outputFile.LargestSeqNum < earliestSnapshotSeqNum {
1140 1 : overlappingBytes -= outputFile.Stats.RangeDeletionsBytesEstimate
1141 1 : }
1142 :
1143 : // If the file in the next level extends beyond f's largest key,
1144 : // break out and don't advance outputIter because f's successor
1145 : // might also overlap.
1146 : //
1147 : // Note, we stop as soon as we encounter an output-level file with a
1148 : // largest key beyond the input-level file's largest bound. We
1149 : // perform a simple user key comparison here using sstableKeyCompare
1150 : // which handles the potential for exclusive largest key bounds.
1151 : // There's some subtlety when the bounds are equal (eg, equal and
1152 : // inclusive, or equal and exclusive). Current Pebble doesn't split
1153 : // user keys across sstables within a level (and in format versions
1154 : // FormatSplitUserKeysMarkedCompacted and later we guarantee no
1155 : // split user keys exist within the entire LSM). In that case, we're
1156 : // assured that neither the input level nor the output level's next
1157 : // file shares the same user key, so compaction expansion will not
1158 : // include them in any compaction compacting `f`.
1159 : //
1160 : // NB: If we /did/ allow split user keys, or we're running on an
1161 : // old database with an earlier format major version where there are
1162 : // existing split user keys, this logic would be incorrect. Consider
1163 : // L1: [a#120,a#100] [a#80,a#60]
1164 : // L2: [a#55,a#45] [a#35,a#25] [a#15,a#5]
1165 : // While considering the first file in L1, [a#120,a#100], we'd skip
1166 : // past all of the files in L2. When considering the second file in
1167 : // L1, we'd improperly conclude that the second file overlaps
1168 : // nothing in the second level and is cheap to compact, when in
1169 : // reality we'd need to expand the compaction to include all 5
1170 : // files.
1171 1 : if sstableKeyCompare(cmp, outputFile.Largest, f.Largest) > 0 {
1172 1 : break
1173 : }
1174 1 : outputFile = outputIter.Next()
1175 : }
1176 :
1177 : // If the input level file or one of the overlapping files is
1178 : // compacting, we're not going to be able to compact this file
1179 : // anyways, so skip it.
1180 1 : if compacting {
1181 1 : continue
1182 : }
1183 :
1184 1 : compSz := compensatedSize(f)
1185 1 : scaledRatio := overlappingBytes * 1024 / compSz
1186 1 : if scaledRatio < smallestRatio {
1187 1 : smallestRatio = scaledRatio
1188 1 : file = startIter.Take()
1189 1 : }
1190 : }
1191 1 : return file, file.FileMetadata != nil
1192 : }
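// A worked instance of the ranking above (hypothetical numbers): a file
// with compensated size 10 MiB overlapping 5 MiB in the output level
// scores 5*1024/10 = 512, while one overlapping 40 MiB scores 4096; the
// smaller ratio wins, minimizing write amplification per compacted byte.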
1193 :
1194 : // pickAuto picks the best compaction, if any.
1195 : //
1196 : // On each call, pickAuto computes per-level size adjustments based on
1197 : // in-progress compactions, and computes a per-level score. The levels are
1198 : // iterated over in decreasing score order trying to find a valid compaction
1199 : // anchored at that level.
1200 : //
1201 : // If a score-based compaction cannot be found, pickAuto falls back to looking
1202 : // for an elision-only compaction to remove obsolete keys.
1203 1 : func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
1204 1 : // Compaction concurrency is controlled by L0 read-amp. We allow one
1205 1 : // additional compaction per L0CompactionConcurrency sublevels, as well as
1206 1 : // one additional compaction per CompactionDebtConcurrency bytes of
1207 1 : // compaction debt. Compaction concurrency is tied to L0 sublevels as that
1208 1 : // signal is independent of the database size. We tack on the compaction
1209 1 : // debt as a second signal to prevent compaction concurrency from dropping
1210 1 : // significantly right after a base compaction finishes, and before those
1211 1 : // bytes have been compacted further down the LSM.
1212 1 : if n := len(env.inProgressCompactions); n > 0 {
1213 1 : l0ReadAmp := p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()
1214 1 : compactionDebt := p.estimatedCompactionDebt(0)
1215 1 : ccSignal1 := n * p.opts.Experimental.L0CompactionConcurrency
1216 1 : ccSignal2 := uint64(n) * p.opts.Experimental.CompactionDebtConcurrency
1217 1 : if l0ReadAmp < ccSignal1 && compactionDebt < ccSignal2 {
1218 1 : return nil
1219 1 : }
1220 : }
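// A hedged worked example of the gate above: with n = 2 in-progress
// compactions, L0CompactionConcurrency = 10, and CompactionDebtConcurrency
// = 1 GiB, a third compaction is picked only if the L0 read-amp is at
// least 20 or the compaction debt is at least 2 GiB.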
1221 :
1222 1 : scores := p.calculateLevelScores(env.inProgressCompactions)
1223 1 :
1224 1 : // TODO(bananabrick): Either remove, or change this into an event sent to the
1225 1 : // EventListener.
1226 1 : logCompaction := func(pc *pickedCompaction) {
1227 0 : var buf bytes.Buffer
1228 0 : for i := 0; i < numLevels; i++ {
1229 0 : if i != 0 && i < p.baseLevel {
1230 0 : continue
1231 : }
1232 :
1233 0 : var info *candidateLevelInfo
1234 0 : for j := range scores {
1235 0 : if scores[j].level == i {
1236 0 : info = &scores[j]
1237 0 : break
1238 : }
1239 : }
1240 :
1241 0 : marker := " "
1242 0 : if pc.startLevel.level == info.level {
1243 0 : marker = "*"
1244 0 : }
1245 0 : fmt.Fprintf(&buf, " %sL%d: %5.1f %5.1f %5.1f %8s %8s",
1246 0 : marker, info.level, info.score, info.origScore, info.rawScore,
1247 0 : humanize.Bytes.Int64(int64(totalCompensatedSize(
1248 0 : p.vers.Levels[info.level].Iter(),
1249 0 : ))),
1250 0 : humanize.Bytes.Int64(p.levelMaxBytes[info.level]),
1251 0 : )
1252 0 :
1253 0 : count := 0
1254 0 : for i := range env.inProgressCompactions {
1255 0 : c := &env.inProgressCompactions[i]
1256 0 : if c.inputs[0].level != info.level {
1257 0 : continue
1258 : }
1259 0 : count++
1260 0 : if count == 1 {
1261 0 : fmt.Fprintf(&buf, " [")
1262 0 : } else {
1263 0 : fmt.Fprintf(&buf, " ")
1264 0 : }
1265 0 : fmt.Fprintf(&buf, "L%d->L%d", c.inputs[0].level, c.outputLevel)
1266 : }
1267 0 : if count > 0 {
1268 0 : fmt.Fprintf(&buf, "]")
1269 0 : }
1270 0 : fmt.Fprintf(&buf, "\n")
1271 : }
1272 0 : p.opts.Logger.Infof("pickAuto: L%d->L%d\n%s",
1273 0 : pc.startLevel.level, pc.outputLevel.level, buf.String())
1274 : }
1275 :
1276 : // Check for a score-based compaction. "scores" has been sorted in order of
1277 : // decreasing score. For each level with a score >= 1, we attempt to find a
1278 : // compaction anchored at that level.
1279 1 : for i := range scores {
1280 1 : info := &scores[i]
1281 1 : if info.score < 1 {
1282 1 : break
1283 : }
1284 1 : if info.level == numLevels-1 {
1285 1 : continue
1286 : }
1287 :
1288 1 : if info.level == 0 {
1289 1 : pc = pickL0(env, p.opts, p.vers, p.baseLevel)
1290 1 : // Fail-safe to protect against compacting the same sstable
1291 1 : // concurrently.
1292 1 : if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
1293 1 : p.addScoresToPickedCompactionMetrics(pc, scores)
1294 1 : pc.score = info.score
1295 1 : // TODO(bananabrick): Create an EventListener for logCompaction.
1296 1 : if false {
1297 0 : logCompaction(pc)
1298 0 : }
1299 1 : return pc
1300 : }
1301 1 : continue
1302 : }
1303 :
1304 : // info.level > 0
1305 1 : var ok bool
1306 1 : info.file, ok = pickCompactionSeedFile(p.vers, p.opts, info.level, info.outputLevel, env.earliestSnapshotSeqNum)
1307 1 : if !ok {
1308 1 : continue
1309 : }
1310 :
1311 1 : pc := pickAutoLPositive(env, p.opts, p.vers, *info, p.baseLevel, p.levelMaxBytes)
1312 1 : // Fail-safe to protect against compacting the same sstable concurrently.
1313 1 : if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
1314 1 : p.addScoresToPickedCompactionMetrics(pc, scores)
1315 1 : pc.score = info.score
1316 1 : // TODO(bananabrick): Create an EventListener for logCompaction.
1317 1 : if false {
1318 0 : logCompaction(pc)
1319 0 : }
1320 1 : return pc
1321 : }
1322 : }
1323 :
1324 : // Check for L6 files with tombstones that may be elided. These files may
1325 : // exist if a snapshot prevented the elision of a tombstone or because of
1326 : // a move compaction. These are low-priority compactions because they
1327 : // don't help us keep up with writes, just reclaim disk space.
1328 1 : if pc := p.pickElisionOnlyCompaction(env); pc != nil {
1329 1 : return pc
1330 1 : }
1331 :
1332 1 : if pc := p.pickReadTriggeredCompaction(env); pc != nil {
1333 0 : return pc
1334 0 : }
1335 :
1336 : // NB: This should only be run if a read compaction wasn't
1337 : // scheduled.
1338 : //
1339 : // We won't be scheduling a read compaction right now, and in
1340 : // read-heavy workloads, compactions won't be scheduled frequently
1341 : // because flushes aren't frequent. So we need to signal to the
1342 : // iterator that it should schedule a compaction when it adds
1343 : // compactions to the read compaction queue.
1344 : //
1345 : // The nil check is necessary because some tests do not set
1346 : // rescheduleReadCompaction, and some of those tests may not want
1347 : // extra compactions to be scheduled. So we perform the check here
1348 : // rather than setting rescheduleReadCompaction in each of those
1349 : // tests.
1350 1 : if env.readCompactionEnv.rescheduleReadCompaction != nil {
1351 1 : *env.readCompactionEnv.rescheduleReadCompaction = true
1352 1 : }
1353 :
1354 : // At the lowest possible compaction-picking priority, look for files marked
1355 : // for compaction. Pebble will mark files for compaction if they have atomic
1356 : // compaction units that span multiple files. While current Pebble code does
1357 : // not construct such sstables, RocksDB and earlier versions of Pebble may
1358 : // have created them. These split user keys form sets of files that must be
1359 : // compacted together for correctness (referred to as "atomic compaction
1360 : // units" within the code). Rewrite them in-place.
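 : //
 : // For example (an illustrative scenario, not taken from real data): if
 : // versions of a user key a appear in two adjacent sstables, say a#5 in
 : // one file and a#3 in the next, those two files form a single atomic
 : // compaction unit and must be compacted together.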
1361 : //
1362 : // It's also possible that a file may have been marked for compaction by
1363 : // even earlier versions of Pebble code, since FileMetadata's
1364 : // MarkedForCompaction field is persisted in the manifest. That's okay. We
1365 : // previously would've ignored the designation, whereas now we'll re-compact
1366 : // the file in place.
1367 1 : if p.vers.Stats.MarkedForCompaction > 0 {
1368 0 : if pc := p.pickRewriteCompaction(env); pc != nil {
1369 0 : return pc
1370 0 : }
1371 : }
1372 :
1373 1 : return nil
1374 : }
1375 :
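 : // addScoresToPickedCompactionMetrics records, for each level
 : // participating in the picked compaction, that level's score in
 : // pc.pickerMetrics.scores.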
1376 : func (p *compactionPickerByScore) addScoresToPickedCompactionMetrics(
1377 : pc *pickedCompaction, candInfo [numLevels]candidateLevelInfo,
1378 1 : ) {
1379 1 :
1380 1 : // candInfo is sorted by score, not by compaction level.
1381 1 : infoByLevel := [numLevels]candidateLevelInfo{}
1382 1 : for i := range candInfo {
1383 1 : level := candInfo[i].level
1384 1 : infoByLevel[level] = candInfo[i]
1385 1 : }
1386 : // Gather the compaction scores for the levels participating in the compaction.
1387 1 : pc.pickerMetrics.scores = make([]float64, len(pc.inputs))
1388 1 : inputIdx := 0
1389 1 : for i := range infoByLevel {
1390 1 : if pc.inputs[inputIdx].level == infoByLevel[i].level {
1391 1 : pc.pickerMetrics.scores[inputIdx] = infoByLevel[i].score
1392 1 : inputIdx++
1393 1 : }
1394 1 : if inputIdx == len(pc.inputs) {
1395 1 : break
1396 : }
1397 : }
1398 : }
1399 :
1400 : // elisionOnlyAnnotator implements the manifest.Annotator interface,
1401 : // annotating B-Tree nodes with the *fileMetadata of a file meeting the
1402 : // obsolete keys criteria for an elision-only compaction within the subtree.
1403 : // If multiple files meet the criteria, it chooses whichever file has the
1404 : // lowest LargestSeqNum. The lowest LargestSeqNum file will be the first
1405 : // eligible for an elision-only compaction once snapshots less than or equal
1406 : // to its LargestSeqNum are closed.
1407 : type elisionOnlyAnnotator struct{}
1408 :
1409 : var _ manifest.Annotator = elisionOnlyAnnotator{}
1410 :
1411 1 : func (a elisionOnlyAnnotator) Zero(interface{}) interface{} {
1412 1 : return nil
1413 1 : }
1414 :
1415 1 : func (a elisionOnlyAnnotator) Accumulate(f *fileMetadata, dst interface{}) (interface{}, bool) {
1416 1 : if f.IsCompacting() {
1417 1 : return dst, true
1418 1 : }
1419 1 : if !f.StatsValid() {
1420 1 : return dst, false
1421 1 : }
1422 : // Bottommost files are large and not worthwhile to compact just
1423 : // to remove a few tombstones. Consider a file ineligible if its
1424 : // own range deletions delete less than 10% of its data and its
1425 : // deletion tombstones make up less than 10% of its entries.
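 : //
 : // For example (illustrative): a 100 MB file whose range deletions are
 : // estimated to reclaim only 5 MB and whose 1M entries include only 50k
 : // tombstones fails both thresholds and is considered ineligible.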
1426 : //
1427 : // TODO(jackson): This does not account for duplicate user keys
1428 : // which may be collapsed. Ideally, we would have 'obsolete keys'
1429 : // statistics that would include tombstones, the keys that are
1430 : // dropped by tombstones and duplicated user keys. See #847.
1431 : //
1432 : // Note that tables containing exclusively range keys (i.e. no point keys;
1433 : // `NumEntries` and `RangeDeletionsBytesEstimate` are both zero) are excluded
1434 : // from elision-only compactions.
1435 : // TODO(travers): Consider an alternative heuristic for elision of range-keys.
1436 1 : if f.Stats.RangeDeletionsBytesEstimate*10 < f.Size &&
1437 1 : f.Stats.NumDeletions*10 <= f.Stats.NumEntries {
1438 1 : return dst, true
1439 1 : }
1440 1 : if dst == nil {
1441 1 : return f, true
1442 1 : } else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum {
1443 1 : return f, true
1444 1 : }
1445 1 : return dst, true
1446 : }
1447 :
1448 1 : func (a elisionOnlyAnnotator) Merge(v interface{}, accum interface{}) interface{} {
1449 1 : if v == nil {
1450 1 : return accum
1451 1 : }
1452 : // If we haven't accumulated an eligible file yet, or v's LargestSeqNum is
1453 : // less than the accumulated file's, use v.
1454 1 : if accum == nil {
1455 1 : return v
1456 1 : }
1457 1 : f := v.(*fileMetadata)
1458 1 : accumV := accum.(*fileMetadata)
1459 1 : if accumV == nil || accumV.LargestSeqNum > f.LargestSeqNum {
1460 1 : return f
1461 1 : }
1462 1 : return accumV
1463 : }
1464 :
1465 : // markedForCompactionAnnotator implements the manifest.Annotator interface,
1466 : // annotating B-Tree nodes with the *fileMetadata of a file that is marked for
1467 : // compaction within the subtree. If multiple files meet the criteria, it
1468 : // chooses whichever file has the lowest LargestSeqNum.
1469 : type markedForCompactionAnnotator struct{}
1470 :
1471 : var _ manifest.Annotator = markedForCompactionAnnotator{}
1472 :
1473 0 : func (a markedForCompactionAnnotator) Zero(interface{}) interface{} {
1474 0 : return nil
1475 0 : }
1476 :
1477 : func (a markedForCompactionAnnotator) Accumulate(
1478 : f *fileMetadata, dst interface{},
1479 0 : ) (interface{}, bool) {
1480 0 : if !f.MarkedForCompaction {
1481 0 : // Not marked for compaction; return dst.
1482 0 : return dst, true
1483 0 : }
1484 0 : return markedMergeHelper(f, dst)
1485 : }
1486 :
1487 0 : func (a markedForCompactionAnnotator) Merge(v interface{}, accum interface{}) interface{} {
1488 0 : if v == nil {
1489 0 : return accum
1490 0 : }
1491 0 : accum, _ = markedMergeHelper(v.(*fileMetadata), accum)
1492 0 : return accum
1493 : }
1494 :
1495 : // REQUIRES: f is non-nil, and f.MarkedForCompaction=true.
1496 0 : func markedMergeHelper(f *fileMetadata, dst interface{}) (interface{}, bool) {
1497 0 : if dst == nil {
1498 0 : return f, true
1499 0 : } else if dstV := dst.(*fileMetadata); dstV.LargestSeqNum > f.LargestSeqNum {
1500 0 : return f, true
1501 0 : }
1502 0 : return dst, true
1503 : }
1504 :
1505 : // pickElisionOnlyCompaction looks for compactions of sstables in the
1506 : // bottommost level containing obsolete records that may now be dropped.
1507 : func (p *compactionPickerByScore) pickElisionOnlyCompaction(
1508 : env compactionEnv,
1509 1 : ) (pc *pickedCompaction) {
1510 1 : if p.opts.private.disableElisionOnlyCompactions {
1511 1 : return nil
1512 1 : }
1513 1 : v := p.vers.Levels[numLevels-1].Annotation(elisionOnlyAnnotator{})
1514 1 : if v == nil {
1515 1 : return nil
1516 1 : }
1517 1 : candidate := v.(*fileMetadata)
1518 1 : if candidate.IsCompacting() || candidate.LargestSeqNum >= env.earliestSnapshotSeqNum {
1519 1 : return nil
1520 1 : }
1521 1 : lf := p.vers.Levels[numLevels-1].Find(p.opts.Comparer.Compare, candidate)
1522 1 : if lf == nil {
1523 0 : panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, numLevels-1))
1524 : }
1525 :
1526 : // Construct a picked compaction of the elision candidate's atomic
1527 : // compaction unit.
1528 1 : pc = newPickedCompaction(p.opts, p.vers, numLevels-1, numLevels-1, p.baseLevel)
1529 1 : pc.kind = compactionKindElisionOnly
1530 1 : var isCompacting bool
1531 1 : pc.startLevel.files, isCompacting = expandToAtomicUnit(p.opts.Comparer.Compare, lf.Slice(), false /* disableIsCompacting */)
1532 1 : if isCompacting {
1533 0 : return nil
1534 0 : }
1535 1 : pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
1536 1 : // Fail-safe to protect against compacting the same sstable concurrently.
1537 1 : if !inputRangeAlreadyCompacting(env, pc) {
1538 1 : return pc
1539 1 : }
1540 1 : return nil
1541 : }
1542 :
1543 : // pickRewriteCompaction attempts to construct a compaction that
1544 : // rewrites a file marked for compaction. pickRewriteCompaction will
1545 : // pull in adjacent files in the file's atomic compaction unit if
1546 : // necessary. A rewrite compaction outputs files to the same level as
1547 : // the input level.
1548 0 : func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *pickedCompaction) {
1549 0 : for l := numLevels - 1; l >= 0; l-- {
1550 0 : v := p.vers.Levels[l].Annotation(markedForCompactionAnnotator{})
1551 0 : if v == nil {
1552 0 : // Try the next level.
1553 0 : continue
1554 : }
1555 0 : candidate := v.(*fileMetadata)
1556 0 : if candidate.IsCompacting() {
1557 0 : // Try the next level.
1558 0 : continue
1559 : }
1560 0 : lf := p.vers.Levels[l].Find(p.opts.Comparer.Compare, candidate)
1561 0 : if lf == nil {
1562 0 : panic(fmt.Sprintf("file %s not found in level %d as expected", candidate.FileNum, l))
1563 : }
1564 :
1565 0 : inputs := lf.Slice()
1566 0 : // L0 files generated by a flush have never been split such that
1567 0 : // adjacent files can contain the same user key. So we do not need to
1568 0 : // rewrite an atomic compaction unit for L0. Note that there is nothing
1569 0 : // preventing two different flushes from producing files that are
1570 0 : // non-overlapping from an InternalKey perspective, but span the same
1571 0 : // user key. However, such files cannot be in the same L0 sublevel,
1572 0 : // since each sublevel requires non-overlapping user keys (unlike other
1573 0 : // levels).
1574 0 : if l > 0 {
1575 0 : // Find this file's atomic compaction unit. This is only relevant
1576 0 : // for levels L1+.
1577 0 : var isCompacting bool
1578 0 : inputs, isCompacting = expandToAtomicUnit(
1579 0 : p.opts.Comparer.Compare,
1580 0 : inputs,
1581 0 : false, /* disableIsCompacting */
1582 0 : )
1583 0 : if isCompacting {
1584 0 : // Try the next level.
1585 0 : continue
1586 : }
1587 : }
1588 :
1589 0 : pc = newPickedCompaction(p.opts, p.vers, l, l, p.baseLevel)
1590 0 : pc.outputLevel.level = l
1591 0 : pc.kind = compactionKindRewrite
1592 0 : pc.startLevel.files = inputs
1593 0 : pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
1594 0 :
1595 0 : // Fail-safe to protect against compacting the same sstable concurrently.
1596 0 : if !inputRangeAlreadyCompacting(env, pc) {
1597 0 : if pc.startLevel.level == 0 {
1598 0 : pc.l0SublevelInfo = generateSublevelInfo(pc.cmp, pc.startLevel.files)
1599 0 : }
1600 0 : return pc
1601 : }
1602 : }
1603 0 : return nil
1604 : }
1605 :
1606 : // pickAutoLPositive picks an automatic compaction for the candidate
1607 : // file in a positive-numbered level. This function must not be used for
1608 : // L0.
1609 : func pickAutoLPositive(
1610 : env compactionEnv,
1611 : opts *Options,
1612 : vers *version,
1613 : cInfo candidateLevelInfo,
1614 : baseLevel int,
1615 : levelMaxBytes [7]int64,
1616 1 : ) (pc *pickedCompaction) {
1617 1 : if cInfo.level == 0 {
1618 0 : panic("pebble: pickAutoLPositive called for L0")
1619 : }
1620 :
1621 1 : pc = newPickedCompaction(opts, vers, cInfo.level, defaultOutputLevel(cInfo.level, baseLevel), baseLevel)
1622 1 : if pc.outputLevel.level != cInfo.outputLevel {
1623 0 : panic("pebble: compaction picked unexpected output level")
1624 : }
1625 1 : pc.startLevel.files = cInfo.file.Slice()
1626 1 : // Files in level 0 may overlap each other, so pick up all overlapping ones.
1627 1 : if pc.startLevel.level == 0 {
1628 0 : cmp := opts.Comparer.Compare
1629 0 : smallest, largest := manifest.KeyRange(cmp, pc.startLevel.files.Iter())
1630 0 : pc.startLevel.files = vers.Overlaps(0, cmp, smallest.UserKey,
1631 0 : largest.UserKey, largest.IsExclusiveSentinel())
1632 0 : if pc.startLevel.files.Empty() {
1633 0 : panic("pebble: empty compaction")
1634 : }
1635 : }
1636 :
1637 1 : if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
1638 0 : return nil
1639 0 : }
1640 1 : return pc.maybeAddLevel(opts, env.diskAvailBytes)
1641 : }
1642 :
1643 : // maybeAddLevel adds another input level to the picked compaction if warranted.
1644 1 : func (pc *pickedCompaction) maybeAddLevel(opts *Options, diskAvailBytes uint64) *pickedCompaction {
1645 1 : pc.pickerMetrics.singleLevelOverlappingRatio = pc.overlappingRatio()
1646 1 : if pc.outputLevel.level == numLevels-1 {
1647 1 : // Don't add a level if the current output level is L6.
1648 1 : return pc
1649 1 : }
1650 1 : if !opts.Experimental.MultiLevelCompactionHeuristic.allowL0() && pc.startLevel.level == 0 {
1651 1 : return pc
1652 1 : }
1653 1 : if pc.compactionSize() > expandedCompactionByteSizeLimit(
1654 1 : opts, pc.adjustedOutputLevel, diskAvailBytes) {
1655 1 : // Don't add a level if the current compaction exceeds the compaction size limit.
1656 1 : return pc
1657 1 : }
1658 1 : return opts.Experimental.MultiLevelCompactionHeuristic.pick(pc, opts, diskAvailBytes)
1659 : }
1660 :
1661 : // MultiLevelHeuristic evaluates whether to add files from the next level into the compaction.
1662 : type MultiLevelHeuristic interface {
1663 : // pick returns the preferred compaction.
1664 : pick(pc *pickedCompaction, opts *Options, diskAvailBytes uint64) *pickedCompaction
1665 :
1666 : // allowL0 reports whether the heuristic allows L0 to be involved in a multilevel compaction.
1667 : allowL0() bool
1668 : }
1669 :
1670 : // NoMultiLevel will never add an additional level to the compaction.
1671 : type NoMultiLevel struct{}
1672 :
1673 : var _ MultiLevelHeuristic = (*NoMultiLevel)(nil)
1674 :
1675 : func (nml NoMultiLevel) pick(
1676 : pc *pickedCompaction, opts *Options, diskAvailBytes uint64,
1677 0 : ) *pickedCompaction {
1678 0 : return pc
1679 0 : }
1680 :
1681 0 : func (nml NoMultiLevel) allowL0() bool {
1682 0 : return false
1683 0 : }
1684 :
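 : // predictedWriteAmp returns the predicted write amplification of the
 : // compaction: the total bytes across all input levels divided by the
 : // bytes contributed by the non-output (higher) input levels. For example
 : // (illustrative): compacting 10 MB from L3 into 90 MB of overlapping L4
 : // data yields (10+90)/10 = 10.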
1685 1 : func (pc *pickedCompaction) predictedWriteAmp() float64 {
1686 1 : var bytesToCompact uint64
1687 1 : var higherLevelBytes uint64
1688 1 : for i := range pc.inputs {
1689 1 : levelSize := pc.inputs[i].files.SizeSum()
1690 1 : bytesToCompact += levelSize
1691 1 : if i != len(pc.inputs)-1 {
1692 1 : higherLevelBytes += levelSize
1693 1 : }
1694 : }
1695 1 : return float64(bytesToCompact) / float64(higherLevelBytes)
1696 : }
1697 :
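 : // overlappingRatio returns the ratio of bytes in the compaction's lowest
 : // (output) input level to bytes in the higher input levels. In the
 : // example above, 90 MB of overlapping L4 data beneath 10 MB of L3 inputs
 : // gives a ratio of 9.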
1698 1 : func (pc *pickedCompaction) overlappingRatio() float64 {
1699 1 : var higherLevelBytes uint64
1700 1 : var lowestLevelBytes uint64
1701 1 : for i := range pc.inputs {
1702 1 : levelSize := pc.inputs[i].files.SizeSum()
1703 1 : if i == len(pc.inputs)-1 {
1704 1 : lowestLevelBytes += levelSize
1705 1 : continue
1706 : }
1707 1 : higherLevelBytes += levelSize
1708 : }
1709 1 : return float64(lowestLevelBytes) / float64(higherLevelBytes)
1710 : }
1711 :
1712 : // WriteAmpHeuristic defines a multilevel compaction heuristic that adds an
1713 : // additional level to the picked compaction if doing so does not increase
1714 : // the predicted write amp of the compaction by more than AddPropensity.
1715 : type WriteAmpHeuristic struct {
1716 : // AddPropensity is a constant that affects the propensity to conduct multilevel
1717 : // compactions. If positive, a multilevel compaction may get picked even if
1718 : // the single-level compaction has lower write amp, and vice versa.
1719 : AddPropensity float64
1720 :
1721 : // AllowL0 if true, allow l0 to be involved in a ML compaction.
1722 : // AllowL0, if true, allows L0 to be involved in a multilevel compaction.
1723 : }
1724 :
1725 : var _ MultiLevelHeuristic = (*WriteAmpHeuristic)(nil)
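 :
 : // A minimal usage sketch (the values below are hypothetical, not
 : // recommendations): the heuristic is installed via the experimental
 : // options, e.g.
 : //
 : //	opts.Experimental.MultiLevelCompactionHeuristic = WriteAmpHeuristic{
 : //		AddPropensity: 0.5, // tolerate slightly worse predicted write amp
 : //		AllowL0:       false,
 : //	}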
1726 :
1727 : // TODO(msbutler): microbenchmark the extent to which multilevel compaction
1728 : // picking slows down the compaction picking process. This should be as fast as
1729 : // possible since Compaction-picking holds d.mu, which prevents WAL rotations,
1730 : // in-progress flushes and compactions from completing, etc. Consider ways to
1731 : // deduplicate work, given that setupInputs has already been called.
1732 : func (wa WriteAmpHeuristic) pick(
1733 : pcOrig *pickedCompaction, opts *Options, diskAvailBytes uint64,
1734 1 : ) *pickedCompaction {
1735 1 : pcMulti := pcOrig.clone()
1736 1 : if !pcMulti.setupMultiLevelCandidate(opts, diskAvailBytes) {
1737 1 : return pcOrig
1738 1 : }
1739 1 : picked := pcOrig
1740 1 : if pcMulti.predictedWriteAmp() <= pcOrig.predictedWriteAmp()+wa.AddPropensity {
1741 1 : picked = pcMulti
1742 1 : }
1743 : // Regardless of what compaction was picked, log the multilevelOverlapping ratio.
1744 1 : picked.pickerMetrics.multiLevelOverlappingRatio = pcMulti.overlappingRatio()
1745 1 : return picked
1746 : }
1747 :
1748 1 : func (wa WriteAmpHeuristic) allowL0() bool {
1749 1 : return wa.AllowL0
1750 1 : }
1751 :
1752 : // pickL0 picks compactions originating from L0. It uses information about
1753 : // sublevels to generate a compaction.
1754 1 : func pickL0(env compactionEnv, opts *Options, vers *version, baseLevel int) (pc *pickedCompaction) {
1755 1 : // It is important to pass information about Lbase files to L0Sublevels
1756 1 : // so it can pick a compaction that does not conflict with an Lbase => Lbase+1
1757 1 : // compaction. Without this, we observed reduced concurrency of L0=>Lbase
1758 1 : // compactions, and increased read amplification in L0.
1759 1 : //
1760 1 : // TODO(bilal) Remove the minCompactionDepth parameter once fixing it at 1
1761 1 : // has been shown to not cause a performance regression.
1762 1 : lcf, err := vers.L0Sublevels.PickBaseCompaction(1, vers.Levels[baseLevel].Slice())
1763 1 : if err != nil {
1764 0 : opts.Logger.Infof("error when picking base compaction: %s", err)
1765 0 : return
1766 0 : }
1767 1 : if lcf != nil {
1768 1 : pc = newPickedCompactionFromL0(lcf, opts, vers, baseLevel, true)
1769 1 : pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel)
1770 1 : if pc.startLevel.files.Empty() {
1771 0 : opts.Logger.Fatalf("empty compaction chosen")
1772 0 : }
1773 1 : return pc.maybeAddLevel(opts, env.diskAvailBytes)
1774 : }
1775 :
1776 : // Couldn't choose a base compaction. Try choosing an intra-L0
1777 : // compaction. Note that we pass in minIntraL0Count here as opposed to
1778 : // 1, since choosing a single-sublevel intra-L0 compaction is
1779 : // counterproductive.
1780 1 : lcf, err = vers.L0Sublevels.PickIntraL0Compaction(env.earliestUnflushedSeqNum, minIntraL0Count)
1781 1 : if err != nil {
1782 0 : opts.Logger.Infof("error when picking intra-L0 compaction: %s", err)
1783 0 : return
1784 0 : }
1785 1 : if lcf != nil {
1786 1 : pc = newPickedCompactionFromL0(lcf, opts, vers, 0, false)
1787 1 : if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
1788 0 : return nil
1789 0 : }
1790 1 : if pc.startLevel.files.Empty() {
1791 0 : opts.Logger.Fatalf("empty compaction chosen")
1792 0 : }
1793 1 : {
1794 1 : iter := pc.startLevel.files.Iter()
1795 1 : if iter.First() == nil || iter.Next() == nil {
1796 0 : // A single-file intra-L0 compaction is unproductive.
1797 0 : return nil
1798 0 : }
1799 : }
1800 :
1801 1 : pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
1802 : }
1803 1 : return pc
1804 : }
1805 :
1806 : func pickManualCompaction(
1807 : vers *version, opts *Options, env compactionEnv, baseLevel int, manual *manualCompaction,
1808 1 : ) (pc *pickedCompaction, retryLater bool) {
1809 1 : outputLevel := manual.level + 1
1810 1 : if manual.level == 0 {
1811 1 : outputLevel = baseLevel
1812 1 : } else if manual.level < baseLevel {
1813 1 : // The start level for a compaction must be >= Lbase. A manual
1814 1 : // compaction could have been created adhering to that condition, and
1815 1 : // then an automatic compaction came in and compacted all of the
1816 1 : // sstables in Lbase to Lbase+1 which caused Lbase to change. Simply
1817 1 : // ignore this manual compaction as there is nothing to do (manual.level
1818 1 : // points to an empty level).
1819 1 : return nil, false
1820 1 : }
1821 : // This conflictsWithInProgress call is necessary for the manual compaction to
1822 : // be retried when it conflicts with an ongoing automatic compaction. Without
1823 : // it, the compaction is dropped due to pc.setupInputs returning false since
1824 : // the input/output range is already being compacted, and the manual
1825 : // compaction ends with a non-compacted LSM.
1826 1 : if conflictsWithInProgress(manual, outputLevel, env.inProgressCompactions, opts.Comparer.Compare) {
1827 1 : return nil, true
1828 1 : }
1829 1 : pc = newPickedCompaction(opts, vers, manual.level, defaultOutputLevel(manual.level, baseLevel), baseLevel)
1830 1 : manual.outputLevel = pc.outputLevel.level
1831 1 : pc.startLevel.files = vers.Overlaps(manual.level, opts.Comparer.Compare, manual.start, manual.end, false)
1832 1 : if pc.startLevel.files.Empty() {
1833 1 : // Nothing to do
1834 1 : return nil, false
1835 1 : }
1836 1 : if !pc.setupInputs(opts, env.diskAvailBytes, pc.startLevel) {
1837 1 : // setupInputs returned false indicating there's a conflicting
1838 1 : // concurrent compaction.
1839 1 : return nil, true
1840 1 : }
1841 1 : if pc = pc.maybeAddLevel(opts, env.diskAvailBytes); pc == nil {
1842 0 : return nil, false
1843 0 : }
1844 1 : if pc.outputLevel.level != outputLevel {
1845 1 : if len(pc.extraLevels) > 0 {
1846 1 : // multilevel compactions relax this invariant
1847 1 : } else {
1848 0 : panic("pebble: compaction picked unexpected output level")
1849 : }
1850 : }
1851 : // Fail-safe to protect against compacting the same sstable concurrently.
1852 1 : if inputRangeAlreadyCompacting(env, pc) {
1853 1 : return nil, true
1854 1 : }
1855 1 : return pc, false
1856 : }
1857 :
1858 : func (p *compactionPickerByScore) pickReadTriggeredCompaction(
1859 : env compactionEnv,
1860 1 : ) (pc *pickedCompaction) {
1861 1 : // If a flush is in progress or expected to happen soon, more writes are
1862 1 : // taking place and we will soon be scheduling more write-focused
1863 1 : // compactions. In this case, skip read compactions, as they are lower priority.
1864 1 : if env.readCompactionEnv.flushing || env.readCompactionEnv.readCompactions == nil {
1865 1 : return nil
1866 1 : }
1867 1 : for env.readCompactionEnv.readCompactions.size > 0 {
1868 0 : rc := env.readCompactionEnv.readCompactions.remove()
1869 0 : if pc = pickReadTriggeredCompactionHelper(p, rc, env); pc != nil {
1870 0 : break
1871 : }
1872 : }
1873 1 : return pc
1874 : }
1875 :
1876 : func pickReadTriggeredCompactionHelper(
1877 : p *compactionPickerByScore, rc *readCompaction, env compactionEnv,
1878 0 : ) (pc *pickedCompaction) {
1879 0 : cmp := p.opts.Comparer.Compare
1880 0 : overlapSlice := p.vers.Overlaps(rc.level, cmp, rc.start, rc.end, false /* exclusiveEnd */)
1881 0 : if overlapSlice.Empty() {
1882 0 : // If there is no overlap, then the file with the key range
1883 0 : // must have been compacted away. So, we don't proceed to
1884 0 : // compact the same key range again.
1885 0 : return nil
1886 0 : }
1887 :
1888 0 : iter := overlapSlice.Iter()
1889 0 : var fileMatches bool
1890 0 : for f := iter.First(); f != nil; f = iter.Next() {
1891 0 : if f.FileNum == rc.fileNum {
1892 0 : fileMatches = true
1893 0 : break
1894 : }
1895 : }
1896 0 : if !fileMatches {
1897 0 : return nil
1898 0 : }
1899 :
1900 0 : pc = newPickedCompaction(p.opts, p.vers, rc.level, defaultOutputLevel(rc.level, p.baseLevel), p.baseLevel)
1901 0 :
1902 0 : pc.startLevel.files = overlapSlice
1903 0 : if !pc.setupInputs(p.opts, env.diskAvailBytes, pc.startLevel) {
1904 0 : return nil
1905 0 : }
1906 0 : if inputRangeAlreadyCompacting(env, pc) {
1907 0 : return nil
1908 0 : }
1909 0 : pc.kind = compactionKindRead
1910 0 :
1911 0 : // Prevent read compactions which are too wide.
1912 0 : outputOverlaps := pc.version.Overlaps(
1913 0 : pc.outputLevel.level, pc.cmp, pc.smallest.UserKey,
1914 0 : pc.largest.UserKey, pc.largest.IsExclusiveSentinel())
1915 0 : if outputOverlaps.SizeSum() > pc.maxReadCompactionBytes {
1916 0 : return nil
1917 0 : }
1918 :
1919 : // Prevent compactions that start with a small seed file X but overlap with
1920 : // more than allowedCompactionWidth times X's size in the output level.
1921 0 : const allowedCompactionWidth = 35
1922 0 : if outputOverlaps.SizeSum() > overlapSlice.SizeSum()*allowedCompactionWidth {
1923 0 : return nil
1924 0 : }
1925 :
1926 0 : return pc
1927 : }
1928 :
1929 0 : func (p *compactionPickerByScore) forceBaseLevel1() {
1930 0 : p.baseLevel = 1
1931 0 : }
1932 :
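 : // inputRangeAlreadyCompacting reports whether any of the picked
 : // compaction's input files are already compacting, or whether an
 : // in-progress compaction is writing to an overlapping region of the key
 : // space in the same output level.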
1933 1 : func inputRangeAlreadyCompacting(env compactionEnv, pc *pickedCompaction) bool {
1934 1 : for _, cl := range pc.inputs {
1935 1 : iter := cl.files.Iter()
1936 1 : for f := iter.First(); f != nil; f = iter.Next() {
1937 1 : if f.IsCompacting() {
1938 1 : return true
1939 1 : }
1940 : }
1941 : }
1942 :
1943 : // Look for active compactions outputting to the same region of the key
1944 : // space in the same output level. Two potential compactions may conflict
1945 : // without sharing input files if there are no files in the output level
1946 : // that overlap with the intersection of the compactions' key spaces.
1947 : //
1948 : // Consider an active L0->Lbase compaction compacting two L0 files one
1949 : // [a-f] and the other [t-z] into Lbase.
1950 : //
1951 : // L0
1952 : // ↦ 000100 ↤ ↦ 000101 ↤
1953 : // L1
1954 : // ↦ 000004 ↤
1955 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
1956 : //
1957 : // If a new file 000102 [j-p] is flushed while the existing compaction is
1958 : // still ongoing, the new file would not be in any compacting sublevel
1959 : // intervals and would not overlap with any Lbase files that are also
1960 : // compacting. However, this compaction cannot be picked because the
1961 : // compaction's output key space [j-p] would overlap the existing
1962 : // compaction's output key space [a-z].
1963 : //
1964 : // L0
1965 : // ↦ 000100* ↤ ↦ 000102 ↤ ↦ 000101* ↤
1966 : // L1
1967 : // ↦ 000004* ↤
1968 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
1969 : //
1970 : // * - currently compacting
1971 1 : if pc.outputLevel != nil && pc.outputLevel.level != 0 {
1972 1 : for _, c := range env.inProgressCompactions {
1973 1 : if pc.outputLevel.level != c.outputLevel {
1974 1 : continue
1975 : }
1976 1 : if base.InternalCompare(pc.cmp, c.largest, pc.smallest) < 0 ||
1977 1 : base.InternalCompare(pc.cmp, c.smallest, pc.largest) > 0 {
1978 1 : continue
1979 : }
1980 :
1981 : // The picked compaction and the in-progress compaction c are
1982 : // outputting to the same region of the key space of the same
1983 : // level.
1984 1 : return true
1985 : }
1986 : }
1987 1 : return false
1988 : }
1989 :
1990 : // conflictsWithInProgress reports whether any in-progress compaction overlaps the manual compaction's keyspace at the relevant levels.
1991 : func conflictsWithInProgress(
1992 : manual *manualCompaction, outputLevel int, inProgressCompactions []compactionInfo, cmp Compare,
1993 1 : ) bool {
1994 1 : for _, c := range inProgressCompactions {
1995 1 : if (c.outputLevel == manual.level || c.outputLevel == outputLevel) &&
1996 1 : isUserKeysOverlapping(manual.start, manual.end, c.smallest.UserKey, c.largest.UserKey, cmp) {
1997 1 : return true
1998 1 : }
1999 1 : for _, in := range c.inputs {
2000 1 : if in.files.Empty() {
2001 1 : continue
2002 : }
2003 1 : iter := in.files.Iter()
2004 1 : smallest := iter.First().Smallest.UserKey
2005 1 : largest := iter.Last().Largest.UserKey
2006 1 : if (in.level == manual.level || in.level == outputLevel) &&
2007 1 : isUserKeysOverlapping(manual.start, manual.end, smallest, largest, cmp) {
2008 1 : return true
2009 1 : }
2010 : }
2011 : }
2012 1 : return false
2013 : }
2014 :
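 : // isUserKeysOverlapping reports whether the closed user-key intervals
 : // [x1, x2] and [y1, y2] overlap.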
2015 1 : func isUserKeysOverlapping(x1, x2, y1, y2 []byte, cmp Compare) bool {
2016 1 : return cmp(x1, y2) <= 0 && cmp(y1, x2) <= 0
2017 1 : }