Line data Source code
1 : // Copyright 2013 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : stdcmp "cmp"
10 : "context"
11 : "fmt"
12 : "iter"
13 : "maps"
14 : "math"
15 : "runtime/pprof"
16 : "slices"
17 : "sort"
18 : "sync/atomic"
19 : "time"
20 : "unsafe"
21 :
22 : "github.com/cockroachdb/crlib/crtime"
23 : "github.com/cockroachdb/errors"
24 : "github.com/cockroachdb/pebble/internal/base"
25 : "github.com/cockroachdb/pebble/internal/compact"
26 : "github.com/cockroachdb/pebble/internal/keyspan"
27 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
28 : "github.com/cockroachdb/pebble/internal/manifest"
29 : "github.com/cockroachdb/pebble/internal/problemspans"
30 : "github.com/cockroachdb/pebble/internal/sstableinternal"
31 : "github.com/cockroachdb/pebble/objstorage"
32 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
33 : "github.com/cockroachdb/pebble/objstorage/remote"
34 : "github.com/cockroachdb/pebble/sstable"
35 : "github.com/cockroachdb/pebble/sstable/blob"
36 : "github.com/cockroachdb/pebble/sstable/block"
37 : "github.com/cockroachdb/pebble/sstable/block/blockkind"
38 : "github.com/cockroachdb/pebble/vfs"
39 : "github.com/cockroachdb/redact"
40 : )
41 :
42 : var errEmptyTable = errors.New("pebble: empty table")
43 :
44 : // ErrCancelledCompaction is returned if a compaction is cancelled by a
45 : // concurrent excise or ingest-split operation.
46 : var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
47 :
48 : var flushLabels = pprof.Labels("pebble", "flush", "output-level", "L0")
49 : var gcLabels = pprof.Labels("pebble", "gc")
50 :
51 : // expandedCompactionByteSizeLimit is the maximum number of bytes in all
52 : // compacted files. We avoid expanding the lower level file set of a compaction
53 : // if it would make the total compaction cover more than this many bytes.
54 : func expandedCompactionByteSizeLimit(
55 : opts *Options, targetFileSize int64, availBytes uint64,
56 2 : ) uint64 {
57 2 : v := uint64(25 * targetFileSize)
58 2 :
59 2 : // Never expand a compaction beyond half the available capacity, divided
60 2 : // by the maximum number of concurrent compactions. Each of the concurrent
61 2 : // compactions may expand up to this limit, so this attempts to limit
62 2 : // compactions to half of available disk space. Note that this will not
63 2 : // prevent compaction picking from pursuing compactions that are larger
64 2 : // than this threshold before expansion.
65 2 : //
66 2 : // NB: this heuristic is an approximation since we may run more compactions
67 2 : // than the upper concurrency limit.
68 2 : _, maxConcurrency := opts.CompactionConcurrencyRange()
69 2 : diskMax := (availBytes / 2) / uint64(maxConcurrency)
70 2 : if v > diskMax {
71 1 : v = diskMax
72 1 : }
73 2 : return v
74 : }
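
// Illustrative sketch (not part of the upstream file): how the limit above
// plays out for a 2 MiB target file size with 10 GiB free and a maximum
// compaction concurrency of 4. All constants here are assumptions chosen for
// the example.
func exampleExpandedCompactionLimit() uint64 {
	const targetFileSize = int64(2 << 20) // 2 MiB
	const availBytes = uint64(10 << 30)   // 10 GiB of free disk space
	const maxConcurrency = uint64(4)

	v := uint64(25 * targetFileSize) // 50 MiB
	if diskMax := availBytes / 2 / maxConcurrency; v > diskMax { // 1.25 GiB
		v = diskMax
	}
	return v // 50 MiB: the disk-based cap does not bind in this example.
}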
75 :
76 : // maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
77 : // before we stop building a single file in a level-1 to level compaction.
78 2 : func maxGrandparentOverlapBytes(targetFileSize int64) uint64 {
79 2 : return uint64(10 * targetFileSize)
80 2 : }
81 :
82 : // maxReadCompactionBytes is used to prevent read compactions which
83 : // are too wide.
84 2 : func maxReadCompactionBytes(targetFileSize int64) uint64 {
85 2 : return uint64(10 * targetFileSize)
86 2 : }
87 :
88 : // noCloseIter wraps around a FragmentIterator, intercepting and eliding
89 : // calls to Close. It is used during compaction to ensure that rangeDelIters
90 : // are not closed prematurely.
91 : type noCloseIter struct {
92 : keyspan.FragmentIterator
93 : }
94 :
95 2 : func (i *noCloseIter) Close() {}
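
// Usage sketch (an assumption, not upstream code): the wrapper lets an
// iterator tree "close" the fragment iterator without releasing it; the
// caller retains the real Close for when the compaction finishes.
func wrapForCompactionLifetime(it keyspan.FragmentIterator) (wrapped *noCloseIter, realClose func()) {
	// wrapped.Close() is a no-op, so consumers cannot close `it` prematurely.
	return &noCloseIter{FragmentIterator: it}, it.Close
}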
96 :
97 : type compactionLevel struct {
98 : level int
99 : files manifest.LevelSlice
100 : // l0SublevelInfo contains information about L0 sublevels being compacted.
101 : // It's only set for the start level of a compaction starting out of L0 and
102 : // is nil for all other compactions.
103 : l0SublevelInfo []sublevelInfo
104 : }
105 :
106 2 : func (cl compactionLevel) Clone() compactionLevel {
107 2 : newCL := compactionLevel{
108 2 : level: cl.level,
109 2 : files: cl.files,
110 2 : }
111 2 : return newCL
112 2 : }
113 1 : func (cl compactionLevel) String() string {
114 1 : return fmt.Sprintf(`Level %d, Files %s`, cl.level, cl.files)
115 1 : }
116 :
117 : // compactionWritable is an objstorage.Writable wrapper that, on every write,
118 : // updates a metric in `versions` on bytes written by in-progress compactions so
119 : // far. It also increments a per-compaction `written` atomic int.
120 : type compactionWritable struct {
121 : objstorage.Writable
122 :
123 : versions *versionSet
124 : written *atomic.Int64
125 : }
126 :
127 : // Write is part of the objstorage.Writable interface.
128 2 : func (c *compactionWritable) Write(p []byte) error {
129 2 : if err := c.Writable.Write(p); err != nil {
130 0 : return err
131 0 : }
132 :
133 2 : c.written.Add(int64(len(p)))
134 2 : c.versions.incrementCompactionBytes(int64(len(p)))
135 2 : return nil
136 : }
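
// Wiring sketch (assumption): a compaction output writable would be wrapped
// roughly like this so that every Write is mirrored into both the
// per-compaction counter and the version set's in-progress bytes metric:
//
//	w := &compactionWritable{
//		Writable: objWritable, // the real objstorage.Writable
//		versions: d.mu.versions,
//		written:  &c.metrics.bytesWritten,
//	}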
137 :
138 : type compactionKind int
139 :
140 : const (
141 : compactionKindDefault compactionKind = iota
142 : compactionKindFlush
143 : // compactionKindMove denotes a move compaction where the input file is
144 : // retained and linked in a new level without being obsoleted.
145 : compactionKindMove
146 : // compactionKindCopy denotes a copy compaction where the input file is
147 : // copied byte-by-byte into a new file with a new TableNum in the output level.
148 : compactionKindCopy
149 : // compactionKindDeleteOnly denotes a compaction that only deletes input
150 : // files. It can occur when wide range tombstones completely contain sstables.
151 : compactionKindDeleteOnly
152 : compactionKindElisionOnly
153 : compactionKindRead
154 : compactionKindTombstoneDensity
155 : compactionKindRewrite
156 : compactionKindIngestedFlushable
157 : compactionKindBlobFileRewrite
158 : )
159 :
160 2 : func (k compactionKind) String() string {
161 2 : switch k {
162 2 : case compactionKindDefault:
163 2 : return "default"
164 0 : case compactionKindFlush:
165 0 : return "flush"
166 2 : case compactionKindMove:
167 2 : return "move"
168 2 : case compactionKindDeleteOnly:
169 2 : return "delete-only"
170 2 : case compactionKindElisionOnly:
171 2 : return "elision-only"
172 1 : case compactionKindRead:
173 1 : return "read"
174 2 : case compactionKindTombstoneDensity:
175 2 : return "tombstone-density"
176 2 : case compactionKindRewrite:
177 2 : return "rewrite"
178 0 : case compactionKindIngestedFlushable:
179 0 : return "ingested-flushable"
180 2 : case compactionKindCopy:
181 2 : return "copy"
182 0 : case compactionKindBlobFileRewrite:
183 0 : return "blob-file-rewrite"
184 : }
185 0 : return "?"
186 : }
187 :
188 : // compactingOrFlushing returns "flushing" if the compaction kind is a flush,
189 : // otherwise it returns "compacting".
190 2 : func (k compactionKind) compactingOrFlushing() string {
191 2 : if k == compactionKindFlush {
192 2 : return "flushing"
193 2 : }
194 2 : return "compacting"
195 : }
196 :
197 : type compaction interface {
198 : AddInProgressLocked(*DB)
199 : BeganAt() time.Time
200 : Bounds() *base.UserKeyBounds
201 : Cancel()
202 : Execute(JobID, *DB) error
203 : GrantHandle() CompactionGrantHandle
204 : Info() compactionInfo
205 : IsDownload() bool
206 : IsFlush() bool
207 : PprofLabels(UserKeyCategories) pprof.LabelSet
208 : RecordError(*problemspans.ByLevel, error)
209 : Tables() iter.Seq2[int, *manifest.TableMetadata]
210 : VersionEditApplied() bool
211 : }
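
// Illustrative use of the interface (assumption): in-progress flushes and
// compactions can be handled uniformly, e.g.
//
//	for c := range d.mu.compact.inProgress {
//		if !c.IsFlush() {
//			fmt.Println("compaction began at", c.BeganAt())
//		}
//	}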
212 :
213 : // tableCompaction is a table compaction from one level to the next, starting
214 : // from a given version. It implements the compaction interface.
215 : type tableCompaction struct {
216 : // cancel is a bool that can be used by other goroutines to signal a compaction
217 : // to cancel, such as if a conflicting excise operation raced it to manifest
218 : // application. Only holders of the manifest lock will write to this atomic.
219 : cancel atomic.Bool
220 : // kind indicates the kind of compaction. Different compaction kinds have
221 : // different semantics and mechanics. Some may have additional fields.
222 : kind compactionKind
223 : // isDownload is true if this compaction was started as part of a Download
224 : // operation. In this case kind is compactionKindCopy or
225 : // compactionKindRewrite.
226 : isDownload bool
227 :
228 : comparer *base.Comparer
229 : logger Logger
230 : version *manifest.Version
231 : // versionEditApplied is set to true when a compaction has completed and the
232 : // resulting version has been installed (if successful), but the compaction
233 : // goroutine is still cleaning up (eg, deleting obsolete files).
234 : versionEditApplied bool
235 : // getValueSeparation constructs a compact.ValueSeparation for use in a
236 : // compaction. It implements heuristics around choosing whether a compaction
237 : // should:
238 : //
239 : // a) preserve existing blob references: The compaction does not write any
240 : // new blob files, but propagates existing references to blob files.This
241 : // conserves write bandwidth by avoiding rewriting the referenced values. It
242 : // also reduces the locality of the referenced values which can reduce scan
243 : // performance because a scan must load values from more unique blob files.
244 : // It can also delay reclamation of disk space if some of the references to
245 : // blob values are elided by the compaction, increasing space amplification.
246 : //
247 : // b) rewrite blob files: The compaction will write eligible values to new
248 : // blob files. This consumes more write bandwidth because all values are
249 : // rewritten. However it restores locality.
250 : getValueSeparation func(JobID, *tableCompaction, sstable.TableFormat) compact.ValueSeparation
251 :
252 : // startLevel is the level that is being compacted. Inputs from startLevel
253 : // and outputLevel will be merged to produce a set of outputLevel files.
254 : startLevel *compactionLevel
255 :
256 : // outputLevel is the level that files are being produced in. outputLevel is
257 : // equal to startLevel+1 except when:
258 : // - if startLevel is 0, the output level equals compactionPicker.baseLevel().
259 : // - in multilevel compaction, the output level is the lowest level involved in
260 : // the compaction
261 : // A compaction's outputLevel is nil for delete-only compactions.
262 : outputLevel *compactionLevel
263 :
264 : // extraLevels point to additional levels in between the input and output
265 : // levels that get compacted in multilevel compactions
266 : extraLevels []*compactionLevel
267 :
268 : inputs []compactionLevel
269 :
270 : // maxOutputFileSize is the maximum size of an individual table created
271 : // during compaction.
272 : maxOutputFileSize uint64
273 : // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
274 : // single output table with the tables in the grandparent level.
275 : maxOverlapBytes uint64
276 :
277 : // The boundaries of the input data.
278 : bounds base.UserKeyBounds
279 :
280 : // grandparents are the tables in level+2 that overlap with the files being
281 : // compacted, used to determine output table boundaries. Do not assume that
282 : // the grandparent level will still contain these exact files when this compaction finishes.
283 : grandparents manifest.LevelSlice
284 :
285 : delElision compact.TombstoneElision
286 : rangeKeyElision compact.TombstoneElision
287 :
288 : // deleteOnly contains information specific to compactions with kind
289 : // compactionKindDeleteOnly. A delete-only compaction is a special
290 : // compaction that does not merge or write sstables. Instead, it only
291 : // performs deletions either through removing whole sstables from the LSM or
292 : // virtualizing them into virtual sstables.
293 : deleteOnly struct {
294 : // hints are collected by the table stats collector and describe range
295 : // deletions and the files containing keys deleted by them.
296 : hints []deleteCompactionHint
297 : // exciseEnabled is set to true if this compaction is allowed to excise
298 : // files. If false, the compaction will only remove whole sstables that
299 : // are wholly contained within the bounds of range deletions.
300 : exciseEnabled bool
301 : }
302 : // flush contains information specific to flushes (compactionKindFlush and
303 : // compactionKindIngestedFlushable). A flush is modeled as a compaction
304 : // because its mechanics are similar to those of a default compaction.
305 : flush struct {
306 : // flushables contains the flushables (aka memtables, large batches,
307 : // flushable ingestions, etc) that are being flushed.
308 : flushables flushableList
309 : // Boundaries at which sstables flushed to L0 should be split.
310 : // Determined by L0Sublevels. If nil, ignored.
311 : l0Limits [][]byte
312 : }
313 : // iterationState contains state used during compaction iteration.
314 : iterationState struct {
315 : // bufferPool is a pool of buffers used when reading blocks. Compactions
316 : // do not populate the block cache under the assumption that the blocks
317 : // we read will soon be irrelevant when their containing sstables are
318 : // removed from the LSM.
319 : bufferPool sstable.BufferPool
320 : // keyspanIterClosers is a list of fragment iterators to close when the
321 : // compaction finishes. As iteration opens new keyspan iterators,
322 : // elements are appended. Keyspan iterators must remain open for the
323 : // lifetime of the compaction, so they're accumulated here. When the
324 : // compaction finishes, all the underlying keyspan iterators are closed.
325 : keyspanIterClosers []*noCloseIter
326 : // valueFetcher is used to fetch values from blob files. It's propagated
327 : // down the iterator tree through the internal iterator options.
328 : valueFetcher blob.ValueFetcher
329 : }
330 : // metrics encapsulates various metrics collected during a compaction.
331 : metrics compactionMetrics
332 :
333 : grantHandle CompactionGrantHandle
334 :
335 : tableFormat sstable.TableFormat
336 : objCreateOpts objstorage.CreateOptions
337 :
338 : annotations []string
339 : }
340 :
341 : // Assert that tableCompaction implements the compaction interface.
342 : var _ compaction = (*tableCompaction)(nil)
343 :
344 2 : func (c *tableCompaction) AddInProgressLocked(d *DB) {
345 2 : d.mu.compact.inProgress[c] = struct{}{}
346 2 : var isBase, isIntraL0 bool
347 2 : for _, cl := range c.inputs {
348 2 : for f := range cl.files.All() {
349 2 : if f.IsCompacting() {
350 0 : d.opts.Logger.Fatalf("L%d->L%d: %s already being compacted", c.startLevel.level, c.outputLevel.level, f.TableNum)
351 0 : }
352 2 : f.SetCompactionState(manifest.CompactionStateCompacting)
353 2 : if c.startLevel != nil && c.outputLevel != nil && c.startLevel.level == 0 {
354 2 : if c.outputLevel.level == 0 {
355 2 : f.IsIntraL0Compacting = true
356 2 : isIntraL0 = true
357 2 : } else {
358 2 : isBase = true
359 2 : }
360 : }
361 : }
362 : }
363 :
364 2 : if isIntraL0 || isBase {
365 2 : l0Inputs := []manifest.LevelSlice{c.startLevel.files}
366 2 : if isIntraL0 {
367 2 : l0Inputs = append(l0Inputs, c.outputLevel.files)
368 2 : }
369 2 : if err := d.mu.versions.latest.l0Organizer.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
370 0 : d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
371 0 : }
372 : }
373 : }
374 :
375 2 : func (c *tableCompaction) BeganAt() time.Time { return c.metrics.beganAt }
376 2 : func (c *tableCompaction) Bounds() *base.UserKeyBounds { return &c.bounds }
377 2 : func (c *tableCompaction) Cancel() { c.cancel.Store(true) }
378 :
379 2 : func (c *tableCompaction) Execute(jobID JobID, d *DB) error {
380 2 : c.grantHandle.Started()
381 2 : err := d.compact1(jobID, c)
382 2 : // The version stored in the compaction is ref'd when the compaction is
383 2 : // created. We're responsible for un-refing it when the compaction is
384 2 : // complete.
385 2 : if c.version != nil {
386 2 : c.version.UnrefLocked()
387 2 : }
388 2 : return err
389 : }
390 :
391 1 : func (c *tableCompaction) RecordError(problemSpans *problemspans.ByLevel, err error) {
392 1 : // Record problem spans for a short duration, unless the error is a
393 1 : // corruption.
394 1 : expiration := 30 * time.Second
395 1 : if IsCorruptionError(err) {
396 1 : // TODO(radu): ideally, we should be using the corruption reporting
397 1 : // mechanism which has a tighter span for the corruption. We would need to
398 1 : // somehow plumb the level of the file.
399 1 : expiration = 5 * time.Minute
400 1 : }
401 :
402 1 : for i := range c.inputs {
403 1 : level := c.inputs[i].level
404 1 : if level == 0 {
405 1 : // We do not set problem spans on L0, as they could block flushes.
406 1 : continue
407 : }
408 1 : it := c.inputs[i].files.Iter()
409 1 : for f := it.First(); f != nil; f = it.Next() {
410 1 : problemSpans.Add(level, f.UserKeyBounds(), expiration)
411 1 : }
412 : }
413 : }
414 :
415 2 : func (c *tableCompaction) GrantHandle() CompactionGrantHandle { return c.grantHandle }
416 2 : func (c *tableCompaction) IsDownload() bool { return c.isDownload }
417 2 : func (c *tableCompaction) IsFlush() bool { return len(c.flush.flushables) > 0 }
418 2 : func (c *tableCompaction) Info() compactionInfo {
419 2 : info := compactionInfo{
420 2 : versionEditApplied: c.versionEditApplied,
421 2 : kind: c.kind,
422 2 : inputs: c.inputs,
423 2 : bounds: &c.bounds,
424 2 : outputLevel: -1,
425 2 : }
426 2 : if c.outputLevel != nil {
427 2 : info.outputLevel = c.outputLevel.level
428 2 : }
429 2 : return info
430 : }
431 2 : func (c *tableCompaction) PprofLabels(kc UserKeyCategories) pprof.LabelSet {
432 2 : activity := "compact"
433 2 : if len(c.flush.flushables) != 0 {
434 0 : activity = "flush"
435 0 : }
436 2 : level := "L?"
437 2 : // Delete-only compactions don't have an output level.
438 2 : if c.outputLevel != nil {
439 2 : level = fmt.Sprintf("L%d", c.outputLevel.level)
440 2 : }
441 2 : if kc.Len() > 0 {
442 0 : cat := kc.CategorizeKeyRange(c.bounds.Start, c.bounds.End.Key)
443 0 : return pprof.Labels("pebble", activity, "output-level", level, "key-type", cat)
444 0 : }
445 2 : return pprof.Labels("pebble", activity, "output-level", level)
446 : }
447 :
448 2 : func (c *tableCompaction) Tables() iter.Seq2[int, *manifest.TableMetadata] {
449 2 : return func(yield func(int, *manifest.TableMetadata) bool) {
450 2 : for _, cl := range c.inputs {
451 2 : for f := range cl.files.All() {
452 2 : if !yield(cl.level, f) {
453 2 : return
454 2 : }
455 : }
456 : }
457 : }
458 : }
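
// Consumption sketch (assumption, not upstream code): Tables returns a Go
// 1.23 range-over-func sequence, so input tables can be ranged over directly
// together with their levels.
func logCompactionInputs(c *tableCompaction, logger Logger) {
	for level, t := range c.Tables() {
		logger.Infof("compaction input: L%d %s", level, t.TableNum)
	}
}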
459 :
460 2 : func (c *tableCompaction) VersionEditApplied() bool { return c.versionEditApplied }
461 :
462 : // compactionMetrics contains metrics surrounding a compaction.
463 : type compactionMetrics struct {
464 : // beganAt is the time when the compaction began.
465 : beganAt time.Time
466 : // bytesWritten contains the number of bytes that have been written to
467 : // outputs. It's updated whenever the compaction outputs'
468 : // objstorage.Writables receive new writes. See newCompactionOutputObj.
469 : bytesWritten atomic.Int64
470 : // internalIterStats contains statistics from the internal iterators used by
471 : // the compaction.
472 : //
473 : // TODO(jackson): Use these to power the compaction BytesRead metric.
474 : internalIterStats base.InternalIteratorStats
475 : // perLevel contains metrics for each level involved in the compaction.
476 : perLevel levelMetricsDelta
477 : // picker contains metrics from the compaction picker when the compaction
478 : // was picked.
479 : picker pickedCompactionMetrics
480 : }
481 :
482 : // inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any
483 : // input sstables.
484 2 : func (c *tableCompaction) inputLargestSeqNumAbsolute() base.SeqNum {
485 2 : var seqNum base.SeqNum
486 2 : for _, cl := range c.inputs {
487 2 : for m := range cl.files.All() {
488 2 : seqNum = max(seqNum, m.LargestSeqNumAbsolute)
489 2 : }
490 : }
491 2 : return seqNum
492 : }
493 :
494 2 : func (c *tableCompaction) makeInfo(jobID JobID) CompactionInfo {
495 2 : info := CompactionInfo{
496 2 : JobID: int(jobID),
497 2 : Reason: c.kind.String(),
498 2 : Input: make([]LevelInfo, 0, len(c.inputs)),
499 2 : Annotations: []string{},
500 2 : }
501 2 : if c.isDownload {
502 2 : info.Reason = "download," + info.Reason
503 2 : }
504 2 : for _, cl := range c.inputs {
505 2 : inputInfo := LevelInfo{Level: cl.level, Tables: nil}
506 2 : for m := range cl.files.All() {
507 2 : inputInfo.Tables = append(inputInfo.Tables, m.TableInfo())
508 2 : }
509 2 : info.Input = append(info.Input, inputInfo)
510 : }
511 2 : if c.outputLevel != nil {
512 2 : info.Output.Level = c.outputLevel.level
513 2 :
514 2 : // If there are no inputs from the output level (eg, a move
515 2 : // compaction), add an empty LevelInfo to info.Input.
516 2 : if len(c.inputs) > 0 && c.inputs[len(c.inputs)-1].level != c.outputLevel.level {
517 0 : info.Input = append(info.Input, LevelInfo{Level: c.outputLevel.level})
518 0 : }
519 2 : } else {
520 2 : // For a delete-only compaction, set the output level to L6. The
521 2 : // output level is not meaningful here, but complicating the
522 2 : // info.Output interface with a pointer doesn't seem worth the
523 2 : // semantic distinction.
524 2 : info.Output.Level = numLevels - 1
525 2 : }
526 :
527 2 : for i, score := range c.metrics.picker.scores {
528 2 : info.Input[i].Score = score
529 2 : }
530 2 : info.SingleLevelOverlappingRatio = c.metrics.picker.singleLevelOverlappingRatio
531 2 : info.MultiLevelOverlappingRatio = c.metrics.picker.multiLevelOverlappingRatio
532 2 : if len(info.Input) > 2 {
533 2 : info.Annotations = append(info.Annotations, "multilevel")
534 2 : }
535 2 : return info
536 : }
537 :
538 : type getValueSeparation func(JobID, *tableCompaction, sstable.TableFormat) compact.ValueSeparation
539 :
540 : // newCompaction constructs a compaction from the provided picked compaction.
541 : //
542 : // The compaction is created with a reference to its version that must be
543 : // released when the compaction is complete.
544 : func newCompaction(
545 : pc *pickedTableCompaction,
546 : opts *Options,
547 : beganAt time.Time,
548 : provider objstorage.Provider,
549 : grantHandle CompactionGrantHandle,
550 : tableFormat sstable.TableFormat,
551 : getValueSeparation getValueSeparation,
552 2 : ) *tableCompaction {
553 2 : c := &tableCompaction{
554 2 : kind: compactionKindDefault,
555 2 : comparer: opts.Comparer,
556 2 : inputs: pc.inputs,
557 2 : bounds: pc.bounds,
558 2 : logger: opts.Logger,
559 2 : version: pc.version,
560 2 : getValueSeparation: getValueSeparation,
561 2 : maxOutputFileSize: pc.maxOutputFileSize,
562 2 : maxOverlapBytes: pc.maxOverlapBytes,
563 2 : metrics: compactionMetrics{
564 2 : beganAt: beganAt,
565 2 : picker: pc.pickerMetrics,
566 2 : },
567 2 : grantHandle: grantHandle,
568 2 : tableFormat: tableFormat,
569 2 : }
570 2 : // Acquire a reference to the version to ensure that files and in-memory
571 2 : // version state necessary for reading files remain available. Ignoring
572 2 : // excises, this isn't strictly necessary for reading the sstables that are
573 2 : // inputs to the compaction because those files are 'marked as compacting'
574 2 : // and shouldn't be subject to any competing compactions. However with
575 2 : // excises, a concurrent excise may remove a compaction's file from the
576 2 : // Version and then cancel the compaction. The file shouldn't be physically
577 2 : // removed until the cancelled compaction stops reading it.
578 2 : //
579 2 : // Additionally, we need any blob files referenced by input sstables to
580 2 : // remain available, even if the blob file is rewritten. Maintaining a
581 2 : // reference ensures that all these files remain available for the
582 2 : // compaction's reads.
583 2 : c.version.Ref()
584 2 :
585 2 : c.startLevel = &c.inputs[0]
586 2 : if pc.startLevel.l0SublevelInfo != nil {
587 2 : c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo
588 2 : }
589 :
590 2 : c.outputLevel = &c.inputs[len(c.inputs)-1]
591 2 :
592 2 : if len(pc.inputs) > 2 {
593 2 : // TODO(xinhaoz): Look into removing extraLevels on the compaction struct.
594 2 : c.extraLevels = make([]*compactionLevel, 0, len(pc.inputs)-2)
595 2 : for i := 1; i < len(pc.inputs)-1; i++ {
596 2 : c.extraLevels = append(c.extraLevels, &c.inputs[i])
597 2 : }
598 : }
599 : // Compute the set of outputLevel+1 files that overlap this compaction (these
600 : // are the grandparent sstables).
601 2 : if c.outputLevel.level+1 < numLevels {
602 2 : c.grandparents = c.version.Overlaps(c.outputLevel.level+1, c.bounds)
603 2 : }
604 2 : c.delElision, c.rangeKeyElision = compact.SetupTombstoneElision(
605 2 : c.comparer.Compare, c.version, pc.l0Organizer, c.outputLevel.level, c.bounds,
606 2 : )
607 2 : c.kind = pc.kind
608 2 :
609 2 : preferSharedStorage := tableFormat >= FormatMinForSharedObjects.MaxTableFormat() &&
610 2 : remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
611 2 : c.maybeSwitchToMoveOrCopy(preferSharedStorage, provider)
612 2 : c.objCreateOpts = objstorage.CreateOptions{
613 2 : PreferSharedStorage: preferSharedStorage,
614 2 : WriteCategory: getDiskWriteCategoryForCompaction(opts, c.kind),
615 2 : }
616 2 : if preferSharedStorage {
617 2 : c.getValueSeparation = neverSeparateValues
618 2 : }
619 :
620 2 : return c
621 : }
622 :
623 : // maybeSwitchToMoveOrCopy decides if the compaction can be changed into a move
624 : // or copy compaction, in which case c.kind is updated.
625 : func (c *tableCompaction) maybeSwitchToMoveOrCopy(
626 : preferSharedStorage bool, provider objstorage.Provider,
627 2 : ) {
628 2 : // Only non-multi-level compactions with a single input file can be
629 2 : // considered.
630 2 : if c.startLevel.files.Len() != 1 || !c.outputLevel.files.Empty() || c.hasExtraLevelData() {
631 2 : return
632 2 : }
633 :
634 : // In addition to the default compaction, we also check whether a tombstone
635 : // density compaction can be optimized into a move compaction. However, we
636 : // want to avoid performing a move compaction into the lowest level, since the
637 : // goal there is to actually remove the tombstones.
638 : //
639 : // Tombstone density compaction is meant to address cases where tombstones
640 : // don't reclaim much space but are still expensive to scan over. We can only
641 : // remove the tombstones once there's nothing at all underneath them.
642 2 : switch c.kind {
643 2 : case compactionKindDefault:
644 : // Proceed.
645 2 : case compactionKindTombstoneDensity:
646 2 : // Tombstone density compaction can be optimized into a move compaction.
647 2 : // However, we want to avoid performing a move compaction into the lowest
648 2 : // level, since the goal there is to actually remove the tombstones; even if
649 2 : // they don't prevent a lot of space from being reclaimed, tombstones can
650 2 : // still be expensive to scan over.
651 2 : if c.outputLevel.level == numLevels-1 {
652 2 : return
653 2 : }
654 2 : default:
655 2 : // Other compaction kinds not supported.
656 2 : return
657 : }
658 :
659 : // We avoid a move or copy if there is lots of overlapping grandparent data.
660 : // Otherwise, the move could create a parent file that will require a very
661 : // expensive merge later on.
662 2 : if c.grandparents.AggregateSizeSum() > c.maxOverlapBytes {
663 1 : return
664 1 : }
665 :
666 2 : iter := c.startLevel.files.Iter()
667 2 : meta := iter.First()
668 2 :
669 2 : // We should always be passed a provider, except in some unit tests.
670 2 : isRemote := provider != nil && !objstorage.IsLocalTable(provider, meta.TableBacking.DiskFileNum)
671 2 :
672 2 : // Shared and external tables can always be moved. We can also move a local
673 2 : // table unless we need the result to be on shared storage.
674 2 : if isRemote || !preferSharedStorage {
675 2 : c.kind = compactionKindMove
676 2 : return
677 2 : }
678 :
679 : // We can rewrite the table (regular compaction) or we can use a copy compaction.
680 2 : switch {
681 1 : case meta.Virtual:
682 : // We want to avoid a copy compaction if the table is virtual, as we may end
683 : // up copying a lot more data than necessary.
684 1 : case meta.BlobReferenceDepth != 0:
685 : // We also want to avoid copy compactions for tables with blob references,
686 : // as we currently lack a mechanism to propagate blob references along with
687 : // the sstable.
688 2 : default:
689 2 : c.kind = compactionKindCopy
690 : }
691 : }
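
// Decision summary for the above (illustrative, derived from the code): given
// a single input file, an empty output level, and small grandparent overlap:
//
//	remote (shared/external) table             -> move
//	local table, shared output not required    -> move
//	local table, shared output required:
//	  virtual table or blob references present -> keep as regular compaction
//	  otherwise                                -> copy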
692 :
693 : // newDeleteOnlyCompaction constructs a delete-only compaction from the provided
694 : // inputs.
695 : //
696 : // The compaction is created with a reference to its version that must be
697 : // released when the compaction is complete.
698 : func newDeleteOnlyCompaction(
699 : opts *Options,
700 : cur *manifest.Version,
701 : inputs []compactionLevel,
702 : beganAt time.Time,
703 : hints []deleteCompactionHint,
704 : exciseEnabled bool,
705 2 : ) *tableCompaction {
706 2 : c := &tableCompaction{
707 2 : kind: compactionKindDeleteOnly,
708 2 : comparer: opts.Comparer,
709 2 : logger: opts.Logger,
710 2 : version: cur,
711 2 : inputs: inputs,
712 2 : grantHandle: noopGrantHandle{},
713 2 : metrics: compactionMetrics{
714 2 : beganAt: beganAt,
715 2 : },
716 2 : }
717 2 : c.deleteOnly.hints = hints
718 2 : c.deleteOnly.exciseEnabled = exciseEnabled
719 2 : // Acquire a reference to the version to ensure that files and in-memory
720 2 : // version state necessary for reading files remain available. Ignoring
721 2 : // excises, this isn't strictly necessary for reading the sstables that are
722 2 : // inputs to the compaction because those files are 'marked as compacting'
723 2 : // and shouldn't be subject to any competing compactions. However with
724 2 : // excises, a concurrent excise may remove a compaction's file from the
725 2 : // Version and then cancel the compaction. The file shouldn't be physically
726 2 : // removed until the cancelled compaction stops reading it.
727 2 : //
728 2 : // Additionally, we need any blob files referenced by input sstables to
729 2 : // remain available, even if the blob file is rewritten. Maintaining a
730 2 : // reference ensures that all these files remain available for the
731 2 : // compaction's reads.
732 2 : c.version.Ref()
733 2 :
734 2 : // Compute c.bounds from the inputs.
735 2 : cmp := opts.Comparer.Compare
736 2 : for _, in := range inputs {
737 2 : c.bounds = manifest.ExtendKeyRange(cmp, c.bounds, in.files.All())
738 2 : }
739 2 : return c
740 : }
741 :
742 2 : func adjustGrandparentOverlapBytesForFlush(c *tableCompaction, flushingBytes uint64) {
743 2 : // Heuristic to place a lower bound on compaction output file size
744 2 : // caused by Lbase. Prior to this heuristic we have observed an L0 in
745 2 : // production with 310K files of which 290K files were < 10KB in size.
746 2 : // Our hypothesis is that it was caused by L1 having 2600 files and
747 2 : // ~10GB, such that each flush got split into many tiny files due to
748 2 : // overlapping with most of the files in Lbase.
749 2 : //
750 2 : // The computation below is general in that it accounts
751 2 : // for flushing different volumes of data (e.g. we may be flushing
752 2 : // many memtables). For illustration, we consider the typical
753 2 : // example of flushing a 64MB memtable. So 12.8MB output,
754 2 : // based on the compression guess below. If the compressed bytes
755 2 : // guess is an over-estimate we will end up with smaller files,
756 2 : // and if an under-estimate we will end up with larger files.
757 2 : // With a 2MB target file size, 7 files. We are willing to accept
758 2 : // 4x the number of files, if it results in better write amplification
759 2 : // when later compacting to Lbase, i.e., ~450KB files (target file
760 2 : // size / 4).
761 2 : //
762 2 : // Note that this is a pessimistic heuristic in that
763 2 : // fileCountUpperBoundDueToGrandparents could be far from the actual
764 2 : // number of files produced due to the grandparent limits. For
765 2 : // example, in the extreme, consider a flush that overlaps with 1000
766 2 : // files in Lbase f0...f999, and the initially calculated value of
767 2 : // maxOverlapBytes will cause splits at f10, f20,..., f990, which
768 2 : // means an upper bound file count of 100 files. Say the input bytes
769 2 : // in the flush are such that acceptableFileCount=10. We will fatten
770 2 : // up maxOverlapBytes by 10x to ensure that the upper bound file count
771 2 : // drops to 10. However, it is possible that in practice, even without
772 2 : // this change, we would have produced no more than 10 files, and that
773 2 : // this change makes the files unnecessarily wide. Say the input bytes
774 2 : // are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
775 2 : // 10% in f80...f89 and 10% in f990...f999. The original value of
776 2 : // maxOverlapBytes would have actually produced only 10 sstables. But
777 2 : // by increasing maxOverlapBytes by 10x, we may produce 1 sstable that
778 2 : // spans f0...f89, i.e., a much wider sstable than necessary.
779 2 : //
780 2 : // We could produce a tighter estimate of
781 2 : // fileCountUpperBoundDueToGrandparents if we had knowledge of the key
782 2 : // distribution of the flush. The 4x multiplier mentioned earlier is
783 2 : // a way to try to compensate for this pessimism.
784 2 : //
785 2 : // TODO(sumeer): we don't have compression info for the data being
786 2 : // flushed, but it is likely that existing files that overlap with
787 2 : // this flush in Lbase are representative wrt compression ratio. We
788 2 : // could store the uncompressed size in TableMetadata and estimate
789 2 : // the compression ratio.
790 2 : const approxCompressionRatio = 0.2
791 2 : approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
792 2 : approxNumFilesBasedOnTargetSize :=
793 2 : int(math.Ceil(approxOutputBytes / float64(c.maxOutputFileSize)))
794 2 : acceptableFileCount := float64(4 * approxNumFilesBasedOnTargetSize)
795 2 : // The byte calculation is linear in numGrandparentFiles, but we will
796 2 : // incur this linear cost in compact.Runner.TableSplitLimit() too, so we are
797 2 : // also willing to pay it now. We could approximate this cheaply by using the
798 2 : // mean file size of Lbase.
799 2 : grandparentFileBytes := c.grandparents.AggregateSizeSum()
800 2 : fileCountUpperBoundDueToGrandparents :=
801 2 : float64(grandparentFileBytes) / float64(c.maxOverlapBytes)
802 2 : if fileCountUpperBoundDueToGrandparents > acceptableFileCount {
803 2 : c.maxOverlapBytes = uint64(
804 2 : float64(c.maxOverlapBytes) *
805 2 : (fileCountUpperBoundDueToGrandparents / acceptableFileCount))
806 2 : }
807 : }
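
// Worked example of the heuristic above (an illustrative sketch; the numbers
// are assumptions): flushing a 64 MiB memtable with a 2 MiB target file size,
// a 20 MiB initial maxOverlapBytes, and 1 GiB of overlapping Lbase data.
func exampleGrandparentAdjustment() uint64 {
	const approxCompressionRatio = 0.2
	flushingBytes := uint64(64 << 20)       // 64 MiB being flushed
	maxOutputFileSize := uint64(2 << 20)    // 2 MiB target file size
	maxOverlapBytes := uint64(20 << 20)     // 10x the target file size
	grandparentFileBytes := uint64(1 << 30) // 1 GiB overlapping in Lbase

	approxOutputBytes := approxCompressionRatio * float64(flushingBytes)                    // ~12.8 MiB
	acceptableFileCount := 4 * math.Ceil(approxOutputBytes/float64(maxOutputFileSize))      // 4*7 = 28
	upperBoundDueToGrandparents := float64(grandparentFileBytes) / float64(maxOverlapBytes) // ~51.2
	if upperBoundDueToGrandparents > acceptableFileCount {
		// Fatten maxOverlapBytes by ~1.83x so the upper bound drops to ~28 files.
		maxOverlapBytes = uint64(float64(maxOverlapBytes) *
			(upperBoundDueToGrandparents / acceptableFileCount))
	}
	return maxOverlapBytes // ~36.6 MiB in this example.
}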
808 :
809 : // newFlush creates the state necessary for a flush (modeled with the compaction
810 : // struct).
811 : //
812 : // newFlush takes the current Version in order to populate grandparent flushing
813 : // limits, but it does not reference the version.
814 : //
815 : // TODO(jackson): Consider maintaining a reference to the version anyways since
816 : // in the future in-memory Version state may only be available while a Version
817 : // is referenced (eg, if we start recycling B-Tree nodes once they're no longer
818 : // referenced). There's subtlety around unref'ing the version at the right
819 : // moment, so we defer it for now.
820 : func newFlush(
821 : opts *Options,
822 : cur *manifest.Version,
823 : l0Organizer *manifest.L0Organizer,
824 : baseLevel int,
825 : flushing flushableList,
826 : beganAt time.Time,
827 : tableFormat sstable.TableFormat,
828 : getValueSeparation getValueSeparation,
829 2 : ) (*tableCompaction, error) {
830 2 : c := &tableCompaction{
831 2 : kind: compactionKindFlush,
832 2 : comparer: opts.Comparer,
833 2 : logger: opts.Logger,
834 2 : inputs: []compactionLevel{{level: -1}, {level: 0}},
835 2 : getValueSeparation: getValueSeparation,
836 2 : maxOutputFileSize: math.MaxUint64,
837 2 : maxOverlapBytes: math.MaxUint64,
838 2 : grantHandle: noopGrantHandle{},
839 2 : tableFormat: tableFormat,
840 2 : metrics: compactionMetrics{
841 2 : beganAt: beganAt,
842 2 : },
843 2 : }
844 2 : c.flush.flushables = flushing
845 2 : c.flush.l0Limits = l0Organizer.FlushSplitKeys()
846 2 : c.startLevel = &c.inputs[0]
847 2 : c.outputLevel = &c.inputs[1]
848 2 : if len(flushing) > 0 {
849 2 : if _, ok := flushing[0].flushable.(*ingestedFlushable); ok {
850 2 : if len(flushing) != 1 {
851 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
852 : }
853 2 : c.kind = compactionKindIngestedFlushable
854 2 : return c, nil
855 2 : } else {
856 2 : // Make sure there's no ingestedFlushable after the first flushable
857 2 : // in the list.
858 2 : for _, f := range c.flush.flushables[1:] {
859 2 : if _, ok := f.flushable.(*ingestedFlushable); ok {
860 0 : panic("pebble: flushables shouldn't contain ingestedFlushable")
861 : }
862 : }
863 : }
864 : }
865 :
866 2 : preferSharedStorage := tableFormat >= FormatMinForSharedObjects.MaxTableFormat() &&
867 2 : remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
868 2 : c.objCreateOpts = objstorage.CreateOptions{
869 2 : PreferSharedStorage: preferSharedStorage,
870 2 : WriteCategory: getDiskWriteCategoryForCompaction(opts, c.kind),
871 2 : }
872 2 : if preferSharedStorage {
873 2 : c.getValueSeparation = neverSeparateValues
874 2 : }
875 :
876 2 : cmp := c.comparer.Compare
877 2 : updatePointBounds := func(iter internalIterator) {
878 2 : if kv := iter.First(); kv != nil {
879 2 : if c.bounds.Start == nil || cmp(c.bounds.Start, kv.K.UserKey) > 0 {
880 2 : c.bounds.Start = slices.Clone(kv.K.UserKey)
881 2 : }
882 : }
883 2 : if kv := iter.Last(); kv != nil {
884 2 : if c.bounds.End.Key == nil || !c.bounds.End.IsUpperBoundForInternalKey(cmp, kv.K) {
885 2 : c.bounds.End = base.UserKeyExclusiveIf(slices.Clone(kv.K.UserKey), kv.K.IsExclusiveSentinel())
886 2 : }
887 : }
888 : }
889 :
890 2 : updateRangeBounds := func(iter keyspan.FragmentIterator) error {
891 2 : // File bounds require s != nil && !s.Empty(). We only need to check for
892 2 : // s != nil here, as the memtable's FragmentIterator would never surface
893 2 : // empty spans.
894 2 : if s, err := iter.First(); err != nil {
895 0 : return err
896 2 : } else if s != nil {
897 2 : c.bounds = c.bounds.Union(cmp, s.Bounds().Clone())
898 2 : }
899 2 : if s, err := iter.Last(); err != nil {
900 0 : return err
901 2 : } else if s != nil {
902 2 : c.bounds = c.bounds.Union(cmp, s.Bounds().Clone())
903 2 : }
904 2 : return nil
905 : }
906 :
907 2 : var flushingBytes uint64
908 2 : for i := range flushing {
909 2 : f := flushing[i]
910 2 : updatePointBounds(f.newIter(nil))
911 2 : if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
912 2 : if err := updateRangeBounds(rangeDelIter); err != nil {
913 0 : return nil, err
914 0 : }
915 : }
916 2 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
917 2 : if err := updateRangeBounds(rangeKeyIter); err != nil {
918 0 : return nil, err
919 0 : }
920 : }
921 2 : flushingBytes += f.inuseBytes()
922 : }
923 :
924 2 : if opts.FlushSplitBytes > 0 {
925 2 : c.maxOutputFileSize = uint64(opts.TargetFileSizes[0])
926 2 : c.maxOverlapBytes = maxGrandparentOverlapBytes(opts.TargetFileSizes[0])
927 2 : c.grandparents = cur.Overlaps(baseLevel, c.bounds)
928 2 : adjustGrandparentOverlapBytesForFlush(c, flushingBytes)
929 2 : }
930 :
931 : // We don't elide tombstones for flushes.
932 2 : c.delElision, c.rangeKeyElision = compact.NoTombstoneElision(), compact.NoTombstoneElision()
933 2 : return c, nil
934 : }
935 :
936 2 : func (c *tableCompaction) hasExtraLevelData() bool {
937 2 : if len(c.extraLevels) == 0 {
938 2 : // not a multi level compaction
939 2 : return false
940 2 : } else if c.extraLevels[0].files.Empty() {
941 2 : // a multi level compaction without data in the intermediate input level;
942 2 : // e.g. for a multi level compaction with levels 4, 5, and 6, this could
943 2 : // occur if there are no files to compact in 5, or in 5 and 6 (i.e. a move).
944 2 : return false
945 2 : }
946 2 : return true
947 : }
948 :
949 : // errorOnUserKeyOverlap returns an error if the last two written sstables in
950 : // this compaction have revisions of the same user key present in both sstables,
951 : // when it shouldn't (eg. when splitting flushes).
952 1 : func (c *tableCompaction) errorOnUserKeyOverlap(ve *manifest.VersionEdit) error {
953 1 : if n := len(ve.NewTables); n > 1 {
954 1 : meta := ve.NewTables[n-1].Meta
955 1 : prevMeta := ve.NewTables[n-2].Meta
956 1 : if !prevMeta.Largest().IsExclusiveSentinel() &&
957 1 : c.comparer.Compare(prevMeta.Largest().UserKey, meta.Smallest().UserKey) >= 0 {
958 1 : return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
959 1 : prevMeta.Largest().Pretty(c.comparer.FormatKey),
960 1 : prevMeta.TableNum,
961 1 : meta.TableNum)
962 1 : }
963 : }
964 1 : return nil
965 : }
966 :
967 : // allowZeroSeqNum returns true if seqnum's can be zeroed if there are no
968 : // snapshots requiring them to be kept. It performs this determination by
969 : // looking at the TombstoneElision values which are set up based on sstables
970 : // which overlap the bounds of the compaction at a lower level in the LSM.
971 2 : func (c *tableCompaction) allowZeroSeqNum() bool {
972 2 : // TODO(peter): we disable zeroing of seqnums during flushing to match
973 2 : // RocksDB behavior and to avoid generating overlapping sstables during
974 2 : // DB.replayWAL. When replaying WAL files at startup, we flush after each
975 2 : // WAL is replayed building up a single version edit that is
976 2 : // applied. Because we don't apply the version edit after each flush, this
977 2 : // code doesn't know that L0 contains files and zeroing of seqnums should
978 2 : // be disabled. That is fixable, but it seems safer to just match the
979 2 : // RocksDB behavior for now.
980 2 : return len(c.flush.flushables) == 0 && c.delElision.ElidesEverything() && c.rangeKeyElision.ElidesEverything()
981 2 : }
982 :
983 : // newInputIters returns an iterator over all the input tables in a compaction.
984 : func (c *tableCompaction) newInputIters(
985 : newIters tableNewIters, iiopts internalIterOpts,
986 : ) (
987 : pointIter internalIterator,
988 : rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
989 : retErr error,
990 2 : ) {
991 2 : ctx := context.TODO()
992 2 : cmp := c.comparer.Compare
993 2 :
994 2 : // Validate the ordering of compaction input files for defense in depth.
995 2 : if len(c.flush.flushables) == 0 {
996 2 : if c.startLevel.level >= 0 {
997 2 : err := manifest.CheckOrdering(c.comparer, manifest.Level(c.startLevel.level),
998 2 : c.startLevel.files.Iter())
999 2 : if err != nil {
1000 1 : return nil, nil, nil, err
1001 1 : }
1002 : }
1003 2 : err := manifest.CheckOrdering(c.comparer, manifest.Level(c.outputLevel.level),
1004 2 : c.outputLevel.files.Iter())
1005 2 : if err != nil {
1006 1 : return nil, nil, nil, err
1007 1 : }
1008 2 : if c.startLevel.level == 0 {
1009 2 : if c.startLevel.l0SublevelInfo == nil {
1010 0 : panic("l0SublevelInfo not created for compaction out of L0")
1011 : }
1012 2 : for _, info := range c.startLevel.l0SublevelInfo {
1013 2 : err := manifest.CheckOrdering(c.comparer, info.sublevel, info.Iter())
1014 2 : if err != nil {
1015 1 : return nil, nil, nil, err
1016 1 : }
1017 : }
1018 : }
1019 2 : if len(c.extraLevels) > 0 {
1020 2 : if len(c.extraLevels) > 1 {
1021 0 : panic("n>2 multi level compaction not implemented yet")
1022 : }
1023 2 : interLevel := c.extraLevels[0]
1024 2 : err := manifest.CheckOrdering(c.comparer, manifest.Level(interLevel.level),
1025 2 : interLevel.files.Iter())
1026 2 : if err != nil {
1027 0 : return nil, nil, nil, err
1028 0 : }
1029 : }
1030 : }
1031 :
1032 : // There are three classes of keys that a compaction needs to process: point
1033 : // keys, range deletion tombstones and range keys. Collect all iterators for
1034 : // all these classes of keys from all the levels. We'll aggregate them
1035 : // together farther below.
1036 : //
1037 : // numInputLevels is an approximation of the number of iterator levels. Due
1038 : // to idiosyncrasies in iterator construction, we may (rarely) exceed this
1039 : // initial capacity.
1040 2 : numInputLevels := max(len(c.flush.flushables), len(c.inputs))
1041 2 : iters := make([]internalIterator, 0, numInputLevels)
1042 2 : rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
1043 2 : rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
1044 2 :
1045 2 : // If construction of the iterator inputs fails, ensure that we close all
1046 2 : // the constituent iterators.
1047 2 : defer func() {
1048 2 : if retErr != nil {
1049 1 : for _, iter := range iters {
1050 1 : if iter != nil {
1051 1 : _ = iter.Close()
1052 1 : }
1053 : }
1054 1 : for _, rangeDelIter := range rangeDelIters {
1055 0 : rangeDelIter.Close()
1056 0 : }
1057 : }
1058 : }()
1059 2 : iterOpts := IterOptions{
1060 2 : Category: categoryCompaction,
1061 2 : logger: c.logger,
1062 2 : }
1063 2 :
1064 2 : // Populate iters, rangeDelIters and rangeKeyIters with the appropriate
1065 2 : // constituent iterators. This depends on whether this is a flush or a
1066 2 : // compaction.
1067 2 : if len(c.flush.flushables) != 0 {
1068 2 : // If flushing, we need to build the input iterators over the memtables
1069 2 : // stored in c.flush.flushables.
1070 2 : for _, f := range c.flush.flushables {
1071 2 : iters = append(iters, f.newFlushIter(nil))
1072 2 : rangeDelIter := f.newRangeDelIter(nil)
1073 2 : if rangeDelIter != nil {
1074 2 : rangeDelIters = append(rangeDelIters, rangeDelIter)
1075 2 : }
1076 2 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
1077 2 : rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
1078 2 : }
1079 : }
1080 2 : } else {
1081 2 : addItersForLevel := func(level *compactionLevel, l manifest.Layer) error {
1082 2 : // Add a *levelIter for point iterators. Because we don't call
1083 2 : // initRangeDel, the levelIter will close and forget the range
1084 2 : // deletion iterator when it steps onto a new file. Surfacing range
1085 2 : // deletions to compactions is handled below.
1086 2 : iters = append(iters, newLevelIter(ctx, iterOpts, c.comparer,
1087 2 : newIters, level.files.Iter(), l, iiopts))
1088 2 : // TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
1089 2 : // deletions into memory upfront. (See #2015, which reverted this.) There
1090 2 : // will be no user keys that are split between sstables within a level in
1091 2 : // Cockroach 23.1, which unblocks this optimization.
1092 2 :
1093 2 : // Add the range deletion iterator for each file as an independent level
1094 2 : // in mergingIter, as opposed to making a levelIter out of those. This
1095 2 : // is safer as levelIter expects all keys coming from underlying
1096 2 : // iterators to be in order. Due to compaction / tombstone writing
1097 2 : // logic in finishOutput(), it is possible for range tombstones to not
1098 2 : // be strictly ordered across all files in one level.
1099 2 : //
1100 2 : // Consider this example from the metamorphic tests (also repeated in
1101 2 : // finishOutput()), consisting of three L3 files with their bounds
1102 2 : // specified in square brackets next to the file name:
1103 2 : //
1104 2 : // ./000240.sst [tmgc#391,MERGE-tmgc#391,MERGE]
1105 2 : // tmgc#391,MERGE [786e627a]
1106 2 : // tmgc-udkatvs#331,RANGEDEL
1107 2 : //
1108 2 : // ./000241.sst [tmgc#384,MERGE-tmgc#384,MERGE]
1109 2 : // tmgc#384,MERGE [666c7070]
1110 2 : // tmgc-tvsalezade#383,RANGEDEL
1111 2 : // tmgc-tvsalezade#331,RANGEDEL
1112 2 : //
1113 2 : // ./000242.sst [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
1114 2 : // tmgc-tvsalezade#383,RANGEDEL
1115 2 : // tmgc#375,SET [72646c78766965616c72776865676e79]
1116 2 : // tmgc-tvsalezade#356,RANGEDEL
1117 2 : //
1118 2 : // Here, the range tombstone in 000240.sst falls "after" one in
1119 2 : // 000241.sst, despite 000240.sst being ordered "before" 000241.sst for
1120 2 : // levelIter's purposes. While each file is still consistent before its
1121 2 : // bounds, it's safer to have all rangedel iterators be visible to
1122 2 : // mergingIter.
1123 2 : iter := level.files.Iter()
1124 2 : for f := iter.First(); f != nil; f = iter.Next() {
1125 2 : rangeDelIter, err := c.newRangeDelIter(ctx, newIters, iter.Take(), iterOpts, iiopts, l)
1126 2 : if err != nil {
1127 1 : // The error will already be annotated with the BackingFileNum, so
1128 1 : // we annotate it with the FileNum.
1129 1 : return errors.Wrapf(err, "pebble: could not open table %s", errors.Safe(f.TableNum))
1130 1 : }
1131 2 : if rangeDelIter == nil {
1132 2 : continue
1133 : }
1134 2 : rangeDelIters = append(rangeDelIters, rangeDelIter)
1135 2 : c.iterationState.keyspanIterClosers = append(c.iterationState.keyspanIterClosers, rangeDelIter)
1136 : }
1137 :
1138 : // Check if this level has any range keys.
1139 2 : hasRangeKeys := false
1140 2 : for f := iter.First(); f != nil; f = iter.Next() {
1141 2 : if f.HasRangeKeys {
1142 2 : hasRangeKeys = true
1143 2 : break
1144 : }
1145 : }
1146 2 : if hasRangeKeys {
1147 2 : newRangeKeyIterWrapper := func(ctx context.Context, file *manifest.TableMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
1148 2 : iters, err := newIters(ctx, file, &iterOpts, iiopts, iterRangeKeys)
1149 2 : if err != nil {
1150 0 : return nil, err
1151 2 : } else if iters.rangeKey == nil {
1152 0 : return emptyKeyspanIter, nil
1153 0 : }
1154 : // Ensure that the range key iter is not closed until the compaction is
1155 : // finished. This is necessary because range key processing
1156 : // requires the range keys to be held in memory for up to the
1157 : // lifetime of the compaction.
1158 2 : noCloseIter := &noCloseIter{iters.rangeKey}
1159 2 : c.iterationState.keyspanIterClosers = append(c.iterationState.keyspanIterClosers, noCloseIter)
1160 2 :
1161 2 : // We do not need to truncate range keys to sstable boundaries, or
1162 2 : // only read within the file's atomic compaction units, unlike with
1163 2 : // range tombstones. This is because range keys were added after we
1164 2 : // stopped splitting user keys across sstables, so all the range keys
1165 2 : // in this sstable must wholly lie within the file's bounds.
1166 2 : return noCloseIter, err
1167 : }
1168 2 : li := keyspanimpl.NewLevelIter(ctx, keyspan.SpanIterOptions{}, cmp,
1169 2 : newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
1170 2 : rangeKeyIters = append(rangeKeyIters, li)
1171 : }
1172 2 : return nil
1173 : }
1174 :
1175 2 : for i := range c.inputs {
1176 2 : // If the level is annotated with l0SublevelInfo, expand it into one
1177 2 : // level per sublevel.
1178 2 : // TODO(jackson): Perform this expansion even earlier when we pick the
1179 2 : // compaction?
1180 2 : if len(c.inputs[i].l0SublevelInfo) > 0 {
1181 2 : for _, info := range c.startLevel.l0SublevelInfo {
1182 2 : sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
1183 2 : if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
1184 1 : return nil, nil, nil, err
1185 1 : }
1186 : }
1187 2 : continue
1188 : }
1189 2 : if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
1190 1 : return nil, nil, nil, err
1191 1 : }
1192 : }
1193 : }
1194 :
1195 : // If there's only one constituent point iterator, we can avoid the overhead
1196 : // of a *mergingIter. This is possible, for example, when performing a flush
1197 : // of a single memtable. Otherwise, combine all the iterators into a merging
1198 : // iter.
1199 2 : pointIter = iters[0]
1200 2 : if len(iters) > 1 {
1201 2 : pointIter = newMergingIter(c.logger, &c.metrics.internalIterStats, cmp, nil, iters...)
1202 2 : }
1203 :
1204 : // In normal operation, levelIter iterates over the point operations in a
1205 : // level, and initializes a rangeDelIter pointer for the range deletions in
1206 : // each table. During compaction, we want to iterate over the merged view of
1207 : // point operations and range deletions. In order to do this we create one
1208 : // levelIter per level to iterate over the point operations, and collect up
1209 : // all the range deletion files.
1210 : //
1211 : // The range deletion levels are combined with a keyspanimpl.MergingIter. The
1212 : // resulting merged rangedel iterator is then included using an
1213 : // InterleavingIter.
1214 : // TODO(jackson): Consider using a defragmenting iterator to stitch together
1215 : // logical range deletions that were fragmented due to previous file
1216 : // boundaries.
1217 2 : if len(rangeDelIters) > 0 {
1218 2 : mi := &keyspanimpl.MergingIter{}
1219 2 : mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
1220 2 : rangeDelIter = mi
1221 2 : }
1222 :
1223 : // If there are range key iterators, we need to combine them using
1224 : // keyspanimpl.MergingIter, and then interleave them among the points.
1225 2 : if len(rangeKeyIters) > 0 {
1226 2 : mi := &keyspanimpl.MergingIter{}
1227 2 : mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeKeyIters...)
1228 2 : // TODO(radu): why do we have a defragmenter here but not above?
1229 2 : di := &keyspan.DefragmentingIter{}
1230 2 : di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
1231 2 : rangeKeyIter = di
1232 2 : }
1233 2 : return pointIter, rangeDelIter, rangeKeyIter, nil
1234 : }
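
// Resulting iterator shape (illustrative summary of the function above):
//
//	pointIter    = mergingIter(one levelIter per input level, or the flushable iters)
//	rangeDelIter = keyspanimpl.MergingIter(one fragment iter per input file)
//	rangeKeyIter = DefragmentingIter(keyspanimpl.MergingIter(one keyspanimpl.LevelIter per level))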
1235 :
1236 : func (c *tableCompaction) newRangeDelIter(
1237 : ctx context.Context,
1238 : newIters tableNewIters,
1239 : f manifest.LevelFile,
1240 : opts IterOptions,
1241 : iiopts internalIterOpts,
1242 : l manifest.Layer,
1243 2 : ) (*noCloseIter, error) {
1244 2 : opts.layer = l
1245 2 : iterSet, err := newIters(ctx, f.TableMetadata, &opts, iiopts, iterRangeDeletions)
1246 2 : if err != nil {
1247 1 : return nil, err
1248 2 : } else if iterSet.rangeDeletion == nil {
1249 2 : // The file doesn't contain any range deletions.
1250 2 : return nil, nil
1251 2 : }
1252 : // Ensure that rangeDelIter is not closed until the compaction is
1253 : // finished. This is necessary because range tombstone processing
1254 : // requires the range tombstones to be held in memory for up to the
1255 : // lifetime of the compaction.
1256 2 : return &noCloseIter{iterSet.rangeDeletion}, nil
1257 : }
1258 :
1259 1 : func (c *tableCompaction) String() string {
1260 1 : if len(c.flush.flushables) != 0 {
1261 0 : return "flush\n"
1262 0 : }
1263 :
1264 1 : var buf bytes.Buffer
1265 1 : for level := c.startLevel.level; level <= c.outputLevel.level; level++ {
1266 1 : i := level - c.startLevel.level
1267 1 : fmt.Fprintf(&buf, "%d:", level)
1268 1 : for f := range c.inputs[i].files.All() {
1269 1 : fmt.Fprintf(&buf, " %s:%s-%s", f.TableNum, f.Smallest(), f.Largest())
1270 1 : }
1271 1 : fmt.Fprintf(&buf, "\n")
1272 : }
1273 1 : return buf.String()
1274 : }
1275 :
1276 : type manualCompaction struct {
1277 : // id is for internal bookkeeping.
1278 : id uint64
1279 : // Count of the retries due to concurrent compaction to overlapping levels.
1280 : retries int
1281 : level int
1282 : outputLevel int
1283 : done chan error
1284 : start []byte
1285 : end []byte
1286 : split bool
1287 : }
1288 :
1289 : type readCompaction struct {
1290 : level int
1291 : // [start, end] key ranges are used for de-duping.
1292 : start []byte
1293 : end []byte
1294 :
1295 : // The file associated with the compaction.
1296 : // If the file no longer belongs in the same
1297 : // level, then we skip the compaction.
1298 : tableNum base.TableNum
1299 : }
1300 :
1301 : // Removes compaction markers from files in a compaction. The rollback parameter
1302 : // indicates whether the compaction state should be rolled back to its original
1303 : // state in the case of an unsuccessful compaction.
1304 : //
1305 : // DB.mu must be held when calling this method, however this method can drop and
1306 : // re-acquire that mutex. All writes to the manifest for this compaction should
1307 : // have completed by this point.
1308 2 : func (d *DB) clearCompactingState(c *tableCompaction, rollback bool) {
1309 2 : c.versionEditApplied = true
1310 2 : for _, cl := range c.inputs {
1311 2 : for f := range cl.files.All() {
1312 2 : if !f.IsCompacting() {
1313 0 : d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.TableNum)
1314 0 : }
1315 2 : if !rollback {
1316 2 : // On success, all compactions other than move and delete-only compactions
1317 2 : // transition the file into the Compacted state. Move-compacted files
1318 2 : // become eligible for compaction again and transition back to NotCompacting.
1319 2 : // Delete-only compactions could, on rare occasion, leave files untouched
1320 2 : // (e.g. if files have a loose bound), so we revert them all to NotCompacting
1321 2 : // just in case they need to be compacted again.
1322 2 : if c.kind != compactionKindMove && c.kind != compactionKindDeleteOnly {
1323 2 : f.SetCompactionState(manifest.CompactionStateCompacted)
1324 2 : } else {
1325 2 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1326 2 : }
1327 2 : } else {
1328 2 : // Else, on rollback, all input files unconditionally transition back to
1329 2 : // NotCompacting.
1330 2 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1331 2 : }
1332 2 : f.IsIntraL0Compacting = false
1333 : }
1334 : }
1335 2 : l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c))
1336 2 : func() {
1337 2 : // InitCompactingFileInfo requires that no other manifest writes be
1338 2 : // happening in parallel with it, i.e. we're not in the midst of installing
1339 2 : // another version. Otherwise, it's possible that we've created another
1340 2 : // L0Sublevels instance, but not added it to the versions list, causing
1341 2 : // all the indices in TableMetadata to be inaccurate. To ensure this,
1342 2 : // grab the manifest lock.
1343 2 : d.mu.versions.logLock()
1344 2 : // It is a bit peculiar that we are fiddling with the current version state
1345 2 : // in a separate critical section from when this version was installed.
1346 2 : // But this fiddling is necessary if the compaction failed. When the
1347 2 : // compaction succeeded, we've already done this in UpdateVersionLocked, so
1348 2 : // this seems redundant. Anyway, we clear the pickedCompactionCache since we
1349 2 : // may be able to pick a better compaction (though when this compaction
1350 2 : // succeeded we've also cleared the cache in UpdateVersionLocked).
1351 2 : defer d.mu.versions.logUnlockAndInvalidatePickedCompactionCache()
1352 2 : d.mu.versions.latest.l0Organizer.InitCompactingFileInfo(l0InProgress)
1353 2 : }()
1354 : }
1355 :
1356 2 : func (d *DB) calculateDiskAvailableBytes() uint64 {
1357 2 : space, err := d.opts.FS.GetDiskUsage(d.dirname)
1358 2 : if err != nil {
1359 2 : if !errors.Is(err, vfs.ErrUnsupported) {
1360 1 : d.opts.EventListener.BackgroundError(err)
1361 1 : }
1362 : // Return the last value we managed to obtain.
1363 2 : return d.diskAvailBytes.Load()
1364 : }
1365 :
1366 2 : d.lowDiskSpaceReporter.Report(space.AvailBytes, space.TotalBytes, d.opts.EventListener)
1367 2 : d.diskAvailBytes.Store(space.AvailBytes)
1368 2 : return space.AvailBytes
1369 : }
1370 :
1371 : // maybeScheduleFlush schedules a flush if necessary.
1372 : //
1373 : // d.mu must be held when calling this.
1374 2 : func (d *DB) maybeScheduleFlush() {
1375 2 : if d.mu.compact.flushing || d.closed.Load() != nil || d.opts.ReadOnly {
1376 2 : return
1377 2 : }
1378 2 : if len(d.mu.mem.queue) <= 1 {
1379 2 : return
1380 2 : }
1381 :
1382 2 : if !d.passedFlushThreshold() {
1383 2 : return
1384 2 : }
1385 :
1386 2 : d.mu.compact.flushing = true
1387 2 : go d.flush()
1388 : }
1389 :
1390 2 : func (d *DB) passedFlushThreshold() bool {
1391 2 : var n int
1392 2 : var size uint64
1393 2 : for ; n < len(d.mu.mem.queue)-1; n++ {
1394 2 : if !d.mu.mem.queue[n].readyForFlush() {
1395 2 : break
1396 : }
1397 2 : if d.mu.mem.queue[n].flushForced {
1398 2 : // A flush was forced. Pretend the memtable size is the configured
1399 2 : // size. See minFlushSize below.
1400 2 : size += d.opts.MemTableSize
1401 2 : } else {
1402 2 : size += d.mu.mem.queue[n].totalBytes()
1403 2 : }
1404 : }
1405 2 : if n == 0 {
1406 2 : // None of the immutable memtables are ready for flushing.
1407 2 : return false
1408 2 : }
1409 :
1410 : // Only flush once the sum of the queued memtable sizes exceeds half the
1411 : // configured memtable size. This prevents flushing of memtables at startup
1412 : // while we're undergoing the ramp period on the memtable size. See
1413 : // DB.newMemTable().
1414 2 : minFlushSize := d.opts.MemTableSize / 2
1415 2 : return size >= minFlushSize
1416 : }
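     :
     : // For example (illustrative numbers): with MemTableSize = 64MB and two
     : // immutable memtables of 20MB and 10MB that are ready for flushing,
     : // size = 30MB is below minFlushSize = 32MB, so no flush is scheduled yet.
     : // Had one of the memtables been force-flushed, it would count as the full
     : // 64MB and the threshold would be passed.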
1417 :
1418 2 : func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
1419 2 : var mem *flushableEntry
1420 2 : for _, m := range d.mu.mem.queue {
1421 2 : if m.flushable == tbl {
1422 2 : mem = m
1423 2 : break
1424 : }
1425 : }
1426 2 : if mem == nil || mem.flushForced {
1427 2 : return
1428 2 : }
1429 2 : deadline := d.timeNow().Add(dur)
1430 2 : if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
1431 2 : // Already scheduled to flush sooner than within `dur`.
1432 2 : return
1433 2 : }
1434 2 : mem.delayedFlushForcedAt = deadline
1435 2 : go func() {
1436 2 : timer := time.NewTimer(dur)
1437 2 : defer timer.Stop()
1438 2 :
1439 2 : select {
1440 2 : case <-d.closedCh:
1441 2 : return
1442 2 : case <-mem.flushed:
1443 2 : return
1444 2 : case <-timer.C:
1445 2 : d.commit.mu.Lock()
1446 2 : defer d.commit.mu.Unlock()
1447 2 : d.mu.Lock()
1448 2 : defer d.mu.Unlock()
1449 2 :
1450 2 : // NB: The timer may fire concurrently with a call to Close. If a
1451 2 : // Close call beat us to acquiring d.mu, d.closed holds ErrClosed,
1452 2 : // and it's too late to flush anything. Otherwise, the Close call
1453 2 : // will block on locking d.mu until we've finished scheduling the
1454 2 : // flush and set `d.mu.compact.flushing` to true. Close will wait
1455 2 : // for the current flush to complete.
1456 2 : if d.closed.Load() != nil {
1457 1 : return
1458 1 : }
1459 :
1460 2 : if d.mu.mem.mutable == tbl {
1461 2 : _ = d.makeRoomForWrite(nil)
1462 2 : } else {
1463 2 : mem.flushForced = true
1464 2 : }
1465 2 : d.maybeScheduleFlush()
1466 : }
1467 : }()
1468 : }
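     :
     : // To illustrate the deadline logic above (hypothetical durations): if a
     : // delayed flush is already scheduled 5s from now, a later request for 10s
     : // from now is ignored because the memtable will flush sooner anyway, while
     : // a request for 2s from now replaces the deadline and spawns a new timer.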
1469 :
1470 2 : func (d *DB) flush() {
1471 2 : pprof.Do(context.Background(), flushLabels, func(context.Context) {
1472 2 : flushingWorkStart := crtime.NowMono()
1473 2 : d.mu.Lock()
1474 2 : defer d.mu.Unlock()
1475 2 : idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime)
1476 2 : var bytesFlushed uint64
1477 2 : var err error
1478 2 : if bytesFlushed, err = d.flush1(); err != nil {
1479 1 : // TODO(peter): count consecutive flush errors and backoff.
1480 1 : d.opts.EventListener.BackgroundError(err)
1481 1 : }
1482 2 : d.mu.compact.flushing = false
1483 2 : d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
1484 2 : workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
1485 2 : d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
1486 2 : d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
1487 2 : d.mu.compact.flushWriteThroughput.IdleDuration += idleDuration
1488 2 : // More flush work may have arrived while we were flushing, so schedule
1489 2 : // another flush if needed.
1490 2 : d.maybeScheduleFlush()
1491 2 : // Let the CompactionScheduler know, so that it can react immediately to
1492 2 : // an increase in DB.GetAllowedWithoutPermission.
1493 2 : d.opts.Experimental.CompactionScheduler.UpdateGetAllowedWithoutPermission()
1494 2 : // The flush may have produced too many files in a level, so schedule a
1495 2 : // compaction if needed.
1496 2 : d.maybeScheduleCompaction()
1497 2 : d.mu.compact.cond.Broadcast()
1498 : })
1499 : }
1500 :
1501 : // runIngestFlush is used to generate a flush version edit for sstables which
1502 : // were ingested as flushables. Both DB.mu and the manifest lock must be held
1503 : // while runIngestFlush is called.
1504 2 : func (d *DB) runIngestFlush(c *tableCompaction) (*manifest.VersionEdit, error) {
1505 2 : if len(c.flush.flushables) != 1 {
1506 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
1507 : }
1508 :
1509 : // Finding the target level for ingestion must use the latest version
1510 : // after the logLock has been acquired.
1511 2 : version := d.mu.versions.currentVersion()
1512 2 :
1513 2 : baseLevel := d.mu.versions.picker.getBaseLevel()
1514 2 : ve := &manifest.VersionEdit{}
1515 2 : var ingestSplitFiles []ingestSplitFile
1516 2 : ingestFlushable := c.flush.flushables[0].flushable.(*ingestedFlushable)
1517 2 :
1518 2 : updateLevelMetricsOnExcise := func(m *manifest.TableMetadata, level int, added []manifest.NewTableEntry) {
1519 2 : levelMetrics := c.metrics.perLevel[level]
1520 2 : if levelMetrics == nil {
1521 2 : levelMetrics = &LevelMetrics{}
1522 2 : c.metrics.perLevel[level] = levelMetrics
1523 2 : }
1524 2 : levelMetrics.TablesCount--
1525 2 : levelMetrics.TablesSize -= int64(m.Size)
1526 2 : levelMetrics.EstimatedReferencesSize -= m.EstimatedReferenceSize()
1527 2 : for i := range added {
1528 2 : levelMetrics.TablesCount++
1529 2 : levelMetrics.TablesSize += int64(added[i].Meta.Size)
1530 2 : levelMetrics.EstimatedReferencesSize += added[i].Meta.EstimatedReferenceSize()
1531 2 : }
1532 : }
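     :
     : // For example (illustrative numbers): if an excise replaces one 10MB table
     : // in L5 with two smaller fragments, the closure above nets out to
     : // TablesCount -1 + 2 = +1 for L5, while TablesSize decreases by the size
     : // of the excised portion.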
1533 :
1534 2 : suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
1535 2 : d.FormatMajorVersion() >= FormatVirtualSSTables
1536 2 :
1537 2 : if suggestSplit || ingestFlushable.exciseSpan.Valid() {
1538 2 : // We may add deleted tables to ve below, so allocate the map up front.
1539 2 : ve.DeletedTables = make(map[manifest.DeletedTableEntry]*manifest.TableMetadata)
1540 2 : }
1541 :
1542 2 : ctx := context.Background()
1543 2 : overlapChecker := &overlapChecker{
1544 2 : comparer: d.opts.Comparer,
1545 2 : newIters: d.newIters,
1546 2 : opts: IterOptions{
1547 2 : logger: d.opts.Logger,
1548 2 : Category: categoryIngest,
1549 2 : },
1550 2 : v: version,
1551 2 : }
1552 2 : replacedTables := make(map[base.TableNum][]manifest.NewTableEntry)
1553 2 : for _, file := range ingestFlushable.files {
1554 2 : var fileToSplit *manifest.TableMetadata
1555 2 : var level int
1556 2 :
1557 2 : // This file fits perfectly within the excise span, so we can slot it at L6.
1558 2 : if ingestFlushable.exciseSpan.Valid() &&
1559 2 : ingestFlushable.exciseSpan.Contains(d.cmp, file.Smallest()) &&
1560 2 : ingestFlushable.exciseSpan.Contains(d.cmp, file.Largest()) {
1561 2 : level = 6
1562 2 : } else {
1563 2 : // TODO(radu): this can perform I/O; we should not do this while holding DB.mu.
1564 2 : lsmOverlap, err := overlapChecker.DetermineLSMOverlap(ctx, file.UserKeyBounds())
1565 2 : if err != nil {
1566 0 : return nil, err
1567 0 : }
1568 2 : level, fileToSplit, err = ingestTargetLevel(
1569 2 : ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, file, suggestSplit,
1570 2 : )
1571 2 : if err != nil {
1572 0 : return nil, err
1573 0 : }
1574 : }
1575 :
1576 : // Add the current flushableIngest file to the version.
1577 2 : ve.NewTables = append(ve.NewTables, manifest.NewTableEntry{Level: level, Meta: file})
1578 2 : if fileToSplit != nil {
1579 1 : ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
1580 1 : ingestFile: file,
1581 1 : splitFile: fileToSplit,
1582 1 : level: level,
1583 1 : })
1584 1 : }
1585 2 : levelMetrics := c.metrics.perLevel.level(level)
1586 2 : levelMetrics.TableBytesIngested += file.Size
1587 2 : levelMetrics.TablesIngested++
1588 : }
1589 2 : if ingestFlushable.exciseSpan.Valid() {
1590 2 : exciseBounds := ingestFlushable.exciseSpan.UserKeyBounds()
1591 2 : if d.FormatMajorVersion() >= FormatExciseBoundsRecord {
1592 2 : ve.ExciseBoundsRecord = append(ve.ExciseBoundsRecord, manifest.ExciseOpEntry{
1593 2 : Bounds: exciseBounds,
1594 2 : SeqNum: ingestFlushable.exciseSeqNum,
1595 2 : })
1596 2 : d.mu.versions.metrics.Ingest.ExciseIngestCount++
1597 2 : }
1598 : // Iterate through all levels and find files that intersect with exciseSpan.
1599 2 : for layer, ls := range version.AllLevelsAndSublevels() {
1600 2 : for m := range ls.Overlaps(d.cmp, ingestFlushable.exciseSpan.UserKeyBounds()).All() {
1601 2 : leftTable, rightTable, err := d.exciseTable(context.TODO(), exciseBounds, m, layer.Level(), tightExciseBounds)
1602 2 : if err != nil {
1603 0 : return nil, err
1604 0 : }
1605 2 : newFiles := applyExciseToVersionEdit(ve, m, leftTable, rightTable, layer.Level())
1606 2 : replacedTables[m.TableNum] = newFiles
1607 2 : updateLevelMetricsOnExcise(m, layer.Level(), newFiles)
1608 : }
1609 : }
1610 : }
1611 :
1612 2 : if len(ingestSplitFiles) > 0 {
1613 1 : if err := d.ingestSplit(context.TODO(), ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedTables); err != nil {
1614 0 : return nil, err
1615 0 : }
1616 : }
1617 :
1618 2 : return ve, nil
1619 : }
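     :
     : // To illustrate the level targeting above with hypothetical bounds: given
     : // an excise span [c, f), an ingested file spanning [c, d] lies entirely
     : // within the span and is slotted directly at L6, whereas a file spanning
     : // [a, b] falls outside the span and has its target level computed by
     : // ingestTargetLevel from the LSM overlap.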
1620 :
1621 : // flush1 runs a compaction that copies the immutable memtables from memory to
1622 : // disk.
1623 : //
1624 : // d.mu must be held when calling this, but the mutex may be dropped and
1625 : // re-acquired during the course of this method.
1626 2 : func (d *DB) flush1() (bytesFlushed uint64, err error) {
1627 2 : // NB: The flushable queue can contain flushables of type ingestedFlushable.
1628 2 : // The sstables in ingestedFlushable.files must be placed into the appropriate
1629 2 : // level in the lsm. Let's say the flushable queue contains a prefix of
1630 2 : // regular immutable memtables, then an ingestedFlushable, and then the
1631 2 : // mutable memtable. When the flush of the ingestedFlushable is performed,
1632 2 : // it needs an updated view of the lsm. That is, the prefix of immutable
1633 2 : // memtables must have already been flushed. Similarly, if there are two
1634 2 : // contiguous ingestedFlushables in the queue, then the first flushable must
1635 2 : // be flushed, so that the second flushable can see an updated view of the
1636 2 : // lsm.
1637 2 : //
1638 2 : // Given the above, we restrict flushes to either some prefix of regular
1639 2 : // memtables, or a single flushable of type ingestedFlushable. The DB.flush
1640 2 : // function will call DB.maybeScheduleFlush again, so a new flush to finish
1641 2 : // the remaining flush work should be scheduled right away.
1642 2 : //
1643 2 : // NB: Large batches placed in the flushable queue share the WAL with the
1644 2 : // previous memtable in the queue. We must ensure the property that both the
1645 2 : // large batch and the memtable with which it shares a WAL are flushed
1646 2 : // together. The property ensures that the minimum unflushed log number
1647 2 : // isn't incremented incorrectly. Since a flushableBatch.readyToFlush always
1648 2 : // returns true, and since the large batch will always be placed right after
1649 2 : // the memtable with which it shares a WAL, the property is naturally
1650 2 : // ensured. The large batch will always be placed after the memtable with
1651 2 : // which it shares a WAL because we ensure it in DB.commitWrite by holding
1652 2 : // the commitPipeline.mu and then holding DB.mu. As an extra defensive
1653 2 : // measure, if we try to flush the memtable without also flushing the
1654 2 : // flushable batch in the same flush, since the memtable and flushableBatch
1655 2 : // have the same logNum, the logNum invariant check below will trigger.
1656 2 : var n, inputs int
1657 2 : var inputBytes uint64
1658 2 : var ingest bool
1659 2 : for ; n < len(d.mu.mem.queue)-1; n++ {
1660 2 : if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
1661 2 : if n == 0 {
1662 2 : // The first flushable is of type ingestedFlushable. Since these
1663 2 : // must be flushed individually, we perform a flush for just
1664 2 : // this.
1665 2 : if !f.readyForFlush() {
1666 0 : // This check is almost unnecessary, but we guard against it
1667 0 : // just in case this invariant changes in the future.
1668 0 : panic("pebble: ingestedFlushable should always be ready to flush.")
1669 : }
1670 : // By setting n = 1, we ensure that the first flushable (n == 0)
1671 : // is scheduled for a flush. The number of tables added is equal to the
1672 : // number of files in the ingest operation.
1673 2 : n = 1
1674 2 : inputs = len(f.files)
1675 2 : ingest = true
1676 2 : break
1677 2 : } else {
1678 2 : // There was some prefix of flushables which weren't of type
1679 2 : // ingestedFlushable. So, perform a flush for those.
1680 2 : break
1681 : }
1682 : }
1683 2 : if !d.mu.mem.queue[n].readyForFlush() {
1684 1 : break
1685 : }
1686 2 : inputBytes += d.mu.mem.queue[n].inuseBytes()
1687 : }
1688 2 : if n == 0 {
1689 0 : // None of the immutable memtables are ready for flushing.
1690 0 : return 0, nil
1691 0 : }
1692 2 : if !ingest {
1693 2 : // Flushes of memtables add the prefix of n memtables from the flushable
1694 2 : // queue.
1695 2 : inputs = n
1696 2 : }
1697 :
1698 : // Require that every memtable being flushed has a log number less than the
1699 : // new minimum unflushed log number.
1700 2 : minUnflushedLogNum := d.mu.mem.queue[n].logNum
1701 2 : if !d.opts.DisableWAL {
1702 2 : for i := 0; i < n; i++ {
1703 2 : if logNum := d.mu.mem.queue[i].logNum; logNum >= minUnflushedLogNum {
1704 0 : panic(errors.AssertionFailedf("logNum invariant violated: flushing %d items; %d:type=%T,logNum=%d; %d:type=%T,logNum=%d",
1705 0 : n,
1706 0 : i, d.mu.mem.queue[i].flushable, logNum,
1707 0 : n, d.mu.mem.queue[n].flushable, minUnflushedLogNum))
1708 : }
1709 : }
1710 : }
1711 :
1712 2 : c, err := newFlush(d.opts, d.mu.versions.currentVersion(), d.mu.versions.latest.l0Organizer,
1713 2 : d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow(), d.TableFormat(), d.determineCompactionValueSeparation)
1714 2 : if err != nil {
1715 0 : return 0, err
1716 0 : }
1717 2 : c.AddInProgressLocked(d)
1718 2 :
1719 2 : jobID := d.newJobIDLocked()
1720 2 : info := FlushInfo{
1721 2 : JobID: int(jobID),
1722 2 : Input: inputs,
1723 2 : InputBytes: inputBytes,
1724 2 : Ingest: ingest,
1725 2 : }
1726 2 : d.opts.EventListener.FlushBegin(info)
1727 2 :
1728 2 : startTime := d.timeNow()
1729 2 :
1730 2 : var ve *manifest.VersionEdit
1731 2 : var stats compact.Stats
1732 2 : // To determine the target level of the files in the ingestedFlushable, we
1733 2 : // need to acquire the logLock, and not release it for that duration. Since
1734 2 : // UpdateVersionLocked acquires it anyway, we create the VersionEdit for
1735 2 : // ingestedFlushable outside runCompaction. For all other flush cases, we
1736 2 : // construct the VersionEdit inside runCompaction.
1737 2 : var compactionErr error
1738 2 : if c.kind != compactionKindIngestedFlushable {
1739 2 : ve, stats, compactionErr = d.runCompaction(jobID, c)
1740 2 : }
1741 :
1742 2 : _, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
1743 2 : err := compactionErr
1744 2 : if c.kind == compactionKindIngestedFlushable {
1745 2 : ve, err = d.runIngestFlush(c)
1746 2 : }
1747 2 : info.Duration = d.timeNow().Sub(startTime)
1748 2 : if err != nil {
1749 1 : return versionUpdate{}, err
1750 1 : }
1751 :
1752 2 : validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
1753 2 : for i := range ve.NewTables {
1754 2 : e := &ve.NewTables[i]
1755 2 : info.Output = append(info.Output, e.Meta.TableInfo())
1756 2 : // Ingested tables are not necessarily flushed to L0. Record the level of
1757 2 : // each ingested file explicitly.
1758 2 : if ingest {
1759 2 : info.IngestLevels = append(info.IngestLevels, e.Level)
1760 2 : }
1761 : }
1762 :
1763 : // The flush succeeded or it produced an empty sstable. In either case we
1764 : // want to bump the minimum unflushed log number to the log number of the
1765 : // oldest unflushed memtable.
1766 2 : ve.MinUnflushedLogNum = minUnflushedLogNum
1767 2 : if c.kind != compactionKindIngestedFlushable {
1768 2 : l0Metrics := c.metrics.perLevel.level(0)
1769 2 : if d.opts.DisableWAL {
1770 2 : // If the WAL is disabled, every flushable has a zero [logSize],
1771 2 : // resulting in zero bytes in. Instead, use the number of bytes we
1772 2 : // flushed as the BytesIn. This ensures we get a reasonable w-amp
1773 2 : // calculation even when the WAL is disabled.
1774 2 : l0Metrics.TableBytesIn = l0Metrics.TableBytesFlushed + l0Metrics.BlobBytesFlushed
1775 2 : } else {
1776 2 : for i := 0; i < n; i++ {
1777 2 : l0Metrics.TableBytesIn += d.mu.mem.queue[i].logSize
1778 2 : }
1779 : }
1780 2 : } else {
1781 2 : // c.kind == compactionKindIngestedFlushable, and we could have deleted
1782 2 : // files due to ingest-time splits or excises.
1783 2 : ingestFlushable := c.flush.flushables[0].flushable.(*ingestedFlushable)
1784 2 : exciseBounds := ingestFlushable.exciseSpan.UserKeyBounds()
1785 2 : for c2 := range d.mu.compact.inProgress {
1786 2 : // Check if this compaction overlaps with the excise span. Note that just
1787 2 : // checking if the inputs individually overlap with the excise span
1788 2 : // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
1789 2 : // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
1790 2 : // doing a [c,d) excise at the same time as this compaction, we will have
1791 2 : // to error out the whole compaction as we can't guarantee it hasn't/won't
1792 2 : // write a file overlapping with the excise span.
1793 2 : bounds := c2.Bounds()
1794 2 : if bounds != nil && bounds.Overlaps(d.cmp, &exciseBounds) {
1795 2 : c2.Cancel()
1796 2 : }
1797 : }
1798 :
1799 2 : if len(ve.DeletedTables) > 0 {
1800 2 : // Iterate through all other compactions, and check if their inputs have
1801 2 : // been replaced due to an ingest-time split or excise. In that case,
1802 2 : // cancel the compaction.
1803 2 : for c2 := range d.mu.compact.inProgress {
1804 2 : for level, table := range c2.Tables() {
1805 2 : if _, ok := ve.DeletedTables[manifest.DeletedTableEntry{FileNum: table.TableNum, Level: level}]; ok {
1806 2 : c2.Cancel()
1807 2 : break
1808 : }
1809 : }
1810 : }
1811 : }
1812 : }
1813 2 : return versionUpdate{
1814 2 : VE: ve,
1815 2 : JobID: jobID,
1816 2 : Metrics: c.metrics.perLevel,
1817 2 : InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) },
1818 : }, nil
1819 : })
1820 :
1821 : // If err != nil, then the flush will be retried, and we will recalculate
1822 : // these metrics.
1823 2 : if err == nil {
1824 2 : d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
1825 2 : d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
1826 2 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
1827 2 : }
1828 :
1829 2 : d.clearCompactingState(c, err != nil)
1830 2 : delete(d.mu.compact.inProgress, c)
1831 2 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.metrics.bytesWritten.Load(), err)
1832 2 :
1833 2 : var flushed flushableList
1834 2 : if err == nil {
1835 2 : flushed = d.mu.mem.queue[:n]
1836 2 : d.mu.mem.queue = d.mu.mem.queue[n:]
1837 2 : d.updateReadStateLocked(d.opts.DebugCheck)
1838 2 : d.updateTableStatsLocked(ve.NewTables)
1839 2 : if ingest {
1840 2 : d.mu.versions.metrics.Flush.AsIngestCount++
1841 2 : for _, l := range c.metrics.perLevel {
1842 2 : if l != nil {
1843 2 : d.mu.versions.metrics.Flush.AsIngestBytes += l.TableBytesIngested
1844 2 : d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
1845 2 : }
1846 : }
1847 : }
1848 2 : d.maybeTransitionSnapshotsToFileOnlyLocked()
1849 : }
1850 : // Signal FlushEnd after installing the new readState. This helps for unit
1851 : // tests that use the callback to trigger a read using an iterator with
1852 : // IterOptions.OnlyReadGuaranteedDurable.
1853 2 : info.Err = err
1854 2 : if info.Err == nil && len(ve.NewTables) == 0 {
1855 2 : info.Err = errEmptyTable
1856 2 : }
1857 2 : info.Done = true
1858 2 : info.TotalDuration = d.timeNow().Sub(startTime)
1859 2 : d.opts.EventListener.FlushEnd(info)
1860 2 :
1861 2 : // The order of these operations matters here for ease of testing.
1862 2 : // Removing the reader reference first allows tests to be guaranteed that
1863 2 : // the memtable reservation has been released by the time a synchronous
1864 2 : // flush returns. readerUnrefLocked may also produce obsolete files so the
1865 2 : // call to deleteObsoleteFiles must happen after it.
1866 2 : for i := range flushed {
1867 2 : flushed[i].readerUnrefLocked(true)
1868 2 : }
1869 :
1870 2 : d.deleteObsoleteFiles(jobID)
1871 2 :
1872 2 : // Mark all the memtables we flushed as flushed.
1873 2 : for i := range flushed {
1874 2 : close(flushed[i].flushed)
1875 2 : }
1876 :
1877 2 : return inputBytes, err
1878 : }
1879 :
1880 : // maybeTransitionSnapshotsToFileOnlyLocked transitions any "eventually
1881 : // file-only" snapshots to be file-only if all their visible state has been
1882 : // flushed to sstables.
1883 : //
1884 : // REQUIRES: d.mu.
1885 2 : func (d *DB) maybeTransitionSnapshotsToFileOnlyLocked() {
1886 2 : earliestUnflushedSeqNum := d.getEarliestUnflushedSeqNumLocked()
1887 2 : currentVersion := d.mu.versions.currentVersion()
1888 2 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; {
1889 2 : if s.efos == nil {
1890 2 : s = s.next
1891 2 : continue
1892 : }
1893 2 : overlapsFlushable := false
1894 2 : if base.Visible(earliestUnflushedSeqNum, s.efos.seqNum, base.SeqNumMax) {
1895 2 : // There are some unflushed keys that are still visible to the EFOS.
1896 2 : // Check if any memtables older than the EFOS contain keys within a
1897 2 : // protected range of the EFOS. If no, we can transition.
1898 2 : protectedRanges := make([]bounded, len(s.efos.protectedRanges))
1899 2 : for i := range s.efos.protectedRanges {
1900 2 : protectedRanges[i] = s.efos.protectedRanges[i]
1901 2 : }
1902 2 : for i := range d.mu.mem.queue {
1903 2 : if !base.Visible(d.mu.mem.queue[i].logSeqNum, s.efos.seqNum, base.SeqNumMax) {
1904 1 : // All keys in this memtable are newer than the EFOS. Skip this
1905 1 : // memtable.
1906 1 : continue
1907 : }
1908 : // NB: computePossibleOverlaps could have false positives, such as if
1909 : // the flushable is a flushable ingest and not a memtable. In that
1910 : // case we don't open the sstables to check; we just pessimistically
1911 : // assume an overlap.
1912 2 : d.mu.mem.queue[i].computePossibleOverlaps(func(b bounded) shouldContinue {
1913 2 : overlapsFlushable = true
1914 2 : return stopIteration
1915 2 : }, protectedRanges...)
1916 2 : if overlapsFlushable {
1917 2 : break
1918 : }
1919 : }
1920 : }
1921 2 : if overlapsFlushable {
1922 2 : s = s.next
1923 2 : continue
1924 : }
1925 2 : currentVersion.Ref()
1926 2 :
1927 2 : // NB: s.efos.transitionToFileOnlySnapshot could close s, in which
1928 2 : // case s.next would be nil. Save it before calling it.
1929 2 : next := s.next
1930 2 : _ = s.efos.transitionToFileOnlySnapshot(currentVersion)
1931 2 : s = next
1932 : }
1933 : }
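     :
     : // As an illustrative example with made-up sequence numbers: an EFOS at
     : // seqNum 150 with earliestUnflushedSeqNum 100 still sees unflushed keys,
     : // so it transitions only if no memtable visible to it overlaps a protected
     : // range. If earliestUnflushedSeqNum were 200, nothing visible to the EFOS
     : // remains unflushed and it transitions immediately.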
1934 :
1935 : // maybeScheduleCompactionAsync should be used when we want to possibly
1936 : // schedule a compaction but don't want to eat the cost of running
1937 : // maybeScheduleCompaction synchronously. It should be launched in a
1938 : // separate goroutine.
1939 : //
1940 : // d.mu must not be held when this is called.
1940 0 : func (d *DB) maybeScheduleCompactionAsync() {
1941 0 : defer d.compactionSchedulers.Done()
1942 0 :
1943 0 : d.mu.Lock()
1944 0 : d.maybeScheduleCompaction()
1945 0 : d.mu.Unlock()
1946 0 : }
1947 :
1948 : // maybeScheduleCompaction schedules a compaction if necessary.
1949 : //
1950 : // WARNING: maybeScheduleCompaction and Schedule must be the only ways that
1951 : // any compactions are run. These ensure that the pickedCompactionCache is
1952 : // used and not stale (by ensuring invalidation is done).
1953 : //
1954 : // Even compactions that are not scheduled by the CompactionScheduler must be
1955 : // run using maybeScheduleCompaction, since starting those compactions needs
1956 : // to invalidate the pickedCompactionCache.
1957 : //
1958 : // Requires d.mu to be held.
1959 2 : func (d *DB) maybeScheduleCompaction() {
1960 2 : d.mu.versions.logLock()
1961 2 : defer d.mu.versions.logUnlock()
1962 2 : env := d.makeCompactionEnvLocked()
1963 2 : if env == nil {
1964 2 : return
1965 2 : }
1966 : // env.inProgressCompactions will become stale once we pick a compaction, so
1967 : // it needs to be kept fresh. Also, the pickedCompaction in the
1968 : // pickedCompactionCache is not valid if we pick a compaction before using
1969 : // it, since those earlier compactions can mark the same file as compacting.
1970 :
1971 : // Delete-only compactions are expected to be cheap and reduce future
1972 : // compaction work, so schedule them directly instead of using the
1973 : // CompactionScheduler.
1974 2 : if d.tryScheduleDeleteOnlyCompaction() {
1975 2 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
1976 2 : d.mu.versions.pickedCompactionCache.invalidate()
1977 2 : }
1978 : // Download compactions have their own concurrency and do not currently
1979 : // interact with CompactionScheduler.
1980 : //
1981 : // TODO(sumeer): integrate with CompactionScheduler, since these consume
1982 : // disk write bandwidth.
1983 2 : if d.tryScheduleDownloadCompactions(*env, d.opts.MaxConcurrentDownloads()) {
1984 2 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
1985 2 : d.mu.versions.pickedCompactionCache.invalidate()
1986 2 : }
1987 : // The remaining compactions are scheduled by the CompactionScheduler.
1988 2 : if d.mu.versions.pickedCompactionCache.isWaiting() {
1989 2 : // CompactionScheduler already knows that the DB is waiting to run a
1990 2 : // compaction.
1991 2 : return
1992 2 : }
1993 : // INVARIANT: !pickedCompactionCache.isWaiting. The following loop will
1994 : // either exit after successfully starting all the compactions it can pick,
1995 : // or will exit with one pickedCompaction in the cache, and isWaiting=true.
1996 2 : for {
1997 2 : // Do not have a pickedCompaction in the cache.
1998 2 : pc := d.pickAnyCompaction(*env)
1999 2 : if pc == nil {
2000 2 : return
2001 2 : }
2002 2 : success, grantHandle := d.opts.Experimental.CompactionScheduler.TrySchedule()
2003 2 : if !success {
2004 2 : // Can't run now, but remember this pickedCompaction in the cache.
2005 2 : d.mu.versions.pickedCompactionCache.add(pc)
2006 2 : return
2007 2 : }
2008 2 : d.runPickedCompaction(pc, grantHandle)
2009 2 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
2010 : }
2011 : }
2012 :
2013 : // makeCompactionEnvLocked attempts to create a compactionEnv necessary during
2014 : // compaction picking. If the DB is closed or marked as read-only,
2015 : // makeCompactionEnvLocked returns nil to indicate that compactions may not be
2016 : // performed. Otherwise, a new compactionEnv is constructed using the current
2017 : // DB state.
2018 : //
2019 : // Compaction picking needs a coherent view of a Version. For example, we need
2020 : // to exclude concurrent ingestions from making a decision on which level to
2021 : // ingest into that conflicts with our compaction decision.
2022 : //
2023 : // A pickedCompaction constructed using a compactionEnv must only be used if
2024 : // the latest Version has not changed.
2025 : //
2026 : // REQUIRES: d.mu and d.mu.versions.logLock are held.
2027 2 : func (d *DB) makeCompactionEnvLocked() *compactionEnv {
2028 2 : if d.closed.Load() != nil || d.opts.ReadOnly {
2029 2 : return nil
2030 2 : }
2031 2 : env := &compactionEnv{
2032 2 : diskAvailBytes: d.diskAvailBytes.Load(),
2033 2 : earliestSnapshotSeqNum: d.mu.snapshots.earliest(),
2034 2 : earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
2035 2 : inProgressCompactions: d.getInProgressCompactionInfoLocked(nil),
2036 2 : readCompactionEnv: readCompactionEnv{
2037 2 : readCompactions: &d.mu.compact.readCompactions,
2038 2 : flushing: d.mu.compact.flushing || d.passedFlushThreshold(),
2039 2 : rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
2040 2 : },
2041 2 : }
2042 2 : if !d.problemSpans.IsEmpty() {
2043 1 : env.problemSpans = &d.problemSpans
2044 1 : }
2045 2 : return env
2046 : }
2047 :
2048 : // pickAnyCompaction tries to pick a manual or automatic compaction.
2049 2 : func (d *DB) pickAnyCompaction(env compactionEnv) (pc pickedCompaction) {
2050 2 : // Pick a score-based compaction first, since a misshapen LSM is bad.
2051 2 : if !d.opts.DisableAutomaticCompactions {
2052 2 : if pc = d.mu.versions.picker.pickAutoScore(env); pc != nil {
2053 2 : return pc
2054 2 : }
2055 : }
2056 : // Pick a manual compaction, if any.
2057 2 : if pc = d.pickManualCompaction(env); pc != nil {
2058 2 : return pc
2059 2 : }
2060 2 : if !d.opts.DisableAutomaticCompactions {
2061 2 : return d.mu.versions.picker.pickAutoNonScore(env)
2062 2 : }
2063 2 : return nil
2064 : }
2065 :
2066 : // runPickedCompaction kicks off the provided pickedCompaction. In case the
2067 : // pickedCompaction is a manual compaction, the corresponding manualCompaction
2068 : // is removed from d.mu.compact.manual.
2069 : //
2070 : // REQUIRES: d.mu and d.mu.versions.logLock are held.
2071 2 : func (d *DB) runPickedCompaction(pc pickedCompaction, grantHandle CompactionGrantHandle) {
2072 2 : var doneChannel chan error
2073 2 : if pc.ManualID() > 0 {
2074 2 : for i := range d.mu.compact.manual {
2075 2 : if d.mu.compact.manual[i].id == pc.ManualID() {
2076 2 : doneChannel = d.mu.compact.manual[i].done
2077 2 : d.mu.compact.manual = slices.Delete(d.mu.compact.manual, i, i+1)
2078 2 : d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
2079 2 : break
2080 : }
2081 : }
2082 2 : if doneChannel == nil {
2083 0 : panic(errors.AssertionFailedf("did not find manual compaction with id %d", pc.ManualID()))
2084 : }
2085 : }
2086 :
2087 2 : d.mu.compact.compactingCount++
2088 2 : c := pc.ConstructCompaction(d, grantHandle)
2089 2 : c.AddInProgressLocked(d)
2090 2 : go func() {
2091 2 : d.compact(c, doneChannel)
2092 2 : }()
2093 : }
2094 :
2095 : // Schedule implements DBForCompaction (it is called by the
2096 : // CompactionScheduler).
2097 2 : func (d *DB) Schedule(grantHandle CompactionGrantHandle) bool {
2098 2 : d.mu.Lock()
2099 2 : defer d.mu.Unlock()
2100 2 : d.mu.versions.logLock()
2101 2 : defer d.mu.versions.logUnlock()
2102 2 : isWaiting := d.mu.versions.pickedCompactionCache.isWaiting()
2103 2 : if !isWaiting {
2104 0 : return false
2105 0 : }
2106 2 : pc := d.mu.versions.pickedCompactionCache.getForRunning()
2107 2 : if pc == nil {
2108 2 : env := d.makeCompactionEnvLocked()
2109 2 : if env != nil {
2110 2 : pc = d.pickAnyCompaction(*env)
2111 2 : }
2112 2 : if pc == nil {
2113 1 : d.mu.versions.pickedCompactionCache.setNotWaiting()
2114 1 : return false
2115 1 : }
2116 : }
2117 : // INVARIANT: pc != nil and is not in the cache. isWaiting is true, since
2118 : // there may be more compactions to run.
2119 2 : d.runPickedCompaction(pc, grantHandle)
2120 2 : return true
2121 : }
2122 :
2123 : // GetWaitingCompaction implements DBForCompaction (it is called by the
2124 : // CompactionScheduler).
2125 2 : func (d *DB) GetWaitingCompaction() (bool, WaitingCompaction) {
2126 2 : d.mu.Lock()
2127 2 : defer d.mu.Unlock()
2128 2 : d.mu.versions.logLock()
2129 2 : defer d.mu.versions.logUnlock()
2130 2 : isWaiting := d.mu.versions.pickedCompactionCache.isWaiting()
2131 2 : if !isWaiting {
2132 2 : return false, WaitingCompaction{}
2133 2 : }
2134 2 : pc := d.mu.versions.pickedCompactionCache.peek()
2135 2 : if pc == nil {
2136 2 : // Need to pick a compaction.
2137 2 : env := d.makeCompactionEnvLocked()
2138 2 : if env != nil {
2139 2 : pc = d.pickAnyCompaction(*env)
2140 2 : }
2141 2 : if pc == nil {
2142 2 : // Call setNotWaiting so that the next call to GetWaitingCompaction can
2143 2 : // return early.
2144 2 : d.mu.versions.pickedCompactionCache.setNotWaiting()
2145 2 : return false, WaitingCompaction{}
2146 2 : } else {
2147 2 : d.mu.versions.pickedCompactionCache.add(pc)
2148 2 : }
2149 : }
2150 : // INVARIANT: pc != nil and is in the cache.
2151 2 : return true, pc.WaitingCompaction()
2152 : }
2153 :
2154 : // GetAllowedWithoutPermission implements DBForCompaction (it is called by the
2155 : // CompactionScheduler).
2156 2 : func (d *DB) GetAllowedWithoutPermission() int {
2157 2 : allowedBasedOnBacklog := int(d.mu.versions.curCompactionConcurrency.Load())
2158 2 : allowedBasedOnManual := 0
2159 2 : manualBacklog := int(d.mu.compact.manualLen.Load())
2160 2 : if manualBacklog > 0 {
2161 2 : _, maxAllowed := d.opts.CompactionConcurrencyRange()
2162 2 : allowedBasedOnManual = min(maxAllowed, manualBacklog+allowedBasedOnBacklog)
2163 2 : }
2164 2 : return max(allowedBasedOnBacklog, allowedBasedOnManual)
2165 : }
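     :
     : // For example (illustrative numbers): with curCompactionConcurrency = 2, a
     : // manual backlog of 3, and a configured maximum of 4, the result is
     : // max(2, min(4, 3+2)) = 4. With no manual backlog, the result is just the
     : // backlog-based allowance of 2.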
2166 :
2167 : // tryScheduleDownloadCompactions tries to start download compactions.
2168 : //
2169 : // Requires d.mu to be held. Updates d.mu.compact.downloads.
2170 : //
2171 : // Returns true iff at least one compaction was started.
2172 2 : func (d *DB) tryScheduleDownloadCompactions(env compactionEnv, maxConcurrentDownloads int) bool {
2173 2 : started := false
2174 2 : vers := d.mu.versions.currentVersion()
2175 2 : for i := 0; i < len(d.mu.compact.downloads); {
2176 2 : if d.mu.compact.downloadingCount >= maxConcurrentDownloads {
2177 2 : break
2178 : }
2179 2 : download := d.mu.compact.downloads[i]
2180 2 : switch d.tryLaunchDownloadCompaction(download, vers, d.mu.versions.latest.l0Organizer, env, maxConcurrentDownloads) {
2181 2 : case launchedCompaction:
2182 2 : started = true
2183 2 : continue
2184 1 : case didNotLaunchCompaction:
2185 1 : // See if we can launch a compaction for another download task.
2186 1 : i++
2187 2 : case downloadTaskCompleted:
2188 2 : // Task is completed and must be removed.
2189 2 : d.mu.compact.downloads = slices.Delete(d.mu.compact.downloads, i, i+1)
2190 : }
2191 : }
2192 2 : return started
2193 : }
2194 :
2195 2 : func (d *DB) pickManualCompaction(env compactionEnv) (pc pickedCompaction) {
2196 2 : v := d.mu.versions.currentVersion()
2197 2 : for len(d.mu.compact.manual) > 0 {
2198 2 : manual := d.mu.compact.manual[0]
2199 2 : pc, retryLater := newPickedManualCompaction(v, d.mu.versions.latest.l0Organizer,
2200 2 : d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
2201 2 : if pc != nil {
2202 2 : return pc
2203 2 : }
2204 2 : if retryLater {
2205 2 : // We are not able to run this manual compaction at this time. Inability
2206 2 : // to run the compaction at the head of the queue blocks later manual
2207 2 : // compactions.
2207 2 : manual.retries++
2208 2 : return nil
2209 2 : }
2210 : // Manual compaction is a no-op. Signal that it's complete.
2211 2 : manual.done <- nil
2212 2 : d.mu.compact.manual = d.mu.compact.manual[1:]
2213 2 : d.mu.compact.manualLen.Store(int32(len(d.mu.compact.manual)))
2214 : }
2215 2 : return nil
2216 : }
2217 :
2218 : // tryScheduleDeleteOnlyCompaction tries to kick off a delete-only compaction
2219 : // for all files that can be deleted as suggested by deletionHints.
2220 : //
2221 : // Requires d.mu to be held. Updates d.mu.compact.deletionHints.
2222 : //
2223 : // Returns true iff a compaction was started.
2224 2 : func (d *DB) tryScheduleDeleteOnlyCompaction() bool {
2225 2 : if d.opts.private.disableDeleteOnlyCompactions || d.opts.DisableAutomaticCompactions ||
2226 2 : len(d.mu.compact.deletionHints) == 0 {
2227 2 : return false
2228 2 : }
2229 2 : if _, maxConcurrency := d.opts.CompactionConcurrencyRange(); d.mu.compact.compactingCount >= maxConcurrency {
2230 2 : return false
2231 2 : }
2232 2 : v := d.mu.versions.currentVersion()
2233 2 : snapshots := d.mu.snapshots.toSlice()
2234 2 : // We need to save the value of exciseEnabled in the compaction itself, as
2235 2 : // it can change dynamically between now and when the compaction runs.
2236 2 : exciseEnabled := d.FormatMajorVersion() >= FormatVirtualSSTables &&
2237 2 : d.opts.Experimental.EnableDeleteOnlyCompactionExcises != nil && d.opts.Experimental.EnableDeleteOnlyCompactionExcises()
2238 2 : inputs, resolvedHints, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots, exciseEnabled)
2239 2 : d.mu.compact.deletionHints = unresolvedHints
2240 2 :
2241 2 : if len(inputs) > 0 {
2242 2 : c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow(), resolvedHints, exciseEnabled)
2243 2 : d.mu.compact.compactingCount++
2244 2 : c.AddInProgressLocked(d)
2245 2 : go d.compact(c, nil)
2246 2 : return true
2247 2 : }
2248 2 : return false
2249 : }
2250 :
2251 : // deleteCompactionHintType indicates whether the deleteCompactionHint was
2252 : // generated from a span containing a range del (point key only), a range key
2253 : // delete (range key only), or both a point and range key.
2254 : type deleteCompactionHintType uint8
2255 :
2256 : const (
2257 : // NOTE: While these are primarily used as enumeration types, they are also
2258 : // used for some bitwise operations. Care should be taken when updating.
2259 : deleteCompactionHintTypeUnknown deleteCompactionHintType = iota
2260 : deleteCompactionHintTypePointKeyOnly
2261 : deleteCompactionHintTypeRangeKeyOnly
2262 : deleteCompactionHintTypePointAndRangeKey
2263 : )
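     :
     : // The iota values make the bitwise composition above work out:
     : // point-key-only (1) OR'd with range-key-only (2) yields
     : // point-and-range-key (3), which is what compactionHintFromKeys below
     : // relies on when folding a span's keys into a single hint type.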
2264 :
2265 : // String implements fmt.Stringer.
2266 1 : func (h deleteCompactionHintType) String() string {
2267 1 : switch h {
2268 0 : case deleteCompactionHintTypeUnknown:
2269 0 : return "unknown"
2270 1 : case deleteCompactionHintTypePointKeyOnly:
2271 1 : return "point-key-only"
2272 1 : case deleteCompactionHintTypeRangeKeyOnly:
2273 1 : return "range-key-only"
2274 1 : case deleteCompactionHintTypePointAndRangeKey:
2275 1 : return "point-and-range-key"
2276 0 : default:
2277 0 : panic(fmt.Sprintf("unknown hint type: %d", h))
2278 : }
2279 : }
2280 :
2281 : // compactionHintFromKeys returns a deleteCompactionHintType given a slice of
2282 : // keyspan.Keys.
2283 2 : func compactionHintFromKeys(keys []keyspan.Key) deleteCompactionHintType {
2284 2 : var hintType deleteCompactionHintType
2285 2 : for _, k := range keys {
2286 2 : switch k.Kind() {
2287 2 : case base.InternalKeyKindRangeDelete:
2288 2 : hintType |= deleteCompactionHintTypePointKeyOnly
2289 2 : case base.InternalKeyKindRangeKeyDelete:
2290 2 : hintType |= deleteCompactionHintTypeRangeKeyOnly
2291 0 : default:
2292 0 : panic(fmt.Sprintf("unsupported key kind: %s", k.Kind()))
2293 : }
2294 : }
2295 2 : return hintType
2296 : }
2297 :
2298 : // A deleteCompactionHint records a user key and sequence number span that has been
2299 : // deleted by a range tombstone. A hint is recorded if at least one sstable
2300 : // falls completely within both the user key and sequence number spans.
2301 : // Once the tombstones and the observed completely-contained sstables fall
2302 : // into the same snapshot stripe, a delete-only compaction may delete any
2303 : // sstables within the range.
2304 : type deleteCompactionHint struct {
2305 : // The type of key span that generated this hint (point key, range key, or
2306 : // both).
2307 : hintType deleteCompactionHintType
2308 : // start and end are user keys specifying a key range [start, end) of
2309 : // deleted keys.
2310 : start []byte
2311 : end []byte
2312 : // The level of the file containing the range tombstone(s) when the hint
2313 : // was created. Only lower levels need to be searched for files that may
2314 : // be deleted.
2315 : tombstoneLevel int
2316 : // The file containing the range tombstone(s) that created the hint.
2317 : tombstoneFile *manifest.TableMetadata
2318 : // The smallest and largest sequence numbers of the abutting tombstones
2319 : // merged to form this hint. All of a table's keys must be less than the
2320 : // tombstone's smallest sequence number for the table to be deleted. All of a
2321 : // table's sequence numbers must fall into the same snapshot stripe as the
2322 : // tombstone's largest sequence number for the table to be deleted.
2323 : tombstoneLargestSeqNum base.SeqNum
2324 : tombstoneSmallestSeqNum base.SeqNum
2325 : // The smallest sequence number of a sstable that was found to be covered
2326 : // by this hint. The hint cannot be resolved until this sequence number is
2327 : // in the same snapshot stripe as the largest tombstone sequence number.
2328 : // This is set when a hint is created, so the LSM may look different and
2329 : // notably no longer contain the sstable that contained the key at this
2330 : // sequence number.
2331 : fileSmallestSeqNum base.SeqNum
2332 : }
2333 :
2334 : type deletionHintOverlap int8
2335 :
2336 : const (
2337 : // hintDoesNotApply indicates that the hint does not apply to the file.
2338 : hintDoesNotApply deletionHintOverlap = iota
2339 : // hintExcisesFile indicates that the hint excises a portion of the file,
2340 : // and the format major version of the DB supports excises.
2341 : hintExcisesFile
2342 : // hintDeletesFile indicates that the hint deletes the entirety of the file.
2343 : hintDeletesFile
2344 : )
2345 :
2346 1 : func (h deleteCompactionHint) String() string {
2347 1 : return fmt.Sprintf(
2348 1 : "L%d.%s %s-%s seqnums(tombstone=%d-%d, file-smallest=%d, type=%s)",
2349 1 : h.tombstoneLevel, h.tombstoneFile.TableNum, h.start, h.end,
2350 1 : h.tombstoneSmallestSeqNum, h.tombstoneLargestSeqNum, h.fileSmallestSeqNum,
2351 1 : h.hintType,
2352 1 : )
2353 1 : }
2354 :
2355 : func (h *deleteCompactionHint) canDeleteOrExcise(
2356 : cmp Compare, m *manifest.TableMetadata, snapshots compact.Snapshots, exciseEnabled bool,
2357 2 : ) deletionHintOverlap {
2358 2 : // The file can only be deleted if all of its keys are older than the
2359 2 : // earliest tombstone aggregated into the hint. Note that we use
2360 2 : // m.LargestSeqNumAbsolute, not m.LargestSeqNum. Consider a compaction that
2361 2 : // zeroes sequence numbers. A compaction may zero the sequence number of a
2362 2 : // key with a sequence number > h.tombstoneSmallestSeqNum and set it to
2363 2 : // zero. If we looked at m.LargestSeqNum, the resulting output file would
2364 2 : // appear to not contain any keys more recent than the oldest tombstone. To
2365 2 : // avoid this error, the largest pre-zeroing sequence number is maintained
2366 2 : // in LargestSeqNumAbsolute and used here to make the determination whether
2367 2 : // the file's keys are older than all of the hint's tombstones.
2368 2 : if m.LargestSeqNumAbsolute >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
2369 2 : return hintDoesNotApply
2370 2 : }
2371 :
2372 : // The file's oldest key must be in the same snapshot stripe as the
2373 : // newest tombstone. NB: We already checked the hint's sequence numbers,
2374 : // but this file's oldest sequence number might be lower than the hint's
2375 : // smallest sequence number despite the file falling within the key range
2376 : // if this file was constructed after the hint by a compaction.
2377 2 : if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(m.SmallestSeqNum) {
2378 0 : return hintDoesNotApply
2379 0 : }
2380 :
2381 2 : switch h.hintType {
2382 2 : case deleteCompactionHintTypePointKeyOnly:
2383 2 : // A hint generated by a range del span cannot delete tables that contain
2384 2 : // range keys.
2385 2 : if m.HasRangeKeys {
2386 1 : return hintDoesNotApply
2387 1 : }
2388 2 : case deleteCompactionHintTypeRangeKeyOnly:
2389 2 : // A hint generated by a range key del span cannot delete tables that
2390 2 : // contain point keys.
2391 2 : if m.HasPointKeys {
2392 2 : return hintDoesNotApply
2393 2 : }
2394 2 : case deleteCompactionHintTypePointAndRangeKey:
2395 : // A hint from a span that contains both range dels *and* range keys can
2396 : // only be deleted if both bounds fall within the hint. The next check takes
2397 : // care of this.
2398 0 : default:
2399 0 : panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
2400 : }
2401 2 : if cmp(h.start, m.Smallest().UserKey) <= 0 &&
2402 2 : base.UserKeyExclusive(h.end).CompareUpperBounds(cmp, m.UserKeyBounds().End) >= 0 {
2403 2 : return hintDeletesFile
2404 2 : }
2405 2 : if !exciseEnabled {
2406 2 : // The file's keys must be completely contained within the hint range; excises
2407 2 : // aren't allowed.
2408 2 : return hintDoesNotApply
2409 2 : }
2410 : // Check for any overlap. In cases of partial overlap, we can excise the part of the file
2411 : // that overlaps with the deletion hint.
2412 2 : if cmp(h.end, m.Smallest().UserKey) > 0 &&
2413 2 : (m.UserKeyBounds().End.CompareUpperBounds(cmp, base.UserKeyInclusive(h.start)) >= 0) {
2414 2 : return hintExcisesFile
2415 2 : }
2416 1 : return hintDoesNotApply
2417 : }
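     :
     : // To illustrate the outcomes above with hypothetical keys: for a hint
     : // covering [c, f) in the same snapshot stripe as a table, a table spanning
     : // [c, e] is wholly contained and yields hintDeletesFile, while a table
     : // spanning [a, d] overlaps only partially and yields hintExcisesFile when
     : // excises are enabled, or hintDoesNotApply when they are not.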
2418 :
2419 : // checkDeleteCompactionHints checks the passed-in deleteCompactionHints for those that
2420 : // can be resolved and those that cannot. A hint is considered resolved when its largest
2421 : // tombstone sequence number and the smallest sequence number of covered files fall in
2422 : // the same snapshot stripe. When excises are enabled, no more than
2423 : // maxHintsPerDeleteOnlyCompaction hints will be resolved per method call.
2424 : // Resolved and unresolved hints are returned in separate return values. The
2425 : // files that the resolved hints apply to are returned as compactionLevels.
2425 : func checkDeleteCompactionHints(
2426 : cmp Compare,
2427 : v *manifest.Version,
2428 : hints []deleteCompactionHint,
2429 : snapshots compact.Snapshots,
2430 : exciseEnabled bool,
2431 2 : ) (levels []compactionLevel, resolved, unresolved []deleteCompactionHint) {
2432 2 : var files map[*manifest.TableMetadata]bool
2433 2 : var byLevel [numLevels][]*manifest.TableMetadata
2434 2 :
2435 2 : // Delete-only compactions can take O(mn) time, where m is the number of
2436 2 : // files in the delete-only compaction and n is the number of resolved
2437 2 : // hints. To prevent these from growing unbounded, we cap
2438 2 : // the number of hints we resolve for one delete-only compaction. This
2439 2 : // cap only applies if exciseEnabled == true.
2440 2 : const maxHintsPerDeleteOnlyCompaction = 10
2441 2 :
2442 2 : unresolvedHints := hints[:0]
2443 2 : // Lazily populate resolvedHints, similar to files above.
2444 2 : resolvedHints := make([]deleteCompactionHint, 0)
2445 2 : for _, h := range hints {
2446 2 : // Check each compaction hint to see if it's resolvable. Resolvable
2447 2 : // hints are removed and trigger a delete-only compaction if any files
2448 2 : // in the current LSM still meet their criteria. Unresolvable hints
2449 2 : // are saved and don't trigger a delete-only compaction.
2450 2 : //
2451 2 : // When a compaction hint is created, the sequence numbers of the
2452 2 : // range tombstones and the covered file with the oldest key are
2453 2 : // recorded. The largest tombstone sequence number and the smallest
2454 2 : // file sequence number must be in the same snapshot stripe for the
2455 2 : // hint to be resolved. The below graphic models a compaction hint
2456 2 : // covering the keyspace [b, r). The hint completely contains two
2457 2 : // files, 000002 and 000003. The file 000003 contains the lowest
2458 2 : // covered sequence number at #90. The tombstone b.RANGEDEL.230:h has
2459 2 : // the highest tombstone sequence number incorporated into the hint.
2460 2 : // The hint may be resolved only once the snapshots at #100, #180 and
2461 2 : // #210 are all closed. File 000001 is not included within the hint
2462 2 : // because it extends beyond the range tombstones in user key space.
2463 2 : //
2464 2 : // 250
2465 2 : //
2466 2 : // |-b...230:h-|
2467 2 : // _____________________________________________________ snapshot #210
2468 2 : // 200 |--h.RANGEDEL.200:r--|
2469 2 : //
2470 2 : // _____________________________________________________ snapshot #180
2471 2 : //
2472 2 : // 150 +--------+
2473 2 : // +---------+ | 000003 |
2474 2 : // | 000002 | | |
2475 2 : // +_________+ | |
2476 2 : // 100_____________________|________|___________________ snapshot #100
2477 2 : // +--------+
2478 2 : // _____________________________________________________ snapshot #70
2479 2 : // +---------------+
2480 2 : // 50 | 000001 |
2481 2 : // | |
2482 2 : // +---------------+
2483 2 : // ______________________________________________________________
2484 2 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
2485 2 :
2486 2 : if snapshots.Index(h.tombstoneLargestSeqNum) != snapshots.Index(h.fileSmallestSeqNum) ||
2487 2 : (len(resolvedHints) >= maxHintsPerDeleteOnlyCompaction && exciseEnabled) {
2488 2 : // Cannot resolve yet.
2489 2 : unresolvedHints = append(unresolvedHints, h)
2490 2 : continue
2491 : }
2492 :
2493 : // The hint h will be resolved and dropped if it either affects no files at
2494 : // all, or if the number of files it creates (e.g. through excision) is less
2495 : // than or equal to the number of files it deletes. First, determine how many
2496 : // files are affected by this hint.
2497 2 : filesDeletedByCurrentHint := 0
2498 2 : var filesDeletedByLevel [numLevels][]*manifest.TableMetadata
2499 2 : for l := h.tombstoneLevel + 1; l < numLevels; l++ {
2500 2 : for m := range v.Overlaps(l, base.UserKeyBoundsEndExclusive(h.start, h.end)).All() {
2501 2 : doesHintApply := h.canDeleteOrExcise(cmp, m, snapshots, exciseEnabled)
2502 2 : if m.IsCompacting() || doesHintApply == hintDoesNotApply || files[m] {
2503 2 : continue
2504 : }
2505 2 : switch doesHintApply {
2506 2 : case hintDeletesFile:
2507 2 : filesDeletedByCurrentHint++
2508 2 : case hintExcisesFile:
2509 2 : // Account for the original file being deleted.
2510 2 : filesDeletedByCurrentHint++
2511 2 : // An excise could produce up to 2 new files. If the hint
2512 2 : // leaves a fragment of the file on the left, decrement
2513 2 : // the counter once. If the hint leaves a fragment of the
2514 2 : // file on the right, decrement the counter once.
2515 2 : if cmp(h.start, m.Smallest().UserKey) > 0 {
2516 2 : filesDeletedByCurrentHint--
2517 2 : }
2518 2 : if m.UserKeyBounds().End.IsUpperBoundFor(cmp, h.end) {
2519 2 : filesDeletedByCurrentHint--
2520 2 : }
2521 : }
2522 2 : filesDeletedByLevel[l] = append(filesDeletedByLevel[l], m)
2523 : }
2524 : }
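     : // As a worked example of the accounting above (hypothetical keys): a hint
     : // over [c, d) that excises a table spanning [a, z] leaves fragments on
     : // both sides and contributes +1 - 1 - 1 = -1 to filesDeletedByCurrentHint,
     : // while a table wholly contained in the hint contributes +1.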
2525 2 : if filesDeletedByCurrentHint < 0 {
2526 2 : // This hint does not delete a sufficient number of files to warrant
2527 2 : // a delete-only compaction at this stage. Drop it (i.e. don't add it
2528 2 : // to either resolved or unresolved hints) so it doesn't stick around
2529 2 : // forever.
2530 2 : continue
2531 : }
2532 : // This hint will be resolved and dropped.
2533 2 : for l := h.tombstoneLevel + 1; l < numLevels; l++ {
2534 2 : byLevel[l] = append(byLevel[l], filesDeletedByLevel[l]...)
2535 2 : for _, m := range filesDeletedByLevel[l] {
2536 2 : if files == nil {
2537 2 : // Construct files lazily, assuming most calls will not
2538 2 : // produce delete-only compactions.
2539 2 : files = make(map[*manifest.TableMetadata]bool)
2540 2 : }
2541 2 : files[m] = true
2542 : }
2543 : }
2544 2 : resolvedHints = append(resolvedHints, h)
2545 : }
2546 :
2547 2 : var compactLevels []compactionLevel
2548 2 : for l, files := range byLevel {
2549 2 : if len(files) == 0 {
2550 2 : continue
2551 : }
2552 2 : compactLevels = append(compactLevels, compactionLevel{
2553 2 : level: l,
2554 2 : files: manifest.NewLevelSliceKeySorted(cmp, files),
2555 2 : })
2556 : }
2557 2 : return compactLevels, resolvedHints, unresolvedHints
2558 : }
2559 :
2560 : // compact runs one compaction and maybe schedules another call to compact.
2561 2 : func (d *DB) compact(c compaction, errChannel chan error) {
2562 2 : pprof.Do(context.Background(), c.PprofLabels(d.opts.Experimental.UserKeyCategories), func(context.Context) {
2563 2 : func() {
2564 2 : d.mu.Lock()
2565 2 : defer d.mu.Unlock()
2566 2 : jobID := d.newJobIDLocked()
2567 2 :
2568 2 : compactErr := c.Execute(jobID, d)
2569 2 :
2570 2 : d.deleteObsoleteFiles(jobID)
2571 2 : // We send on the error channel only after we've deleted
2572 2 : // obsolete files so that tests performing manual compactions
2573 2 : // block until the obsolete files are deleted, and the test
2574 2 : // observes the deletion.
2575 2 : if errChannel != nil {
2576 2 : errChannel <- compactErr
2577 2 : }
2578 2 : if compactErr != nil {
2579 2 : d.handleCompactFailure(c, compactErr)
2580 2 : }
2581 2 : if c.IsDownload() {
2582 2 : d.mu.compact.downloadingCount--
2583 2 : } else {
2584 2 : d.mu.compact.compactingCount--
2585 2 : }
2586 2 : delete(d.mu.compact.inProgress, c)
2587 2 : // Add this compaction's duration to the cumulative duration. NB: This
2588 2 : // must be atomic with the above removal of c from
2589 2 : // d.mu.compact.InProgress to ensure Metrics.Compact.Duration does not
2590 2 : // miss or double count a completing compaction's duration.
2591 2 : d.mu.compact.duration += d.timeNow().Sub(c.BeganAt())
2592 : }()
2593 : // Done must not be called while holding any lock that needs to be
2594 : // acquired by Schedule. Also, it must be called after the new Version has
2595 : // been installed, and metadata related to compactingCount and inProgress
2596 : // compactions has been updated. This is because when we are running at
2597 : // the limit of permitted compactions, Done can cause the
2598 : // CompactionScheduler to schedule another compaction. Note that the only
2599 : // compactions that may be scheduled by Done are those integrated with the
2600 : // CompactionScheduler.
2601 2 : c.GrantHandle().Done()
2602 2 : // The previous compaction may have produced too many files in a level, so
2603 2 : // schedule another compaction if needed.
2604 2 : //
2605 2 : // The preceding Done call will not necessarily cause a compaction to be
2606 2 : // scheduled, so we also need to call maybeScheduleCompaction, which
2607 2 : // considers all compactions, not only those scheduled via the
2608 2 : // CompactionScheduler.
2609 2 : func() {
2610 2 : d.mu.Lock()
2611 2 : defer d.mu.Unlock()
2612 2 : d.maybeScheduleCompaction()
2613 2 : d.mu.compact.cond.Broadcast()
2614 2 : }()
2615 : })
2616 : }
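 : // A minimal sketch of how a caller might consume errChannel (hypothetical
 : // names; the manual-compaction plumbing elsewhere in this package differs in
 : // detail). The send happens only after obsolete files are deleted, so the
 : // receive doubles as a synchronization point for tests:
 : //
 : //	errCh := make(chan error, 1)
 : //	go d.compact(c, errCh)
 : //	err := <-errCh // obsolete files have already been deleted here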
2617 :
2618 2 : func (d *DB) handleCompactFailure(c compaction, err error) {
2619 2 : if errors.Is(err, ErrCancelledCompaction) {
2620 2 : // ErrCancelledCompaction is expected during normal operation, so we don't
2621 2 : // want to report it as a background error.
2622 2 : d.opts.Logger.Infof("%v", err)
2623 2 : return
2624 2 : }
2625 1 : c.RecordError(&d.problemSpans, err)
2626 1 : // TODO(peter): count consecutive compaction errors and backoff.
2627 1 : d.opts.EventListener.BackgroundError(err)
2628 : }
2629 :
2630 : // cleanupVersionEdit cleans up any on-disk artifacts that were created
2631 : // for the application of a versionEdit that is no longer going to be applied.
2632 : //
2633 : // d.mu must be held when calling this method.
2634 2 : func (d *DB) cleanupVersionEdit(ve *manifest.VersionEdit) {
2635 2 : obsoleteFiles := manifest.ObsoleteFiles{
2636 2 : TableBackings: make([]*manifest.TableBacking, 0, len(ve.NewTables)),
2637 2 : BlobFiles: make([]*manifest.PhysicalBlobFile, 0, len(ve.NewBlobFiles)),
2638 2 : }
2639 2 : deletedTables := make(map[base.TableNum]struct{})
2640 2 : for key := range ve.DeletedTables {
2641 2 : deletedTables[key.FileNum] = struct{}{}
2642 2 : }
2643 2 : for i := range ve.NewBlobFiles {
2644 1 : obsoleteFiles.AddBlob(ve.NewBlobFiles[i].Physical)
2645 1 : d.mu.versions.zombieBlobs.Add(objectInfo{
2646 1 : fileInfo: fileInfo{
2647 1 : FileNum: ve.NewBlobFiles[i].Physical.FileNum,
2648 1 : FileSize: ve.NewBlobFiles[i].Physical.Size,
2649 1 : },
2650 1 : isLocal: objstorage.IsLocalBlobFile(d.objProvider, ve.NewBlobFiles[i].Physical.FileNum),
2651 1 : })
2652 1 : }
2653 2 : for i := range ve.NewTables {
2654 2 : if ve.NewTables[i].Meta.Virtual {
2655 1 : // We handle backing files separately.
2656 1 : continue
2657 : }
2658 2 : if _, ok := deletedTables[ve.NewTables[i].Meta.TableNum]; ok {
2659 1 : // This file is being moved in this ve to a different level.
2660 1 : // Don't mark it as obsolete.
2661 1 : continue
2662 : }
2663 2 : obsoleteFiles.AddBacking(ve.NewTables[i].Meta.PhysicalMeta().TableBacking)
2664 : }
2665 2 : for i := range ve.CreatedBackingTables {
2666 1 : if ve.CreatedBackingTables[i].IsUnused() {
2667 0 : obsoleteFiles.AddBacking(ve.CreatedBackingTables[i])
2668 0 : }
2669 : }
2670 2 : for _, of := range obsoleteFiles.TableBackings {
2671 2 : // Add this file to zombie tables as well, as the versionSet
2672 2 : // asserts that every obsolete file was at one point
2673 2 : // marked zombie.
2674 2 : d.mu.versions.zombieTables.Add(objectInfo{
2675 2 : fileInfo: fileInfo{
2676 2 : FileNum: of.DiskFileNum,
2677 2 : FileSize: of.Size,
2678 2 : },
2679 2 : isLocal: objstorage.IsLocalTable(d.objProvider, of.DiskFileNum),
2680 2 : })
2681 2 : }
2682 2 : d.mu.versions.addObsoleteLocked(obsoleteFiles)
2683 : }
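 : // As a concrete scenario: if a concurrent excise cancels a compaction after
 : // d.runCompaction has already created output tables and blob files, compact1
 : // (below) calls cleanupVersionEdit so those orphaned objects are marked
 : // zombie and eventually deleted as obsolete.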
2684 :
2685 : // compact1 runs one compaction.
2686 : //
2687 : // d.mu must be held when calling this, but the mutex may be dropped and
2688 : // re-acquired during the course of this method.
2689 2 : func (d *DB) compact1(jobID JobID, c *tableCompaction) (err error) {
2690 2 : info := c.makeInfo(jobID)
2691 2 : d.opts.EventListener.CompactionBegin(info)
2692 2 : startTime := d.timeNow()
2693 2 :
2694 2 : ve, stats, err := d.runCompaction(jobID, c)
2695 2 :
2696 2 : info.Annotations = append(info.Annotations, c.annotations...)
2697 2 : info.Duration = d.timeNow().Sub(startTime)
2698 2 : if err == nil {
2699 2 : validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
2700 2 : _, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
2701 2 : // Check if this compaction had a conflicting operation (eg. a d.excise())
2702 2 : // that necessitates it restarting from scratch. Note that since we hold
2703 2 : // the manifest lock, we don't expect this bool to change its value
2704 2 : // as only the holder of the manifest lock will ever write to it.
2705 2 : if c.cancel.Load() {
2706 2 : err = firstError(err, ErrCancelledCompaction)
2707 2 : // This is the first time we've seen a cancellation during the
2708 2 : // life of this compaction (or the original condition on err == nil
2709 2 : // would not have been true). We should delete any tables already
2710 2 : // created, as d.runCompaction did not do that.
2711 2 : d.cleanupVersionEdit(ve)
2712 2 : // Note that UpdateVersionLocked invalidates the pickedCompactionCache
2713 2 : // when we return, which is relevant because this failed compaction
2714 2 : // may be the highest priority to run next.
2715 2 : return versionUpdate{}, err
2716 2 : }
2717 2 : return versionUpdate{
2718 2 : VE: ve,
2719 2 : JobID: jobID,
2720 2 : Metrics: c.metrics.perLevel,
2721 2 : InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) },
2722 : }, nil
2723 : })
2724 : }
2725 :
2726 2 : info.Done = true
2727 2 : info.Err = err
2728 2 : if err == nil {
2729 2 : for i := range ve.NewTables {
2730 2 : e := &ve.NewTables[i]
2731 2 : info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
2732 2 : }
2733 2 : d.mu.snapshots.cumulativePinnedCount += stats.CumulativePinnedKeys
2734 2 : d.mu.snapshots.cumulativePinnedSize += stats.CumulativePinnedSize
2735 2 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.CountMissizedDels
2736 : }
2737 :
2738 : // NB: clearing compacting state must occur before updating the read state;
2739 : // L0Sublevels initialization depends on it.
2740 2 : d.clearCompactingState(c, err != nil)
2741 2 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.metrics.bytesWritten.Load(), err)
2742 2 : d.mu.versions.incrementCompactionBytes(-c.metrics.bytesWritten.Load())
2743 2 :
2744 2 : info.TotalDuration = d.timeNow().Sub(c.metrics.beganAt)
2745 2 : d.opts.EventListener.CompactionEnd(info)
2746 2 :
2747 2 : // Update the read state before deleting obsolete files because the
2748 2 : // read-state update will cause the previous version to be unref'd, and if
2749 2 : // there are no references, obsolete tables will be added to the obsolete
2750 2 : // table list.
2751 2 : if err == nil {
2752 2 : d.updateReadStateLocked(d.opts.DebugCheck)
2753 2 : d.updateTableStatsLocked(ve.NewTables)
2754 2 : }
2755 :
2756 2 : return err
2757 : }
2758 :
2759 : // runCopyCompaction runs a copy compaction where a new TableNum is created that
2760 : // is a byte-for-byte copy of the input file or span thereof in some cases. This
2761 : // is used in lieu of a move compaction when a file is being moved across the
2762 : // local/remote storage boundary. It could also be used in lieu of a rewrite
2763 : // compaction as part of a Download() call, which allows copying only a span of
2764 : // the external file, provided the file does not contain range keys or value
2765 : // blocks (see sstable.CopySpan).
2766 : //
2767 : // d.mu must be held when calling this method. The mutex will be released when
2768 : // doing IO.
2769 : func (d *DB) runCopyCompaction(
2770 : jobID JobID, c *tableCompaction,
2771 2 : ) (ve *manifest.VersionEdit, stats compact.Stats, _ error) {
2772 2 : if c.cancel.Load() {
2773 0 : return nil, compact.Stats{}, ErrCancelledCompaction
2774 0 : }
2775 2 : iter := c.startLevel.files.Iter()
2776 2 : inputMeta := iter.First()
2777 2 : if iter.Next() != nil {
2778 0 : return nil, compact.Stats{}, base.AssertionFailedf("got more than one file for a copy compaction")
2779 0 : }
2780 2 : if inputMeta.BlobReferenceDepth > 0 || len(inputMeta.BlobReferences) > 0 {
2781 0 : return nil, compact.Stats{}, base.AssertionFailedf(
2782 0 : "copy compaction for %s with blob references (depth=%d, refs=%d)",
2783 0 : inputMeta.TableNum, inputMeta.BlobReferenceDepth, len(inputMeta.BlobReferences),
2784 0 : )
2785 0 : }
2786 2 : ve = &manifest.VersionEdit{
2787 2 : DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{
2788 2 : {Level: c.startLevel.level, FileNum: inputMeta.TableNum}: inputMeta,
2789 2 : },
2790 2 : }
2791 2 :
2792 2 : objMeta, err := d.objProvider.Lookup(base.FileTypeTable, inputMeta.TableBacking.DiskFileNum)
2793 2 : if err != nil {
2794 0 : return nil, compact.Stats{}, err
2795 0 : }
2796 : // This code does not support copying a shared table (which should never be necessary).
2797 2 : if objMeta.IsShared() {
2798 0 : return nil, compact.Stats{}, base.AssertionFailedf("copy compaction of shared table")
2799 0 : }
2800 :
2801 : // We are in the relatively more complex case where we need to copy this
2802 : // file to remote storage. Drop the db mutex while we do the copy.
2803 : //
2804 : // To simplify cleanup of the local file and tracking of refs, we create
2805 : // a new FileNum. This has the potential to make the block cache less
2806 : // effective, however.
2807 2 : newMeta := &manifest.TableMetadata{
2808 2 : Size: inputMeta.Size,
2809 2 : CreationTime: inputMeta.CreationTime,
2810 2 : SmallestSeqNum: inputMeta.SmallestSeqNum,
2811 2 : LargestSeqNum: inputMeta.LargestSeqNum,
2812 2 : LargestSeqNumAbsolute: inputMeta.LargestSeqNumAbsolute,
2813 2 : Virtual: inputMeta.Virtual,
2814 2 : SyntheticPrefixAndSuffix: inputMeta.SyntheticPrefixAndSuffix,
2815 2 : }
2816 2 : if inputStats, ok := inputMeta.Stats(); ok {
2817 2 : newMeta.PopulateStats(inputStats)
2818 2 : }
2819 2 : if inputMeta.HasPointKeys {
2820 2 : newMeta.ExtendPointKeyBounds(c.comparer.Compare,
2821 2 : inputMeta.PointKeyBounds.Smallest(),
2822 2 : inputMeta.PointKeyBounds.Largest())
2823 2 : }
2824 2 : if inputMeta.HasRangeKeys {
2825 2 : newMeta.ExtendRangeKeyBounds(c.comparer.Compare,
2826 2 : inputMeta.RangeKeyBounds.Smallest(),
2827 2 : inputMeta.RangeKeyBounds.Largest())
2828 2 : }
2829 2 : newMeta.TableNum = d.mu.versions.getNextTableNum()
2830 2 : if objMeta.IsExternal() {
2831 2 : // external -> local/shared copy. File must be virtual.
2832 2 : // We will update this size later after we produce the new backing file.
2833 2 : newMeta.InitVirtualBacking(base.DiskFileNum(newMeta.TableNum), inputMeta.TableBacking.Size)
2834 2 : } else {
2835 2 : // local -> shared copy. New file is guaranteed to not be virtual.
2836 2 : newMeta.InitPhysicalBacking()
2837 2 : }
2838 :
2839 : // NB: The order here is reversed, lock after unlock. This is similar to
2840 : // runCompaction.
2841 2 : d.mu.Unlock()
2842 2 : defer d.mu.Lock()
2843 2 :
2844 2 : deleteOnExit := false
2845 2 : defer func() {
2846 2 : if deleteOnExit {
2847 1 : _ = d.objProvider.Remove(base.FileTypeTable, newMeta.TableBacking.DiskFileNum)
2848 1 : }
2849 : }()
2850 :
2851 : // If the src obj is external, we're doing an external to local/shared copy.
2852 2 : if objMeta.IsExternal() {
2853 2 : ctx := context.TODO()
2854 2 : src, err := d.objProvider.OpenForReading(
2855 2 : ctx, base.FileTypeTable, inputMeta.TableBacking.DiskFileNum, objstorage.OpenOptions{},
2856 2 : )
2857 2 : if err != nil {
2858 0 : return nil, compact.Stats{}, err
2859 0 : }
2860 2 : defer func() {
2861 2 : if src != nil {
2862 0 : _ = src.Close()
2863 0 : }
2864 : }()
2865 :
2866 2 : w, _, err := d.objProvider.Create(
2867 2 : ctx, base.FileTypeTable, newMeta.TableBacking.DiskFileNum,
2868 2 : objstorage.CreateOptions{
2869 2 : PreferSharedStorage: d.shouldCreateShared(c.outputLevel.level),
2870 2 : },
2871 2 : )
2872 2 : if err != nil {
2873 0 : return nil, compact.Stats{}, err
2874 0 : }
2875 2 : deleteOnExit = true
2876 2 :
2877 2 : start, end := newMeta.Smallest(), newMeta.Largest()
2878 2 : if newMeta.SyntheticPrefixAndSuffix.HasPrefix() {
2879 2 : syntheticPrefix := newMeta.SyntheticPrefixAndSuffix.Prefix()
2880 2 : start.UserKey = syntheticPrefix.Invert(start.UserKey)
2881 2 : end.UserKey = syntheticPrefix.Invert(end.UserKey)
2882 2 : }
2883 2 : if newMeta.SyntheticPrefixAndSuffix.HasSuffix() {
2884 1 : // Extend the bounds as necessary so that the keys don't include suffixes.
2885 1 : start.UserKey = start.UserKey[:c.comparer.Split(start.UserKey)]
2886 1 : if n := c.comparer.Split(end.UserKey); n < len(end.UserKey) {
2887 0 : end = base.MakeRangeDeleteSentinelKey(c.comparer.ImmediateSuccessor(nil, end.UserKey[:n]))
2888 0 : }
2889 : }
2890 :
2891 : // NB: external files are always virtual.
2892 2 : var wrote uint64
2893 2 : err = d.fileCache.withReader(ctx, block.NoReadEnv, inputMeta.VirtualMeta(), func(r *sstable.Reader, env sstable.ReadEnv) error {
2894 2 : var err error
2895 2 : // TODO(radu): plumb a ReadEnv to CopySpan (it could use the buffer pool
2896 2 : // or update category stats).
2897 2 : wrote, err = sstable.CopySpan(ctx,
2898 2 : src, r, d.opts.MakeReaderOptions(),
2899 2 : w, d.opts.MakeWriterOptions(c.outputLevel.level, d.TableFormat()),
2900 2 : start, end,
2901 2 : )
2902 2 : return err
2903 2 : })
2904 :
2905 2 : src = nil // We passed src to CopySpan; it's responsible for closing it.
2906 2 : if err != nil {
2907 1 : if errors.Is(err, sstable.ErrEmptySpan) {
2908 1 : // The virtual table was empty. Just remove the backing file.
2909 1 : // Note that deleteOnExit is true so we will delete the created object.
2910 1 : outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
2911 1 : outputMetrics.TableBytesIn = inputMeta.Size
2912 1 :
2913 1 : return ve, compact.Stats{}, nil
2914 1 : }
2915 0 : return nil, compact.Stats{}, err
2916 : }
2917 2 : newMeta.TableBacking.Size = wrote
2918 2 : newMeta.Size = wrote
2919 2 : } else {
2920 2 : _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
2921 2 : d.objProvider.Path(objMeta), base.FileTypeTable, newMeta.TableBacking.DiskFileNum,
2922 2 : objstorage.CreateOptions{PreferSharedStorage: true})
2923 2 : if err != nil {
2924 0 : return nil, compact.Stats{}, err
2925 0 : }
2926 2 : deleteOnExit = true
2927 : }
2928 2 : ve.NewTables = []manifest.NewTableEntry{{
2929 2 : Level: c.outputLevel.level,
2930 2 : Meta: newMeta,
2931 2 : }}
2932 2 : if newMeta.Virtual {
2933 2 : ve.CreatedBackingTables = []*manifest.TableBacking{newMeta.TableBacking}
2934 2 : }
2935 2 : outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
2936 2 : outputMetrics.TableBytesIn = inputMeta.Size
2937 2 : outputMetrics.TableBytesCompacted = newMeta.Size
2938 2 : outputMetrics.TablesCompacted = 1
2939 2 :
2940 2 : if err := d.objProvider.Sync(); err != nil {
2941 0 : return nil, compact.Stats{}, err
2942 0 : }
2943 2 : deleteOnExit = false
2944 2 : return ve, compact.Stats{}, nil
2945 : }
2946 :
2947 : // applyHintOnFile applies a deleteCompactionHint to a file, and updates the
2948 : // versionEdit accordingly. It returns a list of new files that were created
2949 : // if the hint applied only partially to the file (e.g. through an exciseTable as
2950 : // opposed to an outright deletion). levelMetrics is kept up-to-date with the number
2951 : // of tables deleted or excised.
2952 : func (d *DB) applyHintOnFile(
2953 : h deleteCompactionHint,
2954 : f *manifest.TableMetadata,
2955 : level int,
2956 : levelMetrics *LevelMetrics,
2957 : ve *manifest.VersionEdit,
2958 : hintOverlap deletionHintOverlap,
2959 2 : ) (newFiles []manifest.NewTableEntry, err error) {
2960 2 : if hintOverlap == hintDoesNotApply {
2961 0 : return nil, nil
2962 0 : }
2963 :
2964 : // The hint overlaps with at least part of the file.
2965 2 : if hintOverlap == hintDeletesFile {
2966 2 : // The hint deletes the entirety of this file.
2967 2 : ve.DeletedTables[manifest.DeletedTableEntry{
2968 2 : Level: level,
2969 2 : FileNum: f.TableNum,
2970 2 : }] = f
2971 2 : levelMetrics.TablesDeleted++
2972 2 : return nil, nil
2973 2 : }
2974 : // The hint overlaps with only a part of the file, not the entirety of it. We need
2975 : // to use d.exciseTable. (hintOverlap == hintExcisesFile)
2976 2 : if d.FormatMajorVersion() < FormatVirtualSSTables {
2977 0 : panic("pebble: delete-only compaction hint excising a file is not supported in this version")
2978 : }
2979 :
2980 2 : levelMetrics.TablesExcised++
2981 2 : exciseBounds := base.UserKeyBoundsEndExclusive(h.start, h.end)
2982 2 : leftTable, rightTable, err := d.exciseTable(context.TODO(), exciseBounds, f, level, tightExciseBounds)
2983 2 : if err != nil {
2984 0 : return nil, errors.Wrap(err, "error when running excise for delete-only compaction")
2985 0 : }
2986 2 : newFiles = applyExciseToVersionEdit(ve, f, leftTable, rightTable, level)
2987 2 : return newFiles, nil
2988 : }
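 : // For illustration (hypothetical bounds): a hint spanning [c, k) applied to a
 : // file covering [a, z] falls under hintExcisesFile. The original file is
 : // deleted, and up to two virtual files survive: one covering [a, c) and one
 : // covering [k, z].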
2989 :
2990 : func (d *DB) runDeleteOnlyCompactionForLevel(
2991 : cl compactionLevel,
2992 : levelMetrics *LevelMetrics,
2993 : ve *manifest.VersionEdit,
2994 : snapshots compact.Snapshots,
2995 : fragments []deleteCompactionHintFragment,
2996 : exciseEnabled bool,
2997 2 : ) error {
2998 2 : if cl.level == 0 {
2999 0 : panic("cannot run delete-only compaction for L0")
3000 : }
3001 2 : curFragment := 0
3002 2 :
3003 2 : // The outer loop iterates over files, the middle loop over hint fragments,
3004 2 : // and the inner loop over the hints within each fragment. The number of
3005 2 : // fragments is bounded by the number of hints this compaction was created
3006 2 : // with, which is capped in the compaction picker to avoid very CPU-hot loops here.
3007 2 : for f := range cl.files.All() {
3008 2 : // curFile usually matches f, except if f got excised in which case
3009 2 : // it maps to a virtual file that replaces f, or nil if f got removed
3010 2 : // in its entirety.
3011 2 : curFile := f
3012 2 : for curFragment < len(fragments) && d.cmp(fragments[curFragment].start, f.Smallest().UserKey) <= 0 {
3013 2 : curFragment++
3014 2 : }
3015 2 : if curFragment > 0 {
3016 2 : curFragment--
3017 2 : }
3018 :
3019 2 : for ; curFragment < len(fragments); curFragment++ {
3020 2 : if f.UserKeyBounds().End.CompareUpperBounds(d.cmp, base.UserKeyInclusive(fragments[curFragment].start)) < 0 {
3021 2 : break
3022 : }
3023 : // Process all overlapping hints with this file. Note that applying
3024 : // a hint twice is idempotent; curFile should have already been excised
3025 : // the first time, resulting in no change the second time.
3026 2 : for _, h := range fragments[curFragment].hints {
3027 2 : if h.tombstoneLevel >= cl.level {
3028 2 : // We cannot excise out the deletion tombstone itself, or anything
3029 2 : // above it.
3030 2 : continue
3031 : }
3032 2 : hintOverlap := h.canDeleteOrExcise(d.cmp, curFile, snapshots, exciseEnabled)
3033 2 : if hintOverlap == hintDoesNotApply {
3034 1 : continue
3035 : }
3036 2 : newFiles, err := d.applyHintOnFile(h, curFile, cl.level, levelMetrics, ve, hintOverlap)
3037 2 : if err != nil {
3038 0 : return err
3039 0 : }
3040 2 : if _, ok := ve.DeletedTables[manifest.DeletedTableEntry{Level: cl.level, FileNum: curFile.TableNum}]; ok {
3041 2 : curFile = nil
3042 2 : }
3043 2 : if len(newFiles) > 0 {
3044 2 : curFile = newFiles[len(newFiles)-1].Meta
3045 2 : } else if curFile == nil {
3046 2 : // Nothing remains of the file.
3047 2 : break
3048 : }
3049 : }
3050 2 : if curFile == nil {
3051 2 : // Nothing remains of the file.
3052 2 : break
3053 : }
3054 : }
3055 2 : if _, ok := ve.DeletedTables[manifest.DeletedTableEntry{
3056 2 : Level: cl.level,
3057 2 : FileNum: f.TableNum,
3058 2 : }]; !ok {
3059 0 : panic("pebble: delete-only compaction scheduled with hints that did not delete or excise a file")
3060 : }
3061 : }
3062 2 : return nil
3063 : }
3064 :
3065 : // deleteCompactionHintFragment represents a fragment of the key space and
3066 : // contains a set of deleteCompactionHints that apply to that fragment; a
3067 : // fragment starts at the start field and ends where the next fragment starts.
3068 : type deleteCompactionHintFragment struct {
3069 : start []byte
3070 : hints []deleteCompactionHint
3071 : }
3072 :
3073 : // Delete compaction hints can overlap with each other, and multiple fragments
3074 : // can apply to a single file. fragmentDeleteCompactionHints takes a list of hints
3075 : // and fragments them, making it easier to apply them to the non-overlapping files
3076 : // occupying a level: files and hint fragments can then be iterated in lockstep,
3077 : // while efficiently applying all hints that overlap a given file.
3078 : func fragmentDeleteCompactionHints(
3079 : cmp Compare, hints []deleteCompactionHint,
3080 2 : ) []deleteCompactionHintFragment {
3081 2 : fragments := make([]deleteCompactionHintFragment, 0, len(hints)*2)
3082 2 : for i := range hints {
3083 2 : fragments = append(fragments, deleteCompactionHintFragment{start: hints[i].start},
3084 2 : deleteCompactionHintFragment{start: hints[i].end})
3085 2 : }
3086 2 : slices.SortFunc(fragments, func(i, j deleteCompactionHintFragment) int {
3087 2 : return cmp(i.start, j.start)
3088 2 : })
3089 2 : fragments = slices.CompactFunc(fragments, func(i, j deleteCompactionHintFragment) bool {
3090 2 : return bytes.Equal(i.start, j.start)
3091 2 : })
3092 2 : for _, h := range hints {
3093 2 : startIdx := sort.Search(len(fragments), func(i int) bool {
3094 2 : return cmp(fragments[i].start, h.start) >= 0
3095 2 : })
3096 2 : endIdx := sort.Search(len(fragments), func(i int) bool {
3097 2 : return cmp(fragments[i].start, h.end) >= 0
3098 2 : })
3099 2 : for i := startIdx; i < endIdx; i++ {
3100 2 : fragments[i].hints = append(fragments[i].hints, h)
3101 2 : }
3102 : }
3103 2 : return fragments
3104 : }
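 : // As a worked example (hypothetical keys): fragmenting the overlapping hints
 : // [a, d) and [b, f) yields fragment starts a, b, d, and f, with
 : //
 : //	[a, b): {[a, d)}
 : //	[b, d): {[a, d), [b, f)}
 : //	[d, f): {[b, f)}
 : //	[f, ∞): {}
 : //
 : // so that a file can collect every hint overlapping it by scanning fragments
 : // in lockstep with the level's files.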
3105 :
3106 : // Runs a delete-only compaction.
3107 : //
3108 : // d.mu must *not* be held when calling this.
3109 : func (d *DB) runDeleteOnlyCompaction(
3110 : jobID JobID, c *tableCompaction, snapshots compact.Snapshots,
3111 2 : ) (ve *manifest.VersionEdit, stats compact.Stats, retErr error) {
3112 2 : fragments := fragmentDeleteCompactionHints(d.cmp, c.deleteOnly.hints)
3113 2 : ve = &manifest.VersionEdit{
3114 2 : DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{},
3115 2 : }
3116 2 : for _, cl := range c.inputs {
3117 2 : levelMetrics := c.metrics.perLevel.level(cl.level)
3118 2 : err := d.runDeleteOnlyCompactionForLevel(cl, levelMetrics, ve, snapshots, fragments, c.deleteOnly.exciseEnabled)
3119 2 : if err != nil {
3120 0 : return nil, stats, err
3121 0 : }
3122 : }
3123 : // Remove any files that were added and deleted in the same versionEdit.
3124 2 : ve.NewTables = slices.DeleteFunc(ve.NewTables, func(e manifest.NewTableEntry) bool {
3125 2 : entry := manifest.DeletedTableEntry{Level: e.Level, FileNum: e.Meta.TableNum}
3126 2 : if _, deleted := ve.DeletedTables[entry]; deleted {
3127 2 : delete(ve.DeletedTables, entry)
3128 2 : return true
3129 2 : }
3130 2 : return false
3131 : })
3132 2 : sort.Slice(ve.NewTables, func(i, j int) bool {
3133 2 : return ve.NewTables[i].Meta.TableNum < ve.NewTables[j].Meta.TableNum
3134 2 : })
3135 2 : deletedTableEntries := slices.Collect(maps.Keys(ve.DeletedTables))
3136 2 : slices.SortFunc(deletedTableEntries, func(a, b manifest.DeletedTableEntry) int {
3137 2 : return stdcmp.Compare(a.FileNum, b.FileNum)
3138 2 : })
3139 : // Remove any entries from CreatedBackingTables that are not used in any
3140 : // NewTables.
3141 2 : usedBackingFiles := make(map[base.DiskFileNum]struct{})
3142 2 : for _, e := range ve.NewTables {
3143 2 : if e.Meta.Virtual {
3144 2 : usedBackingFiles[e.Meta.TableBacking.DiskFileNum] = struct{}{}
3145 2 : }
3146 : }
3147 2 : ve.CreatedBackingTables = slices.DeleteFunc(ve.CreatedBackingTables, func(b *manifest.TableBacking) bool {
3148 2 : _, used := usedBackingFiles[b.DiskFileNum]
3149 2 : return !used
3150 2 : })
3151 :
3152 : // Iterate through the deleted tables and new tables to annotate excised tables.
3153 : // If a new table is virtual and shares its base.DiskFileNum with a deleted
3154 : // table, then the deleted table was excised.
3155 2 : for _, table := range deletedTableEntries {
3156 2 : for _, newEntry := range ve.NewTables {
3157 2 : if newEntry.Meta.Virtual &&
3158 2 : newEntry.Meta.TableBacking.DiskFileNum == ve.DeletedTables[table].TableBacking.DiskFileNum {
3159 2 : c.annotations = append(c.annotations,
3160 2 : fmt.Sprintf("(excised: %s)", ve.DeletedTables[table].TableNum))
3161 2 : break
3162 : }
3163 : }
3165 : }
3166 :
3167 : // Refresh the disk available statistic whenever a compaction/flush
3168 : // completes, before re-acquiring the mutex.
3169 2 : d.calculateDiskAvailableBytes()
3170 2 : return ve, stats, nil
3171 : }
3172 :
3173 : func (d *DB) runMoveCompaction(
3174 : jobID JobID, c *tableCompaction,
3175 2 : ) (ve *manifest.VersionEdit, stats compact.Stats, _ error) {
3176 2 : iter := c.startLevel.files.Iter()
3177 2 : meta := iter.First()
3178 2 : if iter.Next() != nil {
3179 0 : return nil, stats, base.AssertionFailedf("got more than one file for a move compaction")
3180 0 : }
3181 2 : if c.cancel.Load() {
3182 0 : return ve, stats, ErrCancelledCompaction
3183 0 : }
3184 2 : outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
3185 2 : outputMetrics.TableBytesMoved = meta.Size
3186 2 : outputMetrics.TablesMoved = 1
3187 2 : ve = &manifest.VersionEdit{
3188 2 : DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{
3189 2 : {Level: c.startLevel.level, FileNum: meta.TableNum}: meta,
3190 2 : },
3191 2 : NewTables: []manifest.NewTableEntry{
3192 2 : {Level: c.outputLevel.level, Meta: meta},
3193 2 : },
3194 2 : }
3195 2 :
3196 2 : return ve, stats, nil
3197 : }
3198 :
3199 : // runCompaction runs a compaction that produces new on-disk tables from
3200 : // memtables or old on-disk tables.
3201 : //
3202 : // runCompaction cannot be used for compactionKindIngestedFlushable.
3203 : //
3204 : // d.mu must be held when calling this, but the mutex may be dropped and
3205 : // re-acquired during the course of this method.
3206 : func (d *DB) runCompaction(
3207 : jobID JobID, c *tableCompaction,
3208 2 : ) (ve *manifest.VersionEdit, stats compact.Stats, retErr error) {
3209 2 : if c.cancel.Load() {
3210 2 : return ve, stats, ErrCancelledCompaction
3211 2 : }
3212 2 : switch c.kind {
3213 2 : case compactionKindDeleteOnly:
3214 2 : // Release the d.mu lock while doing I/O.
3215 2 : // Note the unusual order: Unlock and then Lock.
3216 2 : snapshots := d.mu.snapshots.toSlice()
3217 2 : d.mu.Unlock()
3218 2 : defer d.mu.Lock()
3219 2 : return d.runDeleteOnlyCompaction(jobID, c, snapshots)
3220 2 : case compactionKindMove:
3221 2 : return d.runMoveCompaction(jobID, c)
3222 2 : case compactionKindCopy:
3223 2 : return d.runCopyCompaction(jobID, c)
3224 0 : case compactionKindIngestedFlushable:
3225 0 : panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
3226 : }
3227 :
3228 2 : snapshots := d.mu.snapshots.toSlice()
3229 2 :
3230 2 : // Release the d.mu lock while doing I/O.
3231 2 : // Note the unusual order: Unlock and then Lock.
3232 2 : d.mu.Unlock()
3233 2 : defer d.mu.Lock()
3234 2 :
3235 2 : // Determine whether we should separate values into blob files.
3236 2 : valueSeparation := c.getValueSeparation(jobID, c, c.tableFormat)
3237 2 :
3238 2 : result := d.compactAndWrite(jobID, c, snapshots, c.tableFormat, valueSeparation)
3239 2 : if result.Err == nil {
3240 2 : ve, result.Err = c.makeVersionEdit(result)
3241 2 : }
3242 2 : if result.Err != nil {
3243 2 : // Delete any created tables or blob files.
3244 2 : obsoleteFiles := manifest.ObsoleteFiles{
3245 2 : TableBackings: make([]*manifest.TableBacking, 0, len(result.Tables)),
3246 2 : BlobFiles: make([]*manifest.PhysicalBlobFile, 0, len(result.Blobs)),
3247 2 : }
3248 2 : d.mu.Lock()
3249 2 : for i := range result.Tables {
3250 2 : backing := &manifest.TableBacking{
3251 2 : DiskFileNum: result.Tables[i].ObjMeta.DiskFileNum,
3252 2 : Size: result.Tables[i].WriterMeta.Size,
3253 2 : }
3254 2 : obsoleteFiles.AddBacking(backing)
3255 2 : // Add this file to zombie tables as well, as the versionSet
3256 2 : // asserts that every obsolete file was at one point
3257 2 : // marked zombie.
3258 2 : d.mu.versions.zombieTables.AddMetadata(&result.Tables[i].ObjMeta, backing.Size)
3259 2 : }
3260 2 : for i := range result.Blobs {
3261 0 : obsoleteFiles.AddBlob(result.Blobs[i].Metadata)
3262 0 : // Add this file to zombie blobs as well, as the versionSet
3263 0 : // asserts that every obsolete file was at one point
3264 0 : // marked zombie.
3265 0 : d.mu.versions.zombieBlobs.AddMetadata(&result.Blobs[i].ObjMeta, result.Blobs[i].Metadata.Size)
3266 0 : }
3267 2 : d.mu.versions.addObsoleteLocked(obsoleteFiles)
3268 2 : d.mu.Unlock()
3269 : }
3270 : // Refresh the disk available statistic whenever a compaction/flush
3271 : // completes, before re-acquiring the mutex.
3272 2 : d.calculateDiskAvailableBytes()
3273 2 : return ve, result.Stats, result.Err
3274 : }
3275 :
3276 : // compactAndWrite runs the data part of a compaction, where we set up a
3277 : // compaction iterator and use it to write output tables.
3278 : func (d *DB) compactAndWrite(
3279 : jobID JobID,
3280 : c *tableCompaction,
3281 : snapshots compact.Snapshots,
3282 : tableFormat sstable.TableFormat,
3283 : valueSeparation compact.ValueSeparation,
3284 2 : ) (result compact.Result) {
3285 2 : // Compactions use a pool of buffers to read blocks, avoiding polluting the
3286 2 : // block cache with blocks that will not be read again. We initialize the
3287 2 : // buffer pool with a size 12. This initial size does not need to be
3288 2 : // accurate, because the pool will grow to accommodate the maximum number of
3289 2 : // blocks allocated at a given time over the course of the compaction. But
3290 2 : // choosing a size larger than that working set avoids any additional
3291 2 : // allocations to grow the size of the pool over the course of iteration.
3292 2 : //
3293 2 : // Justification for initial size 12: In a two-level compaction, at any
3294 2 : // given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
3295 2 : // Additionally, when decoding a compressed block, we'll temporarily
3296 2 : // allocate 1 additional block to hold the compressed buffer. In the worst
3297 2 : // case that all input sstables have two-level index blocks (+2), value
3298 2 : // blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
3299 2 : // additionally require 2n+4 blocks where n is the number of input sstables.
3300 2 : // Range deletion and range key blocks are relatively rare, and the cost of
3301 2 : // an additional allocation or two over the course of the compaction is
3302 2 : // considered to be okay. A larger initial size would cause the pool to hold
3303 2 : // on to more memory, even when it's not in-use because the pool will
3304 2 : // recycle buffers up to the current capacity of the pool. The memory use of
3305 2 : // a 12-buffer pool is expected to be within reason, even if all the buffers
3306 2 : // grow to the typical size of an index block (256 KiB), which would
3307 2 : // translate to 3 MiB per compaction.
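 : //
 : // As a concrete worked case: with n = 2 input sstables, the worst case is
 : // the 5 baseline blocks (2 index, 2 data, 1 compressed) plus 2n+4 = 8 more,
 : // i.e. 13 blocks total, just above the initial size of 12 and hence costing
 : // at most the extra allocation or two accepted above.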
3308 2 : c.iterationState.bufferPool.Init(12)
3309 2 : defer c.iterationState.bufferPool.Release()
3310 2 : blockReadEnv := block.ReadEnv{
3311 2 : BufferPool: &c.iterationState.bufferPool,
3312 2 : Stats: &c.metrics.internalIterStats,
3313 2 : IterStats: d.fileCache.SSTStatsCollector().Accumulator(
3314 2 : uint64(uintptr(unsafe.Pointer(c))),
3315 2 : categoryCompaction,
3316 2 : ),
3317 2 : }
3318 2 : if c.version != nil {
3319 2 : c.iterationState.valueFetcher.Init(&c.version.BlobFiles, d.fileCache, blockReadEnv)
3320 2 : }
3321 2 : iiopts := internalIterOpts{
3322 2 : compaction: true,
3323 2 : readEnv: sstable.ReadEnv{Block: blockReadEnv},
3324 2 : blobValueFetcher: &c.iterationState.valueFetcher,
3325 2 : }
3326 2 : defer func() { _ = c.iterationState.valueFetcher.Close() }()
3327 :
3328 2 : pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, iiopts)
3329 2 : defer func() {
3330 2 : for _, closer := range c.iterationState.keyspanIterClosers {
3331 2 : closer.FragmentIterator.Close()
3332 2 : }
3333 : }()
3334 2 : if err != nil {
3335 1 : return compact.Result{Err: err}
3336 1 : }
3337 2 : cfg := compact.IterConfig{
3338 2 : Comparer: c.comparer,
3339 2 : Merge: d.merge,
3340 2 : TombstoneElision: c.delElision,
3341 2 : RangeKeyElision: c.rangeKeyElision,
3342 2 : Snapshots: snapshots,
3343 2 : AllowZeroSeqNum: c.allowZeroSeqNum(),
3344 2 : IneffectualSingleDeleteCallback: func(userKey []byte) {
3345 2 : d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
3346 2 : Kind: IneffectualSingleDelete,
3347 2 : UserKey: slices.Clone(userKey),
3348 2 : })
3349 2 : },
3350 0 : NondeterministicSingleDeleteCallback: func(userKey []byte) {
3351 0 : d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
3352 0 : Kind: NondeterministicSingleDelete,
3353 0 : UserKey: slices.Clone(userKey),
3354 0 : })
3355 0 : },
3356 2 : MissizedDeleteCallback: func(userKey []byte, elidedSize, expectedSize uint64) {
3357 2 : d.opts.EventListener.PossibleAPIMisuse(PossibleAPIMisuseInfo{
3358 2 : Kind: MissizedDelete,
3359 2 : UserKey: slices.Clone(userKey),
3360 2 : ExtraInfo: redact.Sprintf("elidedSize=%d,expectedSize=%d",
3361 2 : redact.SafeUint(elidedSize), redact.SafeUint(expectedSize)),
3362 2 : })
3363 2 : },
3364 : }
3365 2 : iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter)
3366 2 :
3367 2 : runnerCfg := compact.RunnerConfig{
3368 2 : CompactionBounds: c.bounds,
3369 2 : L0SplitKeys: c.flush.l0Limits,
3370 2 : Grandparents: c.grandparents,
3371 2 : MaxGrandparentOverlapBytes: c.maxOverlapBytes,
3372 2 : TargetOutputFileSize: c.maxOutputFileSize,
3373 2 : GrantHandle: c.grantHandle,
3374 2 : }
3375 2 : runner := compact.NewRunner(runnerCfg, iter)
3376 2 :
3377 2 : var spanPolicyValid bool
3378 2 : var spanPolicy SpanPolicy
3379 2 : // If spanPolicyValid is true and spanPolicyEndKey is empty, then spanPolicy
3380 2 : // applies for the rest of the keyspace.
3381 2 : var spanPolicyEndKey []byte
3382 2 :
3383 2 : for runner.MoreDataToWrite() {
3384 2 : if c.cancel.Load() {
3385 2 : return runner.Finish().WithError(ErrCancelledCompaction)
3386 2 : }
3387 : // Create a new table.
3388 2 : firstKey := runner.FirstKey()
3389 2 : if !spanPolicyValid || (len(spanPolicyEndKey) > 0 && d.cmp(firstKey, spanPolicyEndKey) >= 0) {
3390 2 : var err error
3391 2 : spanPolicy, spanPolicyEndKey, err = d.opts.Experimental.SpanPolicyFunc(firstKey)
3392 2 : if err != nil {
3393 0 : return runner.Finish().WithError(err)
3394 0 : }
3395 2 : spanPolicyValid = true
3396 : }
3397 :
3398 2 : writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
3399 2 : if spanPolicy.DisableValueSeparationBySuffix {
3400 1 : writerOpts.DisableValueBlocks = true
3401 1 : }
3402 2 : if spanPolicy.PreferFastCompression && writerOpts.Compression != block.NoCompression {
3403 0 : writerOpts.Compression = block.FastestCompression
3404 0 : }
3405 2 : vSep := valueSeparation
3406 2 : switch spanPolicy.ValueStoragePolicy {
3407 1 : case ValueStorageLowReadLatency:
3408 1 : vSep = compact.NeverSeparateValues{}
3409 1 : case ValueStorageLatencyTolerant:
3410 1 : // This span of keyspace is more tolerant of latency, so set a more
3411 1 : // aggressive value separation policy for this output.
3412 1 : vSep.SetNextOutputConfig(compact.ValueSeparationOutputConfig{
3413 1 : MinimumSize: latencyTolerantMinimumSize,
3414 1 : })
3415 : }
3416 2 : objMeta, tw, err := d.newCompactionOutputTable(jobID, c, writerOpts)
3417 2 : if err != nil {
3418 1 : return runner.Finish().WithError(err)
3419 1 : }
3420 2 : runner.WriteTable(objMeta, tw, spanPolicyEndKey, vSep)
3421 : }
3422 2 : result = runner.Finish()
3423 2 : if result.Err == nil {
3424 2 : result.Err = d.objProvider.Sync()
3425 2 : }
3426 2 : return result
3427 : }
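 : // A minimal sketch of a SpanPolicyFunc consistent with the loop above
 : // (hypothetical key layout and policy choices; not an implementation from
 : // this package). An empty end key means the returned policy applies to the
 : // rest of the keyspace:
 : //
 : //	func policy(startKey []byte) (pebble.SpanPolicy, []byte, error) {
 : //		switch {
 : //		case bytes.Compare(startKey, []byte("blob/")) < 0:
 : //			// Default policy up to the start of the "blob/" span.
 : //			return pebble.SpanPolicy{}, []byte("blob/"), nil
 : //		case bytes.Compare(startKey, []byte("blob0")) < 0:
 : //			// Latency-tolerant span for "blob/"-prefixed keys; "blob0" is the
 : //			// exclusive end key ('0' sorts immediately after '/').
 : //			return pebble.SpanPolicy{ValueStoragePolicy: pebble.ValueStorageLatencyTolerant}, []byte("blob0"), nil
 : //		default:
 : //			// Default policy for the remainder of the keyspace.
 : //			return pebble.SpanPolicy{}, nil, nil
 : //		}
 : //	}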
3428 :
3429 : // makeVersionEdit creates the version edit for a compaction, based on the
3430 : // tables in compact.Result.
3431 2 : func (c *tableCompaction) makeVersionEdit(result compact.Result) (*manifest.VersionEdit, error) {
3432 2 : ve := &manifest.VersionEdit{
3433 2 : DeletedTables: map[manifest.DeletedTableEntry]*manifest.TableMetadata{},
3434 2 : }
3435 2 : for _, cl := range c.inputs {
3436 2 : for f := range cl.files.All() {
3437 2 : ve.DeletedTables[manifest.DeletedTableEntry{
3438 2 : Level: cl.level,
3439 2 : FileNum: f.TableNum,
3440 2 : }] = f
3441 2 : }
3442 : }
3443 : // Add any newly constructed blob files to the version edit.
3444 2 : ve.NewBlobFiles = make([]manifest.BlobFileMetadata, len(result.Blobs))
3445 2 : for i := range result.Blobs {
3446 2 : ve.NewBlobFiles[i] = manifest.BlobFileMetadata{
3447 2 : FileID: base.BlobFileID(result.Blobs[i].Metadata.FileNum),
3448 2 : Physical: result.Blobs[i].Metadata,
3449 2 : }
3450 2 : }
3451 :
3452 2 : startLevelBytes := c.startLevel.files.TableSizeSum()
3453 2 :
3454 2 : outputMetrics := c.metrics.perLevel.level(c.outputLevel.level)
3455 2 : outputMetrics.TableBytesIn = startLevelBytes
3456 2 : for i := range c.metrics.internalIterStats.BlockReads {
3457 2 : switch blockkind.Kind(i) {
3458 2 : case blockkind.BlobValue:
3459 2 : outputMetrics.BlobBytesRead += c.metrics.internalIterStats.BlockReads[i].BlockBytes
3460 2 : default:
3461 2 : outputMetrics.TableBytesRead += c.metrics.internalIterStats.BlockReads[i].BlockBytes
3462 : }
3463 : }
3464 2 : outputMetrics.BlobBytesCompacted = result.Stats.CumulativeBlobFileSize
3465 2 : if c.flush.flushables != nil {
3466 2 : outputMetrics.BlobBytesFlushed = result.Stats.CumulativeBlobFileSize
3467 2 : }
3468 2 : if len(c.extraLevels) > 0 {
3469 2 : outputMetrics.TableBytesIn += c.extraLevels[0].files.TableSizeSum()
3470 2 : }
3471 :
3472 2 : if len(c.flush.flushables) == 0 {
3473 2 : c.metrics.perLevel.level(c.startLevel.level)
3474 2 : }
3475 2 : if len(c.extraLevels) > 0 {
3476 2 : c.metrics.perLevel.level(c.extraLevels[0].level)
3477 2 : outputMetrics.MultiLevel.TableBytesInTop = startLevelBytes
3478 2 : outputMetrics.MultiLevel.TableBytesIn = outputMetrics.TableBytesIn
3479 2 : outputMetrics.MultiLevel.TableBytesRead = outputMetrics.TableBytesRead
3480 2 : }
3481 :
3482 2 : inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute()
3483 2 : ve.NewTables = make([]manifest.NewTableEntry, len(result.Tables))
3484 2 : for i := range result.Tables {
3485 2 : t := &result.Tables[i]
3486 2 :
3487 2 : if t.WriterMeta.Properties.NumValuesInBlobFiles > 0 {
3488 2 : if len(t.BlobReferences) == 0 {
3489 0 : return nil, base.AssertionFailedf("num values in blob files %d but no blob references",
3490 0 : t.WriterMeta.Properties.NumValuesInBlobFiles)
3491 0 : }
3492 : }
3493 :
3494 2 : fileMeta := &manifest.TableMetadata{
3495 2 : TableNum: base.PhysicalTableFileNum(t.ObjMeta.DiskFileNum),
3496 2 : CreationTime: t.CreationTime.Unix(),
3497 2 : Size: t.WriterMeta.Size,
3498 2 : SmallestSeqNum: t.WriterMeta.SmallestSeqNum,
3499 2 : LargestSeqNum: t.WriterMeta.LargestSeqNum,
3500 2 : BlobReferences: t.BlobReferences,
3501 2 : BlobReferenceDepth: t.BlobReferenceDepth,
3502 2 : }
3503 2 : if c.flush.flushables == nil {
3504 2 : // Set the file's LargestSeqNumAbsolute to be the maximum value of any
3505 2 : // of the compaction's input sstables.
3506 2 : // TODO(jackson): This could be narrowed to be the maximum of input
3507 2 : // sstables that overlap the output sstable's key range.
3508 2 : fileMeta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute
3509 2 : } else {
3510 2 : fileMeta.LargestSeqNumAbsolute = t.WriterMeta.LargestSeqNum
3511 2 : }
3512 2 : fileMeta.InitPhysicalBacking()
3513 2 :
3514 2 : // If the file didn't contain any range deletions, we can fill its
3515 2 : // table stats now, avoiding unnecessarily loading the table later.
3516 2 : maybeSetStatsFromProperties(
3517 2 : fileMeta.PhysicalMeta(), &t.WriterMeta.Properties, c.logger,
3518 2 : )
3519 2 :
3520 2 : if t.WriterMeta.HasPointKeys {
3521 2 : fileMeta.ExtendPointKeyBounds(c.comparer.Compare,
3522 2 : t.WriterMeta.SmallestPoint,
3523 2 : t.WriterMeta.LargestPoint)
3524 2 : }
3525 2 : if t.WriterMeta.HasRangeDelKeys {
3526 2 : fileMeta.ExtendPointKeyBounds(c.comparer.Compare,
3527 2 : t.WriterMeta.SmallestRangeDel,
3528 2 : t.WriterMeta.LargestRangeDel)
3529 2 : }
3530 2 : if t.WriterMeta.HasRangeKeys {
3531 2 : fileMeta.ExtendRangeKeyBounds(c.comparer.Compare,
3532 2 : t.WriterMeta.SmallestRangeKey,
3533 2 : t.WriterMeta.LargestRangeKey)
3534 2 : }
3535 :
3536 2 : ve.NewTables[i] = manifest.NewTableEntry{
3537 2 : Level: c.outputLevel.level,
3538 2 : Meta: fileMeta,
3539 2 : }
3540 2 :
3541 2 : // Update metrics.
3542 2 : if c.flush.flushables == nil {
3543 2 : outputMetrics.TablesCompacted++
3544 2 : outputMetrics.TableBytesCompacted += fileMeta.Size
3545 2 : } else {
3546 2 : outputMetrics.TablesFlushed++
3547 2 : outputMetrics.TableBytesFlushed += fileMeta.Size
3548 2 : }
3549 2 : outputMetrics.EstimatedReferencesSize += fileMeta.EstimatedReferenceSize()
3550 2 : outputMetrics.TablesSize += int64(fileMeta.Size)
3551 2 : outputMetrics.TablesCount++
3552 2 : outputMetrics.Additional.BytesWrittenDataBlocks += t.WriterMeta.Properties.DataSize
3553 2 : outputMetrics.Additional.BytesWrittenValueBlocks += t.WriterMeta.Properties.ValueBlocksSize
3554 : }
3555 :
3556 : // Sanity check that the tables are ordered and don't overlap.
3557 2 : for i := 1; i < len(ve.NewTables); i++ {
3558 2 : if ve.NewTables[i-1].Meta.Largest().IsUpperBoundFor(c.comparer.Compare, ve.NewTables[i].Meta.Smallest().UserKey) {
3559 0 : return nil, base.AssertionFailedf("pebble: compaction output tables overlap: %s and %s",
3560 0 : ve.NewTables[i-1].Meta.DebugString(c.comparer.FormatKey, true),
3561 0 : ve.NewTables[i].Meta.DebugString(c.comparer.FormatKey, true),
3562 0 : )
3563 0 : }
3564 : }
3565 :
3566 2 : return ve, nil
3567 : }
3568 :
3569 : // newCompactionOutputTable creates an object for a new table produced by a
3570 : // compaction or flush.
3571 : func (d *DB) newCompactionOutputTable(
3572 : jobID JobID, c *tableCompaction, writerOpts sstable.WriterOptions,
3573 2 : ) (objstorage.ObjectMetadata, sstable.RawWriter, error) {
3574 2 : writable, objMeta, err := d.newCompactionOutputObj(
3575 2 : base.FileTypeTable, c.kind, c.outputLevel.level, &c.metrics.bytesWritten, c.objCreateOpts)
3576 2 : if err != nil {
3577 1 : return objstorage.ObjectMetadata{}, nil, err
3578 1 : }
3579 2 : d.opts.EventListener.TableCreated(TableCreateInfo{
3580 2 : JobID: int(jobID),
3581 2 : Reason: c.kind.compactingOrFlushing(),
3582 2 : Path: d.objProvider.Path(objMeta),
3583 2 : FileNum: objMeta.DiskFileNum,
3584 2 : })
3585 2 : writerOpts.SetInternal(sstableinternal.WriterOptions{
3586 2 : CacheOpts: sstableinternal.CacheOptions{
3587 2 : CacheHandle: d.cacheHandle,
3588 2 : FileNum: objMeta.DiskFileNum,
3589 2 : },
3590 2 : })
3591 2 : tw := sstable.NewRawWriterWithCPUMeasurer(writable, writerOpts, c.grantHandle)
3592 2 : return objMeta, tw, nil
3593 : }
3594 :
3595 : // newCompactionOutputBlob creates an object for a new blob produced by a
3596 : // compaction or flush.
3597 : func (d *DB) newCompactionOutputBlob(
3598 : jobID JobID,
3599 : kind compactionKind,
3600 : outputLevel int,
3601 : bytesWritten *atomic.Int64,
3602 : opts objstorage.CreateOptions,
3603 2 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
3604 2 : writable, objMeta, err := d.newCompactionOutputObj(base.FileTypeBlob, kind, outputLevel, bytesWritten, opts)
3605 2 : if err != nil {
3606 0 : return nil, objstorage.ObjectMetadata{}, err
3607 0 : }
3608 2 : d.opts.EventListener.BlobFileCreated(BlobFileCreateInfo{
3609 2 : JobID: int(jobID),
3610 2 : Reason: kind.compactingOrFlushing(),
3611 2 : Path: d.objProvider.Path(objMeta),
3612 2 : FileNum: objMeta.DiskFileNum,
3613 2 : })
3614 2 : return writable, objMeta, nil
3615 : }
3616 :
3617 : // newCompactionOutputObj creates an object produced by a compaction or flush.
3618 : func (d *DB) newCompactionOutputObj(
3619 : typ base.FileType,
3620 : kind compactionKind,
3621 : outputLevel int,
3622 : bytesWritten *atomic.Int64,
3623 : opts objstorage.CreateOptions,
3624 2 : ) (objstorage.Writable, objstorage.ObjectMetadata, error) {
3625 2 : diskFileNum := d.mu.versions.getNextDiskFileNum()
3626 2 : ctx := context.TODO()
3627 2 :
3628 2 : if objiotracing.Enabled {
3629 0 : ctx = objiotracing.WithLevel(ctx, outputLevel)
3630 0 : if kind == compactionKindFlush {
3631 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForFlush)
3632 0 : } else {
3633 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
3634 0 : }
3635 : }
3636 :
3637 2 : writable, objMeta, err := d.objProvider.Create(ctx, typ, diskFileNum, opts)
3638 2 : if err != nil {
3639 1 : return nil, objstorage.ObjectMetadata{}, err
3640 1 : }
3641 :
3642 2 : if kind != compactionKindFlush {
3643 2 : writable = &compactionWritable{
3644 2 : Writable: writable,
3645 2 : versions: d.mu.versions,
3646 2 : written: bytesWritten,
3647 2 : }
3648 2 : }
3649 2 : return writable, objMeta, nil
3650 : }
3651 :
3652 : // validateVersionEdit validates that start and end keys across new and deleted
3653 : // files in a versionEdit pass the given validation function.
3654 : func validateVersionEdit(
3655 : ve *manifest.VersionEdit, vk base.ValidateKey, format base.FormatKey, logger Logger,
3656 2 : ) {
3657 2 : validateKey := func(f *manifest.TableMetadata, key []byte) {
3658 2 : if err := vk.Validate(key); err != nil {
3659 1 : logger.Fatalf("pebble: version edit validation failed (key=%s file=%s): %v", format(key), f, err)
3660 1 : }
3661 : }
3662 :
3663 : // Validate both new and deleted files.
3664 2 : for _, f := range ve.NewTables {
3665 2 : validateKey(f.Meta, f.Meta.Smallest().UserKey)
3666 2 : validateKey(f.Meta, f.Meta.Largest().UserKey)
3667 2 : }
3668 2 : for _, m := range ve.DeletedTables {
3669 2 : validateKey(m, m.Smallest().UserKey)
3670 2 : validateKey(m, m.Largest().UserKey)
3671 2 : }
3672 : }
3673 :
3674 2 : func getDiskWriteCategoryForCompaction(opts *Options, kind compactionKind) vfs.DiskWriteCategory {
3675 2 : if opts.EnableSQLRowSpillMetrics {
3676 0 : // In the scenario that the Pebble engine is used for SQL row spills, the
3677 0 : // data written to the memtable will correspond to spills to disk and
3678 0 : // should be categorized as such.
3679 0 : return "sql-row-spill"
3680 2 : } else if kind == compactionKindFlush {
3681 2 : return "pebble-memtable-flush"
3682 2 : } else if kind == compactionKindBlobFileRewrite {
3683 2 : return "pebble-blob-file-rewrite"
3684 2 : } else {
3685 2 : return "pebble-compaction"
3686 2 : }
3687 : }
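 : // For example, a memtable flush is categorized as "pebble-memtable-flush",
 : // unless EnableSQLRowSpillMetrics is set, in which case every category above
 : // is reported as "sql-row-spill".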
|