Line data Source code
1 : // Copyright 2013 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "math"
12 : "runtime/pprof"
13 : "slices"
14 : "sort"
15 : "sync/atomic"
16 : "time"
17 : "unsafe"
18 :
19 : "github.com/cockroachdb/crlib/crtime"
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/pebble/internal/base"
22 : "github.com/cockroachdb/pebble/internal/compact"
23 : "github.com/cockroachdb/pebble/internal/keyspan"
24 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
25 : "github.com/cockroachdb/pebble/internal/manifest"
26 : "github.com/cockroachdb/pebble/internal/sstableinternal"
27 : "github.com/cockroachdb/pebble/objstorage"
28 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
29 : "github.com/cockroachdb/pebble/objstorage/remote"
30 : "github.com/cockroachdb/pebble/sstable"
31 : "github.com/cockroachdb/pebble/sstable/block"
32 : "github.com/cockroachdb/pebble/vfs"
33 : )
34 :
35 : var errEmptyTable = errors.New("pebble: empty table")
36 :
37 : // ErrCancelledCompaction is returned if a compaction is cancelled by a
38 : // concurrent excise or ingest-split operation.
39 : var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
40 :
41 : var flushLabels = pprof.Labels("pebble", "flush", "output-level", "L0")
42 : var gcLabels = pprof.Labels("pebble", "gc")
43 :
44 : // expandedCompactionByteSizeLimit is the maximum number of bytes in all
45 : // compacted files. We avoid expanding the lower level file set of a compaction
46 : // if it would make the total compaction cover more than this many bytes.
47 1 : func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64) uint64 {
48 1 : v := uint64(25 * opts.Level(level).TargetFileSize)
49 1 :
50 1 : // Never expand a compaction beyond half the available capacity, divided
51 1 : // by the maximum number of concurrent compactions. Each of the concurrent
52 1 : // compactions may expand up to this limit, so this attempts to limit
53 1 : // compactions to half of available disk space. Note that this will not
54 1 : // prevent compaction picking from pursuing compactions that are larger
55 1 : // than this threshold before expansion.
56 1 : diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions())
57 1 : if v > diskMax {
58 0 : v = diskMax
59 0 : }
60 1 : return v
61 : }
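
As an aside, here is a minimal standalone Go sketch of the arithmetic above; the 2 MiB target file size, 1 GiB of available space, and 4 concurrent compactions are assumed example values, not Pebble defaults:

package main

import "fmt"

// expandedLimit mirrors the arithmetic of expandedCompactionByteSizeLimit:
// 25x the level's target file size, capped at half the available disk space
// divided across the maximum number of concurrent compactions.
func expandedLimit(targetFileSize int64, availBytes uint64, maxConcurrent int) uint64 {
	v := uint64(25 * targetFileSize)
	diskMax := (availBytes / 2) / uint64(maxConcurrent)
	if v > diskMax {
		v = diskMax
	}
	return v
}

func main() {
	// 25 * 2 MiB = 50 MiB base limit; the disk cap is (1 GiB / 2) / 4 =
	// 128 MiB, so the 50 MiB base limit stands.
	fmt.Println(expandedLimit(2<<20, 1<<30, 4)) // 52428800
}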
62 :
63 : // maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
64 : // before we stop building a single file in a level-1 to level compaction.
65 1 : func maxGrandparentOverlapBytes(opts *Options, level int) uint64 {
66 1 : return uint64(10 * opts.Level(level).TargetFileSize)
67 1 : }
68 :
69 : // maxReadCompactionBytes is used to prevent read compactions which
70 : // are too wide.
71 1 : func maxReadCompactionBytes(opts *Options, level int) uint64 {
72 1 : return uint64(10 * opts.Level(level).TargetFileSize)
73 1 : }
74 :
75 : // noCloseIter wraps around a FragmentIterator, intercepting and eliding
76 : // calls to Close. It is used during compaction to ensure that rangeDelIters
77 : // are not closed prematurely.
78 : type noCloseIter struct {
79 : keyspan.FragmentIterator
80 : }
81 :
82 1 : func (i *noCloseIter) Close() {}
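
A toy illustration of the embedding trick noCloseIter relies on; the Iterator interface and sliceIter type here are hypothetical stand-ins, not Pebble API. The wrapper satisfies the full interface through the embedded value but shadows Close with a no-op, so defensive closes by intermediate consumers cannot destroy the iterator before the compaction finishes:

package main

import "fmt"

// Iterator is a hypothetical stand-in for keyspan.FragmentIterator.
type Iterator interface {
	Next() bool
	Close()
}

type sliceIter struct{ i, n int }

func (s *sliceIter) Next() bool { s.i++; return s.i <= s.n }
func (s *sliceIter) Close()     { fmt.Println("inner closed") }

// noCloseWrapper satisfies Iterator via embedding but shadows Close with a
// no-op; the owner closes the inner iterator once, at the end.
type noCloseWrapper struct{ Iterator }

func (noCloseWrapper) Close() {}

func main() {
	inner := &sliceIter{n: 2}
	it := noCloseWrapper{inner}
	for it.Next() {
	}
	it.Close()    // elided: prints nothing
	inner.Close() // the real close, done once when the "compaction" finishes
}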
83 :
84 : type compactionLevel struct {
85 : level int
86 : files manifest.LevelSlice
87 : // l0SublevelInfo contains information about L0 sublevels being compacted.
88 : // It's only set for the start level of a compaction starting out of L0 and
89 : // is nil for all other compactions.
90 : l0SublevelInfo []sublevelInfo
91 : }
92 :
93 1 : func (cl compactionLevel) Clone() compactionLevel {
94 1 : newCL := compactionLevel{
95 1 : level: cl.level,
96 1 : files: cl.files,
97 1 : }
98 1 : return newCL
99 1 : }
100 0 : func (cl compactionLevel) String() string {
101 0 : return fmt.Sprintf(`Level %d, Files %s`, cl.level, cl.files)
102 0 : }
103 :
104 : // compactionWritable is an objstorage.Writable wrapper that, on every write,
105 : // updates a metric in `versions` on bytes written by in-progress compactions so
106 : // far. It also increments a per-compaction `written` int.
107 : type compactionWritable struct {
108 : objstorage.Writable
109 :
110 : versions *versionSet
111 : written *int64
112 : }
113 :
114 : // Write is part of the objstorage.Writable interface.
115 1 : func (c *compactionWritable) Write(p []byte) error {
116 1 : if err := c.Writable.Write(p); err != nil {
117 0 : return err
118 0 : }
119 :
120 1 : *c.written += int64(len(p))
121 1 : c.versions.incrementCompactionBytes(int64(len(p)))
122 1 : return nil
123 : }
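
A standalone sketch of the same counting-writer pattern, assuming a bytes.Buffer sink in place of objstorage.Writable: each successful write bumps a per-"compaction" counter and a shared atomic total, the analogue of the versionSet metric:

package main

import (
	"bytes"
	"fmt"
	"sync/atomic"
)

type countingWriter struct {
	w       *bytes.Buffer
	written *int64        // per-"compaction" counter
	total   *atomic.Int64 // shared metric, like the versionSet bytes
}

func (c *countingWriter) Write(p []byte) (int, error) {
	n, err := c.w.Write(p)
	if err != nil {
		return n, err
	}
	*c.written += int64(n)
	c.total.Add(int64(n))
	return n, nil
}

func main() {
	var written int64
	var total atomic.Int64
	cw := &countingWriter{w: &bytes.Buffer{}, written: &written, total: &total}
	cw.Write([]byte("hello"))
	fmt.Println(written, total.Load()) // 5 5
}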
124 :
125 : type compactionKind int
126 :
127 : const (
128 : compactionKindDefault compactionKind = iota
129 : compactionKindFlush
130 : // compactionKindMove denotes a move compaction where the input file is
131 : // retained and linked in a new level without being obsoleted.
132 : compactionKindMove
133 : // compactionKindCopy denotes a copy compaction where the input file is
134 : // copied byte-by-byte into a new file with a new FileNum in the output level.
135 : compactionKindCopy
136 : // compactionKindDeleteOnly denotes a compaction that only deletes input
137 : // files. It can occur when wide range tombstones completely contain sstables.
138 : compactionKindDeleteOnly
139 : compactionKindElisionOnly
140 : compactionKindRead
141 : compactionKindTombstoneDensity
142 : compactionKindRewrite
143 : compactionKindIngestedFlushable
144 : )
145 :
146 1 : func (k compactionKind) String() string {
147 1 : switch k {
148 1 : case compactionKindDefault:
149 1 : return "default"
150 0 : case compactionKindFlush:
151 0 : return "flush"
152 1 : case compactionKindMove:
153 1 : return "move"
154 1 : case compactionKindDeleteOnly:
155 1 : return "delete-only"
156 1 : case compactionKindElisionOnly:
157 1 : return "elision-only"
158 0 : case compactionKindRead:
159 0 : return "read"
160 1 : case compactionKindTombstoneDensity:
161 1 : return "tombstone-density"
162 1 : case compactionKindRewrite:
163 1 : return "rewrite"
164 0 : case compactionKindIngestedFlushable:
165 0 : return "ingested-flushable"
166 1 : case compactionKindCopy:
167 1 : return "copy"
168 : }
169 0 : return "?"
170 : }
171 :
172 : // compaction is a table compaction from one level to the next, starting from a
173 : // given version.
174 : type compaction struct {
175 : // cancel is an atomic bool that other goroutines can use to signal a
176 : // compaction to cancel, such as if a conflicting excise operation raced it
177 : // to manifest application. Only holders of the manifest lock will write to this atomic.
178 : cancel atomic.Bool
179 :
180 : kind compactionKind
181 : // isDownload is true if this compaction was started as part of a Download
182 : // operation. In this case kind is compactionKindCopy or
183 : // compactionKindRewrite.
184 : isDownload bool
185 :
186 : cmp Compare
187 : equal Equal
188 : comparer *base.Comparer
189 : formatKey base.FormatKey
190 : logger Logger
191 : version *version
192 : stats base.InternalIteratorStats
193 : beganAt time.Time
194 : // versionEditApplied is set to true when a compaction has completed and the
195 : // resulting version has been installed (if successful), but the compaction
196 : // goroutine is still cleaning up (eg, deleting obsolete files).
197 : versionEditApplied bool
198 : bufferPool sstable.BufferPool
199 :
200 : // startLevel is the level that is being compacted. Inputs from startLevel
201 : // and outputLevel will be merged to produce a set of outputLevel files.
202 : startLevel *compactionLevel
203 :
204 : // outputLevel is the level that files are being produced in. outputLevel is
205 : // equal to startLevel+1 except when:
206 : // - startLevel is 0, in which case the output level equals compactionPicker.baseLevel();
207 : // - the compaction is multilevel, in which case the output level is the lowest
208 : // level involved in the compaction.
209 : // A compaction's outputLevel is nil for delete-only compactions.
210 : outputLevel *compactionLevel
211 :
212 : // extraLevels points to additional levels in between the input and output
213 : // levels that get compacted in multilevel compactions.
214 : extraLevels []*compactionLevel
215 :
216 : inputs []compactionLevel
217 :
218 : // maxOutputFileSize is the maximum size of an individual table created
219 : // during compaction.
220 : maxOutputFileSize uint64
221 : // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
222 : // single output table with the tables in the grandparent level.
223 : maxOverlapBytes uint64
224 :
225 : // flushing contains the flushables (aka memtables) that are being flushed.
226 : flushing flushableList
227 : // bytesWritten contains the number of bytes that have been written to outputs.
228 : bytesWritten int64
229 :
230 : // The boundaries of the input data.
231 : smallest InternalKey
232 : largest InternalKey
233 :
234 : // A list of fragment iterators to close when the compaction finishes. Used by
235 : // input iteration to keep rangeDelIters open for the lifetime of the
236 : // compaction, and only close them when the compaction finishes.
237 : closers []*noCloseIter
238 :
239 : // grandparents are the tables in level+2 that overlap with the files being
240 : // compacted. Used to determine output table boundaries. Do not assume that the
241 : // actual files in the grandparent level will be the same when this compaction finishes.
242 : grandparents manifest.LevelSlice
243 :
244 : // Boundaries at which flushes to L0 should be split. Determined by
245 : // L0Sublevels. If nil, flushes aren't split.
246 : l0Limits [][]byte
247 :
248 : delElision compact.TombstoneElision
249 : rangeKeyElision compact.TombstoneElision
250 :
251 : // allowedZeroSeqNum is true if seqnums can be zeroed if there are no
252 : // snapshots requiring them to be kept. This determination is made by
253 : // looking for an sstable which overlaps the bounds of the compaction at a
254 : // lower level in the LSM during runCompaction.
255 : allowedZeroSeqNum bool
256 :
257 : // deletionHints are set if this is a compactionKindDeleteOnly. Used to figure
258 : // out whether an input must be deleted in its entirety, or excised into
259 : // virtual sstables.
260 : deletionHints []deleteCompactionHint
261 :
262 : // exciseEnabled is set to true if this is a compactionKindDeleteOnly and
263 : // this compaction is allowed to excise files.
264 : exciseEnabled bool
265 :
266 : metrics map[int]*LevelMetrics
267 :
268 : pickerMetrics compactionPickerMetrics
269 :
270 : slot base.CompactionSlot
271 : }
272 :
273 : // inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any
274 : // input sstables.
275 1 : func (c *compaction) inputLargestSeqNumAbsolute() base.SeqNum {
276 1 : var seqNum base.SeqNum
277 1 : for _, cl := range c.inputs {
278 1 : cl.files.Each(func(m *manifest.FileMetadata) {
279 1 : seqNum = max(seqNum, m.LargestSeqNumAbsolute)
280 1 : })
281 : }
282 1 : return seqNum
283 : }
284 :
285 1 : func (c *compaction) makeInfo(jobID JobID) CompactionInfo {
286 1 : info := CompactionInfo{
287 1 : JobID: int(jobID),
288 1 : Reason: c.kind.String(),
289 1 : Input: make([]LevelInfo, 0, len(c.inputs)),
290 1 : Annotations: []string{},
291 1 : }
292 1 : if c.isDownload {
293 1 : info.Reason = "download," + info.Reason
294 1 : }
295 1 : for _, cl := range c.inputs {
296 1 : inputInfo := LevelInfo{Level: cl.level, Tables: nil}
297 1 : iter := cl.files.Iter()
298 1 : for m := iter.First(); m != nil; m = iter.Next() {
299 1 : inputInfo.Tables = append(inputInfo.Tables, m.TableInfo())
300 1 : }
301 1 : info.Input = append(info.Input, inputInfo)
302 : }
303 1 : if c.outputLevel != nil {
304 1 : info.Output.Level = c.outputLevel.level
305 1 :
306 1 : // If there are no inputs from the output level (eg, a move
307 1 : // compaction), add an empty LevelInfo to info.Input.
308 1 : if len(c.inputs) > 0 && c.inputs[len(c.inputs)-1].level != c.outputLevel.level {
309 0 : info.Input = append(info.Input, LevelInfo{Level: c.outputLevel.level})
310 0 : }
311 1 : } else {
312 1 : // For a delete-only compaction, set the output level to L6. The
313 1 : // output level is not meaningful here, but complicating the
314 1 : // info.Output interface with a pointer doesn't seem worth the
315 1 : // semantic distinction.
316 1 : info.Output.Level = numLevels - 1
317 1 : }
318 :
319 1 : for i, score := range c.pickerMetrics.scores {
320 1 : info.Input[i].Score = score
321 1 : }
322 1 : info.SingleLevelOverlappingRatio = c.pickerMetrics.singleLevelOverlappingRatio
323 1 : info.MultiLevelOverlappingRatio = c.pickerMetrics.multiLevelOverlappingRatio
324 1 : if len(info.Input) > 2 {
325 1 : info.Annotations = append(info.Annotations, "multilevel")
326 1 : }
327 1 : return info
328 : }
329 :
330 1 : func (c *compaction) userKeyBounds() base.UserKeyBounds {
331 1 : return base.UserKeyBoundsFromInternal(c.smallest, c.largest)
332 1 : }
333 :
334 : func newCompaction(
335 : pc *pickedCompaction,
336 : opts *Options,
337 : beganAt time.Time,
338 : provider objstorage.Provider,
339 : slot base.CompactionSlot,
340 1 : ) *compaction {
341 1 : c := &compaction{
342 1 : kind: compactionKindDefault,
343 1 : cmp: pc.cmp,
344 1 : equal: opts.Comparer.Equal,
345 1 : comparer: opts.Comparer,
346 1 : formatKey: opts.Comparer.FormatKey,
347 1 : inputs: pc.inputs,
348 1 : smallest: pc.smallest,
349 1 : largest: pc.largest,
350 1 : logger: opts.Logger,
351 1 : version: pc.version,
352 1 : beganAt: beganAt,
353 1 : maxOutputFileSize: pc.maxOutputFileSize,
354 1 : maxOverlapBytes: pc.maxOverlapBytes,
355 1 : pickerMetrics: pc.pickerMetrics,
356 1 : slot: slot,
357 1 : }
358 1 : c.startLevel = &c.inputs[0]
359 1 : if pc.startLevel.l0SublevelInfo != nil {
360 1 : c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo
361 1 : }
362 1 : c.outputLevel = &c.inputs[1]
363 1 : if c.slot == nil {
364 1 : c.slot = opts.Experimental.CompactionLimiter.TookWithoutPermission(context.TODO())
365 1 : c.slot.CompactionSelected(c.startLevel.level, c.outputLevel.level, c.startLevel.files.SizeSum())
366 1 : }
367 :
368 1 : if len(pc.extraLevels) > 0 {
369 1 : c.extraLevels = pc.extraLevels
370 1 : c.outputLevel = &c.inputs[len(c.inputs)-1]
371 1 : }
372 : // Compute the set of outputLevel+1 files that overlap this compaction (these
373 : // are the grandparent sstables).
374 1 : if c.outputLevel.level+1 < numLevels {
375 1 : c.grandparents = c.version.Overlaps(c.outputLevel.level+1, c.userKeyBounds())
376 1 : }
377 1 : c.delElision, c.rangeKeyElision = compact.SetupTombstoneElision(
378 1 : c.cmp, c.version, c.outputLevel.level, base.UserKeyBoundsFromInternal(c.smallest, c.largest),
379 1 : )
380 1 : c.kind = pc.kind
381 1 :
382 1 : if c.kind == compactionKindDefault && c.outputLevel.files.Empty() && !c.hasExtraLevelData() &&
383 1 : c.startLevel.files.Len() == 1 && c.grandparents.SizeSum() <= c.maxOverlapBytes {
384 1 : // This compaction can be converted into a move or copy from one level
385 1 : // to the next. We avoid such a move if there is lots of overlapping
386 1 : // grandparent data. Otherwise, the move could create a parent file
387 1 : // that will require a very expensive merge later on.
388 1 : iter := c.startLevel.files.Iter()
389 1 : meta := iter.First()
390 1 : isRemote := false
391 1 : // We should always be passed a provider, except in some unit tests.
392 1 : if provider != nil {
393 1 : isRemote = !objstorage.IsLocalTable(provider, meta.FileBacking.DiskFileNum)
394 1 : }
395 : // Avoid a trivial move or copy if all of these are true, as rewriting a
396 : // new file is better:
397 : //
398 : // 1) The source file is a virtual sstable
399 : // 2) The existing file `meta` is on non-remote storage
400 : // 3) The output level prefers shared storage
401 1 : mustCopy := !isRemote && remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
402 1 : if mustCopy {
403 1 : // If the source is virtual, it's best to just rewrite the file as all
404 1 : // conditions in the above comment are met.
405 1 : if !meta.Virtual {
406 1 : c.kind = compactionKindCopy
407 1 : }
408 1 : } else {
409 1 : c.kind = compactionKindMove
410 1 : }
411 : }
412 1 : return c
413 : }
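
A condensed, illustrative restatement of the move/copy decision above; plain booleans stand in for the provider lookup, the CreateOnShared check, and meta.Virtual, and the returned strings are labels rather than Pebble's compactionKind values:

package main

import "fmt"

// trivialKind mirrors the branch above: a local input headed for a
// shared-storage output level must be copied (or, if virtual, rewritten by a
// normal default compaction); otherwise the file can simply be moved.
func trivialKind(isRemote, outputPrefersShared, isVirtual bool) string {
	mustCopy := !isRemote && outputPrefersShared
	switch {
	case mustCopy && !isVirtual:
		return "copy"
	case mustCopy && isVirtual:
		return "default (rewrite)"
	default:
		return "move"
	}
}

func main() {
	fmt.Println(trivialKind(false, true, false))  // copy
	fmt.Println(trivialKind(false, true, true))   // default (rewrite)
	fmt.Println(trivialKind(true, true, false))   // move
	fmt.Println(trivialKind(false, false, false)) // move
}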
414 :
415 : func newDeleteOnlyCompaction(
416 : opts *Options,
417 : cur *version,
418 : inputs []compactionLevel,
419 : beganAt time.Time,
420 : hints []deleteCompactionHint,
421 : exciseEnabled bool,
422 1 : ) *compaction {
423 1 : c := &compaction{
424 1 : kind: compactionKindDeleteOnly,
425 1 : cmp: opts.Comparer.Compare,
426 1 : equal: opts.Comparer.Equal,
427 1 : comparer: opts.Comparer,
428 1 : formatKey: opts.Comparer.FormatKey,
429 1 : logger: opts.Logger,
430 1 : version: cur,
431 1 : beganAt: beganAt,
432 1 : inputs: inputs,
433 1 : deletionHints: hints,
434 1 : exciseEnabled: exciseEnabled,
435 1 : }
436 1 :
437 1 : // Set c.smallest, c.largest.
438 1 : files := make([]manifest.LevelIterator, 0, len(inputs))
439 1 : for _, in := range inputs {
440 1 : files = append(files, in.files.Iter())
441 1 : }
442 1 : c.smallest, c.largest = manifest.KeyRange(opts.Comparer.Compare, files...)
443 1 : return c
444 : }
445 :
446 1 : func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64) {
447 1 : // Heuristic to place a lower bound on compaction output file size
448 1 : // caused by Lbase. Prior to this heuristic we have observed an L0 in
449 1 : // production with 310K files of which 290K files were < 10KB in size.
450 1 : // Our hypothesis is that it was caused by L1 having 2600 files and
451 1 : // ~10GB, such that each flush got split into many tiny files due to
452 1 : // overlapping with most of the files in Lbase.
453 1 : //
454 1 : // The computation below is general in that it accounts
455 1 : // for flushing different volumes of data (e.g. we may be flushing
456 1 : // many memtables). For illustration, we consider the typical
457 1 : // example of flushing a 64MB memtable. So 12.8MB output,
458 1 : // based on the compression guess below. If the compressed bytes
459 1 : // guess is an over-estimate we will end up with smaller files,
460 1 : // and if an under-estimate we will end up with larger files.
461 1 : // With a 2MB target file size, 7 files. We are willing to accept
462 1 : // 4x the number of files, if it results in better write amplification
463 1 : // when later compacting to Lbase, i.e., ~450KB files (target file
464 1 : // size / 4).
465 1 : //
466 1 : // Note that this is a pessimistic heuristic in that
467 1 : // fileCountUpperBoundDueToGrandparents could be far from the actual
468 1 : // number of files produced due to the grandparent limits. For
469 1 : // example, in the extreme, consider a flush that overlaps with 1000
470 1 : // files in Lbase f0...f999, and the initially calculated value of
471 1 : // maxOverlapBytes will cause splits at f10, f20,..., f990, which
472 1 : // means an upper bound file count of 100 files. Say the input bytes
473 1 : // in the flush are such that acceptableFileCount=10. We will fatten
474 1 : // up maxOverlapBytes by 10x to ensure that the upper bound file count
475 1 : // drops to 10. However, it is possible that in practice, even without
476 1 : // this change, we would have produced no more than 10 files, and that
477 1 : // this change makes the files unnecessarily wide. Say the input bytes
478 1 : // are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
479 1 : // 10% in f80...f89 and 10% in f990...f999. The original value of
480 1 : // maxOverlapBytes would have actually produced only 10 sstables. But
481 1 : // by increasing maxOverlapBytes by 10x, we may produce 1 sstable that
482 1 : // spans f0...f89, i.e., a much wider sstable than necessary.
483 1 : //
484 1 : // We could produce a tighter estimate of
485 1 : // fileCountUpperBoundDueToGrandparents if we had knowledge of the key
486 1 : // distribution of the flush. The 4x multiplier mentioned earlier is
487 1 : // a way to try to compensate for this pessimism.
488 1 : //
489 1 : // TODO(sumeer): we don't have compression info for the data being
490 1 : // flushed, but it is likely that existing files that overlap with
491 1 : // this flush in Lbase are representative wrt compression ratio. We
492 1 : // could store the uncompressed size in FileMetadata and estimate
493 1 : // the compression ratio.
494 1 : const approxCompressionRatio = 0.2
495 1 : approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
496 1 : approxNumFilesBasedOnTargetSize :=
497 1 : int(math.Ceil(approxOutputBytes / float64(c.maxOutputFileSize)))
498 1 : acceptableFileCount := float64(4 * approxNumFilesBasedOnTargetSize)
499 1 : // The byte calculation is linear in numGrandparentFiles, but we will
500 1 : // incur this linear cost in compact.Runner.TableSplitLimit() too, so we are
501 1 : // also willing to pay it now. We could approximate this cheaply by using the
502 1 : // mean file size of Lbase.
503 1 : grandparentFileBytes := c.grandparents.SizeSum()
504 1 : fileCountUpperBoundDueToGrandparents :=
505 1 : float64(grandparentFileBytes) / float64(c.maxOverlapBytes)
506 1 : if fileCountUpperBoundDueToGrandparents > acceptableFileCount {
507 1 : c.maxOverlapBytes = uint64(
508 1 : float64(c.maxOverlapBytes) *
509 1 : (fileCountUpperBoundDueToGrandparents / acceptableFileCount))
510 1 : }
511 : }
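
A numeric walk-through of the heuristic above as a standalone sketch, using the 64 MiB memtable example from the comment; the 10 GiB of grandparent overlap and 20 MiB initial cap are assumed example values:

package main

import (
	"fmt"
	"math"
)

// fattenedMaxOverlapBytes reproduces the heuristic: estimate output files
// from the flush size and the 0.2 compression guess, accept 4x that count,
// and if the grandparent bound implies more files, grow maxOverlapBytes by
// the ratio so the upper bound drops back to the acceptable count.
func fattenedMaxOverlapBytes(flushingBytes, maxOutputFileSize, grandparentFileBytes, maxOverlapBytes uint64) uint64 {
	const approxCompressionRatio = 0.2
	approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
	approxNumFiles := int(math.Ceil(approxOutputBytes / float64(maxOutputFileSize)))
	acceptableFileCount := float64(4 * approxNumFiles)
	upperBound := float64(grandparentFileBytes) / float64(maxOverlapBytes)
	if upperBound > acceptableFileCount {
		maxOverlapBytes = uint64(float64(maxOverlapBytes) * (upperBound / acceptableFileCount))
	}
	return maxOverlapBytes
}

func main() {
	// 64 MiB flush -> ~12.8 MiB output -> 7 files at a 2 MiB target, so 28
	// files are acceptable. 10 GiB of Lbase overlap / 20 MiB cap = 512 files,
	// so the cap is fattened by 512/28 (~18.3x).
	fmt.Println(fattenedMaxOverlapBytes(64<<20, 2<<20, 10<<30, 20<<20))
}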
512 :
513 : func newFlush(
514 : opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time,
515 1 : ) (*compaction, error) {
516 1 : c := &compaction{
517 1 : kind: compactionKindFlush,
518 1 : cmp: opts.Comparer.Compare,
519 1 : equal: opts.Comparer.Equal,
520 1 : comparer: opts.Comparer,
521 1 : formatKey: opts.Comparer.FormatKey,
522 1 : logger: opts.Logger,
523 1 : version: cur,
524 1 : beganAt: beganAt,
525 1 : inputs: []compactionLevel{{level: -1}, {level: 0}},
526 1 : maxOutputFileSize: math.MaxUint64,
527 1 : maxOverlapBytes: math.MaxUint64,
528 1 : flushing: flushing,
529 1 : }
530 1 : c.startLevel = &c.inputs[0]
531 1 : c.outputLevel = &c.inputs[1]
532 1 :
533 1 : // Flush slots are always taken without permission.
534 1 : //
535 1 : // NB: CompactionLimiter defaults to a no-op limiter unless one is implemented
536 1 : // and passed-in as an option during Open.
537 1 : slot := opts.Experimental.CompactionLimiter.TookWithoutPermission(context.TODO())
538 1 : var flushingSize uint64
539 1 : for i := range flushing {
540 1 : flushingSize += flushing[i].totalBytes()
541 1 : }
542 1 : slot.CompactionSelected(-1, 0, flushingSize)
543 1 : c.slot = slot
544 1 :
545 1 : if len(flushing) > 0 {
546 1 : if _, ok := flushing[0].flushable.(*ingestedFlushable); ok {
547 1 : if len(flushing) != 1 {
548 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
549 : }
550 1 : c.kind = compactionKindIngestedFlushable
551 1 : return c, nil
552 : }
553 : }
554 :
555 : // Make sure there's no ingestedFlushable after the first flushable in the
556 : // list.
557 1 : for _, f := range flushing {
558 1 : if _, ok := f.flushable.(*ingestedFlushable); ok {
559 0 : panic("pebble: flushing shouldn't contain ingestedFlushable flushable")
560 : }
561 : }
562 :
563 1 : if cur.L0Sublevels != nil {
564 1 : c.l0Limits = cur.L0Sublevels.FlushSplitKeys()
565 1 : }
566 :
567 1 : smallestSet, largestSet := false, false
568 1 : updatePointBounds := func(iter internalIterator) {
569 1 : if kv := iter.First(); kv != nil {
570 1 : if !smallestSet ||
571 1 : base.InternalCompare(c.cmp, c.smallest, kv.K) > 0 {
572 1 : smallestSet = true
573 1 : c.smallest = kv.K.Clone()
574 1 : }
575 : }
576 1 : if kv := iter.Last(); kv != nil {
577 1 : if !largestSet ||
578 1 : base.InternalCompare(c.cmp, c.largest, kv.K) < 0 {
579 1 : largestSet = true
580 1 : c.largest = kv.K.Clone()
581 1 : }
582 : }
583 : }
584 :
585 1 : updateRangeBounds := func(iter keyspan.FragmentIterator) error {
586 1 : // File bounds require s != nil && !s.Empty(). We only need to check for
587 1 : // s != nil here, as the memtable's FragmentIterator would never surface
588 1 : // empty spans.
589 1 : if s, err := iter.First(); err != nil {
590 0 : return err
591 1 : } else if s != nil {
592 1 : if key := s.SmallestKey(); !smallestSet ||
593 1 : base.InternalCompare(c.cmp, c.smallest, key) > 0 {
594 1 : smallestSet = true
595 1 : c.smallest = key.Clone()
596 1 : }
597 : }
598 1 : if s, err := iter.Last(); err != nil {
599 0 : return err
600 1 : } else if s != nil {
601 1 : if key := s.LargestKey(); !largestSet ||
602 1 : base.InternalCompare(c.cmp, c.largest, key) < 0 {
603 1 : largestSet = true
604 1 : c.largest = key.Clone()
605 1 : }
606 : }
607 1 : return nil
608 : }
609 :
610 1 : var flushingBytes uint64
611 1 : for i := range flushing {
612 1 : f := flushing[i]
613 1 : updatePointBounds(f.newIter(nil))
614 1 : if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
615 1 : if err := updateRangeBounds(rangeDelIter); err != nil {
616 0 : c.slot.Release(0)
617 0 : c.slot = nil
618 0 : return nil, err
619 0 : }
620 : }
621 1 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
622 1 : if err := updateRangeBounds(rangeKeyIter); err != nil {
623 0 : c.slot.Release(0)
624 0 : c.slot = nil
625 0 : return nil, err
626 0 : }
627 : }
628 1 : flushingBytes += f.inuseBytes()
629 : }
630 :
631 1 : if opts.FlushSplitBytes > 0 {
632 1 : c.maxOutputFileSize = uint64(opts.Level(0).TargetFileSize)
633 1 : c.maxOverlapBytes = maxGrandparentOverlapBytes(opts, 0)
634 1 : c.grandparents = c.version.Overlaps(baseLevel, c.userKeyBounds())
635 1 : adjustGrandparentOverlapBytesForFlush(c, flushingBytes)
636 1 : }
637 :
638 : // We don't elide tombstones for flushes.
639 1 : c.delElision, c.rangeKeyElision = compact.NoTombstoneElision(), compact.NoTombstoneElision()
640 1 : return c, nil
641 : }
642 :
643 1 : func (c *compaction) hasExtraLevelData() bool {
644 1 : if len(c.extraLevels) == 0 {
645 1 : // not a multi level compaction
646 1 : return false
647 1 : } else if c.extraLevels[0].files.Empty() {
648 1 : // a multi level compaction without data in the intermediate input level;
649 1 : // e.g. for a multi level compaction with levels 4,5, and 6, this could
650 1 : // occur if there are no files to compact in 5, or in 5 and 6 (i.e. a move).
651 1 : return false
652 1 : }
653 1 : return true
654 : }
655 :
656 : // errorOnUserKeyOverlap returns an error if the last two written sstables in
657 : // this compaction have revisions of the same user key present in both sstables,
658 : // when they shouldn't (eg. when splitting flushes).
659 0 : func (c *compaction) errorOnUserKeyOverlap(ve *versionEdit) error {
660 0 : if n := len(ve.NewFiles); n > 1 {
661 0 : meta := ve.NewFiles[n-1].Meta
662 0 : prevMeta := ve.NewFiles[n-2].Meta
663 0 : if !prevMeta.Largest.IsExclusiveSentinel() &&
664 0 : c.cmp(prevMeta.Largest.UserKey, meta.Smallest.UserKey) >= 0 {
665 0 : return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
666 0 : prevMeta.Largest.Pretty(c.formatKey),
667 0 : prevMeta.FileNum,
668 0 : meta.FileNum)
669 0 : }
670 : }
671 0 : return nil
672 : }
673 :
674 : // allowZeroSeqNum returns true if seqnums can be zeroed if there are no
675 : // snapshots requiring them to be kept. It performs this determination by
676 : // looking at the TombstoneElision values which are set up based on sstables
677 : // which overlap the bounds of the compaction at a lower level in the LSM.
678 1 : func (c *compaction) allowZeroSeqNum() bool {
679 1 : // TODO(peter): we disable zeroing of seqnums during flushing to match
680 1 : // RocksDB behavior and to avoid generating overlapping sstables during
681 1 : // DB.replayWAL. When replaying WAL files at startup, we flush after each
682 1 : // WAL is replayed building up a single version edit that is
683 1 : // applied. Because we don't apply the version edit after each flush, this
684 1 : // code doesn't know that L0 contains files and zeroing of seqnums should
685 1 : // be disabled. That is fixable, but it seems safer to just match the
686 1 : // RocksDB behavior for now.
687 1 : return len(c.flushing) == 0 && c.delElision.ElidesEverything() && c.rangeKeyElision.ElidesEverything()
688 1 : }
689 :
690 : // newInputIters returns iterators over all the input tables in a compaction.
691 : func (c *compaction) newInputIters(
692 : newIters tableNewIters, newRangeKeyIter keyspanimpl.TableNewSpanIter, iiopts internalIterOpts,
693 : ) (
694 : pointIter internalIterator,
695 : rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
696 : retErr error,
697 1 : ) {
698 1 : // Validate the ordering of compaction input files for defense in depth.
699 1 : if len(c.flushing) == 0 {
700 1 : if c.startLevel.level >= 0 {
701 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
702 1 : manifest.Level(c.startLevel.level), c.startLevel.files.Iter())
703 1 : if err != nil {
704 0 : return nil, nil, nil, err
705 0 : }
706 : }
707 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
708 1 : manifest.Level(c.outputLevel.level), c.outputLevel.files.Iter())
709 1 : if err != nil {
710 0 : return nil, nil, nil, err
711 0 : }
712 1 : if c.startLevel.level == 0 {
713 1 : if c.startLevel.l0SublevelInfo == nil {
714 0 : panic("l0SublevelInfo not created for compaction out of L0")
715 : }
716 1 : for _, info := range c.startLevel.l0SublevelInfo {
717 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
718 1 : info.sublevel, info.Iter())
719 1 : if err != nil {
720 0 : return nil, nil, nil, err
721 0 : }
722 : }
723 : }
724 1 : if len(c.extraLevels) > 0 {
725 1 : if len(c.extraLevels) > 1 {
726 0 : panic("n>2 multi level compaction not implemented yet")
727 : }
728 1 : interLevel := c.extraLevels[0]
729 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
730 1 : manifest.Level(interLevel.level), interLevel.files.Iter())
731 1 : if err != nil {
732 0 : return nil, nil, nil, err
733 0 : }
734 : }
735 : }
736 :
737 : // There are three classes of keys that a compaction needs to process: point
738 : // keys, range deletion tombstones and range keys. Collect all iterators for
739 : // all these classes of keys from all the levels. We'll aggregate them
740 : // together farther below.
741 : //
742 : // numInputLevels is an approximation of the number of iterator levels. Due
743 : // to idiosyncrasies in iterator construction, we may (rarely) exceed this
744 : // initial capacity.
745 1 : numInputLevels := max(len(c.flushing), len(c.inputs))
746 1 : iters := make([]internalIterator, 0, numInputLevels)
747 1 : rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
748 1 : rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
749 1 :
750 1 : // If construction of the iterator inputs fails, ensure that we close all
751 1 : // the constituent iterators.
752 1 : defer func() {
753 1 : if retErr != nil {
754 0 : for _, iter := range iters {
755 0 : if iter != nil {
756 0 : iter.Close()
757 0 : }
758 : }
759 0 : for _, rangeDelIter := range rangeDelIters {
760 0 : rangeDelIter.Close()
761 0 : }
762 : }
763 : }()
764 1 : iterOpts := IterOptions{
765 1 : Category: categoryCompaction,
766 1 : logger: c.logger,
767 1 : }
768 1 :
769 1 : // Populate iters, rangeDelIters and rangeKeyIters with the appropriate
770 1 : // constituent iterators. This depends on whether this is a flush or a
771 1 : // compaction.
772 1 : if len(c.flushing) != 0 {
773 1 : // If flushing, we need to build the input iterators over the memtables
774 1 : // stored in c.flushing.
775 1 : for i := range c.flushing {
776 1 : f := c.flushing[i]
777 1 : iters = append(iters, f.newFlushIter(nil))
778 1 : rangeDelIter := f.newRangeDelIter(nil)
779 1 : if rangeDelIter != nil {
780 1 : rangeDelIters = append(rangeDelIters, rangeDelIter)
781 1 : }
782 1 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
783 1 : rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
784 1 : }
785 : }
786 1 : } else {
787 1 : addItersForLevel := func(level *compactionLevel, l manifest.Layer) error {
788 1 : // Add a *levelIter for point iterators. Because we don't call
789 1 : // initRangeDel, the levelIter will close and forget the range
790 1 : // deletion iterator when it steps on to a new file. Surfacing range
791 1 : // deletions to compactions are handled below.
792 1 : iters = append(iters, newLevelIter(context.Background(),
793 1 : iterOpts, c.comparer, newIters, level.files.Iter(), l, iiopts))
794 1 : // TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
795 1 : // deletions into memory upfront. (See #2015, which reverted this.) There
796 1 : // will be no user keys that are split between sstables within a level in
797 1 : // Cockroach 23.1, which unblocks this optimization.
798 1 :
799 1 : // Add the range deletion iterator for each file as an independent level
800 1 : // in mergingIter, as opposed to making a levelIter out of those. This
801 1 : // is safer as levelIter expects all keys coming from underlying
802 1 : // iterators to be in order. Due to compaction / tombstone writing
803 1 : // logic in finishOutput(), it is possible for range tombstones to not
804 1 : // be strictly ordered across all files in one level.
805 1 : //
806 1 : // Consider this example from the metamorphic tests (also repeated in
807 1 : // finishOutput()), consisting of three L3 files with their bounds
808 1 : // specified in square brackets next to the file name:
809 1 : //
810 1 : // ./000240.sst [tmgc#391,MERGE-tmgc#391,MERGE]
811 1 : // tmgc#391,MERGE [786e627a]
812 1 : // tmgc-udkatvs#331,RANGEDEL
813 1 : //
814 1 : // ./000241.sst [tmgc#384,MERGE-tmgc#384,MERGE]
815 1 : // tmgc#384,MERGE [666c7070]
816 1 : // tmgc-tvsalezade#383,RANGEDEL
817 1 : // tmgc-tvsalezade#331,RANGEDEL
818 1 : //
819 1 : // ./000242.sst [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
820 1 : // tmgc-tvsalezade#383,RANGEDEL
821 1 : // tmgc#375,SET [72646c78766965616c72776865676e79]
822 1 : // tmgc-tvsalezade#356,RANGEDEL
823 1 : //
824 1 : // Here, the range tombstone in 000240.sst falls "after" one in
825 1 : // 000241.sst, despite 000240.sst being ordered "before" 000241.sst for
826 1 : // levelIter's purposes. While each file is still consistent before its
827 1 : // bounds, it's safer to have all rangedel iterators be visible to
828 1 : // mergingIter.
829 1 : iter := level.files.Iter()
830 1 : for f := iter.First(); f != nil; f = iter.Next() {
831 1 : rangeDelIter, err := c.newRangeDelIter(newIters, iter.Take(), iterOpts, iiopts, l)
832 1 : if err != nil {
833 0 : // The error will already be annotated with the BackingFileNum, so
834 0 : // we annotate it with the FileNum.
835 0 : return errors.Wrapf(err, "pebble: could not open table %s", errors.Safe(f.FileNum))
836 0 : }
837 1 : if rangeDelIter == nil {
838 1 : continue
839 : }
840 1 : rangeDelIters = append(rangeDelIters, rangeDelIter)
841 1 : c.closers = append(c.closers, rangeDelIter)
842 : }
843 :
844 : // Check if this level has any range keys.
845 1 : hasRangeKeys := false
846 1 : for f := iter.First(); f != nil; f = iter.Next() {
847 1 : if f.HasRangeKeys {
848 1 : hasRangeKeys = true
849 1 : break
850 : }
851 : }
852 1 : if hasRangeKeys {
853 1 : newRangeKeyIterWrapper := func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
854 1 : rangeKeyIter, err := newRangeKeyIter(ctx, file, iterOptions)
855 1 : if err != nil {
856 0 : return nil, err
857 1 : } else if rangeKeyIter == nil {
858 0 : return emptyKeyspanIter, nil
859 0 : }
860 : // Ensure that the range key iter is not closed until the compaction is
861 : // finished. This is necessary because range key processing
862 : // requires the range keys to be held in memory for up to the
863 : // lifetime of the compaction.
864 1 : noCloseIter := &noCloseIter{rangeKeyIter}
865 1 : c.closers = append(c.closers, noCloseIter)
866 1 :
867 1 : // We do not need to truncate range keys to sstable boundaries, or
868 1 : // only read within the file's atomic compaction units, unlike with
869 1 : // range tombstones. This is because range keys were added after we
870 1 : // stopped splitting user keys across sstables, so all the range keys
871 1 : // in this sstable must wholly lie within the file's bounds.
872 1 : return noCloseIter, err
873 : }
874 1 : li := keyspanimpl.NewLevelIter(
875 1 : context.Background(), keyspan.SpanIterOptions{}, c.cmp,
876 1 : newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange,
877 1 : )
878 1 : rangeKeyIters = append(rangeKeyIters, li)
879 : }
880 1 : return nil
881 : }
882 :
883 1 : for i := range c.inputs {
884 1 : // If the level is annotated with l0SublevelInfo, expand it into one
885 1 : // level per sublevel.
886 1 : // TODO(jackson): Perform this expansion even earlier when we pick the
887 1 : // compaction?
888 1 : if len(c.inputs[i].l0SublevelInfo) > 0 {
889 1 : for _, info := range c.startLevel.l0SublevelInfo {
890 1 : sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
891 1 : if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
892 0 : return nil, nil, nil, err
893 0 : }
894 : }
895 1 : continue
896 : }
897 1 : if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
898 0 : return nil, nil, nil, err
899 0 : }
900 : }
901 : }
902 :
903 : // If there's only one constituent point iterator, we can avoid the overhead
904 : // of a *mergingIter. This is possible, for example, when performing a flush
905 : // of a single memtable. Otherwise, combine all the iterators into a merging
906 : // iter.
907 1 : pointIter = iters[0]
908 1 : if len(iters) > 1 {
909 1 : pointIter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
910 1 : }
911 :
912 : // In normal operation, levelIter iterates over the point operations in a
913 : // level, and initializes a rangeDelIter pointer for the range deletions in
914 : // each table. During compaction, we want to iterate over the merged view of
915 : // point operations and range deletions. In order to do this we create one
916 : // levelIter per level to iterate over the point operations, and collect up
917 : // all the range deletion files.
918 : //
919 : // The range deletion levels are combined with a keyspanimpl.MergingIter. The
920 : // resulting merged rangedel iterator is then included using an
921 : // InterleavingIter.
922 : // TODO(jackson): Consider using a defragmenting iterator to stitch together
923 : // logical range deletions that were fragmented due to previous file
924 : // boundaries.
925 1 : if len(rangeDelIters) > 0 {
926 1 : mi := &keyspanimpl.MergingIter{}
927 1 : mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
928 1 : rangeDelIter = mi
929 1 : }
930 :
931 : // If there are range key iterators, we need to combine them using
932 : // keyspanimpl.MergingIter, and then interleave them among the points.
933 1 : if len(rangeKeyIters) > 0 {
934 1 : mi := &keyspanimpl.MergingIter{}
935 1 : mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeKeyIters...)
936 1 : // TODO(radu): why do we have a defragmenter here but not above?
937 1 : di := &keyspan.DefragmentingIter{}
938 1 : di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
939 1 : rangeKeyIter = di
940 1 : }
941 1 : return pointIter, rangeDelIter, rangeKeyIter, nil
942 : }
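
As a toy, self-contained stand-in for the merging step above: a k-way merge of already-sorted streams via container/heap, which is conceptually what newMergingIter does for the per-level point iterators (the real iterator merges lazily over internal keys and handles tombstones and sequence numbers, all of which this sketch omits):

package main

import (
	"container/heap"
	"fmt"
)

type cursor struct {
	keys []string
	i    int
}

type mergeHeap []*cursor

func (h mergeHeap) Len() int           { return len(h) }
func (h mergeHeap) Less(a, b int) bool { return h[a].keys[h[a].i] < h[b].keys[h[b].i] }
func (h mergeHeap) Swap(a, b int)      { h[a], h[b] = h[b], h[a] }
func (h *mergeHeap) Push(x any)        { *h = append(*h, x.(*cursor)) }
func (h *mergeHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// mergeSorted pulls the globally smallest key from whichever stream is at
// the top of the heap, advancing that stream's cursor each time.
func mergeSorted(levels ...[]string) []string {
	h := &mergeHeap{}
	for _, l := range levels {
		if len(l) > 0 {
			*h = append(*h, &cursor{keys: l})
		}
	}
	heap.Init(h)
	var out []string
	for h.Len() > 0 {
		c := (*h)[0]
		out = append(out, c.keys[c.i])
		c.i++
		if c.i == len(c.keys) {
			heap.Pop(h)
		} else {
			heap.Fix(h, 0)
		}
	}
	return out
}

func main() {
	fmt.Println(mergeSorted([]string{"b", "d"}, []string{"a", "c", "e"})) // [a b c d e]
}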
943 :
944 : func (c *compaction) newRangeDelIter(
945 : newIters tableNewIters,
946 : f manifest.LevelFile,
947 : opts IterOptions,
948 : iiopts internalIterOpts,
949 : l manifest.Layer,
950 1 : ) (*noCloseIter, error) {
951 1 : opts.layer = l
952 1 : iterSet, err := newIters(context.Background(), f.FileMetadata, &opts,
953 1 : iiopts, iterRangeDeletions)
954 1 : if err != nil {
955 0 : return nil, err
956 1 : } else if iterSet.rangeDeletion == nil {
957 1 : // The file doesn't contain any range deletions.
958 1 : return nil, nil
959 1 : }
960 : // Ensure that rangeDelIter is not closed until the compaction is
961 : // finished. This is necessary because range tombstone processing
962 : // requires the range tombstones to be held in memory for up to the
963 : // lifetime of the compaction.
964 1 : return &noCloseIter{iterSet.rangeDeletion}, nil
965 : }
966 :
967 0 : func (c *compaction) String() string {
968 0 : if len(c.flushing) != 0 {
969 0 : return "flush\n"
970 0 : }
971 :
972 0 : var buf bytes.Buffer
973 0 : for level := c.startLevel.level; level <= c.outputLevel.level; level++ {
974 0 : i := level - c.startLevel.level
975 0 : fmt.Fprintf(&buf, "%d:", level)
976 0 : iter := c.inputs[i].files.Iter()
977 0 : for f := iter.First(); f != nil; f = iter.Next() {
978 0 : fmt.Fprintf(&buf, " %s:%s-%s", f.FileNum, f.Smallest, f.Largest)
979 0 : }
980 0 : fmt.Fprintf(&buf, "\n")
981 : }
982 0 : return buf.String()
983 : }
984 :
985 : type manualCompaction struct {
986 : // retries counts the retries, due either to too many concurrent compactions
987 : // or to a concurrent compaction on overlapping levels.
988 : retries int
989 : level int
990 : outputLevel int
991 : done chan error
992 : start []byte
993 : end []byte
994 : split bool
995 : }
996 :
997 : type readCompaction struct {
998 : level int
999 : // [start, end] key ranges are used for de-duping.
1000 : start []byte
1001 : end []byte
1002 :
1003 : // The file associated with the compaction.
1004 : // If the file no longer belongs in the same
1005 : // level, then we skip the compaction.
1006 : fileNum base.FileNum
1007 : }
1008 :
1009 1 : func (d *DB) addInProgressCompaction(c *compaction) {
1010 1 : d.mu.compact.inProgress[c] = struct{}{}
1011 1 : var isBase, isIntraL0 bool
1012 1 : for _, cl := range c.inputs {
1013 1 : iter := cl.files.Iter()
1014 1 : for f := iter.First(); f != nil; f = iter.Next() {
1015 1 : if f.IsCompacting() {
1016 0 : d.opts.Logger.Fatalf("L%d->L%d: %s already being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
1017 0 : }
1018 1 : f.SetCompactionState(manifest.CompactionStateCompacting)
1019 1 : if c.startLevel != nil && c.outputLevel != nil && c.startLevel.level == 0 {
1020 1 : if c.outputLevel.level == 0 {
1021 1 : f.IsIntraL0Compacting = true
1022 1 : isIntraL0 = true
1023 1 : } else {
1024 1 : isBase = true
1025 1 : }
1026 : }
1027 : }
1028 : }
1029 :
1030 1 : if (isIntraL0 || isBase) && c.version.L0Sublevels != nil {
1031 1 : l0Inputs := []manifest.LevelSlice{c.startLevel.files}
1032 1 : if isIntraL0 {
1033 1 : l0Inputs = append(l0Inputs, c.outputLevel.files)
1034 1 : }
1035 1 : if err := c.version.L0Sublevels.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
1036 0 : d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
1037 0 : }
1038 : }
1039 : }
1040 :
1041 : // Removes compaction markers from files in a compaction. The rollback parameter
1042 : // indicates whether the compaction state should be rolled back to its original
1043 : // state in the case of an unsuccessful compaction.
1044 : //
1045 : // DB.mu must be held when calling this method; however, this method can drop and
1046 : // re-acquire that mutex. All writes to the manifest for this compaction should
1047 : // have completed by this point.
1048 1 : func (d *DB) clearCompactingState(c *compaction, rollback bool) {
1049 1 : c.versionEditApplied = true
1050 1 : if c.slot != nil {
1051 0 : panic("pebble: compaction slot should have been released before clearing compacting state")
1052 : }
1053 1 : for _, cl := range c.inputs {
1054 1 : iter := cl.files.Iter()
1055 1 : for f := iter.First(); f != nil; f = iter.Next() {
1056 1 : if !f.IsCompacting() {
1057 0 : d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
1058 0 : }
1059 1 : if !rollback {
1060 1 : // On success all compactions other than move and delete-only compactions
1061 1 : // transition the file into the Compacted state. Move-compacted files
1062 1 : // become eligible for compaction again and transition back to NotCompacting.
1063 1 : // Delete-only compactions could, on rare occasion, leave files untouched
1064 1 : // (eg. if files have a loose bound), so we revert them all to NotCompacting
1065 1 : // just in case they need to be compacted again.
1066 1 : if c.kind != compactionKindMove && c.kind != compactionKindDeleteOnly {
1067 1 : f.SetCompactionState(manifest.CompactionStateCompacted)
1068 1 : } else {
1069 1 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1070 1 : }
1071 1 : } else {
1072 1 : // Else, on rollback, all input files unconditionally transition back to
1073 1 : // NotCompacting.
1074 1 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1075 1 : }
1076 1 : f.IsIntraL0Compacting = false
1077 : }
1078 : }
1079 1 : l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c))
1080 1 : func() {
1081 1 : // InitCompactingFileInfo requires that no other manifest writes be
1082 1 : // happening in parallel with it, i.e. we're not in the midst of installing
1083 1 : // another version. Otherwise, it's possible that we've created another
1084 1 : // L0Sublevels instance, but not added it to the versions list, causing
1085 1 : // all the indices in FileMetadata to be inaccurate. To ensure this,
1086 1 : // grab the manifest lock.
1087 1 : d.mu.versions.logLock()
1088 1 : defer d.mu.versions.logUnlock()
1089 1 : d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress)
1090 1 : }()
1091 : }
1092 :
1093 1 : func (d *DB) calculateDiskAvailableBytes() uint64 {
1094 1 : space, err := d.opts.FS.GetDiskUsage(d.dirname)
1095 1 : if err != nil {
1096 1 : if !errors.Is(err, vfs.ErrUnsupported) {
1097 0 : d.opts.EventListener.BackgroundError(err)
1098 0 : }
1099 : // Return the last value we managed to obtain.
1100 1 : return d.diskAvailBytes.Load()
1101 : }
1102 :
1103 1 : d.lowDiskSpaceReporter.Report(space.AvailBytes, space.TotalBytes, d.opts.EventListener)
1104 1 : d.diskAvailBytes.Store(space.AvailBytes)
1105 1 : return space.AvailBytes
1106 : }
1107 :
1108 : // maybeScheduleFlush schedules a flush if necessary.
1109 : //
1110 : // d.mu must be held when calling this.
1111 1 : func (d *DB) maybeScheduleFlush() {
1112 1 : if d.mu.compact.flushing || d.closed.Load() != nil || d.opts.ReadOnly {
1113 1 : return
1114 1 : }
1115 1 : if len(d.mu.mem.queue) <= 1 {
1116 1 : return
1117 1 : }
1118 :
1119 1 : if !d.passedFlushThreshold() {
1120 1 : return
1121 1 : }
1122 :
1123 1 : d.mu.compact.flushing = true
1124 1 : go d.flush()
1125 : }
1126 :
1127 1 : func (d *DB) passedFlushThreshold() bool {
1128 1 : var n int
1129 1 : var size uint64
1130 1 : for ; n < len(d.mu.mem.queue)-1; n++ {
1131 1 : if !d.mu.mem.queue[n].readyForFlush() {
1132 1 : break
1133 : }
1134 1 : if d.mu.mem.queue[n].flushForced {
1135 1 : // A flush was forced. Pretend the memtable size is the configured
1136 1 : // size. See minFlushSize below.
1137 1 : size += d.opts.MemTableSize
1138 1 : } else {
1139 1 : size += d.mu.mem.queue[n].totalBytes()
1140 1 : }
1141 : }
1142 1 : if n == 0 {
1143 1 : // None of the immutable memtables are ready for flushing.
1144 1 : return false
1145 1 : }
1146 :
1147 : // Only flush once the sum of the queued memtable sizes exceeds half the
1148 : // configured memtable size. This prevents flushing of memtables at startup
1149 : // while we're undergoing the ramp period on the memtable size. See
1150 : // DB.newMemTable().
1151 1 : minFlushSize := d.opts.MemTableSize / 2
1152 1 : return size >= minFlushSize
1153 : }
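
A standalone sketch of the threshold arithmetic above, minus the readyForFlush gating; the 64 KiB configured size and the queued sizes are assumed example values:

package main

import "fmt"

type queuedMemtable struct {
	bytes  uint64
	forced bool
}

// passedThreshold condenses the logic above: forced flushes count as a full
// configured memtable, and flushing starts once the queued total reaches half
// of memTableSize, which avoids a burst of tiny sstables while the memtable
// size is still ramping up.
func passedThreshold(queue []queuedMemtable, memTableSize uint64) bool {
	var size uint64
	for _, q := range queue {
		if q.forced {
			size += memTableSize
		} else {
			size += q.bytes
		}
	}
	return size >= memTableSize/2
}

func main() {
	small := []queuedMemtable{{bytes: 8 << 10}, {bytes: 16 << 10}}
	fmt.Println(passedThreshold(small, 64<<10))                           // false: 24 KiB < 32 KiB
	fmt.Println(passedThreshold([]queuedMemtable{{forced: true}}, 64<<10)) // true
}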
1154 :
1155 1 : func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
1156 1 : var mem *flushableEntry
1157 1 : for _, m := range d.mu.mem.queue {
1158 1 : if m.flushable == tbl {
1159 1 : mem = m
1160 1 : break
1161 : }
1162 : }
1163 1 : if mem == nil || mem.flushForced {
1164 1 : return
1165 1 : }
1166 1 : deadline := d.timeNow().Add(dur)
1167 1 : if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
1168 1 : // Already scheduled to flush sooner than within `dur`.
1169 1 : return
1170 1 : }
1171 1 : mem.delayedFlushForcedAt = deadline
1172 1 : go func() {
1173 1 : timer := time.NewTimer(dur)
1174 1 : defer timer.Stop()
1175 1 :
1176 1 : select {
1177 1 : case <-d.closedCh:
1178 1 : return
1179 1 : case <-mem.flushed:
1180 1 : return
1181 1 : case <-timer.C:
1182 1 : d.commit.mu.Lock()
1183 1 : defer d.commit.mu.Unlock()
1184 1 : d.mu.Lock()
1185 1 : defer d.mu.Unlock()
1186 1 :
1187 1 : // NB: The timer may fire concurrently with a call to Close. If a
1188 1 : // Close call beat us to acquiring d.mu, d.closed holds ErrClosed,
1189 1 : // and it's too late to flush anything. Otherwise, the Close call
1190 1 : // will block on locking d.mu until we've finished scheduling the
1191 1 : // flush and set `d.mu.compact.flushing` to true. Close will wait
1192 1 : // for the current flush to complete.
1193 1 : if d.closed.Load() != nil {
1194 0 : return
1195 0 : }
1196 :
1197 1 : if d.mu.mem.mutable == tbl {
1198 1 : d.makeRoomForWrite(nil)
1199 1 : } else {
1200 1 : mem.flushForced = true
1201 1 : }
1202 1 : d.maybeScheduleFlush()
1203 : }
1204 : }()
1205 : }
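
A generic sketch of the timer pattern above: a goroutine races a timer against shutdown and completion channels, so the delayed action is dropped if the work happens (or the owner closes) first. The channel names are illustrative stand-ins for d.closedCh and mem.flushed, and the locking and rescheduling done by the real code are omitted:

package main

import (
	"fmt"
	"time"
)

// scheduleDelayed arms a timer and waits on whichever signal fires first.
func scheduleDelayed(dur time.Duration, closed, done <-chan struct{}, action func()) {
	go func() {
		timer := time.NewTimer(dur)
		defer timer.Stop()
		select {
		case <-closed: // owner shut down first: do nothing
		case <-done: // work already happened: do nothing
		case <-timer.C:
			action()
		}
	}()
}

func main() {
	closed := make(chan struct{})
	done := make(chan struct{})
	scheduleDelayed(10*time.Millisecond, closed, done, func() { fmt.Println("flush forced") })
	time.Sleep(50 * time.Millisecond) // the timer wins: "flush forced" prints
}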
1206 :
1207 1 : func (d *DB) flush() {
1208 1 : pprof.Do(context.Background(), flushLabels, func(context.Context) {
1209 1 : flushingWorkStart := crtime.NowMono()
1210 1 : d.mu.Lock()
1211 1 : defer d.mu.Unlock()
1212 1 : idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime)
1213 1 : var bytesFlushed uint64
1214 1 : var err error
1215 1 : if bytesFlushed, err = d.flush1(); err != nil {
1216 1 : // TODO(peter): count consecutive flush errors and backoff.
1217 1 : d.opts.EventListener.BackgroundError(err)
1218 1 : }
1219 1 : d.mu.compact.flushing = false
1220 1 : d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
1221 1 : workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
1222 1 : d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
1223 1 : d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
1224 1 : d.mu.compact.flushWriteThroughput.IdleDuration += idleDuration
1225 1 : // More flush work may have arrived while we were flushing, so schedule
1226 1 : // another flush if needed.
1227 1 : d.maybeScheduleFlush()
1228 1 : // The flush may have produced too many files in a level, so schedule a
1229 1 : // compaction if needed.
1230 1 : d.maybeScheduleCompaction()
1231 1 : d.mu.compact.cond.Broadcast()
1232 : })
1233 : }
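
A minimal sketch of the work/idle bookkeeping above, using wall-clock times in place of crtime's monotonic clock: the gap since the previous flush ended counts as idle time, the flush itself as work time, and both accumulate alongside the bytes written:

package main

import (
	"fmt"
	"time"
)

// throughput accumulates the same three quantities as flushWriteThroughput:
// bytes written, time spent flushing, and time spent idle between flushes.
type throughput struct {
	bytes            int64
	workDur, idleDur time.Duration
	lastWorkEnd      time.Time
}

func (t *throughput) record(start, end time.Time, bytes int64) {
	t.idleDur += start.Sub(t.lastWorkEnd) // gap since the previous flush ended
	t.workDur += end.Sub(start)           // duration of this flush
	t.bytes += bytes
	t.lastWorkEnd = end
}

func main() {
	now := time.Now()
	t := &throughput{lastWorkEnd: now}
	t.record(now.Add(2*time.Second), now.Add(3*time.Second), 64<<20)
	fmt.Println(t.idleDur, t.workDur, t.bytes) // 2s 1s 67108864
}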
1234 :
1235 : // runIngestFlush is used to generate a flush version edit for sstables which
1236 : // were ingested as flushables. Both DB.mu and the manifest lock must be held
1237 : // while runIngestFlush is called.
1238 1 : func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
1239 1 : if len(c.flushing) != 1 {
1240 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
1241 : }
1242 1 : defer func() {
1243 1 : c.slot.Release(0 /* totalBytesWritten */)
1244 1 : c.slot = nil
1245 1 : }()
1246 :
1247 : // Construct the VersionEdit, levelMetrics etc.
1248 1 : c.metrics = make(map[int]*LevelMetrics, numLevels)
1249 1 : // Finding the target level for ingestion must use the latest version
1250 1 : // after the logLock has been acquired.
1251 1 : c.version = d.mu.versions.currentVersion()
1252 1 :
1253 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
1254 1 : ve := &versionEdit{}
1255 1 : var ingestSplitFiles []ingestSplitFile
1256 1 : ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
1257 1 :
1258 1 : updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
1259 1 : levelMetrics := c.metrics[level]
1260 1 : if levelMetrics == nil {
1261 1 : levelMetrics = &LevelMetrics{}
1262 1 : c.metrics[level] = levelMetrics
1263 1 : }
1264 1 : levelMetrics.NumFiles--
1265 1 : levelMetrics.Size -= int64(m.Size)
1266 1 : for i := range added {
1267 1 : levelMetrics.NumFiles++
1268 1 : levelMetrics.Size += int64(added[i].Meta.Size)
1269 1 : }
1270 : }
1271 :
1272 1 : suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
1273 1 : d.FormatMajorVersion() >= FormatVirtualSSTables
1274 1 :
1275 1 : if suggestSplit || ingestFlushable.exciseSpan.Valid() {
1276 1 : // Excises or ingest-time splits below may add deleted files to ve.
1277 1 : ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
1278 1 : }
1279 :
1280 1 : ctx := context.Background()
1281 1 : overlapChecker := &overlapChecker{
1282 1 : comparer: d.opts.Comparer,
1283 1 : newIters: d.newIters,
1284 1 : opts: IterOptions{
1285 1 : logger: d.opts.Logger,
1286 1 : Category: categoryIngest,
1287 1 : },
1288 1 : v: c.version,
1289 1 : }
1290 1 : replacedFiles := make(map[base.FileNum][]newFileEntry)
1291 1 : for _, file := range ingestFlushable.files {
1292 1 : var fileToSplit *fileMetadata
1293 1 : var level int
1294 1 :
1295 1 : // This file fits perfectly within the excise span, so we can slot it at L6.
1296 1 : if ingestFlushable.exciseSpan.Valid() &&
1297 1 : ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) &&
1298 1 : ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) {
1299 1 : level = 6
1300 1 : } else {
1301 1 : // TODO(radu): this can perform I/O; we should not do this while holding DB.mu.
1302 1 : lsmOverlap, err := overlapChecker.DetermineLSMOverlap(ctx, file.UserKeyBounds())
1303 1 : if err != nil {
1304 0 : return nil, err
1305 0 : }
1306 1 : level, fileToSplit, err = ingestTargetLevel(
1307 1 : ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, file.FileMetadata, suggestSplit,
1308 1 : )
1309 1 : if err != nil {
1310 0 : return nil, err
1311 0 : }
1312 : }
1313 :
1314 : // Add the current flushableIngest file to the version.
1315 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
1316 1 : if fileToSplit != nil {
1317 1 : ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
1318 1 : ingestFile: file.FileMetadata,
1319 1 : splitFile: fileToSplit,
1320 1 : level: level,
1321 1 : })
1322 1 : }
1323 1 : levelMetrics := c.metrics[level]
1324 1 : if levelMetrics == nil {
1325 1 : levelMetrics = &LevelMetrics{}
1326 1 : c.metrics[level] = levelMetrics
1327 1 : }
1328 1 : levelMetrics.BytesIngested += file.Size
1329 1 : levelMetrics.TablesIngested++
1330 : }
1331 1 : if ingestFlushable.exciseSpan.Valid() {
1332 1 : // Iterate through all levels and find files that intersect with exciseSpan.
1333 1 : for l := range c.version.Levels {
1334 1 : overlaps := c.version.Overlaps(l, base.UserKeyBoundsEndExclusive(ingestFlushable.exciseSpan.Start, ingestFlushable.exciseSpan.End))
1335 1 : iter := overlaps.Iter()
1336 1 :
1337 1 : for m := iter.First(); m != nil; m = iter.Next() {
1338 1 : newFiles, err := d.excise(context.TODO(), ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
1339 1 : if err != nil {
1340 0 : return nil, err
1341 0 : }
1342 :
1343 1 : if _, ok := ve.DeletedFiles[deletedFileEntry{
1344 1 : Level: l,
1345 1 : FileNum: m.FileNum,
1346 1 : }]; !ok {
1347 1 : // We did not excise this file.
1348 1 : continue
1349 : }
1350 1 : replacedFiles[m.FileNum] = newFiles
1351 1 : updateLevelMetricsOnExcise(m, l, newFiles)
1352 : }
1353 : }
1354 : }
1355 :
1356 1 : if len(ingestSplitFiles) > 0 {
1357 1 : if err := d.ingestSplit(context.TODO(), ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
1358 0 : return nil, err
1359 0 : }
1360 : }
1361 :
1362 1 : return ve, nil
1363 : }
1364 :
1365 : // flush1 runs a compaction that copies the immutable memtables from memory to
1366 : // disk.
1367 : //
1368 : // d.mu must be held when calling this, but the mutex may be dropped and
1369 : // re-acquired during the course of this method.
1370 1 : func (d *DB) flush1() (bytesFlushed uint64, err error) {
1371 1 : // NB: The flushable queue can contain flushables of type ingestedFlushable.
1372 1 : // The sstables in ingestedFlushable.files must be placed into the appropriate
1373 1 : // level in the lsm. Let's say the flushable queue contains a prefix of
1374 1 : // regular immutable memtables, then an ingestedFlushable, and then the
1375 1 : // mutable memtable. When the flush of the ingestedFlushable is performed,
1376 1 : // it needs an updated view of the lsm. That is, the prefix of immutable
1377 1 : // memtables must have already been flushed. Similarly, if there are two
1378 1 : // contiguous ingestedFlushables in the queue, then the first flushable must
1379 1 : // be flushed, so that the second flushable can see an updated view of the
1380 1 : // LSM.
1381 1 : //
1382 1 : // Given the above, we restrict flushes to either some prefix of regular
1383 1 : // memtables, or a single flushable of type ingestedFlushable. The DB.flush
1384 1 : // function will call DB.maybeScheduleFlush again, so a new flush to finish
1385 1 : // the remaining flush work should be scheduled right away.
1386 1 : //
1387 1 : // NB: Large batches placed in the flushable queue share the WAL with the
1388 1 : // previous memtable in the queue. We must ensure the property that both the
1389 1 : // large batch and the memtable with which it shares a WAL are flushed
1390 1 : // together. The property ensures that the minimum unflushed log number
1391 1 : // isn't incremented incorrectly. Since flushableBatch.readyForFlush always
1392 1 : // returns true, and since the large batch will always be placed right after
1393 1 : // the memtable with which it shares a WAL, the property is naturally
1394 1 : // ensured. The large batch will always be placed after the memtable with
1395 1 : // which it shares a WAL because we ensure it in DB.commitWrite by holding
1396 1 : // the commitPipeline.mu and then holding DB.mu. As an extra defensive
1397 1 : // measure, if we try to flush the memtable without also flushing the
1398 1 : // flushable batch in the same flush, since the memtable and flushableBatch
1399 1 : // have the same logNum, the logNum invariant check below will trigger.
1400 1 : var n, inputs int
1401 1 : var inputBytes uint64
1402 1 : var ingest bool
1403 1 : for ; n < len(d.mu.mem.queue)-1; n++ {
1404 1 : if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
1405 1 : if n == 0 {
1406 1 : // The first flushable is of type ingestedFlushable. Since these
1407 1 : // must be flushed individually, we perform a flush for just
1408 1 : // this.
1409 1 : if !f.readyForFlush() {
1410 0 : // This check is almost unnecessary, but we guard against it
1411 0 : // just in case this invariant changes in the future.
1412 0 : panic("pebble: ingestedFlushable should always be ready to flush.")
1413 : }
1414 : // By setting n = 1, we ensure that the first flushable (n == 0)
1415 : // is scheduled for a flush. The number of tables added is equal to the
1416 : // number of files in the ingest operation.
1417 1 : n = 1
1418 1 : inputs = len(f.files)
1419 1 : ingest = true
1420 1 : break
1421 1 : } else {
1422 1 : // There was some prefix of flushables which weren't of type
1423 1 : // ingestedFlushable. So, perform a flush for those.
1424 1 : break
1425 : }
1426 : }
1427 1 : if !d.mu.mem.queue[n].readyForFlush() {
1428 0 : break
1429 : }
1430 1 : inputBytes += d.mu.mem.queue[n].inuseBytes()
1431 : }
1432 1 : if n == 0 {
1433 0 : // None of the immutable memtables are ready for flushing.
1434 0 : return 0, nil
1435 0 : }
1436 1 : if !ingest {
1437 1 : // Flushes of memtables add the prefix of n memtables from the flushable
1438 1 : // queue.
1439 1 : inputs = n
1440 1 : }
1441 :
1442 : // Require that every memtable being flushed has a log number less than the
1443 : // new minimum unflushed log number.
1444 1 : minUnflushedLogNum := d.mu.mem.queue[n].logNum
1445 1 : if !d.opts.DisableWAL {
1446 1 : for i := 0; i < n; i++ {
1447 1 : if logNum := d.mu.mem.queue[i].logNum; logNum >= minUnflushedLogNum {
1448 0 : panic(errors.AssertionFailedf("logNum invariant violated: flushing %d items; %d:type=%T,logNum=%d; %d:type=%T,logNum=%d",
1449 0 : n,
1450 0 : i, d.mu.mem.queue[i].flushable, logNum,
1451 0 : n, d.mu.mem.queue[n].flushable, minUnflushedLogNum))
1452 : }
1453 : }
1454 : }
1455 :
1456 1 : c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
1457 1 : d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow())
1458 1 : if err != nil {
1459 0 : return 0, err
1460 0 : }
1461 1 : d.addInProgressCompaction(c)
1462 1 :
1463 1 : jobID := d.newJobIDLocked()
1464 1 : d.opts.EventListener.FlushBegin(FlushInfo{
1465 1 : JobID: int(jobID),
1466 1 : Input: inputs,
1467 1 : InputBytes: inputBytes,
1468 1 : Ingest: ingest,
1469 1 : })
1470 1 : startTime := d.timeNow()
1471 1 :
1472 1 : var ve *manifest.VersionEdit
1473 1 : var stats compact.Stats
1474 1 : // To determine the target level of the files in the ingestedFlushable, we
1475 1 : // need to acquire the logLock and not release it for that duration. Since
1476 1 : // we need to acquire the logLock below to perform the logAndApply step
1477 1 : // anyway, we create the VersionEdit for ingestedFlushable outside of
1478 1 : // runCompaction. For all other flush cases, we construct the VersionEdit
1479 1 : // inside runCompaction.
1480 1 : if c.kind != compactionKindIngestedFlushable {
1481 1 : ve, stats, err = d.runCompaction(jobID, c)
1482 1 : }
1483 :
1484 : // Acquire logLock. This will be released either on an error, by way of
1485 : // logUnlock, or through a call to logAndApply if there is no error.
1486 1 : d.mu.versions.logLock()
1487 1 :
1488 1 : if c.kind == compactionKindIngestedFlushable {
1489 1 : ve, err = d.runIngestFlush(c)
1490 1 : }
1491 :
1492 1 : info := FlushInfo{
1493 1 : JobID: int(jobID),
1494 1 : Input: inputs,
1495 1 : InputBytes: inputBytes,
1496 1 : Duration: d.timeNow().Sub(startTime),
1497 1 : Done: true,
1498 1 : Ingest: ingest,
1499 1 : Err: err,
1500 1 : }
1501 1 : if err == nil {
1502 1 : validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
1503 1 : for i := range ve.NewFiles {
1504 1 : e := &ve.NewFiles[i]
1505 1 : info.Output = append(info.Output, e.Meta.TableInfo())
1506 1 : // Ingested tables are not necessarily flushed to L0. Record the level of
1507 1 : // each ingested file explicitly.
1508 1 : if ingest {
1509 1 : info.IngestLevels = append(info.IngestLevels, e.Level)
1510 1 : }
1511 : }
1512 1 : if len(ve.NewFiles) == 0 {
1513 1 : info.Err = errEmptyTable
1514 1 : }
1515 :
1516 : // The flush succeeded or it produced an empty sstable. In either case we
1517 : // want to bump the minimum unflushed log number to the log number of the
1518 : // oldest unflushed memtable.
1519 1 : ve.MinUnflushedLogNum = minUnflushedLogNum
1520 1 : if c.kind != compactionKindIngestedFlushable {
1521 1 : metrics := c.metrics[0]
1522 1 : if d.opts.DisableWAL {
1523 1 : // If the WAL is disabled, every flushable has a zero [logSize],
1524 1 : // resulting in zero bytes in. Instead, use the number of bytes we
1525 1 : // flushed as the BytesIn. This ensures we get a reasonable w-amp
1526 1 : // calculation even when the WAL is disabled.
1527 1 : metrics.BytesIn = metrics.BytesFlushed
1528 1 : } else {
1529 1 : for i := 0; i < n; i++ {
1530 1 : metrics.BytesIn += d.mu.mem.queue[i].logSize
1531 1 : }
1532 : }
1533 1 : } else {
1534 1 : // c.kind == compactionKindIngestedFlushable, and we could have deleted
1535 1 : // files due to ingest-time splits or excises.
1536 1 : ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
1537 1 : for c2 := range d.mu.compact.inProgress {
1538 1 : // Check if this compaction overlaps with the excise span. Note that just
1539 1 : // checking if the inputs individually overlap with the excise span
1540 1 : // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
1541 1 : // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
1542 1 : // doing a [c,d) excise at the same time as this compaction, we will have
1543 1 : // to error out the whole compaction as we can't guarantee it hasn't/won't
1544 1 : // write a file overlapping with the excise span.
1545 1 : if ingestFlushable.exciseSpan.OverlapsInternalKeyRange(d.cmp, c2.smallest, c2.largest) {
1546 1 : c2.cancel.Store(true)
1547 1 : continue
1548 : }
1549 : }
1550 :
1551 1 : if len(ve.DeletedFiles) > 0 {
1552 1 : // Iterate through all other compactions, and check if their inputs have
1553 1 : // been replaced due to an ingest-time split or excise. In that case,
1554 1 : // cancel the compaction.
1555 1 : for c2 := range d.mu.compact.inProgress {
1556 1 : for i := range c2.inputs {
1557 1 : iter := c2.inputs[i].files.Iter()
1558 1 : for f := iter.First(); f != nil; f = iter.Next() {
1559 1 : if _, ok := ve.DeletedFiles[deletedFileEntry{FileNum: f.FileNum, Level: c2.inputs[i].level}]; ok {
1560 1 : c2.cancel.Store(true)
1561 1 : break
1562 : }
1563 : }
1564 : }
1565 : }
1566 : }
1567 : }
1568 1 : err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */,
1569 1 : func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
1570 1 : if err != nil {
1571 0 : info.Err = err
1572 0 : }
1573 1 : } else {
1574 1 : // We won't be performing the logAndApply step because of the error,
1575 1 : // so logUnlock.
1576 1 : d.mu.versions.logUnlock()
1577 1 : }
1578 :
1579 : // If err != nil, then the flush will be retried, and we will recalculate
1580 : // these metrics.
1581 1 : if err == nil {
1582 1 : d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
1583 1 : d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
1584 1 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
1585 1 : }
1586 :
1587 1 : d.clearCompactingState(c, err != nil)
1588 1 : delete(d.mu.compact.inProgress, c)
1589 1 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
1590 1 :
1591 1 : var flushed flushableList
1592 1 : if err == nil {
1593 1 : flushed = d.mu.mem.queue[:n]
1594 1 : d.mu.mem.queue = d.mu.mem.queue[n:]
1595 1 : d.updateReadStateLocked(d.opts.DebugCheck)
1596 1 : d.updateTableStatsLocked(ve.NewFiles)
1597 1 : if ingest {
1598 1 : d.mu.versions.metrics.Flush.AsIngestCount++
1599 1 : for _, l := range c.metrics {
1600 1 : d.mu.versions.metrics.Flush.AsIngestBytes += l.BytesIngested
1601 1 : d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
1602 1 : }
1603 : }
1604 1 : d.maybeTransitionSnapshotsToFileOnlyLocked()
1605 :
1606 : }
1607 : // Signal FlushEnd after installing the new readState. This helps for unit
1608 : // tests that use the callback to trigger a read using an iterator with
1609 : // IterOptions.OnlyReadGuaranteedDurable.
1610 1 : info.TotalDuration = d.timeNow().Sub(startTime)
1611 1 : d.opts.EventListener.FlushEnd(info)
1612 1 :
1613 1 : // The order of these operations matters here for ease of testing.
1614 1 : // Removing the reader reference first allows tests to be guaranteed that
1615 1 : // the memtable reservation has been released by the time a synchronous
1616 1 : // flush returns. readerUnrefLocked may also produce obsolete files so the
1617 1 : // call to deleteObsoleteFiles must happen after it.
1618 1 : for i := range flushed {
1619 1 : flushed[i].readerUnrefLocked(true)
1620 1 : }
1621 :
1622 1 : d.deleteObsoleteFiles(jobID)
1623 1 :
1624 1 : // Mark all the memtables we flushed as flushed.
1625 1 : for i := range flushed {
1626 1 : close(flushed[i].flushed)
1627 1 : }
1628 :
1629 1 : return inputBytes, err
1630 : }
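
// The queue walk at the top of flush1 implements a simple rule: flush a
// prefix of ready regular memtables, or exactly one leading ingested
// flushable, never both. A standalone model of that rule (entry and
// flushPrefix are illustrative stand-ins, not Pebble types):
//
//	package main
//
//	import "fmt"
//
//	type entry struct {
//		ingest bool // an ingested flushable rather than a regular memtable
//		ready  bool // readyForFlush would return true
//	}
//
//	// flushPrefix returns how many leading entries of queue (excluding the
//	// mutable memtable at the end) should be flushed together.
//	func flushPrefix(queue []entry) int {
//		n := 0
//		for ; n < len(queue)-1; n++ {
//			if queue[n].ingest {
//				if n == 0 {
//					return 1 // an ingested flushable is always flushed alone
//				}
//				return n // flush only the regular prefix before it
//			}
//			if !queue[n].ready {
//				break
//			}
//		}
//		return n
//	}
//
//	func main() {
//		q := []entry{{ready: true}, {ingest: true}, {ready: true} /* mutable */}
//		fmt.Println(flushPrefix(q)) // 1: just the memtable ahead of the ingest
//	}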
1631 :
1632 : // maybeTransitionSnapshotsToFileOnlyLocked transitions any "eventually
1633 : // file-only" snapshots to be file-only if all their visible state has been
1634 : // flushed to sstables.
1635 : //
1636 : // REQUIRES: d.mu.
1637 1 : func (d *DB) maybeTransitionSnapshotsToFileOnlyLocked() {
1638 1 : earliestUnflushedSeqNum := d.getEarliestUnflushedSeqNumLocked()
1639 1 : currentVersion := d.mu.versions.currentVersion()
1640 1 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; {
1641 1 : if s.efos == nil {
1642 1 : s = s.next
1643 1 : continue
1644 : }
1645 1 : overlapsFlushable := false
1646 1 : if base.Visible(earliestUnflushedSeqNum, s.efos.seqNum, base.SeqNumMax) {
1647 1 : // There are some unflushed keys that are still visible to the EFOS.
1648 1 : // Check if any memtables older than the EFOS contain keys within a
1649 1 : // protected range of the EFOS. If no, we can transition.
1650 1 : protectedRanges := make([]bounded, len(s.efos.protectedRanges))
1651 1 : for i := range s.efos.protectedRanges {
1652 1 : protectedRanges[i] = s.efos.protectedRanges[i]
1653 1 : }
1654 1 : for i := range d.mu.mem.queue {
1655 1 : if !base.Visible(d.mu.mem.queue[i].logSeqNum, s.efos.seqNum, base.SeqNumMax) {
1656 1 : // All keys in this memtable are newer than the EFOS. Skip this
1657 1 : // memtable.
1658 1 : continue
1659 : }
1660 : // NB: computePossibleOverlaps could have false positives, such as if
1661 : // the flushable is a flushable ingest and not a memtable. In that
1662 : // case we don't open the sstables to check; we just pessimistically
1663 : // assume an overlap.
1664 1 : d.mu.mem.queue[i].computePossibleOverlaps(func(b bounded) shouldContinue {
1665 1 : overlapsFlushable = true
1666 1 : return stopIteration
1667 1 : }, protectedRanges...)
1668 1 : if overlapsFlushable {
1669 1 : break
1670 : }
1671 : }
1672 : }
1673 1 : if overlapsFlushable {
1674 1 : s = s.next
1675 1 : continue
1676 : }
1677 1 : currentVersion.Ref()
1678 1 :
1679 1 : // NB: s.efos.transitionToFileOnlySnapshot could close s, in which
1680 1 : // case s.next would be nil. Save it before calling it.
1681 1 : next := s.next
1682 1 : _ = s.efos.transitionToFileOnlySnapshot(currentVersion)
1683 1 : s = next
1684 : }
1685 : }
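
// The transition logic above hinges on sequence-number visibility: a key is
// visible to a snapshot when its sequence number is strictly less than the
// snapshot's. A minimal sketch of that check (simplified relative to
// base.Visible, which also handles batch sequence numbers):
//
//	package main
//
//	import "fmt"
//
//	func visible(seqNum, snapshotSeqNum uint64) bool {
//		return seqNum < snapshotSeqNum
//	}
//
//	func main() {
//		// A memtable whose earliest unflushed sequence number is 90 still
//		// holds state visible to an EFOS at 100, so the snapshot cannot
//		// become file-only if that memtable overlaps a protected range.
//		fmt.Println(visible(90, 100))  // true
//		fmt.Println(visible(120, 100)) // false
//	}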
1686 :
1687 : // maybeScheduleCompactionAsync should be used when we want to possibly
1688 : // schedule a compaction but don't want to eat the cost of running
1689 : // maybeScheduleCompaction. This method should be launched in a separate
1690 : // goroutine.
1691 : // d.mu must not be held when this is called.
1692 0 : func (d *DB) maybeScheduleCompactionAsync() {
1693 0 : defer d.compactionSchedulers.Done()
1694 0 :
1695 0 : d.mu.Lock()
1696 0 : d.maybeScheduleCompaction()
1697 0 : d.mu.Unlock()
1698 0 : }
1699 :
1700 : // maybeScheduleCompaction schedules a compaction if necessary.
1701 : //
1702 : // d.mu must be held when calling this.
1703 1 : func (d *DB) maybeScheduleCompaction() {
1704 1 : d.maybeScheduleCompactionPicker(pickAuto)
1705 1 : }
1706 :
1707 1 : func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction {
1708 1 : return picker.pickAuto(env)
1709 1 : }
1710 :
1711 1 : func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction {
1712 1 : return picker.pickElisionOnlyCompaction(env)
1713 1 : }
1714 :
1715 : // tryScheduleDownloadCompaction tries to start a download compaction.
1716 : //
1717 : // Returns true if we started a download compaction (or completed it
1718 : // immediately because it is a no-op or we hit an error).
1719 : //
1720 : // Requires d.mu to be held. Updates d.mu.compact.downloads.
1721 1 : func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownloads int) bool {
1722 1 : vers := d.mu.versions.currentVersion()
1723 1 : for i := 0; i < len(d.mu.compact.downloads); {
1724 1 : download := d.mu.compact.downloads[i]
1725 1 : switch d.tryLaunchDownloadCompaction(download, vers, env, maxConcurrentDownloads) {
1726 1 : case launchedCompaction:
1727 1 : return true
1728 1 : case didNotLaunchCompaction:
1729 1 : // See if we can launch a compaction for another download task.
1730 1 : i++
1731 1 : case downloadTaskCompleted:
1732 1 : // Task is completed and must be removed.
1733 1 : d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1)
1734 : }
1735 : }
1736 1 : return false
1737 : }
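
// Note the iteration pattern above: when a download task completes it is
// removed with slices.Delete and the index is intentionally not advanced,
// so the element that slid into position i is examined next. A minimal
// standalone version of the same pattern:
//
//	package main
//
//	import (
//		"fmt"
//		"slices"
//	)
//
//	func main() {
//		tasks := []int{1, 2, 3, 4}
//		for i := 0; i < len(tasks); {
//			if tasks[i]%2 == 0 {
//				tasks = slices.Delete(tasks, i, i+1) // remove; do not advance i
//				continue
//			}
//			i++
//		}
//		fmt.Println(tasks) // [1 3]
//	}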
1738 :
1739 : // maybeScheduleCompactionPicker schedules a compaction if necessary,
1740 : // calling `pickFunc` to pick automatic compactions.
1741 : //
1742 : // Requires d.mu to be held.
1743 : func (d *DB) maybeScheduleCompactionPicker(
1744 : pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
1745 1 : ) {
1746 1 : if d.closed.Load() != nil || d.opts.ReadOnly {
1747 1 : return
1748 1 : }
1749 1 : maxCompactions := d.opts.MaxConcurrentCompactions()
1750 1 : maxDownloads := d.opts.MaxConcurrentDownloads()
1751 1 :
1752 1 : if d.mu.compact.compactingCount >= maxCompactions &&
1753 1 : (len(d.mu.compact.downloads) == 0 || d.mu.compact.downloadingCount >= maxDownloads) {
1754 1 : if len(d.mu.compact.manual) > 0 {
1755 1 : // Inability to run head blocks later manual compactions.
1756 1 : d.mu.compact.manual[0].retries++
1757 1 : }
1758 1 : return
1759 : }
1760 :
1761 : // Compaction picking needs a coherent view of a Version. In particular, we
1762 : // need to exclude concurrent ingestions from making a decision on which level
1763 : // to ingest into that conflicts with our compaction
1764 : // decision. versionSet.logLock provides the necessary mutual exclusion.
1765 1 : d.mu.versions.logLock()
1766 1 : defer d.mu.versions.logUnlock()
1767 1 :
1768 1 : // Check for the closed flag again, in case the DB was closed while we were
1769 1 : // waiting for logLock().
1770 1 : if d.closed.Load() != nil {
1771 1 : return
1772 1 : }
1773 :
1774 1 : env := compactionEnv{
1775 1 : diskAvailBytes: d.diskAvailBytes.Load(),
1776 1 : earliestSnapshotSeqNum: d.mu.snapshots.earliest(),
1777 1 : earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
1778 1 : }
1779 1 :
1780 1 : if d.mu.compact.compactingCount < maxCompactions {
1781 1 : // Check for delete-only compactions first, because they're expected to be
1782 1 : // cheap and reduce future compaction work.
1783 1 : if !d.opts.private.disableDeleteOnlyCompactions &&
1784 1 : !d.opts.DisableAutomaticCompactions &&
1785 1 : len(d.mu.compact.deletionHints) > 0 {
1786 1 : d.tryScheduleDeleteOnlyCompaction()
1787 1 : }
1788 :
1789 1 : for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxCompactions {
1790 1 : if manual := d.mu.compact.manual[0]; !d.tryScheduleManualCompaction(env, manual) {
1791 1 : // Inability to run head blocks later manual compactions.
1792 1 : manual.retries++
1793 1 : break
1794 : }
1795 1 : d.mu.compact.manual = d.mu.compact.manual[1:]
1796 : }
1797 :
1798 1 : for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxCompactions &&
1799 1 : d.tryScheduleAutoCompaction(env, pickFunc) {
1800 1 : }
1801 : }
1802 :
1803 1 : for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads &&
1804 1 : d.tryScheduleDownloadCompaction(env, maxDownloads) {
1805 1 : }
1806 : }
1807 :
1808 : // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction
1809 : // for all files that can be deleted as suggested by deletionHints.
1810 : //
1811 : // Requires d.mu to be held. Updates d.mu.compact.deletionHints.
1812 1 : func (d *DB) tryScheduleDeleteOnlyCompaction() {
1813 1 : v := d.mu.versions.currentVersion()
1814 1 : snapshots := d.mu.snapshots.toSlice()
1815 1 : // We need to save the value of exciseEnabled in the compaction itself, as
1816 1 : // it can change dynamically between now and when the compaction runs.
1817 1 : exciseEnabled := d.FormatMajorVersion() >= FormatVirtualSSTables &&
1818 1 : d.opts.Experimental.EnableDeleteOnlyCompactionExcises != nil && d.opts.Experimental.EnableDeleteOnlyCompactionExcises()
1819 1 : // NB: CompactionLimiter defaults to a no-op limiter unless one is implemented
1820 1 : // and passed-in as an option during Open.
1821 1 : limiter := d.opts.Experimental.CompactionLimiter
1822 1 : var slot base.CompactionSlot
1823 1 : // TODO(bilal): Should we always take a slot without permission?
1824 1 : if n := len(d.getInProgressCompactionInfoLocked(nil)); n == 0 {
1825 1 : // We are not running a compaction at the moment. We should take a compaction slot
1826 1 : // without permission.
1827 1 : slot = limiter.TookWithoutPermission(context.TODO())
1828 1 : } else {
1829 1 : var err error
1830 1 : slot, err = limiter.RequestSlot(context.TODO())
1831 1 : if err != nil {
1832 0 : d.opts.EventListener.BackgroundError(err)
1833 0 : return
1834 0 : }
1835 1 : if slot == nil {
1836 0 : // The limiter is denying us a compaction slot. Yield to other work.
1837 0 : return
1838 0 : }
1839 : }
1840 1 : inputs, resolvedHints, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots, exciseEnabled)
1841 1 : d.mu.compact.deletionHints = unresolvedHints
1842 1 :
1843 1 : if len(inputs) > 0 {
1844 1 : c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow(), resolvedHints, exciseEnabled)
1845 1 : c.slot = slot
1846 1 : d.mu.compact.compactingCount++
1847 1 : d.addInProgressCompaction(c)
1848 1 : go d.compact(c, nil)
1849 1 : } else {
1850 1 : slot.Release(0 /* totalBytesWritten */)
1851 1 : }
1852 : }
1853 :
1854 : // tryScheduleManualCompaction tries to kick off the given manual compaction.
1855 : //
1856 : // Returns false if we are not able to run this compaction at this time.
1857 : //
1858 : // Requires d.mu to be held.
1859 1 : func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompaction) bool {
1860 1 : v := d.mu.versions.currentVersion()
1861 1 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
1862 1 : pc, retryLater := pickManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
1863 1 : if pc == nil {
1864 1 : if !retryLater {
1865 1 : // Manual compaction is a no-op. Signal completion and exit.
1866 1 : manual.done <- nil
1867 1 : return true
1868 1 : }
1869 : // We are not able to run this manual compaction at this time.
1870 1 : return false
1871 : }
1872 :
1873 1 : c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider(), nil /* compactionSlot */)
1874 1 : d.mu.compact.compactingCount++
1875 1 : d.addInProgressCompaction(c)
1876 1 : go d.compact(c, manual.done)
1877 1 : return true
1878 : }
1879 :
1880 : // tryScheduleAutoCompaction tries to kick off an automatic compaction.
1881 : //
1882 : // Returns false if no automatic compactions are necessary or able to run at
1883 : // this time.
1884 : //
1885 : // Requires d.mu to be held.
1886 : func (d *DB) tryScheduleAutoCompaction(
1887 : env compactionEnv, pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
1888 1 : ) bool {
1889 1 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
1890 1 : env.readCompactionEnv = readCompactionEnv{
1891 1 : readCompactions: &d.mu.compact.readCompactions,
1892 1 : flushing: d.mu.compact.flushing || d.passedFlushThreshold(),
1893 1 : rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
1894 1 : }
1895 1 : // NB: CompactionLimiter defaults to a no-op limiter unless one is implemented
1896 1 : // and passed-in as an option during Open.
1897 1 : limiter := d.opts.Experimental.CompactionLimiter
1898 1 : var slot base.CompactionSlot
1899 1 : if n := len(env.inProgressCompactions); n == 0 {
1900 1 : // We are not running a compaction at the moment. We should take a compaction slot
1901 1 : // without permission.
1902 1 : slot = limiter.TookWithoutPermission(context.TODO())
1903 1 : } else {
1904 1 : var err error
1905 1 : slot, err = limiter.RequestSlot(context.TODO())
1906 1 : if err != nil {
1907 0 : d.opts.EventListener.BackgroundError(err)
1908 0 : return false
1909 0 : }
1910 1 : if slot == nil {
1911 0 : // The limiter is denying us a compaction slot. Yield to other work.
1912 0 : return false
1913 0 : }
1914 : }
1915 1 : pc := pickFunc(d.mu.versions.picker, env)
1916 1 : if pc == nil {
1917 1 : slot.Release(0 /* totalBytesWritten */)
1918 1 : return false
1919 1 : }
1920 1 : var inputSize uint64
1921 1 : for i := range pc.inputs {
1922 1 : inputSize += pc.inputs[i].files.SizeSum()
1923 1 : }
1924 1 : slot.CompactionSelected(pc.startLevel.level, pc.outputLevel.level, inputSize)
1925 1 :
1926 1 : // Responsibility for releasing slot passes over to the compaction.
1927 1 : c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider(), slot)
1928 1 : d.mu.compact.compactingCount++
1929 1 : d.addInProgressCompaction(c)
1930 1 : go d.compact(c, nil)
1931 1 : return true
1932 : }
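
// tryScheduleDeleteOnlyCompaction and tryScheduleAutoCompaction follow the
// same CompactionLimiter protocol: take a slot without permission when idle,
// otherwise request one; release the slot if nothing is picked; and hand
// ownership to the compaction otherwise. A rough sketch of that protocol
// (slot and trySchedule are illustrative, not Pebble's interfaces):
//
//	package main
//
//	import "fmt"
//
//	type slot struct{ released bool }
//
//	func (s *slot) Release() { s.released = true }
//
//	func trySchedule(acquire func() *slot, pick func() bool) bool {
//		s := acquire()
//		if s == nil {
//			return false // the limiter denied us a slot
//		}
//		if !pick() {
//			s.Release() // nothing to compact; give the slot back
//			return false
//		}
//		// Responsibility for releasing s passes to the compaction here.
//		return true
//	}
//
//	func main() {
//		fmt.Println(trySchedule(func() *slot { return &slot{} }, func() bool { return false })) // false
//	}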
1933 :
1934 : // deleteCompactionHintType indicates whether the deleteCompactionHint was
1935 : // generated from a span containing a range del (point key only), a range key
1936 : // delete (range key only), or both a point and range key.
1937 : type deleteCompactionHintType uint8
1938 :
1939 : const (
1940 : // NOTE: While these are primarily used as enumeration types, they are also
1941 : // used for some bitwise operations. Care should be taken when updating.
1942 : deleteCompactionHintTypeUnknown deleteCompactionHintType = iota
1943 : deleteCompactionHintTypePointKeyOnly
1944 : deleteCompactionHintTypeRangeKeyOnly
1945 : deleteCompactionHintTypePointAndRangeKey
1946 : )
1947 :
1948 : // String implements fmt.Stringer.
1949 0 : func (h deleteCompactionHintType) String() string {
1950 0 : switch h {
1951 0 : case deleteCompactionHintTypeUnknown:
1952 0 : return "unknown"
1953 0 : case deleteCompactionHintTypePointKeyOnly:
1954 0 : return "point-key-only"
1955 0 : case deleteCompactionHintTypeRangeKeyOnly:
1956 0 : return "range-key-only"
1957 0 : case deleteCompactionHintTypePointAndRangeKey:
1958 0 : return "point-and-range-key"
1959 0 : default:
1960 0 : panic(fmt.Sprintf("unknown hint type: %d", h))
1961 : }
1962 : }
1963 :
1964 : // compactionHintFromKeys returns a deleteCompactionHintType given a slice of
1965 : // keyspan.Keys.
1966 1 : func compactionHintFromKeys(keys []keyspan.Key) deleteCompactionHintType {
1967 1 : var hintType deleteCompactionHintType
1968 1 : for _, k := range keys {
1969 1 : switch k.Kind() {
1970 1 : case base.InternalKeyKindRangeDelete:
1971 1 : hintType |= deleteCompactionHintTypePointKeyOnly
1972 1 : case base.InternalKeyKindRangeKeyDelete:
1973 1 : hintType |= deleteCompactionHintTypeRangeKeyOnly
1974 0 : default:
1975 0 : panic(fmt.Sprintf("unsupported key kind: %s", k.Kind()))
1976 : }
1977 : }
1978 1 : return hintType
1979 : }
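
// The enum above is laid out so that hint types compose bitwise:
// point-key-only (1) OR'd with range-key-only (2) yields
// point-and-range-key (3). A minimal standalone check of that property:
//
//	package main
//
//	import "fmt"
//
//	const (
//		unknown uint8 = iota // 0
//		pointKeyOnly         // 1
//		rangeKeyOnly         // 2
//		pointAndRangeKey     // 3
//	)
//
//	func main() {
//		fmt.Println(pointKeyOnly|rangeKeyOnly == pointAndRangeKey) // true
//	}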
1980 :
1981 : // A deleteCompactionHint records a user key and sequence number span that has been
1982 : // deleted by a range tombstone. A hint is recorded if at least one sstable
1983 : // falls completely within both the user key and sequence number spans.
1984 : // Once the tombstones and the observed completely-contained sstables fall
1985 : // into the same snapshot stripe, a delete-only compaction may delete any
1986 : // sstables within the range.
1987 : type deleteCompactionHint struct {
1988 : // The type of key span that generated this hint (point key, range key, or
1989 : // both).
1990 : hintType deleteCompactionHintType
1991 : // start and end are user keys specifying a key range [start, end) of
1992 : // deleted keys.
1993 : start []byte
1994 : end []byte
1995 : // The level of the file containing the range tombstone(s) when the hint
1996 : // was created. Only lower levels need to be searched for files that may
1997 : // be deleted.
1998 : tombstoneLevel int
1999 : // The file containing the range tombstone(s) that created the hint.
2000 : tombstoneFile *fileMetadata
2001 : // The smallest and largest sequence numbers of the abutting tombstones
2002 : // merged to form this hint. All of a table's keys must be less than the
2003 : // tombstone's smallest sequence number to be deleted. All of a table's
2004 : // sequence numbers must fall into the same snapshot stripe as the
2005 : // tombstone's largest sequence number to be deleted.
2006 : tombstoneLargestSeqNum base.SeqNum
2007 : tombstoneSmallestSeqNum base.SeqNum
2008 : // The smallest sequence number of a sstable that was found to be covered
2009 : // by this hint. The hint cannot be resolved until this sequence number is
2010 : // in the same snapshot stripe as the largest tombstone sequence number.
2011 : // This is set when a hint is created, so the LSM may look different and
2012 : // notably no longer contain the sstable that contained the key at this
2013 : // sequence number.
2014 : fileSmallestSeqNum base.SeqNum
2015 : }
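
// Two sequence numbers fall in the same "snapshot stripe" when no open
// snapshot separates them. A minimal sketch of that idea over a sorted
// slice of snapshot sequence numbers (stripe mirrors, but is not,
// compact.Snapshots.Index):
//
//	package main
//
//	import (
//		"fmt"
//		"sort"
//	)
//
//	// stripe returns the number of snapshots at or below seqNum; equal
//	// stripe values mean no snapshot separates the two sequence numbers.
//	func stripe(snapshots []uint64, seqNum uint64) int {
//		return sort.Search(len(snapshots), func(i int) bool { return snapshots[i] > seqNum })
//	}
//
//	func main() {
//		snaps := []uint64{100, 180, 210}
//		fmt.Println(stripe(snaps, 90) == stripe(snaps, 95))  // true: same stripe
//		fmt.Println(stripe(snaps, 90) == stripe(snaps, 230)) // false: snapshots intervene
//	}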
2016 :
2017 : type deletionHintOverlap int8
2018 :
2019 : const (
2020 : // hintDoesNotApply indicates that the hint does not apply to the file.
2021 : hintDoesNotApply deletionHintOverlap = iota
2022 : // hintExcisesFile indicates that the hint excises a portion of the file,
2023 : // and the format major version of the DB supports excises.
2024 : hintExcisesFile
2025 : // hintDeletesFile indicates that the hint deletes the entirety of the file.
2026 : hintDeletesFile
2027 : )
2028 :
2029 0 : func (h deleteCompactionHint) String() string {
2030 0 : return fmt.Sprintf(
2031 0 : "L%d.%s %s-%s seqnums(tombstone=%d-%d, file-smallest=%d, type=%s)",
2032 0 : h.tombstoneLevel, h.tombstoneFile.FileNum, h.start, h.end,
2033 0 : h.tombstoneSmallestSeqNum, h.tombstoneLargestSeqNum, h.fileSmallestSeqNum,
2034 0 : h.hintType,
2035 0 : )
2036 0 : }
2037 :
2038 : func (h *deleteCompactionHint) canDeleteOrExcise(
2039 : cmp Compare, m *fileMetadata, snapshots compact.Snapshots, exciseEnabled bool,
2040 1 : ) deletionHintOverlap {
2041 1 : // The file can only be deleted if all of its keys are older than the
2042 1 : // earliest tombstone aggregated into the hint. Note that we use
2043 1 : // m.LargestSeqNumAbsolute, not m.LargestSeqNum. Consider a compaction that
2044 1 : // zeroes sequence numbers. A compaction may take a key with a sequence
2045 1 : // number > h.tombstoneSmallestSeqNum and set its sequence number to
2046 1 : // zero. If we looked at m.LargestSeqNum, the resulting output file would
2047 1 : // appear to not contain any keys more recent than the oldest tombstone. To
2048 1 : // avoid this error, the largest pre-zeroing sequence number is maintained
2049 1 : // in LargestSeqNumAbsolute and used here to make the determination whether
2050 1 : // the file's keys are older than all of the hint's tombstones.
2051 1 : if m.LargestSeqNumAbsolute >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
2052 1 : return hintDoesNotApply
2053 1 : }
2054 :
2055 : // The file's oldest key must be in the same snapshot stripe as the
2056 : // newest tombstone. NB: We already checked the hint's sequence numbers,
2057 : // but this file's oldest sequence number might be lower than the hint's
2058 : // smallest sequence number despite the file falling within the key range
2059 : // if this file was constructed after the hint by a compaction.
2060 1 : if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(m.SmallestSeqNum) {
2061 0 : return hintDoesNotApply
2062 0 : }
2063 :
2064 1 : switch h.hintType {
2065 1 : case deleteCompactionHintTypePointKeyOnly:
2066 1 : // A hint generated by a range del span cannot delete tables that contain
2067 1 : // range keys.
2068 1 : if m.HasRangeKeys {
2069 1 : return hintDoesNotApply
2070 1 : }
2071 1 : case deleteCompactionHintTypeRangeKeyOnly:
2072 1 : // A hint generated by a range key del span cannot delete tables that
2073 1 : // contain point keys.
2074 1 : if m.HasPointKeys {
2075 1 : return hintDoesNotApply
2076 1 : }
2077 1 : case deleteCompactionHintTypePointAndRangeKey:
2078 : // A hint from a span that contains both range dels *and* range keys can
2079 : // only be deleted if both bounds fall within the hint. The next check takes
2080 : // care of this.
2081 0 : default:
2082 0 : panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
2083 : }
2084 1 : if cmp(h.start, m.Smallest.UserKey) <= 0 &&
2085 1 : base.UserKeyExclusive(h.end).CompareUpperBounds(cmp, m.UserKeyBounds().End) >= 0 {
2086 1 : return hintDeletesFile
2087 1 : }
2088 1 : if !exciseEnabled {
2089 1 : // The file's keys must be completely contained within the hint range; excises
2090 1 : // aren't allowed.
2091 1 : return hintDoesNotApply
2092 1 : }
2093 : // Check for any overlap. In cases of partial overlap, we can excise the part of the file
2094 : // that overlaps with the deletion hint.
2095 1 : if cmp(h.end, m.Smallest.UserKey) > 0 &&
2096 1 : (m.UserKeyBounds().End.CompareUpperBounds(cmp, base.UserKeyInclusive(h.start)) >= 0) {
2097 1 : return hintExcisesFile
2098 1 : }
2099 1 : return hintDoesNotApply
2100 : }
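
// On the key dimension alone, the checks above reduce to: full containment
// of the file within [h.start, h.end) permits deletion, while any partial
// overlap only permits an excise. A simplified sketch over byte-slice keys
// with an inclusive file end bound (Pebble's real end bounds may be
// exclusive; classify is an illustrative helper, not a Pebble API):
//
//	package main
//
//	import (
//		"bytes"
//		"fmt"
//	)
//
//	func classify(fileLo, fileHi, start, end []byte) string {
//		if bytes.Compare(start, fileLo) <= 0 && bytes.Compare(fileHi, end) < 0 {
//			return "delete" // file fully contained in [start, end)
//		}
//		if bytes.Compare(end, fileLo) > 0 && bytes.Compare(fileHi, start) >= 0 {
//			return "excise" // partial overlap
//		}
//		return "none"
//	}
//
//	func main() {
//		fmt.Println(classify([]byte("b"), []byte("c"), []byte("a"), []byte("d"))) // delete
//		fmt.Println(classify([]byte("b"), []byte("e"), []byte("a"), []byte("d"))) // excise
//		fmt.Println(classify([]byte("x"), []byte("z"), []byte("a"), []byte("d"))) // none
//	}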
2101 :
2102 : // checkDeleteCompactionHints checks the passed-in deleteCompactionHints for those that
2103 : // can be resolved and those that cannot. A hint is considered resolved when its largest
2104 : // tombstone sequence number and the smallest sequence number of covered files fall in
2105 : // the same snapshot stripe. No more than maxHintsPerDeleteOnlyCompaction will be resolved
2106 : // per method call. Resolved and unresolved hints are returned in separate return values.
2107 : // The files that the resolved hints apply to are returned as compactionLevels.
2108 : func checkDeleteCompactionHints(
2109 : cmp Compare,
2110 : v *version,
2111 : hints []deleteCompactionHint,
2112 : snapshots compact.Snapshots,
2113 : exciseEnabled bool,
2114 1 : ) (levels []compactionLevel, resolved, unresolved []deleteCompactionHint) {
2115 1 : var files map[*fileMetadata]bool
2116 1 : var byLevel [numLevels][]*fileMetadata
2117 1 :
2118 1 : // Delete-only compactions can be quadratic (O(mn)) in terms of runtime
2119 1 : // where m = number of files in the delete-only compaction and n = number
2120 1 : // of resolved hints. To prevent these from growing unbounded, we cap
2121 1 : // the number of hints we resolve for one delete-only compaction. This
2122 1 : // cap only applies if exciseEnabled == true.
2123 1 : const maxHintsPerDeleteOnlyCompaction = 10
2124 1 :
2125 1 : unresolvedHints := hints[:0]
2126 1 : // resolvedHints collects the hints that are resolved below.
2127 1 : resolvedHints := make([]deleteCompactionHint, 0)
2128 1 : for _, h := range hints {
2129 1 : // Check each compaction hint to see if it's resolvable. Resolvable
2130 1 : // hints are removed and trigger a delete-only compaction if any files
2131 1 : // in the current LSM still meet their criteria. Unresolvable hints
2132 1 : // are saved and don't trigger a delete-only compaction.
2133 1 : //
2134 1 : // When a compaction hint is created, the sequence numbers of the
2135 1 : // range tombstones and the covered file with the oldest key are
2136 1 : // recorded. The largest tombstone sequence number and the smallest
2137 1 : // file sequence number must be in the same snapshot stripe for the
2138 1 : // hint to be resolved. The below graphic models a compaction hint
2139 1 : // covering the keyspace [b, r). The hint completely contains two
2140 1 : // files, 000002 and 000003. The file 000003 contains the lowest
2141 1 : // covered sequence number at #90. The tombstone b.RANGEDEL.230:h has
2142 1 : // the highest tombstone sequence number incorporated into the hint.
2143 1 : // The hint may be resolved only once the snapshots at #100, #180 and
2144 1 : // #210 are all closed. File 000001 is not included within the hint
2145 1 : // because it extends beyond the range tombstones in user key space.
2146 1 : //
2147 1 : // 250
2148 1 : //
2149 1 : // |-b...230:h-|
2150 1 : // _____________________________________________________ snapshot #210
2151 1 : // 200 |--h.RANGEDEL.200:r--|
2152 1 : //
2153 1 : // _____________________________________________________ snapshot #180
2154 1 : //
2155 1 : // 150 +--------+
2156 1 : // +---------+ | 000003 |
2157 1 : // | 000002 | | |
2158 1 : // +_________+ | |
2159 1 : // 100_____________________|________|___________________ snapshot #100
2160 1 : // +--------+
2161 1 : // _____________________________________________________ snapshot #70
2162 1 : // +---------------+
2163 1 : // 50 | 000001 |
2164 1 : // | |
2165 1 : // +---------------+
2166 1 : // ______________________________________________________________
2167 1 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
2168 1 :
2169 1 : if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(h.fileSmallestSeqNum) ||
2170 1 : (len(resolvedHints) >= maxHintsPerDeleteOnlyCompaction && exciseEnabled) {
2171 1 : // Cannot resolve yet.
2172 1 : unresolvedHints = append(unresolvedHints, h)
2173 1 : continue
2174 : }
2175 :
2176 : // The hint h will be resolved and dropped if it either affects no files at all
2177 : // or if the number of files it creates (eg. through excision) is less than or
2178 : // equal to the number of files it deletes. First, determine how many files are
2179 : // affected by this hint.
2180 1 : filesDeletedByCurrentHint := 0
2181 1 : var filesDeletedByLevel [numLevels][]*fileMetadata
2182 1 : for l := h.tombstoneLevel + 1; l < numLevels; l++ {
2183 1 : overlaps := v.Overlaps(l, base.UserKeyBoundsEndExclusive(h.start, h.end))
2184 1 : iter := overlaps.Iter()
2185 1 :
2186 1 : for m := iter.First(); m != nil; m = iter.Next() {
2187 1 : doesHintApply := h.canDeleteOrExcise(cmp, m, snapshots, exciseEnabled)
2188 1 : if m.IsCompacting() || doesHintApply == hintDoesNotApply || files[m] {
2189 1 : continue
2190 : }
2191 1 : switch doesHintApply {
2192 1 : case hintDeletesFile:
2193 1 : filesDeletedByCurrentHint++
2194 1 : case hintExcisesFile:
2195 1 : // Account for the original file being deleted.
2196 1 : filesDeletedByCurrentHint++
2197 1 : // An excise could produce up to 2 new files. If the hint
2198 1 : // leaves a fragment of the file on the left, decrement
2199 1 : // the counter once. If the hint leaves a fragment of the
2200 1 : // file on the right, decrement the counter once.
2201 1 : if cmp(h.start, m.Smallest.UserKey) > 0 {
2202 1 : filesDeletedByCurrentHint--
2203 1 : }
2204 1 : if m.UserKeyBounds().End.IsUpperBoundFor(cmp, h.end) {
2205 1 : filesDeletedByCurrentHint--
2206 1 : }
2207 : }
2208 1 : filesDeletedByLevel[l] = append(filesDeletedByLevel[l], m)
2209 : }
2210 : }
2211 1 : if filesDeletedByCurrentHint < 0 {
2212 1 : // This hint does not delete a sufficient number of files to warrant
2213 1 : // a delete-only compaction at this stage. Drop it (ie. don't add it
2214 1 : // to either resolved or unresolved hints) so it doesn't stick around
2215 1 : // forever.
2216 1 : continue
2217 : }
2218 : // This hint will be resolved and dropped.
2219 1 : for l := h.tombstoneLevel + 1; l < numLevels; l++ {
2220 1 : byLevel[l] = append(byLevel[l], filesDeletedByLevel[l]...)
2221 1 : for _, m := range filesDeletedByLevel[l] {
2222 1 : if files == nil {
2223 1 : // Construct files lazily, assuming most calls will not
2224 1 : // produce delete-only compactions.
2225 1 : files = make(map[*fileMetadata]bool)
2226 1 : }
2227 1 : files[m] = true
2228 : }
2229 : }
2230 1 : resolvedHints = append(resolvedHints, h)
2231 : }
2232 :
2233 1 : var compactLevels []compactionLevel
2234 1 : for l, files := range byLevel {
2235 1 : if len(files) == 0 {
2236 1 : continue
2237 : }
2238 1 : compactLevels = append(compactLevels, compactionLevel{
2239 1 : level: l,
2240 1 : files: manifest.NewLevelSliceKeySorted(cmp, files),
2241 1 : })
2242 : }
2243 1 : return compactLevels, resolvedHints, unresolvedHints
2244 : }
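
// Worked example of the net-file accounting above: a hint spanning [c, j)
// applied to a file spanning [a, z] deletes the file (+1) but leaves
// fragments on both the left and the right (-2), for a net of -1, so the
// hint is dropped. A minimal sketch of the per-file arithmetic (netDeleted
// is illustrative, not a Pebble API):
//
//	package main
//
//	import (
//		"bytes"
//		"fmt"
//	)
//
//	func netDeleted(fileLo, fileHi, hintLo, hintHi []byte) int {
//		n := 1 // the original file is deleted or replaced
//		if bytes.Compare(hintLo, fileLo) > 0 {
//			n-- // a fragment survives on the left
//		}
//		if bytes.Compare(fileHi, hintHi) >= 0 {
//			n-- // a fragment survives on the right
//		}
//		return n
//	}
//
//	func main() {
//		fmt.Println(netDeleted([]byte("a"), []byte("z"), []byte("c"), []byte("j"))) // -1
//	}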
2245 :
2246 1 : func (d *DB) compactionPprofLabels(c *compaction) pprof.LabelSet {
2247 1 : activity := "compact"
2248 1 : if len(c.flushing) != 0 {
2249 0 : activity = "flush"
2250 0 : }
2251 1 : level := "L?"
2252 1 : // Delete-only compactions don't have an output level.
2253 1 : if c.outputLevel != nil {
2254 1 : level = fmt.Sprintf("L%d", c.outputLevel.level)
2255 1 : }
2256 1 : if kc := d.opts.Experimental.UserKeyCategories; kc.Len() > 0 {
2257 0 : cat := kc.CategorizeKeyRange(c.smallest.UserKey, c.largest.UserKey)
2258 0 : return pprof.Labels("pebble", activity, "output-level", level, "key-type", cat)
2259 0 : }
2260 1 : return pprof.Labels("pebble", activity, "output-level", level)
2261 : }
2262 :
2263 : // compact runs one compaction and maybe schedules another call to compact.
2264 1 : func (d *DB) compact(c *compaction, errChannel chan error) {
2265 1 : pprof.Do(context.Background(), d.compactionPprofLabels(c), func(context.Context) {
2266 1 : d.mu.Lock()
2267 1 : defer d.mu.Unlock()
2268 1 : if err := d.compact1(c, errChannel); err != nil {
2269 1 : // TODO(peter): count consecutive compaction errors and backoff.
2270 1 : d.opts.EventListener.BackgroundError(err)
2271 1 : }
2272 1 : if c.isDownload {
2273 1 : d.mu.compact.downloadingCount--
2274 1 : } else {
2275 1 : d.mu.compact.compactingCount--
2276 1 : }
2277 1 : delete(d.mu.compact.inProgress, c)
2278 1 : // Add this compaction's duration to the cumulative duration. NB: This
2279 1 : // must be atomic with the above removal of c from
2280 1 : // d.mu.compact.InProgress to ensure Metrics.Compact.Duration does not
2281 1 : // miss or double count a completing compaction's duration.
2282 1 : d.mu.compact.duration += d.timeNow().Sub(c.beganAt)
2283 1 :
2284 1 : // The previous compaction may have produced too many files in a
2285 1 : // level, so reschedule another compaction if needed.
2286 1 : d.maybeScheduleCompaction()
2287 1 : d.mu.compact.cond.Broadcast()
2288 : })
2289 : }
2290 :
2291 : // cleanupVersionEdit cleans up any on-disk artifacts that were created
2292 : // for the application of a versionEdit that is no longer going to be applied.
2293 : //
2294 : // d.mu must be held when calling this method.
2295 1 : func (d *DB) cleanupVersionEdit(ve *versionEdit) {
2296 1 : obsoleteFiles := make([]*fileBacking, 0, len(ve.NewFiles))
2297 1 : deletedFiles := make(map[base.FileNum]struct{})
2298 1 : for key := range ve.DeletedFiles {
2299 1 : deletedFiles[key.FileNum] = struct{}{}
2300 1 : }
2301 1 : for i := range ve.NewFiles {
2302 1 : if ve.NewFiles[i].Meta.Virtual {
2303 1 : // We handle backing files separately.
2304 1 : continue
2305 : }
2306 1 : if _, ok := deletedFiles[ve.NewFiles[i].Meta.FileNum]; ok {
2307 1 : // This file is being moved in this ve to a different level.
2308 1 : // Don't mark it as obsolete.
2309 1 : continue
2310 : }
2311 1 : obsoleteFiles = append(obsoleteFiles, ve.NewFiles[i].Meta.PhysicalMeta().FileBacking)
2312 : }
2313 1 : for i := range ve.CreatedBackingTables {
2314 1 : if ve.CreatedBackingTables[i].IsUnused() {
2315 0 : obsoleteFiles = append(obsoleteFiles, ve.CreatedBackingTables[i])
2316 0 : }
2317 : }
2318 1 : for i := range obsoleteFiles {
2319 1 : // Add this file to zombie tables as well, as the versionSet
2320 1 : // asserts on whether every obsolete file was at one point
2321 1 : // marked zombie.
2322 1 : d.mu.versions.zombieTables[obsoleteFiles[i].DiskFileNum] = objectInfo{
2323 1 : fileInfo: fileInfo{
2324 1 : FileNum: obsoleteFiles[i].DiskFileNum,
2325 1 : FileSize: obsoleteFiles[i].Size,
2326 1 : },
2327 1 : // TODO(bilal): This is harmless if it's wrong, as it only causes
2328 1 : // incorrect accounting of its size in metrics. Currently
2329 1 : // all compactions only write to local files anyway except with
2330 1 : // disaggregated storage; if this becomes the norm, we should do
2331 1 : // an objprovider lookup here.
2332 1 : isLocal: true,
2333 1 : }
2334 1 : }
2335 1 : d.mu.versions.addObsoleteLocked(obsoleteFiles)
2336 : }
2337 :
2338 : // compact1 runs one compaction.
2339 : //
2340 : // d.mu must be held when calling this, but the mutex may be dropped and
2341 : // re-acquired during the course of this method.
2342 1 : func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
2343 1 : if errChannel != nil {
2344 1 : defer func() {
2345 1 : errChannel <- err
2346 1 : }()
2347 : }
2348 :
2349 1 : jobID := d.newJobIDLocked()
2350 1 : info := c.makeInfo(jobID)
2351 1 : d.opts.EventListener.CompactionBegin(info)
2352 1 : startTime := d.timeNow()
2353 1 :
2354 1 : ve, stats, err := d.runCompaction(jobID, c)
2355 1 :
2356 1 : info.Duration = d.timeNow().Sub(startTime)
2357 1 : if err == nil {
2358 1 : validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
2359 1 : err = func() error {
2360 1 : var err error
2361 1 : d.mu.versions.logLock()
2362 1 : // Check if this compaction had a conflicting operation (eg. a d.excise())
2363 1 : // that necessitates it restarting from scratch. Note that since we hold
2364 1 : // the manifest lock, we don't expect this bool to change its value
2365 1 : // as only the holder of the manifest lock will ever write to it.
2366 1 : if c.cancel.Load() {
2367 : // NB: cancelled-compaction metrics are incremented once below, after
2368 : // clearCompactingState, where all cancellation paths converge;
2369 : // incrementing them here as well would double count this cancellation.
2370 1 : err = firstError(err, ErrCancelledCompaction)
2371 1 : // This is the first time we've seen a cancellation during the
2372 1 : // life of this compaction (or the original condition on err == nil
2373 1 : // would not have been true). We should delete any tables already
2374 1 : // created, as d.runCompaction did not do that.
2375 1 : d.cleanupVersionEdit(ve)
2376 1 : // logAndApply calls logUnlock. If we didn't call it, we need to call
2377 1 : // logUnlock ourselves.
2378 1 : d.mu.versions.logUnlock()
2379 1 : return err
2380 1 : }
2381 1 : return d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
2382 1 : return d.getInProgressCompactionInfoLocked(c)
2383 1 : })
2384 : }()
2385 : }
2386 :
2387 1 : info.Done = true
2388 1 : info.Err = err
2389 1 : if err == nil {
2390 1 : for i := range ve.NewFiles {
2391 1 : e := &ve.NewFiles[i]
2392 1 : info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
2393 1 : }
2394 1 : d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
2395 1 : d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
2396 1 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
2397 : }
2398 :
2399 : // NB: clearing compacting state must occur before updating the read state;
2400 : // L0Sublevels initialization depends on it.
2401 1 : d.clearCompactingState(c, err != nil)
2402 1 : if err != nil && errors.Is(err, ErrCancelledCompaction) {
2403 1 : d.mu.versions.metrics.Compact.CancelledCount++
2404 1 : d.mu.versions.metrics.Compact.CancelledBytes += c.bytesWritten
2405 1 : }
2406 1 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
2407 1 : d.mu.versions.incrementCompactionBytes(-c.bytesWritten)
2408 1 :
2409 1 : info.TotalDuration = d.timeNow().Sub(c.beganAt)
2410 1 : d.opts.EventListener.CompactionEnd(info)
2411 1 :
2412 1 : // Update the read state before deleting obsolete files because the
2413 1 : // read-state update will cause the previous version to be unref'd and if
2414 1 : // there are no references obsolete tables will be added to the obsolete
2415 1 : // table list.
2416 1 : if err == nil {
2417 1 : d.updateReadStateLocked(d.opts.DebugCheck)
2418 1 : d.updateTableStatsLocked(ve.NewFiles)
2419 1 : }
2420 1 : d.deleteObsoleteFiles(jobID)
2421 1 :
2422 1 : return err
2423 : }
2424 :
2425 : // runCopyCompaction runs a copy compaction where a new FileNum is created that
2426 : // is a byte-for-byte copy of the input file or span thereof in some cases. This
2427 : // is used in lieu of a move compaction when a file is being moved across the
2428 : // local/remote storage boundary. It could also be used in lieu of a rewrite
2429 : // compaction as part of a Download() call, which allows copying only a span of
2430 : // the external file, provided the file does not contain range keys or value
2431 : // blocks (see sstable.CopySpan).
2432 : //
2433 : // d.mu must be held when calling this method. The mutex will be released when
2434 : // doing IO.
2435 : func (d *DB) runCopyCompaction(
2436 : jobID JobID, c *compaction,
2437 1 : ) (ve *versionEdit, stats compact.Stats, _ error) {
2438 1 : iter := c.startLevel.files.Iter()
2439 1 : inputMeta := iter.First()
2440 1 : if iter.Next() != nil {
2441 0 : return nil, compact.Stats{}, base.AssertionFailedf("got more than one file for a copy compaction")
2442 0 : }
2443 1 : if c.cancel.Load() {
2444 0 : return nil, compact.Stats{}, ErrCancelledCompaction
2445 0 : }
2446 1 : ve = &versionEdit{
2447 1 : DeletedFiles: map[deletedFileEntry]*fileMetadata{
2448 1 : {Level: c.startLevel.level, FileNum: inputMeta.FileNum}: inputMeta,
2449 1 : },
2450 1 : }
2451 1 :
2452 1 : objMeta, err := d.objProvider.Lookup(base.FileTypeTable, inputMeta.FileBacking.DiskFileNum)
2453 1 : if err != nil {
2454 0 : return nil, compact.Stats{}, err
2455 0 : }
2456 1 : if !objMeta.IsExternal() {
2457 1 : if objMeta.IsRemote() || !remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level) {
2458 0 : panic("pebble: scheduled a copy compaction that is not actually moving files to shared storage")
2459 : }
2460 : // Note that based on logic in the compaction picker, we're guaranteed
2461 : // inputMeta.Virtual is false.
2462 1 : if inputMeta.Virtual {
2463 0 : panic(errors.AssertionFailedf("cannot do a copy compaction of a virtual sstable across local/remote storage"))
2464 : }
2465 : }
2466 :
2467 : // We are in the relatively more complex case where we need to copy this
2468 : // file to remote storage. Drop the db mutex while we do the copy.
2469 : //
2470 : // To ease up cleanup of the local file and tracking of refs, we create
2471 : // a new FileNum. This has the potential of making the block cache less
2472 : // effective, however.
2473 1 : newMeta := &fileMetadata{
2474 1 : Size: inputMeta.Size,
2475 1 : CreationTime: inputMeta.CreationTime,
2476 1 : SmallestSeqNum: inputMeta.SmallestSeqNum,
2477 1 : LargestSeqNum: inputMeta.LargestSeqNum,
2478 1 : LargestSeqNumAbsolute: inputMeta.LargestSeqNumAbsolute,
2479 1 : Stats: inputMeta.Stats,
2480 1 : Virtual: inputMeta.Virtual,
2481 1 : SyntheticPrefixAndSuffix: inputMeta.SyntheticPrefixAndSuffix,
2482 1 : }
2483 1 : if inputMeta.HasPointKeys {
2484 1 : newMeta.ExtendPointKeyBounds(c.cmp, inputMeta.SmallestPointKey, inputMeta.LargestPointKey)
2485 1 : }
2486 1 : if inputMeta.HasRangeKeys {
2487 1 : newMeta.ExtendRangeKeyBounds(c.cmp, inputMeta.SmallestRangeKey, inputMeta.LargestRangeKey)
2488 1 : }
2489 1 : newMeta.FileNum = d.mu.versions.getNextFileNum()
2490 1 : if objMeta.IsExternal() {
2491 1 : // external -> local/shared copy. File must be virtual.
2492 1 : // We will update this size later after we produce the new backing file.
2493 1 : newMeta.InitProviderBacking(base.DiskFileNum(newMeta.FileNum), inputMeta.FileBacking.Size)
2494 1 : } else {
2495 1 : // local -> shared copy. New file is guaranteed to not be virtual.
2496 1 : newMeta.InitPhysicalBacking()
2497 1 : }
2498 :
2499 : // Before dropping the db mutex, grab a ref to the current version. This
2500 : // prevents any concurrent excises from deleting files that this compaction
2501 : // needs to read/maintain a reference to.
2502 1 : vers := d.mu.versions.currentVersion()
2503 1 : vers.Ref()
2504 1 : defer vers.UnrefLocked()
2505 1 :
2506 1 : // NB: The order here is reversed: unlock now, re-lock via defer on
2507 1 : // return. This is similar to runCompaction.
2508 1 : d.mu.Unlock()
2509 1 : defer d.mu.Lock()
2510 1 :
2511 1 : deleteOnExit := false
2512 1 : defer func() {
2513 1 : if deleteOnExit {
2514 0 : _ = d.objProvider.Remove(base.FileTypeTable, newMeta.FileBacking.DiskFileNum)
2515 0 : }
2516 : }()
2517 :
2518 : // If the src obj is external, we're doing an external to local/shared copy.
2519 1 : if objMeta.IsExternal() {
2520 1 : ctx := context.TODO()
2521 1 : src, err := d.objProvider.OpenForReading(
2522 1 : ctx, base.FileTypeTable, inputMeta.FileBacking.DiskFileNum, objstorage.OpenOptions{},
2523 1 : )
2524 1 : if err != nil {
2525 0 : return nil, compact.Stats{}, err
2526 0 : }
2527 1 : defer func() {
2528 1 : if src != nil {
2529 0 : src.Close()
2530 0 : }
2531 : }()
2532 :
2533 1 : w, _, err := d.objProvider.Create(
2534 1 : ctx, base.FileTypeTable, newMeta.FileBacking.DiskFileNum,
2535 1 : objstorage.CreateOptions{
2536 1 : PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
2537 1 : },
2538 1 : )
2539 1 : if err != nil {
2540 0 : return nil, compact.Stats{}, err
2541 0 : }
2542 1 : deleteOnExit = true
2543 1 :
2544 1 : start, end := newMeta.Smallest, newMeta.Largest
2545 1 : if newMeta.SyntheticPrefixAndSuffix.HasPrefix() {
2546 1 : syntheticPrefix := newMeta.SyntheticPrefixAndSuffix.Prefix()
2547 1 : start.UserKey = syntheticPrefix.Invert(start.UserKey)
2548 1 : end.UserKey = syntheticPrefix.Invert(end.UserKey)
2549 1 : }
2550 1 : if newMeta.SyntheticPrefixAndSuffix.HasSuffix() {
2551 1 : // Extend the bounds as necessary so that the keys don't include suffixes.
2552 1 : start.UserKey = start.UserKey[:c.comparer.Split(start.UserKey)]
2553 1 : if n := c.comparer.Split(end.UserKey); n < len(end.UserKey) {
2554 0 : end = base.MakeRangeDeleteSentinelKey(c.comparer.ImmediateSuccessor(nil, end.UserKey[:n]))
2555 0 : }
2556 : }
2557 :
2558 : // NB: external files are always virtual.
2559 1 : var wrote uint64
2560 1 : err = d.fileCache.withVirtualReader(inputMeta.VirtualMeta(), func(r sstable.VirtualReader) error {
2561 1 : var err error
2562 1 : wrote, err = sstable.CopySpan(ctx,
2563 1 : src, r.UnsafeReader(), d.opts.MakeReaderOptions(),
2564 1 : w, d.opts.MakeWriterOptions(c.outputLevel.level, d.TableFormat()),
2565 1 : start, end,
2566 1 : )
2567 1 : return err
2568 1 : })
2569 :
2570 1 : src = nil // We passed src to CopySpan; it's responsible for closing it.
2571 1 : if err != nil {
2572 0 : if errors.Is(err, sstable.ErrEmptySpan) {
2573 0 : // The virtual table was empty. Just remove the backing file.
2574 0 : // Note that deleteOnExit is true so we will delete the created object.
2575 0 : c.metrics = map[int]*LevelMetrics{
2576 0 : c.outputLevel.level: {
2577 0 : BytesIn: inputMeta.Size,
2578 0 : },
2579 0 : }
2580 0 : return ve, compact.Stats{}, nil
2581 0 : }
2582 0 : return nil, compact.Stats{}, err
2583 : }
2584 1 : newMeta.FileBacking.Size = wrote
2585 1 : newMeta.Size = wrote
2586 1 : } else {
2587 1 : _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
2588 1 : d.objProvider.Path(objMeta), base.FileTypeTable, newMeta.FileBacking.DiskFileNum,
2589 1 : objstorage.CreateOptions{PreferSharedStorage: true})
2590 1 : if err != nil {
2591 0 : return nil, compact.Stats{}, err
2592 0 : }
2593 1 : deleteOnExit = true
2594 : }
2595 1 : ve.NewFiles = []newFileEntry{{
2596 1 : Level: c.outputLevel.level,
2597 1 : Meta: newMeta,
2598 1 : }}
2599 1 : if newMeta.Virtual {
2600 1 : ve.CreatedBackingTables = []*fileBacking{newMeta.FileBacking}
2601 1 : }
2602 1 : c.metrics = map[int]*LevelMetrics{
2603 1 : c.outputLevel.level: {
2604 1 : BytesIn: inputMeta.Size,
2605 1 : BytesCompacted: newMeta.Size,
2606 1 : TablesCompacted: 1,
2607 1 : },
2608 1 : }
2609 1 :
2610 1 : if err := d.objProvider.Sync(); err != nil {
2611 0 : return nil, compact.Stats{}, err
2612 0 : }
2613 1 : deleteOnExit = false
2614 1 : return ve, compact.Stats{}, nil
2615 : }
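
// The prefix handling above maps bounds from the ingested (synthetic) key
// space back to the physical key space of the source file. A minimal
// sketch of that inversion, assuming the synthetic prefix was simply
// prepended to every key (mirroring, but not reproducing,
// SyntheticPrefix.Invert):
//
//	package main
//
//	import (
//		"bytes"
//		"fmt"
//	)
//
//	func invert(prefix, key []byte) []byte {
//		if !bytes.HasPrefix(key, prefix) {
//			panic("key does not carry the synthetic prefix")
//		}
//		return key[len(prefix):]
//	}
//
//	func main() {
//		fmt.Printf("%s\n", invert([]byte("t1/"), []byte("t1/foo"))) // foo
//	}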
2616 :
2617 : // applyHintOnFile applies a deleteCompactionHint to a file, and updates the
2618 : // versionEdit accordingly. It returns a list of new files that were created
2619 : // if the hint was applied partially to a file (eg. through an excise as opposed
2620 : // to an outright deletion). levelMetrics is kept up-to-date with the number
2621 : // of tables deleted or excised.
2622 : func (d *DB) applyHintOnFile(
2623 : h deleteCompactionHint,
2624 : f *fileMetadata,
2625 : level int,
2626 : levelMetrics *LevelMetrics,
2627 : ve *versionEdit,
2628 : hintOverlap deletionHintOverlap,
2629 1 : ) (newFiles []manifest.NewFileEntry, err error) {
2630 1 : if hintOverlap == hintDoesNotApply {
2631 0 : return nil, nil
2632 0 : }
2633 :
2634 : // The hint overlaps with at least part of the file.
2635 1 : if hintOverlap == hintDeletesFile {
2636 1 : // The hint deletes the entirety of this file.
2637 1 : ve.DeletedFiles[deletedFileEntry{
2638 1 : Level: level,
2639 1 : FileNum: f.FileNum,
2640 1 : }] = f
2641 1 : levelMetrics.TablesDeleted++
2642 1 : return nil, nil
2643 1 : }
2644 : // The hint overlaps with only a part of the file, not the entirety of it. We need
2645 : // to use d.excise. (hintOverlap == hintExcisesFile)
2646 1 : if d.FormatMajorVersion() < FormatVirtualSSTables {
2647 0 : panic("pebble: delete-only compaction hint excising a file is not supported in this version")
2648 : }
2649 :
2650 1 : levelMetrics.TablesExcised++
2651 1 : newFiles, err = d.excise(context.TODO(), base.UserKeyBoundsEndExclusive(h.start, h.end), f, ve, level)
2652 1 : if err != nil {
2653 0 : return nil, errors.Wrap(err, "error when running excise for delete-only compaction")
2654 0 : }
2655 1 : if _, ok := ve.DeletedFiles[deletedFileEntry{
2656 1 : Level: level,
2657 1 : FileNum: f.FileNum,
2658 1 : }]; !ok {
2659 0 : panic("pebble: delete-only compaction hint overlapping a file did not excise that file")
2660 : }
2661 1 : return newFiles, nil
2662 : }
2663 :
2664 : func (d *DB) runDeleteOnlyCompactionForLevel(
2665 : cl compactionLevel,
2666 : levelMetrics *LevelMetrics,
2667 : ve *versionEdit,
2668 : snapshots compact.Snapshots,
2669 : fragments []deleteCompactionHintFragment,
2670 : exciseEnabled bool,
2671 1 : ) error {
2672 1 : curFragment := 0
2673 1 : iter := cl.files.Iter()
2674 1 : if cl.level == 0 {
2675 0 : panic("cannot run delete-only compaction for L0")
2676 : }
2677 :
2678 : // The outer loop iterates over files, the middle loop over fragments, and
2679 : // the inner loop over the hints within a fragment. The number of fragments
2680 : // is bounded by the number of hints this compaction was created with,
2681 : // which is capped in the compaction picker to avoid very CPU-hot loops here.
2682 1 : for f := iter.First(); f != nil; f = iter.Next() {
2683 1 : // curFile usually matches f, except if f was excised, in which case it
2684 1 : // points to the virtual file that replaced f, or is nil if f was removed
2685 1 : // in its entirety.
2686 1 : curFile := f
2687 1 : for curFragment < len(fragments) && d.cmp(fragments[curFragment].start, f.Smallest.UserKey) <= 0 {
2688 1 : curFragment++
2689 1 : }
2690 1 : if curFragment > 0 {
2691 1 : curFragment--
2692 1 : }
2693 :
2694 1 : for ; curFragment < len(fragments); curFragment++ {
2695 1 : if f.UserKeyBounds().End.CompareUpperBounds(d.cmp, base.UserKeyInclusive(fragments[curFragment].start)) < 0 {
2696 1 : break
2697 : }
2698 : // Process all hints overlapping with this file. Note that applying
2699 : // a hint twice is idempotent; curFile should already have been excised
2700 : // the first time, resulting in no change the second time.
2701 1 : for _, h := range fragments[curFragment].hints {
2702 1 : if h.tombstoneLevel >= cl.level {
2703 1 : // We cannot excise out the deletion tombstone itself, or anything
2704 1 : // above it.
2705 1 : continue
2706 : }
2707 1 : hintOverlap := h.canDeleteOrExcise(d.cmp, curFile, snapshots, exciseEnabled)
2708 1 : if hintOverlap == hintDoesNotApply {
2709 1 : continue
2710 : }
2711 1 : newFiles, err := d.applyHintOnFile(h, curFile, cl.level, levelMetrics, ve, hintOverlap)
2712 1 : if err != nil {
2713 0 : return err
2714 0 : }
2715 1 : if _, ok := ve.DeletedFiles[manifest.DeletedFileEntry{Level: cl.level, FileNum: curFile.FileNum}]; ok {
2716 1 : curFile = nil
2717 1 : }
2718 1 : if len(newFiles) > 0 {
2719 1 : curFile = newFiles[len(newFiles)-1].Meta
2720 1 : } else if curFile == nil {
2721 1 : // Nothing remains of the file.
2722 1 : break
2723 : }
2724 : }
2725 1 : if curFile == nil {
2726 1 : // Nothing remains of the file.
2727 1 : break
2728 : }
2729 : }
2730 1 : if _, ok := ve.DeletedFiles[deletedFileEntry{
2731 1 : Level: cl.level,
2732 1 : FileNum: f.FileNum,
2733 1 : }]; !ok {
2734 0 : panic("pebble: delete-only compaction scheduled with hints that did not delete or excise a file")
2735 : }
2736 : }
2737 1 : return nil
2738 : }
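// The file/fragment lockstep above is a two-pointer sweep: for each file,
// advance curFragment past every fragment starting at or before the file's
// smallest key, then step back one so the fragment covering that key is
// visited first. A standalone sketch over plain sorted keys (hypothetical
// names, same logic):
func positionFragment(cmp Compare, fragStarts [][]byte, fileSmallest []byte, curFragment int) int {
	for curFragment < len(fragStarts) && cmp(fragStarts[curFragment], fileSmallest) <= 0 {
		curFragment++
	}
	if curFragment > 0 {
		curFragment-- // back up onto the fragment that covers fileSmallest
	}
	return curFragment
}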
2739 :
2740 : // deleteCompactionHintFragment represents a fragment of the key space and
2741 : // contains a set of deleteCompactionHints that apply to that fragment; a
2742 : // fragment starts at the start field and ends where the next fragment starts.
2743 : type deleteCompactionHintFragment struct {
2744 : start []byte
2745 : hints []deleteCompactionHint
2746 : }
2747 :
2748 : // Delete compaction hints can overlap with each other, and multiple fragments
2749 : // can apply to a single file. fragmentDeleteCompactionHints takes a list of
2750 : // hints and fragments them, making them easier to apply to the
2751 : // non-overlapping files occupying a level: files and hint fragments can be
2752 : // iterated on in lockstep, applying all hints that overlap a given file.
2753 : func fragmentDeleteCompactionHints(
2754 : cmp Compare, hints []deleteCompactionHint,
2755 1 : ) []deleteCompactionHintFragment {
2756 1 : fragments := make([]deleteCompactionHintFragment, 0, len(hints)*2)
2757 1 : for i := range hints {
2758 1 : fragments = append(fragments, deleteCompactionHintFragment{start: hints[i].start},
2759 1 : deleteCompactionHintFragment{start: hints[i].end})
2760 1 : }
2761 1 : slices.SortFunc(fragments, func(i, j deleteCompactionHintFragment) int {
2762 1 : return cmp(i.start, j.start)
2763 1 : })
2764 1 : fragments = slices.CompactFunc(fragments, func(i, j deleteCompactionHintFragment) bool {
2765 1 : return bytes.Equal(i.start, j.start)
2766 1 : })
2767 1 : for _, h := range hints {
2768 1 : startIdx := sort.Search(len(fragments), func(i int) bool {
2769 1 : return cmp(fragments[i].start, h.start) >= 0
2770 1 : })
2771 1 : endIdx := sort.Search(len(fragments), func(i int) bool {
2772 1 : return cmp(fragments[i].start, h.end) >= 0
2773 1 : })
2774 1 : for i := startIdx; i < endIdx; i++ {
2775 1 : fragments[i].hints = append(fragments[i].hints, h)
2776 1 : }
2777 : }
2778 1 : return fragments
2779 : }
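// Worked example of the fragmentation above, using hypothetical inputs:
// with cmp = bytes.Compare and hints h1 = [b, f) and h2 = [d, h), the
// sorted, de-duplicated boundary set is {b, d, f, h}, and the two binary
// searches attach each hint to every fragment it covers.
func exampleFragmentation() {
	h1 := deleteCompactionHint{start: []byte("b"), end: []byte("f")}
	h2 := deleteCompactionHint{start: []byte("d"), end: []byte("h")}
	frags := fragmentDeleteCompactionHints(bytes.Compare, []deleteCompactionHint{h1, h2})
	// frags[0]: start "b", hints {h1}
	// frags[1]: start "d", hints {h1, h2}
	// frags[2]: start "f", hints {h2}
	// frags[3]: start "h", no hints; it only marks where frags[2] ends.
	_ = frags
}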
2780 :
2781 : // runDeleteOnlyCompaction runs a delete-only compaction.
2782 : //
2783 : // d.mu must *not* be held when calling this.
2784 : func (d *DB) runDeleteOnlyCompaction(
2785 : jobID JobID, c *compaction, snapshots compact.Snapshots,
2786 1 : ) (ve *versionEdit, stats compact.Stats, retErr error) {
2787 1 : c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
2788 1 : fragments := fragmentDeleteCompactionHints(d.cmp, c.deletionHints)
2789 1 : ve = &versionEdit{
2790 1 : DeletedFiles: map[deletedFileEntry]*fileMetadata{},
2791 1 : }
2792 1 : for _, cl := range c.inputs {
2793 1 : levelMetrics := &LevelMetrics{}
2794 1 : if err := d.runDeleteOnlyCompactionForLevel(cl, levelMetrics, ve, snapshots, fragments, c.exciseEnabled); err != nil {
2795 0 : return nil, stats, err
2796 0 : }
2797 1 : c.metrics[cl.level] = levelMetrics
2798 : }
2799 : // Remove any files that were added and deleted in the same versionEdit.
2800 1 : ve.NewFiles = slices.DeleteFunc(ve.NewFiles, func(e manifest.NewFileEntry) bool {
2801 1 : deletedFileEntry := manifest.DeletedFileEntry{Level: e.Level, FileNum: e.Meta.FileNum}
2802 1 : if _, deleted := ve.DeletedFiles[deletedFileEntry]; deleted {
2803 1 : delete(ve.DeletedFiles, deletedFileEntry)
2804 1 : return true
2805 1 : }
2806 1 : return false
2807 : })
2808 : // Remove any entries from CreatedBackingTables that are not used in any
2809 : // NewFiles.
2810 1 : usedBackingFiles := make(map[base.DiskFileNum]struct{})
2811 1 : for _, e := range ve.NewFiles {
2812 1 : if e.Meta.Virtual {
2813 1 : usedBackingFiles[e.Meta.FileBacking.DiskFileNum] = struct{}{}
2814 1 : }
2815 : }
2816 1 : ve.CreatedBackingTables = slices.DeleteFunc(ve.CreatedBackingTables, func(b *fileBacking) bool {
2817 1 : _, used := usedBackingFiles[b.DiskFileNum]
2818 1 : return !used
2819 1 : })
2820 : // Refresh the disk available statistic whenever a compaction/flush
2821 : // completes, before re-acquiring the mutex.
2822 1 : d.calculateDiskAvailableBytes()
2823 1 : return ve, stats, nil
2824 : }
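// A minimal sketch of the cancellation pattern applied above: entries
// present in both the "new" and "deleted" sets annihilate each other, so a
// file created and then fully consumed within the same edit never surfaces.
// Hypothetical integer IDs stand in for (level, file number) pairs.
func cancelPairs(newIDs []int, deleted map[int]bool) []int {
	return slices.DeleteFunc(newIDs, func(id int) bool {
		if deleted[id] {
			delete(deleted, id)
			return true
		}
		return false
	})
}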
2825 :
2826 : func (d *DB) runMoveCompaction(
2827 : jobID JobID, c *compaction,
2828 1 : ) (ve *versionEdit, stats compact.Stats, _ error) {
2829 1 : iter := c.startLevel.files.Iter()
2830 1 : meta := iter.First()
2831 1 : if iter.Next() != nil {
2832 0 : return nil, stats, base.AssertionFailedf("got more than one file for a move compaction")
2833 0 : }
2834 1 : if c.cancel.Load() {
2835 0 : return ve, stats, ErrCancelledCompaction
2836 0 : }
2837 1 : c.metrics = map[int]*LevelMetrics{
2838 1 : c.outputLevel.level: {
2839 1 : BytesMoved: meta.Size,
2840 1 : TablesMoved: 1,
2841 1 : },
2842 1 : }
2843 1 : ve = &versionEdit{
2844 1 : DeletedFiles: map[deletedFileEntry]*fileMetadata{
2845 1 : {Level: c.startLevel.level, FileNum: meta.FileNum}: meta,
2846 1 : },
2847 1 : NewFiles: []newFileEntry{
2848 1 : {Level: c.outputLevel.level, Meta: meta},
2849 1 : },
2850 1 : }
2851 1 :
2852 1 : return ve, stats, nil
2853 : }
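// A move compaction rewrites no data: the same *fileMetadata is removed
// from the start level and re-added at the output level, so only the
// manifest changes. For a hypothetical file 7 moving from L4 to L5, the
// edit built above has the shape:
//
//	ve = &versionEdit{
//		DeletedFiles: map[deletedFileEntry]*fileMetadata{{Level: 4, FileNum: 7}: meta},
//		NewFiles:     []newFileEntry{{Level: 5, Meta: meta}},
//	}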
2854 :
2855 : // runCompaction runs a compaction that produces new on-disk tables from
2856 : // memtables or old on-disk tables.
2857 : //
2858 : // runCompaction cannot be used for compactionKindIngestedFlushable.
2859 : //
2860 : // d.mu must be held when calling this, but the mutex may be dropped and
2861 : // re-acquired during the course of this method.
2862 : func (d *DB) runCompaction(
2863 : jobID JobID, c *compaction,
2864 1 : ) (ve *versionEdit, stats compact.Stats, retErr error) {
2865 1 : defer func() {
2866 1 : c.slot.Release(stats.CumulativeWrittenSize)
2867 1 : c.slot = nil
2868 1 : }()
2869 1 : if c.cancel.Load() {
2870 1 : return ve, stats, ErrCancelledCompaction
2871 1 : }
2872 1 : switch c.kind {
2873 1 : case compactionKindDeleteOnly:
2874 1 : // Before dropping the db mutex, grab a ref to the current version. This
2875 1 : // prevents any concurrent excises from deleting files that this compaction
2876 1 : // needs to read/maintain a reference to.
2877 1 : //
2878 1 : // Note that delete-only compactions can call excise(), which needs to be able
2879 1 : // to read these files.
2880 1 : vers := d.mu.versions.currentVersion()
2881 1 : vers.Ref()
2882 1 : defer vers.UnrefLocked()
2883 1 : // Release the d.mu lock while doing I/O.
2884 1 : // Note the unusual order: Unlock and then Lock.
2885 1 : snapshots := d.mu.snapshots.toSlice()
2886 1 : d.mu.Unlock()
2887 1 : defer d.mu.Lock()
2888 1 : return d.runDeleteOnlyCompaction(jobID, c, snapshots)
2889 1 : case compactionKindMove:
2890 1 : return d.runMoveCompaction(jobID, c)
2891 1 : case compactionKindCopy:
2892 1 : return d.runCopyCompaction(jobID, c)
2893 0 : case compactionKindIngestedFlushable:
2894 0 : panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
2895 : }
2896 :
2897 1 : snapshots := d.mu.snapshots.toSlice()
2898 1 :
2899 1 : if c.flushing == nil {
2900 1 : // Before dropping the db mutex, grab a ref to the current version. This
2901 1 : // prevents any concurrent excises from deleting files that this compaction
2902 1 : // needs to read/maintain a reference to.
2903 1 : //
2904 1 : // Note that unlike user iterators, compactionIter does not maintain a ref
2905 1 : // of the version or read state.
2906 1 : vers := d.mu.versions.currentVersion()
2907 1 : vers.Ref()
2908 1 : defer vers.UnrefLocked()
2909 1 : }
2910 :
2911 : // The table is typically written at the maximum allowable format implied by
2912 : // the current format major version of the DB, but Options may define
2913 : // additional constraints.
2914 1 : tableFormat := d.TableFormat()
2915 1 :
2916 1 : // Release the d.mu lock while doing I/O.
2917 1 : // Note the unusual order: Unlock and then Lock.
2918 1 : d.mu.Unlock()
2919 1 : defer d.mu.Lock()
2920 1 :
2921 1 : result := d.compactAndWrite(jobID, c, snapshots, tableFormat)
2922 1 : if result.Err == nil {
2923 1 : ve, result.Err = c.makeVersionEdit(result)
2924 1 : }
2925 1 : if result.Err != nil {
2926 1 : // Delete any created tables.
2927 1 : obsoleteFiles := make([]*fileBacking, 0, len(result.Tables))
2928 1 : d.mu.Lock()
2929 1 : for i := range result.Tables {
2930 1 : backing := &fileBacking{
2931 1 : DiskFileNum: result.Tables[i].ObjMeta.DiskFileNum,
2932 1 : Size: result.Tables[i].WriterMeta.Size,
2933 1 : }
2934 1 : obsoleteFiles = append(obsoleteFiles, backing)
2935 1 : // Add this file to zombie tables as well, as the versionSet
2936 1 : // asserts that every obsolete file was at one point
2937 1 : // marked zombie.
2938 1 : d.mu.versions.zombieTables[backing.DiskFileNum] = objectInfo{
2939 1 : fileInfo: fileInfo{
2940 1 : FileNum: backing.DiskFileNum,
2941 1 : FileSize: backing.Size,
2942 1 : },
2943 1 : isLocal: true,
2944 1 : }
2945 1 : }
2946 1 : d.mu.versions.addObsoleteLocked(obsoleteFiles)
2947 1 : d.mu.Unlock()
2948 : }
2949 : // Refresh the disk available statistic whenever a compaction/flush
2950 : // completes, before re-acquiring the mutex.
2951 1 : d.calculateDiskAvailableBytes()
2952 1 : return ve, result.Stats, result.Err
2953 : }
2954 :
2955 : // compactAndWrite runs the data part of a compaction, where we set up a
2956 : // compaction iterator and use it to write output tables.
2957 : func (d *DB) compactAndWrite(
2958 : jobID JobID, c *compaction, snapshots compact.Snapshots, tableFormat sstable.TableFormat,
2959 1 : ) (result compact.Result) {
2960 1 : // Compactions use a pool of buffers to read blocks, avoiding polluting the
2961 1 : // block cache with blocks that will not be read again. We initialize the
2962 1 : // buffer pool with a size 12. This initial size does not need to be
2963 1 : // accurate, because the pool will grow to accommodate the maximum number of
2964 1 : // blocks allocated at a given time over the course of the compaction. But
2965 1 : // choosing a size larger than that working set avoids any additional
2966 1 : // allocations to grow the size of the pool over the course of iteration.
2967 1 : //
2968 1 : // Justification for initial size 12: In a two-level compaction, at any
2969 1 : // given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
2970 1 : // Additionally, when decoding a compressed block, we'll temporarily
2971 1 : // allocate 1 additional block to hold the compressed buffer. In the worst
2972 1 : // case that all input sstables have two-level index blocks (+2), value
2973 1 : // blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
2974 1 : // additionally require 2n+4 blocks where n is the number of input sstables.
2975 1 : // Range deletion and range key blocks are relatively rare, and the cost of
2976 1 : // an additional allocation or two over the course of the compaction is
2977 1 : // considered to be okay. A larger initial size would cause the pool to hold
2978 1 : // on to more memory, even when it's not in-use because the pool will
2979 1 : // recycle buffers up to the current capacity of the pool. The memory use of
2980 1 : // a 12-buffer pool is expected to be within reason, even if all the buffers
2981 1 : // grow to the typical size of an index block (256 KiB) which would
2982 1 : // translate to 3 MiB per compaction.
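// For example, with n = 2 input sstables the worst case above works out to
// roughly 5 + (2*2 + 4) = 13 blocks in flight (2 index + 2 data + 1
// compressed, plus the 2n+4 extras), so an initial size of 12 is already
// close to the working set of a small compaction; the pool grows on demand
// to cover anything beyond that.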
2983 1 : c.bufferPool.Init(12)
2984 1 : defer c.bufferPool.Release()
2985 1 : iiopts := internalIterOpts{
2986 1 : compaction: true,
2987 1 : readEnv: block.ReadEnv{
2988 1 : BufferPool: &c.bufferPool,
2989 1 : Stats: &c.stats,
2990 1 : IterStats: d.fileCache.SSTStatsCollector().Accumulator(
2991 1 : uint64(uintptr(unsafe.Pointer(c))),
2992 1 : categoryCompaction,
2993 1 : ),
2994 1 : },
2995 1 : }
2996 1 :
2997 1 : pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter, iiopts)
2998 1 : defer func() {
2999 1 : for _, closer := range c.closers {
3000 1 : closer.FragmentIterator.Close()
3001 1 : }
3002 : }()
3003 1 : if err != nil {
3004 0 : return compact.Result{Err: err}
3005 0 : }
3006 1 : c.allowedZeroSeqNum = c.allowZeroSeqNum()
3007 1 : cfg := compact.IterConfig{
3008 1 : Comparer: c.comparer,
3009 1 : Merge: d.merge,
3010 1 : TombstoneElision: c.delElision,
3011 1 : RangeKeyElision: c.rangeKeyElision,
3012 1 : Snapshots: snapshots,
3013 1 : AllowZeroSeqNum: c.allowedZeroSeqNum,
3014 1 : IneffectualSingleDeleteCallback: func(userKey []byte) {
3015 1 : d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
3016 1 : Kind: IneffectualSingleDelete,
3017 1 : UserKey: slices.Clone(userKey),
3018 1 : })
3019 1 : },
3020 0 : NondeterministicSingleDeleteCallback: func(userKey []byte) {
3021 0 : d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
3022 0 : Kind: NondeterministicSingleDelete,
3023 0 : UserKey: slices.Clone(userKey),
3024 0 : })
3025 0 : },
3026 : }
3027 1 : iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter)
3028 1 :
3029 1 : runnerCfg := compact.RunnerConfig{
3030 1 : CompactionBounds: base.UserKeyBoundsFromInternal(c.smallest, c.largest),
3031 1 : L0SplitKeys: c.l0Limits,
3032 1 : Grandparents: c.grandparents,
3033 1 : MaxGrandparentOverlapBytes: c.maxOverlapBytes,
3034 1 : TargetOutputFileSize: c.maxOutputFileSize,
3035 1 : Slot: c.slot,
3036 1 : IteratorStats: &c.stats,
3037 1 : }
3038 1 : runner := compact.NewRunner(runnerCfg, iter)
3039 1 : for runner.MoreDataToWrite() {
3040 1 : if c.cancel.Load() {
3041 1 : return runner.Finish().WithError(ErrCancelledCompaction)
3042 1 : }
3043 : // Create a new table.
3044 1 : writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
3045 1 : objMeta, tw, cpuWorkHandle, err := d.newCompactionOutput(jobID, c, writerOpts)
3046 1 : if err != nil {
3047 0 : return runner.Finish().WithError(err)
3048 0 : }
3049 1 : runner.WriteTable(objMeta, tw)
3050 1 : d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
3051 : }
3052 1 : result = runner.Finish()
3053 1 : if result.Err == nil {
3054 1 : result.Err = d.objProvider.Sync()
3055 1 : }
3056 1 : return result
3057 : }
3058 :
3059 : // makeVersionEdit creates the version edit for a compaction, based on the
3060 : // tables in compact.Result.
3061 1 : func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error) {
3062 1 : ve := &versionEdit{
3063 1 : DeletedFiles: map[deletedFileEntry]*fileMetadata{},
3064 1 : }
3065 1 : for _, cl := range c.inputs {
3066 1 : iter := cl.files.Iter()
3067 1 : for f := iter.First(); f != nil; f = iter.Next() {
3068 1 : ve.DeletedFiles[deletedFileEntry{
3069 1 : Level: cl.level,
3070 1 : FileNum: f.FileNum,
3071 1 : }] = f
3072 1 : }
3073 : }
3074 :
3075 1 : startLevelBytes := c.startLevel.files.SizeSum()
3076 1 : outputMetrics := &LevelMetrics{
3077 1 : BytesIn: startLevelBytes,
3078 1 : BytesRead: c.outputLevel.files.SizeSum(),
3079 1 : }
3080 1 : if len(c.extraLevels) > 0 {
3081 1 : outputMetrics.BytesIn += c.extraLevels[0].files.SizeSum()
3082 1 : }
3083 1 : outputMetrics.BytesRead += outputMetrics.BytesIn
3084 1 :
3085 1 : c.metrics = map[int]*LevelMetrics{
3086 1 : c.outputLevel.level: outputMetrics,
3087 1 : }
3088 1 : if len(c.flushing) == 0 && c.metrics[c.startLevel.level] == nil {
3089 1 : c.metrics[c.startLevel.level] = &LevelMetrics{}
3090 1 : }
3091 1 : if len(c.extraLevels) > 0 {
3092 1 : c.metrics[c.extraLevels[0].level] = &LevelMetrics{}
3093 1 : outputMetrics.MultiLevel.BytesInTop = startLevelBytes
3094 1 : outputMetrics.MultiLevel.BytesIn = outputMetrics.BytesIn
3095 1 : outputMetrics.MultiLevel.BytesRead = outputMetrics.BytesRead
3096 1 : }
3097 :
3098 1 : inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute()
3099 1 : ve.NewFiles = make([]newFileEntry, len(result.Tables))
3100 1 : for i := range result.Tables {
3101 1 : t := &result.Tables[i]
3102 1 :
3103 1 : fileMeta := &fileMetadata{
3104 1 : FileNum: base.PhysicalTableFileNum(t.ObjMeta.DiskFileNum),
3105 1 : CreationTime: t.CreationTime.Unix(),
3106 1 : Size: t.WriterMeta.Size,
3107 1 : SmallestSeqNum: t.WriterMeta.SmallestSeqNum,
3108 1 : LargestSeqNum: t.WriterMeta.LargestSeqNum,
3109 1 : }
3110 1 : if c.flushing == nil {
3111 1 : // Set the file's LargestSeqNumAbsolute to be the maximum value of any
3112 1 : // of the compaction's input sstables.
3113 1 : // TODO(jackson): This could be narrowed to be the maximum of input
3114 1 : // sstables that overlap the output sstable's key range.
3115 1 : fileMeta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute
3116 1 : } else {
3117 1 : fileMeta.LargestSeqNumAbsolute = t.WriterMeta.LargestSeqNum
3118 1 : }
3119 1 : fileMeta.InitPhysicalBacking()
3120 1 :
3121 1 : // If the file didn't contain any range deletions, we can fill its
3122 1 : // table stats now, avoiding unnecessarily loading the table later.
3123 1 : maybeSetStatsFromProperties(
3124 1 : fileMeta.PhysicalMeta(), &t.WriterMeta.Properties,
3125 1 : )
3126 1 :
3127 1 : if t.WriterMeta.HasPointKeys {
3128 1 : fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestPoint, t.WriterMeta.LargestPoint)
3129 1 : }
3130 1 : if t.WriterMeta.HasRangeDelKeys {
3131 1 : fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestRangeDel, t.WriterMeta.LargestRangeDel)
3132 1 : }
3133 1 : if t.WriterMeta.HasRangeKeys {
3134 1 : fileMeta.ExtendRangeKeyBounds(c.cmp, t.WriterMeta.SmallestRangeKey, t.WriterMeta.LargestRangeKey)
3135 1 : }
3136 :
3137 1 : ve.NewFiles[i] = newFileEntry{
3138 1 : Level: c.outputLevel.level,
3139 1 : Meta: fileMeta,
3140 1 : }
3141 1 :
3142 1 : // Update metrics.
3143 1 : if c.flushing == nil {
3144 1 : outputMetrics.TablesCompacted++
3145 1 : outputMetrics.BytesCompacted += fileMeta.Size
3146 1 : } else {
3147 1 : outputMetrics.TablesFlushed++
3148 1 : outputMetrics.BytesFlushed += fileMeta.Size
3149 1 : }
3150 1 : outputMetrics.Size += int64(fileMeta.Size)
3151 1 : outputMetrics.NumFiles++
3152 1 : outputMetrics.Additional.BytesWrittenDataBlocks += t.WriterMeta.Properties.DataSize
3153 1 : outputMetrics.Additional.BytesWrittenValueBlocks += t.WriterMeta.Properties.ValueBlocksSize
3154 : }
3155 :
3156 : // Sanity check that the tables are ordered and don't overlap.
3157 1 : for i := 1; i < len(ve.NewFiles); i++ {
3158 1 : if ve.NewFiles[i-1].Meta.UserKeyBounds().End.IsUpperBoundFor(c.cmp, ve.NewFiles[i].Meta.Smallest.UserKey) {
3159 0 : return nil, base.AssertionFailedf("pebble: compaction output tables overlap: %s and %s",
3160 0 : ve.NewFiles[i-1].Meta.DebugString(c.formatKey, true),
3161 0 : ve.NewFiles[i].Meta.DebugString(c.formatKey, true),
3162 0 : )
3163 0 : }
3164 : }
3165 :
3166 1 : return ve, nil
3167 : }
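// The sanity check above enforces the standard level invariant: output
// tables must be sorted and non-overlapping. A standalone sketch over plain
// user keys (hypothetical helper; assumes inclusive upper bounds):
func checkNonOverlapping(cmp Compare, uppers, lowers [][]byte) error {
	for i := 1; i < len(lowers); i++ {
		// Table i-1's upper bound must fall strictly below table i's lower bound.
		if cmp(uppers[i-1], lowers[i]) >= 0 {
			return errors.Errorf("tables %d and %d overlap", i-1, i)
		}
	}
	return nil
}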
3168 :
3169 : // newCompactionOutput creates an object for a new table produced by a
3170 : // compaction or flush.
3171 : func (d *DB) newCompactionOutput(
3172 : jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
3173 1 : ) (objstorage.ObjectMetadata, sstable.RawWriter, CPUWorkHandle, error) {
3174 1 : diskFileNum := d.mu.versions.getNextDiskFileNum()
3175 1 :
3176 1 : var writeCategory vfs.DiskWriteCategory
3177 1 : if d.opts.EnableSQLRowSpillMetrics {
3178 0 : // In the scenario that the Pebble engine is used for SQL row spills the
3179 0 : // data written to the memtable will correspond to spills to disk and
3180 0 : // should be categorized as such.
3181 0 : writeCategory = "sql-row-spill"
3182 1 : } else if c.kind == compactionKindFlush {
3183 1 : writeCategory = "pebble-memtable-flush"
3184 1 : } else {
3185 1 : writeCategory = "pebble-compaction"
3186 1 : }
3187 :
3188 1 : var reason string
3189 1 : if c.kind == compactionKindFlush {
3190 1 : reason = "flushing"
3191 1 : } else {
3192 1 : reason = "compacting"
3193 1 : }
3194 :
3195 1 : ctx := context.TODO()
3196 1 : if objiotracing.Enabled {
3197 0 : ctx = objiotracing.WithLevel(ctx, c.outputLevel.level)
3198 0 : if c.kind == compactionKindFlush {
3199 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForFlush)
3200 0 : } else {
3201 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
3202 0 : }
3203 : }
3204 :
3205 : // Prefer shared storage if present.
3206 1 : createOpts := objstorage.CreateOptions{
3207 1 : PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
3208 1 : WriteCategory: writeCategory,
3209 1 : }
3210 1 : writable, objMeta, err := d.objProvider.Create(ctx, base.FileTypeTable, diskFileNum, createOpts)
3211 1 : if err != nil {
3212 0 : return objstorage.ObjectMetadata{}, nil, nil, err
3213 0 : }
3214 :
3215 1 : if c.kind != compactionKindFlush {
3216 1 : writable = &compactionWritable{
3217 1 : Writable: writable,
3218 1 : versions: d.mu.versions,
3219 1 : written: &c.bytesWritten,
3220 1 : }
3221 1 : }
3222 1 : d.opts.EventListener.TableCreated(TableCreateInfo{
3223 1 : JobID: int(jobID),
3224 1 : Reason: reason,
3225 1 : Path: d.objProvider.Path(objMeta),
3226 1 : FileNum: diskFileNum,
3227 1 : })
3228 1 :
3229 1 : writerOpts.SetInternal(sstableinternal.WriterOptions{
3230 1 : CacheOpts: sstableinternal.CacheOptions{
3231 1 : CacheHandle: d.cacheHandle,
3232 1 : FileNum: diskFileNum,
3233 1 : },
3234 1 : })
3235 1 :
3236 1 : const MaxFileWriteAdditionalCPUTime = time.Millisecond * 100
3237 1 : cpuWorkHandle := d.opts.Experimental.CPUWorkPermissionGranter.GetPermission(
3238 1 : MaxFileWriteAdditionalCPUTime,
3239 1 : )
3240 1 : writerOpts.Parallelism =
3241 1 : d.opts.Experimental.MaxWriterConcurrency > 0 &&
3242 1 : (cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)
3243 1 :
3244 1 : // TODO(jackson): Make the compaction body generic over the RawWriter type,
3245 1 : // so that we don't need to pay the cost of dynamic dispatch?
3246 1 : tw := sstable.NewRawWriter(writable, writerOpts)
3247 1 : return objMeta, tw, cpuWorkHandle, nil
3248 : }
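// A minimal sketch of observing the TableCreated events fired above through
// the public EventListener hook (hypothetical logging body):
func exampleTableCreatedListener() EventListener {
	return EventListener{
		TableCreated: func(info TableCreateInfo) {
			// Reason is "flushing" or "compacting", matching the logic above.
			fmt.Printf("created table at %s (job %d, %s)\n", info.Path, info.JobID, info.Reason)
		},
	}
}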
3249 :
3250 : // validateVersionEdit validates that start and end keys across new and deleted
3251 : // files in a versionEdit pass the given validation function.
3252 : func validateVersionEdit(
3253 : ve *versionEdit, vk base.ValidateKey, format base.FormatKey, logger Logger,
3254 1 : ) {
3255 1 : validateKey := func(f *manifest.FileMetadata, key []byte) {
3256 1 : if err := vk.Validate(key); err != nil {
3257 0 : logger.Fatalf("pebble: version edit validation failed (key=%s file=%s): %v", format(key), f, err)
3258 0 : }
3259 : }
3260 :
3261 : // Validate both new and deleted files.
3262 1 : for _, f := range ve.NewFiles {
3263 1 : validateKey(f.Meta, f.Meta.Smallest.UserKey)
3264 1 : validateKey(f.Meta, f.Meta.Largest.UserKey)
3265 1 : }
3266 1 : for _, m := range ve.DeletedFiles {
3267 1 : validateKey(m, m.Smallest.UserKey)
3268 1 : validateKey(m, m.Largest.UserKey)
3269 1 : }
3270 : }
|