Line data Source code
1 : // Copyright 2013 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "math"
12 : "runtime/pprof"
13 : "slices"
14 : "sync/atomic"
15 : "time"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/compact"
20 : "github.com/cockroachdb/pebble/internal/keyspan"
21 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
22 : "github.com/cockroachdb/pebble/internal/manifest"
23 : "github.com/cockroachdb/pebble/internal/sstableinternal"
24 : "github.com/cockroachdb/pebble/objstorage"
25 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
26 : "github.com/cockroachdb/pebble/objstorage/remote"
27 : "github.com/cockroachdb/pebble/sstable"
28 : "github.com/cockroachdb/pebble/vfs"
29 : )
30 :
31 : var errEmptyTable = errors.New("pebble: empty table")
32 :
33 : // ErrCancelledCompaction is returned if a compaction is cancelled by a
34 : // concurrent excise or ingest-split operation.
35 : var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
36 :
37 : var compactLabels = pprof.Labels("pebble", "compact")
38 : var flushLabels = pprof.Labels("pebble", "flush")
39 : var gcLabels = pprof.Labels("pebble", "gc")
40 :
41 : // expandedCompactionByteSizeLimit is the maximum number of bytes in all
42 : // compacted files. We avoid expanding the lower level file set of a compaction
43 : // if it would make the total compaction cover more than this many bytes.
44 2 : func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64) uint64 {
45 2 : v := uint64(25 * opts.Level(level).TargetFileSize)
46 2 :
47 2 : // Never expand a compaction beyond half the available capacity, divided
48 2 : // by the maximum number of concurrent compactions. Each of the concurrent
49 2 : // compactions may expand up to this limit, so this attempts to limit
50 2 : // compactions to half of available disk space. Note that this will not
51 2 : // prevent compaction picking from pursuing compactions that are larger
52 2 : // than this threshold before expansion.
53 2 : diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions())
54 2 : if v > diskMax {
55 1 : v = diskMax
56 1 : }
57 2 : return v
58 : }
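As a rough illustration of the cap above (a minimal sketch; the 2 MiB target file size, 100 GiB of free space, and 4 concurrent compactions are assumed values for illustration, not defaults read from Options):

func expandedLimitExample() uint64 {
	const targetFileSize = 2 << 20       // assumed per-level target file size
	const availBytes = uint64(100 << 30) // assumed free disk space
	const concurrentCompactions = 4      // assumed MaxConcurrentCompactions()
	v := uint64(25 * targetFileSize)                     // 50 MiB
	diskMax := (availBytes / 2) / concurrentCompactions // 12.5 GiB
	if v > diskMax {
		v = diskMax
	}
	return v // 50 MiB: the disk term only binds when free space is scarce
}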
59 :
60 : // maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
61 : // before we stop building a single file in a level-1 to level compaction.
62 2 : func maxGrandparentOverlapBytes(opts *Options, level int) uint64 {
63 2 : return uint64(10 * opts.Level(level).TargetFileSize)
64 2 : }
65 :
66 : // maxReadCompactionBytes is used to prevent read compactions which
67 : // are too wide.
68 2 : func maxReadCompactionBytes(opts *Options, level int) uint64 {
69 2 : return uint64(10 * opts.Level(level).TargetFileSize)
70 2 : }
71 :
72 : // noCloseIter wraps around a FragmentIterator, intercepting and eliding
73 : // calls to Close. It is used during compaction to ensure that rangeDelIters
74 : // are not closed prematurely.
75 : type noCloseIter struct {
76 : keyspan.FragmentIterator
77 : }
78 :
79 2 : func (i *noCloseIter) Close() {}
80 :
81 : type compactionLevel struct {
82 : level int
83 : files manifest.LevelSlice
84 : // l0SublevelInfo contains information about L0 sublevels being compacted.
85 : // It's only set for the start level of a compaction starting out of L0 and
86 : // is nil for all other compactions.
87 : l0SublevelInfo []sublevelInfo
88 : }
89 :
90 2 : func (cl compactionLevel) Clone() compactionLevel {
91 2 : newCL := compactionLevel{
92 2 : level: cl.level,
93 2 : files: cl.files,
94 2 : }
95 2 : return newCL
96 2 : }
97 1 : func (cl compactionLevel) String() string {
98 1 : return fmt.Sprintf(`Level %d, Files %s`, cl.level, cl.files)
99 1 : }
100 :
101 : // compactionWritable is an objstorage.Writable wrapper that, on every write,
102 : // updates a metric in `versions` on bytes written by in-progress compactions so
103 : // far. It also increments a per-compaction `written` int.
104 : type compactionWritable struct {
105 : objstorage.Writable
106 :
107 : versions *versionSet
108 : written *int64
109 : }
110 :
111 : // Write is part of the objstorage.Writable interface.
112 2 : func (c *compactionWritable) Write(p []byte) error {
113 2 : if err := c.Writable.Write(p); err != nil {
114 0 : return err
115 0 : }
116 :
117 2 : *c.written += int64(len(p))
118 2 : c.versions.incrementCompactionBytes(int64(len(p)))
119 2 : return nil
120 : }
121 :
122 : type compactionKind int
123 :
124 : const (
125 : compactionKindDefault compactionKind = iota
126 : compactionKindFlush
127 : // compactionKindMove denotes a move compaction where the input file is
128 : // retained and linked in a new level without being obsoleted.
129 : compactionKindMove
130 : // compactionKindCopy denotes a copy compaction where the input file is
131 : // copied byte-by-byte into a new file with a new FileNum in the output level.
132 : compactionKindCopy
133 : // compactionKindDeleteOnly denotes a compaction that only deletes input
134 : // files. It can occur when wide range tombstones completely contain sstables.
135 : compactionKindDeleteOnly
136 : compactionKindElisionOnly
137 : compactionKindRead
138 : compactionKindTombstoneDensity
139 : compactionKindRewrite
140 : compactionKindIngestedFlushable
141 : )
142 :
143 2 : func (k compactionKind) String() string {
144 2 : switch k {
145 2 : case compactionKindDefault:
146 2 : return "default"
147 0 : case compactionKindFlush:
148 0 : return "flush"
149 2 : case compactionKindMove:
150 2 : return "move"
151 2 : case compactionKindDeleteOnly:
152 2 : return "delete-only"
153 2 : case compactionKindElisionOnly:
154 2 : return "elision-only"
155 1 : case compactionKindRead:
156 1 : return "read"
157 1 : case compactionKindTombstoneDensity:
158 1 : return "tombstone-density"
159 2 : case compactionKindRewrite:
160 2 : return "rewrite"
161 0 : case compactionKindIngestedFlushable:
162 0 : return "ingested-flushable"
163 2 : case compactionKindCopy:
164 2 : return "copy"
165 : }
166 0 : return "?"
167 : }
168 :
169 : // compaction is a table compaction from one level to the next, starting from a
170 : // given version.
171 : type compaction struct {
172 : // cancel is a bool that can be used by other goroutines to signal a compaction
173 : // to cancel, such as if a conflicting excise operation raced it to manifest
174 : // application. Only holders of the manifest lock will write to this atomic.
175 : cancel atomic.Bool
176 :
177 : kind compactionKind
178 : // isDownload is true if this compaction was started as part of a Download
179 : // operation. In this case kind is compactionKindCopy or
180 : // compactionKindRewrite.
181 : isDownload bool
182 :
183 : cmp Compare
184 : equal Equal
185 : comparer *base.Comparer
186 : formatKey base.FormatKey
187 : logger Logger
188 : version *version
189 : stats base.InternalIteratorStats
190 : beganAt time.Time
191 : // versionEditApplied is set to true when a compaction has completed and the
192 : // resulting version has been installed (if successful), but the compaction
193 : // goroutine is still cleaning up (eg, deleting obsolete files).
194 : versionEditApplied bool
195 : bufferPool sstable.BufferPool
196 :
197 : // startLevel is the level that is being compacted. Inputs from startLevel
198 : // and outputLevel will be merged to produce a set of outputLevel files.
199 : startLevel *compactionLevel
200 :
201 : // outputLevel is the level that files are being produced in. outputLevel is
202 : // equal to startLevel+1 except when:
203 : // - if startLevel is 0, the output level equals compactionPicker.baseLevel().
204 : // - in multilevel compaction, the output level is the lowest level involved in
205 : // the compaction
206 : // A compaction's outputLevel is nil for delete-only compactions.
207 : outputLevel *compactionLevel
208 :
209 : // extraLevels point to additional levels in between the input and output
210 : // levels that get compacted in multilevel compactions
211 : extraLevels []*compactionLevel
212 :
213 : inputs []compactionLevel
214 :
215 : // maxOutputFileSize is the maximum size of an individual table created
216 : // during compaction.
217 : maxOutputFileSize uint64
218 : // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
219 : // single output table with the tables in the grandparent level.
220 : maxOverlapBytes uint64
221 :
222 : // flushing contains the flushables (aka memtables) that are being flushed.
223 : flushing flushableList
224 : // bytesWritten contains the number of bytes that have been written to outputs.
225 : bytesWritten int64
226 :
227 : // The boundaries of the input data.
228 : smallest InternalKey
229 : largest InternalKey
230 :
231 : // A list of fragment iterators to close when the compaction finishes. Used by
232 : // input iteration to keep rangeDelIters open for the lifetime of the
233 : // compaction, and only close them when the compaction finishes.
234 : closers []*noCloseIter
235 :
236 : // grandparents are the tables in level+2 that overlap with the files being
237 : // compacted. Used to determine output table boundaries. Do not assume that the
238 : // grandparent level will still contain the same files once this compaction finishes.
239 : grandparents manifest.LevelSlice
240 :
241 : // Boundaries at which flushes to L0 should be split. Determined by
242 : // L0Sublevels. If nil, flushes aren't split.
243 : l0Limits [][]byte
244 :
245 : delElision compact.TombstoneElision
246 : rangeKeyElision compact.TombstoneElision
247 :
248 : // allowedZeroSeqNum is true if seqnums can be zeroed if there are no
249 : // snapshots requiring them to be kept. This determination is made by
250 : // looking for an sstable which overlaps the bounds of the compaction at a
251 : // lower level in the LSM during runCompaction.
252 : allowedZeroSeqNum bool
253 :
254 : metrics map[int]*LevelMetrics
255 :
256 : pickerMetrics compactionPickerMetrics
257 : }
258 :
259 : // inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any
260 : // input sstables.
261 2 : func (c *compaction) inputLargestSeqNumAbsolute() base.SeqNum {
262 2 : var seqNum base.SeqNum
263 2 : for _, cl := range c.inputs {
264 2 : cl.files.Each(func(m *manifest.FileMetadata) {
265 2 : seqNum = max(seqNum, m.LargestSeqNumAbsolute)
266 2 : })
267 : }
268 2 : return seqNum
269 : }
270 :
271 2 : func (c *compaction) makeInfo(jobID JobID) CompactionInfo {
272 2 : info := CompactionInfo{
273 2 : JobID: int(jobID),
274 2 : Reason: c.kind.String(),
275 2 : Input: make([]LevelInfo, 0, len(c.inputs)),
276 2 : Annotations: []string{},
277 2 : }
278 2 : if c.isDownload {
279 2 : info.Reason = "download," + info.Reason
280 2 : }
281 2 : for _, cl := range c.inputs {
282 2 : inputInfo := LevelInfo{Level: cl.level, Tables: nil}
283 2 : iter := cl.files.Iter()
284 2 : for m := iter.First(); m != nil; m = iter.Next() {
285 2 : inputInfo.Tables = append(inputInfo.Tables, m.TableInfo())
286 2 : }
287 2 : info.Input = append(info.Input, inputInfo)
288 : }
289 2 : if c.outputLevel != nil {
290 2 : info.Output.Level = c.outputLevel.level
291 2 :
292 2 : // If there are no inputs from the output level (eg, a move
293 2 : // compaction), add an empty LevelInfo to info.Input.
294 2 : if len(c.inputs) > 0 && c.inputs[len(c.inputs)-1].level != c.outputLevel.level {
295 0 : info.Input = append(info.Input, LevelInfo{Level: c.outputLevel.level})
296 0 : }
297 2 : } else {
298 2 : // For a delete-only compaction, set the output level to L6. The
299 2 : // output level is not meaningful here, but complicating the
300 2 : // info.Output interface with a pointer doesn't seem worth the
301 2 : // semantic distinction.
302 2 : info.Output.Level = numLevels - 1
303 2 : }
304 :
305 2 : for i, score := range c.pickerMetrics.scores {
306 2 : info.Input[i].Score = score
307 2 : }
308 2 : info.SingleLevelOverlappingRatio = c.pickerMetrics.singleLevelOverlappingRatio
309 2 : info.MultiLevelOverlappingRatio = c.pickerMetrics.multiLevelOverlappingRatio
310 2 : if len(info.Input) > 2 {
311 2 : info.Annotations = append(info.Annotations, "multilevel")
312 2 : }
313 2 : return info
314 : }
315 :
316 2 : func (c *compaction) userKeyBounds() base.UserKeyBounds {
317 2 : return base.UserKeyBoundsFromInternal(c.smallest, c.largest)
318 2 : }
319 :
320 : func newCompaction(
321 : pc *pickedCompaction, opts *Options, beganAt time.Time, provider objstorage.Provider,
322 2 : ) *compaction {
323 2 : c := &compaction{
324 2 : kind: compactionKindDefault,
325 2 : cmp: pc.cmp,
326 2 : equal: opts.Comparer.Equal,
327 2 : comparer: opts.Comparer,
328 2 : formatKey: opts.Comparer.FormatKey,
329 2 : inputs: pc.inputs,
330 2 : smallest: pc.smallest,
331 2 : largest: pc.largest,
332 2 : logger: opts.Logger,
333 2 : version: pc.version,
334 2 : beganAt: beganAt,
335 2 : maxOutputFileSize: pc.maxOutputFileSize,
336 2 : maxOverlapBytes: pc.maxOverlapBytes,
337 2 : pickerMetrics: pc.pickerMetrics,
338 2 : }
339 2 : c.startLevel = &c.inputs[0]
340 2 : if pc.startLevel.l0SublevelInfo != nil {
341 2 : c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo
342 2 : }
343 2 : c.outputLevel = &c.inputs[1]
344 2 :
345 2 : if len(pc.extraLevels) > 0 {
346 2 : c.extraLevels = pc.extraLevels
347 2 : c.outputLevel = &c.inputs[len(c.inputs)-1]
348 2 : }
349 : // Compute the set of outputLevel+1 files that overlap this compaction (these
350 : // are the grandparent sstables).
351 2 : if c.outputLevel.level+1 < numLevels {
352 2 : c.grandparents = c.version.Overlaps(c.outputLevel.level+1, c.userKeyBounds())
353 2 : }
354 2 : c.delElision, c.rangeKeyElision = compact.SetupTombstoneElision(
355 2 : c.cmp, c.version, c.outputLevel.level, base.UserKeyBoundsFromInternal(c.smallest, c.largest),
356 2 : )
357 2 : c.kind = pc.kind
358 2 :
359 2 : if c.kind == compactionKindDefault && c.outputLevel.files.Empty() && !c.hasExtraLevelData() &&
360 2 : c.startLevel.files.Len() == 1 && c.grandparents.SizeSum() <= c.maxOverlapBytes {
361 2 : // This compaction can be converted into a move or copy from one level
362 2 : // to the next. We avoid such a move if there is lots of overlapping
363 2 : // grandparent data. Otherwise, the move could create a parent file
364 2 : // that will require a very expensive merge later on.
365 2 : iter := c.startLevel.files.Iter()
366 2 : meta := iter.First()
367 2 : isRemote := false
368 2 : // We should always be passed a provider, except in some unit tests.
369 2 : if provider != nil {
370 2 : isRemote = !objstorage.IsLocalTable(provider, meta.FileBacking.DiskFileNum)
371 2 : }
372 : // Avoid a trivial move or copy if all of these are true, as rewriting a
373 : // new file is better:
374 : //
375 : // 1) The source file is a virtual sstable
376 : // 2) The existing file `meta` is on non-remote storage
377 : // 3) The output level prefers shared storage
378 2 : mustCopy := !isRemote && remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
379 2 : if mustCopy {
380 2 : // If the source is virtual, it's best to just rewrite the file as all
381 2 : // conditions in the above comment are met.
382 2 : if !meta.Virtual {
383 2 : c.kind = compactionKindCopy
384 2 : }
385 2 : } else {
386 2 : c.kind = compactionKindMove
387 2 : }
388 : }
389 2 : return c
390 : }
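As a reading aid for the trivial move/copy decision above, here is a hedged restatement of the same branch structure as a standalone helper; it is not part of Pebble, and the boolean parameters stand in for the provider and Options checks performed inside newCompaction:

// trivialMoveKind summarizes the decision above for a single-file compaction
// with little grandparent overlap. The inputs are assumptions supplied by the
// caller, not values read from Options here.
func trivialMoveKind(sourceIsRemote, sourceIsVirtual, outputPrefersShared bool) compactionKind {
	if !sourceIsRemote && outputPrefersShared {
		if sourceIsVirtual {
			// Rewriting is preferable: the virtual file must be materialized
			// onto shared storage anyway, so keep the default compaction kind.
			return compactionKindDefault
		}
		// A local physical file headed for shared storage is copied byte-for-byte.
		return compactionKindCopy
	}
	// Otherwise the input file can simply be relinked into the output level.
	return compactionKindMove
}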
391 :
392 : func newDeleteOnlyCompaction(
393 : opts *Options, cur *version, inputs []compactionLevel, beganAt time.Time,
394 2 : ) *compaction {
395 2 : c := &compaction{
396 2 : kind: compactionKindDeleteOnly,
397 2 : cmp: opts.Comparer.Compare,
398 2 : equal: opts.Comparer.Equal,
399 2 : comparer: opts.Comparer,
400 2 : formatKey: opts.Comparer.FormatKey,
401 2 : logger: opts.Logger,
402 2 : version: cur,
403 2 : beganAt: beganAt,
404 2 : inputs: inputs,
405 2 : }
406 2 :
407 2 : // Set c.smallest, c.largest.
408 2 : files := make([]manifest.LevelIterator, 0, len(inputs))
409 2 : for _, in := range inputs {
410 2 : files = append(files, in.files.Iter())
411 2 : }
412 2 : c.smallest, c.largest = manifest.KeyRange(opts.Comparer.Compare, files...)
413 2 : return c
414 : }
415 :
416 2 : func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64) {
417 2 : // Heuristic to place a lower bound on compaction output file size
418 2 : // caused by Lbase. Prior to this heuristic we have observed an L0 in
419 2 : // production with 310K files of which 290K files were < 10KB in size.
420 2 : // Our hypothesis is that it was caused by L1 having 2600 files and
421 2 : // ~10GB, such that each flush got split into many tiny files due to
422 2 : // overlapping with most of the files in Lbase.
423 2 : //
424 2 : // The computation below is general in that it accounts
425 2 : // for flushing different volumes of data (e.g. we may be flushing
426 2 : // many memtables). For illustration, we consider the typical
427 2 : // example of flushing a 64MB memtable. So 12.8MB output,
428 2 : // based on the compression guess below. If the compressed bytes
429 2 : // guess is an over-estimate we will end up with smaller files,
430 2 : // and if an under-estimate we will end up with larger files.
431 2 : // With a 2MB target file size, 7 files. We are willing to accept
432 2 : // 4x the number of files, if it results in better write amplification
433 2 : // when later compacting to Lbase, i.e., ~450KB files (target file
434 2 : // size / 4).
435 2 : //
436 2 : // Note that this is a pessimistic heuristic in that
437 2 : // fileCountUpperBoundDueToGrandparents could be far from the actual
438 2 : // number of files produced due to the grandparent limits. For
439 2 : // example, in the extreme, consider a flush that overlaps with 1000
440 2 : // files in Lbase f0...f999, and the initially calculated value of
441 2 : // maxOverlapBytes will cause splits at f10, f20,..., f990, which
442 2 : // means an upper bound file count of 100 files. Say the input bytes
443 2 : // in the flush are such that acceptableFileCount=10. We will fatten
444 2 : // up maxOverlapBytes by 10x to ensure that the upper bound file count
445 2 : // drops to 10. However, it is possible that in practice, even without
446 2 : // this change, we would have produced no more than 10 files, and that
447 2 : // this change makes the files unnecessarily wide. Say the input bytes
448 2 : // are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
449 2 : // 10% in f80...f89 and 10% in f990...f999. The original value of
450 2 : // maxOverlapBytes would have actually produced only 10 sstables. But
451 2 : // by increasing maxOverlapBytes by 10x, we may produce 1 sstable that
452 2 : // spans f0...f89, i.e., a much wider sstable than necessary.
453 2 : //
454 2 : // We could produce a tighter estimate of
455 2 : // fileCountUpperBoundDueToGrandparents if we had knowledge of the key
456 2 : // distribution of the flush. The 4x multiplier mentioned earlier is
457 2 : // a way to try to compensate for this pessimism.
458 2 : //
459 2 : // TODO(sumeer): we don't have compression info for the data being
460 2 : // flushed, but it is likely that existing files that overlap with
461 2 : // this flush in Lbase are representative wrt compression ratio. We
462 2 : // could store the uncompressed size in FileMetadata and estimate
463 2 : // the compression ratio.
464 2 : const approxCompressionRatio = 0.2
465 2 : approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
466 2 : approxNumFilesBasedOnTargetSize :=
467 2 : int(math.Ceil(approxOutputBytes / float64(c.maxOutputFileSize)))
468 2 : acceptableFileCount := float64(4 * approxNumFilesBasedOnTargetSize)
469 2 : // The byte calculation is linear in numGrandparentFiles, but we will
470 2 : // incur this linear cost in compact.Runner.TableSplitLimit() too, so we are
471 2 : // also willing to pay it now. We could approximate this cheaply by using the
472 2 : // mean file size of Lbase.
473 2 : grandparentFileBytes := c.grandparents.SizeSum()
474 2 : fileCountUpperBoundDueToGrandparents :=
475 2 : float64(grandparentFileBytes) / float64(c.maxOverlapBytes)
476 2 : if fileCountUpperBoundDueToGrandparents > acceptableFileCount {
477 2 : c.maxOverlapBytes = uint64(
478 2 : float64(c.maxOverlapBytes) *
479 2 : (fileCountUpperBoundDueToGrandparents / acceptableFileCount))
480 2 : }
481 : }
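To make the heuristic concrete, here is a small sketch that walks the same arithmetic using the 64 MiB memtable example from the comment above; the 2 MiB target file size and the 10 GiB of grandparent (Lbase) overlap are assumed purely for illustration:

func grandparentAdjustExample() uint64 {
	const flushingBytes = 64 << 20     // one 64 MiB memtable, per the comment above
	const approxCompressionRatio = 0.2
	const maxOutputFileSize = 2 << 20  // assumed 2 MiB target file size
	maxOverlapBytes := uint64(10 * maxOutputFileSize) // 20 MiB, per maxGrandparentOverlapBytes
	const grandparentFileBytes = uint64(10 << 30)     // assumed 10 GiB of Lbase overlap

	approxOutputBytes := approxCompressionRatio * float64(flushingBytes)             // 12.8 MiB
	approxNumFiles := int(math.Ceil(approxOutputBytes / float64(maxOutputFileSize))) // 7
	acceptableFileCount := float64(4 * approxNumFiles)                               // 28
	upperBound := float64(grandparentFileBytes) / float64(maxOverlapBytes)           // 512
	if upperBound > acceptableFileCount {
		// Fatten maxOverlapBytes by 512/28 ≈ 18.3x, to roughly 366 MiB.
		maxOverlapBytes = uint64(float64(maxOverlapBytes) * (upperBound / acceptableFileCount))
	}
	return maxOverlapBytes
}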
482 :
483 : func newFlush(
484 : opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time,
485 2 : ) (*compaction, error) {
486 2 : c := &compaction{
487 2 : kind: compactionKindFlush,
488 2 : cmp: opts.Comparer.Compare,
489 2 : equal: opts.Comparer.Equal,
490 2 : comparer: opts.Comparer,
491 2 : formatKey: opts.Comparer.FormatKey,
492 2 : logger: opts.Logger,
493 2 : version: cur,
494 2 : beganAt: beganAt,
495 2 : inputs: []compactionLevel{{level: -1}, {level: 0}},
496 2 : maxOutputFileSize: math.MaxUint64,
497 2 : maxOverlapBytes: math.MaxUint64,
498 2 : flushing: flushing,
499 2 : }
500 2 : c.startLevel = &c.inputs[0]
501 2 : c.outputLevel = &c.inputs[1]
502 2 :
503 2 : if len(flushing) > 0 {
504 2 : if _, ok := flushing[0].flushable.(*ingestedFlushable); ok {
505 2 : if len(flushing) != 1 {
506 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
507 : }
508 2 : c.kind = compactionKindIngestedFlushable
509 2 : return c, nil
510 : }
511 : }
512 :
513 : // Make sure there's no ingestedFlushable after the first flushable in the
514 : // list.
515 2 : for _, f := range flushing {
516 2 : if _, ok := f.flushable.(*ingestedFlushable); ok {
517 0 : panic("pebble: flushing shouldn't contain ingestedFlushable flushable")
518 : }
519 : }
520 :
521 2 : if cur.L0Sublevels != nil {
522 2 : c.l0Limits = cur.L0Sublevels.FlushSplitKeys()
523 2 : }
524 :
525 2 : smallestSet, largestSet := false, false
526 2 : updatePointBounds := func(iter internalIterator) {
527 2 : if kv := iter.First(); kv != nil {
528 2 : if !smallestSet ||
529 2 : base.InternalCompare(c.cmp, c.smallest, kv.K) > 0 {
530 2 : smallestSet = true
531 2 : c.smallest = kv.K.Clone()
532 2 : }
533 : }
534 2 : if kv := iter.Last(); kv != nil {
535 2 : if !largestSet ||
536 2 : base.InternalCompare(c.cmp, c.largest, kv.K) < 0 {
537 2 : largestSet = true
538 2 : c.largest = kv.K.Clone()
539 2 : }
540 : }
541 : }
542 :
543 2 : updateRangeBounds := func(iter keyspan.FragmentIterator) error {
544 2 : // File bounds require s != nil && !s.Empty(). We only need to check for
545 2 : // s != nil here, as the memtable's FragmentIterator would never surface
546 2 : // empty spans.
547 2 : if s, err := iter.First(); err != nil {
548 0 : return err
549 2 : } else if s != nil {
550 2 : if key := s.SmallestKey(); !smallestSet ||
551 2 : base.InternalCompare(c.cmp, c.smallest, key) > 0 {
552 2 : smallestSet = true
553 2 : c.smallest = key.Clone()
554 2 : }
555 : }
556 2 : if s, err := iter.Last(); err != nil {
557 0 : return err
558 2 : } else if s != nil {
559 2 : if key := s.LargestKey(); !largestSet ||
560 2 : base.InternalCompare(c.cmp, c.largest, key) < 0 {
561 2 : largestSet = true
562 2 : c.largest = key.Clone()
563 2 : }
564 : }
565 2 : return nil
566 : }
567 :
568 2 : var flushingBytes uint64
569 2 : for i := range flushing {
570 2 : f := flushing[i]
571 2 : updatePointBounds(f.newIter(nil))
572 2 : if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
573 2 : if err := updateRangeBounds(rangeDelIter); err != nil {
574 0 : return nil, err
575 0 : }
576 : }
577 2 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
578 2 : if err := updateRangeBounds(rangeKeyIter); err != nil {
579 0 : return nil, err
580 0 : }
581 : }
582 2 : flushingBytes += f.inuseBytes()
583 : }
584 :
585 2 : if opts.FlushSplitBytes > 0 {
586 2 : c.maxOutputFileSize = uint64(opts.Level(0).TargetFileSize)
587 2 : c.maxOverlapBytes = maxGrandparentOverlapBytes(opts, 0)
588 2 : c.grandparents = c.version.Overlaps(baseLevel, c.userKeyBounds())
589 2 : adjustGrandparentOverlapBytesForFlush(c, flushingBytes)
590 2 : }
591 :
592 : // We don't elide tombstones for flushes.
593 2 : c.delElision, c.rangeKeyElision = compact.NoTombstoneElision(), compact.NoTombstoneElision()
594 2 : return c, nil
595 : }
596 :
597 2 : func (c *compaction) hasExtraLevelData() bool {
598 2 : if len(c.extraLevels) == 0 {
599 2 : // not a multi level compaction
600 2 : return false
601 2 : } else if c.extraLevels[0].files.Empty() {
602 2 : // a multi level compaction without data in the intermediate input level;
603 2 : // e.g. for a multi level compaction with levels 4, 5, and 6, this could
604 2 : // occur if there are no files to compact in 5, or in 5 and 6 (i.e. a move).
605 2 : return false
606 2 : }
607 2 : return true
608 : }
609 :
610 : // errorOnUserKeyOverlap returns an error if the last two sstables written by
611 : // this compaction contain revisions of the same user key in both sstables
612 : // when they shouldn't (eg, when splitting flushes).
613 1 : func (c *compaction) errorOnUserKeyOverlap(ve *versionEdit) error {
614 1 : if n := len(ve.NewFiles); n > 1 {
615 1 : meta := ve.NewFiles[n-1].Meta
616 1 : prevMeta := ve.NewFiles[n-2].Meta
617 1 : if !prevMeta.Largest.IsExclusiveSentinel() &&
618 1 : c.cmp(prevMeta.Largest.UserKey, meta.Smallest.UserKey) >= 0 {
619 1 : return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
620 1 : prevMeta.Largest.Pretty(c.formatKey),
621 1 : prevMeta.FileNum,
622 1 : meta.FileNum)
623 1 : }
624 : }
625 1 : return nil
626 : }
627 :
628 : // allowZeroSeqNum returns true if seqnums can be zeroed if there are no
629 : // snapshots requiring them to be kept. It performs this determination by
630 : // looking at the TombstoneElision values which are set up based on sstables
631 : // which overlap the bounds of the compaction at a lower level in the LSM.
632 2 : func (c *compaction) allowZeroSeqNum() bool {
633 2 : // TODO(peter): we disable zeroing of seqnums during flushing to match
634 2 : // RocksDB behavior and to avoid generating overlapping sstables during
635 2 : // DB.replayWAL. When replaying WAL files at startup, we flush after each
636 2 : // WAL is replayed building up a single version edit that is
637 2 : // applied. Because we don't apply the version edit after each flush, this
638 2 : // code doesn't know that L0 contains files and zeroing of seqnums should
639 2 : // be disabled. That is fixable, but it seems safer to just match the
640 2 : // RocksDB behavior for now.
641 2 : return len(c.flushing) == 0 && c.delElision.ElidesEverything() && c.rangeKeyElision.ElidesEverything()
642 2 : }
643 :
644 : // newInputIters returns iterators over all the input tables in a compaction.
645 : func (c *compaction) newInputIters(
646 : newIters tableNewIters, newRangeKeyIter keyspanimpl.TableNewSpanIter,
647 : ) (
648 : pointIter internalIterator,
649 : rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
650 : retErr error,
651 2 : ) {
652 2 : // Validate the ordering of compaction input files for defense in depth.
653 2 : if len(c.flushing) == 0 {
654 2 : if c.startLevel.level >= 0 {
655 2 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
656 2 : manifest.Level(c.startLevel.level), c.startLevel.files.Iter())
657 2 : if err != nil {
658 1 : return nil, nil, nil, err
659 1 : }
660 : }
661 2 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
662 2 : manifest.Level(c.outputLevel.level), c.outputLevel.files.Iter())
663 2 : if err != nil {
664 1 : return nil, nil, nil, err
665 1 : }
666 2 : if c.startLevel.level == 0 {
667 2 : if c.startLevel.l0SublevelInfo == nil {
668 0 : panic("l0SublevelInfo not created for compaction out of L0")
669 : }
670 2 : for _, info := range c.startLevel.l0SublevelInfo {
671 2 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
672 2 : info.sublevel, info.Iter())
673 2 : if err != nil {
674 1 : return nil, nil, nil, err
675 1 : }
676 : }
677 : }
678 2 : if len(c.extraLevels) > 0 {
679 2 : if len(c.extraLevels) > 1 {
680 0 : panic("n>2 multi level compaction not implemented yet")
681 : }
682 2 : interLevel := c.extraLevels[0]
683 2 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
684 2 : manifest.Level(interLevel.level), interLevel.files.Iter())
685 2 : if err != nil {
686 0 : return nil, nil, nil, err
687 0 : }
688 : }
689 : }
690 :
691 : // There are three classes of keys that a compaction needs to process: point
692 : // keys, range deletion tombstones and range keys. Collect all iterators for
693 : // all these classes of keys from all the levels. We'll aggregate them
694 : // together further below.
695 : //
696 : // numInputLevels is an approximation of the number of iterator levels. Due
697 : // to idiosyncrasies in iterator construction, we may (rarely) exceed this
698 : // initial capacity.
699 2 : numInputLevels := max(len(c.flushing), len(c.inputs))
700 2 : iters := make([]internalIterator, 0, numInputLevels)
701 2 : rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
702 2 : rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
703 2 :
704 2 : // If construction of the iterator inputs fails, ensure that we close all
705 2 : // the constituent iterators.
706 2 : defer func() {
707 2 : if retErr != nil {
708 0 : for _, iter := range iters {
709 0 : if iter != nil {
710 0 : iter.Close()
711 0 : }
712 : }
713 0 : for _, rangeDelIter := range rangeDelIters {
714 0 : rangeDelIter.Close()
715 0 : }
716 : }
717 : }()
718 2 : iterOpts := IterOptions{
719 2 : CategoryAndQoS: sstable.CategoryAndQoS{
720 2 : Category: "pebble-compaction",
721 2 : QoSLevel: sstable.NonLatencySensitiveQoSLevel,
722 2 : },
723 2 : logger: c.logger,
724 2 : }
725 2 :
726 2 : // Populate iters, rangeDelIters and rangeKeyIters with the appropriate
727 2 : // constituent iterators. This depends on whether this is a flush or a
728 2 : // compaction.
729 2 : if len(c.flushing) != 0 {
730 2 : // If flushing, we need to build the input iterators over the memtables
731 2 : // stored in c.flushing.
732 2 : for i := range c.flushing {
733 2 : f := c.flushing[i]
734 2 : iters = append(iters, f.newFlushIter(nil))
735 2 : rangeDelIter := f.newRangeDelIter(nil)
736 2 : if rangeDelIter != nil {
737 2 : rangeDelIters = append(rangeDelIters, rangeDelIter)
738 2 : }
739 2 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
740 2 : rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
741 2 : }
742 : }
743 2 : } else {
744 2 : addItersForLevel := func(level *compactionLevel, l manifest.Layer) error {
745 2 : // Add a *levelIter for point iterators. Because we don't call
746 2 : // initRangeDel, the levelIter will close and forget the range
747 2 : // deletion iterator when it steps on to a new file. Surfacing range
748 2 : // deletions to compactions is handled below.
749 2 : iters = append(iters, newLevelIter(context.Background(),
750 2 : iterOpts, c.comparer, newIters, level.files.Iter(), l, internalIterOpts{
751 2 : compaction: true,
752 2 : bufferPool: &c.bufferPool,
753 2 : }))
754 2 : // TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
755 2 : // deletions into memory upfront. (See #2015, which reverted this.) There
756 2 : // will be no user keys that are split between sstables within a level in
757 2 : // Cockroach 23.1, which unblocks this optimization.
758 2 :
759 2 : // Add the range deletion iterator for each file as an independent level
760 2 : // in mergingIter, as opposed to making a levelIter out of those. This
761 2 : // is safer as levelIter expects all keys coming from underlying
762 2 : // iterators to be in order. Due to compaction / tombstone writing
763 2 : // logic in finishOutput(), it is possible for range tombstones to not
764 2 : // be strictly ordered across all files in one level.
765 2 : //
766 2 : // Consider this example from the metamorphic tests (also repeated in
767 2 : // finishOutput()), consisting of three L3 files with their bounds
768 2 : // specified in square brackets next to the file name:
769 2 : //
770 2 : // ./000240.sst [tmgc#391,MERGE-tmgc#391,MERGE]
771 2 : // tmgc#391,MERGE [786e627a]
772 2 : // tmgc-udkatvs#331,RANGEDEL
773 2 : //
774 2 : // ./000241.sst [tmgc#384,MERGE-tmgc#384,MERGE]
775 2 : // tmgc#384,MERGE [666c7070]
776 2 : // tmgc-tvsalezade#383,RANGEDEL
777 2 : // tmgc-tvsalezade#331,RANGEDEL
778 2 : //
779 2 : // ./000242.sst [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
780 2 : // tmgc-tvsalezade#383,RANGEDEL
781 2 : // tmgc#375,SET [72646c78766965616c72776865676e79]
782 2 : // tmgc-tvsalezade#356,RANGEDEL
783 2 : //
784 2 : // Here, the range tombstone in 000240.sst falls "after" one in
785 2 : // 000241.sst, despite 000240.sst being ordered "before" 000241.sst for
786 2 : // levelIter's purposes. While each file is still consistent before its
787 2 : // bounds, it's safer to have all rangedel iterators be visible to
788 2 : // mergingIter.
789 2 : iter := level.files.Iter()
790 2 : for f := iter.First(); f != nil; f = iter.Next() {
791 2 : rangeDelIter, err := c.newRangeDelIter(newIters, iter.Take(), iterOpts, l)
792 2 : if err != nil {
793 0 : // The error will already be annotated with the BackingFileNum, so
794 0 : // we annotate it with the FileNum.
795 0 : return errors.Wrapf(err, "pebble: could not open table %s", errors.Safe(f.FileNum))
796 0 : }
797 2 : if rangeDelIter == nil {
798 2 : continue
799 : }
800 2 : rangeDelIters = append(rangeDelIters, rangeDelIter)
801 2 : c.closers = append(c.closers, rangeDelIter)
802 : }
803 :
804 : // Check if this level has any range keys.
805 2 : hasRangeKeys := false
806 2 : for f := iter.First(); f != nil; f = iter.Next() {
807 2 : if f.HasRangeKeys {
808 2 : hasRangeKeys = true
809 2 : break
810 : }
811 : }
812 2 : if hasRangeKeys {
813 2 : newRangeKeyIterWrapper := func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
814 2 : rangeKeyIter, err := newRangeKeyIter(ctx, file, iterOptions)
815 2 : if err != nil {
816 0 : return nil, err
817 2 : } else if rangeKeyIter == nil {
818 0 : return emptyKeyspanIter, nil
819 0 : }
820 : // Ensure that the range key iter is not closed until the compaction is
821 : // finished. This is necessary because range key processing
822 : // requires the range keys to be held in memory for up to the
823 : // lifetime of the compaction.
824 2 : noCloseIter := &noCloseIter{rangeKeyIter}
825 2 : c.closers = append(c.closers, noCloseIter)
826 2 :
827 2 : // We do not need to truncate range keys to sstable boundaries, or
828 2 : // only read within the file's atomic compaction units, unlike with
829 2 : // range tombstones. This is because range keys were added after we
830 2 : // stopped splitting user keys across sstables, so all the range keys
831 2 : // in this sstable must wholly lie within the file's bounds.
832 2 : return noCloseIter, err
833 : }
834 2 : li := keyspanimpl.NewLevelIter(
835 2 : context.Background(), keyspan.SpanIterOptions{}, c.cmp,
836 2 : newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange,
837 2 : )
838 2 : rangeKeyIters = append(rangeKeyIters, li)
839 : }
840 2 : return nil
841 : }
842 :
843 2 : for i := range c.inputs {
844 2 : // If the level is annotated with l0SublevelInfo, expand it into one
845 2 : // level per sublevel.
846 2 : // TODO(jackson): Perform this expansion even earlier when we pick the
847 2 : // compaction?
848 2 : if len(c.inputs[i].l0SublevelInfo) > 0 {
849 2 : for _, info := range c.startLevel.l0SublevelInfo {
850 2 : sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
851 2 : if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
852 0 : return nil, nil, nil, err
853 0 : }
854 : }
855 2 : continue
856 : }
857 2 : if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
858 0 : return nil, nil, nil, err
859 0 : }
860 : }
861 : }
862 :
863 : // If there's only one constituent point iterator, we can avoid the overhead
864 : // of a *mergingIter. This is possible, for example, when performing a flush
865 : // of a single memtable. Otherwise, combine all the iterators into a merging
866 : // iter.
867 2 : pointIter = iters[0]
868 2 : if len(iters) > 1 {
869 2 : pointIter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
870 2 : }
871 :
872 : // In normal operation, levelIter iterates over the point operations in a
873 : // level, and initializes a rangeDelIter pointer for the range deletions in
874 : // each table. During compaction, we want to iterate over the merged view of
875 : // point operations and range deletions. In order to do this we create one
876 : // levelIter per level to iterate over the point operations, and collect up
877 : // all the range deletion files.
878 : //
879 : // The range deletion levels are combined with a keyspanimpl.MergingIter. The
880 : // resulting merged rangedel iterator is then included using an
881 : // InterleavingIter.
882 : // TODO(jackson): Consider using a defragmenting iterator to stitch together
883 : // logical range deletions that were fragmented due to previous file
884 : // boundaries.
885 2 : if len(rangeDelIters) > 0 {
886 2 : mi := &keyspanimpl.MergingIter{}
887 2 : mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
888 2 : rangeDelIter = mi
889 2 : }
890 :
891 : // If there are range key iterators, we need to combine them using
892 : // keyspanimpl.MergingIter, and then interleave them among the points.
893 2 : if len(rangeKeyIters) > 0 {
894 2 : mi := &keyspanimpl.MergingIter{}
895 2 : mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeKeyIters...)
896 2 : // TODO(radu): why do we have a defragmenter here but not above?
897 2 : di := &keyspan.DefragmentingIter{}
898 2 : di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
899 2 : rangeKeyIter = di
900 2 : }
901 2 : return pointIter, rangeDelIter, rangeKeyIter, nil
902 : }
903 :
904 : func (c *compaction) newRangeDelIter(
905 : newIters tableNewIters, f manifest.LevelFile, opts IterOptions, l manifest.Layer,
906 2 : ) (*noCloseIter, error) {
907 2 : opts.layer = l
908 2 : iterSet, err := newIters(context.Background(), f.FileMetadata, &opts,
909 2 : internalIterOpts{
910 2 : compaction: true,
911 2 : bufferPool: &c.bufferPool,
912 2 : }, iterRangeDeletions)
913 2 : if err != nil {
914 0 : return nil, err
915 2 : } else if iterSet.rangeDeletion == nil {
916 2 : // The file doesn't contain any range deletions.
917 2 : return nil, nil
918 2 : }
919 : // Ensure that rangeDelIter is not closed until the compaction is
920 : // finished. This is necessary because range tombstone processing
921 : // requires the range tombstones to be held in memory for up to the
922 : // lifetime of the compaction.
923 2 : return &noCloseIter{iterSet.rangeDeletion}, nil
924 : }
925 :
926 1 : func (c *compaction) String() string {
927 1 : if len(c.flushing) != 0 {
928 0 : return "flush\n"
929 0 : }
930 :
931 1 : var buf bytes.Buffer
932 1 : for level := c.startLevel.level; level <= c.outputLevel.level; level++ {
933 1 : i := level - c.startLevel.level
934 1 : fmt.Fprintf(&buf, "%d:", level)
935 1 : iter := c.inputs[i].files.Iter()
936 1 : for f := iter.First(); f != nil; f = iter.Next() {
937 1 : fmt.Fprintf(&buf, " %s:%s-%s", f.FileNum, f.Smallest, f.Largest)
938 1 : }
939 1 : fmt.Fprintf(&buf, "\n")
940 : }
941 1 : return buf.String()
942 : }
943 :
944 : type manualCompaction struct {
945 : // Count of the retries either due to too many concurrent compactions, or a
946 : // concurrent compaction to overlapping levels.
947 : retries int
948 : level int
949 : outputLevel int
950 : done chan error
951 : start []byte
952 : end []byte
953 : split bool
954 : }
955 :
956 : type readCompaction struct {
957 : level int
958 : // [start, end] key ranges are used for de-duping.
959 : start []byte
960 : end []byte
961 :
962 : // The file associated with the compaction.
963 : // If the file no longer belongs in the same
964 : // level, then we skip the compaction.
965 : fileNum base.FileNum
966 : }
967 :
968 2 : func (d *DB) addInProgressCompaction(c *compaction) {
969 2 : d.mu.compact.inProgress[c] = struct{}{}
970 2 : var isBase, isIntraL0 bool
971 2 : for _, cl := range c.inputs {
972 2 : iter := cl.files.Iter()
973 2 : for f := iter.First(); f != nil; f = iter.Next() {
974 2 : if f.IsCompacting() {
975 0 : d.opts.Logger.Fatalf("L%d->L%d: %s already being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
976 0 : }
977 2 : f.SetCompactionState(manifest.CompactionStateCompacting)
978 2 : if c.startLevel != nil && c.outputLevel != nil && c.startLevel.level == 0 {
979 2 : if c.outputLevel.level == 0 {
980 2 : f.IsIntraL0Compacting = true
981 2 : isIntraL0 = true
982 2 : } else {
983 2 : isBase = true
984 2 : }
985 : }
986 : }
987 : }
988 :
989 2 : if (isIntraL0 || isBase) && c.version.L0Sublevels != nil {
990 2 : l0Inputs := []manifest.LevelSlice{c.startLevel.files}
991 2 : if isIntraL0 {
992 2 : l0Inputs = append(l0Inputs, c.outputLevel.files)
993 2 : }
994 2 : if err := c.version.L0Sublevels.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
995 0 : d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
996 0 : }
997 : }
998 : }
999 :
1000 : // Removes compaction markers from files in a compaction. The rollback parameter
1001 : // indicates whether the compaction state should be rolled back to its original
1002 : // state in the case of an unsuccessful compaction.
1003 : //
1004 : // DB.mu must be held when calling this method; however, this method can drop and
1005 : // re-acquire that mutex. All writes to the manifest for this compaction should
1006 : // have completed by this point.
1007 2 : func (d *DB) clearCompactingState(c *compaction, rollback bool) {
1008 2 : c.versionEditApplied = true
1009 2 : for _, cl := range c.inputs {
1010 2 : iter := cl.files.Iter()
1011 2 : for f := iter.First(); f != nil; f = iter.Next() {
1012 2 : if !f.IsCompacting() {
1013 0 : d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
1014 0 : }
1015 2 : if !rollback {
1016 2 : // On success all compactions other than move-compactions transition the
1017 2 : // file into the Compacted state. Move-compacted files become eligible
1018 2 : // for compaction again and transition back to NotCompacting.
1019 2 : if c.kind != compactionKindMove {
1020 2 : f.SetCompactionState(manifest.CompactionStateCompacted)
1021 2 : } else {
1022 2 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1023 2 : }
1024 2 : } else {
1025 2 : // Else, on rollback, all input files unconditionally transition back to
1026 2 : // NotCompacting.
1027 2 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1028 2 : }
1029 2 : f.IsIntraL0Compacting = false
1030 : }
1031 : }
1032 2 : l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c))
1033 2 : func() {
1034 2 : // InitCompactingFileInfo requires that no other manifest writes be
1035 2 : // happening in parallel with it, i.e. we're not in the midst of installing
1036 2 : // another version. Otherwise, it's possible that we've created another
1037 2 : // L0Sublevels instance, but not added it to the versions list, causing
1038 2 : // all the indices in FileMetadata to be inaccurate. To ensure this,
1039 2 : // grab the manifest lock.
1040 2 : d.mu.versions.logLock()
1041 2 : defer d.mu.versions.logUnlock()
1042 2 : d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress)
1043 2 : }()
1044 : }
1045 :
1046 2 : func (d *DB) calculateDiskAvailableBytes() uint64 {
1047 2 : if space, err := d.opts.FS.GetDiskUsage(d.dirname); err == nil {
1048 2 : d.diskAvailBytes.Store(space.AvailBytes)
1049 2 : return space.AvailBytes
1050 2 : } else if !errors.Is(err, vfs.ErrUnsupported) {
1051 1 : d.opts.EventListener.BackgroundError(err)
1052 1 : }
1053 2 : return d.diskAvailBytes.Load()
1054 : }
1055 :
1056 : // maybeScheduleFlush schedules a flush if necessary.
1057 : //
1058 : // d.mu must be held when calling this.
1059 2 : func (d *DB) maybeScheduleFlush() {
1060 2 : if d.mu.compact.flushing || d.closed.Load() != nil || d.opts.ReadOnly {
1061 2 : return
1062 2 : }
1063 2 : if len(d.mu.mem.queue) <= 1 {
1064 2 : return
1065 2 : }
1066 :
1067 2 : if !d.passedFlushThreshold() {
1068 2 : return
1069 2 : }
1070 :
1071 2 : d.mu.compact.flushing = true
1072 2 : go d.flush()
1073 : }
1074 :
1075 2 : func (d *DB) passedFlushThreshold() bool {
1076 2 : var n int
1077 2 : var size uint64
1078 2 : for ; n < len(d.mu.mem.queue)-1; n++ {
1079 2 : if !d.mu.mem.queue[n].readyForFlush() {
1080 2 : break
1081 : }
1082 2 : if d.mu.mem.queue[n].flushForced {
1083 2 : // A flush was forced. Pretend the memtable size is the configured
1084 2 : // size. See minFlushSize below.
1085 2 : size += d.opts.MemTableSize
1086 2 : } else {
1087 2 : size += d.mu.mem.queue[n].totalBytes()
1088 2 : }
1089 : }
1090 2 : if n == 0 {
1091 2 : // None of the immutable memtables are ready for flushing.
1092 2 : return false
1093 2 : }
1094 :
1095 : // Only flush once the sum of the queued memtable sizes exceeds half the
1096 : // configured memtable size. This prevents flushing of memtables at startup
1097 : // while we're undergoing the ramp period on the memtable size. See
1098 : // DB.newMemTable().
1099 2 : minFlushSize := d.opts.MemTableSize / 2
1100 2 : return size >= minFlushSize
1101 : }
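For intuition, a minimal sketch of the threshold check above, assuming a 64 MiB MemTableSize (an illustrative value, not a default asserted here): the flush threshold is half the configured size, so a lone 8 MiB memtable left over from the size ramp-up does not trigger a flush, while a force-flushed memtable is counted at the full configured size and does.

func flushThresholdExample() (smallMemtableFlushes, forcedFlushFlushes bool) {
	const memTableSize = uint64(64 << 20) // assumed Options.MemTableSize
	minFlushSize := memTableSize / 2      // 32 MiB
	smallMemtableFlushes = uint64(8<<20) >= minFlushSize // false: keep accumulating
	forcedFlushFlushes = memTableSize >= minFlushSize    // true: flushForced pretends full size
	return smallMemtableFlushes, forcedFlushFlushes
}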
1102 :
1103 2 : func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
1104 2 : var mem *flushableEntry
1105 2 : for _, m := range d.mu.mem.queue {
1106 2 : if m.flushable == tbl {
1107 2 : mem = m
1108 2 : break
1109 : }
1110 : }
1111 2 : if mem == nil || mem.flushForced {
1112 2 : return
1113 2 : }
1114 2 : deadline := d.timeNow().Add(dur)
1115 2 : if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
1116 2 : // Already scheduled to flush sooner than within `dur`.
1117 2 : return
1118 2 : }
1119 2 : mem.delayedFlushForcedAt = deadline
1120 2 : go func() {
1121 2 : timer := time.NewTimer(dur)
1122 2 : defer timer.Stop()
1123 2 :
1124 2 : select {
1125 2 : case <-d.closedCh:
1126 2 : return
1127 2 : case <-mem.flushed:
1128 2 : return
1129 2 : case <-timer.C:
1130 2 : d.commit.mu.Lock()
1131 2 : defer d.commit.mu.Unlock()
1132 2 : d.mu.Lock()
1133 2 : defer d.mu.Unlock()
1134 2 :
1135 2 : // NB: The timer may fire concurrently with a call to Close. If a
1136 2 : // Close call beat us to acquiring d.mu, d.closed holds ErrClosed,
1137 2 : // and it's too late to flush anything. Otherwise, the Close call
1138 2 : // will block on locking d.mu until we've finished scheduling the
1139 2 : // flush and set `d.mu.compact.flushing` to true. Close will wait
1140 2 : // for the current flush to complete.
1141 2 : if d.closed.Load() != nil {
1142 2 : return
1143 2 : }
1144 :
1145 2 : if d.mu.mem.mutable == tbl {
1146 2 : d.makeRoomForWrite(nil)
1147 2 : } else {
1148 2 : mem.flushForced = true
1149 2 : }
1150 2 : d.maybeScheduleFlush()
1151 : }
1152 : }()
1153 : }
1154 :
1155 2 : func (d *DB) flush() {
1156 2 : pprof.Do(context.Background(), flushLabels, func(context.Context) {
1157 2 : flushingWorkStart := time.Now()
1158 2 : d.mu.Lock()
1159 2 : defer d.mu.Unlock()
1160 2 : idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime)
1161 2 : var bytesFlushed uint64
1162 2 : var err error
1163 2 : if bytesFlushed, err = d.flush1(); err != nil {
1164 2 : // TODO(peter): count consecutive flush errors and backoff.
1165 2 : d.opts.EventListener.BackgroundError(err)
1166 2 : }
1167 2 : d.mu.compact.flushing = false
1168 2 : d.mu.compact.noOngoingFlushStartTime = time.Now()
1169 2 : workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
1170 2 : d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
1171 2 : d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
1172 2 : d.mu.compact.flushWriteThroughput.IdleDuration += idleDuration
1173 2 : // More flush work may have arrived while we were flushing, so schedule
1174 2 : // another flush if needed.
1175 2 : d.maybeScheduleFlush()
1176 2 : // The flush may have produced too many files in a level, so schedule a
1177 2 : // compaction if needed.
1178 2 : d.maybeScheduleCompaction()
1179 2 : d.mu.compact.cond.Broadcast()
1180 : })
1181 : }
1182 :
1183 : // runIngestFlush is used to generate a flush version edit for sstables which
1184 : // were ingested as flushables. Both DB.mu and the manifest lock must be held
1185 : // while runIngestFlush is called.
1186 2 : func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
1187 2 : if len(c.flushing) != 1 {
1188 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
1189 : }
1190 :
1191 : // Construct the VersionEdit, levelMetrics etc.
1192 2 : c.metrics = make(map[int]*LevelMetrics, numLevels)
1193 2 : // Finding the target level for ingestion must use the latest version
1194 2 : // after the logLock has been acquired.
1195 2 : c.version = d.mu.versions.currentVersion()
1196 2 :
1197 2 : baseLevel := d.mu.versions.picker.getBaseLevel()
1198 2 : ve := &versionEdit{}
1199 2 : var ingestSplitFiles []ingestSplitFile
1200 2 : ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
1201 2 :
1202 2 : updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
1203 2 : levelMetrics := c.metrics[level]
1204 2 : if levelMetrics == nil {
1205 2 : levelMetrics = &LevelMetrics{}
1206 2 : c.metrics[level] = levelMetrics
1207 2 : }
1208 2 : levelMetrics.NumFiles--
1209 2 : levelMetrics.Size -= int64(m.Size)
1210 2 : for i := range added {
1211 2 : levelMetrics.NumFiles++
1212 2 : levelMetrics.Size += int64(added[i].Meta.Size)
1213 2 : }
1214 : }
1215 :
1216 2 : suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
1217 2 : d.FormatMajorVersion() >= FormatVirtualSSTables
1218 2 :
1219 2 : if suggestSplit || ingestFlushable.exciseSpan.Valid() {
1220 2 : // We could add deleted files to ve.
1221 2 : ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
1222 2 : }
1223 :
1224 2 : ctx := context.Background()
1225 2 : overlapChecker := &overlapChecker{
1226 2 : comparer: d.opts.Comparer,
1227 2 : newIters: d.newIters,
1228 2 : opts: IterOptions{
1229 2 : logger: d.opts.Logger,
1230 2 : CategoryAndQoS: sstable.CategoryAndQoS{
1231 2 : Category: "pebble-ingest",
1232 2 : QoSLevel: sstable.LatencySensitiveQoSLevel,
1233 2 : },
1234 2 : },
1235 2 : v: c.version,
1236 2 : }
1237 2 : replacedFiles := make(map[base.FileNum][]newFileEntry)
1238 2 : for _, file := range ingestFlushable.files {
1239 2 : var fileToSplit *fileMetadata
1240 2 : var level int
1241 2 :
1242 2 : // This file fits perfectly within the excise span, so we can slot it at L6.
1243 2 : if ingestFlushable.exciseSpan.Valid() &&
1244 2 : ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) &&
1245 2 : ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) {
1246 2 : level = 6
1247 2 : } else {
1248 2 : // TODO(radu): this can perform I/O; we should not do this while holding DB.mu.
1249 2 : lsmOverlap, err := overlapChecker.DetermineLSMOverlap(ctx, file.UserKeyBounds())
1250 2 : if err != nil {
1251 0 : return nil, err
1252 0 : }
1253 2 : level, fileToSplit, err = ingestTargetLevel(
1254 2 : ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, file.FileMetadata, suggestSplit,
1255 2 : )
1256 2 : if err != nil {
1257 0 : return nil, err
1258 0 : }
1259 : }
1260 :
1261 : // Add the current flushableIngest file to the version.
1262 2 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
1263 2 : if fileToSplit != nil {
1264 1 : ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
1265 1 : ingestFile: file.FileMetadata,
1266 1 : splitFile: fileToSplit,
1267 1 : level: level,
1268 1 : })
1269 1 : }
1270 2 : levelMetrics := c.metrics[level]
1271 2 : if levelMetrics == nil {
1272 2 : levelMetrics = &LevelMetrics{}
1273 2 : c.metrics[level] = levelMetrics
1274 2 : }
1275 2 : levelMetrics.BytesIngested += file.Size
1276 2 : levelMetrics.TablesIngested++
1277 : }
1278 2 : if ingestFlushable.exciseSpan.Valid() {
1279 2 : // Iterate through all levels and find files that intersect with exciseSpan.
1280 2 : for l := range c.version.Levels {
1281 2 : overlaps := c.version.Overlaps(l, base.UserKeyBoundsEndExclusive(ingestFlushable.exciseSpan.Start, ingestFlushable.exciseSpan.End))
1282 2 : iter := overlaps.Iter()
1283 2 :
1284 2 : for m := iter.First(); m != nil; m = iter.Next() {
1285 2 : newFiles, err := d.excise(context.TODO(), ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
1286 2 : if err != nil {
1287 0 : return nil, err
1288 0 : }
1289 :
1290 2 : if _, ok := ve.DeletedFiles[deletedFileEntry{
1291 2 : Level: l,
1292 2 : FileNum: m.FileNum,
1293 2 : }]; !ok {
1294 1 : // We did not excise this file.
1295 1 : continue
1296 : }
1297 2 : replacedFiles[m.FileNum] = newFiles
1298 2 : updateLevelMetricsOnExcise(m, l, newFiles)
1299 : }
1300 : }
1301 : }
1302 :
1303 2 : if len(ingestSplitFiles) > 0 {
1304 1 : if err := d.ingestSplit(context.TODO(), ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
1305 0 : return nil, err
1306 0 : }
1307 : }
1308 :
1309 2 : return ve, nil
1310 : }
1311 :
1312 : // flush runs a compaction that copies the immutable memtables from memory to
1313 : // disk.
1314 : //
1315 : // d.mu must be held when calling this, but the mutex may be dropped and
1316 : // re-acquired during the course of this method.
1317 2 : func (d *DB) flush1() (bytesFlushed uint64, err error) {
1318 2 : // NB: The flushable queue can contain flushables of type ingestedFlushable.
1319 2 : // The sstables in ingestedFlushable.files must be placed into the appropriate
1320 2 : // level in the lsm. Let's say the flushable queue contains a prefix of
1321 2 : // regular immutable memtables, then an ingestedFlushable, and then the
1322 2 : // mutable memtable. When the flush of the ingestedFlushable is performed,
1323 2 : // it needs an updated view of the lsm. That is, the prefix of immutable
1324 2 : // memtables must have already been flushed. Similarly, if there are two
1325 2 : // contiguous ingestedFlushables in the queue, then the first flushable must
1326 2 : // be flushed, so that the second flushable can see an updated view of the
1327 2 : // lsm.
1328 2 : //
1329 2 : // Given the above, we restrict flushes to either some prefix of regular
1330 2 : // memtables, or a single flushable of type ingestedFlushable. The DB.flush
1331 2 : // function will call DB.maybeScheduleFlush again, so a new flush to finish
1332 2 : // the remaining flush work should be scheduled right away.
1333 2 : //
1334 2 : // NB: Large batches placed in the flushable queue share the WAL with the
1335 2 : // previous memtable in the queue. We must ensure the property that both the
1336 2 : // large batch and the memtable with which it shares a WAL are flushed
1337 2 : // together. The property ensures that the minimum unflushed log number
1338 2 : // isn't incremented incorrectly. Since a flushableBatch.readyForFlush always
1339 2 : // returns true, and since the large batch will always be placed right after
1340 2 : // the memtable with which it shares a WAL, the property is naturally
1341 2 : // ensured. The large batch will always be placed after the memtable with
1342 2 : // which it shares a WAL because we ensure it in DB.commitWrite by holding
1343 2 : // the commitPipeline.mu and then holding DB.mu. As an extra defensive
1344 2 : // measure, if we try to flush the memtable without also flushing the
1345 2 : // flushable batch in the same flush, since the memtable and flushableBatch
1346 2 : // have the same logNum, the logNum invariant check below will trigger.
1347 2 : var n, inputs int
1348 2 : var inputBytes uint64
1349 2 : var ingest bool
1350 2 : for ; n < len(d.mu.mem.queue)-1; n++ {
1351 2 : if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
1352 2 : if n == 0 {
1353 2 : // The first flushable is of type ingestedFlushable. Since these
1354 2 : // must be flushed individually, we perform a flush for just
1355 2 : // this.
1356 2 : if !f.readyForFlush() {
1357 0 : // This check is almost unnecessary, but we guard against it
1358 0 : // just in case this invariant changes in the future.
1359 0 : panic("pebble: ingestedFlushable should always be ready to flush.")
1360 : }
1361 : // By setting n = 1, we ensure that the first flushable (n == 0)
1362 : // is scheduled for a flush. The number of tables added is equal to the
1363 : // number of files in the ingest operation.
1364 2 : n = 1
1365 2 : inputs = len(f.files)
1366 2 : ingest = true
1367 2 : break
1368 2 : } else {
1369 2 : // There was some prefix of flushables which weren't of type
1370 2 : // ingestedFlushable. So, perform a flush for those.
1371 2 : break
1372 : }
1373 : }
1374 2 : if !d.mu.mem.queue[n].readyForFlush() {
1375 1 : break
1376 : }
1377 2 : inputBytes += d.mu.mem.queue[n].inuseBytes()
1378 : }
1379 2 : if n == 0 {
1380 0 : // None of the immutable memtables are ready for flushing.
1381 0 : return 0, nil
1382 0 : }
1383 2 : if !ingest {
1384 2 : // Flushes of memtables add the prefix of n memtables from the flushable
1385 2 : // queue.
1386 2 : inputs = n
1387 2 : }
1388 :
1389 : // Require that every memtable being flushed has a log number less than the
1390 : // new minimum unflushed log number.
1391 2 : minUnflushedLogNum := d.mu.mem.queue[n].logNum
1392 2 : if !d.opts.DisableWAL {
1393 2 : for i := 0; i < n; i++ {
1394 2 : if logNum := d.mu.mem.queue[i].logNum; logNum >= minUnflushedLogNum {
1395 0 : panic(errors.AssertionFailedf("logNum invariant violated: flushing %d items; %d:type=%T,logNum=%d; %d:type=%T,logNum=%d",
1396 0 : n,
1397 0 : i, d.mu.mem.queue[i].flushable, logNum,
1398 0 : n, d.mu.mem.queue[n].flushable, minUnflushedLogNum))
1399 : }
1400 : }
1401 : }
1402 :
1403 2 : c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
1404 2 : d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow())
1405 2 : if err != nil {
1406 0 : return 0, err
1407 0 : }
1408 2 : d.addInProgressCompaction(c)
1409 2 :
1410 2 : jobID := d.newJobIDLocked()
1411 2 : d.opts.EventListener.FlushBegin(FlushInfo{
1412 2 : JobID: int(jobID),
1413 2 : Input: inputs,
1414 2 : InputBytes: inputBytes,
1415 2 : Ingest: ingest,
1416 2 : })
1417 2 : startTime := d.timeNow()
1418 2 :
1419 2 : var ve *manifest.VersionEdit
1420 2 : var stats compact.Stats
1421 2 : // To determine the target level of the files in the ingestedFlushable, we
1422 2 : // need to acquire the logLock and not release it for that duration. Since
1423 2 : // we need to acquire the logLock below to perform the logAndApply step
1424 2 : // anyway, we create the VersionEdit for ingestedFlushable outside of
1425 2 : // runCompaction. For all other flush cases, we construct the VersionEdit
1426 2 : // inside runCompaction.
1427 2 : if c.kind != compactionKindIngestedFlushable {
1428 2 : ve, stats, err = d.runCompaction(jobID, c)
1429 2 : }
1430 :
1431 : // Acquire logLock. This will be released either on an error, by way of
1432 : // logUnlock, or through a call to logAndApply if there is no error.
1433 2 : d.mu.versions.logLock()
1434 2 :
1435 2 : if c.kind == compactionKindIngestedFlushable {
1436 2 : ve, err = d.runIngestFlush(c)
1437 2 : }
1438 :
1439 2 : info := FlushInfo{
1440 2 : JobID: int(jobID),
1441 2 : Input: inputs,
1442 2 : InputBytes: inputBytes,
1443 2 : Duration: d.timeNow().Sub(startTime),
1444 2 : Done: true,
1445 2 : Ingest: ingest,
1446 2 : Err: err,
1447 2 : }
1448 2 : if err == nil {
1449 2 : validateVersionEdit(ve, d.opts.Experimental.KeyValidationFunc, d.opts.Comparer.FormatKey, d.opts.Logger)
1450 2 : for i := range ve.NewFiles {
1451 2 : e := &ve.NewFiles[i]
1452 2 : info.Output = append(info.Output, e.Meta.TableInfo())
1453 2 : // Ingested tables are not necessarily flushed to L0. Record the level of
1454 2 : // each ingested file explicitly.
1455 2 : if ingest {
1456 2 : info.IngestLevels = append(info.IngestLevels, e.Level)
1457 2 : }
1458 : }
1459 2 : if len(ve.NewFiles) == 0 {
1460 2 : info.Err = errEmptyTable
1461 2 : }
1462 :
1463 : // The flush succeeded or it produced an empty sstable. In either case we
1464 : // want to bump the minimum unflushed log number to the log number of the
1465 : // oldest unflushed memtable.
1466 2 : ve.MinUnflushedLogNum = minUnflushedLogNum
1467 2 : if c.kind != compactionKindIngestedFlushable {
1468 2 : metrics := c.metrics[0]
1469 2 : if d.opts.DisableWAL {
1470 2 : // If the WAL is disabled, every flushable has a zero [logSize],
1471 2 : // resulting in zero bytes in. Instead, use the number of bytes we
1472 2 : // flushed as the BytesIn. This ensures we get a reasonable w-amp
1473 2 : // calculation even when the WAL is disabled.
1474 2 : metrics.BytesIn = metrics.BytesFlushed
1475 2 : } else {
1476 2 : for i := 0; i < n; i++ {
1477 2 : metrics.BytesIn += d.mu.mem.queue[i].logSize
1478 2 : }
1479 : }
1480 2 : } else {
1481 2 : // c.kind == compactionKindIngestedFlushable, and we could have deleted files due
1482 2 : // to ingest-time splits or excises.
1483 2 : ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
1484 2 : for c2 := range d.mu.compact.inProgress {
1485 2 : // Check if this compaction overlaps with the excise span. Note that just
1486 2 : // checking if the inputs individually overlap with the excise span
1487 2 : // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
1488 2 : // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
1489 2 : // doing a [c,d) excise at the same time as this compaction, we will have
1490 2 : // to error out the whole compaction as we can't guarantee it hasn't/won't
1491 2 : // write a file overlapping with the excise span.
1492 2 : if ingestFlushable.exciseSpan.OverlapsInternalKeyRange(d.cmp, c2.smallest, c2.largest) {
1493 2 : c2.cancel.Store(true)
1494 2 : continue
1495 : }
1496 : }
1497 :
1498 2 : if len(ve.DeletedFiles) > 0 {
1499 2 : // Iterate through all other compactions, and check if their inputs have
1500 2 : // been replaced due to an ingest-time split or excise. In that case,
1501 2 : // cancel the compaction.
1502 2 : for c2 := range d.mu.compact.inProgress {
1503 2 : for i := range c2.inputs {
1504 2 : iter := c2.inputs[i].files.Iter()
1505 2 : for f := iter.First(); f != nil; f = iter.Next() {
1506 2 : if _, ok := ve.DeletedFiles[deletedFileEntry{FileNum: f.FileNum, Level: c2.inputs[i].level}]; ok {
1507 2 : c2.cancel.Store(true)
1508 2 : break
1509 : }
1510 : }
1511 : }
1512 : }
1513 : }
1514 : }
1515 2 : err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */
1516 2 : func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
1517 2 : if err != nil {
1518 1 : info.Err = err
1519 1 : }
1520 2 : } else {
1521 2 : // We won't be performing the logAndApply step because of the error,
1522 2 : // so logUnlock.
1523 2 : d.mu.versions.logUnlock()
1524 2 : }
1525 :
1526 : // If err != nil, then the flush will be retried, and we will recalculate
1527 : // these metrics.
1528 2 : if err == nil {
1529 2 : d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
1530 2 : d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
1531 2 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
1532 2 : }
1533 :
1534 2 : d.clearCompactingState(c, err != nil)
1535 2 : delete(d.mu.compact.inProgress, c)
1536 2 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
1537 2 :
1538 2 : var flushed flushableList
1539 2 : if err == nil {
1540 2 : flushed = d.mu.mem.queue[:n]
1541 2 : d.mu.mem.queue = d.mu.mem.queue[n:]
1542 2 : d.updateReadStateLocked(d.opts.DebugCheck)
1543 2 : d.updateTableStatsLocked(ve.NewFiles)
1544 2 : if ingest {
1545 2 : d.mu.versions.metrics.Flush.AsIngestCount++
1546 2 : for _, l := range c.metrics {
1547 2 : d.mu.versions.metrics.Flush.AsIngestBytes += l.BytesIngested
1548 2 : d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
1549 2 : }
1550 : }
1551 2 : d.maybeTransitionSnapshotsToFileOnlyLocked()
1552 :
1553 : }
1554 : // Signal FlushEnd after installing the new readState. This helps unit
1555 : // tests that use the callback to trigger a read using an iterator with
1556 : // IterOptions.OnlyReadGuaranteedDurable.
1557 2 : info.TotalDuration = d.timeNow().Sub(startTime)
1558 2 : d.opts.EventListener.FlushEnd(info)
1559 2 :
1560 2 : // The order of these operations matters here for ease of testing.
1561 2 : // Removing the reader reference first lets tests rely on the memtable
1562 2 : // reservation having been released by the time a synchronous flush
1563 2 : // returns. readerUnrefLocked may also produce obsolete files, so the
1564 2 : // call to deleteObsoleteFiles must happen after it.
1565 2 : for i := range flushed {
1566 2 : flushed[i].readerUnrefLocked(true)
1567 2 : }
1568 :
1569 2 : d.deleteObsoleteFiles(jobID)
1570 2 :
1571 2 : // Mark all the memtables we flushed as flushed.
1572 2 : for i := range flushed {
1573 2 : close(flushed[i].flushed)
1574 2 : }
1575 :
1576 2 : return inputBytes, err
1577 : }
1578 :
1579 : // maybeTransitionSnapshotsToFileOnlyLocked transitions any "eventually
1580 : // file-only" snapshots to be file-only if all their visible state has been
1581 : // flushed to sstables.
1582 : //
1583 : // REQUIRES: d.mu.
1584 2 : func (d *DB) maybeTransitionSnapshotsToFileOnlyLocked() {
1585 2 : earliestUnflushedSeqNum := d.getEarliestUnflushedSeqNumLocked()
1586 2 : currentVersion := d.mu.versions.currentVersion()
1587 2 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; {
1588 2 : if s.efos == nil {
1589 2 : s = s.next
1590 2 : continue
1591 : }
1592 2 : overlapsFlushable := false
1593 2 : if base.Visible(earliestUnflushedSeqNum, s.efos.seqNum, base.SeqNumMax) {
1594 2 : // There are some unflushed keys that are still visible to the EFOS.
1595 2 : // Check if any memtables older than the EFOS contain keys within a
1596 2 : // protected range of the EFOS. If none do, we can transition.
1597 2 : protectedRanges := make([]bounded, len(s.efos.protectedRanges))
1598 2 : for i := range s.efos.protectedRanges {
1599 2 : protectedRanges[i] = s.efos.protectedRanges[i]
1600 2 : }
1601 2 : for i := range d.mu.mem.queue {
1602 2 : if !base.Visible(d.mu.mem.queue[i].logSeqNum, s.efos.seqNum, base.SeqNumMax) {
1603 1 : // All keys in this memtable are newer than the EFOS. Skip this
1604 1 : // memtable.
1605 1 : continue
1606 : }
1607 : // NB: computePossibleOverlaps could have false positives, such as if
1608 : // the flushable is a flushable ingest and not a memtable. In that
1609 : // case we don't open the sstables to check; we just pessimistically
1610 : // assume an overlap.
1611 2 : d.mu.mem.queue[i].computePossibleOverlaps(func(b bounded) shouldContinue {
1612 2 : overlapsFlushable = true
1613 2 : return stopIteration
1614 2 : }, protectedRanges...)
1615 2 : if overlapsFlushable {
1616 2 : break
1617 : }
1618 : }
1619 : }
1620 2 : if overlapsFlushable {
1621 2 : s = s.next
1622 2 : continue
1623 : }
1624 2 : currentVersion.Ref()
1625 2 :
1626 2 : // NB: s.efos.transitionToFileOnlySnapshot could close s, in which
1627 2 : // case s.next would be nil. Save it before calling it.
1628 2 : next := s.next
1629 2 : _ = s.efos.transitionToFileOnlySnapshot(currentVersion)
1630 2 : s = next
1631 : }
1632 : }
1633 :
1634 : // maybeScheduleCompactionAsync should be used when
1635 : // we want to possibly schedule a compaction, but don't
1636 : // want to eat the cost of running maybeScheduleCompaction.
1637 : // This method should be launched in a separate goroutine.
1638 : // d.mu must not be held when this is called.
1639 0 : func (d *DB) maybeScheduleCompactionAsync() {
1640 0 : defer d.compactionSchedulers.Done()
1641 0 :
1642 0 : d.mu.Lock()
1643 0 : d.maybeScheduleCompaction()
1644 0 : d.mu.Unlock()
1645 0 : }
1646 :
1647 : // maybeScheduleCompaction schedules a compaction if necessary.
1648 : //
1649 : // d.mu must be held when calling this.
1650 2 : func (d *DB) maybeScheduleCompaction() {
1651 2 : d.maybeScheduleCompactionPicker(pickAuto)
1652 2 : }
1653 :
1654 2 : func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction {
1655 2 : return picker.pickAuto(env)
1656 2 : }
1657 :
1658 2 : func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction {
1659 2 : return picker.pickElisionOnlyCompaction(env)
1660 2 : }
1661 :
1662 : // tryScheduleDownloadCompaction tries to start a download compaction.
1663 : //
1664 : // Returns true if we started a download compaction (or completed it
1665 : // immediately because it is a no-op or we hit an error).
1666 : //
1667 : // Requires d.mu to be held. Updates d.mu.compact.downloads.
1668 2 : func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownloads int) bool {
1669 2 : vers := d.mu.versions.currentVersion()
1670 2 : for i := 0; i < len(d.mu.compact.downloads); {
1671 2 : download := d.mu.compact.downloads[i]
1672 2 : switch d.tryLaunchDownloadCompaction(download, vers, env, maxConcurrentDownloads) {
1673 2 : case launchedCompaction:
1674 2 : return true
1675 1 : case didNotLaunchCompaction:
1676 1 : // See if we can launch a compaction for another download task.
1677 1 : i++
1678 2 : case downloadTaskCompleted:
1679 2 : // Task is completed and must be removed.
1680 2 : d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1)
1681 : }
1682 : }
1683 2 : return false
1684 : }
1685 :
1686 : // maybeScheduleCompactionPicker schedules a compaction if necessary,
1687 : // calling `pickFunc` to pick automatic compactions.
1688 : //
1689 : // Requires d.mu to be held.
1690 : func (d *DB) maybeScheduleCompactionPicker(
1691 : pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
1692 2 : ) {
1693 2 : if d.closed.Load() != nil || d.opts.ReadOnly {
1694 2 : return
1695 2 : }
1696 2 : maxCompactions := d.opts.MaxConcurrentCompactions()
1697 2 : maxDownloads := d.opts.MaxConcurrentDownloads()
1698 2 :
1699 2 : if d.mu.compact.compactingCount >= maxCompactions &&
1700 2 : (len(d.mu.compact.downloads) == 0 || d.mu.compact.downloadingCount >= maxDownloads) {
1701 2 : if len(d.mu.compact.manual) > 0 {
1702 2 : // Inability to run head blocks later manual compactions.
1703 2 : d.mu.compact.manual[0].retries++
1704 2 : }
1705 2 : return
1706 : }
1707 :
1708 : // Compaction picking needs a coherent view of a Version. In particular, we
1709 : // need to prevent concurrent ingestions from choosing a target level that
1710 : // conflicts with our compaction decision. versionSet.logLock provides the
1711 : // necessary mutual exclusion.
1712 2 : d.mu.versions.logLock()
1713 2 : defer d.mu.versions.logUnlock()
1714 2 :
1715 2 : // Check for the closed flag again, in case the DB was closed while we were
1716 2 : // waiting for logLock().
1717 2 : if d.closed.Load() != nil {
1718 2 : return
1719 2 : }
1720 :
1721 2 : env := compactionEnv{
1722 2 : diskAvailBytes: d.diskAvailBytes.Load(),
1723 2 : earliestSnapshotSeqNum: d.mu.snapshots.earliest(),
1724 2 : earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
1725 2 : }
1726 2 :
1727 2 : if d.mu.compact.compactingCount < maxCompactions {
1728 2 : // Check for delete-only compactions first, because they're expected to be
1729 2 : // cheap and reduce future compaction work.
1730 2 : if !d.opts.private.disableDeleteOnlyCompactions &&
1731 2 : !d.opts.DisableAutomaticCompactions &&
1732 2 : len(d.mu.compact.deletionHints) > 0 {
1733 2 : d.tryScheduleDeleteOnlyCompaction()
1734 2 : }
1735 :
1736 2 : for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxCompactions {
1737 2 : if manual := d.mu.compact.manual[0]; !d.tryScheduleManualCompaction(env, manual) {
1738 2 : // Inability to run head blocks later manual compactions.
1739 2 : manual.retries++
1740 2 : break
1741 : }
1742 2 : d.mu.compact.manual = d.mu.compact.manual[1:]
1743 : }
1744 :
1745 2 : for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxCompactions &&
1746 2 : d.tryScheduleAutoCompaction(env, pickFunc) {
1747 2 : }
1748 : }
1749 :
1750 2 : for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads &&
1751 2 : d.tryScheduleDownloadCompaction(env, maxDownloads) {
1752 2 : }
1753 : }
1754 :
1755 : // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction
1756 : // for all files that can be deleted as suggested by deletionHints.
1757 : //
1758 : // Requires d.mu to be held. Updates d.mu.compact.deletionHints.
1759 2 : func (d *DB) tryScheduleDeleteOnlyCompaction() {
1760 2 : v := d.mu.versions.currentVersion()
1761 2 : snapshots := d.mu.snapshots.toSlice()
1762 2 : inputs, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots)
1763 2 : d.mu.compact.deletionHints = unresolvedHints
1764 2 :
1765 2 : if len(inputs) > 0 {
1766 2 : c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow())
1767 2 : d.mu.compact.compactingCount++
1768 2 : d.addInProgressCompaction(c)
1769 2 : go d.compact(c, nil)
1770 2 : }
1771 : }
1772 :
1773 : // tryScheduleManualCompaction tries to kick off the given manual compaction.
1774 : //
1775 : // Returns false if we are not able to run this compaction at this time.
1776 : //
1777 : // Requires d.mu to be held.
1778 2 : func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompaction) bool {
1779 2 : v := d.mu.versions.currentVersion()
1780 2 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
1781 2 : pc, retryLater := pickManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
1782 2 : if pc == nil {
1783 2 : if !retryLater {
1784 2 : // Manual compaction is a no-op. Signal completion and exit.
1785 2 : manual.done <- nil
1786 2 : return true
1787 2 : }
1788 : // We are not able to run this manual compaction at this time.
1789 2 : return false
1790 : }
1791 :
1792 2 : c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
1793 2 : d.mu.compact.compactingCount++
1794 2 : d.addInProgressCompaction(c)
1795 2 : go d.compact(c, manual.done)
1796 2 : return true
1797 : }
1798 :
1799 : // tryScheduleAutoCompaction tries to kick off an automatic compaction.
1800 : //
1801 : // Returns false if no automatic compactions are necessary or able to run at
1802 : // this time.
1803 : //
1804 : // Requires d.mu to be held.
1805 : func (d *DB) tryScheduleAutoCompaction(
1806 : env compactionEnv, pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
1807 2 : ) bool {
1808 2 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
1809 2 : env.readCompactionEnv = readCompactionEnv{
1810 2 : readCompactions: &d.mu.compact.readCompactions,
1811 2 : flushing: d.mu.compact.flushing || d.passedFlushThreshold(),
1812 2 : rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
1813 2 : }
1814 2 : pc := pickFunc(d.mu.versions.picker, env)
1815 2 : if pc == nil {
1816 2 : return false
1817 2 : }
1818 2 : c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
1819 2 : d.mu.compact.compactingCount++
1820 2 : d.addInProgressCompaction(c)
1821 2 : go d.compact(c, nil)
1822 2 : return true
1823 : }
1824 :
1825 : // deleteCompactionHintType indicates whether the deleteCompactionHint was
1826 : // generated from a span containing a range del (point key only), a range key
1827 : // delete (range key only), or both a point and range key.
1828 : type deleteCompactionHintType uint8
1829 :
1830 : const (
1831 : // NOTE: While these are primarily used as enumeration types, they are also
1832 : // used for some bitwise operations. Care should be taken when updating.
1833 : deleteCompactionHintTypeUnknown deleteCompactionHintType = iota
1834 : deleteCompactionHintTypePointKeyOnly
1835 : deleteCompactionHintTypeRangeKeyOnly
1836 : deleteCompactionHintTypePointAndRangeKey
1837 : )
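     : 
     : // Note that these values are chosen so that
     : // deleteCompactionHintTypePointKeyOnly | deleteCompactionHintTypeRangeKeyOnly
     : // == deleteCompactionHintTypePointAndRangeKey (1 | 2 == 3), which is what
     : // compactionHintFromKeys below relies on when it ORs hint types together.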
1838 :
1839 : // String implements fmt.Stringer.
1840 1 : func (h deleteCompactionHintType) String() string {
1841 1 : switch h {
1842 0 : case deleteCompactionHintTypeUnknown:
1843 0 : return "unknown"
1844 1 : case deleteCompactionHintTypePointKeyOnly:
1845 1 : return "point-key-only"
1846 1 : case deleteCompactionHintTypeRangeKeyOnly:
1847 1 : return "range-key-only"
1848 1 : case deleteCompactionHintTypePointAndRangeKey:
1849 1 : return "point-and-range-key"
1850 0 : default:
1851 0 : panic(fmt.Sprintf("unknown hint type: %d", h))
1852 : }
1853 : }
1854 :
1855 : // compactionHintFromKeys returns a deleteCompactionHintType given a slice of
1856 : // keyspan.Keys.
1857 2 : func compactionHintFromKeys(keys []keyspan.Key) deleteCompactionHintType {
1858 2 : var hintType deleteCompactionHintType
1859 2 : for _, k := range keys {
1860 2 : switch k.Kind() {
1861 2 : case base.InternalKeyKindRangeDelete:
1862 2 : hintType |= deleteCompactionHintTypePointKeyOnly
1863 2 : case base.InternalKeyKindRangeKeyDelete:
1864 2 : hintType |= deleteCompactionHintTypeRangeKeyOnly
1865 0 : default:
1866 0 : panic(fmt.Sprintf("unsupported key kind: %s", k.Kind()))
1867 : }
1868 : }
1869 2 : return hintType
1870 : }
1871 :
1872 : // A deleteCompactionHint records a user key and sequence number span that has been
1873 : // deleted by a range tombstone. A hint is recorded if at least one sstable
1874 : // falls completely within both the user key and sequence number spans.
1875 : // Once the tombstones and the observed completely-contained sstables fall
1876 : // into the same snapshot stripe, a delete-only compaction may delete any
1877 : // sstables within the range.
1878 : type deleteCompactionHint struct {
1879 : // The type of key span that generated this hint (point key, range key, or
1880 : // both).
1881 : hintType deleteCompactionHintType
1882 : // start and end are user keys specifying a key range [start, end) of
1883 : // deleted keys.
1884 : start []byte
1885 : end []byte
1886 : // The level of the file containing the range tombstone(s) when the hint
1887 : // was created. Only lower levels need to be searched for files that may
1888 : // be deleted.
1889 : tombstoneLevel int
1890 : // The file containing the range tombstone(s) that created the hint.
1891 : tombstoneFile *fileMetadata
1892 : // The smallest and largest sequence numbers of the abutting tombstones
1893 : // merged to form this hint. All of a table's keys must be less than the
1894 : // tombstone smallest sequence number to be deleted. All of a table's
1895 : // sequence numbers must fall into the same snapshot stripe as the
1896 : // tombstone largest sequence number to be deleted.
1897 : tombstoneLargestSeqNum base.SeqNum
1898 : tombstoneSmallestSeqNum base.SeqNum
1899 : // The smallest sequence number of an sstable that was found to be covered
1900 : // by this hint. The hint cannot be resolved until this sequence number is
1901 : // in the same snapshot stripe as the largest tombstone sequence number.
1902 : // This is set when the hint is created, so the LSM may have changed since
1903 : // then and, notably, may no longer contain the sstable that contained the
1904 : // key at this sequence number.
1905 : fileSmallestSeqNum base.SeqNum
1906 : }
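     : 
     : // For a worked example of how a hint's key range and sequence numbers
     : // interact with open snapshots, see the diagram in
     : // checkDeleteCompactionHints below.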
1907 :
1908 1 : func (h deleteCompactionHint) String() string {
1909 1 : return fmt.Sprintf(
1910 1 : "L%d.%s %s-%s seqnums(tombstone=%d-%d, file-smallest=%d, type=%s)",
1911 1 : h.tombstoneLevel, h.tombstoneFile.FileNum, h.start, h.end,
1912 1 : h.tombstoneSmallestSeqNum, h.tombstoneLargestSeqNum, h.fileSmallestSeqNum,
1913 1 : h.hintType,
1914 1 : )
1915 1 : }
1916 :
1917 : func (h *deleteCompactionHint) canDelete(
1918 : cmp Compare, m *fileMetadata, snapshots compact.Snapshots,
1919 2 : ) bool {
1920 2 : // The file can only be deleted if all of its keys are older than the
1921 2 : // earliest tombstone aggregated into the hint. Note that we use
1922 2 : // m.LargestSeqNumAbsolute, not m.LargestSeqNum. Consider a compaction that
1923 2 : // zeroes sequence numbers. A compaction may zero the sequence number of a
1924 2 : // key with a sequence number > h.tombstoneSmallestSeqNum and set it to
1925 2 : // zero. If we looked at m.LargestSeqNum, the resulting output file would
1926 2 : // appear to not contain any keys more recent than the oldest tombstone. To
1927 2 : // avoid this error, the largest pre-zeroing sequence number is maintained
1928 2 : // in LargestSeqNumAbsolute and used here to make the determination whether
1929 2 : // the file's keys are older than all of the hint's tombstones.
1930 2 : if m.LargestSeqNumAbsolute >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
1931 2 : return false
1932 2 : }
1933 :
1934 : // The file's oldest key must be in the same snapshot stripe as the
1935 : // newest tombstone. NB: We already checked the hint's sequence numbers,
1936 : // but this file's oldest sequence number might be lower than the hint's
1937 : // smallest sequence number despite the file falling within the key range
1938 : // if this file was constructed after the hint by a compaction.
1939 2 : if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(m.SmallestSeqNum) {
1940 0 : return false
1941 0 : }
1942 :
1943 2 : switch h.hintType {
1944 2 : case deleteCompactionHintTypePointKeyOnly:
1945 2 : // A hint generated by a range del span cannot delete tables that contain
1946 2 : // range keys.
1947 2 : if m.HasRangeKeys {
1948 1 : return false
1949 1 : }
1950 2 : case deleteCompactionHintTypeRangeKeyOnly:
1951 2 : // A hint generated by a range key del span cannot delete tables that
1952 2 : // contain point keys.
1953 2 : if m.HasPointKeys {
1954 2 : return false
1955 2 : }
1956 2 : case deleteCompactionHintTypePointAndRangeKey:
1957 : // A hint from a span that contains both range dels *and* range keys can
1958 : // only be deleted if both bounds fall within the hint. The next check takes
1959 : // care of this.
1960 0 : default:
1961 0 : panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
1962 : }
1963 :
1964 : // The file's keys must be completely contained within the hint range.
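     : // Note that the hint covers the half-open range [start, end), hence the
     : // strict comparison of m.Largest.UserKey against h.end.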
1965 2 : return cmp(h.start, m.Smallest.UserKey) <= 0 && cmp(m.Largest.UserKey, h.end) < 0
1966 : }
1967 :
1968 : func checkDeleteCompactionHints(
1969 : cmp Compare, v *version, hints []deleteCompactionHint, snapshots compact.Snapshots,
1970 2 : ) ([]compactionLevel, []deleteCompactionHint) {
1971 2 : var files map[*fileMetadata]bool
1972 2 : var byLevel [numLevels][]*fileMetadata
1973 2 :
1974 2 : unresolvedHints := hints[:0]
1975 2 : for _, h := range hints {
1976 2 : // Check each compaction hint to see if it's resolvable. Resolvable
1977 2 : // hints are removed and trigger a delete-only compaction if any files
1978 2 : // in the current LSM still meet their criteria. Unresolvable hints
1979 2 : // are saved and don't trigger a delete-only compaction.
1980 2 : //
1981 2 : // When a compaction hint is created, the sequence numbers of the
1982 2 : // range tombstones and the covered file with the oldest key are
1983 2 : // recorded. The largest tombstone sequence number and the smallest
1984 2 : // file sequence number must be in the same snapshot stripe for the
1985 2 : // hint to be resolved. The below graphic models a compaction hint
1986 2 : // covering the keyspace [b, r). The hint completely contains two
1987 2 : // files, 000002 and 000003. The file 000003 contains the lowest
1988 2 : // covered sequence number at #90. The tombstone b.RANGEDEL.230:h has
1989 2 : // the highest tombstone sequence number incorporated into the hint.
1990 2 : // The hint may be resolved only once the snapshots at #100, #180 and
1991 2 : // #210 are all closed. File 000001 is not included within the hint
1992 2 : // because it extends beyond the range tombstones in user key space.
1993 2 : //
1994 2 : // 250
1995 2 : //
1996 2 : // |-b...230:h-|
1997 2 : // _____________________________________________________ snapshot #210
1998 2 : // 200 |--h.RANGEDEL.200:r--|
1999 2 : //
2000 2 : // _____________________________________________________ snapshot #180
2001 2 : //
2002 2 : // 150 +--------+
2003 2 : // +---------+ | 000003 |
2004 2 : // | 000002 | | |
2005 2 : // +_________+ | |
2006 2 : // 100_____________________|________|___________________ snapshot #100
2007 2 : // +--------+
2008 2 : // _____________________________________________________ snapshot #70
2009 2 : // +---------------+
2010 2 : // 50 | 000001 |
2011 2 : // | |
2012 2 : // +---------------+
2013 2 : // ______________________________________________________________
2014 2 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
2015 2 :
2016 2 : if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(h.fileSmallestSeqNum) {
2017 2 : // Cannot resolve yet.
2018 2 : unresolvedHints = append(unresolvedHints, h)
2019 2 : continue
2020 : }
2021 :
2022 : // The hint h will be resolved and dropped, regardless of whether
2023 : // there are any tables that can be deleted.
2024 2 : for l := h.tombstoneLevel + 1; l < numLevels; l++ {
2025 2 : overlaps := v.Overlaps(l, base.UserKeyBoundsEndExclusive(h.start, h.end))
2026 2 : iter := overlaps.Iter()
2027 2 : for m := iter.First(); m != nil; m = iter.Next() {
2028 2 : if m.IsCompacting() || !h.canDelete(cmp, m, snapshots) || files[m] {
2029 2 : continue
2030 : }
2031 2 : if files == nil {
2032 2 : // Construct files lazily, assuming most calls will not
2033 2 : // produce delete-only compactions.
2034 2 : files = make(map[*fileMetadata]bool)
2035 2 : }
2036 2 : files[m] = true
2037 2 : byLevel[l] = append(byLevel[l], m)
2038 : }
2039 : }
2040 : }
2041 :
2042 2 : var compactLevels []compactionLevel
2043 2 : for l, files := range byLevel {
2044 2 : if len(files) == 0 {
2045 2 : continue
2046 : }
2047 2 : compactLevels = append(compactLevels, compactionLevel{
2048 2 : level: l,
2049 2 : files: manifest.NewLevelSliceKeySorted(cmp, files),
2050 2 : })
2051 : }
2052 2 : return compactLevels, unresolvedHints
2053 : }
2054 :
2055 : // compact runs one compaction and maybe schedules another call to compact.
2056 2 : func (d *DB) compact(c *compaction, errChannel chan error) {
2057 2 : pprof.Do(context.Background(), compactLabels, func(context.Context) {
2058 2 : d.mu.Lock()
2059 2 : defer d.mu.Unlock()
2060 2 : if err := d.compact1(c, errChannel); err != nil {
2061 2 : // TODO(peter): count consecutive compaction errors and backoff.
2062 2 : d.opts.EventListener.BackgroundError(err)
2063 2 : }
2064 2 : if c.isDownload {
2065 2 : d.mu.compact.downloadingCount--
2066 2 : } else {
2067 2 : d.mu.compact.compactingCount--
2068 2 : }
2069 2 : delete(d.mu.compact.inProgress, c)
2070 2 : // Add this compaction's duration to the cumulative duration. NB: This
2071 2 : // must be atomic with the above removal of c from
2072 2 : // d.mu.compact.inProgress to ensure Metrics.Compact.Duration does not
2073 2 : // miss or double count a completing compaction's duration.
2074 2 : d.mu.compact.duration += d.timeNow().Sub(c.beganAt)
2075 2 :
2076 2 : // The previous compaction may have produced too many files in a
2077 2 : // level, so reschedule another compaction if needed.
2078 2 : d.maybeScheduleCompaction()
2079 2 : d.mu.compact.cond.Broadcast()
2080 : })
2081 : }
2082 :
2083 : // compact1 runs one compaction.
2084 : //
2085 : // d.mu must be held when calling this, but the mutex may be dropped and
2086 : // re-acquired during the course of this method.
2087 2 : func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
2088 2 : if errChannel != nil {
2089 2 : defer func() {
2090 2 : errChannel <- err
2091 2 : }()
2092 : }
2093 :
2094 2 : jobID := d.newJobIDLocked()
2095 2 : info := c.makeInfo(jobID)
2096 2 : d.opts.EventListener.CompactionBegin(info)
2097 2 : startTime := d.timeNow()
2098 2 :
2099 2 : ve, stats, err := d.runCompaction(jobID, c)
2100 2 :
2101 2 : info.Duration = d.timeNow().Sub(startTime)
2102 2 : if err == nil {
2103 2 : validateVersionEdit(ve, d.opts.Experimental.KeyValidationFunc, d.opts.Comparer.FormatKey, d.opts.Logger)
2104 2 : err = func() error {
2105 2 : var err error
2106 2 : d.mu.versions.logLock()
2107 2 : // Check if this compaction had a conflicting operation (e.g. a d.excise())
2108 2 : // that necessitates restarting it from scratch. Note that since we hold
2109 2 : // the manifest lock, we don't expect this bool to change its value
2110 2 : // as only the holder of the manifest lock will ever write to it.
2111 2 : if c.cancel.Load() {
2112 2 : err = firstError(err, ErrCancelledCompaction)
2113 2 : }
2114 2 : if err != nil {
2115 2 : // logAndApply calls logUnlock. If we didn't call it, we need to call
2116 2 : // logUnlock ourselves.
2117 2 : d.mu.versions.logUnlock()
2118 2 : return err
2119 2 : }
2120 2 : return d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
2121 2 : return d.getInProgressCompactionInfoLocked(c)
2122 2 : })
2123 : }()
2124 : }
2125 :
2126 2 : info.Done = true
2127 2 : info.Err = err
2128 2 : if err == nil {
2129 2 : for i := range ve.NewFiles {
2130 2 : e := &ve.NewFiles[i]
2131 2 : info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
2132 2 : }
2133 2 : d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
2134 2 : d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
2135 2 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
2136 : }
2137 :
2138 : // NB: clearing compacting state must occur before updating the read state;
2139 : // L0Sublevels initialization depends on it.
2140 2 : d.clearCompactingState(c, err != nil)
2141 2 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
2142 2 : d.mu.versions.incrementCompactionBytes(-c.bytesWritten)
2143 2 :
2144 2 : info.TotalDuration = d.timeNow().Sub(c.beganAt)
2145 2 : d.opts.EventListener.CompactionEnd(info)
2146 2 :
2147 2 : // Update the read state before deleting obsolete files because the
2148 2 : // read-state update will cause the previous version to be unref'd and if
2149 2 : // there are no references obsolete tables will be added to the obsolete
2150 2 : // table list.
2151 2 : if err == nil {
2152 2 : d.updateReadStateLocked(d.opts.DebugCheck)
2153 2 : d.updateTableStatsLocked(ve.NewFiles)
2154 2 : }
2155 2 : d.deleteObsoleteFiles(jobID)
2156 2 :
2157 2 : return err
2158 : }
2159 :
2160 : // runCopyCompaction runs a copy compaction where a new FileNum is created that
2161 : // is a byte-for-byte copy of the input file (or, in some cases, a span thereof).
2162 : // This is used in lieu of a move compaction when a file is being moved across the
2163 : // local/remote storage boundary. It could also be used in lieu of a rewrite
2164 : // compaction as part of a Download() call, which allows copying only a span of
2165 : // the external file, provided the file does not contain range keys or value
2166 : // blocks (see sstable.CopySpan).
2167 : //
2168 : // d.mu must be held when calling this method. The mutex will be released when
2169 : // doing IO.
2170 : func (d *DB) runCopyCompaction(
2171 : jobID JobID, c *compaction,
2172 2 : ) (ve *versionEdit, stats compact.Stats, _ error) {
2173 2 : iter := c.startLevel.files.Iter()
2174 2 : inputMeta := iter.First()
2175 2 : if iter.Next() != nil {
2176 0 : return nil, compact.Stats{}, base.AssertionFailedf("got more than one file for a copy compaction")
2177 0 : }
2178 2 : if c.cancel.Load() {
2179 1 : return nil, compact.Stats{}, ErrCancelledCompaction
2180 1 : }
2181 2 : ve = &versionEdit{
2182 2 : DeletedFiles: map[deletedFileEntry]*fileMetadata{
2183 2 : {Level: c.startLevel.level, FileNum: inputMeta.FileNum}: inputMeta,
2184 2 : },
2185 2 : }
2186 2 :
2187 2 : objMeta, err := d.objProvider.Lookup(fileTypeTable, inputMeta.FileBacking.DiskFileNum)
2188 2 : if err != nil {
2189 0 : return nil, compact.Stats{}, err
2190 0 : }
2191 2 : if !objMeta.IsExternal() {
2192 2 : if objMeta.IsRemote() || !remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level) {
2193 0 : panic("pebble: scheduled a copy compaction that is not actually moving files to shared storage")
2194 : }
2195 : // Note that based on logic in the compaction picker, we're guaranteed
2196 : // inputMeta.Virtual is false.
2197 2 : if inputMeta.Virtual {
2198 0 : panic(errors.AssertionFailedf("cannot do a copy compaction of a virtual sstable across local/remote storage"))
2199 : }
2200 : }
2201 :
2202 : // We are in the relatively more complex case where we need to copy this
2203 : // file to remote storage. Drop the db mutex while we do the copy.
2204 : //
2205 : // To ease cleanup of the local file and tracking of refs, we create
2206 : // a new FileNum. This has the potential to make the block cache less
2207 : // effective, however.
2208 2 : newMeta := &fileMetadata{
2209 2 : Size: inputMeta.Size,
2210 2 : CreationTime: inputMeta.CreationTime,
2211 2 : SmallestSeqNum: inputMeta.SmallestSeqNum,
2212 2 : LargestSeqNum: inputMeta.LargestSeqNum,
2213 2 : LargestSeqNumAbsolute: inputMeta.LargestSeqNumAbsolute,
2214 2 : Stats: inputMeta.Stats,
2215 2 : Virtual: inputMeta.Virtual,
2216 2 : SyntheticPrefix: inputMeta.SyntheticPrefix,
2217 2 : SyntheticSuffix: inputMeta.SyntheticSuffix,
2218 2 : }
2219 2 : if inputMeta.HasPointKeys {
2220 2 : newMeta.ExtendPointKeyBounds(c.cmp, inputMeta.SmallestPointKey, inputMeta.LargestPointKey)
2221 2 : }
2222 2 : if inputMeta.HasRangeKeys {
2223 2 : newMeta.ExtendRangeKeyBounds(c.cmp, inputMeta.SmallestRangeKey, inputMeta.LargestRangeKey)
2224 2 : }
2225 2 : newMeta.FileNum = d.mu.versions.getNextFileNum()
2226 2 : if objMeta.IsExternal() {
2227 2 : // external -> local/shared copy. File must be virtual.
2228 2 : // We will update this size later after we produce the new backing file.
2229 2 : newMeta.InitProviderBacking(base.DiskFileNum(newMeta.FileNum), inputMeta.FileBacking.Size)
2230 2 : } else {
2231 2 : // local -> shared copy. New file is guaranteed to not be virtual.
2232 2 : newMeta.InitPhysicalBacking()
2233 2 : }
2234 :
2235 : // Before dropping the db mutex, grab a ref to the current version. This
2236 : // prevents any concurrent excises from deleting files that this compaction
2237 : // needs to read/maintain a reference to.
2238 2 : vers := d.mu.versions.currentVersion()
2239 2 : vers.Ref()
2240 2 : defer vers.UnrefLocked()
2241 2 :
2242 2 : // NB: The order here is reversed, lock after unlock. This is similar to
2243 2 : // runCompaction.
2244 2 : d.mu.Unlock()
2245 2 : defer d.mu.Lock()
2246 2 :
2247 2 : deleteOnExit := false
2248 2 : defer func() {
2249 2 : if deleteOnExit {
2250 1 : _ = d.objProvider.Remove(fileTypeTable, newMeta.FileBacking.DiskFileNum)
2251 1 : }
2252 : }()
2253 :
2254 : // If the src obj is external, we're doing an external to local/shared copy.
2255 2 : if objMeta.IsExternal() {
2256 2 : ctx := context.TODO()
2257 2 : src, err := d.objProvider.OpenForReading(
2258 2 : ctx, fileTypeTable, inputMeta.FileBacking.DiskFileNum, objstorage.OpenOptions{},
2259 2 : )
2260 2 : if err != nil {
2261 0 : return nil, compact.Stats{}, err
2262 0 : }
2263 2 : defer func() {
2264 2 : if src != nil {
2265 0 : src.Close()
2266 0 : }
2267 : }()
2268 :
2269 2 : w, _, err := d.objProvider.Create(
2270 2 : ctx, fileTypeTable, newMeta.FileBacking.DiskFileNum,
2271 2 : objstorage.CreateOptions{
2272 2 : PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
2273 2 : },
2274 2 : )
2275 2 : if err != nil {
2276 0 : return nil, compact.Stats{}, err
2277 0 : }
2278 2 : deleteOnExit = true
2279 2 :
2280 2 : start, end := newMeta.Smallest, newMeta.Largest
2281 2 : if newMeta.SyntheticPrefix.IsSet() {
2282 2 : start.UserKey = newMeta.SyntheticPrefix.Invert(start.UserKey)
2283 2 : end.UserKey = newMeta.SyntheticPrefix.Invert(end.UserKey)
2284 2 : }
2285 2 : if newMeta.SyntheticSuffix.IsSet() {
2286 1 : // Extend the bounds as necessary so that the keys don't include suffixes.
2287 1 : start.UserKey = start.UserKey[:c.comparer.Split(start.UserKey)]
2288 1 : if n := c.comparer.Split(end.UserKey); n < len(end.UserKey) {
2289 0 : end = base.MakeRangeDeleteSentinelKey(c.comparer.ImmediateSuccessor(nil, end.UserKey[:n]))
2290 0 : }
2291 : }
2292 :
2293 : // NB: external files are always virtual.
2294 2 : var wrote uint64
2295 2 : err = d.tableCache.withVirtualReader(inputMeta.VirtualMeta(), func(r sstable.VirtualReader) error {
2296 2 : var err error
2297 2 : wrote, err = sstable.CopySpan(ctx,
2298 2 : src, r.UnsafeReader(), d.opts.MakeReaderOptions(),
2299 2 : w, d.opts.MakeWriterOptions(c.outputLevel.level, d.FormatMajorVersion().MaxTableFormat()),
2300 2 : start, end,
2301 2 : )
2302 2 : return err
2303 2 : })
2304 :
2305 2 : src = nil // We passed src to CopySpan; it's responsible for closing it.
2306 2 : if err != nil {
2307 1 : if errors.Is(err, sstable.ErrEmptySpan) {
2308 1 : // The virtual table was empty. Just remove the backing file.
2309 1 : // Note that deleteOnExit is true so we will delete the created object.
2310 1 : c.metrics = map[int]*LevelMetrics{
2311 1 : c.outputLevel.level: {
2312 1 : BytesIn: inputMeta.Size,
2313 1 : },
2314 1 : }
2315 1 : return ve, compact.Stats{}, nil
2316 1 : }
2317 0 : return nil, compact.Stats{}, err
2318 : }
2319 2 : newMeta.FileBacking.Size = wrote
2320 2 : newMeta.Size = wrote
2321 2 : } else {
2322 2 : _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
2323 2 : d.objProvider.Path(objMeta), fileTypeTable, newMeta.FileBacking.DiskFileNum,
2324 2 : objstorage.CreateOptions{PreferSharedStorage: true})
2325 2 : if err != nil {
2326 0 : return nil, compact.Stats{}, err
2327 0 : }
2328 2 : deleteOnExit = true
2329 : }
2330 2 : ve.NewFiles = []newFileEntry{{
2331 2 : Level: c.outputLevel.level,
2332 2 : Meta: newMeta,
2333 2 : }}
2334 2 : if newMeta.Virtual {
2335 2 : ve.CreatedBackingTables = []*fileBacking{newMeta.FileBacking}
2336 2 : }
2337 2 : c.metrics = map[int]*LevelMetrics{
2338 2 : c.outputLevel.level: {
2339 2 : BytesIn: inputMeta.Size,
2340 2 : BytesCompacted: newMeta.Size,
2341 2 : TablesCompacted: 1,
2342 2 : },
2343 2 : }
2344 2 :
2345 2 : if err := d.objProvider.Sync(); err != nil {
2346 0 : return nil, compact.Stats{}, err
2347 0 : }
2348 2 : deleteOnExit = false
2349 2 : return ve, compact.Stats{}, nil
2350 : }
2351 :
2352 : func (d *DB) runDeleteOnlyCompaction(
2353 : jobID JobID, c *compaction,
2354 2 : ) (ve *versionEdit, stats compact.Stats, retErr error) {
2355 2 : c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
2356 2 : ve = &versionEdit{
2357 2 : DeletedFiles: map[deletedFileEntry]*fileMetadata{},
2358 2 : }
2359 2 : for _, cl := range c.inputs {
2360 2 : levelMetrics := &LevelMetrics{}
2361 2 : iter := cl.files.Iter()
2362 2 : for f := iter.First(); f != nil; f = iter.Next() {
2363 2 : ve.DeletedFiles[deletedFileEntry{
2364 2 : Level: cl.level,
2365 2 : FileNum: f.FileNum,
2366 2 : }] = f
2367 2 : }
2368 2 : c.metrics[cl.level] = levelMetrics
2369 : }
2370 2 : return ve, stats, nil
2371 : }
2372 :
2373 : func (d *DB) runMoveCompaction(
2374 : jobID JobID, c *compaction,
2375 2 : ) (ve *versionEdit, stats compact.Stats, _ error) {
2376 2 : iter := c.startLevel.files.Iter()
2377 2 : meta := iter.First()
2378 2 : if iter.Next() != nil {
2379 0 : return nil, stats, base.AssertionFailedf("got more than one file for a move compaction")
2380 0 : }
2381 2 : if c.cancel.Load() {
2382 1 : return ve, stats, ErrCancelledCompaction
2383 1 : }
2384 2 : c.metrics = map[int]*LevelMetrics{
2385 2 : c.outputLevel.level: {
2386 2 : BytesMoved: meta.Size,
2387 2 : TablesMoved: 1,
2388 2 : },
2389 2 : }
2390 2 : ve = &versionEdit{
2391 2 : DeletedFiles: map[deletedFileEntry]*fileMetadata{
2392 2 : {Level: c.startLevel.level, FileNum: meta.FileNum}: meta,
2393 2 : },
2394 2 : NewFiles: []newFileEntry{
2395 2 : {Level: c.outputLevel.level, Meta: meta},
2396 2 : },
2397 2 : }
2398 2 :
2399 2 : return ve, stats, nil
2400 : }
2401 :
2402 : // runCompaction runs a compaction that produces new on-disk tables from
2403 : // memtables or old on-disk tables.
2404 : //
2405 : // runCompaction cannot be used for compactionKindIngestedFlushable.
2406 : //
2407 : // d.mu must be held when calling this, but the mutex may be dropped and
2408 : // re-acquired during the course of this method.
2409 : func (d *DB) runCompaction(
2410 : jobID JobID, c *compaction,
2411 2 : ) (ve *versionEdit, stats compact.Stats, retErr error) {
2412 2 : switch c.kind {
2413 2 : case compactionKindDeleteOnly:
2414 2 : return d.runDeleteOnlyCompaction(jobID, c)
2415 2 : case compactionKindMove:
2416 2 : return d.runMoveCompaction(jobID, c)
2417 2 : case compactionKindCopy:
2418 2 : return d.runCopyCompaction(jobID, c)
2419 0 : case compactionKindIngestedFlushable:
2420 0 : panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
2421 : }
2422 :
2423 2 : snapshots := d.mu.snapshots.toSlice()
2424 2 :
2425 2 : if c.flushing == nil {
2426 2 : // Before dropping the db mutex, grab a ref to the current version. This
2427 2 : // prevents any concurrent excises from deleting files that this compaction
2428 2 : // needs to read/maintain a reference to.
2429 2 : //
2430 2 : // Note that unlike user iterators, compactionIter does not maintain a ref
2431 2 : // of the version or read state.
2432 2 : vers := d.mu.versions.currentVersion()
2433 2 : vers.Ref()
2434 2 : defer vers.UnrefLocked()
2435 2 : }
2436 :
2437 2 : if c.cancel.Load() {
2438 2 : return ve, stats, ErrCancelledCompaction
2439 2 : }
2440 :
2441 : // The table is typically written at the maximum allowable format implied by
2442 : // the current format major version of the DB.
2443 2 : tableFormat := d.FormatMajorVersion().MaxTableFormat()
2444 2 : // In format major versions with maximum table formats of Pebblev3, value
2445 2 : // blocks were conditional on an experimental setting. In format major
2446 2 : // versions with maximum table formats of Pebblev4 and higher, value blocks
2447 2 : // are always enabled.
2448 2 : if tableFormat == sstable.TableFormatPebblev3 &&
2449 2 : (d.opts.Experimental.EnableValueBlocks == nil || !d.opts.Experimental.EnableValueBlocks()) {
2450 2 : tableFormat = sstable.TableFormatPebblev2
2451 2 : }
2452 :
2453 : // Release the d.mu lock while doing I/O.
2454 : // Note the unusual order: Unlock and then Lock.
2455 2 : d.mu.Unlock()
2456 2 : defer d.mu.Lock()
2457 2 :
2458 2 : result := d.compactAndWrite(jobID, c, snapshots, tableFormat)
2459 2 : if result.Err == nil {
2460 2 : ve, result.Err = c.makeVersionEdit(result)
2461 2 : }
2462 2 : if result.Err != nil {
2463 2 : // Delete any created tables.
2464 2 : for i := range result.Tables {
2465 2 : _ = d.objProvider.Remove(fileTypeTable, result.Tables[i].ObjMeta.DiskFileNum)
2466 2 : }
2467 : }
2468 : // Refresh the disk available statistic whenever a compaction/flush
2469 : // completes, before re-acquiring the mutex.
2470 2 : d.calculateDiskAvailableBytes()
2471 2 : return ve, result.Stats, result.Err
2472 : }
2473 :
2474 : // compactAndWrite runs the data part of a compaction, where we set up a
2475 : // compaction iterator and use it to write output tables.
2476 : func (d *DB) compactAndWrite(
2477 : jobID JobID, c *compaction, snapshots compact.Snapshots, tableFormat sstable.TableFormat,
2478 2 : ) (result compact.Result) {
2479 2 : // Compactions use a pool of buffers to read blocks, avoiding polluting the
2480 2 : // block cache with blocks that will not be read again. We initialize the
2481 2 : // buffer pool with a size 12. This initial size does not need to be
2482 2 : // buffer pool with a size of 12. This initial size does not need to be
2483 2 : // blocks allocated at a given time over the course of the compaction. But
2484 2 : // choosing a size larger than that working set avoids any additional
2485 2 : // allocations to grow the size of the pool over the course of iteration.
2486 2 : //
2487 2 : // Justification for initial size 12: In a two-level compaction, at any
2488 2 : // given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
2489 2 : // Additionally, when decoding a compressed block, we'll temporarily
2490 2 : // allocate 1 additional block to hold the compressed buffer. In the worst
2491 2 : // case that all input sstables have two-level index blocks (+2), value
2492 2 : // blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
2493 2 : // additionally require 2n+4 blocks where n is the number of input sstables.
2494 2 : // Range deletion and range key blocks are relatively rare, and the cost of
2495 2 : // an additional allocation or two over the course of the compaction is
2496 2 : // considered to be okay. A larger initial size would cause the pool to hold
2497 2 : // on to more memory, even when it's not in-use because the pool will
2498 2 : // recycle buffers up to the current capacity of the pool. The memory use of
2499 2 : // a 12-buffer pool is expected to be within reason, even if all the buffers
2500 2 : // grow to the typical size of an index block (256 KiB) which would
2501 2 : // translate to 3 MiB per compaction.
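     : // For example, in a two-level compaction with two input sstables (n = 2),
     : // the worst case adds 2*2+4 = 8 blocks to the 5 already counted above
     : // (2 index, 2 data, 1 compressed), so the pool would need to grow only
     : // slightly beyond its initial size of 12.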
2502 2 : c.bufferPool.Init(12)
2503 2 : defer c.bufferPool.Release()
2504 2 :
2505 2 : pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter)
2506 2 : defer func() {
2507 2 : for _, closer := range c.closers {
2508 2 : closer.FragmentIterator.Close()
2509 2 : }
2510 : }()
2511 2 : if err != nil {
2512 0 : return compact.Result{Err: err}
2513 0 : }
2514 2 : c.allowedZeroSeqNum = c.allowZeroSeqNum()
2515 2 : cfg := compact.IterConfig{
2516 2 : Comparer: c.comparer,
2517 2 : Merge: d.merge,
2518 2 : TombstoneElision: c.delElision,
2519 2 : RangeKeyElision: c.rangeKeyElision,
2520 2 : Snapshots: snapshots,
2521 2 : AllowZeroSeqNum: c.allowedZeroSeqNum,
2522 2 : IneffectualSingleDeleteCallback: d.opts.Experimental.IneffectualSingleDeleteCallback,
2523 2 : SingleDeleteInvariantViolationCallback: d.opts.Experimental.SingleDeleteInvariantViolationCallback,
2524 2 : }
2525 2 : iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter)
2526 2 :
2527 2 : runnerCfg := compact.RunnerConfig{
2528 2 : CompactionBounds: base.UserKeyBoundsFromInternal(c.smallest, c.largest),
2529 2 : L0SplitKeys: c.l0Limits,
2530 2 : Grandparents: c.grandparents,
2531 2 : MaxGrandparentOverlapBytes: c.maxOverlapBytes,
2532 2 : TargetOutputFileSize: c.maxOutputFileSize,
2533 2 : }
2534 2 : runner := compact.NewRunner(runnerCfg, iter)
2535 2 : for runner.MoreDataToWrite() {
2536 2 : if c.cancel.Load() {
2537 2 : return runner.Finish().WithError(ErrCancelledCompaction)
2538 2 : }
2539 : // Create a new table.
2540 2 : writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
2541 2 : objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts)
2542 2 : if err != nil {
2543 1 : return runner.Finish().WithError(err)
2544 1 : }
2545 2 : runner.WriteTable(objMeta, tw)
2546 2 : d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
2547 : }
2548 2 : result = runner.Finish()
2549 2 : if result.Err == nil {
2550 2 : result.Err = d.objProvider.Sync()
2551 2 : }
2552 2 : return result
2553 : }
2554 :
2555 : // makeVersionEdit creates the version edit for a compaction, based on the
2556 : // tables in compact.Result.
2557 2 : func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error) {
2558 2 : ve := &versionEdit{
2559 2 : DeletedFiles: map[deletedFileEntry]*fileMetadata{},
2560 2 : }
2561 2 : for _, cl := range c.inputs {
2562 2 : iter := cl.files.Iter()
2563 2 : for f := iter.First(); f != nil; f = iter.Next() {
2564 2 : ve.DeletedFiles[deletedFileEntry{
2565 2 : Level: cl.level,
2566 2 : FileNum: f.FileNum,
2567 2 : }] = f
2568 2 : }
2569 : }
2570 :
2571 2 : startLevelBytes := c.startLevel.files.SizeSum()
2572 2 : outputMetrics := &LevelMetrics{
2573 2 : BytesIn: startLevelBytes,
2574 2 : BytesRead: c.outputLevel.files.SizeSum(),
2575 2 : }
2576 2 : if len(c.extraLevels) > 0 {
2577 2 : outputMetrics.BytesIn += c.extraLevels[0].files.SizeSum()
2578 2 : }
2579 2 : outputMetrics.BytesRead += outputMetrics.BytesIn
2580 2 :
2581 2 : c.metrics = map[int]*LevelMetrics{
2582 2 : c.outputLevel.level: outputMetrics,
2583 2 : }
2584 2 : if len(c.flushing) == 0 && c.metrics[c.startLevel.level] == nil {
2585 2 : c.metrics[c.startLevel.level] = &LevelMetrics{}
2586 2 : }
2587 2 : if len(c.extraLevels) > 0 {
2588 2 : c.metrics[c.extraLevels[0].level] = &LevelMetrics{}
2589 2 : outputMetrics.MultiLevel.BytesInTop = startLevelBytes
2590 2 : outputMetrics.MultiLevel.BytesIn = outputMetrics.BytesIn
2591 2 : outputMetrics.MultiLevel.BytesRead = outputMetrics.BytesRead
2592 2 : }
2593 :
2594 2 : inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute()
2595 2 : ve.NewFiles = make([]newFileEntry, len(result.Tables))
2596 2 : for i := range result.Tables {
2597 2 : t := &result.Tables[i]
2598 2 :
2599 2 : fileMeta := &fileMetadata{
2600 2 : FileNum: base.PhysicalTableFileNum(t.ObjMeta.DiskFileNum),
2601 2 : CreationTime: t.CreationTime.Unix(),
2602 2 : Size: t.WriterMeta.Size,
2603 2 : SmallestSeqNum: t.WriterMeta.SmallestSeqNum,
2604 2 : LargestSeqNum: t.WriterMeta.LargestSeqNum,
2605 2 : }
2606 2 : if c.flushing == nil {
2607 2 : // Set the file's LargestSeqNumAbsolute to be the maximum value of any
2608 2 : // of the compaction's input sstables.
2609 2 : // TODO(jackson): This could be narrowed to be the maximum of input
2610 2 : // sstables that overlap the output sstable's key range.
2611 2 : fileMeta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute
2612 2 : } else {
2613 2 : fileMeta.LargestSeqNumAbsolute = t.WriterMeta.LargestSeqNum
2614 2 : }
2615 2 : fileMeta.InitPhysicalBacking()
2616 2 :
2617 2 : // If the file didn't contain any range deletions, we can fill its
2618 2 : // table stats now, avoiding unnecessarily loading the table later.
2619 2 : maybeSetStatsFromProperties(
2620 2 : fileMeta.PhysicalMeta(), &t.WriterMeta.Properties,
2621 2 : )
2622 2 :
2623 2 : if t.WriterMeta.HasPointKeys {
2624 2 : fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestPoint, t.WriterMeta.LargestPoint)
2625 2 : }
2626 2 : if t.WriterMeta.HasRangeDelKeys {
2627 2 : fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestRangeDel, t.WriterMeta.LargestRangeDel)
2628 2 : }
2629 2 : if t.WriterMeta.HasRangeKeys {
2630 2 : fileMeta.ExtendRangeKeyBounds(c.cmp, t.WriterMeta.SmallestRangeKey, t.WriterMeta.LargestRangeKey)
2631 2 : }
2632 :
2633 2 : ve.NewFiles[i] = newFileEntry{
2634 2 : Level: c.outputLevel.level,
2635 2 : Meta: fileMeta,
2636 2 : }
2637 2 :
2638 2 : // Update metrics.
2639 2 : if c.flushing == nil {
2640 2 : outputMetrics.TablesCompacted++
2641 2 : outputMetrics.BytesCompacted += fileMeta.Size
2642 2 : } else {
2643 2 : outputMetrics.TablesFlushed++
2644 2 : outputMetrics.BytesFlushed += fileMeta.Size
2645 2 : }
2646 2 : outputMetrics.Size += int64(fileMeta.Size)
2647 2 : outputMetrics.NumFiles++
2648 2 : outputMetrics.Additional.BytesWrittenDataBlocks += t.WriterMeta.Properties.DataSize
2649 2 : outputMetrics.Additional.BytesWrittenValueBlocks += t.WriterMeta.Properties.ValueBlocksSize
2650 : }
2651 :
2652 : // Sanity check that the tables are ordered and don't overlap.
2653 2 : for i := 1; i < len(ve.NewFiles); i++ {
2654 2 : if ve.NewFiles[i-1].Meta.UserKeyBounds().End.IsUpperBoundFor(c.cmp, ve.NewFiles[i].Meta.Smallest.UserKey) {
2655 0 : return nil, base.AssertionFailedf("pebble: compaction output tables overlap: %s and %s",
2656 0 : ve.NewFiles[i-1].Meta.DebugString(c.formatKey, true),
2657 0 : ve.NewFiles[i].Meta.DebugString(c.formatKey, true),
2658 0 : )
2659 0 : }
2660 : }
2661 :
2662 2 : return ve, nil
2663 : }
2664 :
2665 : // newCompactionOutput creates an object for a new table produced by a
2666 : // compaction or flush.
2667 : func (d *DB) newCompactionOutput(
2668 : jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
2669 2 : ) (objstorage.ObjectMetadata, sstable.RawWriter, CPUWorkHandle, error) {
2670 2 : diskFileNum := d.mu.versions.getNextDiskFileNum()
2671 2 :
2672 2 : var writeCategory vfs.DiskWriteCategory
2673 2 : if d.opts.EnableSQLRowSpillMetrics {
2674 0 : // In the scenario that the Pebble engine is used for SQL row spills, the
2675 0 : // data written to the memtable will correspond to spills to disk and
2676 0 : // should be categorized as such.
2677 0 : writeCategory = "sql-row-spill"
2678 2 : } else if c.kind == compactionKindFlush {
2679 2 : writeCategory = "pebble-memtable-flush"
2680 2 : } else {
2681 2 : writeCategory = "pebble-compaction"
2682 2 : }
2683 :
2684 2 : var reason string
2685 2 : if c.kind == compactionKindFlush {
2686 2 : reason = "flushing"
2687 2 : } else {
2688 2 : reason = "compacting"
2689 2 : }
2690 :
2691 2 : ctx := context.TODO()
2692 2 : if objiotracing.Enabled {
2693 0 : ctx = objiotracing.WithLevel(ctx, c.outputLevel.level)
2694 0 : if c.kind == compactionKindFlush {
2695 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForFlush)
2696 0 : } else {
2697 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
2698 0 : }
2699 : }
2700 :
2701 : // Prefer shared storage if present.
2702 2 : createOpts := objstorage.CreateOptions{
2703 2 : PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
2704 2 : WriteCategory: writeCategory,
2705 2 : }
2706 2 : writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts)
2707 2 : if err != nil {
2708 1 : return objstorage.ObjectMetadata{}, nil, nil, err
2709 1 : }
2710 :
2711 2 : if c.kind != compactionKindFlush {
2712 2 : writable = &compactionWritable{
2713 2 : Writable: writable,
2714 2 : versions: d.mu.versions,
2715 2 : written: &c.bytesWritten,
2716 2 : }
2717 2 : }
2718 2 : d.opts.EventListener.TableCreated(TableCreateInfo{
2719 2 : JobID: int(jobID),
2720 2 : Reason: reason,
2721 2 : Path: d.objProvider.Path(objMeta),
2722 2 : FileNum: diskFileNum,
2723 2 : })
2724 2 :
2725 2 : writerOpts.SetInternal(sstableinternal.WriterOptions{
2726 2 : CacheOpts: sstableinternal.CacheOptions{
2727 2 : Cache: d.opts.Cache,
2728 2 : CacheID: d.cacheID,
2729 2 : FileNum: diskFileNum,
2730 2 : },
2731 2 : })
2732 2 :
2733 2 : const MaxFileWriteAdditionalCPUTime = time.Millisecond * 100
2734 2 : cpuWorkHandle := d.opts.Experimental.CPUWorkPermissionGranter.GetPermission(
2735 2 : MaxFileWriteAdditionalCPUTime,
2736 2 : )
2737 2 : writerOpts.Parallelism =
2738 2 : d.opts.Experimental.MaxWriterConcurrency > 0 &&
2739 2 : (cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)
2740 2 :
2741 2 : // TODO(jackson): Make the compaction body generic over the RawWriter type,
2742 2 : // so that we don't need to pay the cost of dynamic dispatch. For now, we
2743 2 : // type assert into the only concrete RawWriter type we see (until colblk is
2744 2 : // integrated).
2745 2 : tw := sstable.NewRawWriter(writable, writerOpts).(*sstable.RawRowWriter)
2746 2 : return objMeta, tw, cpuWorkHandle, nil
2747 : }
2748 :
2749 : // validateVersionEdit validates that start and end keys across new and deleted
2750 : // files in a versionEdit pass the given validation function.
2751 : func validateVersionEdit(
2752 : ve *versionEdit, validateFn func([]byte) error, format base.FormatKey, logger Logger,
2753 2 : ) {
2754 2 : validateKey := func(f *manifest.FileMetadata, key []byte) {
2755 2 : if err := validateFn(key); err != nil {
2756 1 : logger.Fatalf("pebble: version edit validation failed (key=%s file=%s): %v", format(key), f, err)
2757 1 : }
2758 : }
2759 :
2760 : // Validate both new and deleted files.
2761 2 : for _, f := range ve.NewFiles {
2762 2 : validateKey(f.Meta, f.Meta.Smallest.UserKey)
2763 2 : validateKey(f.Meta, f.Meta.Largest.UserKey)
2764 2 : }
2765 2 : for _, m := range ve.DeletedFiles {
2766 2 : validateKey(m, m.Smallest.UserKey)
2767 2 : validateKey(m, m.Largest.UserKey)
2768 2 : }
2769 : }
|