Line data Source code
1 : // Copyright 2013 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "fmt"
12 : "io"
13 : "math"
14 : "runtime/pprof"
15 : "slices"
16 : "sort"
17 : "sync/atomic"
18 : "time"
19 :
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/pebble/internal/base"
22 : "github.com/cockroachdb/pebble/internal/compact"
23 : "github.com/cockroachdb/pebble/internal/invalidating"
24 : "github.com/cockroachdb/pebble/internal/invariants"
25 : "github.com/cockroachdb/pebble/internal/keyspan"
26 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
27 : "github.com/cockroachdb/pebble/internal/manifest"
28 : "github.com/cockroachdb/pebble/internal/private"
29 : "github.com/cockroachdb/pebble/internal/rangedel"
30 : "github.com/cockroachdb/pebble/internal/rangekey"
31 : "github.com/cockroachdb/pebble/objstorage"
32 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
33 : "github.com/cockroachdb/pebble/objstorage/remote"
34 : "github.com/cockroachdb/pebble/sstable"
35 : "github.com/cockroachdb/pebble/vfs"
36 : "github.com/cockroachdb/pebble/wal"
37 : )
38 :
39 : var errEmptyTable = errors.New("pebble: empty table")
40 :
41 : // ErrCancelledCompaction is returned if a compaction is cancelled by a
42 : // concurrent excise or ingest-split operation.
43 : var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
44 :
45 : var compactLabels = pprof.Labels("pebble", "compact")
46 : var flushLabels = pprof.Labels("pebble", "flush")
47 : var gcLabels = pprof.Labels("pebble", "gc")
48 :
49 : // getInternalWriterProperties accesses a private variable (in the
50 : // internal/private package) initialized by the sstable Writer. This indirection
51 : // is necessary to ensure non-Pebble users constructing sstables for ingestion
52 : // are unable to set internal-only properties.
53 : var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties)
54 :
55 : // expandedCompactionByteSizeLimit is the maximum number of bytes in all
56 : // compacted files. We avoid expanding the lower level file set of a compaction
57 : // if it would make the total compaction cover more than this many bytes.
58 1 : func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64) uint64 {
59 1 : v := uint64(25 * opts.Level(level).TargetFileSize)
60 1 :
61 1 : // Never expand a compaction beyond half the available capacity, divided
62 1 : // by the maximum number of concurrent compactions. Each of the concurrent
63 1 : // compactions may expand up to this limit, so this attempts to limit
64 1 : // compactions to half of available disk space. Note that this will not
65 1 : // prevent compaction picking from pursuing compactions that are larger
66 1 : // than this threshold before expansion.
67 1 : diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions())
68 1 : if v > diskMax {
69 0 : v = diskMax
70 0 : }
71 1 : return v
72 : }
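
// As a sanity check on the cap above, here is an illustrative sketch (not part
// of Pebble's API) using hypothetical numbers: with a 64MB target file size
// and 4 allowed concurrent compactions, the 25x expansion cap applies when
// free disk space is plentiful, while the disk-based cap (availBytes/2 divided
// by the compaction concurrency) takes over when free space is scarce.
func exampleExpandedCompactionLimit() {
	const targetFileSize = 64 << 20 // hypothetical per-level TargetFileSize
	const maxConcurrent = 4         // hypothetical MaxConcurrentCompactions()
	limit := func(availBytes uint64) uint64 {
		v := uint64(25 * targetFileSize)
		if diskMax := (availBytes / 2) / maxConcurrent; v > diskMax {
			v = diskMax
		}
		return v
	}
	fmt.Println(limit(1 << 40)) // 1677721600: the 25x cap wins with 1TB free
	fmt.Println(limit(8 << 30)) // 1073741824: the disk-based cap wins with 8GB free
}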
73 :
74 : // maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
75 : // before we stop building a single file in a level-1 to level compaction.
76 1 : func maxGrandparentOverlapBytes(opts *Options, level int) uint64 {
77 1 : return uint64(10 * opts.Level(level).TargetFileSize)
78 1 : }
79 :
80 : // maxReadCompactionBytes is used to prevent read compactions which
81 : // are too wide.
82 1 : func maxReadCompactionBytes(opts *Options, level int) uint64 {
83 1 : return uint64(10 * opts.Level(level).TargetFileSize)
84 1 : }
85 :
86 : // noCloseIter wraps around a FragmentIterator, intercepting and eliding
87 : // calls to Close. It is used during compaction to ensure that rangeDelIters
88 : // are not closed prematurely.
89 : type noCloseIter struct {
90 : keyspan.FragmentIterator
91 : }
92 :
93 1 : func (i noCloseIter) Close() error {
94 1 : return nil
95 1 : }
96 :
97 : type compactionLevel struct {
98 : level int
99 : files manifest.LevelSlice
100 : // l0SublevelInfo contains information about L0 sublevels being compacted.
101 : // It's only set for the start level of a compaction starting out of L0 and
102 : // is nil for all other compactions.
103 : l0SublevelInfo []sublevelInfo
104 : }
105 :
106 1 : func (cl compactionLevel) Clone() compactionLevel {
107 1 : newCL := compactionLevel{
108 1 : level: cl.level,
109 1 : files: cl.files.Reslice(func(start, end *manifest.LevelIterator) {}),
110 : }
111 1 : return newCL
112 : }
113 0 : func (cl compactionLevel) String() string {
114 0 : return fmt.Sprintf(`Level %d, Files %s`, cl.level, cl.files)
115 0 : }
116 :
117 : // compactionWritable is an objstorage.Writable wrapper that, on every write,
118 : // updates a metric in `versions` tracking the bytes written so far by
119 : // in-progress compactions. It also increments a per-compaction `written` int.
120 : type compactionWritable struct {
121 : objstorage.Writable
122 :
123 : versions *versionSet
124 : written *int64
125 : }
126 :
127 : // Write is part of the objstorage.Writable interface.
128 1 : func (c *compactionWritable) Write(p []byte) error {
129 1 : if err := c.Writable.Write(p); err != nil {
130 0 : return err
131 0 : }
132 :
133 1 : *c.written += int64(len(p))
134 1 : c.versions.incrementCompactionBytes(int64(len(p)))
135 1 : return nil
136 : }
137 :
138 : type compactionKind int
139 :
140 : const (
141 : compactionKindDefault compactionKind = iota
142 : compactionKindFlush
143 : // compactionKindMove denotes a move compaction where the input file is
144 : // retained and linked in a new level without being obsoleted.
145 : compactionKindMove
146 : // compactionKindCopy denotes a copy compaction where the input file is
147 : // copied byte-by-byte into a new file with a new FileNum in the output level.
148 : compactionKindCopy
149 : compactionKindDeleteOnly
150 : compactionKindElisionOnly
151 : compactionKindRead
152 : compactionKindRewrite
153 : compactionKindIngestedFlushable
154 : )
155 :
156 1 : func (k compactionKind) String() string {
157 1 : switch k {
158 1 : case compactionKindDefault:
159 1 : return "default"
160 0 : case compactionKindFlush:
161 0 : return "flush"
162 1 : case compactionKindMove:
163 1 : return "move"
164 1 : case compactionKindDeleteOnly:
165 1 : return "delete-only"
166 1 : case compactionKindElisionOnly:
167 1 : return "elision-only"
168 0 : case compactionKindRead:
169 0 : return "read"
170 1 : case compactionKindRewrite:
171 1 : return "rewrite"
172 0 : case compactionKindIngestedFlushable:
173 0 : return "ingested-flushable"
174 1 : case compactionKindCopy:
175 1 : return "copy"
176 : }
177 0 : return "?"
178 : }
179 :
180 : // rangeKeyCompactionTransform is used to transform range key spans as part of the
181 : // keyspanimpl.MergingIter. As part of this transformation step, we can elide range
182 : // keys in the last snapshot stripe, as well as coalesce range keys within
183 : // snapshot stripes.
184 : func rangeKeyCompactionTransform(
185 : eq base.Equal, snapshots []uint64, elideRangeKey func(start, end []byte) bool,
186 1 : ) keyspan.Transformer {
187 1 : return keyspan.TransformerFunc(func(cmp base.Compare, s keyspan.Span, dst *keyspan.Span) error {
188 1 : elideInLastStripe := func(keys []keyspan.Key) []keyspan.Key {
189 1 : // Unsets and deletes in the last snapshot stripe can be elided.
190 1 : k := 0
191 1 : for j := range keys {
192 1 : if elideRangeKey(s.Start, s.End) &&
193 1 : (keys[j].Kind() == InternalKeyKindRangeKeyUnset || keys[j].Kind() == InternalKeyKindRangeKeyDelete) {
194 1 : continue
195 : }
196 1 : keys[k] = keys[j]
197 1 : k++
198 : }
199 1 : keys = keys[:k]
200 1 : return keys
201 : }
202 : // snapshots are in ascending order, while s.keys are in descending seqnum
203 : // order. Partition s.keys by snapshot stripes, and call rangekey.Coalesce
204 : // on each partition.
205 1 : dst.Start = s.Start
206 1 : dst.End = s.End
207 1 : dst.Keys = dst.Keys[:0]
208 1 : i, j := len(snapshots)-1, 0
209 1 : usedLen := 0
210 1 : for i >= 0 {
211 1 : start := j
212 1 : for j < len(s.Keys) && !base.Visible(s.Keys[j].SeqNum(), snapshots[i], base.InternalKeySeqNumMax) {
213 1 : // Include j in current partition.
214 1 : j++
215 1 : }
216 1 : if j > start {
217 1 : keysDst := dst.Keys[usedLen:cap(dst.Keys)]
218 1 : rangekey.Coalesce(cmp, eq, s.Keys[start:j], &keysDst)
219 1 : if j == len(s.Keys) {
220 1 : // This is the last snapshot stripe. Unsets and deletes can be elided.
221 1 : keysDst = elideInLastStripe(keysDst)
222 1 : }
223 1 : usedLen += len(keysDst)
224 1 : dst.Keys = append(dst.Keys, keysDst...)
225 : }
226 1 : i--
227 : }
228 1 : if j < len(s.Keys) {
229 1 : keysDst := dst.Keys[usedLen:cap(dst.Keys)]
230 1 : rangekey.Coalesce(cmp, eq, s.Keys[j:], &keysDst)
231 1 : keysDst = elideInLastStripe(keysDst)
232 1 : usedLen += len(keysDst)
233 1 : dst.Keys = append(dst.Keys, keysDst...)
234 1 : }
235 1 : return nil
236 : })
237 : }
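
// To make the snapshot-stripe partitioning above concrete, this is a hedged,
// self-contained sketch with illustrative numbers (not Pebble API): descending
// sequence numbers are partitioned by ascending snapshots using the same
// visibility rule (seqnum < snapshot) that base.Visible applies to regular
// sequence numbers; only the final partition is the last snapshot stripe, in
// which unsets and deletes may be elided.
func exampleSnapshotStripes() {
	snapshots := []uint64{10, 20}      // ascending
	seqNums := []uint64{25, 18, 12, 5} // descending, as in s.Keys
	i, j := len(snapshots)-1, 0
	for i >= 0 {
		start := j
		for j < len(seqNums) && seqNums[j] >= snapshots[i] {
			j++ // seqNums[j] is not visible at snapshots[i]; same partition
		}
		if j > start {
			fmt.Printf("stripe above snapshot %d: %v\n", snapshots[i], seqNums[start:j]) // [25], then [18 12]
		}
		i--
	}
	if j < len(seqNums) {
		fmt.Printf("last snapshot stripe: %v\n", seqNums[j:]) // [5]
	}
}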
238 :
239 : // compaction is a table compaction from one level to the next, starting from a
240 : // given version.
241 : type compaction struct {
242 : // cancel is a bool that can be used by other goroutines to signal a compaction
243 : // to cancel, such as if a conflicting excise operation raced it to manifest
244 : // application. Only holders of the manifest lock will write to this atomic.
245 : cancel atomic.Bool
246 :
247 : kind compactionKind
248 : // isDownload is true if this compaction was started as part of a Download
249 : // operation. In this case kind is compactionKindCopy or
250 : // compactionKindRewrite.
251 : isDownload bool
252 :
253 : cmp Compare
254 : equal Equal
255 : comparer *base.Comparer
256 : formatKey base.FormatKey
257 : logger Logger
258 : version *version
259 : stats base.InternalIteratorStats
260 : beganAt time.Time
261 : // versionEditApplied is set to true when a compaction has completed and the
262 : // resulting version has been installed (if successful), but the compaction
263 : // goroutine is still cleaning up (eg, deleting obsolete files).
264 : versionEditApplied bool
265 : bufferPool sstable.BufferPool
266 :
267 : // startLevel is the level that is being compacted. Inputs from startLevel
268 : // and outputLevel will be merged to produce a set of outputLevel files.
269 : startLevel *compactionLevel
270 :
271 : // outputLevel is the level that files are being produced in. outputLevel is
272 : // equal to startLevel+1 except when:
273 : // - startLevel is 0, in which case the output level equals compactionPicker.baseLevel().
274 : // - the compaction is a multilevel compaction, in which case the output level is
275 : // the lowest level involved in the compaction.
276 : // A compaction's outputLevel is nil for delete-only compactions.
277 : outputLevel *compactionLevel
278 :
279 : // extraLevels point to additional levels in between the input and output
280 : // levels that get compacted in multilevel compactions
281 : extraLevels []*compactionLevel
282 :
283 : inputs []compactionLevel
284 :
285 : // maxOutputFileSize is the maximum size of an individual table created
286 : // during compaction.
287 : maxOutputFileSize uint64
288 : // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
289 : // single output table with the tables in the grandparent level.
290 : maxOverlapBytes uint64
291 : // disableSpanElision disables elision of range tombstones and range keys. Used
292 : // by tests to allow range tombstones or range keys to be added to tables where
293 : // they would otherwise be elided.
294 : disableSpanElision bool
295 :
296 : // flushing contains the flushables (aka memtables) that are being flushed.
297 : flushing flushableList
298 : // bytesWritten contains the number of bytes that have been written to outputs.
299 : bytesWritten int64
300 :
301 : // The boundaries of the input data.
302 : smallest InternalKey
303 : largest InternalKey
304 :
305 : // rangeDelInterleaving is an interleaving iterator for range deletions that
306 : // interleaves range tombstones among the point keys.
307 : rangeDelInterleaving keyspan.InterleavingIter
308 : // rangeKeyInterleaving is the interleaving iter for range keys.
309 : rangeKeyInterleaving keyspan.InterleavingIter
310 :
311 : // A list of objects to close when the compaction finishes. Used by input
312 : // iteration to keep rangeDelIters open for the lifetime of the compaction,
313 : // and only close them when the compaction finishes.
314 : closers []io.Closer
315 :
316 : // grandparents are the tables in level+2 that overlap with the files being
317 : // compacted. Used to determine output table boundaries. Do not assume that the
318 : // grandparent level will still contain the same files when this compaction finishes.
319 : grandparents manifest.LevelSlice
320 :
321 : // Boundaries at which flushes to L0 should be split. Determined by
322 : // L0Sublevels. If nil, flushes aren't split.
323 : l0Limits [][]byte
324 :
325 : // List of disjoint inuse key ranges the compaction overlaps with in
326 : // grandparent and lower levels. See setupInuseKeyRanges() for the
327 : // construction. Used by elideTombstone() and elideRangeTombstone() to
328 : // determine if keys affected by a tombstone possibly exist at a lower level.
329 : inuseKeyRanges []manifest.UserKeyRange
330 : // inuseEntireRange is set if the above inuse key ranges wholly contain the
331 : // compaction's key range. This allows compactions in higher levels to often
332 : // elide key comparisons.
333 : inuseEntireRange bool
334 : elideTombstoneIndex int
335 :
336 : // allowedZeroSeqNum is true if seqnums can be zeroed if there are no
337 : // snapshots requiring them to be kept. This determination is made by
338 : // looking for an sstable which overlaps the bounds of the compaction at a
339 : // lower level in the LSM during runCompaction.
340 : allowedZeroSeqNum bool
341 :
342 : metrics map[int]*LevelMetrics
343 :
344 : pickerMetrics compactionPickerMetrics
345 : }
346 :
347 1 : func (c *compaction) makeInfo(jobID JobID) CompactionInfo {
348 1 : info := CompactionInfo{
349 1 : JobID: int(jobID),
350 1 : Reason: c.kind.String(),
351 1 : Input: make([]LevelInfo, 0, len(c.inputs)),
352 1 : Annotations: []string{},
353 1 : }
354 1 : if c.isDownload {
355 1 : info.Reason = "download," + info.Reason
356 1 : }
357 1 : for _, cl := range c.inputs {
358 1 : inputInfo := LevelInfo{Level: cl.level, Tables: nil}
359 1 : iter := cl.files.Iter()
360 1 : for m := iter.First(); m != nil; m = iter.Next() {
361 1 : inputInfo.Tables = append(inputInfo.Tables, m.TableInfo())
362 1 : }
363 1 : info.Input = append(info.Input, inputInfo)
364 : }
365 1 : if c.outputLevel != nil {
366 1 : info.Output.Level = c.outputLevel.level
367 1 :
368 1 : // If there are no inputs from the output level (eg, a move
369 1 : // compaction), add an empty LevelInfo to info.Input.
370 1 : if len(c.inputs) > 0 && c.inputs[len(c.inputs)-1].level != c.outputLevel.level {
371 0 : info.Input = append(info.Input, LevelInfo{Level: c.outputLevel.level})
372 0 : }
373 1 : } else {
374 1 : // For a delete-only compaction, set the output level to L6. The
375 1 : // output level is not meaningful here, but complicating the
376 1 : // info.Output interface with a pointer doesn't seem worth the
377 1 : // semantic distinction.
378 1 : info.Output.Level = numLevels - 1
379 1 : }
380 :
381 1 : for i, score := range c.pickerMetrics.scores {
382 1 : info.Input[i].Score = score
383 1 : }
384 1 : info.SingleLevelOverlappingRatio = c.pickerMetrics.singleLevelOverlappingRatio
385 1 : info.MultiLevelOverlappingRatio = c.pickerMetrics.multiLevelOverlappingRatio
386 1 : if len(info.Input) > 2 {
387 1 : info.Annotations = append(info.Annotations, "multilevel")
388 1 : }
389 1 : return info
390 : }
391 :
392 1 : func (c *compaction) userKeyBounds() base.UserKeyBounds {
393 1 : return base.UserKeyBoundsFromInternal(c.smallest, c.largest)
394 1 : }
395 :
396 : func newCompaction(
397 : pc *pickedCompaction, opts *Options, beganAt time.Time, provider objstorage.Provider,
398 1 : ) *compaction {
399 1 : c := &compaction{
400 1 : kind: compactionKindDefault,
401 1 : cmp: pc.cmp,
402 1 : equal: opts.Comparer.Equal,
403 1 : comparer: opts.Comparer,
404 1 : formatKey: opts.Comparer.FormatKey,
405 1 : inputs: pc.inputs,
406 1 : smallest: pc.smallest,
407 1 : largest: pc.largest,
408 1 : logger: opts.Logger,
409 1 : version: pc.version,
410 1 : beganAt: beganAt,
411 1 : maxOutputFileSize: pc.maxOutputFileSize,
412 1 : maxOverlapBytes: pc.maxOverlapBytes,
413 1 : pickerMetrics: pc.pickerMetrics,
414 1 : }
415 1 : c.startLevel = &c.inputs[0]
416 1 : if pc.startLevel.l0SublevelInfo != nil {
417 1 : c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo
418 1 : }
419 1 : c.outputLevel = &c.inputs[1]
420 1 :
421 1 : if len(pc.extraLevels) > 0 {
422 1 : c.extraLevels = pc.extraLevels
423 1 : c.outputLevel = &c.inputs[len(c.inputs)-1]
424 1 : }
425 : // Compute the set of outputLevel+1 files that overlap this compaction (these
426 : // are the grandparent sstables).
427 1 : if c.outputLevel.level+1 < numLevels {
428 1 : c.grandparents = c.version.Overlaps(c.outputLevel.level+1, c.userKeyBounds())
429 1 : }
430 1 : c.setupInuseKeyRanges()
431 1 : c.kind = pc.kind
432 1 :
433 1 : if c.kind == compactionKindDefault && c.outputLevel.files.Empty() && !c.hasExtraLevelData() &&
434 1 : c.startLevel.files.Len() == 1 && c.grandparents.SizeSum() <= c.maxOverlapBytes {
435 1 : // This compaction can be converted into a move or copy from one level
436 1 : // to the next. We avoid such a move if there is lots of overlapping
437 1 : // grandparent data. Otherwise, the move could create a parent file
438 1 : // that will require a very expensive merge later on.
439 1 : iter := c.startLevel.files.Iter()
440 1 : meta := iter.First()
441 1 : isRemote := false
442 1 : // We should always be passed a provider, except in some unit tests.
443 1 : if provider != nil {
444 1 : isRemote = !objstorage.IsLocalTable(provider, meta.FileBacking.DiskFileNum)
445 1 : }
446 : // Avoid a trivial move or copy if all of these are true, as rewriting a
447 : // new file is better:
448 : //
449 : // 1) The source file is a virtual sstable
450 : // 2) The existing file `meta` is on non-remote storage
451 : // 3) The output level prefers shared storage
452 1 : mustCopy := !isRemote && remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
453 1 : if mustCopy {
454 1 : // If the source is virtual, it's best to just rewrite the file as all
455 1 : // conditions in the above comment are met.
456 1 : if !meta.Virtual {
457 1 : c.kind = compactionKindCopy
458 1 : }
459 1 : } else {
460 1 : c.kind = compactionKindMove
461 1 : }
462 : }
463 1 : return c
464 : }
465 :
466 : func newDeleteOnlyCompaction(
467 : opts *Options, cur *version, inputs []compactionLevel, beganAt time.Time,
468 1 : ) *compaction {
469 1 : c := &compaction{
470 1 : kind: compactionKindDeleteOnly,
471 1 : cmp: opts.Comparer.Compare,
472 1 : equal: opts.Comparer.Equal,
473 1 : comparer: opts.Comparer,
474 1 : formatKey: opts.Comparer.FormatKey,
475 1 : logger: opts.Logger,
476 1 : version: cur,
477 1 : beganAt: beganAt,
478 1 : inputs: inputs,
479 1 : }
480 1 :
481 1 : // Set c.smallest, c.largest.
482 1 : files := make([]manifest.LevelIterator, 0, len(inputs))
483 1 : for _, in := range inputs {
484 1 : files = append(files, in.files.Iter())
485 1 : }
486 1 : c.smallest, c.largest = manifest.KeyRange(opts.Comparer.Compare, files...)
487 1 : return c
488 : }
489 :
490 1 : func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64) {
491 1 : // Heuristic to place a lower bound on compaction output file size
492 1 : // caused by Lbase. Prior to this heuristic we have observed an L0 in
493 1 : // production with 310K files of which 290K files were < 10KB in size.
494 1 : // Our hypothesis is that it was caused by L1 having 2600 files and
495 1 : // ~10GB, such that each flush got split into many tiny files due to
496 1 : // overlapping with most of the files in Lbase.
497 1 : //
498 1 : // The computation below is general in that it accounts
499 1 : // for flushing different volumes of data (e.g. we may be flushing
500 1 : // many memtables). For illustration, we consider the typical
501 1 : // example of flushing a 64MB memtable. So 12.8MB output,
502 1 : // based on the compression guess below. If the compressed bytes
503 1 : // guess is an over-estimate we will end up with smaller files,
504 1 : // and if an under-estimate we will end up with larger files.
505 1 : // With a 2MB target file size, 7 files. We are willing to accept
506 1 : // 4x the number of files, if it results in better write amplification
507 1 : // when later compacting to Lbase, i.e., ~450KB files (target file
508 1 : // size / 4).
509 1 : //
510 1 : // Note that this is a pessimistic heuristic in that
511 1 : // fileCountUpperBoundDueToGrandparents could be far from the actual
512 1 : // number of files produced due to the grandparent limits. For
513 1 : // example, in the extreme, consider a flush that overlaps with 1000
514 1 : // files in Lbase f0...f999, and the initially calculated value of
515 1 : // maxOverlapBytes will cause splits at f10, f20,..., f990, which
516 1 : // means an upper bound file count of 100 files. Say the input bytes
517 1 : // in the flush are such that acceptableFileCount=10. We will fatten
518 1 : // up maxOverlapBytes by 10x to ensure that the upper bound file count
519 1 : // drops to 10. However, it is possible that in practice, even without
520 1 : // this change, we would have produced no more than 10 files, and that
521 1 : // this change makes the files unnecessarily wide. Say the input bytes
522 1 : // are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
523 1 : // 10% in f80...f89 and 10% in f990...f999. The original value of
524 1 : // maxOverlapBytes would have actually produced only 10 sstables. But
525 1 : // by increasing maxOverlapBytes by 10x, we may produce 1 sstable that
526 1 : // spans f0...f89, i.e., a much wider sstable than necessary.
527 1 : //
528 1 : // We could produce a tighter estimate of
529 1 : // fileCountUpperBoundDueToGrandparents if we had knowledge of the key
530 1 : // distribution of the flush. The 4x multiplier mentioned earlier is
531 1 : // a way to try to compensate for this pessimism.
532 1 : //
533 1 : // TODO(sumeer): we don't have compression info for the data being
534 1 : // flushed, but it is likely that existing files that overlap with
535 1 : // this flush in Lbase are representative wrt compression ratio. We
536 1 : // could store the uncompressed size in FileMetadata and estimate
537 1 : // the compression ratio.
538 1 : const approxCompressionRatio = 0.2
539 1 : approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
540 1 : approxNumFilesBasedOnTargetSize :=
541 1 : int(math.Ceil(approxOutputBytes / float64(c.maxOutputFileSize)))
542 1 : acceptableFileCount := float64(4 * approxNumFilesBasedOnTargetSize)
543 1 : // The byte calculation is linear in numGrandparentFiles, but we will
544 1 : // incur this linear cost in findGrandparentLimit too, so we are also
545 1 : // willing to pay it now. We could approximate this cheaply by using
546 1 : // the mean file size of Lbase.
547 1 : grandparentFileBytes := c.grandparents.SizeSum()
548 1 : fileCountUpperBoundDueToGrandparents :=
549 1 : float64(grandparentFileBytes) / float64(c.maxOverlapBytes)
550 1 : if fileCountUpperBoundDueToGrandparents > acceptableFileCount {
551 1 : c.maxOverlapBytes = uint64(
552 1 : float64(c.maxOverlapBytes) *
553 1 : (fileCountUpperBoundDueToGrandparents / acceptableFileCount))
554 1 : }
555 : }
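
// A worked instance of the heuristic above, using the illustrative numbers
// from the comment plus a hypothetical amount of overlapping Lbase data; the
// names and figures here are assumptions for the sketch, not Pebble API.
func exampleGrandparentAdjustmentForFlush() {
	const approxCompressionRatio = 0.2
	flushingBytes := float64(64 << 20)    // a single 64MB memtable
	maxOutputFileSize := float64(2 << 20) // 2MB target file size
	maxOverlapBytes := float64(20 << 20)  // 10x the target, per maxGrandparentOverlapBytes
	grandparentBytes := float64(2 << 30)  // hypothetical ~2GB of overlapping Lbase data

	approxOutputBytes := approxCompressionRatio * flushingBytes               // ~12.8MB
	acceptableFileCount := 4 * math.Ceil(approxOutputBytes/maxOutputFileSize) // 4*7 = 28 files
	fileCountUpperBound := grandparentBytes / maxOverlapBytes                 // ~102 files
	if fileCountUpperBound > acceptableFileCount {
		// Fatten maxOverlapBytes (~3.7x here) so the upper bound drops to ~28 files.
		maxOverlapBytes *= fileCountUpperBound / acceptableFileCount
	}
	fmt.Printf("acceptable=%.0f bound=%.0f adjustedMaxOverlapBytes=%.0f\n",
		acceptableFileCount, fileCountUpperBound, maxOverlapBytes)
}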
556 :
557 : func newFlush(
558 : opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time,
559 1 : ) (*compaction, error) {
560 1 : c := &compaction{
561 1 : kind: compactionKindFlush,
562 1 : cmp: opts.Comparer.Compare,
563 1 : equal: opts.Comparer.Equal,
564 1 : comparer: opts.Comparer,
565 1 : formatKey: opts.Comparer.FormatKey,
566 1 : logger: opts.Logger,
567 1 : version: cur,
568 1 : beganAt: beganAt,
569 1 : inputs: []compactionLevel{{level: -1}, {level: 0}},
570 1 : maxOutputFileSize: math.MaxUint64,
571 1 : maxOverlapBytes: math.MaxUint64,
572 1 : flushing: flushing,
573 1 : }
574 1 : c.startLevel = &c.inputs[0]
575 1 : c.outputLevel = &c.inputs[1]
576 1 :
577 1 : if len(flushing) > 0 {
578 1 : if _, ok := flushing[0].flushable.(*ingestedFlushable); ok {
579 1 : if len(flushing) != 1 {
580 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
581 : }
582 1 : c.kind = compactionKindIngestedFlushable
583 1 : return c, nil
584 : }
585 : }
586 :
587 : // Make sure there's no ingestedFlushable after the first flushable in the
588 : // list.
589 1 : for _, f := range flushing {
590 1 : if _, ok := f.flushable.(*ingestedFlushable); ok {
591 0 : panic("pebble: flushing shouldn't contain ingestedFlushable flushable")
592 : }
593 : }
594 :
595 1 : if cur.L0Sublevels != nil {
596 1 : c.l0Limits = cur.L0Sublevels.FlushSplitKeys()
597 1 : }
598 :
599 1 : smallestSet, largestSet := false, false
600 1 : updatePointBounds := func(iter internalIterator) {
601 1 : if kv := iter.First(); kv != nil {
602 1 : if !smallestSet ||
603 1 : base.InternalCompare(c.cmp, c.smallest, kv.K) > 0 {
604 1 : smallestSet = true
605 1 : c.smallest = kv.K.Clone()
606 1 : }
607 : }
608 1 : if kv := iter.Last(); kv != nil {
609 1 : if !largestSet ||
610 1 : base.InternalCompare(c.cmp, c.largest, kv.K) < 0 {
611 1 : largestSet = true
612 1 : c.largest = kv.K.Clone()
613 1 : }
614 : }
615 : }
616 :
617 1 : updateRangeBounds := func(iter keyspan.FragmentIterator) error {
618 1 : // File bounds require s != nil && !s.Empty(). We only need to check for
619 1 : // s != nil here, as the memtable's FragmentIterator would never surface
620 1 : // empty spans.
621 1 : if s, err := iter.First(); err != nil {
622 0 : return err
623 1 : } else if s != nil {
624 1 : if key := s.SmallestKey(); !smallestSet ||
625 1 : base.InternalCompare(c.cmp, c.smallest, key) > 0 {
626 1 : smallestSet = true
627 1 : c.smallest = key.Clone()
628 1 : }
629 : }
630 1 : if s, err := iter.Last(); err != nil {
631 0 : return err
632 1 : } else if s != nil {
633 1 : if key := s.LargestKey(); !largestSet ||
634 1 : base.InternalCompare(c.cmp, c.largest, key) < 0 {
635 1 : largestSet = true
636 1 : c.largest = key.Clone()
637 1 : }
638 : }
639 1 : return nil
640 : }
641 :
642 1 : var flushingBytes uint64
643 1 : for i := range flushing {
644 1 : f := flushing[i]
645 1 : updatePointBounds(f.newIter(nil))
646 1 : if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
647 1 : if err := updateRangeBounds(rangeDelIter); err != nil {
648 0 : return nil, err
649 0 : }
650 : }
651 1 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
652 1 : if err := updateRangeBounds(rangeKeyIter); err != nil {
653 0 : return nil, err
654 0 : }
655 : }
656 1 : flushingBytes += f.inuseBytes()
657 : }
658 :
659 1 : if opts.FlushSplitBytes > 0 {
660 1 : c.maxOutputFileSize = uint64(opts.Level(0).TargetFileSize)
661 1 : c.maxOverlapBytes = maxGrandparentOverlapBytes(opts, 0)
662 1 : c.grandparents = c.version.Overlaps(baseLevel, c.userKeyBounds())
663 1 : adjustGrandparentOverlapBytesForFlush(c, flushingBytes)
664 1 : }
665 :
666 1 : c.setupInuseKeyRanges()
667 1 : return c, nil
668 : }
669 :
670 1 : func (c *compaction) hasExtraLevelData() bool {
671 1 : if len(c.extraLevels) == 0 {
672 1 : // not a multi level compaction
673 1 : return false
674 1 : } else if c.extraLevels[0].files.Empty() {
675 1 : // a multi level compaction without data in the intermediate input level;
676 1 : // e.g. for a multi level compaction with levels 4, 5, and 6, this could
677 1 : // occur if there are no files to compact in 5, or in 5 and 6 (i.e. a move).
678 1 : return false
679 1 : }
680 1 : return true
681 : }
682 :
683 1 : func (c *compaction) setupInuseKeyRanges() {
684 1 : level := c.outputLevel.level + 1
685 1 : if c.outputLevel.level == 0 {
686 1 : level = 0
687 1 : }
688 : // calculateInuseKeyRanges will return a series of sorted spans. Overlapping
689 : // or abutting spans have already been merged.
690 1 : c.inuseKeyRanges = c.version.CalculateInuseKeyRanges(
691 1 : level, numLevels-1, c.smallest.UserKey, c.largest.UserKey,
692 1 : )
693 1 : // Check if there's a single in-use span that encompasses the entire key
694 1 : // range of the compaction. This is an optimization to avoid key comparisons
695 1 : // against inuseKeyRanges during the compaction when every key within the
696 1 : // compaction overlaps with an in-use span.
697 1 : if len(c.inuseKeyRanges) > 0 {
698 1 : c.inuseEntireRange = c.cmp(c.inuseKeyRanges[0].Start, c.smallest.UserKey) <= 0 &&
699 1 : c.cmp(c.inuseKeyRanges[0].End, c.largest.UserKey) >= 0
700 1 : }
701 : }
702 :
703 : // findGrandparentLimit takes the start user key for a table and returns the
704 : // user key to which that table can extend without excessively overlapping
705 : // the grandparent level. If no limit is needed considering the grandparent
706 : // files, this function returns nil. This is done in order to prevent a table
707 : // at level N from overlapping too much data at level N+1. We want to avoid
708 : // such large overlaps because they translate into large compactions. The
709 : // current heuristic stops output of a table if the addition of another key
710 : // would cause the table to overlap more than 10x the target file size at
711 : // level N. See maxGrandparentOverlapBytes.
712 1 : func (c *compaction) findGrandparentLimit(start []byte) []byte {
713 1 : iter := c.grandparents.Iter()
714 1 : var overlappedBytes uint64
715 1 : var greater bool
716 1 : for f := iter.SeekGE(c.cmp, start); f != nil; f = iter.Next() {
717 1 : overlappedBytes += f.Size
718 1 : // To ensure forward progress we always return a larger user
719 1 : // key than where we started. See comments above clients of
720 1 : // this function for how this is used.
721 1 : greater = greater || c.cmp(f.Smallest.UserKey, start) > 0
722 1 : if !greater {
723 1 : continue
724 : }
725 :
726 : // We return the smallest bound of an sstable rather than the
727 : // largest because the smallest is always inclusive, and limits
728 : // are used exclusively when truncating range tombstones. If we
729 : // truncated an output to the largest key while there's a
730 : // pending tombstone, the next output file would also overlap
731 : // the same grandparent f.
732 1 : if overlappedBytes > c.maxOverlapBytes {
733 1 : return f.Smallest.UserKey
734 1 : }
735 : }
736 1 : return nil
737 : }
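
// A hedged sketch of the cut decision above, over hypothetical grandparent
// files assumed to be those at or after the start key in key order: output is
// cut at the first file whose smallest key is strictly greater than start once
// the accumulated overlap exceeds the limit.
func exampleFindGrandparentLimit() {
	type gpFile struct {
		smallest string
		size     uint64
	}
	grandparents := []gpFile{{"c", 8 << 20}, {"h", 8 << 20}, {"q", 8 << 20}}
	const maxOverlapBytes = 20 << 20
	start := "a"
	var overlappedBytes uint64
	var greater bool
	for _, f := range grandparents {
		overlappedBytes += f.size
		greater = greater || f.smallest > start
		if greater && overlappedBytes > maxOverlapBytes {
			fmt.Println("cut output at", f.smallest) // "q": overlap exceeds 20MB there
			return
		}
	}
	fmt.Println("no limit needed")
}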
738 :
739 : // findL0Limit takes the start key for a table and returns the user key to which
740 : // that table can be extended without hitting the next l0Limit. Having flushed
741 : // sstables "bridging across" an l0Limit could lead to increased L0 -> LBase
742 : // compaction sizes as well as elevated read amplification.
743 1 : func (c *compaction) findL0Limit(start []byte) []byte {
744 1 : if c.startLevel.level > -1 || c.outputLevel.level != 0 || len(c.l0Limits) == 0 {
745 1 : return nil
746 1 : }
747 1 : index := sort.Search(len(c.l0Limits), func(i int) bool {
748 1 : return c.cmp(c.l0Limits[i], start) > 0
749 1 : })
750 1 : if index < len(c.l0Limits) {
751 1 : return c.l0Limits[index]
752 1 : }
753 1 : return nil
754 : }
755 :
756 : // errorOnUserKeyOverlap returns an error if the last two written sstables in
757 : // this compaction have revisions of the same user key present in both sstables,
758 : // when they shouldn't (e.g. when splitting flushes).
759 1 : func (c *compaction) errorOnUserKeyOverlap(ve *versionEdit) error {
760 1 : if n := len(ve.NewFiles); n > 1 {
761 1 : meta := ve.NewFiles[n-1].Meta
762 1 : prevMeta := ve.NewFiles[n-2].Meta
763 1 : if !prevMeta.Largest.IsExclusiveSentinel() &&
764 1 : c.cmp(prevMeta.Largest.UserKey, meta.Smallest.UserKey) >= 0 {
765 0 : return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
766 0 : prevMeta.Largest.Pretty(c.formatKey),
767 0 : prevMeta.FileNum,
768 0 : meta.FileNum)
769 0 : }
770 : }
771 1 : return nil
772 : }
773 :
774 : // allowZeroSeqNum returns true if seqnums can be zeroed if there are no
775 : // snapshots requiring them to be kept. It performs this determination by
776 : // looking for an sstable which overlaps the bounds of the compaction at a
777 : // lower level in the LSM.
778 1 : func (c *compaction) allowZeroSeqNum() bool {
779 1 : return c.elideRangeTombstone(c.smallest.UserKey, c.largest.UserKey)
780 1 : }
781 :
782 : // elideTombstone returns true if it is ok to elide a tombstone for the
783 : // specified key. A return value of true guarantees that there are no key/value
784 : // pairs at c.outputLevel.level+1 or higher that possibly contain the specified user
785 : // key. The keys in multiple invocations to elideTombstone must be supplied in
786 : // order.
787 1 : func (c *compaction) elideTombstone(key []byte) bool {
788 1 : if c.inuseEntireRange || len(c.flushing) != 0 {
789 1 : return false
790 1 : }
791 :
792 1 : for ; c.elideTombstoneIndex < len(c.inuseKeyRanges); c.elideTombstoneIndex++ {
793 1 : r := &c.inuseKeyRanges[c.elideTombstoneIndex]
794 1 : if c.cmp(key, r.End) <= 0 {
795 1 : if c.cmp(key, r.Start) >= 0 {
796 1 : return false
797 1 : }
798 1 : break
799 : }
800 : }
801 1 : return true
802 : }
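
// A hedged sketch of the walk above: with hypothetical sorted in-use ranges
// [b,d] and [m,p], point tombstones supplied in key order are elidable only
// when they fall outside every remaining in-use range. The index persists
// across calls, mirroring elideTombstoneIndex.
func exampleElideTombstoneWalk() {
	type keyRange struct{ start, end string }
	inuse := []keyRange{{"b", "d"}, {"m", "p"}}
	idx := 0
	elide := func(key string) bool {
		for ; idx < len(inuse); idx++ {
			if key <= inuse[idx].end {
				if key >= inuse[idx].start {
					return false // key may still exist at a lower level
				}
				break
			}
		}
		return true
	}
	fmt.Println(elide("a")) // true: before [b,d]
	fmt.Println(elide("c")) // false: inside [b,d]
	fmt.Println(elide("k")) // true: between the two in-use ranges
	fmt.Println(elide("z")) // true: past all in-use ranges
}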
803 :
804 : // elideRangeTombstone returns true if it is ok to elide the specified range
805 : // tombstone. A return value of true guarantees that there are no key/value
806 : // pairs at c.outputLevel.level+1 or higher that possibly overlap the specified
807 : // tombstone.
808 1 : func (c *compaction) elideRangeTombstone(start, end []byte) bool {
809 1 : // Disable range tombstone elision if the testing knob for that is enabled,
810 1 : // or if we are flushing memtables. The latter requirement is due to
811 1 : // inuseKeyRanges not accounting for key ranges in other memtables that are
812 1 : // being flushed in the same compaction. It's possible for a range tombstone
813 1 : // in one memtable to overlap keys in a preceding memtable in c.flushing.
814 1 : //
815 1 : // This function is also used in setting allowZeroSeqNum, so disabling
816 1 : // elision of range tombstones also disables zeroing of SeqNums.
817 1 : //
818 1 : // TODO(peter): we disable zeroing of seqnums during flushing to match
819 1 : // RocksDB behavior and to avoid generating overlapping sstables during
820 1 : // DB.replayWAL. When replaying WAL files at startup, we flush after each
821 1 : // WAL is replayed building up a single version edit that is
822 1 : // applied. Because we don't apply the version edit after each flush, this
823 1 : // code doesn't know that L0 contains files and zeroing of seqnums should
824 1 : // be disabled. That is fixable, but it seems safer to just match the
825 1 : // RocksDB behavior for now.
826 1 : if c.disableSpanElision || len(c.flushing) != 0 {
827 1 : return false
828 1 : }
829 :
830 1 : lower := sort.Search(len(c.inuseKeyRanges), func(i int) bool {
831 1 : return c.cmp(c.inuseKeyRanges[i].End, start) >= 0
832 1 : })
833 1 : upper := sort.Search(len(c.inuseKeyRanges), func(i int) bool {
834 1 : return c.cmp(c.inuseKeyRanges[i].Start, end) > 0
835 1 : })
836 1 : return lower >= upper
837 : }
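
// The range-tombstone check above reduces to an interval overlap test against
// the sorted in-use ranges; a hedged sketch with sort.Search and hypothetical
// string keys:
func exampleElideRangeTombstone() {
	type keyRange struct{ start, end string }
	inuse := []keyRange{{"b", "d"}, {"m", "p"}}
	elide := func(start, end string) bool {
		lower := sort.Search(len(inuse), func(i int) bool { return inuse[i].end >= start })
		upper := sort.Search(len(inuse), func(i int) bool { return inuse[i].start > end })
		return lower >= upper // true iff no in-use range overlaps [start, end]
	}
	fmt.Println(elide("e", "k")) // true: falls entirely between the in-use ranges
	fmt.Println(elide("c", "n")) // false: overlaps both in-use ranges
}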
838 :
839 : // elideRangeKey returns true if it is ok to elide the specified range key. A
840 : // return value of true guarantees that there are no key/value pairs at
841 : // c.outputLevel.level+1 or higher that possibly overlap the specified range key.
842 1 : func (c *compaction) elideRangeKey(start, end []byte) bool {
843 1 : // TODO(bilal): Track inuseKeyRanges separately for the range keyspace as
844 1 : // opposed to the point keyspace. Once that is done, elideRangeTombstone
845 1 : // can just check in the point keyspace, and this function can check for
846 1 : // inuseKeyRanges in the range keyspace.
847 1 : return c.elideRangeTombstone(start, end)
848 1 : }
849 :
850 : // newInputIter returns an iterator over all the input tables in a compaction.
851 : func (c *compaction) newInputIter(
852 : newIters tableNewIters, newRangeKeyIter keyspanimpl.TableNewSpanIter, snapshots []uint64,
853 1 : ) (_ internalIterator, retErr error) {
854 1 : // Validate the ordering of compaction input files for defense in depth.
855 1 : if len(c.flushing) == 0 {
856 1 : if c.startLevel.level >= 0 {
857 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
858 1 : manifest.Level(c.startLevel.level), c.startLevel.files.Iter())
859 1 : if err != nil {
860 0 : return nil, err
861 0 : }
862 : }
863 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
864 1 : manifest.Level(c.outputLevel.level), c.outputLevel.files.Iter())
865 1 : if err != nil {
866 0 : return nil, err
867 0 : }
868 1 : if c.startLevel.level == 0 {
869 1 : if c.startLevel.l0SublevelInfo == nil {
870 0 : panic("l0SublevelInfo not created for compaction out of L0")
871 : }
872 1 : for _, info := range c.startLevel.l0SublevelInfo {
873 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
874 1 : info.sublevel, info.Iter())
875 1 : if err != nil {
876 0 : return nil, err
877 0 : }
878 : }
879 : }
880 1 : if len(c.extraLevels) > 0 {
881 1 : if len(c.extraLevels) > 1 {
882 0 : panic("n>2 multi level compaction not implemented yet")
883 : }
884 1 : interLevel := c.extraLevels[0]
885 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
886 1 : manifest.Level(interLevel.level), interLevel.files.Iter())
887 1 : if err != nil {
888 0 : return nil, err
889 0 : }
890 : }
891 : }
892 :
893 : // There are three classes of keys that a compaction needs to process: point
894 : // keys, range deletion tombstones and range keys. Collect all iterators for
895 : // all these classes of keys from all the levels. We'll aggregate them
896 : // together farther below.
897 : //
898 : // numInputLevels is an approximation of the number of iterator levels. Due
899 : // to idiosyncrasies in iterator construction, we may (rarely) exceed this
900 : // initial capacity.
901 1 : numInputLevels := max(len(c.flushing), len(c.inputs))
902 1 : iters := make([]internalIterator, 0, numInputLevels)
903 1 : rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
904 1 : rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
905 1 :
906 1 : // If construction of the iterator inputs fails, ensure that we close all
907 1 : // the constituent iterators.
908 1 : defer func() {
909 1 : if retErr != nil {
910 0 : for _, iter := range iters {
911 0 : if iter != nil {
912 0 : iter.Close()
913 0 : }
914 : }
915 0 : for _, rangeDelIter := range rangeDelIters {
916 0 : rangeDelIter.Close()
917 0 : }
918 : }
919 : }()
920 1 : iterOpts := IterOptions{
921 1 : CategoryAndQoS: sstable.CategoryAndQoS{
922 1 : Category: "pebble-compaction",
923 1 : QoSLevel: sstable.NonLatencySensitiveQoSLevel,
924 1 : },
925 1 : logger: c.logger,
926 1 : }
927 1 :
928 1 : // Populate iters, rangeDelIters and rangeKeyIters with the appropriate
929 1 : // constituent iterators. This depends on whether this is a flush or a
930 1 : // compaction.
931 1 : if len(c.flushing) != 0 {
932 1 : // If flushing, we need to build the input iterators over the memtables
933 1 : // stored in c.flushing.
934 1 : for i := range c.flushing {
935 1 : f := c.flushing[i]
936 1 : iters = append(iters, f.newFlushIter(nil))
937 1 : rangeDelIter := f.newRangeDelIter(nil)
938 1 : if rangeDelIter != nil {
939 1 : rangeDelIters = append(rangeDelIters, rangeDelIter)
940 1 : }
941 1 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
942 1 : rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
943 1 : }
944 : }
945 1 : } else {
946 1 : addItersForLevel := func(level *compactionLevel, l manifest.Level) error {
947 1 : // Add a *levelIter for point iterators. Because we don't call
948 1 : // initRangeDel, the levelIter will close and forget the range
949 1 : // deletion iterator when it steps onto a new file. Surfacing range
950 1 : // deletions to compactions is handled below.
951 1 : iters = append(iters, newLevelIter(context.Background(),
952 1 : iterOpts, c.comparer, newIters, level.files.Iter(), l, internalIterOpts{
953 1 : compaction: true,
954 1 : bufferPool: &c.bufferPool,
955 1 : }))
956 1 : // TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
957 1 : // deletions into memory upfront. (See #2015, which reverted this.) There
958 1 : // will be no user keys that are split between sstables within a level in
959 1 : // Cockroach 23.1, which unblocks this optimization.
960 1 :
961 1 : // Add the range deletion iterator for each file as an independent level
962 1 : // in mergingIter, as opposed to making a levelIter out of those. This
963 1 : // is safer as levelIter expects all keys coming from underlying
964 1 : // iterators to be in order. Due to compaction / tombstone writing
965 1 : // logic in finishOutput(), it is possible for range tombstones to not
966 1 : // be strictly ordered across all files in one level.
967 1 : //
968 1 : // Consider this example from the metamorphic tests (also repeated in
969 1 : // finishOutput()), consisting of three L3 files with their bounds
970 1 : // specified in square brackets next to the file name:
971 1 : //
972 1 : // ./000240.sst [tmgc#391,MERGE-tmgc#391,MERGE]
973 1 : // tmgc#391,MERGE [786e627a]
974 1 : // tmgc-udkatvs#331,RANGEDEL
975 1 : //
976 1 : // ./000241.sst [tmgc#384,MERGE-tmgc#384,MERGE]
977 1 : // tmgc#384,MERGE [666c7070]
978 1 : // tmgc-tvsalezade#383,RANGEDEL
979 1 : // tmgc-tvsalezade#331,RANGEDEL
980 1 : //
981 1 : // ./000242.sst [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
982 1 : // tmgc-tvsalezade#383,RANGEDEL
983 1 : // tmgc#375,SET [72646c78766965616c72776865676e79]
984 1 : // tmgc-tvsalezade#356,RANGEDEL
985 1 : //
986 1 : // Here, the range tombstone in 000240.sst falls "after" one in
987 1 : // 000241.sst, despite 000240.sst being ordered "before" 000241.sst for
988 1 : // levelIter's purposes. While each file is still consistent before its
989 1 : // bounds, it's safer to have all rangedel iterators be visible to
990 1 : // mergingIter.
991 1 : iter := level.files.Iter()
992 1 : for f := iter.First(); f != nil; f = iter.Next() {
993 1 : rangeDelIter, closer, err := c.newRangeDelIter(
994 1 : newIters, iter.Take(), iterOpts, l)
995 1 : if err != nil {
996 0 : // The error will already be annotated with the BackingFileNum, so
997 0 : // we annotate it with the FileNum.
998 0 : return errors.Wrapf(err, "pebble: could not open table %s", errors.Safe(f.FileNum))
999 0 : }
1000 1 : if rangeDelIter == nil {
1001 1 : continue
1002 : }
1003 1 : rangeDelIters = append(rangeDelIters, rangeDelIter)
1004 1 : c.closers = append(c.closers, closer)
1005 : }
1006 :
1007 : // Check if this level has any range keys.
1008 1 : hasRangeKeys := false
1009 1 : for f := iter.First(); f != nil; f = iter.Next() {
1010 1 : if f.HasRangeKeys {
1011 1 : hasRangeKeys = true
1012 1 : break
1013 : }
1014 : }
1015 1 : if hasRangeKeys {
1016 1 : li := &keyspanimpl.LevelIter{}
1017 1 : newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
1018 1 : iter, err := newRangeKeyIter(file, iterOptions)
1019 1 : if err != nil {
1020 0 : return nil, err
1021 1 : } else if iter == nil {
1022 0 : return emptyKeyspanIter, nil
1023 0 : }
1024 : // Ensure that the range key iter is not closed until the compaction is
1025 : // finished. This is necessary because range key processing
1026 : // requires the range keys to be held in memory for up to the
1027 : // lifetime of the compaction.
1028 1 : c.closers = append(c.closers, iter)
1029 1 : iter = noCloseIter{iter}
1030 1 :
1031 1 : // We do not need to truncate range keys to sstable boundaries, or
1032 1 : // only read within the file's atomic compaction units, unlike with
1033 1 : // range tombstones. This is because range keys were added after we
1034 1 : // stopped splitting user keys across sstables, so all the range keys
1035 1 : // in this sstable must wholly lie within the file's bounds.
1036 1 : return iter, err
1037 : }
1038 1 : li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
1039 1 : rangeKeyIters = append(rangeKeyIters, li)
1040 : }
1041 1 : return nil
1042 : }
1043 :
1044 1 : for i := range c.inputs {
1045 1 : // If the level is annotated with l0SublevelInfo, expand it into one
1046 1 : // level per sublevel.
1047 1 : // TODO(jackson): Perform this expansion even earlier when we pick the
1048 1 : // compaction?
1049 1 : if len(c.inputs[i].l0SublevelInfo) > 0 {
1050 1 : for _, info := range c.startLevel.l0SublevelInfo {
1051 1 : sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
1052 1 : if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
1053 0 : return nil, err
1054 0 : }
1055 : }
1056 1 : continue
1057 : }
1058 1 : if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
1059 0 : return nil, err
1060 0 : }
1061 : }
1062 : }
1063 :
1064 : // If there's only one constituent point iterator, we can avoid the overhead
1065 : // of a *mergingIter. This is possible, for example, when performing a flush
1066 : // of a single memtable. Otherwise, combine all the iterators into a merging
1067 : // iter.
1068 1 : iter := iters[0]
1069 1 : if len(iters) > 1 {
1070 1 : iter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
1071 1 : }
1072 :
1073 : // In normal operation, levelIter iterates over the point operations in a
1074 : // level, and initializes a rangeDelIter pointer for the range deletions in
1075 : // each table. During compaction, we want to iterate over the merged view of
1076 : // point operations and range deletions. In order to do this we create one
1077 : // levelIter per level to iterate over the point operations, and collect up
1078 : // all the range deletion files.
1079 : //
1080 : // The range deletion levels are combined with a keyspanimpl.MergingIter. The
1081 : // resulting merged rangedel iterator is then included using an
1082 : // InterleavingIter.
1083 : // TODO(jackson): Consider using a defragmenting iterator to stitch together
1084 : // logical range deletions that were fragmented due to previous file
1085 : // boundaries.
1086 1 : if len(rangeDelIters) > 0 {
1087 1 : mi := &keyspanimpl.MergingIter{}
1088 1 : mi.Init(c.cmp, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
1089 1 : c.rangeDelInterleaving.Init(c.comparer, iter, mi, keyspan.InterleavingIterOpts{})
1090 1 : iter = &c.rangeDelInterleaving
1091 1 : }
1092 :
1093 : // If there are range key iterators, we need to combine them using
1094 : // keyspanimpl.MergingIter, and then interleave them among the points.
1095 1 : if len(rangeKeyIters) > 0 {
1096 1 : mi := &keyspanimpl.MergingIter{}
1097 1 : mi.Init(c.cmp, rangeKeyCompactionTransform(c.equal, snapshots, c.elideRangeKey), new(keyspanimpl.MergingBuffers), rangeKeyIters...)
1098 1 : di := &keyspan.DefragmentingIter{}
1099 1 : di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
1100 1 : c.rangeKeyInterleaving.Init(c.comparer, iter, di, keyspan.InterleavingIterOpts{})
1101 1 : iter = &c.rangeKeyInterleaving
1102 1 : }
1103 1 : return iter, nil
1104 : }
1105 :
1106 : func (c *compaction) newRangeDelIter(
1107 : newIters tableNewIters, f manifest.LevelFile, opts IterOptions, l manifest.Level,
1108 1 : ) (keyspan.FragmentIterator, io.Closer, error) {
1109 1 : opts.level = l
1110 1 : iterSet, err := newIters(context.Background(), f.FileMetadata, &opts,
1111 1 : internalIterOpts{
1112 1 : compaction: true,
1113 1 : bufferPool: &c.bufferPool,
1114 1 : }, iterRangeDeletions)
1115 1 : if err != nil {
1116 0 : return nil, nil, err
1117 1 : } else if iterSet.rangeDeletion == nil {
1118 1 : // The file doesn't contain any range deletions.
1119 1 : return nil, nil, nil
1120 1 : }
1121 : // Ensure that rangeDelIter is not closed until the compaction is
1122 : // finished. This is necessary because range tombstone processing
1123 : // requires the range tombstones to be held in memory for up to the
1124 : // lifetime of the compaction.
1125 1 : return noCloseIter{iterSet.rangeDeletion}, iterSet.rangeDeletion, nil
1126 : }
1127 :
1128 0 : func (c *compaction) String() string {
1129 0 : if len(c.flushing) != 0 {
1130 0 : return "flush\n"
1131 0 : }
1132 :
1133 0 : var buf bytes.Buffer
1134 0 : for level := c.startLevel.level; level <= c.outputLevel.level; level++ {
1135 0 : i := level - c.startLevel.level
1136 0 : fmt.Fprintf(&buf, "%d:", level)
1137 0 : iter := c.inputs[i].files.Iter()
1138 0 : for f := iter.First(); f != nil; f = iter.Next() {
1139 0 : fmt.Fprintf(&buf, " %s:%s-%s", f.FileNum, f.Smallest, f.Largest)
1140 0 : }
1141 0 : fmt.Fprintf(&buf, "\n")
1142 : }
1143 0 : return buf.String()
1144 : }
1145 :
1146 : type manualCompaction struct {
1147 : // Count of retries, either due to too many concurrent compactions or a
1148 : // concurrent compaction to overlapping levels.
1149 : retries int
1150 : level int
1151 : outputLevel int
1152 : done chan error
1153 : start []byte
1154 : end []byte
1155 : split bool
1156 : }
1157 :
1158 : type readCompaction struct {
1159 : level int
1160 : // [start, end] key ranges are used for de-duping.
1161 : start []byte
1162 : end []byte
1163 :
1164 : // The file associated with the compaction.
1165 : // If the file no longer belongs in the same
1166 : // level, then we skip the compaction.
1167 : fileNum base.FileNum
1168 : }
1169 :
1170 1 : func (d *DB) addInProgressCompaction(c *compaction) {
1171 1 : d.mu.compact.inProgress[c] = struct{}{}
1172 1 : var isBase, isIntraL0 bool
1173 1 : for _, cl := range c.inputs {
1174 1 : iter := cl.files.Iter()
1175 1 : for f := iter.First(); f != nil; f = iter.Next() {
1176 1 : if f.IsCompacting() {
1177 0 : d.opts.Logger.Fatalf("L%d->L%d: %s already being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
1178 0 : }
1179 1 : f.SetCompactionState(manifest.CompactionStateCompacting)
1180 1 : if c.startLevel != nil && c.outputLevel != nil && c.startLevel.level == 0 {
1181 1 : if c.outputLevel.level == 0 {
1182 1 : f.IsIntraL0Compacting = true
1183 1 : isIntraL0 = true
1184 1 : } else {
1185 1 : isBase = true
1186 1 : }
1187 : }
1188 : }
1189 : }
1190 :
1191 1 : if (isIntraL0 || isBase) && c.version.L0Sublevels != nil {
1192 1 : l0Inputs := []manifest.LevelSlice{c.startLevel.files}
1193 1 : if isIntraL0 {
1194 1 : l0Inputs = append(l0Inputs, c.outputLevel.files)
1195 1 : }
1196 1 : if err := c.version.L0Sublevels.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
1197 0 : d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
1198 0 : }
1199 : }
1200 : }
1201 :
1202 : // clearCompactingState removes compaction markers from files in a compaction. The
1203 : // rollback parameter indicates whether the compaction state should be rolled back
1204 : // to its original state in the case of an unsuccessful compaction.
1205 : //
1206 : // DB.mu must be held when calling this method, however this method can drop and
1207 : // re-acquire that mutex. All writes to the manifest for this compaction should
1208 : // have completed by this point.
1209 1 : func (d *DB) clearCompactingState(c *compaction, rollback bool) {
1210 1 : c.versionEditApplied = true
1211 1 : for _, cl := range c.inputs {
1212 1 : iter := cl.files.Iter()
1213 1 : for f := iter.First(); f != nil; f = iter.Next() {
1214 1 : if !f.IsCompacting() {
1215 0 : d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
1216 0 : }
1217 1 : if !rollback {
1218 1 : // On success all compactions other than move-compactions transition the
1219 1 : // file into the Compacted state. Move-compacted files become eligible
1220 1 : // for compaction again and transition back to NotCompacting.
1221 1 : if c.kind != compactionKindMove {
1222 1 : f.SetCompactionState(manifest.CompactionStateCompacted)
1223 1 : } else {
1224 1 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1225 1 : }
1226 1 : } else {
1227 1 : // Else, on rollback, all input files unconditionally transition back to
1228 1 : // NotCompacting.
1229 1 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1230 1 : }
1231 1 : f.IsIntraL0Compacting = false
1232 : }
1233 : }
1234 1 : l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c))
1235 1 : func() {
1236 1 : // InitCompactingFileInfo requires that no other manifest writes be
1237 1 : // happening in parallel with it, i.e. we're not in the midst of installing
1238 1 : // another version. Otherwise, it's possible that we've created another
1239 1 : // L0Sublevels instance, but not added it to the versions list, causing
1240 1 : // all the indices in FileMetadata to be inaccurate. To ensure this,
1241 1 : // grab the manifest lock.
1242 1 : d.mu.versions.logLock()
1243 1 : defer d.mu.versions.logUnlock()
1244 1 : d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress)
1245 1 : }()
1246 : }
1247 :
1248 1 : func (d *DB) calculateDiskAvailableBytes() uint64 {
1249 1 : if space, err := d.opts.FS.GetDiskUsage(d.dirname); err == nil {
1250 1 : d.diskAvailBytes.Store(space.AvailBytes)
1251 1 : return space.AvailBytes
1252 1 : } else if !errors.Is(err, vfs.ErrUnsupported) {
1253 0 : d.opts.EventListener.BackgroundError(err)
1254 0 : }
1255 1 : return d.diskAvailBytes.Load()
1256 : }
1257 :
1258 1 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
1259 1 : var pacerInfo deletionPacerInfo
1260 1 : // Call GetDiskUsage after every file deletion. This may seem inefficient,
1261 1 : // but in practice this was observed to take constant time, regardless of
1262 1 : // volume size used, at least on linux with ext4 and zfs. All invocations
1263 1 : // take 10 microseconds or less.
1264 1 : pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
1265 1 : d.mu.Lock()
1266 1 : pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
1267 1 : pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
1268 1 : d.mu.Unlock()
1269 1 : return pacerInfo
1270 1 : }
1271 :
1272 : // onObsoleteTableDelete is called to update metrics when an sstable is deleted.
1273 1 : func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
1274 1 : d.mu.Lock()
1275 1 : d.mu.versions.metrics.Table.ObsoleteCount--
1276 1 : d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
1277 1 : if isLocal {
1278 1 : d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
1279 1 : }
1280 1 : d.mu.Unlock()
1281 : }
1282 :
1283 : // maybeScheduleFlush schedules a flush if necessary.
1284 : //
1285 : // d.mu must be held when calling this.
1286 1 : func (d *DB) maybeScheduleFlush() {
1287 1 : if d.mu.compact.flushing || d.closed.Load() != nil || d.opts.ReadOnly {
1288 1 : return
1289 1 : }
1290 1 : if len(d.mu.mem.queue) <= 1 {
1291 1 : return
1292 1 : }
1293 :
1294 1 : if !d.passedFlushThreshold() {
1295 1 : return
1296 1 : }
1297 :
1298 1 : d.mu.compact.flushing = true
1299 1 : go d.flush()
1300 : }
1301 :
1302 1 : func (d *DB) passedFlushThreshold() bool {
1303 1 : var n int
1304 1 : var size uint64
1305 1 : for ; n < len(d.mu.mem.queue)-1; n++ {
1306 1 : if !d.mu.mem.queue[n].readyForFlush() {
1307 1 : break
1308 : }
1309 1 : if d.mu.mem.queue[n].flushForced {
1310 1 : // A flush was forced. Pretend the memtable size is the configured
1311 1 : // size. See minFlushSize below.
1312 1 : size += d.opts.MemTableSize
1313 1 : } else {
1314 1 : size += d.mu.mem.queue[n].totalBytes()
1315 1 : }
1316 : }
1317 1 : if n == 0 {
1318 1 : // None of the immutable memtables are ready for flushing.
1319 1 : return false
1320 1 : }
1321 :
1322 : // Only flush once the sum of the queued memtable sizes exceeds half the
1323 : // configured memtable size. This prevents flushing of memtables at startup
1324 : // while we're undergoing the ramp period on the memtable size. See
1325 : // DB.newMemTable().
1326 1 : minFlushSize := d.opts.MemTableSize / 2
1327 1 : return size >= minFlushSize
1328 : }
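
// Worked illustration of the threshold above with hedged, hypothetical sizes:
// given a 64MB configured memtable size, a flush is only scheduled once the
// ready-to-flush immutable memtables sum to at least 32MB, so the small
// memtables created during the size ramp-up queue up instead of each producing
// a tiny L0 sstable.
func exampleFlushThreshold() {
	const memTableSize uint64 = 64 << 20
	queued := []uint64{8 << 20, 16 << 20} // ready-to-flush immutable memtables
	var size uint64
	for _, s := range queued {
		size += s
	}
	fmt.Println(size >= memTableSize/2) // false: 24MB < 32MB, keep queueing
}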
1329 :
1330 1 : func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
1331 1 : var mem *flushableEntry
1332 1 : for _, m := range d.mu.mem.queue {
1333 1 : if m.flushable == tbl {
1334 1 : mem = m
1335 1 : break
1336 : }
1337 : }
1338 1 : if mem == nil || mem.flushForced {
1339 1 : return
1340 1 : }
1341 1 : deadline := d.timeNow().Add(dur)
1342 1 : if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
1343 1 : // A flush is already scheduled to fire sooner than `dur` from now.
1344 1 : return
1345 1 : }
1346 1 : mem.delayedFlushForcedAt = deadline
1347 1 : go func() {
1348 1 : timer := time.NewTimer(dur)
1349 1 : defer timer.Stop()
1350 1 :
1351 1 : select {
1352 1 : case <-d.closedCh:
1353 1 : return
1354 1 : case <-mem.flushed:
1355 1 : return
1356 1 : case <-timer.C:
1357 1 : d.commit.mu.Lock()
1358 1 : defer d.commit.mu.Unlock()
1359 1 : d.mu.Lock()
1360 1 : defer d.mu.Unlock()
1361 1 :
1362 1 : // NB: The timer may fire concurrently with a call to Close. If a
1363 1 : // Close call beat us to acquiring d.mu, d.closed holds ErrClosed,
1364 1 : // and it's too late to flush anything. Otherwise, the Close call
1365 1 : // will block on locking d.mu until we've finished scheduling the
1366 1 : // flush and set `d.mu.compact.flushing` to true. Close will wait
1367 1 : // for the current flush to complete.
1368 1 : if d.closed.Load() != nil {
1369 0 : return
1370 0 : }
1371 :
1372 1 : if d.mu.mem.mutable == tbl {
1373 1 : d.makeRoomForWrite(nil)
1374 1 : } else {
1375 1 : mem.flushForced = true
1376 1 : }
1377 1 : d.maybeScheduleFlush()
1378 : }
1379 : }()
1380 : }
1381 :
1382 1 : func (d *DB) flush() {
1383 1 : pprof.Do(context.Background(), flushLabels, func(context.Context) {
1384 1 : flushingWorkStart := time.Now()
1385 1 : d.mu.Lock()
1386 1 : defer d.mu.Unlock()
1387 1 : idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime)
1388 1 : var bytesFlushed uint64
1389 1 : var err error
1390 1 : if bytesFlushed, err = d.flush1(); err != nil {
1391 0 : // TODO(peter): count consecutive flush errors and backoff.
1392 0 : d.opts.EventListener.BackgroundError(err)
1393 0 : }
1394 1 : d.mu.compact.flushing = false
1395 1 : d.mu.compact.noOngoingFlushStartTime = time.Now()
1396 1 : workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
1397 1 : d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
1398 1 : d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
1399 1 : d.mu.compact.flushWriteThroughput.IdleDuration += idleDuration
1400 1 : // More flush work may have arrived while we were flushing, so schedule
1401 1 : // another flush if needed.
1402 1 : d.maybeScheduleFlush()
1403 1 : // The flush may have produced too many files in a level, so schedule a
1404 1 : // compaction if needed.
1405 1 : d.maybeScheduleCompaction()
1406 1 : d.mu.compact.cond.Broadcast()
1407 : })
1408 : }
1409 :
1410 : // runIngestFlush is used to generate a flush version edit for sstables which
1411 : // were ingested as flushables. Both DB.mu and the manifest lock must be held
1412 : // while runIngestFlush is called.
1413 1 : func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
1414 1 : if len(c.flushing) != 1 {
1415 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
1416 : }
1417 :
1418 : // Construct the VersionEdit, levelMetrics etc.
1419 1 : c.metrics = make(map[int]*LevelMetrics, numLevels)
1420 1 : // Finding the target level for ingestion must use the latest version
1421 1 : // after the logLock has been acquired.
1422 1 : c.version = d.mu.versions.currentVersion()
1423 1 :
1424 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
1425 1 : iterOpts := IterOptions{logger: d.opts.Logger}
1426 1 : ve := &versionEdit{}
1427 1 : var ingestSplitFiles []ingestSplitFile
1428 1 : ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
1429 1 :
1430 1 : updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
1431 1 : levelMetrics := c.metrics[level]
1432 1 : if levelMetrics == nil {
1433 1 : levelMetrics = &LevelMetrics{}
1434 1 : c.metrics[level] = levelMetrics
1435 1 : }
1436 1 : levelMetrics.NumFiles--
1437 1 : levelMetrics.Size -= int64(m.Size)
1438 1 : for i := range added {
1439 1 : levelMetrics.NumFiles++
1440 1 : levelMetrics.Size += int64(added[i].Meta.Size)
1441 1 : }
1442 : }
1443 :
1444 1 : suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
1445 1 : d.FormatMajorVersion() >= FormatVirtualSSTables
1446 1 :
1447 1 : if suggestSplit || ingestFlushable.exciseSpan.Valid() {
1448 1 : // We may add deleted files to ve below (due to ingest-time splits or excises), so initialize the map.
1449 1 : ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
1450 1 : }
1451 :
1452 1 : replacedFiles := make(map[base.FileNum][]newFileEntry)
1453 1 : for _, file := range ingestFlushable.files {
1454 1 : var fileToSplit *fileMetadata
1455 1 : var level int
1456 1 :
1457 1 : // If this file fits entirely within the excise span, we can slot it directly into L6.
1458 1 : if ingestFlushable.exciseSpan.Valid() &&
1459 1 : ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Smallest) &&
1460 1 : ingestFlushable.exciseSpan.Contains(d.cmp, file.FileMetadata.Largest) {
1461 1 : level = 6
1462 1 : } else {
1463 1 : var err error
1464 1 : level, fileToSplit, err = ingestTargetLevel(
1465 1 : d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer,
1466 1 : c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
1467 1 : suggestSplit,
1468 1 : )
1469 1 : if err != nil {
1470 0 : return nil, err
1471 0 : }
1472 : }
1473 :
1474 : // Add the current flushableIngest file to the version.
1475 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
1476 1 : if fileToSplit != nil {
1477 1 : ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
1478 1 : ingestFile: file.FileMetadata,
1479 1 : splitFile: fileToSplit,
1480 1 : level: level,
1481 1 : })
1482 1 : }
1483 1 : levelMetrics := c.metrics[level]
1484 1 : if levelMetrics == nil {
1485 1 : levelMetrics = &LevelMetrics{}
1486 1 : c.metrics[level] = levelMetrics
1487 1 : }
1488 1 : levelMetrics.BytesIngested += file.Size
1489 1 : levelMetrics.TablesIngested++
1490 : }
1491 1 : if ingestFlushable.exciseSpan.Valid() {
1492 1 : // Iterate through all levels and find files that intersect with exciseSpan.
1493 1 : for l := range c.version.Levels {
1494 1 : overlaps := c.version.Overlaps(l, base.UserKeyBoundsEndExclusive(ingestFlushable.exciseSpan.Start, ingestFlushable.exciseSpan.End))
1495 1 : iter := overlaps.Iter()
1496 1 :
1497 1 : for m := iter.First(); m != nil; m = iter.Next() {
1498 1 : newFiles, err := d.excise(ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
1499 1 : if err != nil {
1500 0 : return nil, err
1501 0 : }
1502 :
1503 1 : if _, ok := ve.DeletedFiles[deletedFileEntry{
1504 1 : Level: l,
1505 1 : FileNum: m.FileNum,
1506 1 : }]; !ok {
1507 1 : // We did not excise this file.
1508 1 : continue
1509 : }
1510 1 : replacedFiles[m.FileNum] = newFiles
1511 1 : updateLevelMetricsOnExcise(m, l, newFiles)
1512 : }
1513 : }
1514 : }
1515 :
1516 1 : if len(ingestSplitFiles) > 0 {
1517 1 : if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
1518 0 : return nil, err
1519 0 : }
1520 : }
1521 :
1522 1 : return ve, nil
1523 : }
1524 :
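// Illustrative sketch of the per-level accounting applied by
// updateLevelMetricsOnExcise above: subtract the excised file, add whatever
// replaced it. exciseDelta and its plain-integer inputs are hypothetical names
// used only for this example.

// exciseDelta returns the change in file count and total size for a level when
// a file of size removed is excised and files of sizes added take its place.
func exciseDelta(removed uint64, added []uint64) (fileDelta int64, sizeDelta int64) {
	fileDelta = -1
	sizeDelta = -int64(removed)
	for _, sz := range added {
		fileDelta++
		sizeDelta += int64(sz)
	}
	return fileDelta, sizeDelta
}
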
1525 : // flush runs a compaction that copies the immutable memtables from memory to
1526 : // disk.
1527 : //
1528 : // d.mu must be held when calling this, but the mutex may be dropped and
1529 : // re-acquired during the course of this method.
1530 1 : func (d *DB) flush1() (bytesFlushed uint64, err error) {
1531 1 : // NB: The flushable queue can contain flushables of type ingestedFlushable.
1532 1 : // The sstables in ingestedFlushable.files must be placed into the appropriate
1533 1 : // level in the lsm. Let's say the flushable queue contains a prefix of
1534 1 : // regular immutable memtables, then an ingestedFlushable, and then the
1535 1 : // mutable memtable. When the flush of the ingestedFlushable is performed,
1536 1 : // it needs an updated view of the lsm. That is, the prefix of immutable
1537 1 : // memtables must have already been flushed. Similarly, if there are two
1538 1 : // contiguous ingestedFlushables in the queue, then the first flushable must
1539 1 : // be flushed, so that the second flushable can see an updated view of the
1540 1 : // lsm.
1541 1 : //
1542 1 : // Given the above, we restrict flushes to either some prefix of regular
1543 1 : // memtables, or a single flushable of type ingestedFlushable. The DB.flush
1544 1 : // function will call DB.maybeScheduleFlush again, so a new flush to finish
1545 1 : // the remaining flush work should be scheduled right away.
1546 1 : //
1547 1 : // NB: Large batches placed in the flushable queue share the WAL with the
1548 1 : // previous memtable in the queue. We must ensure the property that both the
1549 1 : // large batch and the memtable with which it shares a WAL are flushed
1550 1 : // together. The property ensures that the minimum unflushed log number
1551 1 : // isn't incremented incorrectly. Since a flushableBatch.readyForFlush always
1552 1 : // returns true, and since the large batch will always be placed right after
1553 1 : // the memtable with which it shares a WAL, the property is naturally
1554 1 : // ensured. The large batch will always be placed after the memtable with
1555 1 : // which it shares a WAL because we ensure it in DB.commitWrite by holding
1556 1 : // the commitPipeline.mu and then holding DB.mu. As an extra defensive
1557 1 : // measure, if we try to flush the memtable without also flushing the
1558 1 : // flushable batch in the same flush, since the memtable and flushableBatch
1559 1 : // have the same logNum, the logNum invariant check below will trigger.
1560 1 : var n, inputs int
1561 1 : var inputBytes uint64
1562 1 : var ingest bool
1563 1 : for ; n < len(d.mu.mem.queue)-1; n++ {
1564 1 : if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
1565 1 : if n == 0 {
1566 1 : // The first flushable is of type ingestedFlushable. Since these
1567 1 : // must be flushed individually, we perform a flush for just
1568 1 : // this.
1569 1 : if !f.readyForFlush() {
1570 0 : // This check is almost unnecessary, but we guard against it
1571 0 : // just in case this invariant changes in the future.
1572 0 : panic("pebble: ingestedFlushable should always be ready to flush.")
1573 : }
1574 : // By setting n = 1, we ensure that the first flushable(n == 0)
1575 : // is scheduled for a flush. The number of tables added is equal to the
1576 : // number of files in the ingest operation.
1577 1 : n = 1
1578 1 : inputs = len(f.files)
1579 1 : ingest = true
1580 1 : break
1581 1 : } else {
1582 1 : // There was some prefix of flushables which weren't of type
1583 1 : // ingestedFlushable. So, perform a flush for those.
1584 1 : break
1585 : }
1586 : }
1587 1 : if !d.mu.mem.queue[n].readyForFlush() {
1588 0 : break
1589 : }
1590 1 : inputBytes += d.mu.mem.queue[n].inuseBytes()
1591 : }
1592 1 : if n == 0 {
1593 0 : // None of the immutable memtables are ready for flushing.
1594 0 : return 0, nil
1595 0 : }
1596 1 : if !ingest {
1597 1 : // Flushes of memtables add the prefix of n memtables from the flushable
1598 1 : // queue.
1599 1 : inputs = n
1600 1 : }
1601 :
1602 : // Require that every memtable being flushed has a log number less than the
1603 : // new minimum unflushed log number.
1604 1 : minUnflushedLogNum := d.mu.mem.queue[n].logNum
1605 1 : if !d.opts.DisableWAL {
1606 1 : for i := 0; i < n; i++ {
1607 1 : if logNum := d.mu.mem.queue[i].logNum; logNum >= minUnflushedLogNum {
1608 0 : panic(errors.AssertionFailedf("logNum invariant violated: flushing %d items; %d:type=%T,logNum=%d; %d:type=%T,logNum=%d",
1609 0 : n,
1610 0 : i, d.mu.mem.queue[i].flushable, logNum,
1611 0 : n, d.mu.mem.queue[n].flushable, minUnflushedLogNum))
1612 : }
1613 : }
1614 : }
1615 :
1616 1 : c, err := newFlush(d.opts, d.mu.versions.currentVersion(),
1617 1 : d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow())
1618 1 : if err != nil {
1619 0 : return 0, err
1620 0 : }
1621 1 : d.addInProgressCompaction(c)
1622 1 :
1623 1 : jobID := d.newJobIDLocked()
1624 1 : d.opts.EventListener.FlushBegin(FlushInfo{
1625 1 : JobID: int(jobID),
1626 1 : Input: inputs,
1627 1 : InputBytes: inputBytes,
1628 1 : Ingest: ingest,
1629 1 : })
1630 1 : startTime := d.timeNow()
1631 1 :
1632 1 : var ve *manifest.VersionEdit
1633 1 : var pendingOutputs []compactionOutput
1634 1 : var stats compactStats
1635 1 : // To determine the target level of the files in the ingestedFlushable, we
1636 1 : // need to acquire the logLock, and not release it for that duration. Since
1637 1 : // we need to acquire the logLock below to perform the logAndApply step
1638 1 : // anyway, we create the VersionEdit for ingestedFlushable outside of
1639 1 : // runCompaction. For all other flush cases, we construct the VersionEdit
1640 1 : // inside runCompaction.
1641 1 : if c.kind != compactionKindIngestedFlushable {
1642 1 : ve, pendingOutputs, stats, err = d.runCompaction(jobID, c)
1643 1 : }
1644 :
1645 : // Acquire logLock. This will be released either on an error, by way of
1646 : // logUnlock, or through a call to logAndApply if there is no error.
1647 1 : d.mu.versions.logLock()
1648 1 :
1649 1 : if c.kind == compactionKindIngestedFlushable {
1650 1 : ve, err = d.runIngestFlush(c)
1651 1 : }
1652 :
1653 1 : info := FlushInfo{
1654 1 : JobID: int(jobID),
1655 1 : Input: inputs,
1656 1 : InputBytes: inputBytes,
1657 1 : Duration: d.timeNow().Sub(startTime),
1658 1 : Done: true,
1659 1 : Ingest: ingest,
1660 1 : Err: err,
1661 1 : }
1662 1 : if err == nil {
1663 1 : for i := range ve.NewFiles {
1664 1 : e := &ve.NewFiles[i]
1665 1 : info.Output = append(info.Output, e.Meta.TableInfo())
1666 1 : // Ingested tables are not necessarily flushed to L0. Record the level of
1667 1 : // each ingested file explicitly.
1668 1 : if ingest {
1669 1 : info.IngestLevels = append(info.IngestLevels, e.Level)
1670 1 : }
1671 : }
1672 1 : if len(ve.NewFiles) == 0 {
1673 1 : info.Err = errEmptyTable
1674 1 : }
1675 :
1676 : // The flush succeeded or it produced an empty sstable. In either case we
1677 : // want to bump the minimum unflushed log number to the log number of the
1678 : // oldest unflushed memtable.
1679 1 : ve.MinUnflushedLogNum = minUnflushedLogNum
1680 1 : if c.kind != compactionKindIngestedFlushable {
1681 1 : metrics := c.metrics[0]
1682 1 : if d.opts.DisableWAL {
1683 1 : // If the WAL is disabled, every flushable has a zero [logSize],
1684 1 : // resulting in zero bytes in. Instead, use the number of bytes we
1685 1 : // flushed as the BytesIn. This ensures we get a reasonable w-amp
1686 1 : // calculation even when the WAL is disabled.
1687 1 : metrics.BytesIn = metrics.BytesFlushed
1688 1 : } else {
1690 1 : for i := 0; i < n; i++ {
1691 1 : metrics.BytesIn += d.mu.mem.queue[i].logSize
1692 1 : }
1693 : }
1694 1 : } else {
1695 1 : // c.kind == compactionKindIngestedFlushable && we could have deleted files due
1696 1 : // to ingest-time splits or excises.
1697 1 : ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
1698 1 : for c2 := range d.mu.compact.inProgress {
1699 1 : // Check if this compaction overlaps with the excise span. Note that just
1700 1 : // checking if the inputs individually overlap with the excise span
1701 1 : // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
1702 1 : // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
1703 1 : // doing a [c,d) excise at the same time as this compaction, we will have
1704 1 : // to error out the whole compaction as we can't guarantee it hasn't/won't
1705 1 : // write a file overlapping with the excise span.
1706 1 : if ingestFlushable.exciseSpan.OverlapsInternalKeyRange(d.cmp, c2.smallest, c2.largest) {
1707 1 : c2.cancel.Store(true)
1708 1 : continue
1709 : }
1710 : }
1711 :
1712 1 : if len(ve.DeletedFiles) > 0 {
1713 1 : // Iterate through all other compactions, and check if their inputs have
1714 1 : // been replaced due to an ingest-time split or excise. In that case,
1715 1 : // cancel the compaction.
1716 1 : for c2 := range d.mu.compact.inProgress {
1717 1 : for i := range c2.inputs {
1718 1 : iter := c2.inputs[i].files.Iter()
1719 1 : for f := iter.First(); f != nil; f = iter.Next() {
1720 1 : if _, ok := ve.DeletedFiles[deletedFileEntry{FileNum: f.FileNum, Level: c2.inputs[i].level}]; ok {
1721 1 : c2.cancel.Store(true)
1722 1 : break
1723 : }
1724 : }
1725 : }
1726 : }
1727 : }
1728 : }
1729 1 : err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */
1730 1 : func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
1731 1 : if err != nil {
1732 0 : info.Err = err
1733 0 : // TODO(peter): untested.
1734 0 : for _, f := range pendingOutputs {
1735 0 : // Note that the FileBacking for the file metadata might not have
1736 0 : // been set yet. So, we directly use the FileNum. Since these
1737 0 : // files were generated as compaction outputs, these must be
1738 0 : // physical files on disk. This property might not hold once
1739 0 : // https://github.com/cockroachdb/pebble/issues/389 is
1740 0 : // implemented if #389 creates virtual sstables as output files.
1741 0 : d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{
1742 0 : fileInfo: fileInfo{
1743 0 : FileNum: base.PhysicalTableDiskFileNum(f.meta.FileNum),
1744 0 : FileSize: f.meta.Size,
1745 0 : },
1746 0 : isLocal: f.isLocal,
1747 0 : })
1748 0 : }
1749 0 : d.mu.versions.updateObsoleteTableMetricsLocked()
1750 : }
1751 0 : } else {
1752 0 : // We won't be performing the logAndApply step because of the error,
1753 0 : // so logUnlock.
1754 0 : d.mu.versions.logUnlock()
1755 0 : }
1756 :
1757 : // If err != nil, then the flush will be retried, and we will recalculate
1758 : // these metrics.
1759 1 : if err == nil {
1760 1 : d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
1761 1 : d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
1762 1 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
1763 1 : d.maybeUpdateDeleteCompactionHints(c)
1764 1 : }
1765 :
1766 1 : d.clearCompactingState(c, err != nil)
1767 1 : delete(d.mu.compact.inProgress, c)
1768 1 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
1769 1 :
1770 1 : var flushed flushableList
1771 1 : if err == nil {
1772 1 : flushed = d.mu.mem.queue[:n]
1773 1 : d.mu.mem.queue = d.mu.mem.queue[n:]
1774 1 : d.updateReadStateLocked(d.opts.DebugCheck)
1775 1 : d.updateTableStatsLocked(ve.NewFiles)
1776 1 : if ingest {
1777 1 : d.mu.versions.metrics.Flush.AsIngestCount++
1778 1 : for _, l := range c.metrics {
1779 1 : d.mu.versions.metrics.Flush.AsIngestBytes += l.BytesIngested
1780 1 : d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
1781 1 : }
1782 : }
1783 1 : d.maybeTransitionSnapshotsToFileOnlyLocked()
1784 :
1785 : }
1786 : // Signal FlushEnd after installing the new readState. This helps for unit
1787 : // tests that use the callback to trigger a read using an iterator with
1788 : // IterOptions.OnlyReadGuaranteedDurable.
1789 1 : info.TotalDuration = d.timeNow().Sub(startTime)
1790 1 : d.opts.EventListener.FlushEnd(info)
1791 1 :
1792 1 : // The order of these operations matters here for ease of testing.
1793 1 : // Removing the reader reference first allows tests to be guaranteed that
1794 1 : // the memtable reservation has been released by the time a synchronous
1795 1 : // flush returns. readerUnrefLocked may also produce obsolete files so the
1796 1 : // call to deleteObsoleteFiles must happen after it.
1797 1 : for i := range flushed {
1798 1 : flushed[i].readerUnrefLocked(true)
1799 1 : }
1800 :
1801 1 : d.deleteObsoleteFiles(jobID)
1802 1 :
1803 1 : // Mark all the memtables we flushed as flushed.
1804 1 : for i := range flushed {
1805 1 : close(flushed[i].flushed)
1806 1 : }
1807 :
1808 1 : return inputBytes, err
1809 : }
1810 :
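// Illustrative sketch of the prefix-selection loop at the top of flush1 above:
// a flush covers either a leading run of regular immutable memtables or exactly
// one leading ingestedFlushable, never a mix. This simplified stand-in ignores
// the readyForFlush checks; flushPrefix and the boolean queue are hypothetical
// names used only for this example (true marks an ingested flushable).

// flushPrefix returns how many queue entries to flush and whether the flush is
// an ingest flush. The final (mutable) entry is never considered.
func flushPrefix(isIngested []bool) (n int, ingest bool) {
	for ; n < len(isIngested)-1; n++ {
		if isIngested[n] {
			if n == 0 {
				// An ingested flushable at the head is flushed by itself.
				return 1, true
			}
			// Flush only the regular prefix before the ingested flushable.
			return n, false
		}
	}
	return n, false
}
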
1811 : // maybeTransitionSnapshotsToFileOnlyLocked transitions any "eventually
1812 : // file-only" snapshots to be file-only if all their visible state has been
1813 : // flushed to sstables.
1814 : //
1815 : // REQUIRES: d.mu.
1816 1 : func (d *DB) maybeTransitionSnapshotsToFileOnlyLocked() {
1817 1 : earliestUnflushedSeqNum := d.getEarliestUnflushedSeqNumLocked()
1818 1 : currentVersion := d.mu.versions.currentVersion()
1819 1 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; {
1820 1 : if s.efos == nil {
1821 1 : s = s.next
1822 1 : continue
1823 : }
1824 1 : overlapsFlushable := false
1825 1 : if base.Visible(earliestUnflushedSeqNum, s.efos.seqNum, InternalKeySeqNumMax) {
1826 1 : // There are some unflushed keys that are still visible to the EFOS.
1827 1 : // Check if any memtables older than the EFOS contain keys within a
1828 1 : // protected range of the EFOS. If no, we can transition.
1829 1 : protectedRanges := make([]bounded, len(s.efos.protectedRanges))
1830 1 : for i := range s.efos.protectedRanges {
1831 1 : protectedRanges[i] = s.efos.protectedRanges[i]
1832 1 : }
1833 1 : for i := range d.mu.mem.queue {
1834 1 : if !base.Visible(d.mu.mem.queue[i].logSeqNum, s.efos.seqNum, InternalKeySeqNumMax) {
1835 1 : // All keys in this memtable are newer than the EFOS. Skip this
1836 1 : // memtable.
1837 1 : continue
1838 : }
1839 : // NB: computePossibleOverlaps could have false positives, such as if
1840 : // the flushable is a flushable ingest and not a memtable. In that
1841 : // case we don't open the sstables to check; we just pessimistically
1842 : // assume an overlap.
1843 1 : d.mu.mem.queue[i].computePossibleOverlaps(func(b bounded) shouldContinue {
1844 1 : overlapsFlushable = true
1845 1 : return stopIteration
1846 1 : }, protectedRanges...)
1847 1 : if overlapsFlushable {
1848 1 : break
1849 : }
1850 : }
1851 : }
1852 1 : if overlapsFlushable {
1853 1 : s = s.next
1854 1 : continue
1855 : }
1856 1 : currentVersion.Ref()
1857 1 :
1858 1 : // NB: s.efos.transitionToFileOnlySnapshot could close s, in which
1859 1 : // case s.next would be nil. Save it before calling it.
1860 1 : next := s.next
1861 1 : _ = s.efos.transitionToFileOnlySnapshot(currentVersion)
1862 1 : s = next
1863 : }
1864 : }
1865 :
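// Illustrative sketch of the save-next pattern used in the loop above: s.next
// is captured before transitionToFileOnlySnapshot because the transition may
// unlink the node. listNode and visitAndMaybeUnlink are hypothetical names
// used only for this example.

type listNode struct {
	next *listNode
}

// visitAndMaybeUnlink walks the list, saving next before calling fn, so fn is
// free to unlink the node it was given.
func visitAndMaybeUnlink(head *listNode, fn func(*listNode)) {
	for n := head; n != nil; {
		next := n.next // capture before fn possibly clears n.next
		fn(n)
		n = next
	}
}
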
1866 : // maybeScheduleCompactionAsync should be used when
1867 : // we want to possibly schedule a compaction, but don't
1868 : // want to eat the cost of running maybeScheduleCompaction.
1869 : // This method should be launched in a separate goroutine.
1870 : // d.mu must not be held when this is called.
1871 0 : func (d *DB) maybeScheduleCompactionAsync() {
1872 0 : defer d.compactionSchedulers.Done()
1873 0 :
1874 0 : d.mu.Lock()
1875 0 : d.maybeScheduleCompaction()
1876 0 : d.mu.Unlock()
1877 0 : }
1878 :
1879 : // maybeScheduleCompaction schedules a compaction if necessary.
1880 : //
1881 : // d.mu must be held when calling this.
1882 1 : func (d *DB) maybeScheduleCompaction() {
1883 1 : d.maybeScheduleCompactionPicker(pickAuto)
1884 1 : }
1885 :
1886 1 : func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction {
1887 1 : return picker.pickAuto(env)
1888 1 : }
1889 :
1890 1 : func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction {
1891 1 : return picker.pickElisionOnlyCompaction(env)
1892 1 : }
1893 :
1894 : // tryScheduleDownloadCompaction tries to start a download compaction.
1895 : //
1896 : // Returns true if we started a download compaction (or completed it
1897 : // immediately because it is a no-op or we hit an error).
1898 : //
1899 : // Requires d.mu to be held. Updates d.mu.compact.downloads.
1900 1 : func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownloads int) bool {
1901 1 : vers := d.mu.versions.currentVersion()
1902 1 : for i := 0; i < len(d.mu.compact.downloads); {
1903 1 : download := d.mu.compact.downloads[i]
1904 1 : switch d.tryLaunchDownloadCompaction(download, vers, env, maxConcurrentDownloads) {
1905 1 : case launchedCompaction:
1906 1 : return true
1907 1 : case didNotLaunchCompaction:
1908 1 : // See if we can launch a compaction for another download task.
1909 1 : i++
1910 1 : case downloadTaskCompleted:
1911 1 : // Task is completed and must be removed.
1912 1 : d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1)
1913 : }
1914 : }
1915 1 : return false
1916 : }
1917 :
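// Illustrative sketch of the removal-during-iteration pattern used above: the
// loop advances the index only when the current task stays queued, so a removal
// re-examines the element that slid into the same slot. drainFinished and
// process are hypothetical names used only for this example.

// drainFinished removes every task for which process returns true, preserving
// the relative order of the remaining tasks.
func drainFinished(tasks []int, process func(int) bool) []int {
	for i := 0; i < len(tasks); {
		if process(tasks[i]) {
			tasks = append(tasks[:i], tasks[i+1:]...)
			continue // re-examine index i, which now holds the next task
		}
		i++
	}
	return tasks
}
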
1918 : // maybeScheduleCompactionPicker schedules a compaction if necessary,
1919 : // calling `pickFunc` to pick automatic compactions.
1920 : //
1921 : // Requires d.mu to be held.
1922 : func (d *DB) maybeScheduleCompactionPicker(
1923 : pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
1924 1 : ) {
1925 1 : if d.closed.Load() != nil || d.opts.ReadOnly {
1926 1 : return
1927 1 : }
1928 1 : maxCompactions := d.opts.MaxConcurrentCompactions()
1929 1 : maxDownloads := d.opts.MaxConcurrentDownloads()
1930 1 :
1931 1 : if d.mu.compact.compactingCount >= maxCompactions &&
1932 1 : (len(d.mu.compact.downloads) == 0 || d.mu.compact.downloadingCount >= maxDownloads) {
1933 1 : if len(d.mu.compact.manual) > 0 {
1934 1 : // Being unable to run the head of the queue blocks later manual compactions.
1935 1 : d.mu.compact.manual[0].retries++
1936 1 : }
1937 1 : return
1938 : }
1939 :
1940 : // Compaction picking needs a coherent view of a Version. In particular, we
1941 : // need to exclude concurrent ingestions from making a decision on which level
1942 : // to ingest into that conflicts with our compaction
1943 : // decision. versionSet.logLock provides the necessary mutual exclusion.
1944 1 : d.mu.versions.logLock()
1945 1 : defer d.mu.versions.logUnlock()
1946 1 :
1947 1 : // Check for the closed flag again, in case the DB was closed while we were
1948 1 : // waiting for logLock().
1949 1 : if d.closed.Load() != nil {
1950 1 : return
1951 1 : }
1952 :
1953 1 : env := compactionEnv{
1954 1 : diskAvailBytes: d.diskAvailBytes.Load(),
1955 1 : earliestSnapshotSeqNum: d.mu.snapshots.earliest(),
1956 1 : earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
1957 1 : }
1958 1 :
1959 1 : if d.mu.compact.compactingCount < maxCompactions {
1960 1 : // Check for delete-only compactions first, because they're expected to be
1961 1 : // cheap and reduce future compaction work.
1962 1 : if !d.opts.private.disableDeleteOnlyCompactions &&
1963 1 : !d.opts.DisableAutomaticCompactions &&
1964 1 : len(d.mu.compact.deletionHints) > 0 {
1965 1 : d.tryScheduleDeleteOnlyCompaction()
1966 1 : }
1967 :
1968 1 : for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxCompactions {
1969 1 : if manual := d.mu.compact.manual[0]; !d.tryScheduleManualCompaction(env, manual) {
1970 1 : // Inability to run head blocks later manual compactions.
1971 1 : manual.retries++
1972 1 : break
1973 : }
1974 1 : d.mu.compact.manual = d.mu.compact.manual[1:]
1975 : }
1976 :
1977 1 : for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxCompactions &&
1978 1 : d.tryScheduleAutoCompaction(env, pickFunc) {
1979 1 : }
1980 : }
1981 :
1982 1 : for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads &&
1983 1 : d.tryScheduleDownloadCompaction(env, maxDownloads) {
1984 1 : }
1985 : }
1986 :
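// Illustrative sketch of the early-return gate in maybeScheduleCompactionPicker
// above: new work is admitted only if there is compaction headroom, or if there
// are queued downloads with download headroom. hasSchedulingHeadroom and its
// counters are hypothetical names used only for this example.

// hasSchedulingHeadroom reports whether either a compaction slot is free or a
// queued download could use a free download slot.
func hasSchedulingHeadroom(compacting, maxCompactions, downloading, maxDownloads, queuedDownloads int) bool {
	return compacting < maxCompactions || (queuedDownloads > 0 && downloading < maxDownloads)
}
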
1987 : // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction
1988 : // for all files that can be deleted as suggested by deletionHints.
1989 : //
1990 : // Requires d.mu to be held. Updates d.mu.compact.deletionHints.
1991 1 : func (d *DB) tryScheduleDeleteOnlyCompaction() {
1992 1 : v := d.mu.versions.currentVersion()
1993 1 : snapshots := d.mu.snapshots.toSlice()
1994 1 : inputs, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots)
1995 1 : d.mu.compact.deletionHints = unresolvedHints
1996 1 :
1997 1 : if len(inputs) > 0 {
1998 1 : c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow())
1999 1 : d.mu.compact.compactingCount++
2000 1 : d.addInProgressCompaction(c)
2001 1 : go d.compact(c, nil)
2002 1 : }
2003 : }
2004 :
2005 : // tryScheduleManualCompaction tries to kick off the given manual compaction.
2006 : //
2007 : // Returns false if we are not able to run this compaction at this time.
2008 : //
2009 : // Requires d.mu to be held.
2010 1 : func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompaction) bool {
2011 1 : v := d.mu.versions.currentVersion()
2012 1 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
2013 1 : pc, retryLater := pickManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
2014 1 : if pc == nil {
2015 1 : if !retryLater {
2016 1 : // Manual compaction is a no-op. Signal completion and exit.
2017 1 : manual.done <- nil
2018 1 : return true
2019 1 : }
2020 : // We are not able to run this manual compaction at this time.
2021 1 : return false
2022 : }
2023 :
2024 1 : c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
2025 1 : d.mu.compact.compactingCount++
2026 1 : d.addInProgressCompaction(c)
2027 1 : go d.compact(c, manual.done)
2028 1 : return true
2029 : }
2030 :
2031 : // tryScheduleAutoCompaction tries to kick off an automatic compaction.
2032 : //
2033 : // Returns false if no automatic compactions are necessary or able to run at
2034 : // this time.
2035 : //
2036 : // Requires d.mu to be held.
2037 : func (d *DB) tryScheduleAutoCompaction(
2038 : env compactionEnv, pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
2039 1 : ) bool {
2040 1 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
2041 1 : env.readCompactionEnv = readCompactionEnv{
2042 1 : readCompactions: &d.mu.compact.readCompactions,
2043 1 : flushing: d.mu.compact.flushing || d.passedFlushThreshold(),
2044 1 : rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
2045 1 : }
2046 1 : pc := pickFunc(d.mu.versions.picker, env)
2047 1 : if pc == nil {
2048 1 : return false
2049 1 : }
2050 1 : c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
2051 1 : d.mu.compact.compactingCount++
2052 1 : d.addInProgressCompaction(c)
2053 1 : go d.compact(c, nil)
2054 1 : return true
2055 : }
2056 :
2057 : // deleteCompactionHintType indicates whether the deleteCompactionHint was
2058 : // generated from a span containing a range del (point key only), a range key
2059 : // delete (range key only), or both a point and range key.
2060 : type deleteCompactionHintType uint8
2061 :
2062 : const (
2063 : // NOTE: While these are primarily used as enumeration types, they are also
2064 : // used for some bitwise operations. Care should be taken when updating.
2065 : deleteCompactionHintTypeUnknown deleteCompactionHintType = iota
2066 : deleteCompactionHintTypePointKeyOnly
2067 : deleteCompactionHintTypeRangeKeyOnly
2068 : deleteCompactionHintTypePointAndRangeKey
2069 : )
2070 :
2071 : // String implements fmt.Stringer.
2072 0 : func (h deleteCompactionHintType) String() string {
2073 0 : switch h {
2074 0 : case deleteCompactionHintTypeUnknown:
2075 0 : return "unknown"
2076 0 : case deleteCompactionHintTypePointKeyOnly:
2077 0 : return "point-key-only"
2078 0 : case deleteCompactionHintTypeRangeKeyOnly:
2079 0 : return "range-key-only"
2080 0 : case deleteCompactionHintTypePointAndRangeKey:
2081 0 : return "point-and-range-key"
2082 0 : default:
2083 0 : panic(fmt.Sprintf("unknown hint type: %d", h))
2084 : }
2085 : }
2086 :
2087 : // compactionHintFromKeys returns a deleteCompactionHintType given a slice of
2088 : // keyspan.Keys.
2089 1 : func compactionHintFromKeys(keys []keyspan.Key) deleteCompactionHintType {
2090 1 : var hintType deleteCompactionHintType
2091 1 : for _, k := range keys {
2092 1 : switch k.Kind() {
2093 1 : case base.InternalKeyKindRangeDelete:
2094 1 : hintType |= deleteCompactionHintTypePointKeyOnly
2095 1 : case base.InternalKeyKindRangeKeyDelete:
2096 1 : hintType |= deleteCompactionHintTypeRangeKeyOnly
2097 0 : default:
2098 0 : panic(fmt.Sprintf("unsupported key kind: %s", k.Kind()))
2099 : }
2100 : }
2101 1 : return hintType
2102 : }
2103 :
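// Worked example for compactionHintFromKeys above: because the hint types are
// consecutive iota values, a span carrying both a RANGEDEL and a RANGEKEYDEL
// composes by bitwise OR:
//
//	deleteCompactionHintTypePointKeyOnly | deleteCompactionHintTypeRangeKeyOnly
//	  == 1 | 2 == 3 == deleteCompactionHintTypePointAndRangeKey
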
2104 : // A deleteCompactionHint records a user key and sequence number span that has been
2105 : // deleted by a range tombstone. A hint is recorded if at least one sstable
2106 : // falls completely within both the user key and sequence number spans.
2107 : // Once the tombstones and the observed completely-contained sstables fall
2108 : // into the same snapshot stripe, a delete-only compaction may delete any
2109 : // sstables within the range.
2110 : type deleteCompactionHint struct {
2111 : // The type of key span that generated this hint (point key, range key, or
2112 : // both).
2113 : hintType deleteCompactionHintType
2114 : // start and end are user keys specifying a key range [start, end) of
2115 : // deleted keys.
2116 : start []byte
2117 : end []byte
2118 : // The level of the file containing the range tombstone(s) when the hint
2119 : // was created. Only lower levels need to be searched for files that may
2120 : // be deleted.
2121 : tombstoneLevel int
2122 : // The file containing the range tombstone(s) that created the hint.
2123 : tombstoneFile *fileMetadata
2124 : // The smallest and largest sequence numbers of the abutting tombstones
2125 : // merged to form this hint. All of a table's keys must be less than the
2126 : // tombstone's smallest sequence number to be deleted. All of a table's
2127 : // sequence numbers must fall into the same snapshot stripe as the
2128 : // tombstone's largest sequence number to be deleted.
2129 : tombstoneLargestSeqNum uint64
2130 : tombstoneSmallestSeqNum uint64
2131 : // The smallest sequence number of a sstable that was found to be covered
2132 : // by this hint. The hint cannot be resolved until this sequence number is
2133 : // in the same snapshot stripe as the largest tombstone sequence number.
2134 : // This is set when a hint is created, so the LSM may look different and
2135 : // notably no longer contain the sstable that contained the key at this
2136 : // sequence number.
2137 : fileSmallestSeqNum uint64
2138 : }
2139 :
2140 0 : func (h deleteCompactionHint) String() string {
2141 0 : return fmt.Sprintf(
2142 0 : "L%d.%s %s-%s seqnums(tombstone=%d-%d, file-smallest=%d, type=%s)",
2143 0 : h.tombstoneLevel, h.tombstoneFile.FileNum, h.start, h.end,
2144 0 : h.tombstoneSmallestSeqNum, h.tombstoneLargestSeqNum, h.fileSmallestSeqNum,
2145 0 : h.hintType,
2146 0 : )
2147 0 : }
2148 :
2149 1 : func (h *deleteCompactionHint) canDelete(cmp Compare, m *fileMetadata, snapshots []uint64) bool {
2150 1 : // The file can be deleted only if all of its keys are older than the earliest
2151 1 : // tombstone in the hint and none predate the hint's fileSmallestSeqNum.
2152 1 : if m.LargestSeqNum >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
2153 1 : return false
2154 1 : }
2155 :
2156 : // The file's oldest key must be in the same snapshot stripe as the
2157 : // newest tombstone. NB: We already checked the hint's sequence numbers,
2158 : // but this file's oldest sequence number might be lower than the hint's
2159 : // smallest sequence number despite the file falling within the key range
2160 : // if this file was constructed after the hint by a compaction.
2161 1 : ti, _ := snapshotIndex(h.tombstoneLargestSeqNum, snapshots)
2162 1 : fi, _ := snapshotIndex(m.SmallestSeqNum, snapshots)
2163 1 : if ti != fi {
2164 0 : return false
2165 0 : }
2166 :
2167 1 : switch h.hintType {
2168 1 : case deleteCompactionHintTypePointKeyOnly:
2169 1 : // A hint generated by a range del span cannot delete tables that contain
2170 1 : // range keys.
2171 1 : if m.HasRangeKeys {
2172 1 : return false
2173 1 : }
2174 1 : case deleteCompactionHintTypeRangeKeyOnly:
2175 1 : // A hint generated by a range key del span cannot delete tables that
2176 1 : // contain point keys.
2177 1 : if m.HasPointKeys {
2178 1 : return false
2179 1 : }
2180 1 : case deleteCompactionHintTypePointAndRangeKey:
2181 : // A hint from a span that contains both range dels *and* range keys can
2182 : // only be deleted if both bounds fall within the hint. The next check takes
2183 : // care of this.
2184 0 : default:
2185 0 : panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
2186 : }
2187 :
2188 : // The file's keys must be completely contained within the hint range.
2189 1 : return cmp(h.start, m.Smallest.UserKey) <= 0 && cmp(m.Largest.UserKey, h.end) < 0
2190 : }
2191 :
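// Illustrative sketch of the "same snapshot stripe" check used in canDelete
// above: two sequence numbers are in the same stripe when no open snapshot
// sequence number separates them. This is a simplified stand-in, assuming
// snapshots is sorted ascending; sameSnapshotStripe is a hypothetical name
// used only for this example, not the real snapshotIndex helper.

// sameSnapshotStripe reports whether a and b fall between the same pair of
// adjacent snapshot sequence numbers.
func sameSnapshotStripe(a, b uint64, snapshots []uint64) bool {
	stripe := func(seq uint64) int {
		i := 0
		for i < len(snapshots) && snapshots[i] <= seq {
			i++
		}
		return i
	}
	return stripe(a) == stripe(b)
}
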
2192 1 : func (d *DB) maybeUpdateDeleteCompactionHints(c *compaction) {
2193 1 : // Compactions that zero sequence numbers can interfere with compaction
2194 1 : // deletion hints. Deletion hints apply to tables containing keys older
2195 1 : // than a threshold. If a key more recent than the threshold is zeroed in
2196 1 : // a compaction, a delete-only compaction may mistake it as meeting the
2197 1 : // threshold and drop a table containing live data.
2198 1 : //
2199 1 : // To avoid this scenario, compactions that zero sequence numbers remove
2200 1 : // any conflicting deletion hints. A deletion hint is conflicting if both
2201 1 : // of the following conditions apply:
2202 1 : // * its key space overlaps with the compaction
2203 1 : // * at least one of its inputs contains a key as recent as one of the
2204 1 : // hint's tombstones.
2205 1 : //
2206 1 : if !c.allowedZeroSeqNum {
2207 1 : return
2208 1 : }
2209 :
2210 1 : updatedHints := d.mu.compact.deletionHints[:0]
2211 1 : for _, h := range d.mu.compact.deletionHints {
2212 1 : // If the compaction's key space is disjoint from the hint's key
2213 1 : // space, the zeroing of sequence numbers won't affect the hint. Keep
2214 1 : // the hint.
2215 1 : keysDisjoint := d.cmp(h.end, c.smallest.UserKey) < 0 || d.cmp(h.start, c.largest.UserKey) > 0
2216 1 : if keysDisjoint {
2217 1 : updatedHints = append(updatedHints, h)
2218 1 : continue
2219 : }
2220 :
2221 : // All of the compaction's inputs must be older than the hint's
2222 : // tombstones.
2223 1 : inputsOlder := true
2224 1 : for _, in := range c.inputs {
2225 1 : iter := in.files.Iter()
2226 1 : for f := iter.First(); f != nil; f = iter.Next() {
2227 1 : inputsOlder = inputsOlder && f.LargestSeqNum < h.tombstoneSmallestSeqNum
2228 1 : }
2229 : }
2230 1 : if inputsOlder {
2231 1 : updatedHints = append(updatedHints, h)
2232 1 : continue
2233 : }
2234 :
2235 : // Drop h, because the compaction c may have zeroed sequence numbers
2236 : // of keys more recent than some of h's tombstones.
2237 : }
2238 1 : d.mu.compact.deletionHints = updatedHints
2239 : }
2240 :
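// Illustrative sketch of the keysDisjoint test in the function above: it is the
// conservative one-dimensional check that the hint's key range ends before the
// compaction's begins, or starts after it ends. keyRangesDisjoint and cmp are
// hypothetical names used only for this example.

// keyRangesDisjoint reports whether the two user key ranges share no keys,
// given a three-way comparator cmp, mirroring the check above.
func keyRangesDisjoint(cmp func(a, b []byte) int, aStart, aEnd, bStart, bEnd []byte) bool {
	return cmp(aEnd, bStart) < 0 || cmp(aStart, bEnd) > 0
}
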
2241 : func checkDeleteCompactionHints(
2242 : cmp Compare, v *version, hints []deleteCompactionHint, snapshots []uint64,
2243 1 : ) ([]compactionLevel, []deleteCompactionHint) {
2244 1 : var files map[*fileMetadata]bool
2245 1 : var byLevel [numLevels][]*fileMetadata
2246 1 :
2247 1 : unresolvedHints := hints[:0]
2248 1 : for _, h := range hints {
2249 1 : // Check each compaction hint to see if it's resolvable. Resolvable
2250 1 : // hints are removed and trigger a delete-only compaction if any files
2251 1 : // in the current LSM still meet their criteria. Unresolvable hints
2252 1 : // are saved and don't trigger a delete-only compaction.
2253 1 : //
2254 1 : // When a compaction hint is created, the sequence numbers of the
2255 1 : // range tombstones and the covered file with the oldest key are
2256 1 : // recorded. The largest tombstone sequence number and the smallest
2257 1 : // file sequence number must be in the same snapshot stripe for the
2258 1 : // hint to be resolved. The below graphic models a compaction hint
2259 1 : // covering the keyspace [b, r). The hint completely contains two
2260 1 : // files, 000002 and 000003. The file 000003 contains the lowest
2261 1 : // covered sequence number at #90. The tombstone b.RANGEDEL.230:h has
2262 1 : // the highest tombstone sequence number incorporated into the hint.
2263 1 : // The hint may be resolved only once the snapshots at #100, #180 and
2264 1 : // #210 are all closed. File 000001 is not included within the hint
2265 1 : // because it extends beyond the range tombstones in user key space.
2266 1 : //
2267 1 : // 250
2268 1 : //
2269 1 : // |-b...230:h-|
2270 1 : // _____________________________________________________ snapshot #210
2271 1 : // 200 |--h.RANGEDEL.200:r--|
2272 1 : //
2273 1 : // _____________________________________________________ snapshot #180
2274 1 : //
2275 1 : // 150 +--------+
2276 1 : // +---------+ | 000003 |
2277 1 : // | 000002 | | |
2278 1 : // +_________+ | |
2279 1 : // 100_____________________|________|___________________ snapshot #100
2280 1 : // +--------+
2281 1 : // _____________________________________________________ snapshot #70
2282 1 : // +---------------+
2283 1 : // 50 | 000001 |
2284 1 : // | |
2285 1 : // +---------------+
2286 1 : // ______________________________________________________________
2287 1 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
2288 1 :
2289 1 : ti, _ := snapshotIndex(h.tombstoneLargestSeqNum, snapshots)
2290 1 : fi, _ := snapshotIndex(h.fileSmallestSeqNum, snapshots)
2291 1 : if ti != fi {
2292 1 : // Cannot resolve yet.
2293 1 : unresolvedHints = append(unresolvedHints, h)
2294 1 : continue
2295 : }
2296 :
2297 : // The hint h will be resolved and dropped, regardless of whether
2298 : // there are any tables that can be deleted.
2299 1 : for l := h.tombstoneLevel + 1; l < numLevels; l++ {
2300 1 : overlaps := v.Overlaps(l, base.UserKeyBoundsEndExclusive(h.start, h.end))
2301 1 : iter := overlaps.Iter()
2302 1 : for m := iter.First(); m != nil; m = iter.Next() {
2303 1 : if m.IsCompacting() || !h.canDelete(cmp, m, snapshots) || files[m] {
2304 1 : continue
2305 : }
2306 1 : if files == nil {
2307 1 : // Construct files lazily, assuming most calls will not
2308 1 : // produce delete-only compactions.
2309 1 : files = make(map[*fileMetadata]bool)
2310 1 : }
2311 1 : files[m] = true
2312 1 : byLevel[l] = append(byLevel[l], m)
2313 : }
2314 : }
2315 : }
2316 :
2317 1 : var compactLevels []compactionLevel
2318 1 : for l, files := range byLevel {
2319 1 : if len(files) == 0 {
2320 1 : continue
2321 : }
2322 1 : compactLevels = append(compactLevels, compactionLevel{
2323 1 : level: l,
2324 1 : files: manifest.NewLevelSliceKeySorted(cmp, files),
2325 1 : })
2326 : }
2327 1 : return compactLevels, unresolvedHints
2328 : }
2329 :
2330 : // compact runs one compaction and maybe schedules another call to compact.
2331 1 : func (d *DB) compact(c *compaction, errChannel chan error) {
2332 1 : pprof.Do(context.Background(), compactLabels, func(context.Context) {
2333 1 : d.mu.Lock()
2334 1 : defer d.mu.Unlock()
2335 1 : if err := d.compact1(c, errChannel); err != nil {
2336 1 : // TODO(peter): count consecutive compaction errors and backoff.
2337 1 : d.opts.EventListener.BackgroundError(err)
2338 1 : }
2339 1 : if c.isDownload {
2340 1 : d.mu.compact.downloadingCount--
2341 1 : } else {
2342 1 : d.mu.compact.compactingCount--
2343 1 : }
2344 1 : delete(d.mu.compact.inProgress, c)
2345 1 : // Add this compaction's duration to the cumulative duration. NB: This
2346 1 : // must be atomic with the above removal of c from
2347 1 : // d.mu.compact.InProgress to ensure Metrics.Compact.Duration does not
2348 1 : // miss or double count a completing compaction's duration.
2349 1 : d.mu.compact.duration += d.timeNow().Sub(c.beganAt)
2350 1 :
2351 1 : // The previous compaction may have produced too many files in a
2352 1 : // level, so reschedule another compaction if needed.
2353 1 : d.maybeScheduleCompaction()
2354 1 : d.mu.compact.cond.Broadcast()
2355 : })
2356 : }
2357 :
2358 : // compact1 runs one compaction.
2359 : //
2360 : // d.mu must be held when calling this, but the mutex may be dropped and
2361 : // re-acquired during the course of this method.
2362 1 : func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
2363 1 : if errChannel != nil {
2364 1 : defer func() {
2365 1 : errChannel <- err
2366 1 : }()
2367 : }
2368 :
2369 1 : jobID := d.newJobIDLocked()
2370 1 : info := c.makeInfo(jobID)
2371 1 : d.opts.EventListener.CompactionBegin(info)
2372 1 : startTime := d.timeNow()
2373 1 :
2374 1 : ve, pendingOutputs, stats, err := d.runCompaction(jobID, c)
2375 1 :
2376 1 : info.Duration = d.timeNow().Sub(startTime)
2377 1 : if err == nil {
2378 1 : err = func() error {
2379 1 : var err error
2380 1 : d.mu.versions.logLock()
2381 1 : // Check if this compaction had a conflicting operation (eg. a d.excise())
2382 1 : // that necessitates it restarting from scratch. Note that since we hold
2383 1 : // the manifest lock, we don't expect this bool to change its value
2384 1 : // as only the holder of the manifest lock will ever write to it.
2385 1 : if c.cancel.Load() {
2386 1 : err = firstError(err, ErrCancelledCompaction)
2387 1 : }
2388 1 : if err != nil {
2389 1 : // logAndApply calls logUnlock. If we didn't call it, we need to call
2390 1 : // logUnlock ourselves.
2391 1 : d.mu.versions.logUnlock()
2392 1 : return err
2393 1 : }
2394 1 : return d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
2395 1 : return d.getInProgressCompactionInfoLocked(c)
2396 1 : })
2397 : }()
2398 1 : if err != nil {
2399 1 : // TODO(peter): untested.
2400 1 : for _, f := range pendingOutputs {
2401 1 : // Note that the FileBacking for the file metadata might not have
2402 1 : // been set yet. So, we directly use the FileNum. Since these
2403 1 : // files were generated as compaction outputs, these must be
2404 1 : // physical files on disk. This property might not hold once
2405 1 : // https://github.com/cockroachdb/pebble/issues/389 is
2406 1 : // implemented if #389 creates virtual sstables as output files.
2407 1 : d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{
2408 1 : fileInfo: fileInfo{
2409 1 : FileNum: base.PhysicalTableDiskFileNum(f.meta.FileNum),
2410 1 : FileSize: f.meta.Size,
2411 1 : },
2412 1 : isLocal: f.isLocal,
2413 1 : })
2414 1 : }
2415 1 : d.mu.versions.updateObsoleteTableMetricsLocked()
2416 : }
2417 : }
2418 :
2419 1 : info.Done = true
2420 1 : info.Err = err
2421 1 : if err == nil {
2422 1 : for i := range ve.NewFiles {
2423 1 : e := &ve.NewFiles[i]
2424 1 : info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
2425 1 : }
2426 1 : d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
2427 1 : d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
2428 1 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
2429 1 : d.maybeUpdateDeleteCompactionHints(c)
2430 : }
2431 :
2432 : // NB: clearing compacting state must occur before updating the read state;
2433 : // L0Sublevels initialization depends on it.
2434 1 : d.clearCompactingState(c, err != nil)
2435 1 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
2436 1 : d.mu.versions.incrementCompactionBytes(-c.bytesWritten)
2437 1 :
2438 1 : info.TotalDuration = d.timeNow().Sub(c.beganAt)
2439 1 : d.opts.EventListener.CompactionEnd(info)
2440 1 :
2441 1 : // Update the read state before deleting obsolete files because the
2442 1 : // read-state update will cause the previous version to be unref'd and if
2443 1 : // there are no references obsolete tables will be added to the obsolete
2444 1 : // table list.
2445 1 : if err == nil {
2446 1 : d.updateReadStateLocked(d.opts.DebugCheck)
2447 1 : d.updateTableStatsLocked(ve.NewFiles)
2448 1 : }
2449 1 : d.deleteObsoleteFiles(jobID)
2450 1 :
2451 1 : return err
2452 : }
2453 :
2454 : type compactStats struct {
2455 : cumulativePinnedKeys uint64
2456 : cumulativePinnedSize uint64
2457 : countMissizedDels uint64
2458 : }
2459 :
2460 : // runCopyCompaction runs a copy compaction where a new FileNum is created that
2461 : // is a byte-for-byte copy of the input file or span thereof in some cases. This
2462 : // is used in lieu of a move compaction when a file is being moved across the
2463 : // local/remote storage boundary. It could also be used in lieu of a rewrite
2464 : // compaction as part of a Download() call, which allows copying only a span of
2465 : // the external file, provided the file does not contain range keys or value
2466 : // blocks (see sstable.CopySpan).
2467 : //
2468 : // d.mu must be held when calling this method. The mutex will be released when
2469 : // doing IO.
2470 : func (d *DB) runCopyCompaction(
2471 : jobID JobID,
2472 : c *compaction,
2473 : inputMeta *fileMetadata,
2474 : objMeta objstorage.ObjectMetadata,
2475 : versionEdit *versionEdit,
2476 1 : ) (ve *versionEdit, pendingOutputs []compactionOutput, retErr error) {
2477 1 : ctx := context.TODO()
2478 1 :
2479 1 : ve = versionEdit
2480 1 : if !objMeta.IsExternal() {
2481 1 : if objMeta.IsRemote() || !remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level) {
2482 0 : panic("pebble: scheduled a copy compaction that is not actually moving files to shared storage")
2483 : }
2484 : // Note that based on logic in the compaction picker, we're guaranteed
2485 : // inputMeta.Virtual is false.
2486 1 : if inputMeta.Virtual {
2487 0 : panic(errors.AssertionFailedf("cannot do a copy compaction of a virtual sstable across local/remote storage"))
2488 : }
2489 : }
2490 :
2491 : // We are in the relatively more complex case where we need to copy this
2492 : // file to remote storage. Drop the db mutex while we do the copy
2493 : //
2494 : // To ease up cleanup of the local file and tracking of refs, we create
2495 : // a new FileNum. This has the potential of making the block cache less
2496 : // effective, however.
2497 1 : newMeta := &fileMetadata{
2498 1 : Size: inputMeta.Size,
2499 1 : CreationTime: inputMeta.CreationTime,
2500 1 : SmallestSeqNum: inputMeta.SmallestSeqNum,
2501 1 : LargestSeqNum: inputMeta.LargestSeqNum,
2502 1 : Stats: inputMeta.Stats,
2503 1 : Virtual: inputMeta.Virtual,
2504 1 : SyntheticPrefix: inputMeta.SyntheticPrefix,
2505 1 : SyntheticSuffix: inputMeta.SyntheticSuffix,
2506 1 : }
2507 1 : if inputMeta.HasPointKeys {
2508 1 : newMeta.ExtendPointKeyBounds(c.cmp, inputMeta.SmallestPointKey, inputMeta.LargestPointKey)
2509 1 : }
2510 1 : if inputMeta.HasRangeKeys {
2511 1 : newMeta.ExtendRangeKeyBounds(c.cmp, inputMeta.SmallestRangeKey, inputMeta.LargestRangeKey)
2512 1 : }
2513 1 : newMeta.FileNum = d.mu.versions.getNextFileNum()
2514 1 : if objMeta.IsExternal() {
2515 1 : // external -> local/shared copy. File must be virtual.
2516 1 : // We will update this size later after we produce the new backing file.
2517 1 : newMeta.InitProviderBacking(base.DiskFileNum(newMeta.FileNum), inputMeta.FileBacking.Size)
2518 1 : } else {
2519 1 : // local -> shared copy. New file is guaranteed to not be virtual.
2520 1 : newMeta.InitPhysicalBacking()
2521 1 : }
2522 :
2523 1 : c.metrics = map[int]*LevelMetrics{
2524 1 : c.outputLevel.level: {
2525 1 : BytesIn: inputMeta.Size,
2526 1 : BytesCompacted: inputMeta.Size,
2527 1 : TablesCompacted: 1,
2528 1 : },
2529 1 : }
2530 1 :
2531 1 : // Before dropping the db mutex, grab a ref to the current version. This
2532 1 : // prevents any concurrent excises from deleting files that this compaction
2533 1 : // needs to read/maintain a reference to.
2534 1 : vers := d.mu.versions.currentVersion()
2535 1 : vers.Ref()
2536 1 : defer vers.UnrefLocked()
2537 1 :
2538 1 : // NB: The order here is reversed, lock after unlock. This is similar to
2539 1 : // runCompaction.
2540 1 : d.mu.Unlock()
2541 1 : defer d.mu.Lock()
2542 1 :
2543 1 : // If the src obj is external, we're doing an external to local/shared copy.
2544 1 : if objMeta.IsExternal() {
2545 1 : src, err := d.objProvider.OpenForReading(
2546 1 : ctx, fileTypeTable, inputMeta.FileBacking.DiskFileNum, objstorage.OpenOptions{},
2547 1 : )
2548 1 : if err != nil {
2549 0 : return ve, pendingOutputs, err
2550 0 : }
2551 1 : defer func() {
2552 1 : if src != nil {
2553 0 : src.Close()
2554 0 : }
2555 : }()
2556 :
2557 1 : w, outObjMeta, err := d.objProvider.Create(
2558 1 : ctx, fileTypeTable, base.PhysicalTableDiskFileNum(newMeta.FileNum),
2559 1 : objstorage.CreateOptions{
2560 1 : PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
2561 1 : },
2562 1 : )
2563 1 : if err != nil {
2564 0 : return ve, pendingOutputs, err
2565 0 : }
2566 1 : pendingOutputs = append(pendingOutputs, compactionOutput{
2567 1 : meta: newMeta,
2568 1 : isLocal: !outObjMeta.IsRemote(),
2569 1 : })
2570 1 :
2571 1 : start, end := newMeta.Smallest, newMeta.Largest
2572 1 : if newMeta.SyntheticPrefix.IsSet() {
2573 1 : start.UserKey = newMeta.SyntheticPrefix.Invert(start.UserKey)
2574 1 : end.UserKey = newMeta.SyntheticPrefix.Invert(end.UserKey)
2575 1 : }
2576 1 : if newMeta.SyntheticSuffix.IsSet() {
2577 1 : // Extend the bounds as necessary so that the keys don't include suffixes.
2578 1 : start.UserKey = start.UserKey[:c.comparer.Split(start.UserKey)]
2579 1 : if n := c.comparer.Split(end.UserKey); n < len(end.UserKey) {
2580 0 : end = base.MakeRangeDeleteSentinelKey(c.comparer.ImmediateSuccessor(nil, end.UserKey[:n]))
2581 0 : }
2582 : }
2583 :
2584 1 : wrote, err := sstable.CopySpan(ctx,
2585 1 : src, d.opts.MakeReaderOptions(),
2586 1 : w, d.opts.MakeWriterOptions(c.outputLevel.level, d.FormatMajorVersion().MaxTableFormat()),
2587 1 : start, end,
2588 1 : )
2589 1 : src = nil // We passed src to CopySpan; it's responsible for closing it.
2590 1 : if err != nil {
2591 0 : return ve, pendingOutputs, err
2592 0 : }
2593 1 : newMeta.FileBacking.Size = wrote
2594 1 : newMeta.Size = wrote
2595 1 : } else {
2596 1 : pendingOutputs = append(pendingOutputs, compactionOutput{
2597 1 : meta: newMeta.PhysicalMeta().FileMetadata,
2598 1 : isLocal: true,
2599 1 : })
2600 1 : _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
2601 1 : d.objProvider.Path(objMeta), fileTypeTable, newMeta.FileBacking.DiskFileNum,
2602 1 : objstorage.CreateOptions{PreferSharedStorage: true})
2603 1 :
2604 1 : if err != nil {
2605 0 : return ve, pendingOutputs, err
2606 0 : }
2607 : }
2608 1 : ve.NewFiles[0].Meta = newMeta
2609 1 : if newMeta.Virtual {
2610 1 : ve.CreatedBackingTables = []*fileBacking{newMeta.FileBacking}
2611 1 : }
2612 :
2613 1 : if err := d.objProvider.Sync(); err != nil {
2614 0 : return nil, pendingOutputs, err
2615 0 : }
2616 1 : return ve, pendingOutputs, nil
2617 : }
2618 :
2619 : type compactionOutput struct {
2620 : meta *fileMetadata
2621 : isLocal bool
2622 : }
2623 :
2624 : // runCompactions runs a compaction that produces new on-disk tables from
2625 : // memtables or old on-disk tables.
2626 : //
2627 : // d.mu must be held when calling this, but the mutex may be dropped and
2628 : // re-acquired during the course of this method.
2629 : func (d *DB) runCompaction(
2630 : jobID JobID, c *compaction,
2631 1 : ) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, retErr error) {
2632 1 : // As a sanity check, confirm that the smallest / largest keys for new and
2633 1 : // deleted files in the new versionEdit pass a validation function before
2634 1 : // returning the edit.
2635 1 : defer func() {
2636 1 : // If we're handling a panic, don't expect the version edit to validate.
2637 1 : if r := recover(); r != nil {
2638 0 : panic(r)
2639 1 : } else if ve != nil {
2640 1 : err := validateVersionEdit(ve, d.opts.Experimental.KeyValidationFunc, d.opts.Comparer.FormatKey)
2641 1 : if err != nil {
2642 0 : d.opts.Logger.Fatalf("pebble: version edit validation failed: %s", err)
2643 0 : }
2644 : }
2645 : }()
2646 :
2647 : // Check for a delete-only compaction. This can occur when wide range
2648 : // tombstones completely contain sstables.
2649 1 : if c.kind == compactionKindDeleteOnly {
2650 1 : c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
2651 1 : ve := &versionEdit{
2652 1 : DeletedFiles: map[deletedFileEntry]*fileMetadata{},
2653 1 : }
2654 1 : for _, cl := range c.inputs {
2655 1 : levelMetrics := &LevelMetrics{}
2656 1 : iter := cl.files.Iter()
2657 1 : for f := iter.First(); f != nil; f = iter.Next() {
2658 1 : ve.DeletedFiles[deletedFileEntry{
2659 1 : Level: cl.level,
2660 1 : FileNum: f.FileNum,
2661 1 : }] = f
2662 1 : }
2663 1 : c.metrics[cl.level] = levelMetrics
2664 : }
2665 1 : return ve, nil, stats, nil
2666 : }
2667 :
2668 1 : if c.kind == compactionKindIngestedFlushable {
2669 0 : panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
2670 : }
2671 :
2672 : // Check for a move or copy of one table from one level to the next. We avoid
2673 : // such a move if there is lots of overlapping grandparent data. Otherwise,
2674 : // the move could create a parent file that will require a very expensive
2675 : // merge later on.
2676 1 : if c.kind == compactionKindMove || c.kind == compactionKindCopy {
2677 1 : iter := c.startLevel.files.Iter()
2678 1 : meta := iter.First()
2679 1 : if invariants.Enabled {
2680 1 : if iter.Next() != nil {
2681 0 : panic("got more than one file for a move or copy compaction")
2682 : }
2683 : }
2684 1 : if c.cancel.Load() {
2685 1 : return ve, nil, stats, ErrCancelledCompaction
2686 1 : }
2687 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
2688 1 : if err != nil {
2689 0 : return ve, pendingOutputs, stats, err
2690 0 : }
2691 1 : c.metrics = map[int]*LevelMetrics{
2692 1 : c.outputLevel.level: {
2693 1 : BytesMoved: meta.Size,
2694 1 : TablesMoved: 1,
2695 1 : },
2696 1 : }
2697 1 : ve := &versionEdit{
2698 1 : DeletedFiles: map[deletedFileEntry]*fileMetadata{
2699 1 : {Level: c.startLevel.level, FileNum: meta.FileNum}: meta,
2700 1 : },
2701 1 : NewFiles: []newFileEntry{
2702 1 : {Level: c.outputLevel.level, Meta: meta},
2703 1 : },
2704 1 : }
2705 1 : if c.kind == compactionKindCopy {
2706 1 : ve, pendingOutputs, retErr = d.runCopyCompaction(jobID, c, meta, objMeta, ve)
2707 1 : if retErr != nil {
2708 0 : return ve, pendingOutputs, stats, retErr
2709 0 : }
2710 : }
2711 1 : return ve, nil, stats, nil
2712 : }
2713 :
2714 1 : defer func() {
2715 1 : if retErr != nil {
2716 1 : pendingOutputs = nil
2717 1 : }
2718 : }()
2719 :
2720 1 : snapshots := d.mu.snapshots.toSlice()
2721 1 : formatVers := d.FormatMajorVersion()
2722 1 :
2723 1 : if c.flushing == nil {
2724 1 : // Before dropping the db mutex, grab a ref to the current version. This
2725 1 : // prevents any concurrent excises from deleting files that this compaction
2726 1 : // needs to read/maintain a reference to.
2727 1 : //
2728 1 : // Note that unlike user iterators, compactionIter does not maintain a ref
2729 1 : // of the version or read state.
2730 1 : vers := d.mu.versions.currentVersion()
2731 1 : vers.Ref()
2732 1 : defer vers.UnrefLocked()
2733 1 : }
2734 :
2735 1 : if c.cancel.Load() {
2736 1 : return ve, nil, stats, ErrCancelledCompaction
2737 1 : }
2738 :
2739 : // Release the d.mu lock while doing I/O.
2740 : // Note the unusual order: Unlock and then Lock.
2741 1 : d.mu.Unlock()
2742 1 : defer d.mu.Lock()
2743 1 :
2744 1 : // Compactions use a pool of buffers to read blocks, avoiding polluting the
2745 1 : // block cache with blocks that will not be read again. We initialize the
2746 1 : // buffer pool with a size 12. This initial size does not need to be
2747 1 : // accurate, because the pool will grow to accommodate the maximum number of
2748 1 : // blocks allocated at a given time over the course of the compaction. But
2749 1 : // choosing a size larger than that working set avoids any additional
2750 1 : // allocations to grow the size of the pool over the course of iteration.
2751 1 : //
2752 1 : // Justification for initial size 12: In a two-level compaction, at any
2753 1 : // given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
2754 1 : // Additionally, when decoding a compressed block, we'll temporarily
2755 1 : // allocate 1 additional block to hold the compressed buffer. In the worst
2756 1 : // case that all input sstables have two-level index blocks (+2), value
2757 1 : // blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
2758 1 : // additionally require 2n+4 blocks where n is the number of input sstables.
2759 1 : // Range deletion and range key blocks are relatively rare, and the cost of
2760 1 : // an additional allocation or two over the course of the compaction is
2761 1 : // considered to be okay. A larger initial size would cause the pool to hold
2762 1 : // on to more memory, even when it's not in-use because the pool will
2763 1 : // recycle buffers up to the current capacity of the pool. The memory use of
2764 1 : // a 12-buffer pool is expected to be within reason, even if all the buffers
2765 1 : // grow to the typical size of an index block (256 KiB) which would
2766 1 : // translate to 3 MiB per compaction.
2767 1 : c.bufferPool.Init(12)
2768 1 : defer c.bufferPool.Release()
2769 1 :
2770 1 : iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots)
2771 1 : if err != nil {
2772 0 : return nil, pendingOutputs, stats, err
2773 0 : }
2774 1 : c.allowedZeroSeqNum = c.allowZeroSeqNum()
2775 1 : iiter = invalidating.MaybeWrapIfInvariants(iiter)
2776 1 : iter := newCompactionIter(c.cmp, c.equal, d.merge, iiter, snapshots,
2777 1 : c.allowedZeroSeqNum, c.elideTombstone,
2778 1 : c.elideRangeTombstone, d.opts.Experimental.IneffectualSingleDeleteCallback,
2779 1 : d.opts.Experimental.SingleDeleteInvariantViolationCallback,
2780 1 : d.FormatMajorVersion())
2781 1 :
2782 1 : var (
2783 1 : createdFiles []base.DiskFileNum
2784 1 : tw *sstable.Writer
2785 1 : pinnedKeySize uint64
2786 1 : pinnedValueSize uint64
2787 1 : pinnedCount uint64
2788 1 : )
2789 1 : defer func() {
2790 1 : if iter != nil {
2791 1 : retErr = firstError(retErr, iter.Close())
2792 1 : }
2793 1 : if tw != nil {
2794 0 : retErr = firstError(retErr, tw.Close())
2795 0 : }
2796 1 : if retErr != nil {
2797 1 : for _, fileNum := range createdFiles {
2798 1 : _ = d.objProvider.Remove(fileTypeTable, fileNum)
2799 1 : }
2800 : }
2801 1 : for _, closer := range c.closers {
2802 1 : retErr = firstError(retErr, closer.Close())
2803 1 : }
2804 : }()
2805 :
2806 1 : ve = &versionEdit{
2807 1 : DeletedFiles: map[deletedFileEntry]*fileMetadata{},
2808 1 : }
2809 1 :
2810 1 : startLevelBytes := c.startLevel.files.SizeSum()
2811 1 : outputMetrics := &LevelMetrics{
2812 1 : BytesIn: startLevelBytes,
2813 1 : BytesRead: c.outputLevel.files.SizeSum(),
2814 1 : }
2815 1 : if len(c.extraLevels) > 0 {
2816 1 : outputMetrics.BytesIn += c.extraLevels[0].files.SizeSum()
2817 1 : }
2818 1 : outputMetrics.BytesRead += outputMetrics.BytesIn
2819 1 :
2820 1 : c.metrics = map[int]*LevelMetrics{
2821 1 : c.outputLevel.level: outputMetrics,
2822 1 : }
2823 1 : if len(c.flushing) == 0 && c.metrics[c.startLevel.level] == nil {
2824 1 : c.metrics[c.startLevel.level] = &LevelMetrics{}
2825 1 : }
2826 1 : if len(c.extraLevels) > 0 {
2827 1 : c.metrics[c.extraLevels[0].level] = &LevelMetrics{}
2828 1 : outputMetrics.MultiLevel.BytesInTop = startLevelBytes
2829 1 : outputMetrics.MultiLevel.BytesIn = outputMetrics.BytesIn
2830 1 : outputMetrics.MultiLevel.BytesRead = outputMetrics.BytesRead
2831 1 : }
2832 :
2833 : // The table is typically written at the maximum allowable format implied by
2834 : // the current format major version of the DB.
2835 1 : tableFormat := formatVers.MaxTableFormat()
2836 1 :
2837 1 : // In format major versions with maximum table formats of Pebblev3, value
2838 1 : // blocks were conditional on an experimental setting. In format major
2839 1 : // versions with maximum table formats of Pebblev4 and higher, value blocks
2840 1 : // are always enabled.
2841 1 : if tableFormat == sstable.TableFormatPebblev3 &&
2842 1 : (d.opts.Experimental.EnableValueBlocks == nil || !d.opts.Experimental.EnableValueBlocks()) {
2843 1 : tableFormat = sstable.TableFormatPebblev2
2844 1 : }
2845 :
2846 1 : writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
2847 1 :
2848 1 : // prevPointKey is a sstable.WriterOption that provides access to
2849 1 : // the last point key written to a writer's sstable. When a new
2850 1 : // output begins in newOutput, prevPointKey is updated to point to
2851 1 : // the new output's sstable.Writer. This allows the compaction loop
2852 1 : // to access the last written point key without requiring the
2853 1 : // compaction loop to make a copy of each key ahead of time. Users
2854 1 : // must be careful, because the byte slice returned by UnsafeKey
2855 1 : // points directly into the Writer's block buffer.
2856 1 : var prevPointKey sstable.PreviousPointKeyOpt
2857 1 : var cpuWorkHandle CPUWorkHandle
2858 1 : defer func() {
2859 1 : if cpuWorkHandle != nil {
2860 0 : d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
2861 0 : }
2862 : }()
2863 :
2864 1 : newOutput := func() error {
2865 1 : // Check if we've been cancelled by a concurrent operation.
2866 1 : if c.cancel.Load() {
2867 1 : return ErrCancelledCompaction
2868 1 : }
2869 1 : d.mu.Lock()
2870 1 : fileNum := d.mu.versions.getNextFileNum()
2871 1 : d.mu.Unlock()
2872 1 :
2873 1 : ctx := context.TODO()
2874 1 : if objiotracing.Enabled {
2875 0 : ctx = objiotracing.WithLevel(ctx, c.outputLevel.level)
2876 0 : switch c.kind {
2877 0 : case compactionKindFlush:
2878 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForFlush)
2879 0 : case compactionKindIngestedFlushable:
2880 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForIngestion)
2881 0 : default:
2882 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
2883 : }
2884 : }
2885 1 : var writeCategory vfs.DiskWriteCategory
2886 1 : switch c.kind {
2887 1 : case compactionKindFlush:
2888 1 : if d.opts.EnableSQLRowSpillMetrics {
2889 0 : // In the scenario that the Pebble engine is used for SQL row spills, the data written to
2890 0 : // the memtable will correspond to spills to disk and should be categorized as such.
2891 0 : writeCategory = "sql-row-spill"
2892 1 : } else {
2893 1 : writeCategory = "pebble-memtable-flush"
2894 1 : }
2895 0 : case compactionKindIngestedFlushable:
2896 0 : writeCategory = "pebble-ingestion"
2897 1 : default:
2898 1 : writeCategory = "pebble-compaction"
2899 : }
2900 : // Prefer shared storage if present.
2901 1 : createOpts := objstorage.CreateOptions{
2902 1 : PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
2903 1 : WriteCategory: writeCategory,
2904 1 : }
2905 1 : diskFileNum := base.PhysicalTableDiskFileNum(fileNum)
2906 1 : writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts)
2907 1 : if err != nil {
2908 0 : return err
2909 0 : }
2910 1 : fileMeta := &fileMetadata{}
2911 1 : fileMeta.FileNum = fileNum
2912 1 : pendingOutputs = append(pendingOutputs, compactionOutput{
2913 1 : meta: fileMeta.PhysicalMeta().FileMetadata,
2914 1 : isLocal: !objMeta.IsRemote(),
2915 1 : })
2916 1 :
2917 1 : reason := "flushing"
2918 1 : if c.flushing == nil {
2919 1 : reason = "compacting"
2920 1 : }
2921 1 : d.opts.EventListener.TableCreated(TableCreateInfo{
2922 1 : JobID: int(jobID),
2923 1 : Reason: reason,
2924 1 : Path: d.objProvider.Path(objMeta),
2925 1 : FileNum: diskFileNum,
2926 1 : })
2927 1 : if c.kind != compactionKindFlush {
2928 1 : writable = &compactionWritable{
2929 1 : Writable: writable,
2930 1 : versions: d.mu.versions,
2931 1 : written: &c.bytesWritten,
2932 1 : }
2933 1 : }
2934 1 : createdFiles = append(createdFiles, diskFileNum)
2935 1 : cacheOpts := private.SSTableCacheOpts(d.cacheID, diskFileNum).(sstable.WriterOption)
2936 1 :
2937 1 : const MaxFileWriteAdditionalCPUTime = time.Millisecond * 100
2938 1 : cpuWorkHandle = d.opts.Experimental.CPUWorkPermissionGranter.GetPermission(
2939 1 : MaxFileWriteAdditionalCPUTime,
2940 1 : )
2941 1 : writerOpts.Parallelism =
2942 1 : d.opts.Experimental.MaxWriterConcurrency > 0 &&
2943 1 : (cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)
2944 1 :
2945 1 : tw = sstable.NewWriter(writable, writerOpts, cacheOpts, &prevPointKey)
2946 1 :
2947 1 : fileMeta.CreationTime = time.Now().Unix()
2948 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{
2949 1 : Level: c.outputLevel.level,
2950 1 : Meta: fileMeta,
2951 1 : })
2952 1 : return nil
2953 : }
2954 :
2955 : // splitL0Outputs is true during flushes and intra-L0 compactions with flush
2956 : // splits enabled.
2957 1 : splitL0Outputs := c.outputLevel.level == 0 && d.opts.FlushSplitBytes > 0
2958 1 :
2959 1 : // finishOutput is called with a user key up to which all tombstones
2960 1 : // should be flushed. Typically, this is the first key of the next
2961 1 : // sstable or an empty key if this output is the final sstable.
2962 1 : finishOutput := func(splitKey []byte) error {
2963 1 : // If we haven't output any point records to the sstable (tw == nil) then the
2964 1 : // sstable will only contain range tombstones and/or range keys. The smallest
2965 1 : // key in the sstable will be the start key of the first range tombstone or
2966 1 : // range key added. We need to ensure that this start key is distinct from
2967 1 : // the splitKey passed to finishOutput (if set), otherwise we would generate
2968 1 : // an sstable where the largest key is smaller than the smallest key due to
2969 1 : // how the largest key boundary is set below. NB: It is permissible for the
2970 1 : // range tombstone / range key start key to be nil.
2971 1 : //
2972 1 : // TODO: It is unfortunate that we have to do this check here rather than
2973 1 : // when we decide to finish the sstable in the runCompaction loop. A better
2974 1 : // structure currently eludes us.
2975 1 : if tw == nil {
2976 1 : startKey := iter.FirstTombstoneStart()
2977 1 : if startKey == nil {
2978 1 : startKey = iter.FirstRangeKeyStart()
2979 1 : }
2980 1 : if splitKey != nil && d.cmp(startKey, splitKey) == 0 {
2981 0 : return nil
2982 0 : }
2983 : }
2984 :
2985 1 : for _, v := range iter.TombstonesUpTo(splitKey) {
2986 1 : if tw == nil {
2987 1 : if err := newOutput(); err != nil {
2988 1 : return err
2989 1 : }
2990 : }
2991 : // The tombstone being added could be completely outside the
2992 : // eventual bounds of the sstable. Consider this example (bounds
2993 : // in square brackets next to table filename):
2994 : //
2995 : // ./000240.sst [tmgc#391,MERGE-tmgc#391,MERGE]
2996 : // tmgc#391,MERGE [786e627a]
2997 : // tmgc-udkatvs#331,RANGEDEL
2998 : //
2999 : // ./000241.sst [tmgc#384,MERGE-tmgc#384,MERGE]
3000 : // tmgc#384,MERGE [666c7070]
3001 : // tmgc-tvsalezade#383,RANGEDEL
3002 : // tmgc-tvsalezade#331,RANGEDEL
3003 : //
3004 : // ./000242.sst [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
3005 : // tmgc-tvsalezade#383,RANGEDEL
3006 : // tmgc#375,SET [72646c78766965616c72776865676e79]
3007 : // tmgc-tvsalezade#356,RANGEDEL
3008 : //
3009 : // Note that both of the top two SSTables have range tombstones
3010 : // that start after the file's end keys. Since the file bound
3011 : // computation happens well after all range tombstones have been
3012 : // added to the writer, eliding out-of-file range tombstones based
3013 : // on sequence number at this stage is difficult, and necessitates
3014 : // read-time logic to ignore range tombstones outside file bounds.
3015 1 : if err := rangedel.Encode(&v, tw.Add); err != nil {
3016 0 : return err
3017 0 : }
3018 : }
3019 1 : for _, v := range iter.RangeKeysUpTo(splitKey) {
3020 1 : // Same logic as for range tombstones, except added using tw.AddRangeKey.
3021 1 : if tw == nil {
3022 1 : if err := newOutput(); err != nil {
3023 1 : return err
3024 1 : }
3025 : }
3026 1 : if err := rangekey.Encode(&v, tw.AddRangeKey); err != nil {
3027 0 : return err
3028 0 : }
3029 : }
3030 :
3031 1 : if tw == nil {
3032 1 : return nil
3033 1 : }
3034 1 : {
3035 1 : // Set internal sstable properties.
3036 1 : p := getInternalWriterProperties(tw)
3037 1 : // Set the external sst version to 0. This is what RocksDB expects for
3038 1 : // db-internal sstables; otherwise, it could apply a global sequence number.
3039 1 : p.ExternalFormatVersion = 0
3040 1 : // Set the snapshot pinned totals.
3041 1 : p.SnapshotPinnedKeys = pinnedCount
3042 1 : p.SnapshotPinnedKeySize = pinnedKeySize
3043 1 : p.SnapshotPinnedValueSize = pinnedValueSize
3044 1 : stats.cumulativePinnedKeys += pinnedCount
3045 1 : stats.cumulativePinnedSize += pinnedKeySize + pinnedValueSize
3046 1 : pinnedCount = 0
3047 1 : pinnedKeySize = 0
3048 1 : pinnedValueSize = 0
3049 1 : }
3050 1 : if err := tw.Close(); err != nil {
3051 0 : tw = nil
3052 0 : return err
3053 0 : }
3054 1 : d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
3055 1 : cpuWorkHandle = nil
3056 1 : writerMeta, err := tw.Metadata()
3057 1 : if err != nil {
3058 0 : tw = nil
3059 0 : return err
3060 0 : }
3061 1 : tw = nil
3062 1 : meta := ve.NewFiles[len(ve.NewFiles)-1].Meta
3063 1 : meta.Size = writerMeta.Size
3064 1 : meta.SmallestSeqNum = writerMeta.SmallestSeqNum
3065 1 : meta.LargestSeqNum = writerMeta.LargestSeqNum
3066 1 : meta.InitPhysicalBacking()
3067 1 :
3068 1 : // If the file didn't contain any range deletions, we can fill its
3069 1 : // table stats now, avoiding unnecessarily loading the table later.
3070 1 : maybeSetStatsFromProperties(
3071 1 : meta.PhysicalMeta(), &writerMeta.Properties,
3072 1 : )
3073 1 :
3074 1 : if c.flushing == nil {
3075 1 : outputMetrics.TablesCompacted++
3076 1 : outputMetrics.BytesCompacted += meta.Size
3077 1 : } else {
3078 1 : outputMetrics.TablesFlushed++
3079 1 : outputMetrics.BytesFlushed += meta.Size
3080 1 : }
3081 1 : outputMetrics.Size += int64(meta.Size)
3082 1 : outputMetrics.NumFiles++
3083 1 : outputMetrics.Additional.BytesWrittenDataBlocks += writerMeta.Properties.DataSize
3084 1 : outputMetrics.Additional.BytesWrittenValueBlocks += writerMeta.Properties.ValueBlocksSize
3085 1 :
3086 1 : if n := len(ve.NewFiles); n > 1 {
3087 1 : // This is not the first output file. Ensure the sstable boundaries
3088 1 : // are nonoverlapping.
3089 1 : prevMeta := ve.NewFiles[n-2].Meta
3090 1 : if writerMeta.SmallestRangeDel.UserKey != nil {
3091 1 : c := d.cmp(writerMeta.SmallestRangeDel.UserKey, prevMeta.Largest.UserKey)
3092 1 : if c < 0 {
3093 0 : return errors.Errorf(
3094 0 : "pebble: smallest range tombstone start key is less than previous sstable largest key: %s < %s",
3095 0 : writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey),
3096 0 : prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey))
3097 1 : } else if c == 0 && !prevMeta.Largest.IsExclusiveSentinel() {
3098 0 : // The user key portion of the range boundary start key is
3099 0 : // equal to the previous table's largest key user key, and
3100 0 : // the previous table's largest key is not exclusive. This
3101 0 : // violates the invariant that tables are key-space
3102 0 : // partitioned.
3103 0 : return errors.Errorf(
3104 0 : "pebble: invariant violation: previous sstable largest key %s, current sstable smallest rangedel: %s",
3105 0 : prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey),
3106 0 : writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey),
3107 0 : )
3108 0 : }
3109 : }
3110 : }
3111 :
3112 : // Verify that all range deletions outputted to the sstable are
3113 : // truncated to split key.
3114 1 : if splitKey != nil && writerMeta.LargestRangeDel.UserKey != nil &&
3115 1 : d.cmp(writerMeta.LargestRangeDel.UserKey, splitKey) > 0 {
3116 0 : return errors.Errorf(
3117 0 : "pebble: invariant violation: rangedel largest key %q extends beyond split key %q",
3118 0 : writerMeta.LargestRangeDel.Pretty(d.opts.Comparer.FormatKey),
3119 0 : d.opts.Comparer.FormatKey(splitKey),
3120 0 : )
3121 0 : }
3122 :
3123 1 : if writerMeta.HasPointKeys {
3124 1 : meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestPoint, writerMeta.LargestPoint)
3125 1 : }
3126 1 : if writerMeta.HasRangeDelKeys {
3127 1 : meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestRangeDel, writerMeta.LargestRangeDel)
3128 1 : }
3129 1 : if writerMeta.HasRangeKeys {
3130 1 : meta.ExtendRangeKeyBounds(d.cmp, writerMeta.SmallestRangeKey, writerMeta.LargestRangeKey)
3131 1 : }
3132 :
3133 : // Verify that the sstable bounds fall within the compaction input
3134 : // bounds. This is a sanity check that we don't have a logic error
3135 : // elsewhere that causes the sstable bounds to accidentally expand past the
3136 : // compaction input bounds as doing so could lead to various badness such
3137 : // as keys being deleted by a range tombstone incorrectly.
3138 1 : if c.smallest.UserKey != nil {
3139 1 : switch v := d.cmp(meta.Smallest.UserKey, c.smallest.UserKey); {
3140 1 : case v >= 0:
3141 : // Nothing to do.
3142 0 : case v < 0:
3143 0 : return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s < %s",
3144 0 : meta.Smallest.Pretty(d.opts.Comparer.FormatKey),
3145 0 : c.smallest.Pretty(d.opts.Comparer.FormatKey))
3146 : }
3147 : }
3148 1 : if c.largest.UserKey != nil {
3149 1 : switch v := d.cmp(meta.Largest.UserKey, c.largest.UserKey); {
3150 1 : case v <= 0:
3151 : // Nothing to do.
3152 0 : case v > 0:
3153 0 : return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s > %s",
3154 0 : meta.Largest.Pretty(d.opts.Comparer.FormatKey),
3155 0 : c.largest.Pretty(d.opts.Comparer.FormatKey))
3156 : }
3157 : }
3158 : // Verify that we never split different revisions of the same user key
3159 : // across two different sstables.
3160 1 : if err := c.errorOnUserKeyOverlap(ve); err != nil {
3161 0 : return err
3162 0 : }
3163 1 : if err := meta.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
3164 0 : return err
3165 0 : }
3166 1 : return nil
3167 : }
3168 :
3169 : // Build a compactionOutputSplitter that contains all logic to determine
3170 : // whether the compaction loop should stop writing to one output sstable and
3171 : // switch to a new one. Some splitters can wrap other splitters, and the
3172 : // splitterGroup can be composed of multiple splitters. In this case, we
3173 : // start off with splitters for file sizes, grandparent limits, and (for L0
3174 : // splits) L0 limits, before wrapping them in a splitterGroup.
3175 1 : unsafePrevUserKey := func() []byte {
3176 1 : // Return the largest point key written to tw or the start of
3177 1 : // the current range deletion in the fragmenter, whichever is
3178 1 : // greater.
3179 1 : prevPoint := prevPointKey.UnsafeKey()
3180 1 : if c.cmp(prevPoint.UserKey, iter.FirstTombstoneStart()) > 0 {
3181 1 : return prevPoint.UserKey
3182 1 : }
3183 1 : return iter.FirstTombstoneStart()
3184 : }
3185 1 : outputSplitters := []compact.OutputSplitter{
3186 1 : // We do not split the same user key across different sstables within
3187 1 : // one flush or compaction. The FileSizeSplitter may request a split in
3188 1 : // the middle of a user key, so PreventSplitUserKeys ensures we are at a
3189 1 : // user key change boundary when doing a split.
3190 1 : compact.PreventSplitUserKeys(
3191 1 : c.cmp,
3192 1 : compact.FileSizeSplitter(&iter.frontiers, c.maxOutputFileSize, c.grandparents.Iter()),
3193 1 : unsafePrevUserKey,
3194 1 : ),
3195 1 : compact.LimitFuncSplitter(&iter.frontiers, c.findGrandparentLimit),
3196 1 : }
3197 1 : if splitL0Outputs {
3198 1 : outputSplitters = append(outputSplitters, compact.LimitFuncSplitter(&iter.frontiers, c.findL0Limit))
3199 1 : }
3200 1 : splitter := compact.CombineSplitters(c.cmp, outputSplitters...)
3201 1 :
3202 1 : // Each outer loop iteration produces one output file. An iteration that
3203 1 : // produces a file containing point keys (and optionally range tombstones)
3204 1 : // guarantees that the input iterator advanced. An iteration that produces
3205 1 : // a file containing only range tombstones guarantees the limit passed to
3206 1 : // `finishOutput()` advanced to a strictly greater user key corresponding
3207 1 : // to a grandparent file largest key, or nil. Taken together, these
3208 1 : // progress guarantees ensure that eventually the input iterator will be
3209 1 : // exhausted and the range tombstone fragments will all be flushed.
3210 1 : for key, val := iter.First(); key != nil || iter.FirstTombstoneStart() != nil || iter.FirstRangeKeyStart() != nil; {
3211 1 : var firstKey []byte
3212 1 : if key != nil {
3213 1 : firstKey = key.UserKey
3214 1 : } else if startKey := iter.FirstTombstoneStart(); startKey != nil {
3215 1 : // Pass the start key of the first pending tombstone to find the
3216 1 : // next limit. All pending tombstones have the same start key. We
3217 1 : // use this as opposed to the end key of the last written sstable to
3218 1 : // effectively handle cases like these:
3219 1 : //
3220 1 : // a.SET.3
3221 1 : // (lf.limit at b)
3222 1 : // d.RANGEDEL.4:f
3223 1 : //
3224 1 : // In this case, the partition after b has only range deletions, so
3225 1 : // if we were to find the limit after the last written key at the
3226 1 : // split point (key a), we'd get the limit b again, and
3227 1 : // finishOutput() would not advance any further because the next
3228 1 : // range tombstone to write does not start until after the L0 split
3229 1 : // point.
3230 1 : firstKey = startKey
3231 1 : }
3232 1 : splitterSuggestion := splitter.OnNewOutput(firstKey)
3233 1 :
3234 1 : // Each inner loop iteration processes one key from the input iterator.
3235 1 : for ; key != nil; key, val = iter.Next() {
3236 1 : if split := splitter.ShouldSplitBefore(key, tw); split == compact.SplitNow {
3237 1 : break
3238 : }
3239 :
3240 1 : switch key.Kind() {
3241 1 : case InternalKeyKindRangeDelete:
3242 1 : // Range tombstones are handled specially. They are not written until
3243 1 : // later during `finishOutput()`. We add them to the compaction iterator
3244 1 : // so covered keys in the same snapshot stripe can be elided.
3245 1 : //
3246 1 : // Since the keys' Suffix and Value fields are not deep cloned, the
3247 1 : // underlying blockIter must be kept open for the lifetime of the
3248 1 : // compaction.
3249 1 : iter.AddTombstoneSpan(c.rangeDelInterleaving.Span())
3250 1 : continue
3251 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
3252 1 : // Range keys are handled in the same way as range tombstones.
3253 1 : // Since the keys' Suffix and Value fields are not deep cloned, the
3254 1 : // underlying blockIter must be kept open for the lifetime of the
3255 1 : // compaction.
3256 1 : iter.AddRangeKeySpan(c.rangeKeyInterleaving.Span())
3257 1 : continue
3258 : }
3259 1 : if tw == nil {
3260 1 : if err := newOutput(); err != nil {
3261 1 : return nil, pendingOutputs, stats, err
3262 1 : }
3263 : }
3264 1 : if err := tw.AddWithForceObsolete(*key, val, iter.forceObsoleteDueToRangeDel); err != nil {
3265 0 : return nil, pendingOutputs, stats, err
3266 0 : }
3267 1 : if iter.snapshotPinned {
3268 1 : // The kv pair we just added to the sstable was only surfaced by
3269 1 : // the compaction iterator because an open snapshot prevented
3270 1 : // its elision. Increment the stats.
3271 1 : pinnedCount++
3272 1 : pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
3273 1 : pinnedValueSize += uint64(len(val))
3274 1 : }
3275 : }
3276 :
3277 : // A splitter requested a split, and we're ready to finish the output.
3278 : // We need to choose the key at which to split any pending range
3279 : // tombstones. There are two options:
3280 : // 1. splitterSuggestion — The key suggested by the splitter. This key
3281 : // is guaranteed to be greater than the last key written to the
3282 : // current output.
3283 : // 2. key.UserKey — the first key of the next sstable output. This user
3284 : // key is also guaranteed to be greater than the last user key
3285 : // written to the current output (see userKeyChangeSplitter).
3286 : //
3287 : // Use whichever is smaller. Using the smaller of the two limits
3288 : // overlap with grandparents. Consider the case where the
3289 : // grandparent limit is calculated to be 'b', key is 'x', and
3290 : // there exist many sstables between 'b' and 'x'. If the range
3291 : // deletion fragmenter has a pending tombstone [a,x), splitting
3292 : // at 'x' would cause the output table to overlap many
3293 : // grandparents well beyond the calculated grandparent limit
3294 : // 'b'. Splitting at the smaller `splitterSuggestion` avoids
3295 : // this unbounded overlap with grandparent tables.
3296 1 : splitKey := splitterSuggestion
3297 1 : if key != nil && (splitKey == nil || c.cmp(splitKey, key.UserKey) > 0) {
3298 1 : splitKey = key.UserKey
3299 1 : }
3300 1 : if err := finishOutput(splitKey); err != nil {
3301 1 : return nil, pendingOutputs, stats, err
3302 1 : }
3303 : }
3304 :
3305 1 : for _, cl := range c.inputs {
3306 1 : iter := cl.files.Iter()
3307 1 : for f := iter.First(); f != nil; f = iter.Next() {
3308 1 : ve.DeletedFiles[deletedFileEntry{
3309 1 : Level: cl.level,
3310 1 : FileNum: f.FileNum,
3311 1 : }] = f
3312 1 : }
3313 : }
3314 :
3315 : // The compaction iterator keeps track of a count of the number of DELSIZED
3316 : // keys that encoded an incorrect size. Propagate it up as a part of
3317 : // compactStats.
3318 1 : stats.countMissizedDels = iter.stats.countMissizedDels
3319 1 :
3320 1 : if err := d.objProvider.Sync(); err != nil {
3321 0 : return nil, pendingOutputs, stats, err
3322 0 : }
3323 :
3324 : // Refresh the disk available statistic whenever a compaction/flush
3325 : // completes, before re-acquiring the mutex.
3326 1 : _ = d.calculateDiskAvailableBytes()
3327 1 :
3328 1 : return ve, pendingOutputs, stats, nil
3329 : }
3330 :
3331 : // validateVersionEdit validates that start and end keys across new and deleted
3332 : // files in a versionEdit pass the given validation function.
3333 : func validateVersionEdit(
3334 : ve *versionEdit, validateFn func([]byte) error, format base.FormatKey,
3335 1 : ) error {
3336 1 : validateMetaFn := func(f *manifest.FileMetadata) error {
3337 1 : for _, key := range []InternalKey{f.Smallest, f.Largest} {
3338 1 : if err := validateFn(key.UserKey); err != nil {
3339 0 : return errors.Wrapf(err, "key=%q; file=%s", format(key.UserKey), f)
3340 0 : }
3341 : }
3342 1 : return nil
3343 : }
3344 :
3345 : // Validate both new and deleted files.
3346 1 : for _, f := range ve.NewFiles {
3347 1 : if err := validateMetaFn(f.Meta); err != nil {
3348 0 : return err
3349 0 : }
3350 : }
3351 1 : for _, m := range ve.DeletedFiles {
3352 1 : if err := validateMetaFn(m); err != nil {
3353 0 : return err
3354 0 : }
3355 : }
3356 :
3357 1 : return nil
3358 : }
3359 :
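// Illustrative only: a minimal sketch of a validation function with the
// func([]byte) error shape that validateVersionEdit above accepts; the policy
// of rejecting empty user keys is hypothetical and not Pebble's default.
//
//	validateNonEmpty := func(userKey []byte) error {
//		if len(userKey) == 0 {
//			return errors.New("empty user key")
//		}
//		return nil
//	}
//	// err := validateVersionEdit(ve, validateNonEmpty, d.opts.Comparer.FormatKey)
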
3360 : // scanObsoleteFiles scans the filesystem for files that are no longer needed
3361 : // and adds those to the internal lists of obsolete files. Note that the files
3362 : // are not actually deleted by this method. A subsequent call to
3363 : // deleteObsoleteFiles must be performed. Must not be called concurrently
3364 : // with compactions and flushes. db.mu must be held when calling this function.
3365 1 : func (d *DB) scanObsoleteFiles(list []string) {
3366 1 : // Disable automatic compactions temporarily to avoid concurrent compactions /
3367 1 : // flushes from interfering. The original value is restored on completion.
3368 1 : disabledPrev := d.opts.DisableAutomaticCompactions
3369 1 : defer func() {
3370 1 : d.opts.DisableAutomaticCompactions = disabledPrev
3371 1 : }()
3372 1 : d.opts.DisableAutomaticCompactions = true
3373 1 :
3374 1 : // Wait for any ongoing compaction to complete before continuing.
3375 1 : for d.mu.compact.compactingCount > 0 || d.mu.compact.downloadingCount > 0 || d.mu.compact.flushing {
3376 0 : d.mu.compact.cond.Wait()
3377 0 : }
3378 :
3379 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
3380 1 : d.mu.versions.addLiveFileNums(liveFileNums)
3381 1 : // Protect files that are only referred to by the ingestedFlushable
3382 1 : // from being deleted. These are added to the flushable queue on WAL replay
3383 1 : // during read only mode and aren't part of the Version. Note that if
3384 1 : // !d.opts.ReadOnly, then all flushables of type ingestedFlushable have
3385 1 : // already been flushed.
3386 1 : for _, fEntry := range d.mu.mem.queue {
3387 1 : if f, ok := fEntry.flushable.(*ingestedFlushable); ok {
3388 0 : for _, file := range f.files {
3389 0 : liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
3390 0 : }
3391 : }
3392 : }
3393 :
3394 1 : manifestFileNum := d.mu.versions.manifestFileNum
3395 1 :
3396 1 : var obsoleteTables []tableInfo
3397 1 : var obsoleteManifests []fileInfo
3398 1 : var obsoleteOptions []fileInfo
3399 1 :
3400 1 : for _, filename := range list {
3401 1 : fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
3402 1 : if !ok {
3403 1 : continue
3404 : }
3405 1 : switch fileType {
3406 1 : case fileTypeManifest:
3407 1 : if diskFileNum >= manifestFileNum {
3408 0 : continue
3409 : }
3410 1 : fi := fileInfo{FileNum: diskFileNum}
3411 1 : if stat, err := d.opts.FS.Stat(filename); err == nil {
3412 0 : fi.FileSize = uint64(stat.Size())
3413 0 : }
3414 1 : obsoleteManifests = append(obsoleteManifests, fi)
3415 1 : case fileTypeOptions:
3416 1 : if diskFileNum >= d.optionsFileNum {
3417 0 : continue
3418 : }
3419 1 : fi := fileInfo{FileNum: diskFileNum}
3420 1 : if stat, err := d.opts.FS.Stat(filename); err == nil {
3421 0 : fi.FileSize = uint64(stat.Size())
3422 0 : }
3423 1 : obsoleteOptions = append(obsoleteOptions, fi)
3424 1 : case fileTypeTable:
3425 : // Objects are handled through the objstorage provider below.
3426 1 : default:
3427 : // Don't delete files we don't know about.
3428 : }
3429 : }
3430 :
3431 1 : objects := d.objProvider.List()
3432 1 : for _, obj := range objects {
3433 1 : switch obj.FileType {
3434 1 : case fileTypeTable:
3435 1 : if _, ok := liveFileNums[obj.DiskFileNum]; ok {
3436 1 : continue
3437 : }
3438 1 : fileInfo := fileInfo{
3439 1 : FileNum: obj.DiskFileNum,
3440 1 : }
3441 1 : if size, err := d.objProvider.Size(obj); err == nil {
3442 1 : fileInfo.FileSize = uint64(size)
3443 1 : }
3444 1 : obsoleteTables = append(obsoleteTables, tableInfo{
3445 1 : fileInfo: fileInfo,
3446 1 : isLocal: !obj.IsRemote(),
3447 1 : })
3448 :
3449 0 : default:
3450 : // Ignore object types we don't know about.
3451 : }
3452 : }
3453 :
3454 1 : d.mu.versions.obsoleteTables = mergeTableInfos(d.mu.versions.obsoleteTables, obsoleteTables)
3455 1 : d.mu.versions.updateObsoleteTableMetricsLocked()
3456 1 : d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
3457 1 : d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
3458 : }
3459 :
3460 : // disableFileDeletions disables file deletions and then waits for any
3461 : // in-progress deletion to finish. The caller is required to call
3462 : // enableFileDeletions in order to enable file deletions again. It is ok for
3463 : // multiple callers to disable file deletions simultaneously, though they must
3464 : // all invoke enableFileDeletions in order for file deletions to be re-enabled
3465 : // (there is an internal reference count on file deletion disablement).
3466 : //
3467 : // d.mu must be held when calling this method.
3468 1 : func (d *DB) disableFileDeletions() {
3469 1 : d.mu.disableFileDeletions++
3470 1 : d.mu.Unlock()
3471 1 : defer d.mu.Lock()
3472 1 : d.cleanupManager.Wait()
3473 1 : }
3474 :
3475 : // enableFileDeletions enables previously disabled file deletions. A cleanup job
3476 : // is queued if necessary.
3477 : //
3478 : // d.mu must be held when calling this method.
3479 1 : func (d *DB) enableFileDeletions() {
3480 1 : if d.mu.disableFileDeletions <= 0 {
3481 0 : panic("pebble: file deletion disablement invariant violated")
3482 : }
3483 1 : d.mu.disableFileDeletions--
3484 1 : if d.mu.disableFileDeletions > 0 {
3485 0 : return
3486 0 : }
3487 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
3488 : }
3489 :
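// Illustrative only: a minimal sketch of how callers are expected to pair the
// disable/enable calls above; the work performed in between is hypothetical.
//
//	d.mu.Lock()
//	d.disableFileDeletions()
//	// ... read or copy files that must not be deleted concurrently ...
//	d.enableFileDeletions()
//	d.mu.Unlock()
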
3490 : type fileInfo = base.FileInfo
3491 :
3492 : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
3493 : //
3494 : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
3495 : //
3496 : // Does nothing if file deletions are disabled (see disableFileDeletions). A
3497 : // cleanup job will be scheduled when file deletions are re-enabled.
3498 1 : func (d *DB) deleteObsoleteFiles(jobID JobID) {
3499 1 : if d.mu.disableFileDeletions > 0 {
3500 1 : return
3501 1 : }
3502 1 : _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
3503 1 :
3504 1 : // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
3505 1 : // log that has not had its contents flushed to an sstable.
3506 1 : obsoleteLogs, err := d.mu.log.manager.Obsolete(wal.NumWAL(d.mu.versions.minUnflushedLogNum), noRecycle)
3507 1 : if err != nil {
3508 0 : panic(err)
3509 : }
3510 :
3511 1 : obsoleteTables := append([]tableInfo(nil), d.mu.versions.obsoleteTables...)
3512 1 : d.mu.versions.obsoleteTables = nil
3513 1 :
3514 1 : for _, tbl := range obsoleteTables {
3515 1 : delete(d.mu.versions.zombieTables, tbl.FileNum)
3516 1 : }
3517 :
3518 : // Sort the manifests because we want to delete some contiguous prefix
3519 : // of the older manifests.
3520 1 : slices.SortFunc(d.mu.versions.obsoleteManifests, func(a, b fileInfo) int {
3521 1 : return cmp.Compare(a.FileNum, b.FileNum)
3522 1 : })
3523 :
3524 1 : var obsoleteManifests []fileInfo
3525 1 : manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
3526 1 : if manifestsToDelete > 0 {
3527 1 : obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
3528 1 : d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
3529 1 : if len(d.mu.versions.obsoleteManifests) == 0 {
3530 0 : d.mu.versions.obsoleteManifests = nil
3531 0 : }
3532 : }
3533 :
3534 1 : obsoleteOptions := d.mu.versions.obsoleteOptions
3535 1 : d.mu.versions.obsoleteOptions = nil
3536 1 :
3537 1 : // Release d.mu while preparing the cleanup job and possibly waiting.
3538 1 : // Note the unusual order: Unlock and then Lock.
3539 1 : d.mu.Unlock()
3540 1 : defer d.mu.Lock()
3541 1 :
3542 1 : filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
3543 1 : for _, f := range obsoleteLogs {
3544 1 : filesToDelete = append(filesToDelete, obsoleteFile{fileType: fileTypeLog, logFile: f})
3545 1 : }
3546 : // We sort to make the order of deletions deterministic, which is nice for
3547 : // tests.
3548 1 : slices.SortFunc(obsoleteTables, func(a, b tableInfo) int {
3549 1 : return cmp.Compare(a.FileNum, b.FileNum)
3550 1 : })
3551 1 : for _, f := range obsoleteTables {
3552 1 : d.tableCache.evict(f.FileNum)
3553 1 : filesToDelete = append(filesToDelete, obsoleteFile{
3554 1 : fileType: fileTypeTable,
3555 1 : nonLogFile: deletableFile{
3556 1 : dir: d.dirname,
3557 1 : fileNum: f.FileNum,
3558 1 : fileSize: f.FileSize,
3559 1 : isLocal: f.isLocal,
3560 1 : },
3561 1 : })
3562 1 : }
3563 1 : files := [2]struct {
3564 1 : fileType fileType
3565 1 : obsolete []fileInfo
3566 1 : }{
3567 1 : {fileTypeManifest, obsoleteManifests},
3568 1 : {fileTypeOptions, obsoleteOptions},
3569 1 : }
3570 1 : for _, f := range files {
3571 1 : // We sort to make the order of deletions deterministic, which is nice for
3572 1 : // tests.
3573 1 : slices.SortFunc(f.obsolete, func(a, b fileInfo) int {
3574 1 : return cmp.Compare(a.FileNum, b.FileNum)
3575 1 : })
3576 1 : for _, fi := range f.obsolete {
3577 1 : dir := d.dirname
3578 1 : filesToDelete = append(filesToDelete, obsoleteFile{
3579 1 : fileType: f.fileType,
3580 1 : nonLogFile: deletableFile{
3581 1 : dir: dir,
3582 1 : fileNum: fi.FileNum,
3583 1 : fileSize: fi.FileSize,
3584 1 : isLocal: true,
3585 1 : },
3586 1 : })
3587 1 : }
3588 : }
3589 1 : if len(filesToDelete) > 0 {
3590 1 : d.cleanupManager.EnqueueJob(jobID, filesToDelete)
3591 1 : }
3592 1 : if d.opts.private.testingAlwaysWaitForCleanup {
3593 0 : d.cleanupManager.Wait()
3594 0 : }
3595 : }
3596 :
3597 1 : func (d *DB) maybeScheduleObsoleteTableDeletion() {
3598 1 : d.mu.Lock()
3599 1 : defer d.mu.Unlock()
3600 1 : d.maybeScheduleObsoleteTableDeletionLocked()
3601 1 : }
3602 :
3603 1 : func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
3604 1 : if len(d.mu.versions.obsoleteTables) > 0 {
3605 1 : d.deleteObsoleteFiles(d.newJobIDLocked())
3606 1 : }
3607 : }
3608 :
3609 1 : func merge(a, b []fileInfo) []fileInfo {
3610 1 : if len(b) == 0 {
3611 1 : return a
3612 1 : }
3613 :
3614 1 : a = append(a, b...)
3615 1 : slices.SortFunc(a, func(a, b fileInfo) int {
3616 1 : return cmp.Compare(a.FileNum, b.FileNum)
3617 1 : })
3618 1 : return slices.CompactFunc(a, func(a, b fileInfo) bool {
3619 1 : return a.FileNum == b.FileNum
3620 1 : })
3621 : }
3622 :
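// Illustrative only: merge returns a slice sorted by FileNum with duplicates
// removed, so merging overlapping slices is idempotent; the FileNum values
// below are hypothetical.
//
//	a := []fileInfo{{FileNum: 1}, {FileNum: 3}}
//	b := []fileInfo{{FileNum: 3}, {FileNum: 5}}
//	merged := merge(a, b) // merged holds FileNums 1, 3, 5
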
3623 1 : func mergeTableInfos(a, b []tableInfo) []tableInfo {
3624 1 : if len(b) == 0 {
3625 1 : return a
3626 1 : }
3627 :
3628 1 : a = append(a, b...)
3629 1 : slices.SortFunc(a, func(a, b tableInfo) int {
3630 0 : return cmp.Compare(a.FileNum, b.FileNum)
3631 0 : })
3632 1 : return slices.CompactFunc(a, func(a, b tableInfo) bool {
3633 0 : return a.FileNum == b.FileNum
3634 0 : })
3635 : }