Line data Source code
1 : // Copyright 2013 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "iter"
12 : "math"
13 : "runtime/pprof"
14 : "slices"
15 : "sort"
16 : "sync/atomic"
17 : "time"
18 : "unsafe"
19 :
20 : "github.com/cockroachdb/crlib/crtime"
21 : "github.com/cockroachdb/errors"
22 : "github.com/cockroachdb/pebble/internal/base"
23 : "github.com/cockroachdb/pebble/internal/compact"
24 : "github.com/cockroachdb/pebble/internal/keyspan"
25 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
26 : "github.com/cockroachdb/pebble/internal/manifest"
27 : "github.com/cockroachdb/pebble/internal/sstableinternal"
28 : "github.com/cockroachdb/pebble/objstorage"
29 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
30 : "github.com/cockroachdb/pebble/objstorage/remote"
31 : "github.com/cockroachdb/pebble/sstable"
32 : "github.com/cockroachdb/pebble/sstable/blob"
33 : "github.com/cockroachdb/pebble/sstable/block"
34 : "github.com/cockroachdb/pebble/vfs"
35 : )
36 :
37 : var errEmptyTable = errors.New("pebble: empty table")
38 :
39 : // ErrCancelledCompaction is returned if a compaction is cancelled by a
40 : // concurrent excise or ingest-split operation.
41 : var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
42 :
43 : var flushLabels = pprof.Labels("pebble", "flush", "output-level", "L0")
44 : var gcLabels = pprof.Labels("pebble", "gc")
45 :
46 : // expandedCompactionByteSizeLimit is the maximum number of bytes in all
47 : // compacted files. We avoid expanding the lower level file set of a compaction
48 : // if it would make the total compaction cover more than this many bytes.
49 1 : func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64) uint64 {
50 1 : v := uint64(25 * opts.Levels[level].TargetFileSize)
51 1 :
52 1 : // Never expand a compaction beyond half the available capacity, divided
53 1 : // by the maximum number of concurrent compactions. Each of the concurrent
54 1 : // compactions may expand up to this limit, so this attempts to limit
55 1 : // compactions to half of available disk space. Note that this will not
56 1 : // prevent compaction picking from pursuing compactions that are larger
57 1 : // than this threshold before expansion.
58 1 : //
59 1 : // NB: this heuristic is an approximation since we may run more compactions
60 1 : // than the upper concurrency limit.
61 1 : _, maxConcurrency := opts.CompactionConcurrencyRange()
62 1 : diskMax := (availBytes / 2) / uint64(maxConcurrency)
63 1 : if v > diskMax {
64 1 : v = diskMax
65 1 : }
66 1 : return v
67 : }
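// exampleExpandedLimit is an editor's note, not part of the original source:
// a minimal sketch of the arithmetic in expandedCompactionByteSizeLimit.
// Assuming a 64 MB target file size for the output level, 1 TB of available
// disk, and a maximum compaction concurrency of 4, the limit is
// min(25*64 MB, (1 TB/2)/4) = 1.6 GB, so the 25x-target-file-size term
// governs; the disk-based cap only takes over once available space drops
// below roughly 12.8 GB.
func exampleExpandedLimit(targetFileSize int64, availBytes uint64, maxConcurrency int) uint64 {
	v := uint64(25 * targetFileSize)
	if diskMax := (availBytes / 2) / uint64(maxConcurrency); v > diskMax {
		v = diskMax
	}
	return v
}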
68 :
69 : // maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
70 : // before we stop building a single file in a level-1 to level compaction.
71 1 : func maxGrandparentOverlapBytes(opts *Options, level int) uint64 {
72 1 : return uint64(10 * opts.Levels[level].TargetFileSize)
73 1 : }
74 :
75 : // maxReadCompactionBytes is used to prevent read compactions which
76 : // are too wide.
77 1 : func maxReadCompactionBytes(opts *Options, level int) uint64 {
78 1 : return uint64(10 * opts.Levels[level].TargetFileSize)
79 1 : }
80 :
81 : // noCloseIter wraps around a FragmentIterator, intercepting and eliding
82 : // calls to Close. It is used during compaction to ensure that rangeDelIters
83 : // are not closed prematurely.
84 : type noCloseIter struct {
85 : keyspan.FragmentIterator
86 : }
87 :
88 1 : func (i *noCloseIter) Close() {}
89 :
90 : type compactionLevel struct {
91 : level int
92 : files manifest.LevelSlice
93 : // l0SublevelInfo contains information about L0 sublevels being compacted.
94 : // It's only set for the start level of a compaction starting out of L0 and
95 : // is nil for all other compactions.
96 : l0SublevelInfo []sublevelInfo
97 : }
98 :
99 1 : func (cl compactionLevel) Clone() compactionLevel {
100 1 : newCL := compactionLevel{
101 1 : level: cl.level,
102 1 : files: cl.files,
103 1 : }
104 1 : return newCL
105 1 : }
106 1 : func (cl compactionLevel) String() string {
107 1 : return fmt.Sprintf(`Level %d, Files %s`, cl.level, cl.files)
108 1 : }
109 :
110 : // compactionWritable is an objstorage.Writable wrapper that, on every write,
111 : // updates a metric in `versions` on bytes written by in-progress compactions so
112 : // far. It also increments a per-compaction `written` atomic int.
113 : type compactionWritable struct {
114 : objstorage.Writable
115 :
116 : versions *versionSet
117 : written *atomic.Int64
118 : }
119 :
120 : // Write is part of the objstorage.Writable interface.
121 1 : func (c *compactionWritable) Write(p []byte) error {
122 1 : if err := c.Writable.Write(p); err != nil {
123 0 : return err
124 0 : }
125 :
126 1 : c.written.Add(int64(len(p)))
127 1 : c.versions.incrementCompactionBytes(int64(len(p)))
128 1 : return nil
129 : }
130 :
131 : type compactionKind int
132 :
133 : const (
134 : compactionKindDefault compactionKind = iota
135 : compactionKindFlush
136 : // compactionKindMove denotes a move compaction where the input file is
137 : // retained and linked in a new level without being obsoleted.
138 : compactionKindMove
139 : // compactionKindCopy denotes a copy compaction where the input file is
140 : // copied byte-by-byte into a new file with a new TableNum in the output level.
141 : compactionKindCopy
142 : // compactionKindDeleteOnly denotes a compaction that only deletes input
143 : // files. It can occur when wide range tombstones completely contain sstables.
144 : compactionKindDeleteOnly
145 : compactionKindElisionOnly
146 : compactionKindRead
147 : compactionKindTombstoneDensity
148 : compactionKindRewrite
149 : compactionKindIngestedFlushable
150 : )
151 :
152 1 : func (k compactionKind) String() string {
153 1 : switch k {
154 1 : case compactionKindDefault:
155 1 : return "default"
156 0 : case compactionKindFlush:
157 0 : return "flush"
158 1 : case compactionKindMove:
159 1 : return "move"
160 1 : case compactionKindDeleteOnly:
161 1 : return "delete-only"
162 1 : case compactionKindElisionOnly:
163 1 : return "elision-only"
164 1 : case compactionKindRead:
165 1 : return "read"
166 1 : case compactionKindTombstoneDensity:
167 1 : return "tombstone-density"
168 1 : case compactionKindRewrite:
169 1 : return "rewrite"
170 0 : case compactionKindIngestedFlushable:
171 0 : return "ingested-flushable"
172 1 : case compactionKindCopy:
173 1 : return "copy"
174 : }
175 0 : return "?"
176 : }
177 :
178 : // compactingOrFlushing returns "flushing" if the compaction kind is a flush,
179 : // otherwise it returns "compacting".
180 1 : func (k compactionKind) compactingOrFlushing() string {
181 1 : if k == compactionKindFlush {
182 1 : return "flushing"
183 1 : }
184 1 : return "compacting"
185 : }
186 :
187 : // compaction is a table compaction from one level to the next, starting from a
188 : // given version.
189 : type compaction struct {
190 : // cancel is a bool that can be used by other goroutines to signal a compaction
191 : // to cancel, such as if a conflicting excise operation raced it to manifest
192 : // application. Only holders of the manifest lock will write to this atomic.
193 : cancel atomic.Bool
194 :
195 : kind compactionKind
196 : // isDownload is true if this compaction was started as part of a Download
197 : // operation. In this case kind is compactionKindCopy or
198 : // compactionKindRewrite.
199 : isDownload bool
200 :
201 : cmp Compare
202 : equal Equal
203 : comparer *base.Comparer
204 : formatKey base.FormatKey
205 : logger Logger
206 : version *manifest.Version
207 : stats base.InternalIteratorStats
208 : beganAt time.Time
209 : // versionEditApplied is set to true when a compaction has completed and the
210 : // resulting version has been installed (if successful), but the compaction
211 : // goroutine is still cleaning up (eg, deleting obsolete files).
212 : versionEditApplied bool
213 : bufferPool sstable.BufferPool
214 : // getValueSeparation constructs a compact.ValueSeparation for use in a
215 : // compaction. It implements heuristics around choosing whether a compaction
216 : // should:
217 : //
218 : // a) preserve existing blob references: The compaction does not write any
219 : // new blob files, but propagates existing references to blob files. This
220 : // conserves write bandwidth by avoiding rewriting the referenced values. It
221 : // also reduces the locality of the referenced values, which can reduce scan
222 : // performance because a scan must load values from more distinct blob files.
223 : // It can also delay reclamation of disk space if some of the references to
224 : // blob values are elided by the compaction, increasing space amplification.
225 : //
226 : // b) rewrite blob files: The compaction will write eligible values to new
227 : // blob files. This consumes more write bandwidth because all values are
228 : // rewritten. However it restores locality.
229 : getValueSeparation func(JobID, *compaction, sstable.TableFormat) compact.ValueSeparation
230 : // valueFetcher is used to fetch values from blob files. It's propagated
231 : // down the iterator tree through the internal iterator options.
232 : valueFetcher blob.ValueFetcher
233 :
234 : // startLevel is the level that is being compacted. Inputs from startLevel
235 : // and outputLevel will be merged to produce a set of outputLevel files.
236 : startLevel *compactionLevel
237 :
238 : // outputLevel is the level that files are being produced in. outputLevel is
239 : // equal to startLevel+1 except when:
240 : // - if startLevel is 0, the output level equals compactionPicker.baseLevel().
241 : // - in multilevel compaction, the output level is the lowest level involved in
242 : // the compaction
243 : // A compaction's outputLevel is nil for delete-only compactions.
244 : outputLevel *compactionLevel
245 :
246 : // extraLevels point to additional levels in between the input and output
247 : // levels that get compacted in multilevel compactions
248 : extraLevels []*compactionLevel
249 :
250 : inputs []compactionLevel
251 :
252 : // maxOutputFileSize is the maximum size of an individual table created
253 : // during compaction.
254 : maxOutputFileSize uint64
255 : // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
256 : // single output table with the tables in the grandparent level.
257 : maxOverlapBytes uint64
258 :
259 : // flushing contains the flushables (aka memtables) that are being flushed.
260 : flushing flushableList
261 : // bytesWritten contains the number of bytes that have been written to outputs.
262 : bytesWritten atomic.Int64
263 :
264 : // The boundaries of the input data.
265 : smallest InternalKey
266 : largest InternalKey
267 :
268 : // A list of fragment iterators to close when the compaction finishes. Used by
269 : // input iteration to keep rangeDelIters open for the lifetime of the
270 : // compaction, and only close them when the compaction finishes.
271 : closers []*noCloseIter
272 :
273 : // grandparents are the tables in level+2 that overlap with the files being
274 : // compacted. Used to determine output table boundaries. Do not assume that the actual files
275 : // in the grandparent level will be the same when this compaction finishes.
276 : grandparents manifest.LevelSlice
277 :
278 : // Boundaries at which flushes to L0 should be split. Determined by
279 : // L0Sublevels. If nil, flushes aren't split.
280 : l0Limits [][]byte
281 :
282 : delElision compact.TombstoneElision
283 : rangeKeyElision compact.TombstoneElision
284 :
285 : // allowedZeroSeqNum is true if seqnums can be zeroed if there are no
286 : // snapshots requiring them to be kept. This determination is made by
287 : // looking for an sstable which overlaps the bounds of the compaction at a
288 : // lower level in the LSM during runCompaction.
289 : allowedZeroSeqNum bool
290 :
291 : // deletionHints are set if this is a compactionKindDeleteOnly. Used to figure
292 : // out whether an input must be deleted in its entirety, or excised into
293 : // virtual sstables.
294 : deletionHints []deleteCompactionHint
295 :
296 : // exciseEnabled is set to true if this is a compactionKindDeleteOnly and
297 : // this compaction is allowed to excise files.
298 : exciseEnabled bool
299 :
300 : metrics levelMetricsDelta
301 :
302 : pickerMetrics pickedCompactionMetrics
303 :
304 : grantHandle CompactionGrantHandle
305 :
306 : tableFormat sstable.TableFormat
307 : objCreateOpts objstorage.CreateOptions
308 : }
309 :
310 : // inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any
311 : // input sstables.
312 1 : func (c *compaction) inputLargestSeqNumAbsolute() base.SeqNum {
313 1 : var seqNum base.SeqNum
314 1 : for _, cl := range c.inputs {
315 1 : for m := range cl.files.All() {
316 1 : seqNum = max(seqNum, m.LargestSeqNumAbsolute)
317 1 : }
318 : }
319 1 : return seqNum
320 : }
321 :
322 1 : func (c *compaction) makeInfo(jobID JobID) CompactionInfo {
323 1 : info := CompactionInfo{
324 1 : JobID: int(jobID),
325 1 : Reason: c.kind.String(),
326 1 : Input: make([]LevelInfo, 0, len(c.inputs)),
327 1 : Annotations: []string{},
328 1 : }
329 1 : if c.isDownload {
330 1 : info.Reason = "download," + info.Reason
331 1 : }
332 1 : for _, cl := range c.inputs {
333 1 : inputInfo := LevelInfo{Level: cl.level, Tables: nil}
334 1 : for m := range cl.files.All() {
335 1 : inputInfo.Tables = append(inputInfo.Tables, m.TableInfo())
336 1 : }
337 1 : info.Input = append(info.Input, inputInfo)
338 : }
339 1 : if c.outputLevel != nil {
340 1 : info.Output.Level = c.outputLevel.level
341 1 :
342 1 : // If there are no inputs from the output level (eg, a move
343 1 : // compaction), add an empty LevelInfo to info.Input.
344 1 : if len(c.inputs) > 0 && c.inputs[len(c.inputs)-1].level != c.outputLevel.level {
345 0 : info.Input = append(info.Input, LevelInfo{Level: c.outputLevel.level})
346 0 : }
347 1 : } else {
348 1 : // For a delete-only compaction, set the output level to L6. The
349 1 : // output level is not meaningful here, but complicating the
350 1 : // info.Output interface with a pointer doesn't seem worth the
351 1 : // semantic distinction.
352 1 : info.Output.Level = numLevels - 1
353 1 : }
354 :
355 1 : for i, score := range c.pickerMetrics.scores {
356 1 : info.Input[i].Score = score
357 1 : }
358 1 : info.SingleLevelOverlappingRatio = c.pickerMetrics.singleLevelOverlappingRatio
359 1 : info.MultiLevelOverlappingRatio = c.pickerMetrics.multiLevelOverlappingRatio
360 1 : if len(info.Input) > 2 {
361 1 : info.Annotations = append(info.Annotations, "multilevel")
362 1 : }
363 1 : return info
364 : }
365 :
366 1 : func (c *compaction) userKeyBounds() base.UserKeyBounds {
367 1 : return base.UserKeyBoundsFromInternal(c.smallest, c.largest)
368 1 : }
369 :
370 : type getValueSeparation func(JobID, *compaction, sstable.TableFormat) compact.ValueSeparation
371 :
372 : func newCompaction(
373 : pc *pickedCompaction,
374 : opts *Options,
375 : beganAt time.Time,
376 : provider objstorage.Provider,
377 : grantHandle CompactionGrantHandle,
378 : tableFormat sstable.TableFormat,
379 : getValueSeparation getValueSeparation,
380 1 : ) *compaction {
381 1 : c := &compaction{
382 1 : kind: compactionKindDefault,
383 1 : cmp: pc.cmp,
384 1 : equal: opts.Comparer.Equal,
385 1 : comparer: opts.Comparer,
386 1 : formatKey: opts.Comparer.FormatKey,
387 1 : inputs: pc.inputs,
388 1 : smallest: pc.smallest,
389 1 : largest: pc.largest,
390 1 : logger: opts.Logger,
391 1 : version: pc.version,
392 1 : beganAt: beganAt,
393 1 : getValueSeparation: getValueSeparation,
394 1 : maxOutputFileSize: pc.maxOutputFileSize,
395 1 : maxOverlapBytes: pc.maxOverlapBytes,
396 1 : pickerMetrics: pc.pickerMetrics,
397 1 : grantHandle: grantHandle,
398 1 : tableFormat: tableFormat,
399 1 : }
400 1 : c.startLevel = &c.inputs[0]
401 1 : if pc.startLevel.l0SublevelInfo != nil {
402 1 : c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo
403 1 : }
404 1 : c.outputLevel = &c.inputs[1]
405 1 :
406 1 : if len(pc.extraLevels) > 0 {
407 1 : c.extraLevels = pc.extraLevels
408 1 : c.outputLevel = &c.inputs[len(c.inputs)-1]
409 1 : }
410 : // Compute the set of outputLevel+1 files that overlap this compaction (these
411 : // are the grandparent sstables).
412 1 : if c.outputLevel.level+1 < numLevels {
413 1 : c.grandparents = c.version.Overlaps(c.outputLevel.level+1, c.userKeyBounds())
414 1 : }
415 1 : c.delElision, c.rangeKeyElision = compact.SetupTombstoneElision(
416 1 : c.cmp, c.version, pc.l0Organizer, c.outputLevel.level, base.UserKeyBoundsFromInternal(c.smallest, c.largest),
417 1 : )
418 1 : c.kind = pc.kind
419 1 :
420 1 : preferSharedStorage := tableFormat >= FormatMinForSharedObjects.MaxTableFormat() &&
421 1 : remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
422 1 : c.maybeSwitchToMoveOrCopy(preferSharedStorage, provider)
423 1 : c.objCreateOpts = objstorage.CreateOptions{
424 1 : PreferSharedStorage: preferSharedStorage,
425 1 : WriteCategory: getDiskWriteCategoryForCompaction(opts, c.kind),
426 1 : }
427 1 : if preferSharedStorage {
428 1 : c.getValueSeparation = neverSeparateValues
429 1 : }
430 :
431 1 : return c
432 : }
433 :
434 : // maybeSwitchToMoveOrCopy decides if the compaction can be changed into a move
435 : // or copy compaction, in which case c.kind is updated.
436 : func (c *compaction) maybeSwitchToMoveOrCopy(
437 : preferSharedStorage bool, provider objstorage.Provider,
438 1 : ) {
439 1 : // Only non-multi-level compactions with a single input file can be
440 1 : // considered.
441 1 : if c.startLevel.files.Len() != 1 || !c.outputLevel.files.Empty() || c.hasExtraLevelData() {
442 1 : return
443 1 : }
444 :
445 : // In addition to the default compaction, we also check whether a tombstone
446 : // density compaction can be optimized into a move compaction. However, we
447 : // want to avoid performing a move compaction into the lowest level, since the
448 : // goal there is to actually remove the tombstones.
449 : //
450 : // Tombstone density compaction is meant to address cases where tombstones
451 : // don't reclaim much space but are still expensive to scan over. We can only
452 : // remove the tombstones once there's nothing at all underneath them.
453 1 : switch c.kind {
454 1 : case compactionKindDefault:
455 : // Proceed.
456 1 : case compactionKindTombstoneDensity:
457 1 : // Tombstone density compaction can be optimized into a move compaction.
458 1 : // However, we want to avoid performing a move compaction into the lowest
459 1 : // level, since the goal there is to actually remove the tombstones; even if
460 1 : // they don't prevent a lot of space from being reclaimed, tombstones can
461 1 : // still be expensive to scan over.
462 1 : if c.outputLevel.level == numLevels-1 {
463 1 : return
464 1 : }
465 1 : default:
466 1 : // Other compaction kinds not supported.
467 1 : return
468 : }
469 :
470 : // We avoid a move or copy if there is lots of overlapping grandparent data.
471 : // Otherwise, the move could create a parent file that will require a very
472 : // expensive merge later on.
473 1 : if c.grandparents.AggregateSizeSum() > c.maxOverlapBytes {
474 1 : return
475 1 : }
476 :
477 1 : iter := c.startLevel.files.Iter()
478 1 : meta := iter.First()
479 1 :
480 1 : // We should always be passed a provider, except in some unit tests.
481 1 : isRemote := provider != nil && !objstorage.IsLocalTable(provider, meta.TableBacking.DiskFileNum)
482 1 :
483 1 : // Shared and external tables can always be moved. We can also move a local
484 1 : // table unless we need the result to be on shared storage.
485 1 : if isRemote || !preferSharedStorage {
486 1 : c.kind = compactionKindMove
487 1 : return
488 1 : }
489 :
490 : // We can rewrite the table (regular compaction) or we can use a copy compaction.
491 1 : switch {
492 1 : case meta.Virtual:
493 : // We want to avoid a copy compaction if the table is virtual, as we may end
494 : // up copying a lot more data than necessary.
495 0 : case meta.BlobReferenceDepth != 0:
496 : // We also want to avoid copy compactions for tables with blob references,
497 : // as we currently lack a mechanism to propagate blob references along with
498 : // the sstable.
499 1 : default:
500 1 : c.kind = compactionKindCopy
501 : }
502 : }
503 :
504 : func newDeleteOnlyCompaction(
505 : opts *Options,
506 : cur *manifest.Version,
507 : inputs []compactionLevel,
508 : beganAt time.Time,
509 : hints []deleteCompactionHint,
510 : exciseEnabled bool,
511 1 : ) *compaction {
512 1 : c := &compaction{
513 1 : kind: compactionKindDeleteOnly,
514 1 : cmp: opts.Comparer.Compare,
515 1 : equal: opts.Comparer.Equal,
516 1 : comparer: opts.Comparer,
517 1 : formatKey: opts.Comparer.FormatKey,
518 1 : logger: opts.Logger,
519 1 : version: cur,
520 1 : beganAt: beganAt,
521 1 : inputs: inputs,
522 1 : deletionHints: hints,
523 1 : exciseEnabled: exciseEnabled,
524 1 : grantHandle: noopGrantHandle{},
525 1 : }
526 1 :
527 1 : // Set c.smallest, c.largest.
528 1 : files := make([]iter.Seq[*manifest.TableMetadata], 0, len(inputs))
529 1 : for _, in := range inputs {
530 1 : files = append(files, in.files.All())
531 1 : }
532 1 : c.smallest, c.largest = manifest.KeyRange(opts.Comparer.Compare, files...)
533 1 : return c
534 : }
535 :
536 1 : func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64) {
537 1 : // Heuristic to place a lower bound on compaction output file size
538 1 : // caused by Lbase. Prior to this heuristic we have observed an L0 in
539 1 : // production with 310K files of which 290K files were < 10KB in size.
540 1 : // Our hypothesis is that it was caused by L1 having 2600 files and
541 1 : // ~10GB, such that each flush got split into many tiny files due to
542 1 : // overlapping with most of the files in Lbase.
543 1 : //
544 1 : // The computation below is general in that it accounts
545 1 : // for flushing different volumes of data (e.g. we may be flushing
546 1 : // many memtables). For illustration, we consider the typical
547 1 : // example of flushing a 64MB memtable. So 12.8MB output,
548 1 : // based on the compression guess below. If the compressed bytes
549 1 : // guess is an over-estimate we will end up with smaller files,
550 1 : // and if an under-estimate we will end up with larger files.
551 1 : // With a 2MB target file size, 7 files. We are willing to accept
552 1 : // 4x the number of files, if it results in better write amplification
553 1 : // when later compacting to Lbase, i.e., ~450KB files (target file
554 1 : // size / 4).
555 1 : //
556 1 : // Note that this is a pessimistic heuristic in that
557 1 : // fileCountUpperBoundDueToGrandparents could be far from the actual
558 1 : // number of files produced due to the grandparent limits. For
559 1 : // example, in the extreme, consider a flush that overlaps with 1000
560 1 : // files in Lbase f0...f999, and the initially calculated value of
561 1 : // maxOverlapBytes will cause splits at f10, f20,..., f990, which
562 1 : // means an upper bound file count of 100 files. Say the input bytes
563 1 : // in the flush are such that acceptableFileCount=10. We will fatten
564 1 : // up maxOverlapBytes by 10x to ensure that the upper bound file count
565 1 : // drops to 10. However, it is possible that in practice, even without
566 1 : // this change, we would have produced no more than 10 files, and that
567 1 : // this change makes the files unnecessarily wide. Say the input bytes
568 1 : // are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
569 1 : // 10% in f80...f89 and 10% in f990...f999. The original value of
570 1 : // maxOverlapBytes would have actually produced only 10 sstables. But
571 1 : // by increasing maxOverlapBytes by 10x, we may produce 1 sstable that
572 1 : // spans f0...f89, i.e., a much wider sstable than necessary.
573 1 : //
574 1 : // We could produce a tighter estimate of
575 1 : // fileCountUpperBoundDueToGrandparents if we had knowledge of the key
576 1 : // distribution of the flush. The 4x multiplier mentioned earlier is
577 1 : // a way to try to compensate for this pessimism.
578 1 : //
579 1 : // TODO(sumeer): we don't have compression info for the data being
580 1 : // flushed, but it is likely that existing files that overlap with
581 1 : // this flush in Lbase are representative wrt compression ratio. We
582 1 : // could store the uncompressed size in TableMetadata and estimate
583 1 : // the compression ratio.
584 1 : const approxCompressionRatio = 0.2
585 1 : approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
586 1 : approxNumFilesBasedOnTargetSize :=
587 1 : int(math.Ceil(approxOutputBytes / float64(c.maxOutputFileSize)))
588 1 : acceptableFileCount := float64(4 * approxNumFilesBasedOnTargetSize)
589 1 : // The byte calculation is linear in numGrandparentFiles, but we will
590 1 : // incur this linear cost in compact.Runner.TableSplitLimit() too, so we are
591 1 : // also willing to pay it now. We could approximate this cheaply by using the
592 1 : // mean file size of Lbase.
593 1 : grandparentFileBytes := c.grandparents.AggregateSizeSum()
594 1 : fileCountUpperBoundDueToGrandparents :=
595 1 : float64(grandparentFileBytes) / float64(c.maxOverlapBytes)
596 1 : if fileCountUpperBoundDueToGrandparents > acceptableFileCount {
597 1 : c.maxOverlapBytes = uint64(
598 1 : float64(c.maxOverlapBytes) *
599 1 : (fileCountUpperBoundDueToGrandparents / acceptableFileCount))
600 1 : }
601 : }
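// exampleAdjustedOverlapBytes is an editor's note, not part of the original
// source: an illustrative sketch of the heuristic above, using the numbers
// from the comment. Flushing 64 MB at an assumed 0.2 compression ratio yields
// ~12.8 MB of output; with a 2 MB target file size that is ceil(12.8/2) = 7
// files, so acceptableFileCount = 4*7 = 28. If Lbase holds ~10 GB and
// maxOverlapBytes starts at 20 MB, the grandparent-based upper bound is
// 10240/20 = 512 files, and maxOverlapBytes is inflated by 512/28 ≈ 18x to
// roughly 366 MB.
func exampleAdjustedOverlapBytes(
	flushingBytes, maxOutputFileSize, maxOverlapBytes, grandparentBytes uint64,
) uint64 {
	const approxCompressionRatio = 0.2
	approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
	acceptableFileCount := 4 * math.Ceil(approxOutputBytes/float64(maxOutputFileSize))
	fileCountUpperBound := float64(grandparentBytes) / float64(maxOverlapBytes)
	if fileCountUpperBound > acceptableFileCount {
		return uint64(float64(maxOverlapBytes) * (fileCountUpperBound / acceptableFileCount))
	}
	return maxOverlapBytes
}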
602 :
603 : func newFlush(
604 : opts *Options,
605 : cur *manifest.Version,
606 : l0Organizer *manifest.L0Organizer,
607 : baseLevel int,
608 : flushing flushableList,
609 : beganAt time.Time,
610 : tableFormat sstable.TableFormat,
611 : getValueSeparation getValueSeparation,
612 1 : ) (*compaction, error) {
613 1 : c := &compaction{
614 1 : kind: compactionKindFlush,
615 1 : cmp: opts.Comparer.Compare,
616 1 : equal: opts.Comparer.Equal,
617 1 : comparer: opts.Comparer,
618 1 : formatKey: opts.Comparer.FormatKey,
619 1 : logger: opts.Logger,
620 1 : version: cur,
621 1 : beganAt: beganAt,
622 1 : inputs: []compactionLevel{{level: -1}, {level: 0}},
623 1 : getValueSeparation: getValueSeparation,
624 1 : maxOutputFileSize: math.MaxUint64,
625 1 : maxOverlapBytes: math.MaxUint64,
626 1 : flushing: flushing,
627 1 : grantHandle: noopGrantHandle{},
628 1 : tableFormat: tableFormat,
629 1 : }
630 1 : c.startLevel = &c.inputs[0]
631 1 : c.outputLevel = &c.inputs[1]
632 1 : if len(flushing) > 0 {
633 1 : if _, ok := flushing[0].flushable.(*ingestedFlushable); ok {
634 1 : if len(flushing) != 1 {
635 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
636 : }
637 1 : c.kind = compactionKindIngestedFlushable
638 1 : return c, nil
639 : }
640 : }
641 :
642 1 : preferSharedStorage := tableFormat >= FormatMinForSharedObjects.MaxTableFormat() &&
643 1 : remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
644 1 : c.objCreateOpts = objstorage.CreateOptions{
645 1 : PreferSharedStorage: preferSharedStorage,
646 1 : WriteCategory: getDiskWriteCategoryForCompaction(opts, c.kind),
647 1 : }
648 1 : if preferSharedStorage {
649 1 : c.getValueSeparation = neverSeparateValues
650 1 : }
651 :
652 : // Make sure there's no ingestedFlushable after the first flushable in the
653 : // list.
654 1 : for _, f := range flushing {
655 1 : if _, ok := f.flushable.(*ingestedFlushable); ok {
656 0 : panic("pebble: flushing shouldn't contain ingestedFlushable flushable")
657 : }
658 : }
659 :
660 1 : c.l0Limits = l0Organizer.FlushSplitKeys()
661 1 :
662 1 : smallestSet, largestSet := false, false
663 1 : updatePointBounds := func(iter internalIterator) {
664 1 : if kv := iter.First(); kv != nil {
665 1 : if !smallestSet ||
666 1 : base.InternalCompare(c.cmp, c.smallest, kv.K) > 0 {
667 1 : smallestSet = true
668 1 : c.smallest = kv.K.Clone()
669 1 : }
670 : }
671 1 : if kv := iter.Last(); kv != nil {
672 1 : if !largestSet ||
673 1 : base.InternalCompare(c.cmp, c.largest, kv.K) < 0 {
674 1 : largestSet = true
675 1 : c.largest = kv.K.Clone()
676 1 : }
677 : }
678 : }
679 :
680 1 : updateRangeBounds := func(iter keyspan.FragmentIterator) error {
681 1 : // File bounds require s != nil && !s.Empty(). We only need to check for
682 1 : // s != nil here, as the memtable's FragmentIterator would never surface
683 1 : // empty spans.
684 1 : if s, err := iter.First(); err != nil {
685 0 : return err
686 1 : } else if s != nil {
687 1 : if key := s.SmallestKey(); !smallestSet ||
688 1 : base.InternalCompare(c.cmp, c.smallest, key) > 0 {
689 1 : smallestSet = true
690 1 : c.smallest = key.Clone()
691 1 : }
692 : }
693 1 : if s, err := iter.Last(); err != nil {
694 0 : return err
695 1 : } else if s != nil {
696 1 : if key := s.LargestKey(); !largestSet ||
697 1 : base.InternalCompare(c.cmp, c.largest, key) < 0 {
698 1 : largestSet = true
699 1 : c.largest = key.Clone()
700 1 : }
701 : }
702 1 : return nil
703 : }
704 :
705 1 : var flushingBytes uint64
706 1 : for i := range flushing {
707 1 : f := flushing[i]
708 1 : updatePointBounds(f.newIter(nil))
709 1 : if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
710 1 : if err := updateRangeBounds(rangeDelIter); err != nil {
711 0 : return nil, err
712 0 : }
713 : }
714 1 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
715 1 : if err := updateRangeBounds(rangeKeyIter); err != nil {
716 0 : return nil, err
717 0 : }
718 : }
719 1 : flushingBytes += f.inuseBytes()
720 : }
721 :
722 1 : if opts.FlushSplitBytes > 0 {
723 1 : c.maxOutputFileSize = uint64(opts.Levels[0].TargetFileSize)
724 1 : c.maxOverlapBytes = maxGrandparentOverlapBytes(opts, 0)
725 1 : c.grandparents = c.version.Overlaps(baseLevel, c.userKeyBounds())
726 1 : adjustGrandparentOverlapBytesForFlush(c, flushingBytes)
727 1 : }
728 :
729 : // We don't elide tombstones for flushes.
730 1 : c.delElision, c.rangeKeyElision = compact.NoTombstoneElision(), compact.NoTombstoneElision()
731 1 : return c, nil
732 : }
733 :
734 1 : func (c *compaction) hasExtraLevelData() bool {
735 1 : if len(c.extraLevels) == 0 {
736 1 : // not a multi level compaction
737 1 : return false
738 1 : } else if c.extraLevels[0].files.Empty() {
739 1 : // a multi level compaction without data in the intermediate input level;
740 1 : // e.g. for a multi level compaction with levels 4, 5, and 6, this could
741 1 : // occur if there are no files to compact in 5, or in 5 and 6 (i.e. a move).
742 1 : return false
743 1 : }
744 1 : return true
745 : }
746 :
747 : // errorOnUserKeyOverlap returns an error if the last two written sstables in
748 : // this compaction have revisions of the same user key present in both sstables,
749 : // when it shouldn't (eg. when splitting flushes).
750 1 : func (c *compaction) errorOnUserKeyOverlap(ve *manifest.VersionEdit) error {
751 1 : if n := len(ve.NewTables); n > 1 {
752 1 : meta := ve.NewTables[n-1].Meta
753 1 : prevMeta := ve.NewTables[n-2].Meta
754 1 : if !prevMeta.Largest().IsExclusiveSentinel() &&
755 1 : c.cmp(prevMeta.Largest().UserKey, meta.Smallest().UserKey) >= 0 {
756 1 : return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
757 1 : prevMeta.Largest().Pretty(c.formatKey),
758 1 : prevMeta.TableNum,
759 1 : meta.TableNum)
760 1 : }
761 : }
762 1 : return nil
763 : }
764 :
765 : // allowZeroSeqNum returns true if seqnums can be zeroed if there are no
766 : // snapshots requiring them to be kept. It performs this determination by
767 : // looking at the TombstoneElision values which are set up based on sstables
768 : // which overlap the bounds of the compaction at a lower level in the LSM.
769 1 : func (c *compaction) allowZeroSeqNum() bool {
770 1 : // TODO(peter): we disable zeroing of seqnums during flushing to match
771 1 : // RocksDB behavior and to avoid generating overlapping sstables during
772 1 : // DB.replayWAL. When replaying WAL files at startup, we flush after each
773 1 : // WAL is replayed building up a single version edit that is
774 1 : // applied. Because we don't apply the version edit after each flush, this
775 1 : // code doesn't know that L0 contains files and zeroing of seqnums should
776 1 : // be disabled. That is fixable, but it seems safer to just match the
777 1 : // RocksDB behavior for now.
778 1 : return len(c.flushing) == 0 && c.delElision.ElidesEverything() && c.rangeKeyElision.ElidesEverything()
779 1 : }
780 :
781 : // newInputIters returns an iterator over all the input tables in a compaction.
782 : func (c *compaction) newInputIters(
783 : newIters tableNewIters, newRangeKeyIter keyspanimpl.TableNewSpanIter, iiopts internalIterOpts,
784 : ) (
785 : pointIter internalIterator,
786 : rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
787 : retErr error,
788 1 : ) {
789 1 : // Validate the ordering of compaction input files for defense in depth.
790 1 : if len(c.flushing) == 0 {
791 1 : if c.startLevel.level >= 0 {
792 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
793 1 : manifest.Level(c.startLevel.level), c.startLevel.files.Iter())
794 1 : if err != nil {
795 1 : return nil, nil, nil, err
796 1 : }
797 : }
798 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
799 1 : manifest.Level(c.outputLevel.level), c.outputLevel.files.Iter())
800 1 : if err != nil {
801 1 : return nil, nil, nil, err
802 1 : }
803 1 : if c.startLevel.level == 0 {
804 1 : if c.startLevel.l0SublevelInfo == nil {
805 0 : panic("l0SublevelInfo not created for compaction out of L0")
806 : }
807 1 : for _, info := range c.startLevel.l0SublevelInfo {
808 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
809 1 : info.sublevel, info.Iter())
810 1 : if err != nil {
811 1 : return nil, nil, nil, err
812 1 : }
813 : }
814 : }
815 1 : if len(c.extraLevels) > 0 {
816 1 : if len(c.extraLevels) > 1 {
817 0 : panic("n>2 multi level compaction not implemented yet")
818 : }
819 1 : interLevel := c.extraLevels[0]
820 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
821 1 : manifest.Level(interLevel.level), interLevel.files.Iter())
822 1 : if err != nil {
823 0 : return nil, nil, nil, err
824 0 : }
825 : }
826 : }
827 :
828 : // There are three classes of keys that a compaction needs to process: point
829 : // keys, range deletion tombstones and range keys. Collect all iterators for
830 : // all these classes of keys from all the levels. We'll aggregate them
831 : // together farther below.
832 : //
833 : // numInputLevels is an approximation of the number of iterator levels. Due
834 : // to idiosyncrasies in iterator construction, we may (rarely) exceed this
835 : // initial capacity.
836 1 : numInputLevels := max(len(c.flushing), len(c.inputs))
837 1 : iters := make([]internalIterator, 0, numInputLevels)
838 1 : rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
839 1 : rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
840 1 :
841 1 : // If construction of the iterator inputs fails, ensure that we close all
842 1 : // the constituent iterators.
843 1 : defer func() {
844 1 : if retErr != nil {
845 1 : for _, iter := range iters {
846 1 : if iter != nil {
847 1 : _ = iter.Close()
848 1 : }
849 : }
850 1 : for _, rangeDelIter := range rangeDelIters {
851 0 : rangeDelIter.Close()
852 0 : }
853 : }
854 : }()
855 1 : iterOpts := IterOptions{
856 1 : Category: categoryCompaction,
857 1 : logger: c.logger,
858 1 : }
859 1 :
860 1 : // Populate iters, rangeDelIters and rangeKeyIters with the appropriate
861 1 : // constituent iterators. This depends on whether this is a flush or a
862 1 : // compaction.
863 1 : if len(c.flushing) != 0 {
864 1 : // If flushing, we need to build the input iterators over the memtables
865 1 : // stored in c.flushing.
866 1 : for i := range c.flushing {
867 1 : f := c.flushing[i]
868 1 : iters = append(iters, f.newFlushIter(nil))
869 1 : rangeDelIter := f.newRangeDelIter(nil)
870 1 : if rangeDelIter != nil {
871 1 : rangeDelIters = append(rangeDelIters, rangeDelIter)
872 1 : }
873 1 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
874 1 : rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
875 1 : }
876 : }
877 1 : } else {
878 1 : addItersForLevel := func(level *compactionLevel, l manifest.Layer) error {
879 1 : // Add a *levelIter for point iterators. Because we don't call
880 1 : // initRangeDel, the levelIter will close and forget the range
881 1 : // deletion iterator when it steps on to a new file. Surfacing range
882 1 : // deletions to compactions are handled below.
883 1 : iters = append(iters, newLevelIter(context.Background(),
884 1 : iterOpts, c.comparer, newIters, level.files.Iter(), l, iiopts))
885 1 : // TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
886 1 : // deletions into memory upfront. (See #2015, which reverted this.) There
887 1 : // will be no user keys that are split between sstables within a level in
888 1 : // Cockroach 23.1, which unblocks this optimization.
889 1 :
890 1 : // Add the range deletion iterator for each file as an independent level
891 1 : // in mergingIter, as opposed to making a levelIter out of those. This
892 1 : // is safer as levelIter expects all keys coming from underlying
893 1 : // iterators to be in order. Due to compaction / tombstone writing
894 1 : // logic in finishOutput(), it is possible for range tombstones to not
895 1 : // be strictly ordered across all files in one level.
896 1 : //
897 1 : // Consider this example from the metamorphic tests (also repeated in
898 1 : // finishOutput()), consisting of three L3 files with their bounds
899 1 : // specified in square brackets next to the file name:
900 1 : //
901 1 : // ./000240.sst [tmgc#391,MERGE-tmgc#391,MERGE]
902 1 : // tmgc#391,MERGE [786e627a]
903 1 : // tmgc-udkatvs#331,RANGEDEL
904 1 : //
905 1 : // ./000241.sst [tmgc#384,MERGE-tmgc#384,MERGE]
906 1 : // tmgc#384,MERGE [666c7070]
907 1 : // tmgc-tvsalezade#383,RANGEDEL
908 1 : // tmgc-tvsalezade#331,RANGEDEL
909 1 : //
910 1 : // ./000242.sst [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
911 1 : // tmgc-tvsalezade#383,RANGEDEL
912 1 : // tmgc#375,SET [72646c78766965616c72776865676e79]
913 1 : // tmgc-tvsalezade#356,RANGEDEL
914 1 : //
915 1 : // Here, the range tombstone in 000240.sst falls "after" one in
916 1 : // 000241.sst, despite 000240.sst being ordered "before" 000241.sst for
917 1 : // levelIter's purposes. While each file is still consistent within its
918 1 : // bounds, it's safer to have all rangedel iterators be visible to
919 1 : // mergingIter.
920 1 : iter := level.files.Iter()
921 1 : for f := iter.First(); f != nil; f = iter.Next() {
922 1 : rangeDelIter, err := c.newRangeDelIter(newIters, iter.Take(), iterOpts, iiopts, l)
923 1 : if err != nil {
924 1 : // The error will already be annotated with the BackingFileNum, so
925 1 : // we annotate it with the FileNum.
926 1 : return errors.Wrapf(err, "pebble: could not open table %s", errors.Safe(f.TableNum))
927 1 : }
928 1 : if rangeDelIter == nil {
929 1 : continue
930 : }
931 1 : rangeDelIters = append(rangeDelIters, rangeDelIter)
932 1 : c.closers = append(c.closers, rangeDelIter)
933 : }
934 :
935 : // Check if this level has any range keys.
936 1 : hasRangeKeys := false
937 1 : for f := iter.First(); f != nil; f = iter.Next() {
938 1 : if f.HasRangeKeys {
939 1 : hasRangeKeys = true
940 1 : break
941 : }
942 : }
943 1 : if hasRangeKeys {
944 1 : newRangeKeyIterWrapper := func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
945 1 : rangeKeyIter, err := newRangeKeyIter(ctx, file, iterOptions)
946 1 : if err != nil {
947 0 : return nil, err
948 1 : } else if rangeKeyIter == nil {
949 0 : return emptyKeyspanIter, nil
950 0 : }
951 : // Ensure that the range key iter is not closed until the compaction is
952 : // finished. This is necessary because range key processing
953 : // requires the range keys to be held in memory for up to the
954 : // lifetime of the compaction.
955 1 : noCloseIter := &noCloseIter{rangeKeyIter}
956 1 : c.closers = append(c.closers, noCloseIter)
957 1 :
958 1 : // We do not need to truncate range keys to sstable boundaries, or
959 1 : // only read within the file's atomic compaction units, unlike with
960 1 : // range tombstones. This is because range keys were added after we
961 1 : // stopped splitting user keys across sstables, so all the range keys
962 1 : // in this sstable must wholly lie within the file's bounds.
963 1 : return noCloseIter, err
964 : }
965 1 : li := keyspanimpl.NewLevelIter(
966 1 : context.Background(), keyspan.SpanIterOptions{}, c.cmp,
967 1 : newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange,
968 1 : )
969 1 : rangeKeyIters = append(rangeKeyIters, li)
970 : }
971 1 : return nil
972 : }
973 :
974 1 : for i := range c.inputs {
975 1 : // If the level is annotated with l0SublevelInfo, expand it into one
976 1 : // level per sublevel.
977 1 : // TODO(jackson): Perform this expansion even earlier when we pick the
978 1 : // compaction?
979 1 : if len(c.inputs[i].l0SublevelInfo) > 0 {
980 1 : for _, info := range c.startLevel.l0SublevelInfo {
981 1 : sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
982 1 : if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
983 1 : return nil, nil, nil, err
984 1 : }
985 : }
986 1 : continue
987 : }
988 1 : if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
989 1 : return nil, nil, nil, err
990 1 : }
991 : }
992 : }
993 :
994 : // If there's only one constituent point iterator, we can avoid the overhead
995 : // of a *mergingIter. This is possible, for example, when performing a flush
996 : // of a single memtable. Otherwise, combine all the iterators into a merging
997 : // iter.
998 1 : pointIter = iters[0]
999 1 : if len(iters) > 1 {
1000 1 : pointIter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
1001 1 : }
1002 :
1003 : // In normal operation, levelIter iterates over the point operations in a
1004 : // level, and initializes a rangeDelIter pointer for the range deletions in
1005 : // each table. During compaction, we want to iterate over the merged view of
1006 : // point operations and range deletions. In order to do this we create one
1007 : // levelIter per level to iterate over the point operations, and collect up
1008 : // all the range deletion files.
1009 : //
1010 : // The range deletion levels are combined with a keyspanimpl.MergingIter. The
1011 : // resulting merged rangedel iterator is then included using an
1012 : // InterleavingIter.
1013 : // TODO(jackson): Consider using a defragmenting iterator to stitch together
1014 : // logical range deletions that were fragmented due to previous file
1015 : // boundaries.
1016 1 : if len(rangeDelIters) > 0 {
1017 1 : mi := &keyspanimpl.MergingIter{}
1018 1 : mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
1019 1 : rangeDelIter = mi
1020 1 : }
1021 :
1022 : // If there are range key iterators, we need to combine them using
1023 : // keyspanimpl.MergingIter, and then interleave them among the points.
1024 1 : if len(rangeKeyIters) > 0 {
1025 1 : mi := &keyspanimpl.MergingIter{}
1026 1 : mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeKeyIters...)
1027 1 : // TODO(radu): why do we have a defragmenter here but not above?
1028 1 : di := &keyspan.DefragmentingIter{}
1029 1 : di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
1030 1 : rangeKeyIter = di
1031 1 : }
1032 1 : return pointIter, rangeDelIter, rangeKeyIter, nil
1033 : }
1034 :
1035 : func (c *compaction) newRangeDelIter(
1036 : newIters tableNewIters,
1037 : f manifest.LevelFile,
1038 : opts IterOptions,
1039 : iiopts internalIterOpts,
1040 : l manifest.Layer,
1041 1 : ) (*noCloseIter, error) {
1042 1 : opts.layer = l
1043 1 : iterSet, err := newIters(context.Background(), f.TableMetadata, &opts,
1044 1 : internalIterOpts{
1045 1 : compaction: true,
1046 1 : readEnv: sstable.ReadEnv{Block: block.ReadEnv{BufferPool: &c.bufferPool}},
1047 1 : }, iterRangeDeletions)
1048 1 : if err != nil {
1049 1 : return nil, err
1050 1 : } else if iterSet.rangeDeletion == nil {
1051 1 : // The file doesn't contain any range deletions.
1052 1 : return nil, nil
1053 1 : }
1054 : // Ensure that rangeDelIter is not closed until the compaction is
1055 : // finished. This is necessary because range tombstone processing
1056 : // requires the range tombstones to be held in memory for up to the
1057 : // lifetime of the compaction.
1058 1 : return &noCloseIter{iterSet.rangeDeletion}, nil
1059 : }
1060 :
1061 1 : func (c *compaction) String() string {
1062 1 : if len(c.flushing) != 0 {
1063 0 : return "flush\n"
1064 0 : }
1065 :
1066 1 : var buf bytes.Buffer
1067 1 : for level := c.startLevel.level; level <= c.outputLevel.level; level++ {
1068 1 : i := level - c.startLevel.level
1069 1 : fmt.Fprintf(&buf, "%d:", level)
1070 1 : for f := range c.inputs[i].files.All() {
1071 1 : fmt.Fprintf(&buf, " %s:%s-%s", f.TableNum, f.Smallest(), f.Largest())
1072 1 : }
1073 1 : fmt.Fprintf(&buf, "\n")
1074 : }
1075 1 : return buf.String()
1076 : }
1077 :
1078 : type manualCompaction struct {
1079 : // id is for internal bookkeeping.
1080 : id uint64
1081 : // Count of the retries due to concurrent compaction to overlapping levels.
1082 : retries int
1083 : level int
1084 : outputLevel int
1085 : done chan error
1086 : start []byte
1087 : end []byte
1088 : split bool
1089 : }
1090 :
1091 : type readCompaction struct {
1092 : level int
1093 : // [start, end] key ranges are used for de-duping.
1094 : start []byte
1095 : end []byte
1096 :
1097 : // The file associated with the compaction.
1098 : // If the file no longer belongs in the same
1099 : // level, then we skip the compaction.
1100 : tableNum base.TableNum
1101 : }
1102 :
1103 1 : func (d *DB) addInProgressCompaction(c *compaction) {
1104 1 : d.mu.compact.inProgress[c] = struct{}{}
1105 1 : var isBase, isIntraL0 bool
1106 1 : for _, cl := range c.inputs {
1107 1 : for f := range cl.files.All() {
1108 1 : if f.IsCompacting() {
1109 0 : d.opts.Logger.Fatalf("L%d->L%d: %s already being compacted", c.startLevel.level, c.outputLevel.level, f.TableNum)
1110 0 : }
1111 1 : f.SetCompactionState(manifest.CompactionStateCompacting)
1112 1 : if c.startLevel != nil && c.outputLevel != nil && c.startLevel.level == 0 {
1113 1 : if c.outputLevel.level == 0 {
1114 1 : f.IsIntraL0Compacting = true
1115 1 : isIntraL0 = true
1116 1 : } else {
1117 1 : isBase = true
1118 1 : }
1119 : }
1120 : }
1121 : }
1122 :
1123 1 : if isIntraL0 || isBase {
1124 1 : l0Inputs := []manifest.LevelSlice{c.startLevel.files}
1125 1 : if isIntraL0 {
1126 1 : l0Inputs = append(l0Inputs, c.outputLevel.files)
1127 1 : }
1128 1 : if err := d.mu.versions.l0Organizer.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
1129 0 : d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
1130 0 : }
1131 : }
1132 : }
1133 :
1134 : // Removes compaction markers from files in a compaction. The rollback parameter
1135 : // indicates whether the compaction state should be rolled back to its original
1136 : // state in the case of an unsuccessful compaction.
1137 : //
1138 : // DB.mu must be held when calling this method; however, this method can drop and
1139 : // re-acquire that mutex. All writes to the manifest for this compaction should
1140 : // have completed by this point.
1141 1 : func (d *DB) clearCompactingState(c *compaction, rollback bool) {
1142 1 : c.versionEditApplied = true
1143 1 : for _, cl := range c.inputs {
1144 1 : for f := range cl.files.All() {
1145 1 : if !f.IsCompacting() {
1146 0 : d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.TableNum)
1147 0 : }
1148 1 : if !rollback {
1149 1 : // On success all compactions other than move and delete-only compactions
1150 1 : // transition the file into the Compacted state. Move-compacted files
1151 1 : // become eligible for compaction again and transition back to NotCompacting.
1152 1 : // Delete-only compactions could, on rare occasion, leave files untouched
1153 1 : // (eg. if files have a loose bound), so we revert them all to NotCompacting
1154 1 : // just in case they need to be compacted again.
1155 1 : if c.kind != compactionKindMove && c.kind != compactionKindDeleteOnly {
1156 1 : f.SetCompactionState(manifest.CompactionStateCompacted)
1157 1 : } else {
1158 1 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1159 1 : }
1160 1 : } else {
1161 1 : // Else, on rollback, all input files unconditionally transition back to
1162 1 : // NotCompacting.
1163 1 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1164 1 : }
1165 1 : f.IsIntraL0Compacting = false
1166 : }
1167 : }
1168 1 : l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c))
1169 1 : func() {
1170 1 : // InitCompactingFileInfo requires that no other manifest writes be
1171 1 : // happening in parallel with it, i.e. we're not in the midst of installing
1172 1 : // another version. Otherwise, it's possible that we've created another
1173 1 : // L0Sublevels instance, but not added it to the versions list, causing
1174 1 : // all the indices in TableMetadata to be inaccurate. To ensure this,
1175 1 : // grab the manifest lock.
1176 1 : d.mu.versions.logLock()
1177 1 : // It is a bit peculiar that we are fiddling with the current version state
1178 1 : // in a separate critical section from when this version was installed.
1179 1 : // But this fiddling is necessary if the compaction failed. When the
1180 1 : // compaction succeeded, we've already done this in UpdateVersionLocked, so
1181 1 : // this seems redundant. Anyway, we clear the pickedCompactionCache since we
1182 1 : // may be able to pick a better compaction (though when this compaction
1183 1 : // succeeded we've also cleared the cache in UpdateVersionLocked).
1184 1 : defer d.mu.versions.logUnlockAndInvalidatePickedCompactionCache()
1185 1 : d.mu.versions.l0Organizer.InitCompactingFileInfo(l0InProgress)
1186 1 : }()
1187 : }
1188 :
1189 1 : func (d *DB) calculateDiskAvailableBytes() uint64 {
1190 1 : space, err := d.opts.FS.GetDiskUsage(d.dirname)
1191 1 : if err != nil {
1192 1 : if !errors.Is(err, vfs.ErrUnsupported) {
1193 1 : d.opts.EventListener.BackgroundError(err)
1194 1 : }
1195 : // Return the last value we managed to obtain.
1196 1 : return d.diskAvailBytes.Load()
1197 : }
1198 :
1199 1 : d.lowDiskSpaceReporter.Report(space.AvailBytes, space.TotalBytes, d.opts.EventListener)
1200 1 : d.diskAvailBytes.Store(space.AvailBytes)
1201 1 : return space.AvailBytes
1202 : }
1203 :
1204 : // maybeScheduleFlush schedules a flush if necessary.
1205 : //
1206 : // d.mu must be held when calling this.
1207 1 : func (d *DB) maybeScheduleFlush() {
1208 1 : if d.mu.compact.flushing || d.closed.Load() != nil || d.opts.ReadOnly {
1209 1 : return
1210 1 : }
1211 1 : if len(d.mu.mem.queue) <= 1 {
1212 1 : return
1213 1 : }
1214 :
1215 1 : if !d.passedFlushThreshold() {
1216 1 : return
1217 1 : }
1218 :
1219 1 : d.mu.compact.flushing = true
1220 1 : go d.flush()
1221 : }
1222 :
1223 1 : func (d *DB) passedFlushThreshold() bool {
1224 1 : var n int
1225 1 : var size uint64
1226 1 : for ; n < len(d.mu.mem.queue)-1; n++ {
1227 1 : if !d.mu.mem.queue[n].readyForFlush() {
1228 1 : break
1229 : }
1230 1 : if d.mu.mem.queue[n].flushForced {
1231 1 : // A flush was forced. Pretend the memtable size is the configured
1232 1 : // size. See minFlushSize below.
1233 1 : size += d.opts.MemTableSize
1234 1 : } else {
1235 1 : size += d.mu.mem.queue[n].totalBytes()
1236 1 : }
1237 : }
1238 1 : if n == 0 {
1239 1 : // None of the immutable memtables are ready for flushing.
1240 1 : return false
1241 1 : }
1242 :
1243 : // Only flush once the sum of the queued memtable sizes exceeds half the
1244 : // configured memtable size. This prevents flushing of memtables at startup
1245 : // while we're undergoing the ramp period on the memtable size. See
1246 : // DB.newMemTable().
1247 1 : minFlushSize := d.opts.MemTableSize / 2
1248 1 : return size >= minFlushSize
1249 : }
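// exampleShouldFlush is an editor's note, not part of the original source: a
// minimal sketch of the threshold above. With a 64 MB configured memtable
// size, two queued immutable memtables of 20 MB and 10 MB do not yet trigger
// a flush (30 MB < 32 MB); adding an 8 MB third ready memtable, or marking
// one flushForced (which counts as the full 64 MB), pushes the sum past
// MemTableSize/2.
func exampleShouldFlush(readyMemtableSizes []uint64, memTableSize uint64) bool {
	var size uint64
	for _, s := range readyMemtableSizes {
		size += s
	}
	return size >= memTableSize/2
}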
1250 :
1251 1 : func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
1252 1 : var mem *flushableEntry
1253 1 : for _, m := range d.mu.mem.queue {
1254 1 : if m.flushable == tbl {
1255 1 : mem = m
1256 1 : break
1257 : }
1258 : }
1259 1 : if mem == nil || mem.flushForced {
1260 1 : return
1261 1 : }
1262 1 : deadline := d.timeNow().Add(dur)
1263 1 : if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
1264 1 : // Already scheduled to flush sooner than within `dur`.
1265 1 : return
1266 1 : }
1267 1 : mem.delayedFlushForcedAt = deadline
1268 1 : go func() {
1269 1 : timer := time.NewTimer(dur)
1270 1 : defer timer.Stop()
1271 1 :
1272 1 : select {
1273 1 : case <-d.closedCh:
1274 1 : return
1275 1 : case <-mem.flushed:
1276 1 : return
1277 1 : case <-timer.C:
1278 1 : d.commit.mu.Lock()
1279 1 : defer d.commit.mu.Unlock()
1280 1 : d.mu.Lock()
1281 1 : defer d.mu.Unlock()
1282 1 :
1283 1 : // NB: The timer may fire concurrently with a call to Close. If a
1284 1 : // Close call beat us to acquiring d.mu, d.closed holds ErrClosed,
1285 1 : // and it's too late to flush anything. Otherwise, the Close call
1286 1 : // will block on locking d.mu until we've finished scheduling the
1287 1 : // flush and set `d.mu.compact.flushing` to true. Close will wait
1288 1 : // for the current flush to complete.
1289 1 : if d.closed.Load() != nil {
1290 1 : return
1291 1 : }
1292 :
1293 1 : if d.mu.mem.mutable == tbl {
1294 1 : _ = d.makeRoomForWrite(nil)
1295 1 : } else {
1296 1 : mem.flushForced = true
1297 1 : }
1298 1 : d.maybeScheduleFlush()
1299 : }
1300 : }()
1301 : }
1302 :
1303 1 : func (d *DB) flush() {
1304 1 : pprof.Do(context.Background(), flushLabels, func(context.Context) {
1305 1 : flushingWorkStart := crtime.NowMono()
1306 1 : d.mu.Lock()
1307 1 : defer d.mu.Unlock()
1308 1 : idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime)
1309 1 : var bytesFlushed uint64
1310 1 : var err error
1311 1 : if bytesFlushed, err = d.flush1(); err != nil {
1312 1 : // TODO(peter): count consecutive flush errors and backoff.
1313 1 : d.opts.EventListener.BackgroundError(err)
1314 1 : }
1315 1 : d.mu.compact.flushing = false
1316 1 : d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
1317 1 : workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
1318 1 : d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
1319 1 : d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
1320 1 : d.mu.compact.flushWriteThroughput.IdleDuration += idleDuration
1321 1 : // More flush work may have arrived while we were flushing, so schedule
1322 1 : // another flush if needed.
1323 1 : d.maybeScheduleFlush()
1324 1 : // Let the CompactionScheduler know, so that it can react immediately to
1325 1 : // an increase in DB.GetAllowedWithoutPermission.
1326 1 : d.opts.Experimental.CompactionScheduler.UpdateGetAllowedWithoutPermission()
1327 1 : // The flush may have produced too many files in a level, so schedule a
1328 1 : // compaction if needed.
1329 1 : d.maybeScheduleCompaction()
1330 1 : d.mu.compact.cond.Broadcast()
1331 : })
1332 : }
1333 :
1334 : // runIngestFlush is used to generate a flush version edit for sstables which
1335 : // were ingested as flushables. Both DB.mu and the manifest lock must be held
1336 : // while runIngestFlush is called.
1337 1 : func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
1338 1 : if len(c.flushing) != 1 {
1339 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
1340 : }
1341 :
1342 : // Finding the target level for ingestion must use the latest version
1343 : // after the logLock has been acquired.
1344 1 : c.version = d.mu.versions.currentVersion()
1345 1 :
1346 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
1347 1 : ve := &manifest.VersionEdit{}
1348 1 : var ingestSplitFiles []ingestSplitFile
1349 1 : ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
1350 1 :
1351 1 : updateLevelMetricsOnExcise := func(m *manifest.TableMetadata, level int, added []manifest.NewTableEntry) {
1352 1 : levelMetrics := c.metrics[level]
1353 1 : if levelMetrics == nil {
1354 1 : levelMetrics = &LevelMetrics{}
1355 1 : c.metrics[level] = levelMetrics
1356 1 : }
1357 1 : levelMetrics.TablesCount--
1358 1 : levelMetrics.TablesSize -= int64(m.Size)
1359 1 : levelMetrics.EstimatedReferencesSize -= m.EstimatedReferenceSize()
1360 1 : for i := range added {
1361 1 : levelMetrics.TablesCount++
1362 1 : levelMetrics.TablesSize += int64(added[i].Meta.Size)
1363 1 : levelMetrics.EstimatedReferencesSize += added[i].Meta.EstimatedReferenceSize()
1364 1 : }
1365 : }
1366 :
1367 1 : suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
1368 1 : d.FormatMajorVersion() >= FormatVirtualSSTables
1369 1 :
1370 1 : if suggestSplit || ingestFlushable.exciseSpan.Valid() {
1371 1 : // We may add deleted files to ve, so allocate the DeletedTables map up front.
1372 1 : ve.DeletedTables = make(map[manifest.DeletedTableEntry]*manifest.TableMetadata)
1373 1 : }
1374 :
1375 1 : ctx := context.Background()
1376 1 : overlapChecker := &overlapChecker{
1377 1 : comparer: d.opts.Comparer,
1378 1 : newIters: d.newIters,
1379 1 : opts: IterOptions{
1380 1 : logger: d.opts.Logger,
1381 1 : Category: categoryIngest,
1382 1 : },
1383 1 : v: c.version,
1384 1 : }
1385 1 : replacedTables := make(map[base.TableNum][]manifest.NewTableEntry)
1386 1 : for _, file := range ingestFlushable.files {
1387 1 : var fileToSplit *manifest.TableMetadata
1388 1 : var level int
1389 1 :
1390 1 : // This file fits perfectly within the excise span, so we can slot it at L6.
1391 1 : if ingestFlushable.exciseSpan.Valid() &&
1392 1 : ingestFlushable.exciseSpan.Contains(d.cmp, file.Smallest()) &&
1393 1 : ingestFlushable.exciseSpan.Contains(d.cmp, file.Largest()) {
1394 1 : level = 6
1395 1 : } else {
1396 1 : // TODO(radu): this can perform I/O; we should not do this while holding DB.mu.
1397 1 : lsmOverlap, err := overlapChecker.DetermineLSMOverlap(ctx, file.UserKeyBounds())
1398 1 : if err != nil {
1399 0 : return nil, err
1400 0 : }
1401 1 : level, fileToSplit, err = ingestTargetLevel(
1402 1 : ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, file, suggestSplit,
1403 1 : )
1404 1 : if err != nil {
1405 0 : return nil, err
1406 0 : }
1407 : }
1408 :
1409 : // Add the current flushableIngest file to the version.
1410 1 : ve.NewTables = append(ve.NewTables, manifest.NewTableEntry{Level: level, Meta: file})
1411 1 : if fileToSplit != nil {
1412 0 : ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
1413 0 : ingestFile: file,
1414 0 : splitFile: fileToSplit,
1415 0 : level: level,
1416 0 : })
1417 0 : }
1418 1 : levelMetrics := c.metrics[level]
1419 1 : if levelMetrics == nil {
1420 1 : levelMetrics = &LevelMetrics{}
1421 1 : c.metrics[level] = levelMetrics
1422 1 : }
1423 1 : levelMetrics.TableBytesIngested += file.Size
1424 1 : levelMetrics.TablesIngested++
1425 : }
1426 1 : if ingestFlushable.exciseSpan.Valid() {
1427 1 : exciseBounds := ingestFlushable.exciseSpan.UserKeyBounds()
1428 1 : // Iterate through all levels and find files that intersect with exciseSpan.
1429 1 : for layer, ls := range c.version.AllLevelsAndSublevels() {
1430 1 : for m := range ls.Overlaps(d.cmp, ingestFlushable.exciseSpan.UserKeyBounds()).All() {
1431 1 : leftTable, rightTable, err := d.exciseTable(context.TODO(), exciseBounds, m, layer.Level(), tightExciseBounds)
1432 1 : if err != nil {
1433 0 : return nil, err
1434 0 : }
1435 1 : newFiles := applyExciseToVersionEdit(ve, m, leftTable, rightTable, layer.Level())
1436 1 : replacedTables[m.TableNum] = newFiles
1437 1 : updateLevelMetricsOnExcise(m, layer.Level(), newFiles)
1438 : }
1439 : }
1440 : }
1441 :
1442 1 : if len(ingestSplitFiles) > 0 {
1443 0 : if err := d.ingestSplit(context.TODO(), ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedTables); err != nil {
1444 0 : return nil, err
1445 0 : }
1446 : }
1447 :
1448 1 : return ve, nil
1449 : }
1450 :
1451 : // flush1 runs a compaction that copies the immutable memtables from memory to
1452 : // disk.
1453 : //
1454 : // d.mu must be held when calling this, but the mutex may be dropped and
1455 : // re-acquired during the course of this method.
1456 1 : func (d *DB) flush1() (bytesFlushed uint64, err error) {
1457 1 : // NB: The flushable queue can contain flushables of type ingestedFlushable.
1458 1 : // The sstables in ingestedFlushable.files must be placed into the appropriate
1459 1 : // level in the lsm. Let's say the flushable queue contains a prefix of
1460 1 : // regular immutable memtables, then an ingestedFlushable, and then the
1461 1 : // mutable memtable. When the flush of the ingestedFlushable is performed,
1462 1 : // it needs an updated view of the lsm. That is, the prefix of immutable
1463 1 : // memtables must have already been flushed. Similarly, if there are two
1464 1 : // contiguous ingestedFlushables in the queue, then the first flushable must
1465 1 : // be flushed, so that the second flushable can see an updated view of the
1466 1 : // lsm.
1467 1 : //
1468 1 : // Given the above, we restrict flushes to either some prefix of regular
1469 1 : // memtables, or a single flushable of type ingestedFlushable. The DB.flush
1470 1 : // function will call DB.maybeScheduleFlush again, so a new flush to finish
1471 1 : // the remaining flush work should be scheduled right away.
1472 1 : //
1473 1 : // NB: Large batches placed in the flushable queue share the WAL with the
1474 1 : // previous memtable in the queue. We must ensure the property that both the
1475 1 : // large batch and the memtable with which it shares a WAL are flushed
1476 1 : // together. The property ensures that the minimum unflushed log number
1477 1 : // isn't incremented incorrectly. Since a flushableBatch.readyToFlush always
1478 1 : // returns true, and since the large batch will always be placed right after
1479 1 : // the memtable with which it shares a WAL, the property is naturally
1480 1 : // ensured. The large batch will always be placed after the memtable with
1481 1 : // which it shares a WAL because we ensure it in DB.commitWrite by holding
1482 1 : // the commitPipeline.mu and then holding DB.mu. As an extra defensive
1483 1 : // measure, if we try to flush the memtable without also flushing the
1484 1 : // flushable batch in the same flush, since the memtable and flushableBatch
1485 1 : // have the same logNum, the logNum invariant check below will trigger.
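     : // To make the restriction above concrete (queue contents here are
     : // hypothetical): with a queue [m1, m2, ingestA, ingestB, mutable], the
     : // loop below selects only the prefix m1, m2 for this flush; the
     : // rescheduled flushes then handle ingestA and ingestB one at a time,
     : // each observing the LSM state produced by the previous flush.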
1486 1 : var n, inputs int
1487 1 : var inputBytes uint64
1488 1 : var ingest bool
1489 1 : for ; n < len(d.mu.mem.queue)-1; n++ {
1490 1 : if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
1491 1 : if n == 0 {
1492 1 : // The first flushable is of type ingestedFlushable. Since these
1493 1 : // must be flushed individually, we perform a flush for just
1494 1 : // this.
1495 1 : if !f.readyForFlush() {
1496 0 : // This check is almost unnecessary, but we guard against it
1497 0 : // just in case this invariant changes in the future.
1498 0 : panic("pebble: ingestedFlushable should always be ready to flush.")
1499 : }
1500 : // By setting n = 1, we ensure that the first flushable (n == 0)
1501 : // is scheduled for a flush. The number of tables added is equal to the
1502 : // number of files in the ingest operation.
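     : // For example, an ingestedFlushable carrying four sstables yields
     : // inputs == 4 below, even though n remains 1.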
1503 1 : n = 1
1504 1 : inputs = len(f.files)
1505 1 : ingest = true
1506 1 : break
1507 1 : } else {
1508 1 : // There was some prefix of flushables which weren't of type
1509 1 : // ingestedFlushable. So, perform a flush for those.
1510 1 : break
1511 : }
1512 : }
1513 1 : if !d.mu.mem.queue[n].readyForFlush() {
1514 1 : break
1515 : }
1516 1 : inputBytes += d.mu.mem.queue[n].inuseBytes()
1517 : }
1518 1 : if n == 0 {
1519 0 : // None of the immutable memtables are ready for flushing.
1520 0 : return 0, nil
1521 0 : }
1522 1 : if !ingest {
1523 1 : // Flushes of memtables add the prefix of n memtables from the flushable
1524 1 : // queue.
1525 1 : inputs = n
1526 1 : }
1527 :
1528 : // Require that every memtable being flushed has a log number less than the
1529 : // new minimum unflushed log number.
1530 1 : minUnflushedLogNum := d.mu.mem.queue[n].logNum
1531 1 : if !d.opts.DisableWAL {
1532 1 : for i := 0; i < n; i++ {
1533 1 : if logNum := d.mu.mem.queue[i].logNum; logNum >= minUnflushedLogNum {
1534 0 : panic(errors.AssertionFailedf("logNum invariant violated: flushing %d items; %d:type=%T,logNum=%d; %d:type=%T,logNum=%d",
1535 0 : n,
1536 0 : i, d.mu.mem.queue[i].flushable, logNum,
1537 0 : n, d.mu.mem.queue[n].flushable, minUnflushedLogNum))
1538 : }
1539 : }
1540 : }
1541 :
1542 1 : c, err := newFlush(d.opts, d.mu.versions.currentVersion(), d.mu.versions.l0Organizer,
1543 1 : d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow(), d.TableFormat(), d.determineCompactionValueSeparation)
1544 1 : if err != nil {
1545 0 : return 0, err
1546 0 : }
1547 1 : d.addInProgressCompaction(c)
1548 1 :
1549 1 : jobID := d.newJobIDLocked()
1550 1 : info := FlushInfo{
1551 1 : JobID: int(jobID),
1552 1 : Input: inputs,
1553 1 : InputBytes: inputBytes,
1554 1 : Ingest: ingest,
1555 1 : }
1556 1 : d.opts.EventListener.FlushBegin(info)
1557 1 :
1558 1 : startTime := d.timeNow()
1559 1 :
1560 1 : var ve *manifest.VersionEdit
1561 1 : var stats compact.Stats
1562 1 : // To determine the target level of the files in the ingestedFlushable, we
1563 1 : // need to acquire the logLock, and not release it for that duration. Since
1564 1 : // UpdateVersionLocked acquires it anyway, we create the VersionEdit for
1565 1 : // ingestedFlushable outside runCompaction. For all other flush cases, we
1566 1 : // construct the VersionEdit inside runCompaction.
1567 1 : var compactionErr error
1568 1 : if c.kind != compactionKindIngestedFlushable {
1569 1 : ve, stats, compactionErr = d.runCompaction(jobID, c)
1570 1 : }
1571 :
1572 1 : err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
1573 1 : err := compactionErr
1574 1 : if c.kind == compactionKindIngestedFlushable {
1575 1 : ve, err = d.runIngestFlush(c)
1576 1 : }
1577 1 : info.Duration = d.timeNow().Sub(startTime)
1578 1 : if err != nil {
1579 1 : return versionUpdate{}, err
1580 1 : }
1581 :
1582 1 : validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
1583 1 : for i := range ve.NewTables {
1584 1 : e := &ve.NewTables[i]
1585 1 : info.Output = append(info.Output, e.Meta.TableInfo())
1586 1 : // Ingested tables are not necessarily flushed to L0. Record the level of
1587 1 : // each ingested file explicitly.
1588 1 : if ingest {
1589 1 : info.IngestLevels = append(info.IngestLevels, e.Level)
1590 1 : }
1591 : }
1592 :
1593 : // The flush succeeded or it produced an empty sstable. In either case we
1594 : // want to bump the minimum unflushed log number to the log number of the
1595 : // oldest unflushed memtable.
1596 1 : ve.MinUnflushedLogNum = minUnflushedLogNum
1597 1 : if c.kind != compactionKindIngestedFlushable {
1598 1 : l0Metrics := c.metrics[0]
1599 1 : if d.opts.DisableWAL {
1600 1 : // If the WAL is disabled, every flushable has a zero [logSize],
1601 1 : // resulting in zero bytes in. Instead, use the number of bytes we
1602 1 : // flushed as the BytesIn. This ensures we get a reasonable w-amp
1603 1 : // calculation even when the WAL is disabled.
1604 1 : l0Metrics.TableBytesIn = l0Metrics.TableBytesFlushed + l0Metrics.BlobBytesFlushed
1605 1 : } else {
1606 1 : for i := 0; i < n; i++ {
1607 1 : l0Metrics.TableBytesIn += d.mu.mem.queue[i].logSize
1608 1 : }
1609 : }
1610 1 : } else {
1611 1 : // c.kind == compactionKindIngestedFlushable, and we could have deleted
1612 1 : // files due to ingest-time splits or excises.
1613 1 : ingestFlushable := c.flushing[0].flushable.(*ingestedFlushable)
1614 1 : for c2 := range d.mu.compact.inProgress {
1615 1 : // Check if this compaction overlaps with the excise span. Note that just
1616 1 : // checking if the inputs individually overlap with the excise span
1617 1 : // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
1618 1 : // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
1619 1 : // doing a [c,d) excise at the same time as this compaction, we will have
1620 1 : // to error out the whole compaction as we can't guarantee it hasn't/won't
1621 1 : // write a file overlapping with the excise span.
1622 1 : if ingestFlushable.exciseSpan.OverlapsInternalKeyRange(d.cmp, c2.smallest, c2.largest) {
1623 1 : c2.cancel.Store(true)
1624 1 : }
1625 : }
1626 :
1627 1 : if len(ve.DeletedTables) > 0 {
1628 1 : // Iterate through all other compactions, and check if their inputs have
1629 1 : // been replaced due to an ingest-time split or excise. In that case,
1630 1 : // cancel the compaction.
1631 1 : for c2 := range d.mu.compact.inProgress {
1632 1 : for i := range c2.inputs {
1633 1 : for f := range c2.inputs[i].files.All() {
1634 1 : if _, ok := ve.DeletedTables[manifest.DeletedTableEntry{FileNum: f.TableNum, Level: c2.inputs[i].level}]; ok {
1635 1 : c2.cancel.Store(true)
1636 1 : break
1637 : }
1638 : }
1639 : }
1640 : }
1641 : }
1642 : }
1643 1 : return versionUpdate{
1644 1 : VE: ve,
1645 1 : JobID: jobID,
1646 1 : Metrics: c.metrics,
1647 1 : InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) },
1648 : }, nil
1649 : })
1650 :
1651 : // If err != nil, then the flush will be retried, and we will recalculate
1652 : // these metrics.
1653 1 : if err == nil {
1654 1 : d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
1655 1 : d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
1656 1 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
1657 1 : }
1658 :
1659 1 : d.clearCompactingState(c, err != nil)
1660 1 : delete(d.mu.compact.inProgress, c)
1661 1 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics, c.bytesWritten.Load(), err)
1662 1 :
1663 1 : var flushed flushableList
1664 1 : if err == nil {
1665 1 : flushed = d.mu.mem.queue[:n]
1666 1 : d.mu.mem.queue = d.mu.mem.queue[n:]
1667 1 : d.updateReadStateLocked(d.opts.DebugCheck)
1668 1 : d.updateTableStatsLocked(ve.NewTables)
1669 1 : if ingest {
1670 1 : d.mu.versions.metrics.Flush.AsIngestCount++
1671 1 : for _, l := range c.metrics {
1672 1 : if l != nil {
1673 1 : d.mu.versions.metrics.Flush.AsIngestBytes += l.TableBytesIngested
1674 1 : d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
1675 1 : }
1676 : }
1677 : }
1678 1 : d.maybeTransitionSnapshotsToFileOnlyLocked()
1679 : }
1680 : // Signal FlushEnd after installing the new readState. This helps unit
1681 : // tests that use the callback to trigger a read using an iterator with
1682 : // IterOptions.OnlyReadGuaranteedDurable.
1683 1 : info.Err = err
1684 1 : if info.Err == nil && len(ve.NewTables) == 0 {
1685 1 : info.Err = errEmptyTable
1686 1 : }
1687 1 : info.Done = true
1688 1 : info.TotalDuration = d.timeNow().Sub(startTime)
1689 1 : d.opts.EventListener.FlushEnd(info)
1690 1 :
1691 1 : // The order of these operations matters here for ease of testing.
1692 1 : // Removing the reader reference first guarantees that, by the time a
1693 1 : // synchronous flush returns, the memtable reservation has been released.
1694 1 : // readerUnrefLocked may also produce obsolete files, so the
1695 1 : // call to deleteObsoleteFiles must happen after it.
1696 1 : for i := range flushed {
1697 1 : flushed[i].readerUnrefLocked(true)
1698 1 : }
1699 :
1700 1 : d.deleteObsoleteFiles(jobID)
1701 1 :
1702 1 : // Mark all the memtables we flushed as flushed.
1703 1 : for i := range flushed {
1704 1 : close(flushed[i].flushed)
1705 1 : }
1706 :
1707 1 : return inputBytes, err
1708 : }
1709 :
1710 : // maybeTransitionSnapshotsToFileOnlyLocked transitions any "eventually
1711 : // file-only" snapshots to be file-only if all their visible state has been
1712 : // flushed to sstables.
1713 : //
1714 : // REQUIRES: d.mu.
1715 1 : func (d *DB) maybeTransitionSnapshotsToFileOnlyLocked() {
1716 1 : earliestUnflushedSeqNum := d.getEarliestUnflushedSeqNumLocked()
1717 1 : currentVersion := d.mu.versions.currentVersion()
1718 1 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; {
1719 1 : if s.efos == nil {
1720 1 : s = s.next
1721 1 : continue
1722 : }
1723 1 : overlapsFlushable := false
1724 1 : if base.Visible(earliestUnflushedSeqNum, s.efos.seqNum, base.SeqNumMax) {
1725 1 : // There are some unflushed keys that are still visible to the EFOS.
1726 1 : // Check if any memtables older than the EFOS contain keys within a
1727 1 : // protected range of the EFOS. If not, we can transition.
1728 1 : protectedRanges := make([]bounded, len(s.efos.protectedRanges))
1729 1 : for i := range s.efos.protectedRanges {
1730 1 : protectedRanges[i] = s.efos.protectedRanges[i]
1731 1 : }
1732 1 : for i := range d.mu.mem.queue {
1733 1 : if !base.Visible(d.mu.mem.queue[i].logSeqNum, s.efos.seqNum, base.SeqNumMax) {
1734 0 : // All keys in this memtable are newer than the EFOS. Skip this
1735 0 : // memtable.
1736 0 : continue
1737 : }
1738 : // NB: computePossibleOverlaps could have false positives, such as if
1739 : // the flushable is a flushable ingest and not a memtable. In that
1740 : // case we don't open the sstables to check; we just pessimistically
1741 : // assume an overlap.
1742 1 : d.mu.mem.queue[i].computePossibleOverlaps(func(b bounded) shouldContinue {
1743 1 : overlapsFlushable = true
1744 1 : return stopIteration
1745 1 : }, protectedRanges...)
1746 1 : if overlapsFlushable {
1747 1 : break
1748 : }
1749 : }
1750 : }
1751 1 : if overlapsFlushable {
1752 1 : s = s.next
1753 1 : continue
1754 : }
1755 1 : currentVersion.Ref()
1756 1 :
1757 1 : // NB: s.efos.transitionToFileOnlySnapshot could close s, in which
1758 1 : // case s.next would be nil. Save it before calling it.
1759 1 : next := s.next
1760 1 : _ = s.efos.transitionToFileOnlySnapshot(currentVersion)
1761 1 : s = next
1762 : }
1763 : }
1764 :
1765 : // maybeScheduleCompactionAsync should be used when we want to possibly
1766 : // schedule a compaction, but don't want to eat the cost of running
1767 : // maybeScheduleCompaction. This method should be launched in a separate
1768 : // goroutine.
1769 : // d.mu must not be held when this is called.
1770 0 : func (d *DB) maybeScheduleCompactionAsync() {
1771 0 : defer d.compactionSchedulers.Done()
1772 0 :
1773 0 : d.mu.Lock()
1774 0 : d.maybeScheduleCompaction()
1775 0 : d.mu.Unlock()
1776 0 : }
1777 :
1778 : // maybeScheduleCompaction schedules a compaction if necessary.
1779 : //
1780 : // WARNING: maybeScheduleCompaction and Schedule must be the only ways that
1781 : // any compactions are run. These ensure that the pickedCompactionCache is
1782 : // used and not stale (by ensuring invalidation is done).
1783 : //
1784 : // Even compactions that are not scheduled by the CompactionScheduler must be
1785 : // run using maybeScheduleCompaction, since starting those compactions needs
1786 : // to invalidate the pickedCompactionCache.
1787 : //
1788 : // Requires d.mu to be held.
1789 1 : func (d *DB) maybeScheduleCompaction() {
1790 1 : d.mu.versions.logLock()
1791 1 : defer d.mu.versions.logUnlock()
1792 1 : env := d.makeCompactionEnvLocked()
1793 1 : if env == nil {
1794 1 : return
1795 1 : }
1796 : // env.inProgressCompactions will become stale once we pick a compaction, so
1797 : // it needs to be kept fresh. Also, the pickedCompaction in the
1798 : // pickedCompactionCache is not valid if we pick a compaction before using
1799 : // it, since those earlier compactions can mark the same file as compacting.
1800 :
1801 : // Delete-only compactions are expected to be cheap and reduce future
1802 : // compaction work, so schedule them directly instead of using the
1803 : // CompactionScheduler.
1804 1 : if d.tryScheduleDeleteOnlyCompaction() {
1805 1 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
1806 1 : d.mu.versions.pickedCompactionCache.invalidate()
1807 1 : }
1808 : // Download compactions have their own concurrency and do not currently
1809 : // interact with CompactionScheduler.
1810 : //
1811 : // TODO(sumeer): integrate with CompactionScheduler, since these consume
1812 : // disk write bandwidth.
1813 1 : if d.tryScheduleDownloadCompactions(*env, d.opts.MaxConcurrentDownloads()) {
1814 1 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
1815 1 : d.mu.versions.pickedCompactionCache.invalidate()
1816 1 : }
1817 : // The remaining compactions are scheduled by the CompactionScheduler.
1818 1 : if d.mu.versions.pickedCompactionCache.isWaiting() {
1819 1 : // CompactionScheduler already knows that the DB is waiting to run a
1820 1 : // compaction.
1821 1 : return
1822 1 : }
1823 : // INVARIANT: !pickedCompactionCache.isWaiting. The following loop will
1824 : // either exit after successfully starting all the compactions it can pick,
1825 : // or will exit with one pickedCompaction in the cache, and isWaiting=true.
1826 1 : for {
1827 1 : // We do not have a pickedCompaction in the cache.
1828 1 : pc := d.pickAnyCompaction(*env)
1829 1 : if pc == nil {
1830 1 : return
1831 1 : }
1832 1 : success, grantHandle := d.opts.Experimental.CompactionScheduler.TrySchedule()
1833 1 : if !success {
1834 1 : // Can't run now, but remember this pickedCompaction in the cache.
1835 1 : d.mu.versions.pickedCompactionCache.add(pc)
1836 1 : return
1837 1 : }
1838 1 : d.runPickedCompaction(pc, grantHandle)
1839 1 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
1840 : }
1841 : }
1842 :
1843 : // makeCompactionEnvLocked attempts to create a compactionEnv necessary during
1844 : // compaction picking. If the DB is closed or marked as read-only,
1845 : // makeCompactionEnvLocked returns nil to indicate that compactions may not be
1846 : // performed. Otherwise, a new compactionEnv is constructed using the current
1847 : // DB state.
1848 : //
1849 : // Compaction picking needs a coherent view of a Version. For example, we need
1850 : // to exclude concurrent ingestions from making a decision on which level to
1851 : // ingest into that conflicts with our compaction decision.
1852 : //
1853 : // A pickedCompaction constructed using a compactionEnv must only be used if
1854 : // the latest Version has not changed.
1855 : //
1856 : // REQUIRES: d.mu and d.mu.versions.logLock are held.
1857 1 : func (d *DB) makeCompactionEnvLocked() *compactionEnv {
1858 1 : if d.closed.Load() != nil || d.opts.ReadOnly {
1859 1 : return nil
1860 1 : }
1861 1 : env := &compactionEnv{
1862 1 : diskAvailBytes: d.diskAvailBytes.Load(),
1863 1 : earliestSnapshotSeqNum: d.mu.snapshots.earliest(),
1864 1 : earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
1865 1 : inProgressCompactions: d.getInProgressCompactionInfoLocked(nil),
1866 1 : readCompactionEnv: readCompactionEnv{
1867 1 : readCompactions: &d.mu.compact.readCompactions,
1868 1 : flushing: d.mu.compact.flushing || d.passedFlushThreshold(),
1869 1 : rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
1870 1 : },
1871 1 : }
1872 1 : if !d.problemSpans.IsEmpty() {
1873 1 : env.problemSpans = &d.problemSpans
1874 1 : }
1875 1 : return env
1876 : }
1877 :
1878 : // pickAnyCompaction tries to pick a manual or automatic compaction.
1879 1 : func (d *DB) pickAnyCompaction(env compactionEnv) (pc *pickedCompaction) {
1880 1 : // Pick a score-based compaction first, since a misshapen LSM is bad.
1881 1 : if !d.opts.DisableAutomaticCompactions {
1882 1 : if pc = d.mu.versions.picker.pickAutoScore(env); pc != nil {
1883 1 : return pc
1884 1 : }
1885 : }
1886 : // Pick a manual compaction, if any.
1887 1 : if pc = d.pickManualCompaction(env); pc != nil {
1888 1 : return pc
1889 1 : }
1890 1 : if !d.opts.DisableAutomaticCompactions {
1891 1 : return d.mu.versions.picker.pickAutoNonScore(env)
1892 1 : }
1893 1 : return nil
1894 : }
1895 :
1896 : // runPickedCompaction kicks off the provided pickedCompaction. In case the
1897 : // pickedCompaction is a manual compaction, the corresponding manualCompaction
1898 : // is removed from d.mu.compact.manual.
1899 : //
1900 : // REQUIRES: d.mu and d.mu.versions.logLock are held.
1901 1 : func (d *DB) runPickedCompaction(pc *pickedCompaction, grantHandle CompactionGrantHandle) {
1902 1 : var doneChannel chan error
1903 1 : if pc.manualID > 0 {
1904 1 : for i := range d.mu.compact.manual {
1905 1 : if d.mu.compact.manual[i].id == pc.manualID {
1906 1 : doneChannel = d.mu.compact.manual[i].done
1907 1 : d.mu.compact.manual = slices.Delete(d.mu.compact.manual, i, i+1)
1908 1 : d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
1909 1 : break
1910 : }
1911 : }
1912 1 : if doneChannel == nil {
1913 0 : panic(errors.AssertionFailedf("did not find manual compaction with id %d", pc.manualID))
1914 : }
1915 : }
1916 :
1917 1 : d.mu.compact.compactingCount++
1918 1 : compaction := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider(), grantHandle, d.TableFormat(), d.determineCompactionValueSeparation)
1919 1 : d.addInProgressCompaction(compaction)
1920 1 : go func() {
1921 1 : d.compact(compaction, doneChannel)
1922 1 : }()
1923 : }
1924 :
1925 : // Schedule implements DBForCompaction (it is called by the
1926 : // CompactionScheduler).
1927 1 : func (d *DB) Schedule(grantHandle CompactionGrantHandle) bool {
1928 1 : d.mu.Lock()
1929 1 : defer d.mu.Unlock()
1930 1 : d.mu.versions.logLock()
1931 1 : defer d.mu.versions.logUnlock()
1932 1 : isWaiting := d.mu.versions.pickedCompactionCache.isWaiting()
1933 1 : if !isWaiting {
1934 0 : return false
1935 0 : }
1936 1 : pc := d.mu.versions.pickedCompactionCache.getForRunning()
1937 1 : if pc == nil {
1938 1 : env := d.makeCompactionEnvLocked()
1939 1 : if env != nil {
1940 1 : pc = d.pickAnyCompaction(*env)
1941 1 : }
1942 1 : if pc == nil {
1943 0 : d.mu.versions.pickedCompactionCache.setNotWaiting()
1944 0 : return false
1945 0 : }
1946 : }
1947 : // INVARIANT: pc != nil and is not in the cache. isWaiting is true, since
1948 : // there may be more compactions to run.
1949 1 : d.runPickedCompaction(pc, grantHandle)
1950 1 : return true
1951 : }
1952 :
1953 : // GetWaitingCompaction implements DBForCompaction (it is called by the
1954 : // CompactionScheduler).
1955 1 : func (d *DB) GetWaitingCompaction() (bool, WaitingCompaction) {
1956 1 : d.mu.Lock()
1957 1 : defer d.mu.Unlock()
1958 1 : d.mu.versions.logLock()
1959 1 : defer d.mu.versions.logUnlock()
1960 1 : isWaiting := d.mu.versions.pickedCompactionCache.isWaiting()
1961 1 : if !isWaiting {
1962 1 : return false, WaitingCompaction{}
1963 1 : }
1964 1 : pc := d.mu.versions.pickedCompactionCache.peek()
1965 1 : if pc == nil {
1966 1 : // Need to pick a compaction.
1967 1 : env := d.makeCompactionEnvLocked()
1968 1 : if env != nil {
1969 1 : pc = d.pickAnyCompaction(*env)
1970 1 : }
1971 1 : if pc == nil {
1972 1 : // Call setNotWaiting so that next call to GetWaitingCompaction can
1973 1 : // return early.
1974 1 : d.mu.versions.pickedCompactionCache.setNotWaiting()
1975 1 : return false, WaitingCompaction{}
1976 1 : } else {
1977 1 : d.mu.versions.pickedCompactionCache.add(pc)
1978 1 : }
1979 : }
1980 : // INVARIANT: pc != nil and is in the cache.
1981 1 : return true, makeWaitingCompaction(pc.manualID > 0, pc.kind, pc.score)
1982 : }
1983 :
1984 : // GetAllowedWithoutPermission implements DBForCompaction (it is called by the
1985 : // CompactionScheduler).
1986 1 : func (d *DB) GetAllowedWithoutPermission() int {
1987 1 : allowedBasedOnBacklog := int(d.mu.versions.curCompactionConcurrency.Load())
1988 1 : allowedBasedOnManual := 0
1989 1 : manualBacklog := int(d.mu.compact.manualLen.Load())
1990 1 : if manualBacklog > 0 {
1991 1 : _, maxAllowed := d.opts.CompactionConcurrencyRange()
1992 1 : allowedBasedOnManual = min(maxAllowed, manualBacklog+allowedBasedOnBacklog)
1993 1 : }
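     : // For example (hypothetical values): with a backlog-based allowance of 2,
     : // three queued manual compactions, and a maximum of 4, allowedBasedOnManual
     : // is min(4, 3+2) = 4 and the result below is max(2, 4) = 4.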
1994 1 : return max(allowedBasedOnBacklog, allowedBasedOnManual)
1995 : }
1996 :
1997 : // tryScheduleDownloadCompactions tries to start download compactions.
1998 : //
1999 : // Requires d.mu to be held. Updates d.mu.compact.downloads.
2000 : //
2001 : // Returns true iff at least one compaction was started.
2002 1 : func (d *DB) tryScheduleDownloadCompactions(env compactionEnv, maxConcurrentDownloads int) bool {
2003 1 : started := false
2004 1 : vers := d.mu.versions.currentVersion()
2005 1 : for i := 0; i < len(d.mu.compact.downloads); {
2006 1 : if d.mu.compact.downloadingCount >= maxConcurrentDownloads {
2007 1 : break
2008 : }
2009 1 : download := d.mu.compact.downloads[i]
2010 1 : switch d.tryLaunchDownloadCompaction(download, vers, d.mu.versions.l0Organizer, env, maxConcurrentDownloads) {
2011 1 : case launchedCompaction:
2012 1 : started = true
2013 1 : continue
2014 0 : case didNotLaunchCompaction:
2015 0 : // See if we can launch a compaction for another download task.
2016 0 : i++
2017 1 : case downloadTaskCompleted:
2018 1 : // Task is completed and must be removed.
2019 1 : d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1)
2020 : }
2021 : }
2022 1 : return started
2023 : }
2024 :
2025 1 : func (d *DB) pickManualCompaction(env compactionEnv) (pc *pickedCompaction) {
2026 1 : v := d.mu.versions.currentVersion()
2027 1 : for len(d.mu.compact.manual) > 0 {
2028 1 : manual := d.mu.compact.manual[0]
2029 1 : pc, retryLater := newPickedManualCompaction(v, d.mu.versions.l0Organizer, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
2030 1 : if pc != nil {
2031 1 : return pc
2032 1 : }
2033 1 : if retryLater {
2034 1 : // We are not able to run this manual compaction at this time.
2035 1 : // Inability to run the head of the queue blocks later manual compactions.
2036 1 : manual.retries++
2037 1 : return nil
2038 1 : }
2039 : // Manual compaction is a no-op. Signal that it's complete.
2040 1 : manual.done <- nil
2041 1 : d.mu.compact.manual = d.mu.compact.manual[1:]
2042 1 : d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
2043 : }
2044 1 : return nil
2045 : }
2046 :
2047 : // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction
2048 : // for all files that can be deleted as suggested by deletionHints.
2049 : //
2050 : // Requires d.mu to be held. Updates d.mu.compact.deletionHints.
2051 : //
2052 : // Returns true iff a compaction was started.
2053 1 : func (d *DB) tryScheduleDeleteOnlyCompaction() bool {
2054 1 : if d.opts.private.disableDeleteOnlyCompactions || d.opts.DisableAutomaticCompactions ||
2055 1 : len(d.mu.compact.deletionHints) == 0 {
2056 1 : return false
2057 1 : }
2058 1 : if _, maxConcurrency := d.opts.CompactionConcurrencyRange(); d.mu.compact.compactingCount >= maxConcurrency {
2059 1 : return false
2060 1 : }
2061 1 : v := d.mu.versions.currentVersion()
2062 1 : snapshots := d.mu.snapshots.toSlice()
2063 1 : // We need to save the value of exciseEnabled in the compaction itself, as
2064 1 : // it can change dynamically between now and when the compaction runs.
2065 1 : exciseEnabled := d.FormatMajorVersion() >= FormatVirtualSSTables &&
2066 1 : d.opts.Experimental.EnableDeleteOnlyCompactionExcises != nil && d.opts.Experimental.EnableDeleteOnlyCompactionExcises()
2067 1 : inputs, resolvedHints, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots, exciseEnabled)
2068 1 : d.mu.compact.deletionHints = unresolvedHints
2069 1 :
2070 1 : if len(inputs) > 0 {
2071 1 : c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow(), resolvedHints, exciseEnabled)
2072 1 : d.mu.compact.compactingCount++
2073 1 : d.addInProgressCompaction(c)
2074 1 : go d.compact(c, nil)
2075 1 : return true
2076 1 : }
2077 1 : return false
2078 : }
2079 :
2080 : // deleteCompactionHintType indicates whether the deleteCompactionHint was
2081 : // generated from a span containing a range del (point key only), a range key
2082 : // delete (range key only), or both a point and range key.
2083 : type deleteCompactionHintType uint8
2084 :
2085 : const (
2086 : // NOTE: While these are primarily used as enumeration types, they are also
2087 : // used for some bitwise operations. Care should be taken when updating.
2088 : deleteCompactionHintTypeUnknown deleteCompactionHintType = iota
2089 : deleteCompactionHintTypePointKeyOnly
2090 : deleteCompactionHintTypeRangeKeyOnly
2091 : deleteCompactionHintTypePointAndRangeKey
2092 : )
2093 :
2094 : // String implements fmt.Stringer.
2095 1 : func (h deleteCompactionHintType) String() string {
2096 1 : switch h {
2097 0 : case deleteCompactionHintTypeUnknown:
2098 0 : return "unknown"
2099 1 : case deleteCompactionHintTypePointKeyOnly:
2100 1 : return "point-key-only"
2101 1 : case deleteCompactionHintTypeRangeKeyOnly:
2102 1 : return "range-key-only"
2103 1 : case deleteCompactionHintTypePointAndRangeKey:
2104 1 : return "point-and-range-key"
2105 0 : default:
2106 0 : panic(fmt.Sprintf("unknown hint type: %d", h))
2107 : }
2108 : }
2109 :
2110 : // compactionHintFromKeys returns a deleteCompactionHintType given a slice of
2111 : // keyspan.Keys.
2112 1 : func compactionHintFromKeys(keys []keyspan.Key) deleteCompactionHintType {
2113 1 : var hintType deleteCompactionHintType
2114 1 : for _, k := range keys {
2115 1 : switch k.Kind() {
2116 1 : case base.InternalKeyKindRangeDelete:
2117 1 : hintType |= deleteCompactionHintTypePointKeyOnly
2118 1 : case base.InternalKeyKindRangeKeyDelete:
2119 1 : hintType |= deleteCompactionHintTypeRangeKeyOnly
2120 0 : default:
2121 0 : panic(fmt.Sprintf("unsupported key kind: %s", k.Kind()))
2122 : }
2123 : }
2124 1 : return hintType
2125 : }
2126 :
2127 : // A deleteCompactionHint records a user key and sequence number span that has been
2128 : // deleted by a range tombstone. A hint is recorded if at least one sstable
2129 : // falls completely within both the user key and sequence number spans.
2130 : // Once the tombstones and the observed completely-contained sstables fall
2131 : // into the same snapshot stripe, a delete-only compaction may delete any
2132 : // sstables within the range.
2133 : type deleteCompactionHint struct {
2134 : // The type of key span that generated this hint (point key, range key, or
2135 : // both).
2136 : hintType deleteCompactionHintType
2137 : // start and end are user keys specifying a key range [start, end) of
2138 : // deleted keys.
2139 : start []byte
2140 : end []byte
2141 : // The level of the file containing the range tombstone(s) when the hint
2142 : // was created. Only lower levels need to be searched for files that may
2143 : // be deleted.
2144 : tombstoneLevel int
2145 : // The file containing the range tombstone(s) that created the hint.
2146 : tombstoneFile *manifest.TableMetadata
2147 : // The smallest and largest sequence numbers of the abutting tombstones
2148 : // merged to form this hint. All of a table's keys must be less than the
2149 : // tombstone smallest sequence number to be deleted. All of a table's
2150 : // sequence numbers must fall into the same snapshot stripe as the
2151 : // tombstone largest sequence number to be deleted.
2152 : tombstoneLargestSeqNum base.SeqNum
2153 : tombstoneSmallestSeqNum base.SeqNum
2154 : // The smallest sequence number of a sstable that was found to be covered
2155 : // by this hint. The hint cannot be resolved until this sequence number is
2156 : // in the same snapshot stripe as the largest tombstone sequence number.
2157 : // This is set when a hint is created, so the LSM may look different and
2158 : // notably no longer contain the sstable that contained the key at this
2159 : // sequence number.
2160 : fileSmallestSeqNum base.SeqNum
2161 : }
2162 :
2163 : type deletionHintOverlap int8
2164 :
2165 : const (
2166 : // hintDoesNotApply indicates that the hint does not apply to the file.
2167 : hintDoesNotApply deletionHintOverlap = iota
2168 : // hintExcisesFile indicates that the hint excises a portion of the file,
2169 : // and the format major version of the DB supports excises.
2170 : hintExcisesFile
2171 : // hintDeletesFile indicates that the hint deletes the entirety of the file.
2172 : hintDeletesFile
2173 : )
2174 :
2175 1 : func (h deleteCompactionHint) String() string {
2176 1 : return fmt.Sprintf(
2177 1 : "L%d.%s %s-%s seqnums(tombstone=%d-%d, file-smallest=%d, type=%s)",
2178 1 : h.tombstoneLevel, h.tombstoneFile.TableNum, h.start, h.end,
2179 1 : h.tombstoneSmallestSeqNum, h.tombstoneLargestSeqNum, h.fileSmallestSeqNum,
2180 1 : h.hintType,
2181 1 : )
2182 1 : }
2183 :
2184 : func (h *deleteCompactionHint) canDeleteOrExcise(
2185 : cmp Compare, m *manifest.TableMetadata, snapshots compact.Snapshots, exciseEnabled bool,
2186 1 : ) deletionHintOverlap {
2187 1 : // The file can only be deleted if all of its keys are older than the
2188 1 : // earliest tombstone aggregated into the hint. Note that we use
2189 1 : // m.LargestSeqNumAbsolute, not m.LargestSeqNum. Consider a compaction that
2190 1 : // zeroes sequence numbers. A compaction may zero the sequence number of a
2191 1 : // key with a sequence number > h.tombstoneSmallestSeqNum and set it to
2192 1 : // zero. If we looked at m.LargestSeqNum, the resulting output file would
2193 1 : // appear to not contain any keys more recent than the oldest tombstone. To
2194 1 : // avoid this error, the largest pre-zeroing sequence number is maintained
2195 1 : // in LargestSeqNumAbsolute and used here to make the determination whether
2196 1 : // the file's keys are older than all of the hint's tombstones.
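     : // Concretely (hypothetical sequence numbers): a key written at seqnum 250
     : // may be zeroed by a compaction, so m.LargestSeqNum could understate how
     : // recent the key is relative to a tombstone at seqnum 200;
     : // LargestSeqNumAbsolute remains 250 and correctly rejects the hint below.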
2197 1 : if m.LargestSeqNumAbsolute >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
2198 1 : return hintDoesNotApply
2199 1 : }
2200 :
2201 : // The file's oldest key must be in the same snapshot stripe as the
2202 : // newest tombstone. NB: We already checked the hint's sequence numbers,
2203 : // but this file's oldest sequence number might be lower than the hint's
2204 : // smallest sequence number despite the file falling within the key range
2205 : // if this file was constructed after the hint by a compaction.
2206 1 : if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(m.SmallestSeqNum) {
2207 0 : return hintDoesNotApply
2208 0 : }
2209 :
2210 1 : switch h.hintType {
2211 1 : case deleteCompactionHintTypePointKeyOnly:
2212 1 : // A hint generated by a range del span cannot delete tables that contain
2213 1 : // range keys.
2214 1 : if m.HasRangeKeys {
2215 1 : return hintDoesNotApply
2216 1 : }
2217 1 : case deleteCompactionHintTypeRangeKeyOnly:
2218 1 : // A hint generated by a range key del span cannot delete tables that
2219 1 : // contain point keys.
2220 1 : if m.HasPointKeys {
2221 1 : return hintDoesNotApply
2222 1 : }
2223 1 : case deleteCompactionHintTypePointAndRangeKey:
2224 : // A hint from a span that contains both range dels *and* range keys can
2225 : // only be deleted if both bounds fall within the hint. The next check takes
2226 : // care of this.
2227 0 : default:
2228 0 : panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
2229 : }
2230 1 : if cmp(h.start, m.Smallest().UserKey) <= 0 &&
2231 1 : base.UserKeyExclusive(h.end).CompareUpperBounds(cmp, m.UserKeyBounds().End) >= 0 {
2232 1 : return hintDeletesFile
2233 1 : }
2234 1 : if !exciseEnabled {
2235 1 : // The file's keys must be completely contained within the hint range; excises
2236 1 : // aren't allowed.
2237 1 : return hintDoesNotApply
2238 1 : }
2239 : // Check for any overlap. In cases of partial overlap, we can excise the part of the file
2240 : // that overlaps with the deletion hint.
2241 1 : if cmp(h.end, m.Smallest().UserKey) > 0 &&
2242 1 : (m.UserKeyBounds().End.CompareUpperBounds(cmp, base.UserKeyInclusive(h.start)) >= 0) {
2243 1 : return hintExcisesFile
2244 1 : }
2245 1 : return hintDoesNotApply
2246 : }
2247 :
2248 : // checkDeleteCompactionHints checks the passed-in deleteCompactionHints for those that
2249 : // can be resolved and those that cannot. A hint is considered resolved when its largest
2250 : // tombstone sequence number and the smallest sequence number of covered files fall in
2251 : // the same snapshot stripe. No more than maxHintsPerDeleteOnlyCompaction will be resolved
2252 : // per method call. Resolved and unresolved hints are returned in separate return values.
2253 : // The files that the resolved hints apply to are returned as compactionLevels.
2254 : func checkDeleteCompactionHints(
2255 : cmp Compare,
2256 : v *manifest.Version,
2257 : hints []deleteCompactionHint,
2258 : snapshots compact.Snapshots,
2259 : exciseEnabled bool,
2260 1 : ) (levels []compactionLevel, resolved, unresolved []deleteCompactionHint) {
2261 1 : var files map[*manifest.TableMetadata]bool
2262 1 : var byLevel [numLevels][]*manifest.TableMetadata
2263 1 :
2264 1 : // Delete-only compactions can be quadratic (O(mn)) in terms of runtime
2265 1 : // where m = number of files in the delete-only compaction and n = number
2266 1 : // of resolved hints. To prevent these from growing unbounded, we cap
2267 1 : // the number of hints we resolve for one delete-only compaction. This
2268 1 : // cap only applies if exciseEnabled == true.
2269 1 : const maxHintsPerDeleteOnlyCompaction = 10
2270 1 :
2271 1 : unresolvedHints := hints[:0]
2272 1 : // Lazily populate resolvedHints, similar to files above.
2273 1 : resolvedHints := make([]deleteCompactionHint, 0)
2274 1 : for _, h := range hints {
2275 1 : // Check each compaction hint to see if it's resolvable. Resolvable
2276 1 : // hints are removed and trigger a delete-only compaction if any files
2277 1 : // in the current LSM still meet their criteria. Unresolvable hints
2278 1 : // are saved and don't trigger a delete-only compaction.
2279 1 : //
2280 1 : // When a compaction hint is created, the sequence numbers of the
2281 1 : // range tombstones and the covered file with the oldest key are
2282 1 : // recorded. The largest tombstone sequence number and the smallest
2283 1 : // file sequence number must be in the same snapshot stripe for the
2284 1 : // hint to be resolved. The below graphic models a compaction hint
2285 1 : // covering the keyspace [b, r). The hint completely contains two
2286 1 : // files, 000002 and 000003. The file 000003 contains the lowest
2287 1 : // covered sequence number at #90. The tombstone b.RANGEDEL.230:h has
2288 1 : // the highest tombstone sequence number incorporated into the hint.
2289 1 : // The hint may be resolved only once the snapshots at #100, #180 and
2290 1 : // #210 are all closed. File 000001 is not included within the hint
2291 1 : // because it extends beyond the range tombstones in user key space.
2292 1 : //
2293 1 : // 250
2294 1 : //
2295 1 : // |-b...230:h-|
2296 1 : // _____________________________________________________ snapshot #210
2297 1 : // 200 |--h.RANGEDEL.200:r--|
2298 1 : //
2299 1 : // _____________________________________________________ snapshot #180
2300 1 : //
2301 1 : // 150 +--------+
2302 1 : // +---------+ | 000003 |
2303 1 : // | 000002 | | |
2304 1 : // +_________+ | |
2305 1 : // 100_____________________|________|___________________ snapshot #100
2306 1 : // +--------+
2307 1 : // _____________________________________________________ snapshot #70
2308 1 : // +---------------+
2309 1 : // 50 | 000001 |
2310 1 : // | |
2311 1 : // +---------------+
2312 1 : // ______________________________________________________________
2313 1 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
2314 1 :
2315 1 : if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(h.fileSmallestSeqNum) ||
2316 1 : (len(resolvedHints) >= maxHintsPerDeleteOnlyCompaction && exciseEnabled) {
2317 1 : // Cannot resolve yet.
2318 1 : unresolvedHints = append(unresolvedHints, h)
2319 1 : continue
2320 : }
2321 :
2322 : // The hint h will be resolved and dropped if it either affects no files at all
2323 : // or if the number of files it creates (e.g. through excision) is less than or
2324 : // equal to the number of files it deletes. First, determine how many files are
2325 : // affected by this hint.
2326 1 : filesDeletedByCurrentHint := 0
2327 1 : var filesDeletedByLevel [7][]*manifest.TableMetadata
2328 1 : for l := h.tombstoneLevel + 1; l < numLevels; l++ {
2329 1 : for m := range v.Overlaps(l, base.UserKeyBoundsEndExclusive(h.start, h.end)).All() {
2330 1 : doesHintApply := h.canDeleteOrExcise(cmp, m, snapshots, exciseEnabled)
2331 1 : if m.IsCompacting() || doesHintApply == hintDoesNotApply || files[m] {
2332 1 : continue
2333 : }
2334 1 : switch doesHintApply {
2335 1 : case hintDeletesFile:
2336 1 : filesDeletedByCurrentHint++
2337 1 : case hintExcisesFile:
2338 1 : // Account for the original file being deleted.
2339 1 : filesDeletedByCurrentHint++
2340 1 : // An excise could produce up to 2 new files. If the hint
2341 1 : // leaves a fragment of the file on the left, decrement
2342 1 : // the counter once. If the hint leaves a fragment of the
2343 1 : // file on the right, decrement the counter once.
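     : // For illustration (hypothetical bounds): a hint covering [c, f) that
     : // excises a file spanning [a, z] leaves fragments on both sides, so the
     : // net contribution of this file is 1 - 1 - 1 = -1.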
2344 1 : if cmp(h.start, m.Smallest().UserKey) > 0 {
2345 1 : filesDeletedByCurrentHint--
2346 1 : }
2347 1 : if m.UserKeyBounds().End.IsUpperBoundFor(cmp, h.end) {
2348 1 : filesDeletedByCurrentHint--
2349 1 : }
2350 : }
2351 1 : filesDeletedByLevel[l] = append(filesDeletedByLevel[l], m)
2352 : }
2353 : }
2354 1 : if filesDeletedByCurrentHint < 0 {
2355 1 : // This hint does not delete a sufficient number of files to warrant
2356 1 : // a delete-only compaction at this stage. Drop it (i.e., don't add it
2357 1 : // to either resolved or unresolved hints) so it doesn't stick around
2358 1 : // forever.
2359 1 : continue
2360 : }
2361 : // This hint will be resolved and dropped.
2362 1 : for l := h.tombstoneLevel + 1; l < numLevels; l++ {
2363 1 : byLevel[l] = append(byLevel[l], filesDeletedByLevel[l]...)
2364 1 : for _, m := range filesDeletedByLevel[l] {
2365 1 : if files == nil {
2366 1 : // Construct files lazily, assuming most calls will not
2367 1 : // produce delete-only compactions.
2368 1 : files = make(map[*manifest.TableMetadata]bool)
2369 1 : }
2370 1 : files[m] = true
2371 : }
2372 : }
2373 1 : resolvedHints = append(resolvedHints, h)
2374 : }
2375 :
2376 1 : var compactLevels []compactionLevel
2377 1 : for l, files := range byLevel {
2378 1 : if len(files) == 0 {
2379 1 : continue
2380 : }
2381 1 : compactLevels = append(compactLevels, compactionLevel{
2382 1 : level: l,
2383 1 : files: manifest.NewLevelSliceKeySorted(cmp, files),
2384 1 : })
2385 : }
2386 1 : return compactLevels, resolvedHints, unresolvedHints
2387 : }
2388 :
2389 1 : func (d *DB) compactionPprofLabels(c *compaction) pprof.LabelSet {
2390 1 : activity := "compact"
2391 1 : if len(c.flushing) != 0 {
2392 0 : activity = "flush"
2393 0 : }
2394 1 : level := "L?"
2395 1 : // Delete-only compactions don't have an output level.
2396 1 : if c.outputLevel != nil {
2397 1 : level = fmt.Sprintf("L%d", c.outputLevel.level)
2398 1 : }
2399 1 : if kc := d.opts.Experimental.UserKeyCategories; kc.Len() > 0 {
2400 0 : cat := kc.CategorizeKeyRange(c.smallest.UserKey, c.largest.UserKey)
2401 0 : return pprof.Labels("pebble", activity, "output-level", level, "key-type", cat)
2402 0 : }
2403 1 : return pprof.Labels("pebble", activity, "output-level", level)
2404 : }
2405 :
2406 : // compact runs one compaction and maybe schedules another call to compact.
2407 1 : func (d *DB) compact(c *compaction, errChannel chan error) {
2408 1 : pprof.Do(context.Background(), d.compactionPprofLabels(c), func(context.Context) {
2409 1 : d.mu.Lock()
2410 1 : c.grantHandle.Started()
2411 1 : if err := d.compact1(c, errChannel); err != nil {
2412 1 : d.handleCompactFailure(c, err)
2413 1 : }
2414 1 : if c.isDownload {
2415 1 : d.mu.compact.downloadingCount--
2416 1 : } else {
2417 1 : d.mu.compact.compactingCount--
2418 1 : }
2419 1 : delete(d.mu.compact.inProgress, c)
2420 1 : // Add this compaction's duration to the cumulative duration. NB: This
2421 1 : // must be atomic with the above removal of c from
2422 1 : // d.mu.compact.InProgress to ensure Metrics.Compact.Duration does not
2423 1 : // miss or double count a completing compaction's duration.
2424 1 : d.mu.compact.duration += d.timeNow().Sub(c.beganAt)
2425 1 : d.mu.Unlock()
2426 1 : // Done must not be called while holding any lock that needs to be
2427 1 : // acquired by Schedule. Also, it must be called after new Version has
2428 1 : // been installed, and metadata related to compactingCount and inProgress
2429 1 : // compactions has been updated. This is because when we are running at
2430 1 : // the limit of permitted compactions, Done can cause the
2431 1 : // CompactionScheduler to schedule another compaction. Note that the only
2432 1 : // compactions that may be scheduled by Done are those integrated with the
2433 1 : // CompactionScheduler.
2434 1 : c.grantHandle.Done()
2435 1 : c.grantHandle = nil
2436 1 : // The previous compaction may have produced too many files in a level, so
2437 1 : // reschedule another compaction if needed.
2438 1 : //
2439 1 : // The preceding Done call will not necessarily cause a compaction to be
2440 1 : // scheduled, so we also need to call maybeScheduleCompaction. And
2441 1 : // maybeScheduleCompaction encompasses all compactions, and not only those
2442 1 : // scheduled via the CompactionScheduler.
2443 1 : d.mu.Lock()
2444 1 : d.maybeScheduleCompaction()
2445 1 : d.mu.compact.cond.Broadcast()
2446 1 : d.mu.Unlock()
2447 : })
2448 : }
2449 :
2450 1 : func (d *DB) handleCompactFailure(c *compaction, err error) {
2451 1 : if errors.Is(err, ErrCancelledCompaction) {
2452 1 : // ErrCancelledCompaction is expected during normal operation, so we don't
2453 1 : // want to report it as a background error.
2454 1 : d.opts.Logger.Infof("%v", err)
2455 1 : return
2456 1 : }
2457 :
2458 : // Record problem spans for a short duration, unless the error is a corruption.
2459 1 : expiration := 30 * time.Second
2460 1 : if IsCorruptionError(err) {
2461 1 : // TODO(radu): ideally, we should be using the corruption reporting
2462 1 : // mechanism which has a tighter span for the corruption. We would need to
2463 1 : // somehow plumb the level of the file.
2464 1 : expiration = 5 * time.Minute
2465 1 : }
2466 1 : for i := range c.inputs {
2467 1 : level := c.inputs[i].level
2468 1 : if level == 0 {
2469 1 : // We do not set problem spans on L0, as they could block flushes.
2470 1 : continue
2471 : }
2472 1 : it := c.inputs[i].files.Iter()
2473 1 : for f := it.First(); f != nil; f = it.Next() {
2474 1 : d.problemSpans.Add(level, f.UserKeyBounds(), expiration)
2475 1 : }
2476 : }
2477 :
2478 : // TODO(peter): count consecutive compaction errors and backoff.
2479 1 : d.opts.EventListener.BackgroundError(err)
2480 : }
2481 :
2482 : // cleanupVersionEdit cleans up any on-disk artifacts that were created
2483 : // for the application of a versionEdit that is no longer going to be applied.
2484 : //
2485 : // d.mu must be held when calling this method.
2486 1 : func (d *DB) cleanupVersionEdit(ve *manifest.VersionEdit) {
2487 1 : obsoleteFiles := manifest.ObsoleteFiles{
2488 1 : TableBackings: make([]*manifest.TableBacking, 0, len(ve.NewTables)),
2489 1 : BlobFiles: make([]*manifest.PhysicalBlobFile, 0, len(ve.NewBlobFiles)),
2490 1 : }
2491 1 : deletedTables := make(map[base.TableNum]struct{})
2492 1 : for key := range ve.DeletedTables {
2493 1 : deletedTables[key.FileNum] = struct{}{}
2494 1 : }
2495 1 : for i := range ve.NewBlobFiles {
2496 0 : obsoleteFiles.AddBlob(ve.NewBlobFiles[i].Physical)
2497 0 : d.mu.versions.zombieBlobs.Add(objectInfo{
2498 0 : fileInfo: fileInfo{
2499 0 : FileNum: ve.NewBlobFiles[i].Physical.FileNum,
2500 0 : FileSize: ve.NewBlobFiles[i].Physical.Size,
2501 0 : },
2502 0 : isLocal: objstorage.IsLocalBlobFile(d.objProvider, ve.NewBlobFiles[i].Physical.FileNum),
2503 0 : })
2504 0 : }
2505 1 : for i := range ve.NewTables {
2506 1 : if ve.NewTables[i].Meta.Virtual {
2507 0 : // We handle backing files separately.
2508 0 : continue
2509 : }
2510 1 : if _, ok := deletedTables[ve.NewTables[i].Meta.TableNum]; ok {
2511 0 : // This file is being moved in this ve to a different level.
2512 0 : // Don't mark it as obsolete.
2513 0 : continue
2514 : }
2515 1 : obsoleteFiles.AddBacking(ve.NewTables[i].Meta.PhysicalMeta().TableBacking)
2516 : }
2517 1 : for i := range ve.CreatedBackingTables {
2518 0 : if ve.CreatedBackingTables[i].IsUnused() {
2519 0 : obsoleteFiles.AddBacking(ve.CreatedBackingTables[i])
2520 0 : }
2521 : }
2522 1 : for _, of := range obsoleteFiles.TableBackings {
2523 1 : // Add this file to zombie tables as well, as the versionSet
2524 1 : // asserts on whether every obsolete file was at one point
2525 1 : // marked zombie.
2526 1 : d.mu.versions.zombieTables.Add(objectInfo{
2527 1 : fileInfo: fileInfo{
2528 1 : FileNum: of.DiskFileNum,
2529 1 : FileSize: of.Size,
2530 1 : },
2531 1 : isLocal: objstorage.IsLocalTable(d.objProvider, of.DiskFileNum),
2532 1 : })
2533 1 : }
2534 1 : d.mu.versions.addObsoleteLocked(obsoleteFiles)
2535 : }
2536 :
2537 : // compact1 runs one compaction.
2538 : //
2539 : // d.mu must be held when calling this, but the mutex may be dropped and
2540 : // re-acquired during the course of this method.
2541 1 : func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
2542 1 : if errChannel != nil {
2543 1 : defer func() {
2544 1 : errChannel <- err
2545 1 : }()
2546 : }
2547 :
2548 1 : jobID := d.newJobIDLocked()
2549 1 : info := c.makeInfo(jobID)
2550 1 : d.opts.EventListener.CompactionBegin(info)
2551 1 : startTime := d.timeNow()
2552 1 :
2553 1 : ve, stats, err := d.runCompaction(jobID, c)
2554 1 :
2555 1 : info.Duration = d.timeNow().Sub(startTime)
2556 1 : if err == nil {
2557 1 : validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
2558 1 : err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
2559 1 : // Check if this compaction had a conflicting operation (eg. a d.excise())
2560 1 : // that necessitates it restarting from scratch. Note that since we hold
2561 1 : // the manifest lock, we don't expect this bool to change its value
2562 1 : // as only the holder of the manifest lock will ever write to it.
2563 1 : if c.cancel.Load() {
2564 1 : err = firstError(err, ErrCancelledCompaction)
2565 1 : // This is the first time we've seen a cancellation during the
2566 1 : // life of this compaction (or the original condition on err == nil
2567 1 : // would not have been true). We should delete any tables already
2568 1 : // created, as d.runCompaction did not do that.
2569 1 : d.cleanupVersionEdit(ve)
2570 1 : // Note that UpdateVersionLocked invalidates the pickedCompactionCache
2571 1 : // when we return, which is relevant because this failed compaction
2572 1 : // may be the highest priority to run next.
2573 1 : return versionUpdate{}, err
2574 1 : }
2575 1 : return versionUpdate{
2576 1 : VE: ve,
2577 1 : JobID: jobID,
2578 1 : Metrics: c.metrics,
2579 1 : InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) },
2580 : }, nil
2581 : })
2582 : }
2583 :
2584 1 : info.Done = true
2585 1 : info.Err = err
2586 1 : if err == nil {
2587 1 : for i := range ve.NewTables {
2588 1 : e := &ve.NewTables[i]
2589 1 : info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
2590 1 : }
2591 1 : d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
2592 1 : d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
2593 1 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
2594 : }
2595 :
2596 : // NB: clearing compacting state must occur before updating the read state;
2597 : // L0Sublevels initialization depends on it.
2598 1 : d.clearCompactingState(c, err != nil)
2599 1 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics, c.bytesWritten.Load(), err)
2600 1 : d.mu.versions.incrementCompactionBytes(-c.bytesWritten.Load())
2601 1 :
2602 1 : info.TotalDuration = d.timeNow().Sub(c.beganAt)
2603 1 : d.opts.EventListener.CompactionEnd(info)
2604 1 :
2605 1 : // Update the read state before deleting obsolete files because the
2606 1 : // read-state update will cause the previous version to be unref'd and if
2607 1 : // there are no references obsolete tables will be added to the obsolete
2608 1 : // table list.
2609 1 : if err == nil {
2610 1 : d.updateReadStateLocked(d.opts.DebugCheck)
2611 1 : d.updateTableStatsLocked(ve.NewTables)
2612 1 : }
2613 1 : d.deleteObsoleteFiles(jobID)
2614 1 :
2615 1 : return err
2616 : }
2617 :
2618 : // runCopyCompaction runs a copy compaction where a new TableNum is created that
2619 : // is a byte-for-byte copy of the input file, or in some cases a span of it. This
2620 : // is used in lieu of a move compaction when a file is being moved across the
2621 : // local/remote storage boundary. It could also be used in lieu of a rewrite
2622 : // compaction as part of a Download() call, which allows copying only a span of
2623 : // the external file, provided the file does not contain range keys or value
2624 : // blocks (see sstable.CopySpan).
2625 : //
2626 : // d.mu must be held when calling this method. The mutex will be released when
2627 : // doing IO.
2628 : func (d *DB) runCopyCompaction(
2629 : jobID JobID, c *compaction,
2630 1 : ) (ve *manifest.VersionEdit, stats compact.Stats, _ error) {
2631 1 : if c.cancel.Load() {
2632 0 : return nil, compact.Stats{}, ErrCancelledCompaction
2633 0 : }
2634 1 : iter := c.startLevel.files.Iter()
2635 1 : inputMeta := iter.First()
2636 1 : if iter.Next() != nil {
2637 0 : return nil, compact.Stats{}, base.AssertionFailedf("got more than one file for a move compaction")
2638 0 : }
2639 1 : if inputMeta.BlobReferenceDepth > 0 || len(inputMeta.BlobReferences) > 0 {
2640 0 : return nil, compact.Stats{}, base.AssertionFailedf(
2641 0 : "copy compaction for %s with blob references (depth=%d, refs=%d)",
2642 0 : inputMeta.TableNum, inputMeta.BlobReferenceDepth, len(inputMeta.BlobReferences),
2643 0 : )
2644 0 : }
2645 1 : ve = &manifest.VersionEdit{
2646 1 : DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{
2647 1 : {Level: c.startLevel.level, FileNum: inputMeta.TableNum}: inputMeta,
2648 1 : },
2649 1 : }
2650 1 :
2651 1 : objMeta, err := d.objProvider.Lookup(base.FileTypeTable, inputMeta.TableBacking.DiskFileNum)
2652 1 : if err != nil {
2653 0 : return nil, compact.Stats{}, err
2654 0 : }
2655 : // This code does not support copying a shared table (which should never be necessary).
2656 1 : if objMeta.IsShared() {
2657 0 : return nil, compact.Stats{}, base.AssertionFailedf("copy compaction of shared table")
2658 0 : }
2659 :
2660 : // We are in the relatively more complex case where we need to copy this
2661 : // file to remote storage. Drop the db mutex while we do the copy.
2662 : //
2663 : // To ease cleanup of the local file and tracking of refs, we create
2664 : // a new FileNum. This has the potential of making the block cache less
2665 : // effective, however.
2666 1 : newMeta := &manifest.TableMetadata{
2667 1 : Size: inputMeta.Size,
2668 1 : CreationTime: inputMeta.CreationTime,
2669 1 : SmallestSeqNum: inputMeta.SmallestSeqNum,
2670 1 : LargestSeqNum: inputMeta.LargestSeqNum,
2671 1 : LargestSeqNumAbsolute: inputMeta.LargestSeqNumAbsolute,
2672 1 : Stats: inputMeta.Stats,
2673 1 : Virtual: inputMeta.Virtual,
2674 1 : SyntheticPrefixAndSuffix: inputMeta.SyntheticPrefixAndSuffix,
2675 1 : }
2676 1 : if inputMeta.HasPointKeys {
2677 1 : newMeta.ExtendPointKeyBounds(c.cmp, inputMeta.PointKeyBounds.Smallest(), inputMeta.PointKeyBounds.Largest())
2678 1 : }
2679 1 : if inputMeta.HasRangeKeys {
2680 1 : newMeta.ExtendRangeKeyBounds(c.cmp, inputMeta.RangeKeyBounds.Smallest(), inputMeta.RangeKeyBounds.Largest())
2681 1 : }
2682 1 : newMeta.TableNum = d.mu.versions.getNextTableNum()
2683 1 : if objMeta.IsExternal() {
2684 1 : // external -> local/shared copy. File must be virtual.
2685 1 : // We will update this size later after we produce the new backing file.
2686 1 : newMeta.InitVirtualBacking(base.DiskFileNum(newMeta.TableNum), inputMeta.TableBacking.Size)
2687 1 : } else {
2688 1 : // local -> shared copy. New file is guaranteed to not be virtual.
2689 1 : newMeta.InitPhysicalBacking()
2690 1 : }
2691 :
2692 : // Before dropping the db mutex, grab a ref to the current version. This
2693 : // prevents any concurrent excises from deleting files that this compaction
2694 : // needs to read/maintain a reference to.
2695 1 : vers := d.mu.versions.currentVersion()
2696 1 : vers.Ref()
2697 1 : defer vers.UnrefLocked()
2698 1 :
2699 1 : // NB: The order here is reversed, lock after unlock. This is similar to
2700 1 : // runCompaction.
2701 1 : d.mu.Unlock()
2702 1 : defer d.mu.Lock()
2703 1 :
2704 1 : deleteOnExit := false
2705 1 : defer func() {
2706 1 : if deleteOnExit {
2707 1 : _ = d.objProvider.Remove(base.FileTypeTable, newMeta.TableBacking.DiskFileNum)
2708 1 : }
2709 : }()
2710 :
2711 : // If the src obj is external, we're doing an external to local/shared copy.
2712 1 : if objMeta.IsExternal() {
2713 1 : ctx := context.TODO()
2714 1 : src, err := d.objProvider.OpenForReading(
2715 1 : ctx, base.FileTypeTable, inputMeta.TableBacking.DiskFileNum, objstorage.OpenOptions{},
2716 1 : )
2717 1 : if err != nil {
2718 0 : return nil, compact.Stats{}, err
2719 0 : }
2720 1 : defer func() {
2721 1 : if src != nil {
2722 0 : _ = src.Close()
2723 0 : }
2724 : }()
2725 :
2726 1 : w, _, err := d.objProvider.Create(
2727 1 : ctx, base.FileTypeTable, newMeta.TableBacking.DiskFileNum,
2728 1 : objstorage.CreateOptions{
2729 1 : PreferSharedStorage: d.shouldCreateShared(c.outputLevel.level),
2730 1 : },
2731 1 : )
2732 1 : if err != nil {
2733 0 : return nil, compact.Stats{}, err
2734 0 : }
2735 1 : deleteOnExit = true
2736 1 :
2737 1 : start, end := newMeta.Smallest(), newMeta.Largest()
2738 1 : if newMeta.SyntheticPrefixAndSuffix.HasPrefix() {
2739 1 : syntheticPrefix := newMeta.SyntheticPrefixAndSuffix.Prefix()
2740 1 : start.UserKey = syntheticPrefix.Invert(start.UserKey)
2741 1 : end.UserKey = syntheticPrefix.Invert(end.UserKey)
2742 1 : }
2743 1 : if newMeta.SyntheticPrefixAndSuffix.HasSuffix() {
2744 0 : // Extend the bounds as necessary so that the keys don't include suffixes.
2745 0 : start.UserKey = start.UserKey[:c.comparer.Split(start.UserKey)]
2746 0 : if n := c.comparer.Split(end.UserKey); n < len(end.UserKey) {
2747 0 : end = base.MakeRangeDeleteSentinelKey(c.comparer.ImmediateSuccessor(nil, end.UserKey[:n]))
2748 0 : }
2749 : }
2750 :
2751 : // NB: external files are always virtual.
2752 1 : var wrote uint64
2753 1 : err = d.fileCache.withReader(ctx, block.NoReadEnv, inputMeta.VirtualMeta(), func(r *sstable.Reader, env sstable.ReadEnv) error {
2754 1 : var err error
2755 1 : // TODO(radu): plumb a ReadEnv to CopySpan (it could use the buffer pool
2756 1 : // or update category stats).
2757 1 : wrote, err = sstable.CopySpan(ctx,
2758 1 : src, r, d.opts.MakeReaderOptions(),
2759 1 : w, d.opts.MakeWriterOptions(c.outputLevel.level, d.TableFormat()),
2760 1 : start, end,
2761 1 : )
2762 1 : return err
2763 1 : })
2764 :
2765 1 : src = nil // We passed src to CopySpan; it's responsible for closing it.
2766 1 : if err != nil {
2767 1 : if errors.Is(err, sstable.ErrEmptySpan) {
2768 1 : // The virtual table was empty. Just remove the backing file.
2769 1 : // Note that deleteOnExit is true so we will delete the created object.
2770 1 : c.metrics[c.outputLevel.level] = &LevelMetrics{
2771 1 : TableBytesIn: inputMeta.Size,
2772 1 : }
2773 1 :
2774 1 : return ve, compact.Stats{}, nil
2775 1 : }
2776 0 : return nil, compact.Stats{}, err
2777 : }
2778 1 : newMeta.TableBacking.Size = wrote
2779 1 : newMeta.Size = wrote
2780 1 : } else {
2781 1 : _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
2782 1 : d.objProvider.Path(objMeta), base.FileTypeTable, newMeta.TableBacking.DiskFileNum,
2783 1 : objstorage.CreateOptions{PreferSharedStorage: true})
2784 1 : if err != nil {
2785 0 : return nil, compact.Stats{}, err
2786 0 : }
2787 1 : deleteOnExit = true
2788 : }
2789 1 : ve.NewTables = []manifest.NewTableEntry{{
2790 1 : Level: c.outputLevel.level,
2791 1 : Meta: newMeta,
2792 1 : }}
2793 1 : if newMeta.Virtual {
2794 1 : ve.CreatedBackingTables = []*manifest.TableBacking{newMeta.TableBacking}
2795 1 : }
2796 1 : c.metrics[c.outputLevel.level] = &LevelMetrics{
2797 1 : TableBytesIn: inputMeta.Size,
2798 1 : TableBytesCompacted: newMeta.Size,
2799 1 : TablesCompacted: 1,
2800 1 : }
2801 1 :
2802 1 : if err := d.objProvider.Sync(); err != nil {
2803 0 : return nil, compact.Stats{}, err
2804 0 : }
2805 1 : deleteOnExit = false
2806 1 : return ve, compact.Stats{}, nil
2807 : }
2808 :
2809 : // applyHintOnFile applies a deleteCompactionHint to a file, and updates the
2810 : // versionEdit accordingly. It returns a list of new files that were created
2811 : // if the hint was applied only partially to the file (e.g., through an
2812 : // exciseTable as opposed to an outright deletion). levelMetrics is kept
2813 : // up-to-date with the number of tables deleted or excised.
2814 : func (d *DB) applyHintOnFile(
2815 : h deleteCompactionHint,
2816 : f *manifest.TableMetadata,
2817 : level int,
2818 : levelMetrics *LevelMetrics,
2819 : ve *manifest.VersionEdit,
2820 : hintOverlap deletionHintOverlap,
2821 1 : ) (newFiles []manifest.NewTableEntry, err error) {
2822 1 : if hintOverlap == hintDoesNotApply {
2823 0 : return nil, nil
2824 0 : }
2825 :
2826 : // The hint overlaps with at least part of the file.
2827 1 : if hintOverlap == hintDeletesFile {
2828 1 : // The hint deletes the entirety of this file.
2829 1 : ve.DeletedTables[manifest.DeletedTableEntry{
2830 1 : Level: level,
2831 1 : FileNum: f.TableNum,
2832 1 : }] = f
2833 1 : levelMetrics.TablesDeleted++
2834 1 : return nil, nil
2835 1 : }
2836 : // The hint overlaps with only a part of the file, not the entirety of it. We need
2837 : // to use d.exciseTable. (hintOverlap == hintExcisesFile)
2838 1 : if d.FormatMajorVersion() < FormatVirtualSSTables {
2839 0 : panic("pebble: delete-only compaction hint excising a file is not supported in this version")
2840 : }
2841 :
2842 1 : levelMetrics.TablesExcised++
2843 1 : exciseBounds := base.UserKeyBoundsEndExclusive(h.start, h.end)
2844 1 : leftTable, rightTable, err := d.exciseTable(context.TODO(), exciseBounds, f, level, tightExciseBounds)
2845 1 : if err != nil {
2846 0 : return nil, errors.Wrap(err, "error when running excise for delete-only compaction")
2847 0 : }
2848 1 : newFiles = applyExciseToVersionEdit(ve, f, leftTable, rightTable, level)
2849 1 : return newFiles, nil
2850 : }
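// The sketch below is an illustrative, self-contained example (hypothetical
// types and names, not Pebble's API) of what excising a hint's span out of a
// file means conceptually: the hint's end-exclusive span is removed from the
// file's bounds, leaving at most a left and a right remnant. The real
// implementation operates on manifest.TableMetadata via d.exciseTable and
// records the remnants in the version edit via applyExciseToVersionEdit.
package main

import "fmt"

// span is an end-exclusive key interval [start, end) over string user keys.
type span struct{ start, end string }

// exciseSpan removes hint from f, returning the surviving left and right
// remnants (nil when a side is empty).
func exciseSpan(f, hint span) (left, right *span) {
	if f.start < hint.start {
		left = &span{start: f.start, end: hint.start}
	}
	if hint.end < f.end {
		right = &span{start: hint.end, end: f.end}
	}
	return left, right
}

func main() {
	l, r := exciseSpan(span{"b", "k"}, span{"d", "g"})
	fmt.Println(*l, *r) // {b d} {g k}
}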
2851 :
2852 : func (d *DB) runDeleteOnlyCompactionForLevel(
2853 : cl compactionLevel,
2854 : levelMetrics *LevelMetrics,
2855 : ve *manifest.VersionEdit,
2856 : snapshots compact.Snapshots,
2857 : fragments []deleteCompactionHintFragment,
2858 : exciseEnabled bool,
2859 1 : ) error {
2860 1 : if cl.level == 0 {
2861 0 : panic("cannot run delete-only compaction for L0")
2862 : }
2863 1 : curFragment := 0
2864 1 :
2865 1 : // The outer loop iterates over files, the middle loop over hint fragments,
2866 1 : // and the inner loop over the hints within a fragment. The number of
2867 1 : // fragments is bounded by the number of hints this compaction was created
2868 1 : // with, which is capped in the compaction picker to avoid very CPU-hot loops here.
2869 1 : for f := range cl.files.All() {
2870 1 : // curFile usually matches f, except if f got excised, in which case it
2871 1 : // maps to a virtual file that replaces f, or to nil if f was removed in
2872 1 : // its entirety.
2873 1 : curFile := f
2874 1 : for curFragment < len(fragments) && d.cmp(fragments[curFragment].start, f.Smallest().UserKey) <= 0 {
2875 1 : curFragment++
2876 1 : }
2877 1 : if curFragment > 0 {
2878 1 : curFragment--
2879 1 : }
2880 :
2881 1 : for ; curFragment < len(fragments); curFragment++ {
2882 1 : if f.UserKeyBounds().End.CompareUpperBounds(d.cmp, base.UserKeyInclusive(fragments[curFragment].start)) < 0 {
2883 1 : break
2884 : }
2885 : // Process all hints that overlap with this file. Note that applying
2886 : // a hint twice is idempotent; curFile should have already been excised
2887 : // the first time, resulting in no change the second time.
2888 1 : for _, h := range fragments[curFragment].hints {
2889 1 : if h.tombstoneLevel >= cl.level {
2890 1 : // We cannot excise out the deletion tombstone itself, or anything
2891 1 : // above it.
2892 1 : continue
2893 : }
2894 1 : hintOverlap := h.canDeleteOrExcise(d.cmp, curFile, snapshots, exciseEnabled)
2895 1 : if hintOverlap == hintDoesNotApply {
2896 1 : continue
2897 : }
2898 1 : newFiles, err := d.applyHintOnFile(h, curFile, cl.level, levelMetrics, ve, hintOverlap)
2899 1 : if err != nil {
2900 0 : return err
2901 0 : }
2902 1 : if _, ok := ve.DeletedTables[manifest.DeletedTableEntry{Level: cl.level, FileNum: curFile.TableNum}]; ok {
2903 1 : curFile = nil
2904 1 : }
2905 1 : if len(newFiles) > 0 {
2906 1 : curFile = newFiles[len(newFiles)-1].Meta
2907 1 : } else if curFile == nil {
2908 1 : // Nothing remains of the file.
2909 1 : break
2910 : }
2911 : }
2912 1 : if curFile == nil {
2913 1 : // Nothing remains of the file.
2914 1 : break
2915 : }
2916 : }
2917 1 : if _, ok := ve.DeletedTables[manifest.DeletedTableEntry{
2918 1 : Level: cl.level,
2919 1 : FileNum: f.TableNum,
2920 1 : }]; !ok {
2921 0 : panic("pebble: delete-only compaction scheduled with hints that did not delete or excise a file")
2922 : }
2923 : }
2924 1 : return nil
2925 : }
2926 :
2927 : // deleteCompactionHintFragment represents a fragment of the key space and
2928 : // contains a set of deleteCompactionHints that apply to that fragment; a
2929 : // fragment starts at the start field and ends where the next fragment starts.
2930 : type deleteCompactionHintFragment struct {
2931 : start []byte
2932 : hints []deleteCompactionHint
2933 : }
2934 :
2935 : // fragmentDeleteCompactionHints fragments a list of delete compaction hints.
2936 : // Hints can overlap with each other, and multiple fragments can apply to a
2937 : // single file; fragmenting makes it easier to apply the hints to the
2938 : // non-overlapping files occupying a level, since files and hint fragments can
2939 : // be iterated on in lockstep while efficiently applying all hints that overlap a given file.
2940 : func fragmentDeleteCompactionHints(
2941 : cmp Compare, hints []deleteCompactionHint,
2942 1 : ) []deleteCompactionHintFragment {
2943 1 : fragments := make([]deleteCompactionHintFragment, 0, len(hints)*2)
2944 1 : for i := range hints {
2945 1 : fragments = append(fragments, deleteCompactionHintFragment{start: hints[i].start},
2946 1 : deleteCompactionHintFragment{start: hints[i].end})
2947 1 : }
2948 1 : slices.SortFunc(fragments, func(i, j deleteCompactionHintFragment) int {
2949 1 : return cmp(i.start, j.start)
2950 1 : })
2951 1 : fragments = slices.CompactFunc(fragments, func(i, j deleteCompactionHintFragment) bool {
2952 1 : return bytes.Equal(i.start, j.start)
2953 1 : })
2954 1 : for _, h := range hints {
2955 1 : startIdx := sort.Search(len(fragments), func(i int) bool {
2956 1 : return cmp(fragments[i].start, h.start) >= 0
2957 1 : })
2958 1 : endIdx := sort.Search(len(fragments), func(i int) bool {
2959 1 : return cmp(fragments[i].start, h.end) >= 0
2960 1 : })
2961 1 : for i := startIdx; i < endIdx; i++ {
2962 1 : fragments[i].hints = append(fragments[i].hints, h)
2963 1 : }
2964 : }
2965 1 : return fragments
2966 : }
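// The sketch below is a self-contained rendering of the fragmentation idea
// above, using plain strings in place of deleteCompactionHint (the names here
// are illustrative, not Pebble's API). Hints [a,f) and [c,h) yield fragment
// boundaries a, c, f, and h; the fragment starting at c carries both hints,
// while the one starting at h carries none and only marks where the previous
// fragment ends.
package main

import (
	"fmt"
	"slices"
	"sort"
	"strings"
)

type hint struct{ start, end string } // end-exclusive

type fragment struct {
	start string
	hints []hint
}

func fragmentHints(hints []hint) []fragment {
	frags := make([]fragment, 0, 2*len(hints))
	for _, h := range hints {
		frags = append(frags, fragment{start: h.start}, fragment{start: h.end})
	}
	slices.SortFunc(frags, func(a, b fragment) int { return strings.Compare(a.start, b.start) })
	frags = slices.CompactFunc(frags, func(a, b fragment) bool { return a.start == b.start })
	for _, h := range hints {
		lo := sort.Search(len(frags), func(i int) bool { return frags[i].start >= h.start })
		hi := sort.Search(len(frags), func(i int) bool { return frags[i].start >= h.end })
		for i := lo; i < hi; i++ {
			frags[i].hints = append(frags[i].hints, h)
		}
	}
	return frags
}

func main() {
	for _, f := range fragmentHints([]hint{{"a", "f"}, {"c", "h"}}) {
		fmt.Println(f.start, f.hints)
	}
	// Output:
	// a [{a f}]
	// c [{a f} {c h}]
	// f [{c h}]
	// h []
}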
2967 :
2968 : // runDeleteOnlyCompaction runs a delete-only compaction.
2969 : //
2970 : // d.mu must *not* be held when calling this.
2971 : func (d *DB) runDeleteOnlyCompaction(
2972 : jobID JobID, c *compaction, snapshots compact.Snapshots,
2973 1 : ) (ve *manifest.VersionEdit, stats compact.Stats, retErr error) {
2974 1 : fragments := fragmentDeleteCompactionHints(d.cmp, c.deletionHints)
2975 1 : ve = &manifest.VersionEdit{
2976 1 : DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{},
2977 1 : }
2978 1 : for _, cl := range c.inputs {
2979 1 : levelMetrics := &LevelMetrics{}
2980 1 : if err := d.runDeleteOnlyCompactionForLevel(cl, levelMetrics, ve, snapshots, fragments, c.exciseEnabled); err != nil {
2981 0 : return nil, stats, err
2982 0 : }
2983 1 : c.metrics[cl.level] = levelMetrics
2984 : }
2985 : // Remove any files that were added and deleted in the same versionEdit.
2986 1 : ve.NewTables = slices.DeleteFunc(ve.NewTables, func(e manifest.NewTableEntry) bool {
2987 1 : entry := manifest.DeletedTableEntry{Level: e.Level, FileNum: e.Meta.TableNum}
2988 1 : if _, deleted := ve.DeletedTables[entry]; deleted {
2989 1 : delete(ve.DeletedTables, entry)
2990 1 : return true
2991 1 : }
2992 1 : return false
2993 : })
2994 : // Remove any entries from CreatedBackingTables that are not used in any
2995 : // NewTables.
2996 1 : usedBackingFiles := make(map[base.DiskFileNum]struct{})
2997 1 : for _, e := range ve.NewTables {
2998 1 : if e.Meta.Virtual {
2999 1 : usedBackingFiles[e.Meta.TableBacking.DiskFileNum] = struct{}{}
3000 1 : }
3001 : }
3002 1 : ve.CreatedBackingTables = slices.DeleteFunc(ve.CreatedBackingTables, func(b *manifest.TableBacking) bool {
3003 1 : _, used := usedBackingFiles[b.DiskFileNum]
3004 1 : return !used
3005 1 : })
3006 : // Refresh the disk available statistic whenever a compaction/flush
3007 : // completes, before re-acquiring the mutex.
3008 1 : d.calculateDiskAvailableBytes()
3009 1 : return ve, stats, nil
3010 : }
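// The sketch below is a minimal, self-contained illustration (hypothetical
// types, not Pebble's manifest package) of the cleanup performed above: a
// table that is both added and deleted within the same version edit cancels
// out of both sets, and any created backing that no surviving new table
// references is dropped.
package main

import (
	"fmt"
	"slices"
)

type edit struct {
	newTables   []int            // table numbers added by the edit
	deleted     map[int]struct{} // table numbers deleted by the edit
	newBackings []int            // backing file numbers created by the edit
	backingOf   map[int]int      // table number -> backing file number
}

func prune(ve *edit) {
	ve.newTables = slices.DeleteFunc(ve.newTables, func(t int) bool {
		if _, ok := ve.deleted[t]; ok {
			delete(ve.deleted, t) // added and deleted in the same edit: cancels out
			return true
		}
		return false
	})
	used := make(map[int]struct{})
	for _, t := range ve.newTables {
		used[ve.backingOf[t]] = struct{}{}
	}
	ve.newBackings = slices.DeleteFunc(ve.newBackings, func(b int) bool {
		_, ok := used[b]
		return !ok
	})
}

func main() {
	ve := &edit{
		newTables:   []int{7, 8},
		deleted:     map[int]struct{}{8: {}, 3: {}},
		newBackings: []int{100, 101},
		backingOf:   map[int]int{7: 100, 8: 101},
	}
	prune(ve)
	fmt.Println(ve.newTables, ve.deleted, ve.newBackings) // [7] map[3:{}] [100]
}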
3011 :
3012 : func (d *DB) runMoveCompaction(
3013 : jobID JobID, c *compaction,
3014 1 : ) (ve *manifest.VersionEdit, stats compact.Stats, _ error) {
3015 1 : iter := c.startLevel.files.Iter()
3016 1 : meta := iter.First()
3017 1 : if iter.Next() != nil {
3018 0 : return nil, stats, base.AssertionFailedf("got more than one file for a move compaction")
3019 0 : }
3020 1 : if c.cancel.Load() {
3021 0 : return ve, stats, ErrCancelledCompaction
3022 0 : }
3023 1 : c.metrics[c.outputLevel.level] = &LevelMetrics{
3024 1 : TableBytesMoved: meta.Size,
3025 1 : TablesMoved: 1,
3026 1 : }
3027 1 : ve = &manifest.VersionEdit{
3028 1 : DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{
3029 1 : {Level: c.startLevel.level, FileNum: meta.TableNum}: meta,
3030 1 : },
3031 1 : NewTables: []manifest.NewTableEntry{
3032 1 : {Level: c.outputLevel.level, Meta: meta},
3033 1 : },
3034 1 : }
3035 1 :
3036 1 : return ve, stats, nil
3037 : }
3038 :
3039 : // runCompaction runs a compaction that produces new on-disk tables from
3040 : // memtables or old on-disk tables.
3041 : //
3042 : // runCompaction cannot be used for compactionKindIngestedFlushable.
3043 : //
3044 : // d.mu must be held when calling this, but the mutex may be dropped and
3045 : // re-acquired during the course of this method.
3046 : func (d *DB) runCompaction(
3047 : jobID JobID, c *compaction,
3048 1 : ) (ve *manifest.VersionEdit, stats compact.Stats, retErr error) {
3049 1 : if c.cancel.Load() {
3050 1 : return ve, stats, ErrCancelledCompaction
3051 1 : }
3052 1 : switch c.kind {
3053 1 : case compactionKindDeleteOnly:
3054 1 : // Before dropping the db mutex, grab a ref to the current version. This
3055 1 : // prevents any concurrent excises from deleting files that this compaction
3056 1 : // needs to read/maintain a reference to.
3057 1 : //
3058 1 : // Note that delete-only compactions can call excise(), which needs to be able
3059 1 : // to read these files.
3060 1 : vers := d.mu.versions.currentVersion()
3061 1 : vers.Ref()
3062 1 : defer vers.UnrefLocked()
3063 1 : // Release the d.mu lock while doing I/O.
3064 1 : // Note the unusual order: Unlock and then Lock.
3065 1 : snapshots := d.mu.snapshots.toSlice()
3066 1 : d.mu.Unlock()
3067 1 : defer d.mu.Lock()
3068 1 : return d.runDeleteOnlyCompaction(jobID, c, snapshots)
3069 1 : case compactionKindMove:
3070 1 : return d.runMoveCompaction(jobID, c)
3071 1 : case compactionKindCopy:
3072 1 : return d.runCopyCompaction(jobID, c)
3073 0 : case compactionKindIngestedFlushable:
3074 0 : panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
3075 : }
3076 :
3077 1 : snapshots := d.mu.snapshots.toSlice()
3078 1 :
3079 1 : if c.flushing == nil {
3080 1 : // Before dropping the db mutex, grab a ref to the current version. This
3081 1 : // prevents any concurrent excises from deleting files that this compaction
3082 1 : // needs to read/maintain a reference to.
3083 1 : //
3084 1 : // Note that unlike user iterators, compactionIter does not maintain a ref
3085 1 : // of the version or read state.
3086 1 : vers := d.mu.versions.currentVersion()
3087 1 : vers.Ref()
3088 1 : defer vers.UnrefLocked()
3089 1 : }
3090 :
3091 : // Release the d.mu lock while doing I/O.
3092 : // Note the unusual order: Unlock and then Lock.
3093 1 : d.mu.Unlock()
3094 1 : defer d.mu.Lock()
3095 1 :
3096 1 : // Determine whether we should separate values into blob files.
3097 1 : //
3098 1 : // TODO(jackson): Currently we never separate values in non-tests. Choose
3099 1 : // and initialize the appropriate ValueSeparation implementation based on
3100 1 : // Options and the compaction inputs.
3101 1 : valueSeparation := c.getValueSeparation(jobID, c, c.tableFormat)
3102 1 :
3103 1 : result := d.compactAndWrite(jobID, c, snapshots, c.tableFormat, valueSeparation)
3104 1 : if result.Err == nil {
3105 1 : ve, result.Err = c.makeVersionEdit(result)
3106 1 : }
3107 1 : if result.Err != nil {
3108 1 : // Delete any created tables or blob files.
3109 1 : obsoleteFiles := manifest.ObsoleteFiles{
3110 1 : TableBackings: make([]*manifest.TableBacking, 0, len(result.Tables)),
3111 1 : BlobFiles: make([]*manifest.PhysicalBlobFile, 0, len(result.Blobs)),
3112 1 : }
3113 1 : d.mu.Lock()
3114 1 : for i := range result.Tables {
3115 1 : backing := &manifest.TableBacking{
3116 1 : DiskFileNum: result.Tables[i].ObjMeta.DiskFileNum,
3117 1 : Size: result.Tables[i].WriterMeta.Size,
3118 1 : }
3119 1 : obsoleteFiles.AddBacking(backing)
3120 1 : // Add this file to zombie tables as well, as the versionSet
3121 1 : // asserts that every obsolete file was at one point
3122 1 : // marked zombie.
3123 1 : d.mu.versions.zombieTables.AddMetadata(&result.Tables[i].ObjMeta, backing.Size)
3124 1 : }
3125 1 : for i := range result.Blobs {
3126 0 : obsoleteFiles.AddBlob(result.Blobs[i].Metadata)
3127 0 : // Add this file to zombie blobs as well, as the versionSet
3128 0 : // asserts that every obsolete file was at one point
3129 0 : // marked zombie.
3130 0 : d.mu.versions.zombieBlobs.AddMetadata(&result.Blobs[i].ObjMeta, result.Blobs[i].Metadata.Size)
3131 0 : }
3132 1 : d.mu.versions.addObsoleteLocked(obsoleteFiles)
3133 1 : d.mu.Unlock()
3134 : }
3135 : // Refresh the disk available statistic whenever a compaction/flush
3136 : // completes, before re-acquiring the mutex.
3137 1 : d.calculateDiskAvailableBytes()
3138 1 : return ve, result.Stats, result.Err
3139 : }
3140 :
3141 : // compactAndWrite runs the data part of a compaction, where we set up a
3142 : // compaction iterator and use it to write output tables.
3143 : func (d *DB) compactAndWrite(
3144 : jobID JobID,
3145 : c *compaction,
3146 : snapshots compact.Snapshots,
3147 : tableFormat sstable.TableFormat,
3148 : valueSeparation compact.ValueSeparation,
3149 1 : ) (result compact.Result) {
3150 1 : // Compactions use a pool of buffers to read blocks, avoiding polluting the
3151 1 : // block cache with blocks that will not be read again. We initialize the
3152 1 : // buffer pool with a size 12. This initial size does not need to be
3153 1 : // buffer pool with a size of 12. This initial size does not need to be
3154 1 : // blocks allocated at a given time over the course of the compaction. But
3155 1 : // choosing a size larger than that working set avoids any additional
3156 1 : // allocations to grow the size of the pool over the course of iteration.
3157 1 : //
3158 1 : // Justification for initial size 12: In a two-level compaction, at any
3159 1 : // given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
3160 1 : // Additionally, when decoding a compressed block, we'll temporarily
3161 1 : // allocate 1 additional block to hold the compressed buffer. In the worst
3162 1 : // case that all input sstables have two-level index blocks (+2), value
3163 1 : // blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
3164 1 : // additionally require 2n+4 blocks where n is the number of input sstables.
3165 1 : // Range deletion and range key blocks are relatively rare, and the cost of
3166 1 : // an additional allocation or two over the course of the compaction is
3167 1 : // considered to be okay. A larger initial size would cause the pool to hold
3168 1 : // on to more memory, even when it's not in-use because the pool will
3169 1 : // recycle buffers up to the current capacity of the pool. The memory use of
3170 1 : // a 12-buffer pool is expected to be within reason, even if all the buffers
3171 1 : // grow to the typical size of an index block (256 KiB) which would
3172 1 : // translate to 3 MiB per compaction.
3173 1 : c.bufferPool.Init(12)
3174 1 : defer c.bufferPool.Release()
3175 1 : blockReadEnv := block.ReadEnv{
3176 1 : BufferPool: &c.bufferPool,
3177 1 : Stats: &c.stats,
3178 1 : IterStats: d.fileCache.SSTStatsCollector().Accumulator(
3179 1 : uint64(uintptr(unsafe.Pointer(c))),
3180 1 : categoryCompaction,
3181 1 : ),
3182 1 : }
3183 1 : c.valueFetcher.Init(d.fileCache, blockReadEnv)
3184 1 : iiopts := internalIterOpts{
3185 1 : compaction: true,
3186 1 : readEnv: sstable.ReadEnv{Block: blockReadEnv},
3187 1 : blobValueFetcher: &c.valueFetcher,
3188 1 : }
3189 1 : defer func() { _ = c.valueFetcher.Close() }()
3190 :
3191 1 : pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter, iiopts)
3192 1 : defer func() {
3193 1 : for _, closer := range c.closers {
3194 1 : closer.FragmentIterator.Close()
3195 1 : }
3196 : }()
3197 1 : if err != nil {
3198 1 : return compact.Result{Err: err}
3199 1 : }
3200 1 : c.allowedZeroSeqNum = c.allowZeroSeqNum()
3201 1 : cfg := compact.IterConfig{
3202 1 : Comparer: c.comparer,
3203 1 : Merge: d.merge,
3204 1 : TombstoneElision: c.delElision,
3205 1 : RangeKeyElision: c.rangeKeyElision,
3206 1 : Snapshots: snapshots,
3207 1 : AllowZeroSeqNum: c.allowedZeroSeqNum,
3208 1 : IneffectualSingleDeleteCallback: func(userKey []byte) {
3209 1 : d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
3210 1 : Kind: IneffectualSingleDelete,
3211 1 : UserKey: slices.Clone(userKey),
3212 1 : })
3213 1 : },
3214 0 : NondeterministicSingleDeleteCallback: func(userKey []byte) {
3215 0 : d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
3216 0 : Kind: NondeterministicSingleDelete,
3217 0 : UserKey: slices.Clone(userKey),
3218 0 : })
3219 0 : },
3220 1 : MissizedDeleteCallback: func(userKey []byte, elidedSize, expectedSize uint64) {
3221 1 : d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
3222 1 : Kind: MissizedDelete,
3223 1 : UserKey: slices.Clone(userKey),
3224 1 : ExtraInfo: fmt.Sprintf("elidedSize=%d,expectedSize=%d", elidedSize, expectedSize),
3225 1 : })
3226 1 : },
3227 : }
3228 1 : iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter)
3229 1 :
3230 1 : runnerCfg := compact.RunnerConfig{
3231 1 : CompactionBounds: base.UserKeyBoundsFromInternal(c.smallest, c.largest),
3232 1 : L0SplitKeys: c.l0Limits,
3233 1 : Grandparents: c.grandparents,
3234 1 : MaxGrandparentOverlapBytes: c.maxOverlapBytes,
3235 1 : TargetOutputFileSize: c.maxOutputFileSize,
3236 1 : GrantHandle: c.grantHandle,
3237 1 : }
3238 1 : runner := compact.NewRunner(runnerCfg, iter)
3239 1 :
3240 1 : var spanPolicyValid bool
3241 1 : var spanPolicy SpanPolicy
3242 1 : // If spanPolicyValid is true and spanPolicyEndKey is empty, then spanPolicy
3243 1 : // applies for the rest of the keyspace.
3244 1 : var spanPolicyEndKey []byte
3245 1 :
3246 1 : for runner.MoreDataToWrite() {
3247 1 : if c.cancel.Load() {
3248 0 : return runner.Finish().WithError(ErrCancelledCompaction)
3249 0 : }
3250 : // Create a new table.
3251 1 : firstKey := runner.FirstKey()
3252 1 : if !spanPolicyValid || (len(spanPolicyEndKey) > 0 && d.cmp(firstKey, spanPolicyEndKey) >= 0) {
3253 1 : var err error
3254 1 : spanPolicy, spanPolicyEndKey, err = d.opts.Experimental.SpanPolicyFunc(firstKey)
3255 1 : if err != nil {
3256 0 : return runner.Finish().WithError(err)
3257 0 : }
3258 1 : spanPolicyValid = true
3259 : }
3260 :
3261 1 : writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
3262 1 : if spanPolicy.DisableValueSeparationBySuffix {
3263 1 : writerOpts.DisableValueBlocks = true
3264 1 : }
3265 1 : vSep := valueSeparation
3266 1 : if spanPolicy.ValueStoragePolicy == ValueStorageLowReadLatency {
3267 1 : vSep = compact.NeverSeparateValues{}
3268 1 : }
3269 1 : objMeta, tw, err := d.newCompactionOutputTable(jobID, c, writerOpts)
3270 1 : if err != nil {
3271 1 : return runner.Finish().WithError(err)
3272 1 : }
3273 1 : runner.WriteTable(objMeta, tw, spanPolicyEndKey, vSep)
3274 : }
3275 1 : result = runner.Finish()
3276 1 : if result.Err == nil {
3277 1 : result.Err = d.objProvider.Sync()
3278 1 : }
3279 1 : return result
3280 : }
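// The sketch below isolates the span-policy caching pattern used in the output
// loop above (the types and lookup function are hypothetical stand-ins, not
// Pebble's API): the policy fetched for one output's first key stays valid
// until a later output's first key reaches the returned end key, at which
// point it is refreshed; an empty end key means the policy covers the rest of
// the keyspace.
package main

import "fmt"

type policy struct{ name string }

// lookupPolicy stands in for Options.Experimental.SpanPolicyFunc: it returns
// the policy for key and the end key up to which that policy applies.
func lookupPolicy(key string) (policy, string) {
	if key < "m" {
		return policy{"low-read-latency"}, "m" // applies to keys below "m"
	}
	return policy{"default"}, "" // applies to the remainder of the keyspace
}

func main() {
	var (
		cur    policy
		endKey string
		valid  bool
	)
	for _, firstKey := range []string{"a", "c", "p", "t"} {
		if !valid || (endKey != "" && firstKey >= endKey) {
			cur, endKey = lookupPolicy(firstKey)
			valid = true
		}
		fmt.Printf("output starting at %q uses policy %q\n", firstKey, cur.name)
	}
}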
3281 :
3282 : // makeVersionEdit creates the version edit for a compaction, based on the
3283 : // tables in compact.Result.
3284 1 : func (c *compaction) makeVersionEdit(result compact.Result) (*manifest.VersionEdit, error) {
3285 1 : ve := &manifest.VersionEdit{
3286 1 : DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{},
3287 1 : }
3288 1 : for _, cl := range c.inputs {
3289 1 : for f := range cl.files.All() {
3290 1 : ve.DeletedTables[manifest.DeletedTableEntry{
3291 1 : Level: cl.level,
3292 1 : FileNum: f.TableNum,
3293 1 : }] = f
3294 1 : }
3295 : }
3296 : // Add any newly constructed blob files to the version edit.
3297 1 : ve.NewBlobFiles = make([]manifest.BlobFileMetadata, len(result.Blobs))
3298 1 : for i := range result.Blobs {
3299 1 : ve.NewBlobFiles[i] = manifest.BlobFileMetadata{
3300 1 : FileID: base.BlobFileID(result.Blobs[i].Metadata.FileNum),
3301 1 : Physical: result.Blobs[i].Metadata,
3302 1 : }
3303 1 : }
3304 :
3305 1 : startLevelBytes := c.startLevel.files.TableSizeSum()
3306 1 : outputMetrics := &LevelMetrics{
3307 1 : TableBytesIn: startLevelBytes,
3308 1 : // TODO(jackson): This BytesRead value does not include any blob files
3309 1 : // written. It either should, or we should add a separate metric.
3310 1 : TableBytesRead: c.outputLevel.files.TableSizeSum(),
3311 1 : BlobBytesCompacted: result.Stats.CumulativeBlobFileSize,
3312 1 : }
3313 1 : if c.flushing != nil {
3314 1 : outputMetrics.BlobBytesFlushed = result.Stats.CumulativeBlobFileSize
3315 1 : }
3316 1 : if len(c.extraLevels) > 0 {
3317 1 : outputMetrics.TableBytesIn += c.extraLevels[0].files.TableSizeSum()
3318 1 : }
3319 1 : outputMetrics.TableBytesRead += outputMetrics.TableBytesIn
3320 1 :
3321 1 : c.metrics[c.outputLevel.level] = outputMetrics
3322 1 : if len(c.flushing) == 0 && c.metrics[c.startLevel.level] == nil {
3323 1 : c.metrics[c.startLevel.level] = &LevelMetrics{}
3324 1 : }
3325 1 : if len(c.extraLevels) > 0 {
3326 1 : c.metrics[c.extraLevels[0].level] = &LevelMetrics{}
3327 1 : outputMetrics.MultiLevel.TableBytesInTop = startLevelBytes
3328 1 : outputMetrics.MultiLevel.TableBytesIn = outputMetrics.TableBytesIn
3329 1 : outputMetrics.MultiLevel.TableBytesRead = outputMetrics.TableBytesRead
3330 1 : }
3331 :
3332 1 : inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute()
3333 1 : ve.NewTables = make([]manifest.NewTableEntry, len(result.Tables))
3334 1 : for i := range result.Tables {
3335 1 : t := &result.Tables[i]
3336 1 :
3337 1 : if t.WriterMeta.Properties.NumValuesInBlobFiles > 0 {
3338 1 : if len(t.BlobReferences) == 0 {
3339 0 : return nil, base.AssertionFailedf("num values in blob files %d but no blob references",
3340 0 : t.WriterMeta.Properties.NumValuesInBlobFiles)
3341 0 : }
3342 : }
3343 :
3344 1 : fileMeta := &manifest.TableMetadata{
3345 1 : TableNum: base.PhysicalTableFileNum(t.ObjMeta.DiskFileNum),
3346 1 : CreationTime: t.CreationTime.Unix(),
3347 1 : Size: t.WriterMeta.Size,
3348 1 : SmallestSeqNum: t.WriterMeta.SmallestSeqNum,
3349 1 : LargestSeqNum: t.WriterMeta.LargestSeqNum,
3350 1 : BlobReferences: t.BlobReferences,
3351 1 : BlobReferenceDepth: t.BlobReferenceDepth,
3352 1 : }
3353 1 : if c.flushing == nil {
3354 1 : // Set the file's LargestSeqNumAbsolute to be the maximum value of any
3355 1 : // of the compaction's input sstables.
3356 1 : // TODO(jackson): This could be narrowed to be the maximum of input
3357 1 : // sstables that overlap the output sstable's key range.
3358 1 : fileMeta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute
3359 1 : } else {
3360 1 : fileMeta.LargestSeqNumAbsolute = t.WriterMeta.LargestSeqNum
3361 1 : }
3362 1 : fileMeta.InitPhysicalBacking()
3363 1 :
3364 1 : // If the file didn't contain any range deletions, we can fill its
3365 1 : // table stats now, avoiding unnecessarily loading the table later.
3366 1 : maybeSetStatsFromProperties(
3367 1 : fileMeta.PhysicalMeta(), &t.WriterMeta.Properties.CommonProperties, c.logger,
3368 1 : )
3369 1 :
3370 1 : if t.WriterMeta.HasPointKeys {
3371 1 : fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestPoint, t.WriterMeta.LargestPoint)
3372 1 : }
3373 1 : if t.WriterMeta.HasRangeDelKeys {
3374 1 : fileMeta.ExtendPointKeyBounds(c.cmp, t.WriterMeta.SmallestRangeDel, t.WriterMeta.LargestRangeDel)
3375 1 : }
3376 1 : if t.WriterMeta.HasRangeKeys {
3377 1 : fileMeta.ExtendRangeKeyBounds(c.cmp, t.WriterMeta.SmallestRangeKey, t.WriterMeta.LargestRangeKey)
3378 1 : }
3379 :
3380 1 : ve.NewTables[i] = manifest.NewTableEntry{
3381 1 : Level: c.outputLevel.level,
3382 1 : Meta: fileMeta,
3383 1 : }
3384 1 :
3385 1 : // Update metrics.
3386 1 : if c.flushing == nil {
3387 1 : outputMetrics.TablesCompacted++
3388 1 : outputMetrics.TableBytesCompacted += fileMeta.Size
3389 1 : } else {
3390 1 : outputMetrics.TablesFlushed++
3391 1 : outputMetrics.TableBytesFlushed += fileMeta.Size
3392 1 : }
3393 1 : outputMetrics.EstimatedReferencesSize += fileMeta.EstimatedReferenceSize()
3394 1 : outputMetrics.BlobBytesReadEstimate += fileMeta.EstimatedReferenceSize()
3395 1 : outputMetrics.TablesSize += int64(fileMeta.Size)
3396 1 : outputMetrics.TablesCount++
3397 1 : outputMetrics.Additional.BytesWrittenDataBlocks += t.WriterMeta.Properties.DataSize
3398 1 : outputMetrics.Additional.BytesWrittenValueBlocks += t.WriterMeta.Properties.ValueBlocksSize
3399 : }
3400 :
3401 : // Sanity check that the tables are ordered and don't overlap.
3402 1 : for i := 1; i < len(ve.NewTables); i++ {
3403 1 : if ve.NewTables[i-1].Meta.UserKeyBounds().End.IsUpperBoundFor(c.cmp, ve.NewTables[i].Meta.Smallest().UserKey) {
3404 0 : return nil, base.AssertionFailedf("pebble: compaction output tables overlap: %s and %s",
3405 0 : ve.NewTables[i-1].Meta.DebugString(c.formatKey, true),
3406 0 : ve.NewTables[i].Meta.DebugString(c.formatKey, true),
3407 0 : )
3408 0 : }
3409 : }
3410 :
3411 1 : return ve, nil
3412 : }
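// The sketch below shows the shape of the ordering invariant checked at the
// end of makeVersionEdit, using simple inclusive string bounds (a hypothetical
// helper; the real check uses UserKeyBounds.IsUpperBoundFor to also handle
// exclusive end bounds): each output table's smallest key must sort strictly
// after the previous table's largest key.
package main

import "fmt"

type table struct{ smallest, largest string } // inclusive bounds

func checkOrdered(tables []table) error {
	for i := 1; i < len(tables); i++ {
		if tables[i-1].largest >= tables[i].smallest {
			return fmt.Errorf("output tables overlap: [%s,%s] and [%s,%s]",
				tables[i-1].smallest, tables[i-1].largest,
				tables[i].smallest, tables[i].largest)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkOrdered([]table{{"a", "c"}, {"d", "f"}})) // <nil>
	fmt.Println(checkOrdered([]table{{"a", "d"}, {"d", "f"}})) // overlap error
}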
3413 :
3414 : // newCompactionOutputTable creates an object for a new table produced by a
3415 : // compaction or flush.
3416 : func (d *DB) newCompactionOutputTable(
3417 : jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
3418 1 : ) (objstorage.ObjectMetadata, sstable.RawWriter, error) {
3419 1 : writable, objMeta, err := d.newCompactionOutputObj(c, base.FileTypeTable)
3420 1 : if err != nil {
3421 1 : return objstorage.ObjectMetadata{}, nil, err
3422 1 : }
3423 1 : d.opts.EventListener.TableCreated(TableCreateInfo{
3424 1 : JobID: int(jobID),
3425 1 : Reason: c.kind.compactingOrFlushing(),
3426 1 : Path: d.objProvider.Path(objMeta),
3427 1 : FileNum: objMeta.DiskFileNum,
3428 1 : })
3429 1 : writerOpts.SetInternal(sstableinternal.WriterOptions{
3430 1 : CacheOpts: sstableinternal.CacheOptions{
3431 1 : CacheHandle: d.cacheHandle,
3432 1 : FileNum: objMeta.DiskFileNum,
3433 1 : },
3434 1 : })
3435 1 : tw := sstable.NewRawWriterWithCPUMeasurer(writable, writerOpts, c.grantHandle)
3436 1 : return objMeta, tw, nil
3437 : }
3438 :
3439 : // newCompactionOutputBlob creates an object for a new blob produced by a
3440 : // compaction or flush.
3441 : func (d *DB) newCompactionOutputBlob(
3442 : jobID JobID, c *compaction,
3443 1 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
3444 1 : writable, objMeta, err := d.newCompactionOutputObj(c, base.FileTypeBlob)
3445 1 : if err != nil {
3446 0 : return nil, objstorage.ObjectMetadata{}, err
3447 0 : }
3448 1 : d.opts.EventListener.BlobFileCreated(BlobFileCreateInfo{
3449 1 : JobID: int(jobID),
3450 1 : Reason: c.kind.compactingOrFlushing(),
3451 1 : Path: d.objProvider.Path(objMeta),
3452 1 : FileNum: objMeta.DiskFileNum,
3453 1 : })
3454 1 : return writable, objMeta, nil
3455 : }
3456 :
3457 : // newCompactionOutputObj creates an object produced by a compaction or flush.
3458 : func (d *DB) newCompactionOutputObj(
3459 : c *compaction, typ base.FileType,
3460 1 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
3461 1 : diskFileNum := d.mu.versions.getNextDiskFileNum()
3462 1 : ctx := context.TODO()
3463 1 :
3464 1 : if objiotracing.Enabled {
3465 0 : ctx = objiotracing.WithLevel(ctx, c.outputLevel.level)
3466 0 : if c.kind == compactionKindFlush {
3467 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForFlush)
3468 0 : } else {
3469 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
3470 0 : }
3471 : }
3472 :
3473 1 : writable, objMeta, err := d.objProvider.Create(ctx, typ, diskFileNum, c.objCreateOpts)
3474 1 : if err != nil {
3475 1 : return nil, objstorage.ObjectMetadata{}, err
3476 1 : }
3477 :
3478 1 : if c.kind != compactionKindFlush {
3479 1 : writable = &compactionWritable{
3480 1 : Writable: writable,
3481 1 : versions: d.mu.versions,
3482 1 : written: &c.bytesWritten,
3483 1 : }
3484 1 : }
3485 1 : return writable, objMeta, nil
3486 : }
3487 :
3488 : // validateVersionEdit validates that start and end keys across new and deleted
3489 : // files in a versionEdit pass the given validation function.
3490 : func validateVersionEdit(
3491 : ve *manifest.VersionEdit, vk base.ValidateKey, format base.FormatKey, logger Logger,
3492 1 : ) {
3493 1 : validateKey := func(f *manifest.TableMetadata, key []byte) {
3494 1 : if err := vk.Validate(key); err != nil {
3495 1 : logger.Fatalf("pebble: version edit validation failed (key=%s file=%s): %v", format(key), f, err)
3496 1 : }
3497 : }
3498 :
3499 : // Validate both new and deleted files.
3500 1 : for _, f := range ve.NewTables {
3501 1 : validateKey(f.Meta, f.Meta.Smallest().UserKey)
3502 1 : validateKey(f.Meta, f.Meta.Largest().UserKey)
3503 1 : }
3504 1 : for _, m := range ve.DeletedTables {
3505 1 : validateKey(m, m.Smallest().UserKey)
3506 1 : validateKey(m, m.Largest().UserKey)
3507 1 : }
3508 : }
3509 :
3510 1 : func getDiskWriteCategoryForCompaction(opts *Options, kind compactionKind) vfs.DiskWriteCategory {
3511 1 : if opts.EnableSQLRowSpillMetrics {
3512 0 : // In the scenario that the Pebble engine is used for SQL row spills, the
3513 0 : // data written to the memtable will correspond to spills to disk and
3514 0 : // should be categorized as such.
3515 0 : return "sql-row-spill"
3516 1 : } else if kind == compactionKindFlush {
3517 1 : return "pebble-memtable-flush"
3518 1 : } else {
3519 1 : return "pebble-compaction"
3520 1 : }
3521 : }
|