Line data Source code
1 : // Copyright 2013 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "bytes"
9 : "cmp"
10 : "context"
11 : "fmt"
12 : "io"
13 : "math"
14 : "runtime/pprof"
15 : "slices"
16 : "sort"
17 : "sync/atomic"
18 : "time"
19 :
20 : "github.com/cockroachdb/errors"
21 : "github.com/cockroachdb/pebble/internal/base"
22 : "github.com/cockroachdb/pebble/internal/invalidating"
23 : "github.com/cockroachdb/pebble/internal/invariants"
24 : "github.com/cockroachdb/pebble/internal/keyspan"
25 : "github.com/cockroachdb/pebble/internal/manifest"
26 : "github.com/cockroachdb/pebble/internal/private"
27 : "github.com/cockroachdb/pebble/internal/rangedel"
28 : "github.com/cockroachdb/pebble/internal/rangekey"
29 : "github.com/cockroachdb/pebble/objstorage"
30 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
31 : "github.com/cockroachdb/pebble/objstorage/remote"
32 : "github.com/cockroachdb/pebble/sstable"
33 : "github.com/cockroachdb/pebble/vfs"
34 : )
35 :
36 : var errEmptyTable = errors.New("pebble: empty table")
37 :
38 : // ErrCancelledCompaction is returned if a compaction is cancelled by a
39 : // concurrent excise or ingest-split operation.
40 : var ErrCancelledCompaction = errors.New("pebble: compaction cancelled by a concurrent operation, will retry compaction")
41 :
42 : var compactLabels = pprof.Labels("pebble", "compact")
43 : var flushLabels = pprof.Labels("pebble", "flush")
44 : var gcLabels = pprof.Labels("pebble", "gc")
45 :
46 : // getInternalWriterProperties accesses a private variable (in the
47 : // internal/private package) initialized by the sstable Writer. This indirection
48 : // is necessary to ensure non-Pebble users constructing sstables for ingestion
49 : // are unable to set internal-only properties.
50 : var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties)
51 :
52 : // expandedCompactionByteSizeLimit is the maximum number of bytes in all
53 : // compacted files. We avoid expanding the lower level file set of a compaction
54 : // if it would make the total compaction cover more than this many bytes.
55 1 : func expandedCompactionByteSizeLimit(opts *Options, level int, availBytes uint64) uint64 {
56 1 : v := uint64(25 * opts.Level(level).TargetFileSize)
57 1 :
58 1 : // Never expand a compaction beyond half the available capacity, divided
59 1 : // by the maximum number of concurrent compactions. Each of the concurrent
60 1 : // compactions may expand up to this limit, so this attempts to limit
61 1 : // compactions to half of available disk space. Note that this will not
62 1 : // prevent compaction picking from pursuing compactions that are larger
63 1 : // than this threshold before expansion.
64 1 : diskMax := (availBytes / 2) / uint64(opts.MaxConcurrentCompactions())
65 1 : if v > diskMax {
66 1 : v = diskMax
67 1 : }
68 1 : return v
69 : }
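// Illustrative arithmetic for the function above (hypothetical numbers,
// assumed for the sketch): with a 2 MiB target file size, v = 25 * 2 MiB =
// 50 MiB. If availBytes is 120 GiB and MaxConcurrentCompactions() returns 3,
// diskMax = (120 GiB / 2) / 3 = 20 GiB, so the 50 MiB value is returned
// unchanged; the disk-based cap only binds when the disk is close to full.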
70 :
71 : // maxGrandparentOverlapBytes is the maximum bytes of overlap with level+1
72 : // before we stop building a single file in a level-1 to level compaction.
73 1 : func maxGrandparentOverlapBytes(opts *Options, level int) uint64 {
74 1 : return uint64(10 * opts.Level(level).TargetFileSize)
75 1 : }
76 :
77 : // maxReadCompactionBytes is used to prevent read compactions which
78 : // are too wide.
79 1 : func maxReadCompactionBytes(opts *Options, level int) uint64 {
80 1 : return uint64(10 * opts.Level(level).TargetFileSize)
81 1 : }
82 :
83 : // noCloseIter wraps around a FragmentIterator, intercepting and eliding
84 : // calls to Close. It is used during compaction to ensure that rangeDelIters
85 : // are not closed prematurely.
86 : type noCloseIter struct {
87 : keyspan.FragmentIterator
88 : }
89 :
90 1 : func (i noCloseIter) Close() error {
91 1 : return nil
92 1 : }
93 :
94 : type compactionLevel struct {
95 : level int
96 : files manifest.LevelSlice
97 : // l0SublevelInfo contains information about L0 sublevels being compacted.
98 : // It's only set for the start level of a compaction starting out of L0 and
99 : // is nil for all other compactions.
100 : l0SublevelInfo []sublevelInfo
101 : }
102 :
103 1 : func (cl compactionLevel) Clone() compactionLevel {
104 1 : newCL := compactionLevel{
105 1 : level: cl.level,
106 1 : files: cl.files.Reslice(func(start, end *manifest.LevelIterator) {}),
107 : }
108 1 : return newCL
109 : }
110 1 : func (cl compactionLevel) String() string {
111 1 : return fmt.Sprintf(`Level %d, Files %s`, cl.level, cl.files)
112 1 : }
113 :
114 : // maybeSplit is the return value of compactionOutputSplitters. See the comment
115 : // on compactionOutputSplitter.shouldSplitBefore() for how this value is used.
116 : type maybeSplit int
117 :
118 : const (
119 : noSplit maybeSplit = iota
120 : splitNow
121 : )
122 :
123 : // String implements the Stringer interface.
124 1 : func (c maybeSplit) String() string {
125 1 : if c == noSplit {
126 1 : return "no-split"
127 1 : }
128 1 : return "split-now"
129 : }
130 :
131 : // compactionOutputSplitter is an interface for encapsulating logic around
132 : // switching the output of a compaction to a new output file. Additional
133 : // constraints around switching compaction outputs that are specific to that
134 : // compaction type (eg. flush splits) are implemented in
135 : // compactionOutputSplitters that compose other child compactionOutputSplitters.
136 : type compactionOutputSplitter interface {
137 : // shouldSplitBefore returns whether we should split outputs before the
138 : // specified "current key". The return value is splitNow or noSplit.
139 : // splitNow means a split is advised before the specified key, and noSplit
140 : // means no split is advised. If shouldSplitBefore(a) advises a split then
141 : // shouldSplitBefore(b) should also advise a split given b >= a, until
142 : // onNewOutput is called.
143 : shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit
144 : // onNewOutput updates internal splitter state when the compaction switches
145 : // to a new sstable, and returns the next limit for the new output which
146 : // would get used to truncate range tombstones if the compaction iterator
147 : // runs out of keys. The limit returned MUST be > key according to the
148 : // compaction's comparator. The specified key is the first key in the new
149 : // output, or nil if this sstable will only contain range tombstones already
150 : // in the fragmenter.
151 : onNewOutput(key []byte) []byte
152 : }
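// The sketch below is only an illustration of the contract above, not a
// splitter used by this package: fixedKeySplitter is a hypothetical
// implementation that advises a split once the current key reaches a fixed
// user key, and keeps advising it until onNewOutput resets its state.
type fixedKeySplitter struct {
	cmp   Compare
	limit []byte
	split maybeSplit
}

func (s *fixedKeySplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
	if s.split == splitNow {
		// Per the contract, keep advising a split until onNewOutput is called.
		return splitNow
	}
	if s.limit != nil && s.cmp(key.UserKey, s.limit) >= 0 {
		s.split = splitNow
	}
	return s.split
}

func (s *fixedKeySplitter) onNewOutput(key []byte) []byte {
	// Reset splitter state for the new output. Returning nil means this
	// splitter imposes no limit for truncating range tombstones.
	s.split = noSplit
	return nil
}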
153 :
154 : // fileSizeSplitter is a compactionOutputSplitter that enforces target file
155 : // sizes. This splitter splits to a new output file when the estimated file size
156 : // is 0.5x-2x the target file size. If there are overlapping grandparent files,
157 : // this splitter will attempt to split at a grandparent boundary. For example,
158 : // consider the example where a compaction wrote 'd' to the current output file,
159 : // and the next key has a user key 'g':
160 : //
161 : // previous key next key
162 : // | |
163 : // | |
164 : // +---------------|----+ +--|----------+
165 : // grandparents: | 000006 | | | | 000007 |
166 : // +---------------|----+ +--|----------+
167 : // a b d e f g i
168 : //
169 : // Splitting the output file F before 'g' will ensure that the current output
170 : // file F does not overlap the grandparent file 000007. Aligning sstable
171 : // boundaries like this can significantly reduce write amplification, since a
172 : // subsequent compaction of F into the grandparent level will avoid needlessly
173 : // rewriting any keys within 000007 that do not overlap F's bounds. Consider the
174 : // following compaction:
175 : //
176 : // +----------------------+
177 : // input | |
178 : // level +----------------------+
179 : // \/
180 : // +---------------+ +---------------+
181 : // output |XXXXXXX| | | |XXXXXXXX|
182 : // level +---------------+ +---------------+
183 : //
184 : // The input-level file overlaps two files in the output level, but only
185 : // partially. The beginning of the first output-level file and the end of the
186 : // second output-level file will be rewritten verbatim. This write I/O is
187 : // "wasted" in the sense that no merging is being performed.
188 : //
189 : // To prevent the above waste, this splitter attempts to split output files
190 : // before the start key of grandparent files. It still strives to write output
191 : // files of approximately the target file size, by constraining this splitting
192 : // at grandparent points to apply only if the current output's file size is
193 : // about the right order of magnitude.
194 : //
195 : // Note that, unlike most other splitters, this splitter does not guarantee that
196 : // it will advise splits only at user key change boundaries.
197 : type fileSizeSplitter struct {
198 : frontier frontier
199 : targetFileSize uint64
200 : atGrandparentBoundary bool
201 : boundariesObserved uint64
202 : nextGrandparent *fileMetadata
203 : grandparents manifest.LevelIterator
204 : }
205 :
206 : func newFileSizeSplitter(
207 : f *frontiers, targetFileSize uint64, grandparents manifest.LevelIterator,
208 1 : ) *fileSizeSplitter {
209 1 : s := &fileSizeSplitter{targetFileSize: targetFileSize}
210 1 : s.nextGrandparent = grandparents.First()
211 1 : s.grandparents = grandparents
212 1 : if s.nextGrandparent != nil {
213 1 : s.frontier.Init(f, s.nextGrandparent.Smallest.UserKey, s.reached)
214 1 : }
215 1 : return s
216 : }
217 :
218 1 : func (f *fileSizeSplitter) reached(nextKey []byte) []byte {
219 1 : f.atGrandparentBoundary = true
220 1 : f.boundariesObserved++
221 1 : // NB: f.grandparents is a bounded iterator, constrained to the compaction
222 1 : // key range.
223 1 : f.nextGrandparent = f.grandparents.Next()
224 1 : if f.nextGrandparent == nil {
225 1 : return nil
226 1 : }
227 : // TODO(jackson): Should we also split before or immediately after
228 : // grandparents' largest keys? Splitting before the start boundary prevents
229 : // overlap with the grandparent. Also splitting after the end boundary may
230 : // increase the probability of move compactions.
231 1 : return f.nextGrandparent.Smallest.UserKey
232 : }
233 :
234 1 : func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
235 1 : atGrandparentBoundary := f.atGrandparentBoundary
236 1 :
237 1 : // Clear f.atGrandparentBoundary unconditionally.
238 1 : //
239 1 : // This is a bit subtle. Even if we do decide to split, it's possible that a
240 1 : // higher-level splitter will ignore our request (eg, because we're between
241 1 : // two internal keys with the same user key). In this case, the next call to
242 1 : // shouldSplitBefore will find atGrandparentBoundary=false. This is
243 1 : // desirable, because in this case we would've already written the earlier
244 1 : // key with the same user key to the output file. The current output file is
245 1 : // already doomed to overlap the grandparent whose bound triggered
246 1 : // atGrandparentBoundary=true. We should continue on, waiting for the next
247 1 : // grandparent boundary.
248 1 : f.atGrandparentBoundary = false
249 1 :
250 1 : // If the key is a range tombstone, the EstimatedSize may not grow right
251 1 : // away when a range tombstone is added to the fragmenter: It's dependent on
252 1 : // whether or not this new range deletion will start a new fragment.
253 1 : // Range deletions are rare, so we choose to simply not split yet.
254 1 : // TODO(jackson): Reconsider this, and consider range keys too as a part of
255 1 : // #2321.
256 1 : if key.Kind() == InternalKeyKindRangeDelete || tw == nil {
257 1 : return noSplit
258 1 : }
259 :
260 1 : estSize := tw.EstimatedSize()
261 1 : switch {
262 1 : case estSize < f.targetFileSize/2:
263 1 : // The estimated file size is less than half the target file size. Don't
264 1 : // split it, even if currently aligned with a grandparent file because
265 1 : // it's too small.
266 1 : return noSplit
267 1 : case estSize >= 2*f.targetFileSize:
268 1 : // The estimated file size is at least double the target file size. Split it even
269 1 : // if we were not aligned with a grandparent file boundary to avoid
270 1 : // excessively exceeding the target file size.
271 1 : return splitNow
272 1 : case !atGrandparentBoundary:
273 1 : // Don't split if we're not at a grandparent, except if we've exhausted
274 1 : // all the grandparents overlapping this compaction's key range. Then we
275 1 : // may want to split purely based on file size.
276 1 : if f.nextGrandparent == nil {
277 1 : // There are no more grandparents. Optimize for the target file size
278 1 : // and split as soon as we hit the target file size.
279 1 : if estSize >= f.targetFileSize {
280 1 : return splitNow
281 1 : }
282 : }
283 1 : return noSplit
284 1 : default:
285 1 : // INVARIANT: atGrandparentBoundary
286 1 : // INVARIANT: targetSize/2 <= estSize < 2*targetSize
287 1 : //
288 1 : // The estimated file size is close enough to the target file size that
289 1 : // we should consider splitting.
290 1 : //
291 1 : // Determine whether to split now based on how many grandparent
292 1 : // boundaries we have already observed while building this output file.
293 1 : // The intuition here is that if the grandparent level is dense in this
294 1 : // part of the keyspace, we're likely to continue to have more
295 1 : // opportunities to split this file aligned with a grandparent. If this
296 1 : // is the first grandparent boundary observed, we split immediately
297 1 : // (we're already at ≥50% the target file size). Otherwise, each
298 1 : // overlapping grandparent we've observed increases the minimum file
299 1 : // size by 5% of the target file size, up to at most 90% of the target
300 1 : // file size.
301 1 : //
302 1 : // TODO(jackson): The particular thresholds are somewhat unprincipled.
303 1 : // This is the same heuristic as RocksDB implements. Is there are more
304 1 : // principled formulation that can, further reduce w-amp, produce files
305 1 : // closer to the target file size, or is more understandable?
306 1 :
307 1 : // NB: Subtract 1 from `boundariesObserved` to account for the current
308 1 : // boundary we're considering splitting at. `reached` will have
309 1 : // incremented it at the same time it set `atGrandparentBoundary`.
310 1 : minimumPctOfTargetSize := 50 + 5*min(f.boundariesObserved-1, 8)
311 1 : if estSize < (minimumPctOfTargetSize*f.targetFileSize)/100 {
312 1 : return noSplit
313 1 : }
314 1 : return splitNow
315 : }
316 : }
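// Worked example of the default case above (hypothetical numbers): with a
// 2 MiB target file size and boundariesObserved = 3 when this boundary is
// reached, minimumPctOfTargetSize = 50 + 5*min(3-1, 8) = 60, so the splitter
// declines to split until the estimated output size reaches 60% of 2 MiB,
// i.e. roughly 1.2 MiB.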
317 :
318 1 : func (f *fileSizeSplitter) onNewOutput(key []byte) []byte {
319 1 : f.boundariesObserved = 0
320 1 : return nil
321 1 : }
322 :
323 1 : func newLimitFuncSplitter(f *frontiers, limitFunc func(userKey []byte) []byte) *limitFuncSplitter {
324 1 : s := &limitFuncSplitter{limitFunc: limitFunc}
325 1 : s.frontier.Init(f, nil, s.reached)
326 1 : return s
327 1 : }
328 :
329 : type limitFuncSplitter struct {
330 : frontier frontier
331 : limitFunc func(userKey []byte) []byte
332 : split maybeSplit
333 : }
334 :
335 1 : func (lf *limitFuncSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
336 1 : return lf.split
337 1 : }
338 :
339 1 : func (lf *limitFuncSplitter) reached(nextKey []byte) []byte {
340 1 : lf.split = splitNow
341 1 : return nil
342 1 : }
343 :
344 1 : func (lf *limitFuncSplitter) onNewOutput(key []byte) []byte {
345 1 : lf.split = noSplit
346 1 : if key != nil {
347 1 : // TODO(jackson): For some users, like L0 flush splits, there's no need
348 1 : // to binary search over all the flush splits every time. The next split
349 1 : // point must be ahead of the previous flush split point.
350 1 : limit := lf.limitFunc(key)
351 1 : lf.frontier.Update(limit)
352 1 : return limit
353 1 : }
354 1 : lf.frontier.Update(nil)
355 1 : return nil
356 : }
357 :
358 : // splitterGroup is a compactionOutputSplitter that splits whenever one of its
359 : // child splitters advises a compaction split.
360 : type splitterGroup struct {
361 : cmp Compare
362 : splitters []compactionOutputSplitter
363 : }
364 :
365 : func (a *splitterGroup) shouldSplitBefore(
366 : key *InternalKey, tw *sstable.Writer,
367 1 : ) (suggestion maybeSplit) {
368 1 : for _, splitter := range a.splitters {
369 1 : if splitter.shouldSplitBefore(key, tw) == splitNow {
370 1 : return splitNow
371 1 : }
372 : }
373 1 : return noSplit
374 : }
375 :
376 1 : func (a *splitterGroup) onNewOutput(key []byte) []byte {
377 1 : var earliestLimit []byte
378 1 : for _, splitter := range a.splitters {
379 1 : limit := splitter.onNewOutput(key)
380 1 : if limit == nil {
381 1 : continue
382 : }
383 1 : if earliestLimit == nil || a.cmp(limit, earliestLimit) < 0 {
384 1 : earliestLimit = limit
385 1 : }
386 : }
387 1 : return earliestLimit
388 : }
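// Illustration of onNewOutput above (hypothetical limits): if one child
// splitter returns limit "m" and another returns "g", the group returns "g",
// the earliest limit under cmp, so that no child's constraint on how far the
// new output may extend is exceeded.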
389 :
390 : // userKeyChangeSplitter is a compactionOutputSplitter that takes in a child
391 : // splitter, and splits when 1) that child splitter has advised a split, and 2)
392 : // the compaction output is at the boundary between two user keys (also
393 : // the boundary between atomic compaction units). Use this splitter to wrap
394 : // any splitters that don't guarantee user key splits (i.e. splitters that make
395 : // their determination in ways other than comparing the current key against a
396 : // limit key.) If a wrapped splitter advises a split, it must continue
397 : // to advise a split until a new output.
398 : type userKeyChangeSplitter struct {
399 : cmp Compare
400 : splitter compactionOutputSplitter
401 : unsafePrevUserKey func() []byte
402 : }
403 :
404 1 : func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
405 1 : // NB: The userKeyChangeSplitter only needs to suffer a key comparison if
406 1 : // the wrapped splitter requests a split.
407 1 : //
408 1 : // We could implement this splitter using frontiers: When the inner splitter
409 1 : // requests a split before key `k`, we'd update a frontier to be
410 1 : // ImmediateSuccessor(k). Then on the next key greater than >k, the
411 1 : // frontier's `reached` func would be called and we'd return splitNow.
412 1 : // This doesn't really save work since duplicate user keys are rare, and it
413 1 : // requires us to materialize the ImmediateSuccessor key. It also prevents
414 1 : // us from splitting on the same key that the inner splitter requested a
415 1 : // split for—instead we need to wait until the next key. The current
416 1 : // implementation uses `unsafePrevUserKey` to gain access to the previous
417 1 : // key which allows it to immediately respect the inner splitter if
418 1 : // possible.
419 1 : if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow {
420 1 : return split
421 1 : }
422 1 : if u.cmp(key.UserKey, u.unsafePrevUserKey()) > 0 {
423 1 : return splitNow
424 1 : }
425 1 : return noSplit
426 : }
427 :
428 1 : func (u *userKeyChangeSplitter) onNewOutput(key []byte) []byte {
429 1 : return u.splitter.onNewOutput(key)
430 1 : }
431 :
432 : // compactionWritable is an objstorage.Writable wrapper that, on every write,
433 : // updates a metric in `versions` on bytes written by in-progress compactions so
434 : // far. It also increments a per-compaction `written` int.
435 : type compactionWritable struct {
436 : objstorage.Writable
437 :
438 : versions *versionSet
439 : written *int64
440 : }
441 :
442 : // Write is part of the objstorage.Writable interface.
443 1 : func (c *compactionWritable) Write(p []byte) error {
444 1 : if err := c.Writable.Write(p); err != nil {
445 0 : return err
446 0 : }
447 :
448 1 : *c.written += int64(len(p))
449 1 : c.versions.incrementCompactionBytes(int64(len(p)))
450 1 : return nil
451 : }
452 :
453 : type compactionKind int
454 :
455 : const (
456 : compactionKindDefault compactionKind = iota
457 : compactionKindFlush
458 : // compactionKindMove denotes a move compaction where the input file is
459 : // retained and linked in a new level without being obsoleted.
460 : compactionKindMove
461 : // compactionKindCopy denotes a copy compaction where the input file is
462 : // copied byte-by-byte into a new file with a new FileNum in the output level.
463 : compactionKindCopy
464 : compactionKindDeleteOnly
465 : compactionKindElisionOnly
466 : compactionKindRead
467 : compactionKindRewrite
468 : compactionKindIngestedFlushable
469 : )
470 :
471 1 : func (k compactionKind) String() string {
472 1 : switch k {
473 1 : case compactionKindDefault:
474 1 : return "default"
475 0 : case compactionKindFlush:
476 0 : return "flush"
477 1 : case compactionKindMove:
478 1 : return "move"
479 1 : case compactionKindDeleteOnly:
480 1 : return "delete-only"
481 1 : case compactionKindElisionOnly:
482 1 : return "elision-only"
483 1 : case compactionKindRead:
484 1 : return "read"
485 1 : case compactionKindRewrite:
486 1 : return "rewrite"
487 0 : case compactionKindIngestedFlushable:
488 0 : return "ingested-flushable"
489 1 : case compactionKindCopy:
490 1 : return "copy"
491 : }
492 0 : return "?"
493 : }
494 :
495 : // rangeKeyCompactionTransform is used to transform range key spans as part of the
496 : // keyspan.MergingIter. As part of this transformation step, we can elide range
497 : // keys in the last snapshot stripe, as well as coalesce range keys within
498 : // snapshot stripes.
499 : func rangeKeyCompactionTransform(
500 : eq base.Equal, snapshots []uint64, elideRangeKey func(start, end []byte) bool,
501 1 : ) keyspan.Transformer {
502 1 : return keyspan.TransformerFunc(func(cmp base.Compare, s keyspan.Span, dst *keyspan.Span) error {
503 1 : elideInLastStripe := func(keys []keyspan.Key) []keyspan.Key {
504 1 : // Unsets and deletes in the last snapshot stripe can be elided.
505 1 : k := 0
506 1 : for j := range keys {
507 1 : if elideRangeKey(s.Start, s.End) &&
508 1 : (keys[j].Kind() == InternalKeyKindRangeKeyUnset || keys[j].Kind() == InternalKeyKindRangeKeyDelete) {
509 1 : continue
510 : }
511 1 : keys[k] = keys[j]
512 1 : k++
513 : }
514 1 : keys = keys[:k]
515 1 : return keys
516 : }
517 : // snapshots are in ascending order, while s.Keys are in descending seqnum
518 : // order. Partition s.keys by snapshot stripes, and call rangekey.Coalesce
519 : // on each partition.
520 1 : dst.Start = s.Start
521 1 : dst.End = s.End
522 1 : dst.Keys = dst.Keys[:0]
523 1 : i, j := len(snapshots)-1, 0
524 1 : usedLen := 0
525 1 : for i >= 0 {
526 1 : start := j
527 1 : for j < len(s.Keys) && !base.Visible(s.Keys[j].SeqNum(), snapshots[i], base.InternalKeySeqNumMax) {
528 1 : // Include j in current partition.
529 1 : j++
530 1 : }
531 1 : if j > start {
532 1 : keysDst := dst.Keys[usedLen:cap(dst.Keys)]
533 1 : if err := rangekey.Coalesce(cmp, eq, s.Keys[start:j], &keysDst); err != nil {
534 0 : return err
535 0 : }
536 1 : if j == len(s.Keys) {
537 1 : // This is the last snapshot stripe. Unsets and deletes can be elided.
538 1 : keysDst = elideInLastStripe(keysDst)
539 1 : }
540 1 : usedLen += len(keysDst)
541 1 : dst.Keys = append(dst.Keys, keysDst...)
542 : }
543 1 : i--
544 : }
545 1 : if j < len(s.Keys) {
546 1 : keysDst := dst.Keys[usedLen:cap(dst.Keys)]
547 1 : if err := rangekey.Coalesce(cmp, eq, s.Keys[j:], &keysDst); err != nil {
548 0 : return err
549 0 : }
550 1 : keysDst = elideInLastStripe(keysDst)
551 1 : usedLen += len(keysDst)
552 1 : dst.Keys = append(dst.Keys, keysDst...)
553 : }
554 1 : return nil
555 : })
556 : }
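// Illustrative partitioning (hypothetical seqnums): with snapshots = {10, 20}
// and span keys at seqnums 25, 15 and 5 (descending), the keys fall into
// stripes {25}, {15} and {5}. rangekey.Coalesce runs on each stripe
// independently, and only the final stripe ({5}) is eligible for eliding
// RangeKeyUnset and RangeKeyDelete keys via elideInLastStripe.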
557 :
558 : // compaction is a table compaction from one level to the next, starting from a
559 : // given version.
560 : type compaction struct {
561 : // cancel is a bool that can be used by other goroutines to signal a compaction
562 : // to cancel, such as if a conflicting excise operation raced it to manifest
563 : // application. Only holders of the manifest lock will write to this atomic.
564 : cancel atomic.Bool
565 :
566 : kind compactionKind
567 : cmp Compare
568 : equal Equal
569 : comparer *base.Comparer
570 : formatKey base.FormatKey
571 : logger Logger
572 : version *version
573 : stats base.InternalIteratorStats
574 : beganAt time.Time
575 : // versionEditApplied is set to true when a compaction has completed and the
576 : // resulting version has been installed (if successful), but the compaction
577 : // goroutine is still cleaning up (eg, deleting obsolete files).
578 : versionEditApplied bool
579 : bufferPool sstable.BufferPool
580 :
581 : // startLevel is the level that is being compacted. Inputs from startLevel
582 : // and outputLevel will be merged to produce a set of outputLevel files.
583 : startLevel *compactionLevel
584 :
585 : // outputLevel is the level that files are being produced in. outputLevel is
586 : // equal to startLevel+1 except when:
587 : // - if startLevel is 0, the output level equals compactionPicker.baseLevel().
588 : // - in multilevel compaction, the output level is the lowest level involved in
589 : // the compaction
590 : // A compaction's outputLevel is nil for delete-only compactions.
591 : outputLevel *compactionLevel
592 :
593 : // extraLevels point to additional levels in between the input and output
594 : // levels that get compacted in multilevel compactions
595 : extraLevels []*compactionLevel
596 :
597 : inputs []compactionLevel
598 :
599 : // maxOutputFileSize is the maximum size of an individual table created
600 : // during compaction.
601 : maxOutputFileSize uint64
602 : // maxOverlapBytes is the maximum number of bytes of overlap allowed for a
603 : // single output table with the tables in the grandparent level.
604 : maxOverlapBytes uint64
605 : // disableSpanElision disables elision of range tombstones and range keys. Used
606 : // by tests to allow range tombstones or range keys to be added to tables where
607 : // they would otherwise be elided.
608 : disableSpanElision bool
609 :
610 : // flushing contains the flushables (aka memtables) that are being flushed.
611 : flushing flushableList
612 : // bytesIterated contains the number of bytes that have been flushed/compacted.
613 : bytesIterated uint64
614 : // bytesWritten contains the number of bytes that have been written to outputs.
615 : bytesWritten int64
616 :
617 : // The boundaries of the input data.
618 : smallest InternalKey
619 : largest InternalKey
620 :
621 : // The range deletion tombstone fragmenter. Adds range tombstones as they are
622 : // returned from `compactionIter` and fragments them for output to files.
623 : // Referenced by `compactionIter` which uses it to check whether keys are deleted.
624 : rangeDelFrag keyspan.Fragmenter
625 : // The range key fragmenter. Similar to rangeDelFrag in that it gets range
626 : // keys from the compaction iter and fragments them for output to files.
627 : rangeKeyFrag keyspan.Fragmenter
628 : // The range deletion tombstone iterator, that merges and fragments
629 : // tombstones across levels. This iterator is included within the compaction
630 : // input iterator as a single level.
631 : // TODO(jackson): Remove this when the refactor of FragmentIterator,
632 : // InterleavingIterator, etc is complete.
633 : rangeDelIter keyspan.InternalIteratorShim
634 : // rangeKeyInterleaving is the interleaving iter for range keys.
635 : rangeKeyInterleaving keyspan.InterleavingIter
636 :
637 : // A list of objects to close when the compaction finishes. Used by input
638 : // iteration to keep rangeDelIters open for the lifetime of the compaction,
639 : // and only close them when the compaction finishes.
640 : closers []io.Closer
641 :
642 : // grandparents are the tables in level+2 that overlap with the files being
643 : // compacted. Used to determine output table boundaries. Do not assume that the actual files
644 : // in the grandparent level when this compaction finishes will be the same.
645 : grandparents manifest.LevelSlice
646 :
647 : // Boundaries at which flushes to L0 should be split. Determined by
648 : // L0Sublevels. If nil, flushes aren't split.
649 : l0Limits [][]byte
650 :
651 : // List of disjoint inuse key ranges the compaction overlaps with in
652 : // grandparent and lower levels. See setupInuseKeyRanges() for the
653 : // construction. Used by elideTombstone() and elideRangeTombstone() to
654 : // determine if keys affected by a tombstone possibly exist at a lower level.
655 : inuseKeyRanges []manifest.UserKeyRange
656 : // inuseEntireRange is set if the above inuse key ranges wholly contain the
657 : // compaction's key range. This allows compactions in higher levels to often
658 : // elide key comparisons.
659 : inuseEntireRange bool
660 : elideTombstoneIndex int
661 :
662 : // allowedZeroSeqNum is true if seqnums can be zeroed if there are no
663 : // snapshots requiring them to be kept. This determination is made by
664 : // looking for an sstable which overlaps the bounds of the compaction at a
665 : // lower level in the LSM during runCompaction.
666 : allowedZeroSeqNum bool
667 :
668 : metrics map[int]*LevelMetrics
669 :
670 : pickerMetrics compactionPickerMetrics
671 : }
672 :
673 1 : func (c *compaction) makeInfo(jobID int) CompactionInfo {
674 1 : info := CompactionInfo{
675 1 : JobID: jobID,
676 1 : Reason: c.kind.String(),
677 1 : Input: make([]LevelInfo, 0, len(c.inputs)),
678 1 : Annotations: []string{},
679 1 : }
680 1 : for _, cl := range c.inputs {
681 1 : inputInfo := LevelInfo{Level: cl.level, Tables: nil}
682 1 : iter := cl.files.Iter()
683 1 : for m := iter.First(); m != nil; m = iter.Next() {
684 1 : inputInfo.Tables = append(inputInfo.Tables, m.TableInfo())
685 1 : }
686 1 : info.Input = append(info.Input, inputInfo)
687 : }
688 1 : if c.outputLevel != nil {
689 1 : info.Output.Level = c.outputLevel.level
690 1 :
691 1 : // If there are no inputs from the output level (eg, a move
692 1 : // compaction), add an empty LevelInfo to info.Input.
693 1 : if len(c.inputs) > 0 && c.inputs[len(c.inputs)-1].level != c.outputLevel.level {
694 0 : info.Input = append(info.Input, LevelInfo{Level: c.outputLevel.level})
695 0 : }
696 1 : } else {
697 1 : // For a delete-only compaction, set the output level to L6. The
698 1 : // output level is not meaningful here, but complicating the
699 1 : // info.Output interface with a pointer doesn't seem worth the
700 1 : // semantic distinction.
701 1 : info.Output.Level = numLevels - 1
702 1 : }
703 :
704 1 : for i, score := range c.pickerMetrics.scores {
705 1 : info.Input[i].Score = score
706 1 : }
707 1 : info.SingleLevelOverlappingRatio = c.pickerMetrics.singleLevelOverlappingRatio
708 1 : info.MultiLevelOverlappingRatio = c.pickerMetrics.multiLevelOverlappingRatio
709 1 : if len(info.Input) > 2 {
710 1 : info.Annotations = append(info.Annotations, "multilevel")
711 1 : }
712 1 : return info
713 : }
714 :
715 : func newCompaction(
716 : pc *pickedCompaction, opts *Options, beganAt time.Time, provider objstorage.Provider,
717 1 : ) *compaction {
718 1 : c := &compaction{
719 1 : kind: compactionKindDefault,
720 1 : cmp: pc.cmp,
721 1 : equal: opts.equal(),
722 1 : comparer: opts.Comparer,
723 1 : formatKey: opts.Comparer.FormatKey,
724 1 : inputs: pc.inputs,
725 1 : smallest: pc.smallest,
726 1 : largest: pc.largest,
727 1 : logger: opts.Logger,
728 1 : version: pc.version,
729 1 : beganAt: beganAt,
730 1 : maxOutputFileSize: pc.maxOutputFileSize,
731 1 : maxOverlapBytes: pc.maxOverlapBytes,
732 1 : pickerMetrics: pc.pickerMetrics,
733 1 : }
734 1 : c.startLevel = &c.inputs[0]
735 1 : if pc.startLevel.l0SublevelInfo != nil {
736 1 : c.startLevel.l0SublevelInfo = pc.startLevel.l0SublevelInfo
737 1 : }
738 1 : c.outputLevel = &c.inputs[1]
739 1 :
740 1 : if len(pc.extraLevels) > 0 {
741 1 : c.extraLevels = pc.extraLevels
742 1 : c.outputLevel = &c.inputs[len(c.inputs)-1]
743 1 : }
744 : // Compute the set of outputLevel+1 files that overlap this compaction (these
745 : // are the grandparent sstables).
746 1 : if c.outputLevel.level+1 < numLevels {
747 1 : c.grandparents = c.version.Overlaps(c.outputLevel.level+1, c.cmp,
748 1 : c.smallest.UserKey, c.largest.UserKey, c.largest.IsExclusiveSentinel())
749 1 : }
750 1 : c.setupInuseKeyRanges()
751 1 : c.kind = pc.kind
752 1 :
753 1 : if c.kind == compactionKindDefault && c.outputLevel.files.Empty() && !c.hasExtraLevelData() &&
754 1 : c.startLevel.files.Len() == 1 && c.grandparents.SizeSum() <= c.maxOverlapBytes {
755 1 : // This compaction can be converted into a move or copy from one level
756 1 : // to the next. We avoid such a move if there is lots of overlapping
757 1 : // grandparent data. Otherwise, the move could create a parent file
758 1 : // that will require a very expensive merge later on.
759 1 : iter := c.startLevel.files.Iter()
760 1 : meta := iter.First()
761 1 : isRemote := false
762 1 : // We should always be passed a provider, except in some unit tests.
763 1 : if provider != nil {
764 1 : objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
765 1 : if err != nil {
766 0 : panic(errors.Wrapf(err, "cannot lookup table %s in provider", meta.FileBacking.DiskFileNum))
767 : }
768 1 : isRemote = objMeta.IsRemote()
769 : }
770 : // Avoid a trivial move or copy if all of these are true, as rewriting a
771 : // new file is better:
772 : //
773 : // 1) The source file is a virtual sstable
774 : // 2) The existing file `meta` is on non-remote storage
775 : // 3) The output level prefers shared storage
776 1 : mustCopy := !isRemote && remote.ShouldCreateShared(opts.Experimental.CreateOnShared, c.outputLevel.level)
777 1 : if mustCopy {
778 1 : // If the source is virtual, it's best to just rewrite the file as all
779 1 : // conditions in the above comment are met.
780 1 : if !meta.Virtual {
781 1 : c.kind = compactionKindCopy
782 1 : }
783 1 : } else {
784 1 : c.kind = compactionKindMove
785 1 : }
786 : }
787 1 : return c
788 : }
789 :
790 : func newDeleteOnlyCompaction(
791 : opts *Options, cur *version, inputs []compactionLevel, beganAt time.Time,
792 1 : ) *compaction {
793 1 : c := &compaction{
794 1 : kind: compactionKindDeleteOnly,
795 1 : cmp: opts.Comparer.Compare,
796 1 : equal: opts.equal(),
797 1 : comparer: opts.Comparer,
798 1 : formatKey: opts.Comparer.FormatKey,
799 1 : logger: opts.Logger,
800 1 : version: cur,
801 1 : beganAt: beganAt,
802 1 : inputs: inputs,
803 1 : }
804 1 :
805 1 : // Set c.smallest, c.largest.
806 1 : files := make([]manifest.LevelIterator, 0, len(inputs))
807 1 : for _, in := range inputs {
808 1 : files = append(files, in.files.Iter())
809 1 : }
810 1 : c.smallest, c.largest = manifest.KeyRange(opts.Comparer.Compare, files...)
811 1 : return c
812 : }
813 :
814 1 : func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64) {
815 1 : // Heuristic to place a lower bound on compaction output file size
816 1 : // caused by Lbase. Prior to this heuristic we have observed an L0 in
817 1 : // production with 310K files of which 290K files were < 10KB in size.
818 1 : // Our hypothesis is that it was caused by L1 having 2600 files and
819 1 : // ~10GB, such that each flush got split into many tiny files due to
820 1 : // overlapping with most of the files in Lbase.
821 1 : //
822 1 : // The computation below is general in that it accounts
823 1 : // for flushing different volumes of data (e.g. we may be flushing
824 1 : // many memtables). For illustration, we consider the typical
825 1 : // example of flushing a 64MB memtable. So 12.8MB output,
826 1 : // based on the compression guess below. If the compressed bytes
827 1 : // guess is an over-estimate we will end up with smaller files,
828 1 : // and if an under-estimate we will end up with larger files.
829 1 : // With a 2MB target file size, 7 files. We are willing to accept
830 1 : // 4x the number of files, if it results in better write amplification
831 1 : // when later compacting to Lbase, i.e., ~450KB files (target file
832 1 : // size / 4).
833 1 : //
834 1 : // Note that this is a pessimistic heuristic in that
835 1 : // fileCountUpperBoundDueToGrandparents could be far from the actual
836 1 : // number of files produced due to the grandparent limits. For
837 1 : // example, in the extreme, consider a flush that overlaps with 1000
838 1 : // files in Lbase f0...f999, and the initially calculated value of
839 1 : // maxOverlapBytes will cause splits at f10, f20,..., f990, which
840 1 : // means an upper bound file count of 100 files. Say the input bytes
841 1 : // in the flush are such that acceptableFileCount=10. We will fatten
842 1 : // up maxOverlapBytes by 10x to ensure that the upper bound file count
843 1 : // drops to 10. However, it is possible that in practice, even without
844 1 : // this change, we would have produced no more than 10 files, and that
845 1 : // this change makes the files unnecessarily wide. Say the input bytes
846 1 : // are distributed such that 10% are in f0...f9, 10% in f10...f19, ...
847 1 : // 10% in f80...f89 and 10% in f990...f999. The original value of
848 1 : // maxOverlapBytes would have actually produced only 10 sstables. But
849 1 : // by increasing maxOverlapBytes by 10x, we may produce 1 sstable that
850 1 : // spans f0...f89, i.e., a much wider sstable than necessary.
851 1 : //
852 1 : // We could produce a tighter estimate of
853 1 : // fileCountUpperBoundDueToGrandparents if we had knowledge of the key
854 1 : // distribution of the flush. The 4x multiplier mentioned earlier is
855 1 : // a way to try to compensate for this pessimism.
856 1 : //
857 1 : // TODO(sumeer): we don't have compression info for the data being
858 1 : // flushed, but it is likely that existing files that overlap with
859 1 : // this flush in Lbase are representative wrt compression ratio. We
860 1 : // could store the uncompressed size in FileMetadata and estimate
861 1 : // the compression ratio.
862 1 : const approxCompressionRatio = 0.2
863 1 : approxOutputBytes := approxCompressionRatio * float64(flushingBytes)
864 1 : approxNumFilesBasedOnTargetSize :=
865 1 : int(math.Ceil(approxOutputBytes / float64(c.maxOutputFileSize)))
866 1 : acceptableFileCount := float64(4 * approxNumFilesBasedOnTargetSize)
867 1 : // The byte calculation is linear in numGrandparentFiles, but we will
868 1 : // incur this linear cost in findGrandparentLimit too, so we are also
869 1 : // willing to pay it now. We could approximate this cheaply by using
870 1 : // the mean file size of Lbase.
871 1 : grandparentFileBytes := c.grandparents.SizeSum()
872 1 : fileCountUpperBoundDueToGrandparents :=
873 1 : float64(grandparentFileBytes) / float64(c.maxOverlapBytes)
874 1 : if fileCountUpperBoundDueToGrandparents > acceptableFileCount {
875 1 : c.maxOverlapBytes = uint64(
876 1 : float64(c.maxOverlapBytes) *
877 1 : (fileCountUpperBoundDueToGrandparents / acceptableFileCount))
878 1 : }
879 : }
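// Worked instance of the heuristic above (hypothetical sizes): a 64 MB flush
// with the assumed 0.2 compression ratio yields ~12.8 MB of output. With a
// 2 MB maxOutputFileSize that is ceil(12.8/2) = 7 files, so
// acceptableFileCount = 28. If the overlapping Lbase files sum to 1 GB and
// maxOverlapBytes is 20 MB, fileCountUpperBoundDueToGrandparents = 1024/20 =
// 51.2 > 28, so maxOverlapBytes is inflated by 51.2/28 ≈ 1.83x to ~36.6 MB.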
880 :
881 : func newFlush(
882 : opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time,
883 1 : ) *compaction {
884 1 : c := &compaction{
885 1 : kind: compactionKindFlush,
886 1 : cmp: opts.Comparer.Compare,
887 1 : equal: opts.equal(),
888 1 : comparer: opts.Comparer,
889 1 : formatKey: opts.Comparer.FormatKey,
890 1 : logger: opts.Logger,
891 1 : version: cur,
892 1 : beganAt: beganAt,
893 1 : inputs: []compactionLevel{{level: -1}, {level: 0}},
894 1 : maxOutputFileSize: math.MaxUint64,
895 1 : maxOverlapBytes: math.MaxUint64,
896 1 : flushing: flushing,
897 1 : }
898 1 : c.startLevel = &c.inputs[0]
899 1 : c.outputLevel = &c.inputs[1]
900 1 :
901 1 : if len(flushing) > 0 {
902 1 : if _, ok := flushing[0].flushable.(*ingestedFlushable); ok {
903 1 : if len(flushing) != 1 {
904 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
905 : }
906 1 : c.kind = compactionKindIngestedFlushable
907 1 : return c
908 : }
909 : }
910 :
911 : // Make sure there's no ingestedFlushable after the first flushable in the
912 : // list.
913 1 : for _, f := range flushing {
914 1 : if _, ok := f.flushable.(*ingestedFlushable); ok {
915 0 : panic("pebble: flushing shouldn't contain ingestedFlushable flushable")
916 : }
917 : }
918 :
919 1 : if cur.L0Sublevels != nil {
920 1 : c.l0Limits = cur.L0Sublevels.FlushSplitKeys()
921 1 : }
922 :
923 1 : smallestSet, largestSet := false, false
924 1 : updatePointBounds := func(iter internalIterator) {
925 1 : if key, _ := iter.First(); key != nil {
926 1 : if !smallestSet ||
927 1 : base.InternalCompare(c.cmp, c.smallest, *key) > 0 {
928 1 : smallestSet = true
929 1 : c.smallest = key.Clone()
930 1 : }
931 : }
932 1 : if key, _ := iter.Last(); key != nil {
933 1 : if !largestSet ||
934 1 : base.InternalCompare(c.cmp, c.largest, *key) < 0 {
935 1 : largestSet = true
936 1 : c.largest = key.Clone()
937 1 : }
938 : }
939 : }
940 :
941 1 : updateRangeBounds := func(iter keyspan.FragmentIterator) {
942 1 : // File bounds require s != nil && !s.Empty(). We only need to check for
943 1 : // s != nil here, as the memtable's FragmentIterator would never surface
944 1 : // empty spans.
945 1 : if s := iter.First(); s != nil {
946 1 : if key := s.SmallestKey(); !smallestSet ||
947 1 : base.InternalCompare(c.cmp, c.smallest, key) > 0 {
948 1 : smallestSet = true
949 1 : c.smallest = key.Clone()
950 1 : }
951 : }
952 1 : if s := iter.Last(); s != nil {
953 1 : if key := s.LargestKey(); !largestSet ||
954 1 : base.InternalCompare(c.cmp, c.largest, key) < 0 {
955 1 : largestSet = true
956 1 : c.largest = key.Clone()
957 1 : }
958 : }
959 : }
960 :
961 1 : var flushingBytes uint64
962 1 : for i := range flushing {
963 1 : f := flushing[i]
964 1 : updatePointBounds(f.newIter(nil))
965 1 : if rangeDelIter := f.newRangeDelIter(nil); rangeDelIter != nil {
966 1 : updateRangeBounds(rangeDelIter)
967 1 : }
968 1 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
969 1 : updateRangeBounds(rangeKeyIter)
970 1 : }
971 1 : flushingBytes += f.inuseBytes()
972 : }
973 :
974 1 : if opts.FlushSplitBytes > 0 {
975 1 : c.maxOutputFileSize = uint64(opts.Level(0).TargetFileSize)
976 1 : c.maxOverlapBytes = maxGrandparentOverlapBytes(opts, 0)
977 1 : c.grandparents = c.version.Overlaps(baseLevel, c.cmp, c.smallest.UserKey,
978 1 : c.largest.UserKey, c.largest.IsExclusiveSentinel())
979 1 : adjustGrandparentOverlapBytesForFlush(c, flushingBytes)
980 1 : }
981 :
982 1 : c.setupInuseKeyRanges()
983 1 : return c
984 : }
985 :
986 1 : func (c *compaction) hasExtraLevelData() bool {
987 1 : if len(c.extraLevels) == 0 {
988 1 : // not a multi level compaction
989 1 : return false
990 1 : } else if c.extraLevels[0].files.Empty() {
991 1 : // a multi level compaction without data in the intermediate input level;
992 1 : // e.g. for a multi level compaction with levels 4,5, and 6, this could
993 1 : // occur if there are no files to compact in 5, or in 5 and 6 (i.e. a move).
994 1 : return false
995 1 : }
996 1 : return true
997 : }
998 :
999 1 : func (c *compaction) setupInuseKeyRanges() {
1000 1 : level := c.outputLevel.level + 1
1001 1 : if c.outputLevel.level == 0 {
1002 1 : level = 0
1003 1 : }
1004 : // calculateInuseKeyRanges will return a series of sorted spans. Overlapping
1005 : // or abutting spans have already been merged.
1006 1 : c.inuseKeyRanges = calculateInuseKeyRanges(
1007 1 : c.version, c.cmp, level, numLevels-1, c.smallest.UserKey, c.largest.UserKey,
1008 1 : )
1009 1 : // Check if there's a single in-use span that encompasses the entire key
1010 1 : // range of the compaction. This is an optimization to avoid key comparisons
1011 1 : // against inuseKeyRanges during the compaction when every key within the
1012 1 : // compaction overlaps with an in-use span.
1013 1 : if len(c.inuseKeyRanges) > 0 {
1014 1 : c.inuseEntireRange = c.cmp(c.inuseKeyRanges[0].Start, c.smallest.UserKey) <= 0 &&
1015 1 : c.cmp(c.inuseKeyRanges[0].End, c.largest.UserKey) >= 0
1016 1 : }
1017 : }
1018 :
1019 : func calculateInuseKeyRanges(
1020 : v *version, cmp base.Compare, level, maxLevel int, smallest, largest []byte,
1021 1 : ) []manifest.UserKeyRange {
1022 1 : // Use two slices, alternating which one is input and which one is output
1023 1 : // as we descend the LSM.
1024 1 : var input, output []manifest.UserKeyRange
1025 1 :
1026 1 : // L0 requires special treatment, since sstables within L0 may overlap.
1027 1 : // We use the L0 Sublevels structure to efficiently calculate the merged
1028 1 : // in-use key ranges.
1029 1 : if level == 0 {
1030 1 : output = v.L0Sublevels.InUseKeyRanges(smallest, largest)
1031 1 : level++
1032 1 : }
1033 :
1034 1 : for ; level <= maxLevel; level++ {
1035 1 : // NB: We always treat `largest` as inclusive for simplicity, because
1036 1 : // there's little consequence to calculating slightly broader in-use key
1037 1 : // ranges.
1038 1 : overlaps := v.Overlaps(level, cmp, smallest, largest, false /* exclusiveEnd */)
1039 1 : iter := overlaps.Iter()
1040 1 :
1041 1 : // We may already have in-use key ranges from higher levels. Iterate
1042 1 : // through both our accumulated in-use key ranges and this level's
1043 1 : // files, merging the two.
1044 1 : //
1045 1 : // Tables higher within the LSM have broader key spaces. We use this
1046 1 : // when possible to seek past a level's files that are contained by
1047 1 : // our current accumulated in-use key ranges. This helps avoid
1048 1 : // per-sstable work during flushes or compactions in high levels which
1049 1 : // overlap the majority of the LSM's sstables.
1050 1 : input, output = output, input
1051 1 : output = output[:0]
1052 1 :
1053 1 : var currFile *fileMetadata
1054 1 : var currAccum *manifest.UserKeyRange
1055 1 : if len(input) > 0 {
1056 1 : currAccum, input = &input[0], input[1:]
1057 1 : }
1058 :
1059 : // If we have an accumulated key range and its start is ≤ smallest,
1060 : // we can seek to the accumulated range's end. Otherwise, we need to
1061 : // start at the first overlapping file within the level.
1062 1 : if currAccum != nil && cmp(currAccum.Start, smallest) <= 0 {
1063 1 : currFile = seekGT(&iter, cmp, currAccum.End)
1064 1 : } else {
1065 1 : currFile = iter.First()
1066 1 : }
1067 :
1068 1 : for currFile != nil || currAccum != nil {
1069 1 : // If we've exhausted either the files in the level or the
1070 1 : // accumulated key ranges, we just need to append the one we have.
1071 1 : // If we have both a currFile and a currAccum, they either overlap
1072 1 : // or they're disjoint. If they're disjoint, we append whichever
1073 1 : // one sorts first and move on to the next file or range. If they
1074 1 : // overlap, we merge them into currAccum and proceed to the next
1075 1 : // file.
1076 1 : switch {
1077 1 : case currAccum == nil || (currFile != nil && cmp(currFile.Largest.UserKey, currAccum.Start) < 0):
1078 1 : // This file is strictly before the current accumulated range,
1079 1 : // or there are no more accumulated ranges.
1080 1 : output = append(output, manifest.UserKeyRange{
1081 1 : Start: currFile.Smallest.UserKey,
1082 1 : End: currFile.Largest.UserKey,
1083 1 : })
1084 1 : currFile = iter.Next()
1085 1 : case currFile == nil || (currAccum != nil && cmp(currAccum.End, currFile.Smallest.UserKey) < 0):
1086 1 : // The current accumulated key range is strictly before the
1087 1 : // current file, or there are no more files.
1088 1 : output = append(output, *currAccum)
1089 1 : currAccum = nil
1090 1 : if len(input) > 0 {
1091 1 : currAccum, input = &input[0], input[1:]
1092 1 : }
1093 1 : default:
1094 1 : // The current accumulated range and the current file overlap.
1095 1 : // Adjust the accumulated range to be the union.
1096 1 : if cmp(currFile.Smallest.UserKey, currAccum.Start) < 0 {
1097 1 : currAccum.Start = currFile.Smallest.UserKey
1098 1 : }
1099 1 : if cmp(currFile.Largest.UserKey, currAccum.End) > 0 {
1100 1 : currAccum.End = currFile.Largest.UserKey
1101 1 : }
1102 :
1103 : // Extending `currAccum`'s end boundary may have caused it to
1104 : // overlap with `input` key ranges that we haven't processed
1105 : // yet. Merge any such key ranges.
1106 1 : for len(input) > 0 && cmp(input[0].Start, currAccum.End) <= 0 {
1107 1 : if cmp(input[0].End, currAccum.End) > 0 {
1108 1 : currAccum.End = input[0].End
1109 1 : }
1110 1 : input = input[1:]
1111 : }
1112 : // Seek the level iterator past our current accumulated end.
1113 1 : currFile = seekGT(&iter, cmp, currAccum.End)
1114 : }
1115 : }
1116 : }
1117 1 : return output
1118 : }
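// Illustrative trace (hypothetical keys, assuming the compaction's smallest
// user key is a so the level iterator starts at its first overlapping file):
// suppose the accumulated ranges from higher levels are {[b,f]} and the
// current level holds files spanning [a,c] and [h,k]. [a,c] overlaps [b,f],
// so the accumulated range widens to [a,f]; [h,k] is disjoint and sorts after
// it, so it is appended on its own, yielding output = {[a,f], [h,k]}.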
1119 :
1120 1 : func seekGT(iter *manifest.LevelIterator, cmp base.Compare, key []byte) *manifest.FileMetadata {
1121 1 : f := iter.SeekGE(cmp, key)
1122 1 : for f != nil && cmp(f.Largest.UserKey, key) == 0 {
1123 1 : f = iter.Next()
1124 1 : }
1125 1 : return f
1126 : }
1127 :
1128 : // findGrandparentLimit takes the start user key for a table and returns the
1129 : // user key to which that table can extend without excessively overlapping
1130 : // the grandparent level. If no limit is needed considering the grandparent
1131 : // files, this function returns nil. This is done in order to prevent a table
1132 : // at level N from overlapping too much data at level N+1. We want to avoid
1133 : // such large overlaps because they translate into large compactions. The
1134 : // current heuristic stops output of a table if the addition of another key
1135 : // would cause the table to overlap more than 10x the target file size at
1136 : // level N. See maxGrandparentOverlapBytes.
1137 1 : func (c *compaction) findGrandparentLimit(start []byte) []byte {
1138 1 : iter := c.grandparents.Iter()
1139 1 : var overlappedBytes uint64
1140 1 : var greater bool
1141 1 : for f := iter.SeekGE(c.cmp, start); f != nil; f = iter.Next() {
1142 1 : overlappedBytes += f.Size
1143 1 : // To ensure forward progress we always return a larger user
1144 1 : // key than where we started. See comments above clients of
1145 1 : // this function for how this is used.
1146 1 : greater = greater || c.cmp(f.Smallest.UserKey, start) > 0
1147 1 : if !greater {
1148 1 : continue
1149 : }
1150 :
1151 : // We return the smallest bound of an sstable rather than the
1152 : // largest because the smallest is always inclusive, and limits
1153 : // are used exclusively when truncating range tombstones. If we
1154 : // truncated an output to the largest key while there's a
1155 : // pending tombstone, the next output file would also overlap
1156 : // the same grandparent f.
1157 1 : if overlappedBytes > c.maxOverlapBytes {
1158 1 : return f.Smallest.UserKey
1159 1 : }
1160 : }
1161 1 : return nil
1162 : }
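// Worked example (hypothetical sizes): with maxOverlapBytes = 20 MB and
// grandparent files g1 (15 MB), g2 (10 MB), g3 (5 MB) whose smallest keys all
// sort after start, the cumulative overlap reaches 25 MB while visiting g2,
// so the function returns g2.Smallest.UserKey and the output table is cut
// before it can overlap g2.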
1163 :
1164 : // findL0Limit takes the start key for a table and returns the user key to which
1165 : // that table can be extended without hitting the next l0Limit. Having flushed
1166 : // sstables "bridging across" an l0Limit could lead to increased L0 -> LBase
1167 : // compaction sizes as well as elevated read amplification.
1168 1 : func (c *compaction) findL0Limit(start []byte) []byte {
1169 1 : if c.startLevel.level > -1 || c.outputLevel.level != 0 || len(c.l0Limits) == 0 {
1170 1 : return nil
1171 1 : }
1172 1 : index := sort.Search(len(c.l0Limits), func(i int) bool {
1173 1 : return c.cmp(c.l0Limits[i], start) > 0
1174 1 : })
1175 1 : if index < len(c.l0Limits) {
1176 1 : return c.l0Limits[index]
1177 1 : }
1178 1 : return nil
1179 : }
1180 :
1181 : // errorOnUserKeyOverlap returns an error if the last two written sstables in
1182 : // this compaction have revisions of the same user key present in both sstables,
1183 : // when it shouldn't (eg. when splitting flushes).
1184 1 : func (c *compaction) errorOnUserKeyOverlap(ve *versionEdit) error {
1185 1 : if n := len(ve.NewFiles); n > 1 {
1186 1 : meta := ve.NewFiles[n-1].Meta
1187 1 : prevMeta := ve.NewFiles[n-2].Meta
1188 1 : if !prevMeta.Largest.IsExclusiveSentinel() &&
1189 1 : c.cmp(prevMeta.Largest.UserKey, meta.Smallest.UserKey) >= 0 {
1190 1 : return errors.Errorf("pebble: compaction split user key across two sstables: %s in %s and %s",
1191 1 : prevMeta.Largest.Pretty(c.formatKey),
1192 1 : prevMeta.FileNum,
1193 1 : meta.FileNum)
1194 1 : }
1195 : }
1196 1 : return nil
1197 : }
1198 :
1199 : // allowZeroSeqNum returns true if seqnums can be zeroed if there are no
1200 : // snapshots requiring them to be kept. It performs this determination by
1201 : // looking for an sstable which overlaps the bounds of the compaction at a
1202 : // lower level in the LSM.
1203 1 : func (c *compaction) allowZeroSeqNum() bool {
1204 1 : return c.elideRangeTombstone(c.smallest.UserKey, c.largest.UserKey)
1205 1 : }
1206 :
1207 : // elideTombstone returns true if it is ok to elide a tombstone for the
1208 : // specified key. A return value of true guarantees that there are no key/value
1209 : // pairs at c.level+2 or higher that possibly contain the specified user
1210 : // key. The keys in multiple invocations to elideTombstone must be supplied in
1211 : // order.
1212 1 : func (c *compaction) elideTombstone(key []byte) bool {
1213 1 : if c.inuseEntireRange || len(c.flushing) != 0 {
1214 1 : return false
1215 1 : }
1216 :
1217 1 : for ; c.elideTombstoneIndex < len(c.inuseKeyRanges); c.elideTombstoneIndex++ {
1218 1 : r := &c.inuseKeyRanges[c.elideTombstoneIndex]
1219 1 : if c.cmp(key, r.End) <= 0 {
1220 1 : if c.cmp(key, r.Start) >= 0 {
1221 1 : return false
1222 1 : }
1223 1 : break
1224 : }
1225 : }
1226 1 : return true
1227 : }
1228 :
1229 : // elideRangeTombstone returns true if it is ok to elide the specified range
1230 : // tombstone. A return value of true guarantees that there are no key/value
1231 : // pairs at c.outputLevel.level+1 or higher that possibly overlap the specified
1232 : // tombstone.
1233 1 : func (c *compaction) elideRangeTombstone(start, end []byte) bool {
1234 1 : // Disable range tombstone elision if the testing knob for that is enabled,
1235 1 : // or if we are flushing memtables. The latter requirement is due to
1236 1 : // inuseKeyRanges not accounting for key ranges in other memtables that are
1237 1 : // being flushed in the same compaction. It's possible for a range tombstone
1238 1 : // in one memtable to overlap keys in a preceding memtable in c.flushing.
1239 1 : //
1240 1 : // This function is also used in setting allowZeroSeqNum, so disabling
1241 1 : // elision of range tombstones also disables zeroing of SeqNums.
1242 1 : //
1243 1 : // TODO(peter): we disable zeroing of seqnums during flushing to match
1244 1 : // RocksDB behavior and to avoid generating overlapping sstables during
1245 1 : // DB.replayWAL. When replaying WAL files at startup, we flush after each
1246 1 : // WAL is replayed building up a single version edit that is
1247 1 : // applied. Because we don't apply the version edit after each flush, this
1248 1 : // code doesn't know that L0 contains files and zeroing of seqnums should
1249 1 : // be disabled. That is fixable, but it seems safer to just match the
1250 1 : // RocksDB behavior for now.
1251 1 : if c.disableSpanElision || len(c.flushing) != 0 {
1252 1 : return false
1253 1 : }
1254 :
1255 1 : lower := sort.Search(len(c.inuseKeyRanges), func(i int) bool {
1256 1 : return c.cmp(c.inuseKeyRanges[i].End, start) >= 0
1257 1 : })
1258 1 : upper := sort.Search(len(c.inuseKeyRanges), func(i int) bool {
1259 1 : return c.cmp(c.inuseKeyRanges[i].Start, end) > 0
1260 1 : })
1261 1 : return lower >= upper
1262 : }
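// Example (hypothetical ranges): with inuseKeyRanges = {[c,f], [m,p]}, a
// range tombstone over [g,k] finds lower = 1 (the first range whose End >= g
// is [m,p]) and upper = 1 (the first range whose Start > k is also [m,p]), so
// lower >= upper and the tombstone may be elided. A tombstone over [e,n]
// yields lower = 0 and upper = 2, so it is kept.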
1263 :
1264 : // elideRangeKey returns true if it is ok to elide the specified range key. A
1265 : // return value of true guarantees that there are no key/value pairs at
1266 : // c.outputLevel.level+1 or higher that possibly overlap the specified range key.
1267 1 : func (c *compaction) elideRangeKey(start, end []byte) bool {
1268 1 : // TODO(bilal): Track inuseKeyRanges separately for the range keyspace as
1269 1 : // opposed to the point keyspace. Once that is done, elideRangeTombstone
1270 1 : // can just check in the point keyspace, and this function can check for
1271 1 : // inuseKeyRanges in the range keyspace.
1272 1 : return c.elideRangeTombstone(start, end)
1273 1 : }
1274 :
1275 : // newInputIter returns an iterator over all the input tables in a compaction.
1276 : func (c *compaction) newInputIter(
1277 : newIters tableNewIters, newRangeKeyIter keyspan.TableNewSpanIter, snapshots []uint64,
1278 1 : ) (_ internalIterator, retErr error) {
1279 1 : // Validate the ordering of compaction input files for defense in depth.
1280 1 : if len(c.flushing) == 0 {
1281 1 : if c.startLevel.level >= 0 {
1282 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
1283 1 : manifest.Level(c.startLevel.level), c.startLevel.files.Iter())
1284 1 : if err != nil {
1285 1 : return nil, err
1286 1 : }
1287 : }
1288 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
1289 1 : manifest.Level(c.outputLevel.level), c.outputLevel.files.Iter())
1290 1 : if err != nil {
1291 1 : return nil, err
1292 1 : }
1293 1 : if c.startLevel.level == 0 {
1294 1 : if c.startLevel.l0SublevelInfo == nil {
1295 0 : panic("l0SublevelInfo not created for compaction out of L0")
1296 : }
1297 1 : for _, info := range c.startLevel.l0SublevelInfo {
1298 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
1299 1 : info.sublevel, info.Iter())
1300 1 : if err != nil {
1301 1 : return nil, err
1302 1 : }
1303 : }
1304 : }
1305 1 : if len(c.extraLevels) > 0 {
1306 1 : if len(c.extraLevels) > 1 {
1307 0 : panic("n>2 multi level compaction not implemented yet")
1308 : }
1309 1 : interLevel := c.extraLevels[0]
1310 1 : err := manifest.CheckOrdering(c.cmp, c.formatKey,
1311 1 : manifest.Level(interLevel.level), interLevel.files.Iter())
1312 1 : if err != nil {
1313 0 : return nil, err
1314 0 : }
1315 : }
1316 : }
1317 :
1318 : // There are three classes of keys that a compaction needs to process: point
1319 : // keys, range deletion tombstones and range keys. Collect all iterators for
1320 : // all these classes of keys from all the levels. We'll aggregate them
1321 : // together further below.
1322 : //
1323 : // numInputLevels is an approximation of the number of iterator levels. Due
1324 : // to idiosyncrasies in iterator construction, we may (rarely) exceed this
1325 : // initial capacity.
1326 1 : numInputLevels := max(len(c.flushing), len(c.inputs))
1327 1 : iters := make([]internalIterator, 0, numInputLevels)
1328 1 : rangeDelIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
1329 1 : rangeKeyIters := make([]keyspan.FragmentIterator, 0, numInputLevels)
1330 1 :
1331 1 : // If construction of the iterator inputs fails, ensure that we close all
1332 1 : // the constituent iterators.
1333 1 : defer func() {
1334 1 : if retErr != nil {
1335 0 : for _, iter := range iters {
1336 0 : if iter != nil {
1337 0 : iter.Close()
1338 0 : }
1339 : }
1340 0 : for _, rangeDelIter := range rangeDelIters {
1341 0 : rangeDelIter.Close()
1342 0 : }
1343 : }
1344 : }()
1345 1 : iterOpts := IterOptions{
1346 1 : CategoryAndQoS: sstable.CategoryAndQoS{
1347 1 : Category: "pebble-compaction",
1348 1 : QoSLevel: sstable.NonLatencySensitiveQoSLevel,
1349 1 : },
1350 1 : logger: c.logger,
1351 1 : }
1352 1 :
1353 1 : // Populate iters, rangeDelIters and rangeKeyIters with the appropriate
1354 1 : // constituent iterators. This depends on whether this is a flush or a
1355 1 : // compaction.
1356 1 : if len(c.flushing) != 0 {
1357 1 : // If flushing, we need to build the input iterators over the memtables
1358 1 : // stored in c.flushing.
1359 1 : for i := range c.flushing {
1360 1 : f := c.flushing[i]
1361 1 : iters = append(iters, f.newFlushIter(nil, &c.bytesIterated))
1362 1 : rangeDelIter := f.newRangeDelIter(nil)
1363 1 : if rangeDelIter != nil {
1364 1 : rangeDelIters = append(rangeDelIters, rangeDelIter)
1365 1 : }
1366 1 : if rangeKeyIter := f.newRangeKeyIter(nil); rangeKeyIter != nil {
1367 1 : rangeKeyIters = append(rangeKeyIters, rangeKeyIter)
1368 1 : }
1369 : }
1370 1 : } else {
1371 1 : addItersForLevel := func(level *compactionLevel, l manifest.Level) error {
1372 1 : // Add a *levelIter for point iterators. Because we don't call
1373 1 : // initRangeDel, the levelIter will close and forget the range
1374 1 : // deletion iterator when it steps onto a new file. Surfacing range
1375 1 : // deletions to compactions is handled below.
1376 1 : iters = append(iters, newLevelIter(context.Background(),
1377 1 : iterOpts, c.comparer, newIters, level.files.Iter(), l, internalIterOpts{
1378 1 : bytesIterated: &c.bytesIterated,
1379 1 : bufferPool: &c.bufferPool,
1380 1 : }))
1381 1 : // TODO(jackson): Use keyspan.LevelIter to avoid loading all the range
1382 1 : // deletions into memory upfront. (See #2015, which reverted this.)
1383 1 : // There will be no user keys that are split between sstables
1384 1 : // within a level in Cockroach 23.1, which unblocks this optimization.
1385 1 :
1386 1 : // Add the range deletion iterator for each file as an independent level
1387 1 : // in mergingIter, as opposed to making a levelIter out of those. This
1388 1 : // is safer as levelIter expects all keys coming from underlying
1389 1 : // iterators to be in order. Due to compaction / tombstone writing
1390 1 : // logic in finishOutput(), it is possible for range tombstones to not
1391 1 : // be strictly ordered across all files in one level.
1392 1 : //
1393 1 : // Consider this example from the metamorphic tests (also repeated in
1394 1 : // finishOutput()), consisting of three L3 files with their bounds
1395 1 : // specified in square brackets next to the file name:
1396 1 : //
1397 1 : // ./000240.sst [tmgc#391,MERGE-tmgc#391,MERGE]
1398 1 : // tmgc#391,MERGE [786e627a]
1399 1 : // tmgc-udkatvs#331,RANGEDEL
1400 1 : //
1401 1 : // ./000241.sst [tmgc#384,MERGE-tmgc#384,MERGE]
1402 1 : // tmgc#384,MERGE [666c7070]
1403 1 : // tmgc-tvsalezade#383,RANGEDEL
1404 1 : // tmgc-tvsalezade#331,RANGEDEL
1405 1 : //
1406 1 : // ./000242.sst [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
1407 1 : // tmgc-tvsalezade#383,RANGEDEL
1408 1 : // tmgc#375,SET [72646c78766965616c72776865676e79]
1409 1 : // tmgc-tvsalezade#356,RANGEDEL
1410 1 : //
1411 1 : // Here, the range tombstone in 000240.sst falls "after" one in
1412 1 : // 000241.sst, despite 000240.sst being ordered "before" 000241.sst for
1413 1 : // levelIter's purposes. While each file is still consistent before its
1414 1 : // bounds, it's safer to have all rangedel iterators be visible to
1415 1 : // mergingIter.
1416 1 : iter := level.files.Iter()
1417 1 : for f := iter.First(); f != nil; f = iter.Next() {
1418 1 : rangeDelIter, closer, err := c.newRangeDelIter(
1419 1 : newIters, iter.Take(), iterOpts, l, &c.bytesIterated)
1420 1 : if err != nil {
1421 0 : // The error will already be annotated with the BackingFileNum, so
1422 0 : // we annotate it with the FileNum.
1423 0 : return errors.Wrapf(err, "pebble: could not open table %s", errors.Safe(f.FileNum))
1424 0 : }
1425 1 : if rangeDelIter == nil {
1426 1 : continue
1427 : }
1428 1 : rangeDelIters = append(rangeDelIters, rangeDelIter)
1429 1 : c.closers = append(c.closers, closer)
1430 : }
1431 :
1432 : // Check if this level has any range keys.
1433 1 : hasRangeKeys := false
1434 1 : for f := iter.First(); f != nil; f = iter.Next() {
1435 1 : if f.HasRangeKeys {
1436 1 : hasRangeKeys = true
1437 1 : break
1438 : }
1439 : }
1440 1 : if hasRangeKeys {
1441 1 : li := &keyspan.LevelIter{}
1442 1 : newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
1443 1 : iter, err := newRangeKeyIter(file, iterOptions)
1444 1 : if err != nil {
1445 0 : return nil, err
1446 1 : } else if iter == nil {
1447 0 : return emptyKeyspanIter, nil
1448 0 : }
1449 : // Ensure that the range key iter is not closed until the compaction is
1450 : // finished. This is necessary because range key processing
1451 : // requires the range keys to be held in memory for up to the
1452 : // lifetime of the compaction.
1453 1 : c.closers = append(c.closers, iter)
1454 1 : iter = noCloseIter{iter}
1455 1 :
1456 1 : // We do not need to truncate range keys to sstable boundaries, or
1457 1 : // only read within the file's atomic compaction units, unlike with
1458 1 : // range tombstones. This is because range keys were added after we
1459 1 : // stopped splitting user keys across sstables, so all the range keys
1460 1 : // in this sstable must wholly lie within the file's bounds.
1461 1 : return iter, err
1462 : }
1463 1 : li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
1464 1 : rangeKeyIters = append(rangeKeyIters, li)
1465 : }
1466 1 : return nil
1467 : }
1468 :
1469 1 : for i := range c.inputs {
1470 1 : // If the level is annotated with l0SublevelInfo, expand it into one
1471 1 : // level per sublevel.
1472 1 : // TODO(jackson): Perform this expansion even earlier when we pick the
1473 1 : // compaction?
1474 1 : if len(c.inputs[i].l0SublevelInfo) > 0 {
1475 1 : for _, info := range c.startLevel.l0SublevelInfo {
1476 1 : sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
1477 1 : if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
1478 0 : return nil, err
1479 0 : }
1480 : }
1481 1 : continue
1482 : }
1483 1 : if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
1484 0 : return nil, err
1485 0 : }
1486 : }
1487 : }
1488 :
1489 : // In normal operation, levelIter iterates over the point operations in a
1490 : // level, and initializes a rangeDelIter pointer for the range deletions in
1491 : // each table. During compaction, we want to iterate over the merged view of
1492 : // point operations and range deletions. In order to do this we create one
1493 : // levelIter per level to iterate over the point operations, and collect up
1494 : // all the range deletion files.
1495 : //
1496 : // The range deletion levels are first combined with a keyspan.MergingIter
1497 : // (currently wrapped by a keyspan.InternalIteratorShim to satisfy the
1498 : // internal iterator interface). The resulting merged rangedel iterator is
1499 : // then included with the point levels in a single mergingIter.
1500 : //
1501 : // Combine all the rangedel iterators using a keyspan.MergingIterator and a
1502 : // InternalIteratorShim so that the range deletions may be interleaved in
1503 : // the compaction input.
1504 : // TODO(jackson): Replace the InternalIteratorShim with an interleaving
1505 : // iterator.
1506 1 : if len(rangeDelIters) > 0 {
1507 1 : c.rangeDelIter.Init(c.cmp, rangeDelIters...)
1508 1 : iters = append(iters, &c.rangeDelIter)
1509 1 : }
1510 :
1511 : // If there's only one constituent point iterator, we can avoid the overhead
1512 : // of a *mergingIter. This is possible, for example, when performing a flush
1513 : // of a single memtable. Otherwise, combine all the iterators into a merging
1514 : // iter.
1515 1 : iter := iters[0]
1516 1 : if len(iters) > 0 {
1517 1 : iter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
1518 1 : }
1519 : // If there are range key iterators, we need to combine them using
1520 : // keyspan.MergingIter, and then interleave them among the points.
1521 1 : if len(rangeKeyIters) > 0 {
1522 1 : mi := &keyspan.MergingIter{}
1523 1 : mi.Init(c.cmp, rangeKeyCompactionTransform(c.equal, snapshots, c.elideRangeKey), new(keyspan.MergingBuffers), rangeKeyIters...)
1524 1 : di := &keyspan.DefragmentingIter{}
1525 1 : di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
1526 1 : c.rangeKeyInterleaving.Init(c.comparer, iter, di, keyspan.InterleavingIterOpts{})
1527 1 : iter = &c.rangeKeyInterleaving
1528 1 : }
1529 1 : return iter, nil
1530 : }
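
// Editor's sketch (not part of the original source): the compaction input is,
// conceptually, a k-way merge of independently sorted streams: one point
// iterator per level or sublevel, plus the merged range-deletion level and the
// interleaved range keys. The hypothetical two-way merge below is shown only
// to illustrate why each input must already be internally ordered (hence the
// manifest.CheckOrdering calls at the top of newInputIter); it is not Pebble's
// mergingIter.
func exampleMergeSorted(a, b []string) []string {
	out := make([]string, 0, len(a)+len(b))
	for len(a) > 0 && len(b) > 0 {
		// Emit the smaller head; the result is sorted only if both inputs are.
		if a[0] <= b[0] {
			out = append(out, a[0])
			a = a[1:]
		} else {
			out = append(out, b[0])
			b = b[1:]
		}
	}
	out = append(out, a...)
	return append(out, b...)
}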
1531 :
1532 : func (c *compaction) newRangeDelIter(
1533 : newIters tableNewIters,
1534 : f manifest.LevelFile,
1535 : opts IterOptions,
1536 : l manifest.Level,
1537 : bytesIterated *uint64,
1538 1 : ) (keyspan.FragmentIterator, io.Closer, error) {
1539 1 : opts.level = l
1540 1 : iter, rangeDelIter, err := newIters(context.Background(), f.FileMetadata,
1541 1 : &opts, internalIterOpts{
1542 1 : bytesIterated: &c.bytesIterated,
1543 1 : bufferPool: &c.bufferPool,
1544 1 : })
1545 1 : if err != nil {
1546 0 : return nil, nil, err
1547 0 : }
1548 : // TODO(peter): It is mildly wasteful to open the point iterator only to
1549 : // immediately close it. One way to solve this would be to add new
1550 : // methods to tableCache for creating point and range-deletion iterators
1551 : // independently. We'd only want to use those methods here,
1552 : // though. Doesn't seem worth the hassle in the near term.
1553 1 : if err = iter.Close(); err != nil {
1554 0 : if rangeDelIter != nil {
1555 0 : err = errors.CombineErrors(err, rangeDelIter.Close())
1556 0 : }
1557 0 : return nil, nil, err
1558 : }
1559 1 : if rangeDelIter == nil {
1560 1 : // The file doesn't contain any range deletions.
1561 1 : return nil, nil, nil
1562 1 : }
1563 :
1564 : // Ensure that rangeDelIter is not closed until the compaction is
1565 : // finished. This is necessary because range tombstone processing
1566 : // requires the range tombstones to be held in memory for up to the
1567 : // lifetime of the compaction.
1568 1 : closer := rangeDelIter
1569 1 : rangeDelIter = noCloseIter{rangeDelIter}
1570 1 :
1571 1 : // Truncate the range tombstones returned by the iterator to the
1572 1 : // upper bound of the atomic compaction unit of the file. We want to
1573 1 : // truncate the range tombstone to the bounds of the file, but files
1574 1 : // with split user keys pose an obstacle: The file's largest bound
1575 1 : // is inclusive whereas the range tombstone's end is exclusive.
1576 1 : //
1577 1 : // Consider the example:
1578 1 : //
1579 1 : // 000001:[b-f#200] range del [c,k)
1580 1 : // 000002:[f#190-g#inf] range del [c,k)
1581 1 : // 000003:[g#500-i#3]
1582 1 : //
1583 1 : // Files 000001 and 000002 contain the untruncated range tombstones
1584 1 : // [c,k). While the keyspace covered by 000003 was at one point
1585 1 : // deleted by the tombstone [c,k), the tombstone may have already
1586 1 : // been compacted away and the file does not contain an untruncated
1587 1 : // range tombstone. We want to bound 000001's tombstone to the file
1588 1 : // bounds, but it's not possible to encode a range tombstone with an
1589 1 : // end boundary within a user key (eg, between sequence numbers
1590 1 : // f#200 and f#190). Instead, we expand 000001 to its atomic
1591 1 : // compaction unit (000001 and 000002) and truncate the tombstone to
1592 1 : // g#inf.
1593 1 : //
1594 1 : // NB: We must not use the atomic compaction unit of the entire
1595 1 : // compaction, because the [c,k) tombstone contained in the file
1596 1 : // 000001 ≥ g. If 000001, 000002 and 000003 are all included in the
1597 1 : // same compaction, the compaction's atomic compaction unit includes
1598 1 : // 000003. However 000003's keys must not be covered by 000001's
1599 1 : // untruncated range tombstone.
1600 1 : //
1601 1 : // Note that we need do this truncation at read time in order to
1602 1 : // handle sstables generated by RocksDB and earlier versions of
1603 1 : // Pebble which do not truncate range tombstones to atomic
1604 1 : // compaction unit boundaries at write time.
1605 1 : //
1606 1 : // The current Pebble compaction logic DOES truncate tombstones to
1607 1 : // atomic unit boundaries at compaction time too.
1608 1 : atomicUnit, _ := expandToAtomicUnit(c.cmp, f.Slice(), true /* disableIsCompacting */)
1609 1 : lowerBound, upperBound := manifest.KeyRange(c.cmp, atomicUnit.Iter())
1610 1 : // Range deletion tombstones are often written to sstables
1611 1 : // untruncated on the end key side. However, they are still only
1612 1 : // valid within a given file's bounds. The logic for writing range
1613 1 : // tombstones to an output file sometimes has an incomplete view
1614 1 : // of range tombstones outside the file's internal key bounds. Skip
1615 1 : // any range tombstones completely outside file bounds.
1616 1 : rangeDelIter = keyspan.Truncate(
1617 1 : c.cmp, rangeDelIter, lowerBound.UserKey, upperBound.UserKey,
1618 1 : &f.Smallest, &f.Largest, false, /* panicOnUpperTruncate */
1619 1 : )
1620 1 : return rangeDelIter, closer, nil
1621 : }
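
// Editor's sketch (not part of the original source): keyspan.Truncate above
// clamps untruncated tombstones to the atomic compaction unit's bounds. The
// helper below is a hypothetical, user-key-only illustration of the clamping
// idea, ignoring sequence numbers and the inclusive/exclusive boundary
// handling that keyspan.Truncate performs for real.
func exampleClampSpan(start, end, lower, upper []byte) (s, e []byte, ok bool) {
	if bytes.Compare(start, lower) < 0 {
		start = lower
	}
	if bytes.Compare(end, upper) > 0 {
		end = upper
	}
	// An empty or inverted result means the span lies entirely outside the
	// bounds and should be skipped.
	if bytes.Compare(start, end) >= 0 {
		return nil, nil, false
	}
	return start, end, true
}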
1622 :
1623 1 : func (c *compaction) String() string {
1624 1 : if len(c.flushing) != 0 {
1625 0 : return "flush\n"
1626 0 : }
1627 :
1628 1 : var buf bytes.Buffer
1629 1 : for level := c.startLevel.level; level <= c.outputLevel.level; level++ {
1630 1 : i := level - c.startLevel.level
1631 1 : fmt.Fprintf(&buf, "%d:", level)
1632 1 : iter := c.inputs[i].files.Iter()
1633 1 : for f := iter.First(); f != nil; f = iter.Next() {
1634 1 : fmt.Fprintf(&buf, " %s:%s-%s", f.FileNum, f.Smallest, f.Largest)
1635 1 : }
1636 1 : fmt.Fprintf(&buf, "\n")
1637 : }
1638 1 : return buf.String()
1639 : }
1640 :
1641 : type manualCompaction struct {
1642 : // Count of the retries either due to too many concurrent compactions, or a
1643 : // concurrent compaction to overlapping levels.
1644 : retries int
1645 : level int
1646 : outputLevel int
1647 : done chan error
1648 : start []byte
1649 : end []byte
1650 : split bool
1651 : }
1652 :
1653 : type readCompaction struct {
1654 : level int
1655 : // [start, end] key ranges are used for de-duping.
1656 : start []byte
1657 : end []byte
1658 :
1659 : // The file associated with the compaction.
1660 : // If the file no longer belongs in the same
1661 : // level, then we skip the compaction.
1662 : fileNum base.FileNum
1663 : }
1664 :
1665 : type downloadSpan struct {
1666 : start []byte
1667 : end []byte
1668 : // doneChans contains a list of channels passed into compactions as done
1669 : // channels. Each channel has a buffer size of 1 and is only passed into
1670 : // one compaction. This slice can grow over the lifetime of a downloadSpan.
1671 : doneChans []chan error
1672 : // compactionsStarted is the number of compactions started for this
1673 : // downloadSpan. Must be equal to len(doneChans)-1, i.e. there's one spare
1674 : // doneChan created each time a compaction starts up, for the next compaction.
1675 : compactionsStarted int
1676 : }
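
// Editor's sketch (not part of the original source): the downloadSpan
// invariant is that doneChans always holds one spare, unused channel, i.e.
// len(doneChans) == compactionsStarted+1. The hypothetical helper below shows
// the bookkeeping performed when a download compaction starts (compare
// maybeScheduleDownloadCompaction below); it is not a real method in Pebble.
func exampleTakeDownloadDoneChan(ds *downloadSpan) chan error {
	// Hand out the current spare channel to the compaction being started.
	doneCh := ds.doneChans[ds.compactionsStarted]
	ds.compactionsStarted++
	// Restore the invariant: append a fresh spare for the next compaction.
	ds.doneChans = append(ds.doneChans, make(chan error, 1))
	return doneCh
}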
1677 :
1678 1 : func (d *DB) addInProgressCompaction(c *compaction) {
1679 1 : d.mu.compact.inProgress[c] = struct{}{}
1680 1 : var isBase, isIntraL0 bool
1681 1 : for _, cl := range c.inputs {
1682 1 : iter := cl.files.Iter()
1683 1 : for f := iter.First(); f != nil; f = iter.Next() {
1684 1 : if f.IsCompacting() {
1685 0 : d.opts.Logger.Fatalf("L%d->L%d: %s already being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
1686 0 : }
1687 1 : f.SetCompactionState(manifest.CompactionStateCompacting)
1688 1 : if c.startLevel != nil && c.outputLevel != nil && c.startLevel.level == 0 {
1689 1 : if c.outputLevel.level == 0 {
1690 1 : f.IsIntraL0Compacting = true
1691 1 : isIntraL0 = true
1692 1 : } else {
1693 1 : isBase = true
1694 1 : }
1695 : }
1696 : }
1697 : }
1698 :
1699 1 : if (isIntraL0 || isBase) && c.version.L0Sublevels != nil {
1700 1 : l0Inputs := []manifest.LevelSlice{c.startLevel.files}
1701 1 : if isIntraL0 {
1702 1 : l0Inputs = append(l0Inputs, c.outputLevel.files)
1703 1 : }
1704 1 : if err := c.version.L0Sublevels.UpdateStateForStartedCompaction(l0Inputs, isBase); err != nil {
1705 0 : d.opts.Logger.Fatalf("could not update state for compaction: %s", err)
1706 0 : }
1707 : }
1708 : }
1709 :
1710 : // Removes compaction markers from files in a compaction. The rollback parameter
1711 : // indicates whether the compaction state should be rolled back to its original
1712 : // state in the case of an unsuccessful compaction.
1713 : //
1714 : // DB.mu must be held when calling this method, however this method can drop and
1715 : // re-acquire that mutex. All writes to the manifest for this compaction should
1716 : // have completed by this point.
1717 1 : func (d *DB) clearCompactingState(c *compaction, rollback bool) {
1718 1 : c.versionEditApplied = true
1719 1 : for _, cl := range c.inputs {
1720 1 : iter := cl.files.Iter()
1721 1 : for f := iter.First(); f != nil; f = iter.Next() {
1722 1 : if !f.IsCompacting() {
1723 0 : d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
1724 0 : }
1725 1 : if !rollback {
1726 1 : // On success all compactions other than move-compactions transition the
1727 1 : // file into the Compacted state. Move-compacted files become eligible
1728 1 : // for compaction again and transition back to NotCompacting.
1729 1 : if c.kind != compactionKindMove {
1730 1 : f.SetCompactionState(manifest.CompactionStateCompacted)
1731 1 : } else {
1732 1 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1733 1 : }
1734 1 : } else {
1735 1 : // Else, on rollback, all input files unconditionally transition back to
1736 1 : // NotCompacting.
1737 1 : f.SetCompactionState(manifest.CompactionStateNotCompacting)
1738 1 : }
1739 1 : f.IsIntraL0Compacting = false
1740 : }
1741 : }
1742 1 : l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c))
1743 1 : func() {
1744 1 : // InitCompactingFileInfo requires that no other manifest writes be
1745 1 : // happening in parallel with it, i.e. we're not in the midst of installing
1746 1 : // another version. Otherwise, it's possible that we've created another
1747 1 : // L0Sublevels instance, but not added it to the versions list, causing
1748 1 : // all the indices in FileMetadata to be inaccurate. To ensure this,
1749 1 : // grab the manifest lock.
1750 1 : d.mu.versions.logLock()
1751 1 : defer d.mu.versions.logUnlock()
1752 1 : d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress)
1753 1 : }()
1754 : }
1755 :
1756 1 : func (d *DB) calculateDiskAvailableBytes() uint64 {
1757 1 : if space, err := d.opts.FS.GetDiskUsage(d.dirname); err == nil {
1758 1 : d.diskAvailBytes.Store(space.AvailBytes)
1759 1 : return space.AvailBytes
1760 1 : } else if !errors.Is(err, vfs.ErrUnsupported) {
1761 1 : d.opts.EventListener.BackgroundError(err)
1762 1 : }
1763 1 : return d.diskAvailBytes.Load()
1764 : }
1765 :
1766 0 : func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
1767 0 : var pacerInfo deletionPacerInfo
1768 0 : // Call GetDiskUsage after every file deletion. This may seem inefficient,
1769 0 : // but in practice this was observed to take constant time, regardless of
1770 0 : // volume size used, at least on linux with ext4 and zfs. All invocations
1771 0 : // take 10 microseconds or less.
1772 0 : pacerInfo.freeBytes = d.calculateDiskAvailableBytes()
1773 0 : d.mu.Lock()
1774 0 : pacerInfo.obsoleteBytes = d.mu.versions.metrics.Table.ObsoleteSize
1775 0 : pacerInfo.liveBytes = uint64(d.mu.versions.metrics.Total().Size)
1776 0 : d.mu.Unlock()
1777 0 : return pacerInfo
1778 0 : }
1779 :
1780 : // onObsoleteTableDelete is called to update metrics when an sstable is deleted.
1781 1 : func (d *DB) onObsoleteTableDelete(fileSize uint64) {
1782 1 : d.mu.Lock()
1783 1 : d.mu.versions.metrics.Table.ObsoleteCount--
1784 1 : d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
1785 1 : d.mu.Unlock()
1786 1 : }
1787 :
1788 : // maybeScheduleFlush schedules a flush if necessary.
1789 : //
1790 : // d.mu must be held when calling this.
1791 1 : func (d *DB) maybeScheduleFlush() {
1792 1 : if d.mu.compact.flushing || d.closed.Load() != nil || d.opts.ReadOnly {
1793 1 : return
1794 1 : }
1795 1 : if len(d.mu.mem.queue) <= 1 {
1796 1 : return
1797 1 : }
1798 :
1799 1 : if !d.passedFlushThreshold() {
1800 1 : return
1801 1 : }
1802 :
1803 1 : d.mu.compact.flushing = true
1804 1 : go d.flush()
1805 : }
1806 :
1807 1 : func (d *DB) passedFlushThreshold() bool {
1808 1 : var n int
1809 1 : var size uint64
1810 1 : for ; n < len(d.mu.mem.queue)-1; n++ {
1811 1 : if !d.mu.mem.queue[n].readyForFlush() {
1812 1 : break
1813 : }
1814 1 : if d.mu.mem.queue[n].flushForced {
1815 1 : // A flush was forced. Pretend the memtable size is the configured
1816 1 : // size. See minFlushSize below.
1817 1 : size += d.opts.MemTableSize
1818 1 : } else {
1819 1 : size += d.mu.mem.queue[n].totalBytes()
1820 1 : }
1821 : }
1822 1 : if n == 0 {
1823 1 : // None of the immutable memtables are ready for flushing.
1824 1 : return false
1825 1 : }
1826 :
1827 : // Only flush once the sum of the queued memtable sizes exceeds half the
1828 : // configured memtable size. This prevents flushing of memtables at startup
1829 : // while we're undergoing the ramp period on the memtable size. See
1830 : // DB.newMemTable().
1831 1 : minFlushSize := d.opts.MemTableSize / 2
1832 1 : return size >= minFlushSize
1833 : }
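
// Editor's sketch (not part of the original source): a worked restatement of
// the threshold above with hypothetical numbers. With MemTableSize = 64 MiB,
// minFlushSize is 32 MiB, so two ready 20 MiB memtables (40 MiB total) trigger
// a flush while a single one does not, and a memtable with flushForced set
// counts as the full 64 MiB regardless of its actual size. The types below are
// illustration-only stand-ins, not Pebble API.
type exampleQueuedMemtable struct {
	bytes       uint64
	flushForced bool
}

func exampleFlushThresholdMet(queue []exampleQueuedMemtable, memTableSize uint64) bool {
	var size uint64
	for _, m := range queue {
		if m.flushForced {
			// A forced flush pretends the memtable is at its configured size.
			size += memTableSize
		} else {
			size += m.bytes
		}
	}
	// Flush once the ready memtables sum to at least half the configured size.
	return size >= memTableSize/2
}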
1834 :
1835 1 : func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
1836 1 : var mem *flushableEntry
1837 1 : for _, m := range d.mu.mem.queue {
1838 1 : if m.flushable == tbl {
1839 1 : mem = m
1840 1 : break
1841 : }
1842 : }
1843 1 : if mem == nil || mem.flushForced {
1844 1 : return
1845 1 : }
1846 1 : deadline := d.timeNow().Add(dur)
1847 1 : if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
1848 1 : // Already scheduled to flush sooner than within `dur`.
1849 1 : return
1850 1 : }
1851 1 : mem.delayedFlushForcedAt = deadline
1852 1 : go func() {
1853 1 : timer := time.NewTimer(dur)
1854 1 : defer timer.Stop()
1855 1 :
1856 1 : select {
1857 1 : case <-d.closedCh:
1858 1 : return
1859 1 : case <-mem.flushed:
1860 1 : return
1861 1 : case <-timer.C:
1862 1 : d.commit.mu.Lock()
1863 1 : defer d.commit.mu.Unlock()
1864 1 : d.mu.Lock()
1865 1 : defer d.mu.Unlock()
1866 1 :
1867 1 : // NB: The timer may fire concurrently with a call to Close. If a
1868 1 : // Close call beat us to acquiring d.mu, d.closed holds ErrClosed,
1869 1 : // and it's too late to flush anything. Otherwise, the Close call
1870 1 : // will block on locking d.mu until we've finished scheduling the
1871 1 : // flush and set `d.mu.compact.flushing` to true. Close will wait
1872 1 : // for the current flush to complete.
1873 1 : if d.closed.Load() != nil {
1874 1 : return
1875 1 : }
1876 :
1877 1 : if d.mu.mem.mutable == tbl {
1878 1 : d.makeRoomForWrite(nil)
1879 1 : } else {
1880 1 : mem.flushForced = true
1881 1 : }
1882 1 : d.maybeScheduleFlush()
1883 : }
1884 : }()
1885 : }
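
// Editor's sketch (not part of the original source): the deadline bookkeeping
// in maybeScheduleDelayedFlush keeps only the earliest requested flush time
// for a memtable; a later request is a no-op. A hypothetical restatement of
// that rule:
func exampleEarlierDeadline(existing, proposed time.Time) time.Time {
	// A zero existing deadline means no delayed flush is scheduled yet.
	if existing.IsZero() || proposed.Before(existing) {
		return proposed
	}
	return existing
}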
1886 :
1887 1 : func (d *DB) flush() {
1888 1 : pprof.Do(context.Background(), flushLabels, func(context.Context) {
1889 1 : flushingWorkStart := time.Now()
1890 1 : d.mu.Lock()
1891 1 : defer d.mu.Unlock()
1892 1 : idleDuration := flushingWorkStart.Sub(d.mu.compact.noOngoingFlushStartTime)
1893 1 : var bytesFlushed uint64
1894 1 : var err error
1895 1 : if bytesFlushed, err = d.flush1(); err != nil {
1896 1 : // TODO(peter): count consecutive flush errors and backoff.
1897 1 : d.opts.EventListener.BackgroundError(err)
1898 1 : }
1899 1 : d.mu.compact.flushing = false
1900 1 : d.mu.compact.noOngoingFlushStartTime = time.Now()
1901 1 : workDuration := d.mu.compact.noOngoingFlushStartTime.Sub(flushingWorkStart)
1902 1 : d.mu.compact.flushWriteThroughput.Bytes += int64(bytesFlushed)
1903 1 : d.mu.compact.flushWriteThroughput.WorkDuration += workDuration
1904 1 : d.mu.compact.flushWriteThroughput.IdleDuration += idleDuration
1905 1 : // More flush work may have arrived while we were flushing, so schedule
1906 1 : // another flush if needed.
1907 1 : d.maybeScheduleFlush()
1908 1 : // The flush may have produced too many files in a level, so schedule a
1909 1 : // compaction if needed.
1910 1 : d.maybeScheduleCompaction()
1911 1 : d.mu.compact.cond.Broadcast()
1912 : })
1913 : }
1914 :
1915 : // runIngestFlush is used to generate a flush version edit for sstables which
1916 : // were ingested as flushables. Both DB.mu and the manifest lock must be held
1917 : // while runIngestFlush is called.
1918 1 : func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
1919 1 : if len(c.flushing) != 1 {
1920 0 : panic("pebble: ingestedFlushable must be flushed one at a time.")
1921 : }
1922 :
1923 : // Construct the VersionEdit, levelMetrics etc.
1924 1 : c.metrics = make(map[int]*LevelMetrics, numLevels)
1925 1 : // Finding the target level for ingestion must use the latest version
1926 1 : // after the logLock has been acquired.
1927 1 : c.version = d.mu.versions.currentVersion()
1928 1 :
1929 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
1930 1 : iterOpts := IterOptions{logger: d.opts.Logger}
1931 1 : ve := &versionEdit{}
1932 1 : var level int
1933 1 : var err error
1934 1 : var fileToSplit *fileMetadata
1935 1 : var ingestSplitFiles []ingestSplitFile
1936 1 : for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
1937 1 : suggestSplit := d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit() &&
1938 1 : d.FormatMajorVersion() >= FormatVirtualSSTables
1939 1 : level, fileToSplit, err = ingestTargetLevel(
1940 1 : d.newIters, d.tableNewRangeKeyIter, iterOpts, d.opts.Comparer,
1941 1 : c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
1942 1 : suggestSplit,
1943 1 : )
1944 1 : if err != nil {
1945 0 : return nil, err
1946 0 : }
1947 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
1948 1 : if fileToSplit != nil {
1949 0 : ingestSplitFiles = append(ingestSplitFiles, ingestSplitFile{
1950 0 : ingestFile: file.FileMetadata,
1951 0 : splitFile: fileToSplit,
1952 0 : level: level,
1953 0 : })
1954 0 : }
1955 1 : levelMetrics := c.metrics[level]
1956 1 : if levelMetrics == nil {
1957 1 : levelMetrics = &LevelMetrics{}
1958 1 : c.metrics[level] = levelMetrics
1959 1 : }
1960 1 : levelMetrics.BytesIngested += file.Size
1961 1 : levelMetrics.TablesIngested++
1962 : }
1963 :
1964 1 : updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
1965 0 : levelMetrics := c.metrics[level]
1966 0 : if levelMetrics == nil {
1967 0 : levelMetrics = &LevelMetrics{}
1968 0 : c.metrics[level] = levelMetrics
1969 0 : }
1970 0 : levelMetrics.NumFiles--
1971 0 : levelMetrics.Size -= int64(m.Size)
1972 0 : for i := range added {
1973 0 : levelMetrics.NumFiles++
1974 0 : levelMetrics.Size += int64(added[i].Meta.Size)
1975 0 : }
1976 : }
1977 :
1978 1 : if len(ingestSplitFiles) > 0 {
1979 0 : ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata)
1980 0 : replacedFiles := make(map[base.FileNum][]newFileEntry)
1981 0 : if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
1982 0 : return nil, err
1983 0 : }
1984 : }
1985 :
1986 1 : return ve, nil
1987 : }
1988 :
1989 : // flush runs a compaction that copies the immutable memtables from memory to
1990 : // disk.
1991 : //
1992 : // d.mu must be held when calling this, but the mutex may be dropped and
1993 : // re-acquired during the course of this method.
1994 1 : func (d *DB) flush1() (bytesFlushed uint64, err error) {
1995 1 : // NB: The flushable queue can contain flushables of type ingestedFlushable.
1996 1 : // The sstables in ingestedFlushable.files must be placed into the appropriate
1997 1 : // level in the lsm. Let's say the flushable queue contains a prefix of
1998 1 : // regular immutable memtables, then an ingestedFlushable, and then the
1999 1 : // mutable memtable. When the flush of the ingestedFlushable is performed,
2000 1 : // it needs an updated view of the lsm. That is, the prefix of immutable
2001 1 : // memtables must have already been flushed. Similarly, if there are two
2002 1 : // contiguous ingestedFlushables in the queue, then the first flushable must
2003 1 : // be flushed, so that the second flushable can see an updated view of the
2004 1 : // lsm.
2005 1 : //
2006 1 : // Given the above, we restrict flushes to either some prefix of regular
2007 1 : // memtables, or a single flushable of type ingestedFlushable. The DB.flush
2008 1 : // function will call DB.maybeScheduleFlush again, so a new flush to finish
2009 1 : // the remaining flush work should be scheduled right away.
2010 1 : //
2011 1 : // NB: Large batches placed in the flushable queue share the WAL with the
2012 1 : // previous memtable in the queue. We must ensure the property that both the
2013 1 : // large batch and the memtable with which it shares a WAL are flushed
2014 1 : // together. The property ensures that the minimum unflushed log number
2015 1 : // isn't incremented incorrectly. Since a flushableBatch.readyToFlush always
2016 1 : // returns true, and since the large batch will always be placed right after
2017 1 : // the memtable with which it shares a WAL, the property is naturally
2018 1 : // ensured. The large batch will always be placed after the memtable with
2019 1 : // which it shares a WAL because we ensure it in DB.commitWrite by holding
2020 1 : // the commitPipeline.mu and then holding DB.mu. As an extra defensive
2021 1 : // measure, if we try to flush the memtable without also flushing the
2022 1 : // flushable batch in the same flush, since the memtable and flushableBatch
2023 1 : // have the same logNum, the logNum invariant check below will trigger.
2024 1 : var n, inputs int
2025 1 : var inputBytes uint64
2026 1 : var ingest bool
2027 1 : for ; n < len(d.mu.mem.queue)-1; n++ {
2028 1 : if f, ok := d.mu.mem.queue[n].flushable.(*ingestedFlushable); ok {
2029 1 : if n == 0 {
2030 1 : // The first flushable is of type ingestedFlushable. Since these
2031 1 : // must be flushed individually, we perform a flush for just
2032 1 : // this.
2033 1 : if !f.readyForFlush() {
2034 0 : // This check is almost unnecessary, but we guard against it
2035 0 : // just in case this invariant changes in the future.
2036 0 : panic("pebble: ingestedFlushable should always be ready to flush.")
2037 : }
2038 : // By setting n = 1, we ensure that the first flushable(n == 0)
2039 : // is scheduled for a flush. The number of tables added is equal to the
2040 : // number of files in the ingest operation.
2041 1 : n = 1
2042 1 : inputs = len(f.files)
2043 1 : ingest = true
2044 1 : break
2045 1 : } else {
2046 1 : // There was some prefix of flushables which weren't of type
2047 1 : // ingestedFlushable. So, perform a flush for those.
2048 1 : break
2049 : }
2050 : }
2051 1 : if !d.mu.mem.queue[n].readyForFlush() {
2052 1 : break
2053 : }
2054 1 : inputBytes += d.mu.mem.queue[n].inuseBytes()
2055 : }
2056 1 : if n == 0 {
2057 0 : // None of the immutable memtables are ready for flushing.
2058 0 : return 0, nil
2059 0 : }
2060 1 : if !ingest {
2061 1 : // Flushes of memtables add the prefix of n memtables from the flushable
2062 1 : // queue.
2063 1 : inputs = n
2064 1 : }
2065 :
2066 : // Require that every memtable being flushed has a log number less than the
2067 : // new minimum unflushed log number.
2068 1 : minUnflushedLogNum := d.mu.mem.queue[n].logNum
2069 1 : if !d.opts.DisableWAL {
2070 1 : for i := 0; i < n; i++ {
2071 1 : if logNum := d.mu.mem.queue[i].logNum; logNum >= minUnflushedLogNum {
2072 0 : panic(errors.AssertionFailedf("logNum invariant violated: flushing %d items; %d:type=%T,logNum=%d; %d:type=%T,logNum=%d",
2073 0 : n,
2074 0 : i, d.mu.mem.queue[i].flushable, logNum,
2075 0 : n, d.mu.mem.queue[n].flushable, minUnflushedLogNum))
2076 : }
2077 : }
2078 : }
2079 :
2080 1 : c := newFlush(d.opts, d.mu.versions.currentVersion(),
2081 1 : d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow())
2082 1 : d.addInProgressCompaction(c)
2083 1 :
2084 1 : jobID := d.mu.nextJobID
2085 1 : d.mu.nextJobID++
2086 1 : d.opts.EventListener.FlushBegin(FlushInfo{
2087 1 : JobID: jobID,
2088 1 : Input: inputs,
2089 1 : InputBytes: inputBytes,
2090 1 : Ingest: ingest,
2091 1 : })
2092 1 : startTime := d.timeNow()
2093 1 :
2094 1 : var ve *manifest.VersionEdit
2095 1 : var pendingOutputs []physicalMeta
2096 1 : var stats compactStats
2097 1 : // To determine the target level of the files in the ingestedFlushable, we
2098 1 : // need to acquire the logLock, and not release it for that duration. Since,
2099 1 : // we need to acquire the logLock below to perform the logAndApply step
2100 1 : // anyway, we create the VersionEdit for ingestedFlushable outside of
2101 1 : // runCompaction. For all other flush cases, we construct the VersionEdit
2102 1 : // inside runCompaction.
2103 1 : if c.kind != compactionKindIngestedFlushable {
2104 1 : ve, pendingOutputs, stats, err = d.runCompaction(jobID, c)
2105 1 : }
2106 :
2107 : // Acquire logLock. This will be released either on an error, by way of
2108 : // logUnlock, or through a call to logAndApply if there is no error.
2109 1 : d.mu.versions.logLock()
2110 1 :
2111 1 : if c.kind == compactionKindIngestedFlushable {
2112 1 : ve, err = d.runIngestFlush(c)
2113 1 : }
2114 :
2115 1 : info := FlushInfo{
2116 1 : JobID: jobID,
2117 1 : Input: inputs,
2118 1 : InputBytes: inputBytes,
2119 1 : Duration: d.timeNow().Sub(startTime),
2120 1 : Done: true,
2121 1 : Ingest: ingest,
2122 1 : Err: err,
2123 1 : }
2124 1 : if err == nil {
2125 1 : for i := range ve.NewFiles {
2126 1 : e := &ve.NewFiles[i]
2127 1 : info.Output = append(info.Output, e.Meta.TableInfo())
2128 1 : // Ingested tables are not necessarily flushed to L0. Record the level of
2129 1 : // each ingested file explicitly.
2130 1 : if ingest {
2131 1 : info.IngestLevels = append(info.IngestLevels, e.Level)
2132 1 : }
2133 : }
2134 1 : if len(ve.NewFiles) == 0 {
2135 1 : info.Err = errEmptyTable
2136 1 : }
2137 :
2138 : // The flush succeeded or it produced an empty sstable. In either case we
2139 : // want to bump the minimum unflushed log number to the log number of the
2140 : // oldest unflushed memtable.
2141 1 : ve.MinUnflushedLogNum = minUnflushedLogNum
2142 1 : if c.kind != compactionKindIngestedFlushable {
2143 1 : metrics := c.metrics[0]
2144 1 : if d.opts.DisableWAL {
2145 1 : // If the WAL is disabled, every flushable has a zero [logSize],
2146 1 : // resulting in zero bytes in. Instead, use the number of bytes we
2147 1 : // flushed as the BytesIn. This ensures we get a reasonable w-amp
2148 1 : // calculation even when the WAL is disabled.
2149 1 : metrics.BytesIn = metrics.BytesFlushed
2150 1 : } else {
2151 1 : metrics := c.metrics[0]
2152 1 : for i := 0; i < n; i++ {
2153 1 : metrics.BytesIn += d.mu.mem.queue[i].logSize
2154 1 : }
2155 : }
2156 1 : } else if len(ve.DeletedFiles) > 0 {
2157 0 : // c.kind == compactionKindIngestedFlushable && we have deleted files due
2158 0 : // to ingest-time splits.
2159 0 : //
2160 0 : // Iterate through all other compactions, and check if their inputs have
2161 0 : // been replaced due to an ingest-time split. In that case, cancel the
2162 0 : // compaction.
2163 0 : for c2 := range d.mu.compact.inProgress {
2164 0 : for i := range c2.inputs {
2165 0 : iter := c2.inputs[i].files.Iter()
2166 0 : for f := iter.First(); f != nil; f = iter.Next() {
2167 0 : if _, ok := ve.DeletedFiles[deletedFileEntry{FileNum: f.FileNum, Level: c2.inputs[i].level}]; ok {
2168 0 : c2.cancel.Store(true)
2169 0 : break
2170 : }
2171 : }
2172 : }
2173 : }
2174 : }
2175 1 : err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false, /* forceRotation */
2176 1 : func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
2177 1 : if err != nil {
2178 1 : info.Err = err
2179 1 : // TODO(peter): untested.
2180 1 : for _, f := range pendingOutputs {
2181 1 : // Note that the FileBacking for the file metadata might not have
2182 1 : // been set yet. So, we directly use the FileNum. Since these
2183 1 : // files were generated as compaction outputs, these must be
2184 1 : // physical files on disk. This property might not hold once
2185 1 : // https://github.com/cockroachdb/pebble/issues/389 is
2186 1 : // implemented if #389 creates virtual sstables as output files.
2187 1 : d.mu.versions.obsoleteTables = append(
2188 1 : d.mu.versions.obsoleteTables,
2189 1 : fileInfo{f.FileNum.DiskFileNum(), f.Size},
2190 1 : )
2191 1 : }
2192 1 : d.mu.versions.updateObsoleteTableMetricsLocked()
2193 : }
2194 1 : } else {
2195 1 : // We won't be performing the logAndApply step because of the error,
2196 1 : // so logUnlock.
2197 1 : d.mu.versions.logUnlock()
2198 1 : }
2199 :
2200 1 : bytesFlushed = c.bytesIterated
2201 1 :
2202 1 : // If err != nil, then the flush will be retried, and we will recalculate
2203 1 : // these metrics.
2204 1 : if err == nil {
2205 1 : d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
2206 1 : d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
2207 1 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
2208 1 : d.maybeUpdateDeleteCompactionHints(c)
2209 1 : }
2210 :
2211 1 : d.clearCompactingState(c, err != nil)
2212 1 : delete(d.mu.compact.inProgress, c)
2213 1 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
2214 1 :
2215 1 : var flushed flushableList
2216 1 : if err == nil {
2217 1 : flushed = d.mu.mem.queue[:n]
2218 1 : d.mu.mem.queue = d.mu.mem.queue[n:]
2219 1 : d.updateReadStateLocked(d.opts.DebugCheck)
2220 1 : d.updateTableStatsLocked(ve.NewFiles)
2221 1 : if ingest {
2222 1 : d.mu.versions.metrics.Flush.AsIngestCount++
2223 1 : for _, l := range c.metrics {
2224 1 : d.mu.versions.metrics.Flush.AsIngestBytes += l.BytesIngested
2225 1 : d.mu.versions.metrics.Flush.AsIngestTableCount += l.TablesIngested
2226 1 : }
2227 : }
2228 :
2229 : // Update if any eventually file-only snapshots have now transitioned to
2230 : // being file-only.
2231 1 : earliestUnflushedSeqNum := d.getEarliestUnflushedSeqNumLocked()
2232 1 : currentVersion := d.mu.versions.currentVersion()
2233 1 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; {
2234 1 : if s.efos == nil {
2235 1 : s = s.next
2236 1 : continue
2237 : }
2238 1 : if base.Visible(earliestUnflushedSeqNum, s.efos.seqNum, InternalKeySeqNumMax) {
2239 0 : s = s.next
2240 0 : continue
2241 : }
2242 1 : if s.efos.excised.Load() {
2243 1 : // If a concurrent excise has happened that overlaps with one of the key
2244 1 : // ranges this snapshot is interested in, this EFOS cannot transition to
2245 1 : // a file-only snapshot as keys in that range could now be deleted. Move
2246 1 : // onto the next snapshot.
2247 1 : s = s.next
2248 1 : continue
2249 : }
2250 1 : currentVersion.Ref()
2251 1 :
2252 1 : // NB: s.efos.transitionToFileOnlySnapshot could close s, in which
2253 1 : // case s.next would be nil. Save it before calling it.
2254 1 : next := s.next
2255 1 : _ = s.efos.transitionToFileOnlySnapshot(currentVersion)
2256 1 : s = next
2257 : }
2258 : }
2259 : // Signal FlushEnd after installing the new readState. This helps for unit
2260 : // tests that use the callback to trigger a read using an iterator with
2261 : // IterOptions.OnlyReadGuaranteedDurable.
2262 1 : info.TotalDuration = d.timeNow().Sub(startTime)
2263 1 : d.opts.EventListener.FlushEnd(info)
2264 1 :
2265 1 : // The order of these operations matters here for ease of testing.
2266 1 : // Removing the reader reference first allows tests to be guaranteed that
2267 1 : // the memtable reservation has been released by the time a synchronous
2268 1 : // flush returns. readerUnrefLocked may also produce obsolete files so the
2269 1 : // call to deleteObsoleteFiles must happen after it.
2270 1 : for i := range flushed {
2271 1 : flushed[i].readerUnrefLocked(true)
2272 1 : }
2273 :
2274 1 : d.deleteObsoleteFiles(jobID)
2275 1 :
2276 1 : // Mark all the memtables we flushed as flushed.
2277 1 : for i := range flushed {
2278 1 : close(flushed[i].flushed)
2279 1 : }
2280 :
2281 1 : return bytesFlushed, err
2282 : }
2283 :
2284 : // maybeScheduleCompactionAsync should be used when
2285 : // we want to possibly schedule a compaction, but don't
2286 : // want to eat the cost of running maybeScheduleCompaction.
2287 : // This method should be launched in a separate goroutine.
2288 : // d.mu must not be held when this is called.
2289 0 : func (d *DB) maybeScheduleCompactionAsync() {
2290 0 : defer d.compactionSchedulers.Done()
2291 0 :
2292 0 : d.mu.Lock()
2293 0 : d.maybeScheduleCompaction()
2294 0 : d.mu.Unlock()
2295 0 : }
2296 :
2297 : // maybeScheduleCompaction schedules a compaction if necessary.
2298 : //
2299 : // d.mu must be held when calling this.
2300 1 : func (d *DB) maybeScheduleCompaction() {
2301 1 : d.maybeScheduleCompactionPicker(pickAuto)
2302 1 : }
2303 :
2304 1 : func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction {
2305 1 : return picker.pickAuto(env)
2306 1 : }
2307 :
2308 1 : func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction {
2309 1 : return picker.pickElisionOnlyCompaction(env)
2310 1 : }
2311 :
2312 : // maybeScheduleDownloadCompaction schedules a download compaction.
2313 : //
2314 : // Requires d.mu to be held.
2315 1 : func (d *DB) maybeScheduleDownloadCompaction(env compactionEnv, maxConcurrentCompactions int) {
2316 1 : for len(d.mu.compact.downloads) > 0 && d.mu.compact.compactingCount < maxConcurrentCompactions {
2317 1 : v := d.mu.versions.currentVersion()
2318 1 : download := d.mu.compact.downloads[0]
2319 1 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
2320 1 : var externalFile *fileMetadata
2321 1 : var err error
2322 1 : var level int
2323 1 : for i := range v.Levels {
2324 1 : overlaps := v.Overlaps(i, d.cmp, download.start, download.end, true /* exclusiveEnd */)
2325 1 : iter := overlaps.Iter()
2326 1 : provider := d.objProvider
2327 1 : for f := iter.First(); f != nil; f = iter.Next() {
2328 1 : var objMeta objstorage.ObjectMetadata
2329 1 : objMeta, err = provider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
2330 1 : if err != nil {
2331 0 : break
2332 : }
2333 1 : if objMeta.IsExternal() {
2334 1 : if f.IsCompacting() {
2335 0 : continue
2336 : }
2337 1 : externalFile = f
2338 1 : level = i
2339 1 : break
2340 : }
2341 : }
2342 1 : if externalFile != nil || err != nil {
2343 1 : break
2344 : }
2345 : }
2346 1 : if err != nil {
2347 0 : d.mu.compact.downloads = d.mu.compact.downloads[1:]
2348 0 : download.doneChans[download.compactionsStarted] <- err
2349 0 : continue
2350 : }
2351 1 : if externalFile == nil {
2352 1 : // The entirety of this span is downloaded, or is being downloaded right
2353 1 : // now. No need to schedule additional downloads for this span.
2354 1 : d.mu.compact.downloads = d.mu.compact.downloads[1:]
2355 1 : continue
2356 : }
2357 1 : pc := pickDownloadCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), download, level, externalFile)
2358 1 : if pc != nil {
2359 1 : doneCh := download.doneChans[download.compactionsStarted]
2360 1 : download.compactionsStarted++
2361 1 : // Create another doneChan for the next compaction.
2362 1 : download.doneChans = append(download.doneChans, make(chan error, 1))
2363 1 :
2364 1 : c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
2365 1 : d.mu.compact.compactingCount++
2366 1 : d.addInProgressCompaction(c)
2367 1 : go d.compact(c, doneCh)
2368 1 : }
2369 : }
2370 : }
2371 :
2372 : // maybeScheduleCompactionPicker schedules a compaction if necessary,
2373 : // calling `pickFunc` to pick automatic compactions.
2374 : //
2375 : // d.mu must be held when calling this.
2376 : func (d *DB) maybeScheduleCompactionPicker(
2377 : pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
2378 1 : ) {
2379 1 : if d.closed.Load() != nil || d.opts.ReadOnly {
2380 1 : return
2381 1 : }
2382 1 : maxConcurrentCompactions := d.opts.MaxConcurrentCompactions()
2383 1 : if d.mu.compact.compactingCount >= maxConcurrentCompactions {
2384 1 : if len(d.mu.compact.manual) > 0 {
2385 1 : // Inability to run head blocks later manual compactions.
2386 1 : d.mu.compact.manual[0].retries++
2387 1 : }
2388 1 : return
2389 : }
2390 :
2391 : // Compaction picking needs a coherent view of a Version. In particular, we
2392 : // need to exclude concurrent ingestions from making a decision on which level
2393 : // to ingest into that conflicts with our compaction
2394 : // decision. versionSet.logLock provides the necessary mutual exclusion.
2395 1 : d.mu.versions.logLock()
2396 1 : defer d.mu.versions.logUnlock()
2397 1 :
2398 1 : // Check for the closed flag again, in case the DB was closed while we were
2399 1 : // waiting for logLock().
2400 1 : if d.closed.Load() != nil {
2401 1 : return
2402 1 : }
2403 :
2404 1 : env := compactionEnv{
2405 1 : diskAvailBytes: d.diskAvailBytes.Load(),
2406 1 : earliestSnapshotSeqNum: d.mu.snapshots.earliest(),
2407 1 : earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
2408 1 : }
2409 1 :
2410 1 : // Check for delete-only compactions first, because they're expected to be
2411 1 : // cheap and reduce future compaction work.
2412 1 : if !d.opts.private.disableDeleteOnlyCompactions &&
2413 1 : len(d.mu.compact.deletionHints) > 0 &&
2414 1 : !d.opts.DisableAutomaticCompactions {
2415 1 : v := d.mu.versions.currentVersion()
2416 1 : snapshots := d.mu.snapshots.toSlice()
2417 1 : inputs, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots)
2418 1 : d.mu.compact.deletionHints = unresolvedHints
2419 1 :
2420 1 : if len(inputs) > 0 {
2421 1 : c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow())
2422 1 : d.mu.compact.compactingCount++
2423 1 : d.addInProgressCompaction(c)
2424 1 : go d.compact(c, nil)
2425 1 : }
2426 : }
2427 :
2428 1 : for len(d.mu.compact.manual) > 0 && d.mu.compact.compactingCount < maxConcurrentCompactions {
2429 1 : v := d.mu.versions.currentVersion()
2430 1 : manual := d.mu.compact.manual[0]
2431 1 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
2432 1 : pc, retryLater := pickManualCompaction(v, d.opts, env, d.mu.versions.picker.getBaseLevel(), manual)
2433 1 : if pc != nil {
2434 1 : c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
2435 1 : d.mu.compact.manual = d.mu.compact.manual[1:]
2436 1 : d.mu.compact.compactingCount++
2437 1 : d.addInProgressCompaction(c)
2438 1 : go d.compact(c, manual.done)
2439 1 : } else if !retryLater {
2440 1 : // Noop
2441 1 : d.mu.compact.manual = d.mu.compact.manual[1:]
2442 1 : manual.done <- nil
2443 1 : } else {
2444 1 : // Inability to run head blocks later manual compactions.
2445 1 : manual.retries++
2446 1 : break
2447 : }
2448 : }
2449 :
2450 1 : for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxConcurrentCompactions {
2451 1 : env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
2452 1 : env.readCompactionEnv = readCompactionEnv{
2453 1 : readCompactions: &d.mu.compact.readCompactions,
2454 1 : flushing: d.mu.compact.flushing || d.passedFlushThreshold(),
2455 1 : rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
2456 1 : }
2457 1 : pc := pickFunc(d.mu.versions.picker, env)
2458 1 : if pc == nil {
2459 1 : break
2460 : }
2461 1 : c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
2462 1 : d.mu.compact.compactingCount++
2463 1 : d.addInProgressCompaction(c)
2464 1 : go d.compact(c, nil)
2465 : }
2466 :
2467 1 : d.maybeScheduleDownloadCompaction(env, maxConcurrentCompactions)
2468 : }
2469 :
2470 : // deleteCompactionHintType indicates whether the deleteCompactionHint was
2471 : // generated from a span containing a range del (point key only), a range key
2472 : // delete (range key only), or both a point and range key.
2473 : type deleteCompactionHintType uint8
2474 :
2475 : const (
2476 : // NOTE: While these are primarily used as enumeration types, they are also
2477 : // used for some bitwise operations. Care should be taken when updating.
2478 : deleteCompactionHintTypeUnknown deleteCompactionHintType = iota
2479 : deleteCompactionHintTypePointKeyOnly
2480 : deleteCompactionHintTypeRangeKeyOnly
2481 : deleteCompactionHintTypePointAndRangeKey
2482 : )
2483 :
2484 : // String implements fmt.Stringer.
2485 1 : func (h deleteCompactionHintType) String() string {
2486 1 : switch h {
2487 0 : case deleteCompactionHintTypeUnknown:
2488 0 : return "unknown"
2489 1 : case deleteCompactionHintTypePointKeyOnly:
2490 1 : return "point-key-only"
2491 1 : case deleteCompactionHintTypeRangeKeyOnly:
2492 1 : return "range-key-only"
2493 1 : case deleteCompactionHintTypePointAndRangeKey:
2494 1 : return "point-and-range-key"
2495 0 : default:
2496 0 : panic(fmt.Sprintf("unknown hint type: %d", h))
2497 : }
2498 : }
2499 :
2500 : // compactionHintFromKeys returns a deleteCompactionHintType given a slice of
2501 : // keyspan.Keys.
2502 1 : func compactionHintFromKeys(keys []keyspan.Key) deleteCompactionHintType {
2503 1 : var hintType deleteCompactionHintType
2504 1 : for _, k := range keys {
2505 1 : switch k.Kind() {
2506 1 : case base.InternalKeyKindRangeDelete:
2507 1 : hintType |= deleteCompactionHintTypePointKeyOnly
2508 1 : case base.InternalKeyKindRangeKeyDelete:
2509 1 : hintType |= deleteCompactionHintTypeRangeKeyOnly
2510 0 : default:
2511 0 : panic(fmt.Sprintf("unsupported key kind: %s", k.Kind()))
2512 : }
2513 : }
2514 1 : return hintType
2515 : }
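
// Editor's sketch (not part of the original source): the hint types double as
// bit flags (point-key-only = 1, range-key-only = 2), so a span containing
// both a RANGEDEL and a RANGEKEYDEL ORs to point-and-range-key (1|2 == 3).
// A hypothetical predicate built on that property:
func exampleHintCoversPointKeys(t deleteCompactionHintType) bool {
	// True for point-key-only and point-and-range-key hints.
	return t&deleteCompactionHintTypePointKeyOnly != 0
}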
2516 :
2517 : // A deleteCompactionHint records a user key and sequence number span that has been
2518 : // deleted by a range tombstone. A hint is recorded if at least one sstable
2519 : // falls completely within both the user key and sequence number spans.
2520 : // Once the tombstones and the observed completely-contained sstables fall
2521 : // into the same snapshot stripe, a delete-only compaction may delete any
2522 : // sstables within the range.
2523 : type deleteCompactionHint struct {
2524 : // The type of key span that generated this hint (point key, range key, or
2525 : // both).
2526 : hintType deleteCompactionHintType
2527 : // start and end are user keys specifying a key range [start, end) of
2528 : // deleted keys.
2529 : start []byte
2530 : end []byte
2531 : // The level of the file containing the range tombstone(s) when the hint
2532 : // was created. Only lower levels need to be searched for files that may
2533 : // be deleted.
2534 : tombstoneLevel int
2535 : // The file containing the range tombstone(s) that created the hint.
2536 : tombstoneFile *fileMetadata
2537 : // The smallest and largest sequence numbers of the abutting tombstones
2538 : // merged to form this hint. All of a tables' keys must be less than the
2539 : // tombstone smallest sequence number to be deleted. All of a tables'
2540 : // sequence numbers must fall into the same snapshot stripe as the
2541 : // tombstone largest sequence number to be deleted.
2542 : tombstoneLargestSeqNum uint64
2543 : tombstoneSmallestSeqNum uint64
2544 : // The smallest sequence number of a sstable that was found to be covered
2545 : // by this hint. The hint cannot be resolved until this sequence number is
2546 : // in the same snapshot stripe as the largest tombstone sequence number.
2547 : // This is set when a hint is created, so the LSM may look different and
2548 : // notably no longer contain the sstable that contained the key at this
2549 : // sequence number.
2550 : fileSmallestSeqNum uint64
2551 : }
2552 :
2553 1 : func (h deleteCompactionHint) String() string {
2554 1 : return fmt.Sprintf(
2555 1 : "L%d.%s %s-%s seqnums(tombstone=%d-%d, file-smallest=%d, type=%s)",
2556 1 : h.tombstoneLevel, h.tombstoneFile.FileNum, h.start, h.end,
2557 1 : h.tombstoneSmallestSeqNum, h.tombstoneLargestSeqNum, h.fileSmallestSeqNum,
2558 1 : h.hintType,
2559 1 : )
2560 1 : }
2561 :
2562 1 : func (h *deleteCompactionHint) canDelete(cmp Compare, m *fileMetadata, snapshots []uint64) bool {
2563 1 : // The file can only be deleted if all of its keys are older than the
2564 1 : // earliest tombstone aggregated into the hint.
2565 1 : if m.LargestSeqNum >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
2566 1 : return false
2567 1 : }
2568 :
2569 : // The file's oldest key must be in the same snapshot stripe as the
2570 : // newest tombstone. NB: We already checked the hint's sequence numbers,
2571 : // but this file's oldest sequence number might be lower than the hint's
2572 : // smallest sequence number despite the file falling within the key range
2573 : // if this file was constructed after the hint by a compaction.
2574 1 : ti, _ := snapshotIndex(h.tombstoneLargestSeqNum, snapshots)
2575 1 : fi, _ := snapshotIndex(m.SmallestSeqNum, snapshots)
2576 1 : if ti != fi {
2577 0 : return false
2578 0 : }
2579 :
2580 1 : switch h.hintType {
2581 1 : case deleteCompactionHintTypePointKeyOnly:
2582 1 : // A hint generated by a range del span cannot delete tables that contain
2583 1 : // range keys.
2584 1 : if m.HasRangeKeys {
2585 0 : return false
2586 0 : }
2587 1 : case deleteCompactionHintTypeRangeKeyOnly:
2588 1 : // A hint generated by a range key del span cannot delete tables that
2589 1 : // contain point keys.
2590 1 : if m.HasPointKeys {
2591 1 : return false
2592 1 : }
2593 1 : case deleteCompactionHintTypePointAndRangeKey:
2594 : // A hint from a span that contains both range dels *and* range keys can
2595 : // only be deleted if both bounds fall within the hint. The next check takes
2596 : // care of this.
2597 0 : default:
2598 0 : panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
2599 : }
2600 :
2601 : // The file's keys must be completely contained within the hint range.
2602 1 : return cmp(h.start, m.Smallest.UserKey) <= 0 && cmp(m.Largest.UserKey, h.end) < 0
2603 : }
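
// Editor's sketch (not part of the original source): the "same snapshot
// stripe" requirement in canDelete can be read as: with the open snapshot
// sequence numbers sorted ascending, the stripe index of a sequence number is
// the count of snapshots at or below it, and two sequence numbers are in the
// same stripe when those counts match. The helper below is a hypothetical
// illustration of that index, not Pebble's snapshotIndex.
func exampleSnapshotStripe(seqNum uint64, sortedSnapshots []uint64) int {
	// Number of snapshots with a sequence number <= seqNum.
	return sort.Search(len(sortedSnapshots), func(i int) bool {
		return sortedSnapshots[i] > seqNum
	})
}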
2604 :
2605 1 : func (d *DB) maybeUpdateDeleteCompactionHints(c *compaction) {
2606 1 : // Compactions that zero sequence numbers can interfere with compaction
2607 1 : // deletion hints. Deletion hints apply to tables containing keys older
2608 1 : // than a threshold. If a key more recent than the threshold is zeroed in
2609 1 : // a compaction, a delete-only compaction may mistake it as meeting the
2610 1 : // threshold and drop a table containing live data.
2611 1 : //
2612 1 : // To avoid this scenario, compactions that zero sequence numbers remove
2613 1 : // any conflicting deletion hints. A deletion hint is conflicting if both
2614 1 : // of the following conditions apply:
2615 1 : // * its key space overlaps with the compaction
2616 1 : // * at least one of its inputs contains a key as recent as one of the
2617 1 : // hint's tombstones.
2618 1 : //
2619 1 : if !c.allowedZeroSeqNum {
2620 1 : return
2621 1 : }
2622 :
2623 1 : updatedHints := d.mu.compact.deletionHints[:0]
2624 1 : for _, h := range d.mu.compact.deletionHints {
2625 1 : // If the compaction's key space is disjoint from the hint's key
2626 1 : // space, the zeroing of sequence numbers won't affect the hint. Keep
2627 1 : // the hint.
2628 1 : keysDisjoint := d.cmp(h.end, c.smallest.UserKey) < 0 || d.cmp(h.start, c.largest.UserKey) > 0
2629 1 : if keysDisjoint {
2630 0 : updatedHints = append(updatedHints, h)
2631 0 : continue
2632 : }
2633 :
2634 : // All of the compaction's inputs must be older than the hint's
2635 : // tombstones.
2636 1 : inputsOlder := true
2637 1 : for _, in := range c.inputs {
2638 1 : iter := in.files.Iter()
2639 1 : for f := iter.First(); f != nil; f = iter.Next() {
2640 1 : inputsOlder = inputsOlder && f.LargestSeqNum < h.tombstoneSmallestSeqNum
2641 1 : }
2642 : }
2643 1 : if inputsOlder {
2644 0 : updatedHints = append(updatedHints, h)
2645 0 : continue
2646 : }
2647 :
2648 : // Drop h, because the compaction c may have zeroed sequence numbers
2649 : // of keys more recent than some of h's tombstones.
2650 : }
2651 1 : d.mu.compact.deletionHints = updatedHints
2652 : }
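
// Editorial illustration (not part of the original source; all keys and
// sequence numbers below are hypothetical): suppose a hint covers the key
// range [c, p) with tombstoneSmallestSeqNum=200, and a sequence-number-zeroing
// compaction over [a, m) has an input file whose LargestSeqNum is 215. The key
// ranges overlap and 215 >= 200, so the loop above drops the hint. If instead
// every input's LargestSeqNum were below 200, inputsOlder would remain true
// and the hint would be kept.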
2653 :
2654 : func checkDeleteCompactionHints(
2655 : cmp Compare, v *version, hints []deleteCompactionHint, snapshots []uint64,
2656 1 : ) ([]compactionLevel, []deleteCompactionHint) {
2657 1 : var files map[*fileMetadata]bool
2658 1 : var byLevel [numLevels][]*fileMetadata
2659 1 :
2660 1 : unresolvedHints := hints[:0]
2661 1 : for _, h := range hints {
2662 1 : // Check each compaction hint to see if it's resolvable. Resolvable
2663 1 : // hints are removed and trigger a delete-only compaction if any files
2664 1 : // in the current LSM still meet their criteria. Unresolvable hints
2665 1 : // are saved and don't trigger a delete-only compaction.
2666 1 : //
2667 1 : // When a compaction hint is created, the sequence numbers of the
2668 1 : // range tombstones and the covered file with the oldest key are
2669 1 : // recorded. The largest tombstone sequence number and the smallest
2670 1 : // file sequence number must be in the same snapshot stripe for the
2671 1 : // hint to be resolved. The below graphic models a compaction hint
2672 1 : // covering the keyspace [b, r). The hint completely contains two
2673 1 : // files, 000002 and 000003. The file 000003 contains the lowest
2674 1 : // covered sequence number at #90. The tombstone b.RANGEDEL.230:h has
2675 1 : // the highest tombstone sequence number incorporated into the hint.
2676 1 : // The hint may be resolved only once the snapshots at #100, #180 and
2677 1 : // #210 are all closed. File 000001 is not included within the hint
2678 1 : // because it extends beyond the range tombstones in user key space.
2679 1 : //
2680 1 : // 250
2681 1 : //
2682 1 : // |-b...230:h-|
2683 1 : // _____________________________________________________ snapshot #210
2684 1 : // 200 |--h.RANGEDEL.200:r--|
2685 1 : //
2686 1 : // _____________________________________________________ snapshot #180
2687 1 : //
2688 1 : // 150 +--------+
2689 1 : // +---------+ | 000003 |
2690 1 : // | 000002 | | |
2691 1 : // +_________+ | |
2692 1 : // 100_____________________|________|___________________ snapshot #100
2693 1 : // +--------+
2694 1 : // _____________________________________________________ snapshot #70
2695 1 : // +---------------+
2696 1 : // 50 | 000001 |
2697 1 : // | |
2698 1 : // +---------------+
2699 1 : // ______________________________________________________________
2700 1 : // a b c d e f g h i j k l m n o p q r s t u v w x y z
2701 1 :
2702 1 : ti, _ := snapshotIndex(h.tombstoneLargestSeqNum, snapshots)
2703 1 : fi, _ := snapshotIndex(h.fileSmallestSeqNum, snapshots)
2704 1 : if ti != fi {
2705 1 : // Cannot resolve yet.
2706 1 : unresolvedHints = append(unresolvedHints, h)
2707 1 : continue
2708 : }
2709 :
2710 : // The hint h will be resolved and dropped, regardless of whether
2711 : // there are any tables that can be deleted.
2712 1 : for l := h.tombstoneLevel + 1; l < numLevels; l++ {
2713 1 : overlaps := v.Overlaps(l, cmp, h.start, h.end, true /* exclusiveEnd */)
2714 1 : iter := overlaps.Iter()
2715 1 : for m := iter.First(); m != nil; m = iter.Next() {
2716 1 : if m.IsCompacting() || !h.canDelete(cmp, m, snapshots) || files[m] {
2717 1 : continue
2718 : }
2719 1 : if files == nil {
2720 1 : // Construct files lazily, assuming most calls will not
2721 1 : // produce delete-only compactions.
2722 1 : files = make(map[*fileMetadata]bool)
2723 1 : }
2724 1 : files[m] = true
2725 1 : byLevel[l] = append(byLevel[l], m)
2726 : }
2727 : }
2728 : }
2729 :
2730 1 : var compactLevels []compactionLevel
2731 1 : for l, files := range byLevel {
2732 1 : if len(files) == 0 {
2733 1 : continue
2734 : }
2735 1 : compactLevels = append(compactLevels, compactionLevel{
2736 1 : level: l,
2737 1 : files: manifest.NewLevelSliceKeySorted(cmp, files),
2738 1 : })
2739 : }
2740 1 : return compactLevels, unresolvedHints
2741 : }
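
// The following is an illustrative sketch (an editorial addition, not part of
// the original source). It makes the snapshot-stripe test above concrete,
// under the assumption that snapshotIndex returns the index of the first open
// snapshot with a sequence number greater than the given one; the helper and
// numbers below are hypothetical and mirror the diagram in the comment above.
func exampleSnapshotStripe() {
	// Open snapshots from the diagram, sorted ascending.
	snapshots := []uint64{100, 180, 210}
	// Index of the first snapshot greater than seqNum. Equal indices mean
	// "same snapshot stripe".
	stripe := func(seqNum uint64) int {
		return sort.Search(len(snapshots), func(i int) bool {
			return snapshots[i] > seqNum
		})
	}
	tombstoneStripe := stripe(230) // 3: above every open snapshot
	fileStripe := stripe(90)       // 0: below snapshot #100
	if tombstoneStripe != fileStripe {
		// The hint cannot be resolved yet; it would be appended to
		// unresolvedHints until snapshots #100, #180 and #210 close.
		fmt.Println("hint unresolved")
	}
}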
2742 :
2743 : // compact runs one compaction and maybe schedules another call to compact.
2744 1 : func (d *DB) compact(c *compaction, errChannel chan error) {
2745 1 : pprof.Do(context.Background(), compactLabels, func(context.Context) {
2746 1 : d.mu.Lock()
2747 1 : defer d.mu.Unlock()
2748 1 : if err := d.compact1(c, errChannel); err != nil {
2749 1 : // TODO(peter): count consecutive compaction errors and backoff.
2750 1 : d.opts.EventListener.BackgroundError(err)
2751 1 : }
2752 1 : d.mu.compact.compactingCount--
2753 1 : delete(d.mu.compact.inProgress, c)
2754 1 : // Add this compaction's duration to the cumulative duration. NB: This
2755 1 : // must be atomic with the above removal of c from
2756 1 : // d.mu.compact.inProgress to ensure Metrics.Compact.Duration does not
2757 1 : // miss or double count a completing compaction's duration.
2758 1 : d.mu.compact.duration += d.timeNow().Sub(c.beganAt)
2759 1 :
2760 1 : // The previous compaction may have produced too many files in a
2761 1 : // level, so reschedule another compaction if needed.
2762 1 : d.maybeScheduleCompaction()
2763 1 : d.mu.compact.cond.Broadcast()
2764 : })
2765 : }
2766 :
2767 : // compact1 runs one compaction.
2768 : //
2769 : // d.mu must be held when calling this, but the mutex may be dropped and
2770 : // re-acquired during the course of this method.
2771 1 : func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
2772 1 : if errChannel != nil {
2773 1 : defer func() {
2774 1 : errChannel <- err
2775 1 : }()
2776 : }
2777 :
2778 1 : jobID := d.mu.nextJobID
2779 1 : d.mu.nextJobID++
2780 1 : info := c.makeInfo(jobID)
2781 1 : d.opts.EventListener.CompactionBegin(info)
2782 1 : startTime := d.timeNow()
2783 1 :
2784 1 : ve, pendingOutputs, stats, err := d.runCompaction(jobID, c)
2785 1 :
2786 1 : info.Duration = d.timeNow().Sub(startTime)
2787 1 : if err == nil {
2788 1 : err = func() error {
2789 1 : var err error
2790 1 : d.mu.versions.logLock()
2791 1 : // Check if this compaction had a conflicting operation (e.g. a d.excise())
2792 1 : // that necessitates it restarting from scratch. Note that since we hold
2793 1 : // the manifest lock, we don't expect this bool to change its value
2794 1 : // as only the holder of the manifest lock will ever write to it.
2795 1 : if c.cancel.Load() {
2796 1 : err = firstError(err, ErrCancelledCompaction)
2797 1 : }
2798 1 : if err != nil {
2799 1 : // logAndApply calls logUnlock. If we didn't call it, we need to call
2800 1 : // logUnlock ourselves.
2801 1 : d.mu.versions.logUnlock()
2802 1 : return err
2803 1 : }
2804 1 : return d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
2805 1 : return d.getInProgressCompactionInfoLocked(c)
2806 1 : })
2807 : }()
2808 1 : if err != nil {
2809 1 : // TODO(peter): untested.
2810 1 : for _, f := range pendingOutputs {
2811 1 : // Note that the FileBacking for the file metadata might not have
2812 1 : // been set yet. So, we directly use the FileNum. Since these
2813 1 : // files were generated as compaction outputs, these must be
2814 1 : // physical files on disk. This property might not hold once
2815 1 : // https://github.com/cockroachdb/pebble/issues/389 is
2816 1 : // implemented if #389 creates virtual sstables as output files.
2817 1 : d.mu.versions.obsoleteTables = append(
2818 1 : d.mu.versions.obsoleteTables,
2819 1 : fileInfo{f.FileNum.DiskFileNum(), f.Size},
2820 1 : )
2821 1 : }
2822 1 : d.mu.versions.updateObsoleteTableMetricsLocked()
2823 : }
2824 : }
2825 :
2826 1 : info.Done = true
2827 1 : info.Err = err
2828 1 : if err == nil {
2829 1 : for i := range ve.NewFiles {
2830 1 : e := &ve.NewFiles[i]
2831 1 : info.Output.Tables = append(info.Output.Tables, e.Meta.TableInfo())
2832 1 : }
2833 1 : d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
2834 1 : d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
2835 1 : d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
2836 1 : d.maybeUpdateDeleteCompactionHints(c)
2837 : }
2838 :
2839 : // NB: clearing compacting state must occur before updating the read state;
2840 : // L0Sublevels initialization depends on it.
2841 1 : d.clearCompactingState(c, err != nil)
2842 1 : d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.pickerMetrics)
2843 1 : d.mu.versions.incrementCompactionBytes(-c.bytesWritten)
2844 1 :
2845 1 : info.TotalDuration = d.timeNow().Sub(c.beganAt)
2846 1 : d.opts.EventListener.CompactionEnd(info)
2847 1 :
2848 1 : // Update the read state before deleting obsolete files because the
2849 1 : // read-state update will cause the previous version to be unref'd and if
2850 1 : // there are no references obsolete tables will be added to the obsolete
2851 1 : // table list.
2852 1 : if err == nil {
2853 1 : d.updateReadStateLocked(d.opts.DebugCheck)
2854 1 : d.updateTableStatsLocked(ve.NewFiles)
2855 1 : }
2856 1 : d.deleteObsoleteFiles(jobID)
2857 1 :
2858 1 : return err
2859 : }
2860 :
2861 : type compactStats struct {
2862 : cumulativePinnedKeys uint64
2863 : cumulativePinnedSize uint64
2864 : countMissizedDels uint64
2865 : }
2866 :
2867 : // runCopyCompaction runs a copy compaction, in which a new file (with a new
2868 : // FileNum) is written as a byte-for-byte copy of the input file. This is used
2869 : // in lieu of a move compaction when a file is being moved across the
2870 : // local/remote storage boundary.
2871 : //
2872 : // d.mu must be held when calling this method.
2873 : func (d *DB) runCopyCompaction(
2874 : jobID int,
2875 : c *compaction,
2876 : meta *fileMetadata,
2877 : objMeta objstorage.ObjectMetadata,
2878 : versionEdit *versionEdit,
2879 1 : ) (ve *versionEdit, pendingOutputs []physicalMeta, retErr error) {
2880 1 : ve = versionEdit
2881 1 : if objMeta.IsRemote() || !remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level) {
2882 0 : panic("pebble: scheduled a copy compaction that is not actually moving files to shared storage")
2883 : }
2884 : // Note that based on logic in the compaction picker, we're guaranteed
2885 : // meta.Virtual is false.
2886 1 : if meta.Virtual {
2887 0 : panic(errors.AssertionFailedf("cannot do a copy compaction of a virtual sstable across local/remote storage"))
2888 : }
2889 : // We are in the relatively more complex case where we need to copy this
2890 : // file to remote/shared storage. Drop the db mutex while we do the
2891 : // copy.
2892 : //
2893 : // To simplify cleanup of the local file and tracking of refs, we create
2894 : // a new FileNum. This has the potential to make the block cache less
2895 : // effective, however.
2896 1 : metaCopy := new(fileMetadata)
2897 1 : *metaCopy = fileMetadata{
2898 1 : Size: meta.Size,
2899 1 : CreationTime: meta.CreationTime,
2900 1 : SmallestSeqNum: meta.SmallestSeqNum,
2901 1 : LargestSeqNum: meta.LargestSeqNum,
2902 1 : Stats: meta.Stats,
2903 1 : Virtual: meta.Virtual,
2904 1 : }
2905 1 : if meta.HasPointKeys {
2906 1 : metaCopy.ExtendPointKeyBounds(c.cmp, meta.SmallestPointKey, meta.LargestPointKey)
2907 1 : }
2908 1 : if meta.HasRangeKeys {
2909 1 : metaCopy.ExtendRangeKeyBounds(c.cmp, meta.SmallestRangeKey, meta.LargestRangeKey)
2910 1 : }
2911 1 : metaCopy.FileNum = d.mu.versions.getNextFileNum()
2912 1 : metaCopy.InitPhysicalBacking()
2913 1 : c.metrics = map[int]*LevelMetrics{
2914 1 : c.outputLevel.level: {
2915 1 : BytesIn: meta.Size,
2916 1 : BytesCompacted: meta.Size,
2917 1 : TablesCompacted: 1,
2918 1 : },
2919 1 : }
2920 1 : pendingOutputs = append(pendingOutputs, metaCopy.PhysicalMeta())
2921 1 : // Before dropping the db mutex, grab a ref to the current version. This
2922 1 : // prevents any concurrent excises from deleting files that this compaction
2923 1 : // needs to read/maintain a reference to.
2924 1 : vers := d.mu.versions.currentVersion()
2925 1 : vers.Ref()
2926 1 : defer vers.UnrefLocked()
2927 1 :
2928 1 : d.mu.Unlock()
2929 1 : defer d.mu.Lock()
2930 1 : _, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
2931 1 : d.objProvider.Path(objMeta), fileTypeTable, metaCopy.FileBacking.DiskFileNum,
2932 1 : objstorage.CreateOptions{PreferSharedStorage: true})
2933 1 : if err != nil {
2934 0 : return ve, pendingOutputs, err
2935 0 : }
2936 1 : ve.NewFiles[0].Meta = metaCopy
2937 1 :
2938 1 : if err := d.objProvider.Sync(); err != nil {
2939 0 : return nil, pendingOutputs, err
2940 0 : }
2941 1 : return ve, pendingOutputs, nil
2942 : }
2943 :
2944 : // runCompaction runs a compaction that produces new on-disk tables from
2945 : // memtables or old on-disk tables.
2946 : //
2947 : // d.mu must be held when calling this, but the mutex may be dropped and
2948 : // re-acquired during the course of this method.
2949 : func (d *DB) runCompaction(
2950 : jobID int, c *compaction,
2951 1 : ) (ve *versionEdit, pendingOutputs []physicalMeta, stats compactStats, retErr error) {
2952 1 : // As a sanity check, confirm that the smallest / largest keys for new and
2953 1 : // deleted files in the new versionEdit pass a validation function before
2954 1 : // returning the edit.
2955 1 : defer func() {
2956 1 : // If we're handling a panic, don't expect the version edit to validate.
2957 1 : if r := recover(); r != nil {
2958 0 : panic(r)
2959 1 : } else if ve != nil {
2960 1 : err := validateVersionEdit(ve, d.opts.Experimental.KeyValidationFunc, d.opts.Comparer.FormatKey)
2961 1 : if err != nil {
2962 0 : d.opts.Logger.Fatalf("pebble: version edit validation failed: %s", err)
2963 0 : }
2964 : }
2965 : }()
2966 :
2967 : // Check for a delete-only compaction. This can occur when wide range
2968 : // tombstones completely contain sstables.
2969 1 : if c.kind == compactionKindDeleteOnly {
2970 1 : c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
2971 1 : ve := &versionEdit{
2972 1 : DeletedFiles: map[deletedFileEntry]*fileMetadata{},
2973 1 : }
2974 1 : for _, cl := range c.inputs {
2975 1 : levelMetrics := &LevelMetrics{}
2976 1 : iter := cl.files.Iter()
2977 1 : for f := iter.First(); f != nil; f = iter.Next() {
2978 1 : ve.DeletedFiles[deletedFileEntry{
2979 1 : Level: cl.level,
2980 1 : FileNum: f.FileNum,
2981 1 : }] = f
2982 1 : }
2983 1 : c.metrics[cl.level] = levelMetrics
2984 : }
2985 1 : return ve, nil, stats, nil
2986 : }
2987 :
2988 1 : if c.kind == compactionKindIngestedFlushable {
2989 0 : panic("pebble: runCompaction cannot handle compactionKindIngestedFlushable.")
2990 : }
2991 :
2992 : // Check for a move or copy of one table from one level to the next. We avoid
2993 : // such a move if there is lots of overlapping grandparent data. Otherwise,
2994 : // the move could create a parent file that will require a very expensive
2995 : // merge later on.
2996 1 : if c.kind == compactionKindMove || c.kind == compactionKindCopy {
2997 1 : iter := c.startLevel.files.Iter()
2998 1 : meta := iter.First()
2999 1 : if invariants.Enabled {
3000 1 : if iter.Next() != nil {
3001 0 : panic("got more than one file for a move or copy compaction")
3002 : }
3003 : }
3004 1 : if c.cancel.Load() {
3005 0 : return ve, nil, stats, ErrCancelledCompaction
3006 0 : }
3007 1 : objMeta, err := d.objProvider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
3008 1 : if err != nil {
3009 0 : return ve, pendingOutputs, stats, err
3010 0 : }
3011 1 : c.metrics = map[int]*LevelMetrics{
3012 1 : c.outputLevel.level: {
3013 1 : BytesMoved: meta.Size,
3014 1 : TablesMoved: 1,
3015 1 : },
3016 1 : }
3017 1 : ve := &versionEdit{
3018 1 : DeletedFiles: map[deletedFileEntry]*fileMetadata{
3019 1 : {Level: c.startLevel.level, FileNum: meta.FileNum}: meta,
3020 1 : },
3021 1 : NewFiles: []newFileEntry{
3022 1 : {Level: c.outputLevel.level, Meta: meta},
3023 1 : },
3024 1 : }
3025 1 : if c.kind == compactionKindCopy {
3026 1 : ve, pendingOutputs, retErr = d.runCopyCompaction(jobID, c, meta, objMeta, ve)
3027 1 : if retErr != nil {
3028 0 : return ve, pendingOutputs, stats, retErr
3029 0 : }
3030 : }
3031 1 : return ve, nil, stats, nil
3032 : }
3033 :
3034 1 : defer func() {
3035 1 : if retErr != nil {
3036 1 : pendingOutputs = nil
3037 1 : }
3038 : }()
3039 :
3040 1 : snapshots := d.mu.snapshots.toSlice()
3041 1 : formatVers := d.FormatMajorVersion()
3042 1 :
3043 1 : if c.flushing == nil {
3044 1 : // Before dropping the db mutex, grab a ref to the current version. This
3045 1 : // prevents any concurrent excises from deleting files that this compaction
3046 1 : // needs to read/maintain a reference to.
3047 1 : //
3048 1 : // Note that unlike user iterators, compactionIter does not maintain a ref
3049 1 : // of the version or read state.
3050 1 : vers := d.mu.versions.currentVersion()
3051 1 : vers.Ref()
3052 1 : defer vers.UnrefLocked()
3053 1 : }
3054 :
3055 1 : if c.cancel.Load() {
3056 0 : return ve, nil, stats, ErrCancelledCompaction
3057 0 : }
3058 :
3059 : // Release the d.mu lock while doing I/O.
3060 : // Note the unusual order: Unlock and then Lock.
3061 1 : d.mu.Unlock()
3062 1 : defer d.mu.Lock()
3063 1 :
3064 1 : // Compactions use a pool of buffers to read blocks, which avoids polluting the
3065 1 : // block cache with blocks that will not be read again. We initialize the
3066 1 : // buffer pool with a size of 12. This initial size does not need to be
3067 1 : // accurate, because the pool will grow to accommodate the maximum number of
3068 1 : // blocks allocated at a given time over the course of the compaction. But
3069 1 : // choosing a size larger than that working set avoids any additional
3070 1 : // allocations to grow the size of the pool over the course of iteration.
3071 1 : //
3072 1 : // Justification for initial size 12: In a two-level compaction, at any
3073 1 : // given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
3074 1 : // Additionally, when decoding a compressed block, we'll temporarily
3075 1 : // allocate 1 additional block to hold the compressed buffer. In the worst
3076 1 : // case that all input sstables have two-level index blocks (+2), value
3077 1 : // blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
3078 1 : // additionally require 2n+4 blocks where n is the number of input sstables.
3079 1 : // Range deletion and range key blocks are relatively rare, and the cost of
3080 1 : // an additional allocation or two over the course of the compaction is
3081 1 : // considered to be okay. A larger initial size would cause the pool to hold
3082 1 : // on to more memory, even when it's not in-use because the pool will
3083 1 : // recycle buffers up to the current capacity of the pool. The memory use of
3084 1 : // a 12-buffer pool is expected to be within reason, even if all the buffers
3085 1 : // grow to the typical size of an index block (256 KiB) which would
3086 1 : // translate to 3 MiB per compaction.
3087 1 : c.bufferPool.Init(12)
3088 1 : defer c.bufferPool.Release()
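// Editorial worked example for the sizing above (it follows the accounting in
// the preceding comment and adds no new assumptions): for a two-level
// compaction with n=2 input sstables, the steady-state working set is
// 2 index + 2 data + 1 temporary decompression buffer = 5 blocks, and the
// worst case adds 2n+4 = 8 more, for 13 in total. That only slightly exceeds
// the initial size of 12, which is why an extra allocation or two is
// considered acceptable.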
3089 1 :
3090 1 : iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots)
3091 1 : if err != nil {
3092 0 : return nil, pendingOutputs, stats, err
3093 0 : }
3094 1 : c.allowedZeroSeqNum = c.allowZeroSeqNum()
3095 1 : iiter = invalidating.MaybeWrapIfInvariants(iiter)
3096 1 : iter := newCompactionIter(c.cmp, c.equal, c.formatKey, d.merge, iiter, snapshots,
3097 1 : &c.rangeDelFrag, &c.rangeKeyFrag, c.allowedZeroSeqNum, c.elideTombstone,
3098 1 : c.elideRangeTombstone, d.opts.Experimental.IneffectualSingleDeleteCallback,
3099 1 : d.opts.Experimental.SingleDeleteInvariantViolationCallback,
3100 1 : d.FormatMajorVersion())
3101 1 :
3102 1 : var (
3103 1 : createdFiles []base.DiskFileNum
3104 1 : tw *sstable.Writer
3105 1 : pinnedKeySize uint64
3106 1 : pinnedValueSize uint64
3107 1 : pinnedCount uint64
3108 1 : )
3109 1 : defer func() {
3110 1 : if iter != nil {
3111 1 : retErr = firstError(retErr, iter.Close())
3112 1 : }
3113 1 : if tw != nil {
3114 0 : retErr = firstError(retErr, tw.Close())
3115 0 : }
3116 1 : if retErr != nil {
3117 1 : for _, fileNum := range createdFiles {
3118 1 : _ = d.objProvider.Remove(fileTypeTable, fileNum)
3119 1 : }
3120 : }
3121 1 : for _, closer := range c.closers {
3122 1 : retErr = firstError(retErr, closer.Close())
3123 1 : }
3124 : }()
3125 :
3126 1 : ve = &versionEdit{
3127 1 : DeletedFiles: map[deletedFileEntry]*fileMetadata{},
3128 1 : }
3129 1 :
3130 1 : startLevelBytes := c.startLevel.files.SizeSum()
3131 1 : outputMetrics := &LevelMetrics{
3132 1 : BytesIn: startLevelBytes,
3133 1 : BytesRead: c.outputLevel.files.SizeSum(),
3134 1 : }
3135 1 : if len(c.extraLevels) > 0 {
3136 1 : outputMetrics.BytesIn += c.extraLevels[0].files.SizeSum()
3137 1 : }
3138 1 : outputMetrics.BytesRead += outputMetrics.BytesIn
3139 1 :
3140 1 : c.metrics = map[int]*LevelMetrics{
3141 1 : c.outputLevel.level: outputMetrics,
3142 1 : }
3143 1 : if len(c.flushing) == 0 && c.metrics[c.startLevel.level] == nil {
3144 1 : c.metrics[c.startLevel.level] = &LevelMetrics{}
3145 1 : }
3146 1 : if len(c.extraLevels) > 0 {
3147 1 : c.metrics[c.extraLevels[0].level] = &LevelMetrics{}
3148 1 : outputMetrics.MultiLevel.BytesInTop = startLevelBytes
3149 1 : outputMetrics.MultiLevel.BytesIn = outputMetrics.BytesIn
3150 1 : outputMetrics.MultiLevel.BytesRead = outputMetrics.BytesRead
3151 1 : }
3152 :
3153 : // The table is typically written at the maximum allowable format implied by
3154 : // the current format major version of the DB.
3155 1 : tableFormat := formatVers.MaxTableFormat()
3156 1 :
3157 1 : // In format major versions with maximum table formats of Pebblev3, value
3158 1 : // blocks were conditional on an experimental setting. In format major
3159 1 : // versions with maximum table formats of Pebblev4 and higher, value blocks
3160 1 : // are always enabled.
3161 1 : if tableFormat == sstable.TableFormatPebblev3 &&
3162 1 : (d.opts.Experimental.EnableValueBlocks == nil || !d.opts.Experimental.EnableValueBlocks()) {
3163 1 : tableFormat = sstable.TableFormatPebblev2
3164 1 : }
3165 :
3166 1 : writerOpts := d.opts.MakeWriterOptions(c.outputLevel.level, tableFormat)
3167 1 :
3168 1 : // prevPointKey is a sstable.WriterOption that provides access to
3169 1 : // the last point key written to a writer's sstable. When a new
3170 1 : // output begins in newOutput, prevPointKey is updated to point to
3171 1 : // the new output's sstable.Writer. This allows the compaction loop
3172 1 : // to access the last written point key without requiring the
3173 1 : // compaction loop to make a copy of each key ahead of time. Users
3174 1 : // must be careful, because the byte slice returned by UnsafeKey
3175 1 : // points directly into the Writer's block buffer.
3176 1 : var prevPointKey sstable.PreviousPointKeyOpt
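// Editorial note (illustrative, not part of the original source): if the key
// returned by prevPointKey.UnsafeKey() ever needed to be retained beyond the
// next write to the sstable, it would have to be copied first, e.g.
// k := append([]byte(nil), prevPointKey.UnsafeKey().UserKey...). The code
// below avoids any copy by only comparing the unsafe key immediately (see
// unsafePrevUserKey).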
3177 1 : var cpuWorkHandle CPUWorkHandle
3178 1 : defer func() {
3179 1 : if cpuWorkHandle != nil {
3180 1 : d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
3181 1 : }
3182 : }()
3183 :
3184 1 : newOutput := func() error {
3185 1 : // Check if we've been cancelled by a concurrent operation.
3186 1 : if c.cancel.Load() {
3187 0 : return ErrCancelledCompaction
3188 0 : }
3189 1 : fileMeta := &fileMetadata{}
3190 1 : d.mu.Lock()
3191 1 : fileNum := d.mu.versions.getNextFileNum()
3192 1 : fileMeta.FileNum = fileNum
3193 1 : pendingOutputs = append(pendingOutputs, fileMeta.PhysicalMeta())
3194 1 : d.mu.Unlock()
3195 1 :
3196 1 : ctx := context.TODO()
3197 1 : if objiotracing.Enabled {
3198 0 : ctx = objiotracing.WithLevel(ctx, c.outputLevel.level)
3199 0 : switch c.kind {
3200 0 : case compactionKindFlush:
3201 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForFlush)
3202 0 : case compactionKindIngestedFlushable:
3203 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForIngestion)
3204 0 : default:
3205 0 : ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
3206 : }
3207 : }
3208 : // Prefer shared storage if present.
3209 1 : createOpts := objstorage.CreateOptions{
3210 1 : PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
3211 1 : }
3212 1 : writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, fileNum.DiskFileNum(), createOpts)
3213 1 : if err != nil {
3214 1 : return err
3215 1 : }
3216 :
3217 1 : reason := "flushing"
3218 1 : if c.flushing == nil {
3219 1 : reason = "compacting"
3220 1 : }
3221 1 : d.opts.EventListener.TableCreated(TableCreateInfo{
3222 1 : JobID: jobID,
3223 1 : Reason: reason,
3224 1 : Path: d.objProvider.Path(objMeta),
3225 1 : FileNum: fileNum,
3226 1 : })
3227 1 : if c.kind != compactionKindFlush {
3228 1 : writable = &compactionWritable{
3229 1 : Writable: writable,
3230 1 : versions: d.mu.versions,
3231 1 : written: &c.bytesWritten,
3232 1 : }
3233 1 : }
3234 1 : createdFiles = append(createdFiles, fileNum.DiskFileNum())
3235 1 : cacheOpts := private.SSTableCacheOpts(d.cacheID, fileNum.DiskFileNum()).(sstable.WriterOption)
3236 1 :
3237 1 : const MaxFileWriteAdditionalCPUTime = time.Millisecond * 100
3238 1 : cpuWorkHandle = d.opts.Experimental.CPUWorkPermissionGranter.GetPermission(
3239 1 : MaxFileWriteAdditionalCPUTime,
3240 1 : )
3241 1 : writerOpts.Parallelism =
3242 1 : d.opts.Experimental.MaxWriterConcurrency > 0 &&
3243 1 : (cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)
3244 1 :
3245 1 : tw = sstable.NewWriter(writable, writerOpts, cacheOpts, &prevPointKey)
3246 1 :
3247 1 : fileMeta.CreationTime = time.Now().Unix()
3248 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{
3249 1 : Level: c.outputLevel.level,
3250 1 : Meta: fileMeta,
3251 1 : })
3252 1 : return nil
3253 : }
3254 :
3255 : // splitL0Outputs is true during flushes and intra-L0 compactions with flush
3256 : // splits enabled.
3257 1 : splitL0Outputs := c.outputLevel.level == 0 && d.opts.FlushSplitBytes > 0
3258 1 :
3259 1 : // finishOutput is called with a user key up to which all tombstones
3260 1 : // should be flushed. Typically, this is the first key of the next
3261 1 : // sstable or an empty key if this output is the final sstable.
3262 1 : finishOutput := func(splitKey []byte) error {
3263 1 : // If we haven't output any point records to the sstable (tw == nil) then the
3264 1 : // sstable will only contain range tombstones and/or range keys. The smallest
3265 1 : // key in the sstable will be the start key of the first range tombstone or
3266 1 : // range key added. We need to ensure that this start key is distinct from
3267 1 : // the splitKey passed to finishOutput (if set), otherwise we would generate
3268 1 : // an sstable where the largest key is smaller than the smallest key due to
3269 1 : // how the largest key boundary is set below. NB: It is permissible for the
3270 1 : // range tombstone / range key start key to be the empty string.
3271 1 : //
3272 1 : // TODO: It is unfortunate that we have to do this check here rather than
3273 1 : // when we decide to finish the sstable in the runCompaction loop. A better
3274 1 : // structure currently eludes us.
3275 1 : if tw == nil {
3276 1 : startKey := c.rangeDelFrag.Start()
3277 1 : if len(iter.tombstones) > 0 {
3278 1 : startKey = iter.tombstones[0].Start
3279 1 : }
3280 1 : if startKey == nil {
3281 1 : startKey = c.rangeKeyFrag.Start()
3282 1 : if len(iter.rangeKeys) > 0 {
3283 1 : startKey = iter.rangeKeys[0].Start
3284 1 : }
3285 : }
3286 1 : if splitKey != nil && d.cmp(startKey, splitKey) == 0 {
3287 0 : return nil
3288 0 : }
3289 : }
3290 :
3291 : // NB: clone the key because the data can be held on to by the call to
3292 : // compactionIter.Tombstones via keyspan.Fragmenter.FlushTo, and by the
3293 : // WriterMetadata.LargestRangeDel.UserKey.
3294 1 : splitKey = append([]byte(nil), splitKey...)
3295 1 : for _, v := range iter.Tombstones(splitKey) {
3296 1 : if tw == nil {
3297 1 : if err := newOutput(); err != nil {
3298 0 : return err
3299 0 : }
3300 : }
3301 : // The tombstone being added could be completely outside the
3302 : // eventual bounds of the sstable. Consider this example (bounds
3303 : // in square brackets next to table filename):
3304 : //
3305 : // ./000240.sst [tmgc#391,MERGE-tmgc#391,MERGE]
3306 : // tmgc#391,MERGE [786e627a]
3307 : // tmgc-udkatvs#331,RANGEDEL
3308 : //
3309 : // ./000241.sst [tmgc#384,MERGE-tmgc#384,MERGE]
3310 : // tmgc#384,MERGE [666c7070]
3311 : // tmgc-tvsalezade#383,RANGEDEL
3312 : // tmgc-tvsalezade#331,RANGEDEL
3313 : //
3314 : // ./000242.sst [tmgc#383,RANGEDEL-tvsalezade#72057594037927935,RANGEDEL]
3315 : // tmgc-tvsalezade#383,RANGEDEL
3316 : // tmgc#375,SET [72646c78766965616c72776865676e79]
3317 : // tmgc-tvsalezade#356,RANGEDEL
3318 : //
3319 : // Note that both of the top two SSTables have range tombstones
3320 : // that start after their end keys. Since the file bound
3321 : // computation happens well after all range tombstones have been
3322 : // added to the writer, eliding out-of-file range tombstones based
3323 : // on sequence number at this stage is difficult, and necessitates
3324 : // read-time logic to ignore range tombstones outside file bounds.
3325 1 : if err := rangedel.Encode(&v, tw.Add); err != nil {
3326 0 : return err
3327 0 : }
3328 : }
3329 1 : for _, v := range iter.RangeKeys(splitKey) {
3330 1 : // Same logic as for range tombstones, except added using tw.AddRangeKey.
3331 1 : if tw == nil {
3332 1 : if err := newOutput(); err != nil {
3333 0 : return err
3334 0 : }
3335 : }
3336 1 : if err := rangekey.Encode(&v, tw.AddRangeKey); err != nil {
3337 0 : return err
3338 0 : }
3339 : }
3340 :
3341 1 : if tw == nil {
3342 1 : return nil
3343 1 : }
3344 1 : {
3345 1 : // Set internal sstable properties.
3346 1 : p := getInternalWriterProperties(tw)
3347 1 : // Set the external sst version to 0. This is what RocksDB expects for
3348 1 : // db-internal sstables; otherwise, it could apply a global sequence number.
3349 1 : p.ExternalFormatVersion = 0
3350 1 : // Set the snapshot pinned totals.
3351 1 : p.SnapshotPinnedKeys = pinnedCount
3352 1 : p.SnapshotPinnedKeySize = pinnedKeySize
3353 1 : p.SnapshotPinnedValueSize = pinnedValueSize
3354 1 : stats.cumulativePinnedKeys += pinnedCount
3355 1 : stats.cumulativePinnedSize += pinnedKeySize + pinnedValueSize
3356 1 : pinnedCount = 0
3357 1 : pinnedKeySize = 0
3358 1 : pinnedValueSize = 0
3359 1 : }
3360 1 : if err := tw.Close(); err != nil {
3361 1 : tw = nil
3362 1 : return err
3363 1 : }
3364 1 : d.opts.Experimental.CPUWorkPermissionGranter.CPUWorkDone(cpuWorkHandle)
3365 1 : cpuWorkHandle = nil
3366 1 : writerMeta, err := tw.Metadata()
3367 1 : if err != nil {
3368 0 : tw = nil
3369 0 : return err
3370 0 : }
3371 1 : tw = nil
3372 1 : meta := ve.NewFiles[len(ve.NewFiles)-1].Meta
3373 1 : meta.Size = writerMeta.Size
3374 1 : meta.SmallestSeqNum = writerMeta.SmallestSeqNum
3375 1 : meta.LargestSeqNum = writerMeta.LargestSeqNum
3376 1 : meta.InitPhysicalBacking()
3377 1 :
3378 1 : // If the file didn't contain any range deletions, we can fill its
3379 1 : // table stats now, avoiding unnecessarily loading the table later.
3380 1 : maybeSetStatsFromProperties(
3381 1 : meta.PhysicalMeta(), &writerMeta.Properties,
3382 1 : )
3383 1 :
3384 1 : if c.flushing == nil {
3385 1 : outputMetrics.TablesCompacted++
3386 1 : outputMetrics.BytesCompacted += meta.Size
3387 1 : } else {
3388 1 : outputMetrics.TablesFlushed++
3389 1 : outputMetrics.BytesFlushed += meta.Size
3390 1 : }
3391 1 : outputMetrics.Size += int64(meta.Size)
3392 1 : outputMetrics.NumFiles++
3393 1 : outputMetrics.Additional.BytesWrittenDataBlocks += writerMeta.Properties.DataSize
3394 1 : outputMetrics.Additional.BytesWrittenValueBlocks += writerMeta.Properties.ValueBlocksSize
3395 1 :
3396 1 : if n := len(ve.NewFiles); n > 1 {
3397 1 : // This is not the first output file. Ensure the sstable boundaries
3398 1 : // are nonoverlapping.
3399 1 : prevMeta := ve.NewFiles[n-2].Meta
3400 1 : if writerMeta.SmallestRangeDel.UserKey != nil {
3401 1 : c := d.cmp(writerMeta.SmallestRangeDel.UserKey, prevMeta.Largest.UserKey)
3402 1 : if c < 0 {
3403 0 : return errors.Errorf(
3404 0 : "pebble: smallest range tombstone start key is less than previous sstable largest key: %s < %s",
3405 0 : writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey),
3406 0 : prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey))
3407 1 : } else if c == 0 && !prevMeta.Largest.IsExclusiveSentinel() {
3408 0 : // The user key portion of the range boundary start key is
3409 0 : // equal to the previous table's largest key user key, and
3410 0 : // the previous table's largest key is not exclusive. This
3411 0 : // violates the invariant that tables are key-space
3412 0 : // partitioned.
3413 0 : return errors.Errorf(
3414 0 : "pebble: invariant violation: previous sstable largest key %s, current sstable smallest rangedel: %s",
3415 0 : prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey),
3416 0 : writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey),
3417 0 : )
3418 0 : }
3419 : }
3420 : }
3421 :
3422 : // Verify that all range deletions output to the sstable are
3423 : // truncated to the split key.
3424 1 : if splitKey != nil && writerMeta.LargestRangeDel.UserKey != nil &&
3425 1 : d.cmp(writerMeta.LargestRangeDel.UserKey, splitKey) > 0 {
3426 0 : return errors.Errorf(
3427 0 : "pebble: invariant violation: rangedel largest key %q extends beyond split key %q",
3428 0 : writerMeta.LargestRangeDel.Pretty(d.opts.Comparer.FormatKey),
3429 0 : d.opts.Comparer.FormatKey(splitKey),
3430 0 : )
3431 0 : }
3432 :
3433 1 : if writerMeta.HasPointKeys {
3434 1 : meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestPoint, writerMeta.LargestPoint)
3435 1 : }
3436 1 : if writerMeta.HasRangeDelKeys {
3437 1 : meta.ExtendPointKeyBounds(d.cmp, writerMeta.SmallestRangeDel, writerMeta.LargestRangeDel)
3438 1 : }
3439 1 : if writerMeta.HasRangeKeys {
3440 1 : meta.ExtendRangeKeyBounds(d.cmp, writerMeta.SmallestRangeKey, writerMeta.LargestRangeKey)
3441 1 : }
3442 :
3443 : // Verify that the sstable bounds fall within the compaction input
3444 : // bounds. This is a sanity check that we don't have a logic error
3445 : // elsewhere that causes the sstable bounds to accidentally expand past the
3446 : // compaction input bounds, as doing so could lead to various badness, such
3447 : // as keys being incorrectly deleted by a range tombstone.
3448 1 : if c.smallest.UserKey != nil {
3449 1 : switch v := d.cmp(meta.Smallest.UserKey, c.smallest.UserKey); {
3450 1 : case v >= 0:
3451 : // Nothing to do.
3452 0 : case v < 0:
3453 0 : return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s < %s",
3454 0 : meta.Smallest.Pretty(d.opts.Comparer.FormatKey),
3455 0 : c.smallest.Pretty(d.opts.Comparer.FormatKey))
3456 : }
3457 : }
3458 1 : if c.largest.UserKey != nil {
3459 1 : switch v := d.cmp(meta.Largest.UserKey, c.largest.UserKey); {
3460 1 : case v <= 0:
3461 : // Nothing to do.
3462 0 : case v > 0:
3463 0 : return errors.Errorf("pebble: compaction output grew beyond bounds of input: %s > %s",
3464 0 : meta.Largest.Pretty(d.opts.Comparer.FormatKey),
3465 0 : c.largest.Pretty(d.opts.Comparer.FormatKey))
3466 : }
3467 : }
3468 : // Verify that we never split different revisions of the same user key
3469 : // across two different sstables.
3470 1 : if err := c.errorOnUserKeyOverlap(ve); err != nil {
3471 0 : return err
3472 0 : }
3473 1 : if err := meta.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
3474 0 : return err
3475 0 : }
3476 1 : return nil
3477 : }
3478 :
3479 : // Build a compactionOutputSplitter that contains all logic to determine
3480 : // whether the compaction loop should stop writing to one output sstable and
3481 : // switch to a new one. Some splitters can wrap other splitters, and the
3482 : // splitterGroup can be composed of multiple splitters. In this case, we
3483 : // start off with splitters for file sizes, grandparent limits, and (for L0
3484 : // splits) L0 limits, before wrapping them in a splitterGroup.
3485 1 : sizeSplitter := newFileSizeSplitter(&iter.frontiers, c.maxOutputFileSize, c.grandparents.Iter())
3486 1 : unsafePrevUserKey := func() []byte {
3487 1 : // Return the largest point key written to tw or the start of
3488 1 : // the current range deletion in the fragmenter, whichever is
3489 1 : // greater.
3490 1 : prevPoint := prevPointKey.UnsafeKey()
3491 1 : if c.cmp(prevPoint.UserKey, c.rangeDelFrag.Start()) > 0 {
3492 1 : return prevPoint.UserKey
3493 1 : }
3494 1 : return c.rangeDelFrag.Start()
3495 : }
3496 1 : outputSplitters := []compactionOutputSplitter{
3497 1 : // We do not split the same user key across different sstables within
3498 1 : // one flush or compaction. The fileSizeSplitter may request a split in
3499 1 : // the middle of a user key, so the userKeyChangeSplitter ensures we are
3500 1 : // at a user key change boundary when doing a split.
3501 1 : &userKeyChangeSplitter{
3502 1 : cmp: c.cmp,
3503 1 : splitter: sizeSplitter,
3504 1 : unsafePrevUserKey: unsafePrevUserKey,
3505 1 : },
3506 1 : newLimitFuncSplitter(&iter.frontiers, c.findGrandparentLimit),
3507 1 : }
3508 1 : if splitL0Outputs {
3509 1 : outputSplitters = append(outputSplitters, newLimitFuncSplitter(&iter.frontiers, c.findL0Limit))
3510 1 : }
3511 1 : splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters}
3512 1 :
3513 1 : // Each outer loop iteration produces one output file. An iteration that
3514 1 : // produces a file containing point keys (and optionally range tombstones)
3515 1 : // guarantees that the input iterator advanced. An iteration that produces
3516 1 : // a file containing only range tombstones guarantees the limit passed to
3517 1 : // `finishOutput()` advanced to a strictly greater user key corresponding
3518 1 : // to a grandparent file largest key, or nil. Taken together, these
3519 1 : // progress guarantees ensure that eventually the input iterator will be
3520 1 : // exhausted and the range tombstone fragments will all be flushed.
3521 1 : for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty() || !c.rangeKeyFrag.Empty(); {
3522 1 : var firstKey []byte
3523 1 : if key != nil {
3524 1 : firstKey = key.UserKey
3525 1 : } else if startKey := c.rangeDelFrag.Start(); startKey != nil {
3526 1 : // Pass the start key of the first pending tombstone to find the
3527 1 : // next limit. All pending tombstones have the same start key. We
3528 1 : // use this as opposed to the end key of the last written sstable to
3529 1 : // effectively handle cases like these:
3530 1 : //
3531 1 : // a.SET.3
3532 1 : // (lf.limit at b)
3533 1 : // d.RANGEDEL.4:f
3534 1 : //
3535 1 : // In this case, the partition after b has only range deletions, so
3536 1 : // if we were to find the limit after the last written key at the
3537 1 : // split point (key a), we'd get the limit b again, and
3538 1 : // finishOutput() would not advance any further because the next
3539 1 : // range tombstone to write does not start until after the L0 split
3540 1 : // point.
3541 1 : firstKey = startKey
3542 1 : }
3543 1 : splitterSuggestion := splitter.onNewOutput(firstKey)
3544 1 :
3545 1 : // Each inner loop iteration processes one key from the input iterator.
3546 1 : for ; key != nil; key, val = iter.Next() {
3547 1 : if split := splitter.shouldSplitBefore(key, tw); split == splitNow {
3548 1 : break
3549 : }
3550 :
3551 1 : switch key.Kind() {
3552 1 : case InternalKeyKindRangeDelete:
3553 1 : // Range tombstones are handled specially. They are fragmented,
3554 1 : // and they're not written until later during `finishOutput()`.
3555 1 : // We add them to the `Fragmenter` now to make them visible to
3556 1 : // `compactionIter` so covered keys in the same snapshot stripe
3557 1 : // can be elided.
3558 1 :
3559 1 : // The interleaved range deletion might only be one of many with
3560 1 : // these bounds. Some fragmenting is performed ahead of time by
3561 1 : // keyspan.MergingIter.
3562 1 : if s := c.rangeDelIter.Span(); !s.Empty() {
3563 1 : // The memory management here is subtle. Range deletion
3564 1 : // blocks do NOT use prefix compression, which ensures that
3565 1 : // range deletion spans' memory is available as long as we keep
3566 1 : // the iterator open. However, the keyspan.MergingIter that
3567 1 : // merges spans across levels only guarantees the lifetime
3568 1 : // of the [start, end) bounds until the next positioning
3569 1 : // method is called.
3570 1 : //
3571 1 : // Additionally, the Span.Keys slice is owned by the
3572 1 : // range deletion iterator stack, and it may be overwritten
3573 1 : // when we advance.
3574 1 : //
3575 1 : // Clone the Keys slice and the start and end keys.
3576 1 : //
3577 1 : // TODO(jackson): Avoid the clone by removing c.rangeDelFrag
3578 1 : // and performing explicit truncation of the pending
3579 1 : // rangedel span as necessary.
3580 1 : clone := keyspan.Span{
3581 1 : Start: iter.cloneKey(s.Start),
3582 1 : End: iter.cloneKey(s.End),
3583 1 : Keys: make([]keyspan.Key, len(s.Keys)),
3584 1 : }
3585 1 : copy(clone.Keys, s.Keys)
3586 1 : c.rangeDelFrag.Add(clone)
3587 1 : }
3588 1 : continue
3589 1 : case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
3590 1 : // Range keys are handled in the same way as range tombstones, except
3591 1 : // with a dedicated fragmenter.
3592 1 : if s := c.rangeKeyInterleaving.Span(); !s.Empty() {
3593 1 : clone := keyspan.Span{
3594 1 : Start: iter.cloneKey(s.Start),
3595 1 : End: iter.cloneKey(s.End),
3596 1 : Keys: make([]keyspan.Key, len(s.Keys)),
3597 1 : }
3598 1 : // Since the keys' Suffix and Value fields are not deep cloned, the
3599 1 : // underlying blockIter must be kept open for the lifetime of the
3600 1 : // compaction.
3601 1 : copy(clone.Keys, s.Keys)
3602 1 : c.rangeKeyFrag.Add(clone)
3603 1 : }
3604 1 : continue
3605 : }
3606 1 : if tw == nil {
3607 1 : if err := newOutput(); err != nil {
3608 1 : return nil, pendingOutputs, stats, err
3609 1 : }
3610 : }
3611 1 : if err := tw.AddWithForceObsolete(*key, val, iter.forceObsoleteDueToRangeDel); err != nil {
3612 0 : return nil, pendingOutputs, stats, err
3613 0 : }
3614 1 : if iter.snapshotPinned {
3615 1 : // The kv pair we just added to the sstable was only surfaced by
3616 1 : // the compaction iterator because an open snapshot prevented
3617 1 : // its elision. Increment the stats.
3618 1 : pinnedCount++
3619 1 : pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
3620 1 : pinnedValueSize += uint64(len(val))
3621 1 : }
3622 : }
3623 :
3624 : // A splitter requested a split, and we're ready to finish the output.
3625 : // We need to choose the key at which to split any pending range
3626 : // tombstones. There are two options:
3627 : // 1. splitterSuggestion — The key suggested by the splitter. This key
3628 : // is guaranteed to be greater than the last key written to the
3629 : // current output.
3630 : // 2. key.UserKey — the first key of the next sstable output. This user
3631 : // key is also guaranteed to be greater than the last user key
3632 : // written to the current output (see userKeyChangeSplitter).
3633 : //
3634 : // Use whichever is smaller. Using the smaller of the two limits
3635 : // overlap with grandparents. Consider the case where the
3636 : // grandparent limit is calculated to be 'b', key is 'x', and
3637 : // there exist many sstables between 'b' and 'x'. If the range
3638 : // deletion fragmenter has a pending tombstone [a,x), splitting
3639 : // at 'x' would cause the output table to overlap many
3640 : // grandparents well beyond the calculated grandparent limit
3641 : // 'b'. Splitting at the smaller `splitterSuggestion` avoids
3642 : // this unbounded overlap with grandparent tables.
3643 1 : splitKey := splitterSuggestion
3644 1 : if key != nil && (splitKey == nil || c.cmp(splitKey, key.UserKey) > 0) {
3645 1 : splitKey = key.UserKey
3646 1 : }
3647 1 : if err := finishOutput(splitKey); err != nil {
3648 1 : return nil, pendingOutputs, stats, err
3649 1 : }
3650 : }
3651 :
3652 1 : for _, cl := range c.inputs {
3653 1 : iter := cl.files.Iter()
3654 1 : for f := iter.First(); f != nil; f = iter.Next() {
3655 1 : ve.DeletedFiles[deletedFileEntry{
3656 1 : Level: cl.level,
3657 1 : FileNum: f.FileNum,
3658 1 : }] = f
3659 1 : }
3660 : }
3661 :
3662 : // The compaction iterator keeps a count of the DELSIZED keys that
3663 : // encoded an incorrect size. Propagate it up as part of
3664 : // compactStats.
3665 1 : stats.countMissizedDels = iter.stats.countMissizedDels
3666 1 :
3667 1 : if err := d.objProvider.Sync(); err != nil {
3668 0 : return nil, pendingOutputs, stats, err
3669 0 : }
3670 :
3671 : // Refresh the disk available statistic whenever a compaction/flush
3672 : // completes, before re-acquiring the mutex.
3673 1 : _ = d.calculateDiskAvailableBytes()
3674 1 :
3675 1 : return ve, pendingOutputs, stats, nil
3676 : }
3677 :
3678 : // validateVersionEdit validates that start and end keys across new and deleted
3679 : // files in a versionEdit pass the given validation function.
3680 : func validateVersionEdit(
3681 : ve *versionEdit, validateFn func([]byte) error, format base.FormatKey,
3682 1 : ) error {
3683 1 : validateMetaFn := func(f *manifest.FileMetadata) error {
3684 1 : for _, key := range []InternalKey{f.Smallest, f.Largest} {
3685 1 : if err := validateFn(key.UserKey); err != nil {
3686 1 : return errors.Wrapf(err, "key=%q; file=%s", format(key.UserKey), f)
3687 1 : }
3688 : }
3689 1 : return nil
3690 : }
3691 :
3692 : // Validate both new and deleted files.
3693 1 : for _, f := range ve.NewFiles {
3694 1 : if err := validateMetaFn(f.Meta); err != nil {
3695 1 : return err
3696 1 : }
3697 : }
3698 1 : for _, m := range ve.DeletedFiles {
3699 1 : if err := validateMetaFn(m); err != nil {
3700 1 : return err
3701 1 : }
3702 : }
3703 :
3704 1 : return nil
3705 : }
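
// The following is an illustrative sketch (an editorial addition, not part of
// the original source). It shows the shape of a validation function that could
// be supplied via Options.Experimental.KeyValidationFunc and would then be
// applied by validateVersionEdit above. The specific rule (user keys must be
// non-empty) is an arbitrary assumption chosen purely for illustration.
func exampleKeyValidationFunc(userKey []byte) error {
	if len(userKey) == 0 {
		return errors.New("illustrative rule violated: empty user key")
	}
	return nil
}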
3706 :
3707 : // scanObsoleteFiles scans the filesystem for files that are no longer needed
3708 : // and adds those to the internal lists of obsolete files. Note that the files
3709 : // are not actually deleted by this method. A subsequent call to
3710 : // deleteObsoleteFiles must be performed. Must not be called concurrently
3711 : // with compactions and flushes. db.mu must be held when calling this function.
3712 1 : func (d *DB) scanObsoleteFiles(list []string) {
3713 1 : // Disable automatic compactions temporarily to prevent concurrent compactions /
3714 1 : // flushes from interfering. The original value is restored on completion.
3715 1 : disabledPrev := d.opts.DisableAutomaticCompactions
3716 1 : defer func() {
3717 1 : d.opts.DisableAutomaticCompactions = disabledPrev
3718 1 : }()
3719 1 : d.opts.DisableAutomaticCompactions = true
3720 1 :
3721 1 : // Wait for any ongoing compaction to complete before continuing.
3722 1 : for d.mu.compact.compactingCount > 0 || d.mu.compact.flushing {
3723 0 : d.mu.compact.cond.Wait()
3724 0 : }
3725 :
3726 1 : liveFileNums := make(map[base.DiskFileNum]struct{})
3727 1 : d.mu.versions.addLiveFileNums(liveFileNums)
3728 1 : // Protect files that are only referred to by the ingestedFlushable
3729 1 : // from being deleted. These are added to the flushable queue on WAL replay
3730 1 : // during read only mode and aren't part of the Version. Note that if
3731 1 : // !d.opts.ReadOnly, then all flushables of type ingestedFlushable have
3732 1 : // already been flushed.
3733 1 : for _, fEntry := range d.mu.mem.queue {
3734 1 : if f, ok := fEntry.flushable.(*ingestedFlushable); ok {
3735 0 : for _, file := range f.files {
3736 0 : liveFileNums[file.FileBacking.DiskFileNum] = struct{}{}
3737 0 : }
3738 : }
3739 : }
3740 :
3741 1 : minUnflushedLogNum := d.mu.versions.minUnflushedLogNum
3742 1 : manifestFileNum := d.mu.versions.manifestFileNum
3743 1 :
3744 1 : var obsoleteLogs []fileInfo
3745 1 : var obsoleteTables []fileInfo
3746 1 : var obsoleteManifests []fileInfo
3747 1 : var obsoleteOptions []fileInfo
3748 1 :
3749 1 : for _, filename := range list {
3750 1 : fileType, diskFileNum, ok := base.ParseFilename(d.opts.FS, filename)
3751 1 : if !ok {
3752 1 : continue
3753 : }
3754 1 : switch fileType {
3755 1 : case fileTypeLog:
3756 1 : if diskFileNum >= minUnflushedLogNum {
3757 0 : continue
3758 : }
3759 1 : fi := fileInfo{fileNum: diskFileNum}
3760 1 : if stat, err := d.opts.FS.Stat(filename); err == nil {
3761 1 : fi.fileSize = uint64(stat.Size())
3762 1 : }
3763 1 : obsoleteLogs = append(obsoleteLogs, fi)
3764 1 : case fileTypeManifest:
3765 1 : if diskFileNum >= manifestFileNum {
3766 1 : continue
3767 : }
3768 1 : fi := fileInfo{fileNum: diskFileNum}
3769 1 : if stat, err := d.opts.FS.Stat(filename); err == nil {
3770 1 : fi.fileSize = uint64(stat.Size())
3771 1 : }
3772 1 : obsoleteManifests = append(obsoleteManifests, fi)
3773 1 : case fileTypeOptions:
3774 1 : if diskFileNum.FileNum() >= d.optionsFileNum.FileNum() {
3775 0 : continue
3776 : }
3777 1 : fi := fileInfo{fileNum: diskFileNum}
3778 1 : if stat, err := d.opts.FS.Stat(filename); err == nil {
3779 1 : fi.fileSize = uint64(stat.Size())
3780 1 : }
3781 1 : obsoleteOptions = append(obsoleteOptions, fi)
3782 1 : case fileTypeTable:
3783 : // Objects are handled through the objstorage provider below.
3784 1 : default:
3785 : // Don't delete files we don't know about.
3786 : }
3787 : }
3788 :
3789 1 : objects := d.objProvider.List()
3790 1 : for _, obj := range objects {
3791 1 : switch obj.FileType {
3792 1 : case fileTypeTable:
3793 1 : if _, ok := liveFileNums[obj.DiskFileNum]; ok {
3794 1 : continue
3795 : }
3796 1 : fileInfo := fileInfo{
3797 1 : fileNum: obj.DiskFileNum,
3798 1 : }
3799 1 : if size, err := d.objProvider.Size(obj); err == nil {
3800 1 : fileInfo.fileSize = uint64(size)
3801 1 : }
3802 1 : obsoleteTables = append(obsoleteTables, fileInfo)
3803 :
3804 0 : default:
3805 : // Ignore object types we don't know about.
3806 : }
3807 : }
3808 :
3809 1 : d.mu.log.queue = merge(d.mu.log.queue, obsoleteLogs)
3810 1 : d.mu.versions.metrics.WAL.Files = int64(len(d.mu.log.queue))
3811 1 : d.mu.versions.obsoleteTables = merge(d.mu.versions.obsoleteTables, obsoleteTables)
3812 1 : d.mu.versions.updateObsoleteTableMetricsLocked()
3813 1 : d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
3814 1 : d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
3815 : }
3816 :
3817 : // disableFileDeletions disables file deletions and then waits for any
3818 : // in-progress deletion to finish. The caller is required to call
3819 : // enableFileDeletions in order to enable file deletions again. It is ok for
3820 : // multiple callers to disable file deletions simultaneously, though they must
3821 : // all invoke enableFileDeletions in order for file deletions to be re-enabled
3822 : // (there is an internal reference count on file deletion disablement).
3823 : //
3824 : // d.mu must be held when calling this method.
3825 1 : func (d *DB) disableFileDeletions() {
3826 1 : d.mu.disableFileDeletions++
3827 1 : d.mu.Unlock()
3828 1 : defer d.mu.Lock()
3829 1 : d.cleanupManager.Wait()
3830 1 : }
3831 :
3832 : // enableFileDeletions enables previously disabled file deletions. A cleanup job
3833 : // is queued if necessary.
3834 : //
3835 : // d.mu must be held when calling this method.
3836 1 : func (d *DB) enableFileDeletions() {
3837 1 : if d.mu.disableFileDeletions <= 0 {
3838 1 : panic("pebble: file deletion disablement invariant violated")
3839 : }
3840 1 : d.mu.disableFileDeletions--
3841 1 : if d.mu.disableFileDeletions > 0 {
3842 0 : return
3843 0 : }
3844 1 : jobID := d.mu.nextJobID
3845 1 : d.mu.nextJobID++
3846 1 : d.deleteObsoleteFiles(jobID)
3847 : }
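
// Illustrative usage sketch (an editorial addition, not part of the original
// source): callers pair the two methods while holding d.mu at each call site;
// the internal reference count makes nested pairs safe. The work in the middle
// is hypothetical.
//
//	d.mu.Lock()
//	d.disableFileDeletions()
//	d.mu.Unlock()
//	// ... read or copy on-disk files without racing a deletion ...
//	d.mu.Lock()
//	d.enableFileDeletions()
//	d.mu.Unlock()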
3848 :
3849 : type fileInfo struct {
3850 : fileNum base.DiskFileNum
3851 : fileSize uint64
3852 : }
3853 :
3854 : // deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary.
3855 : //
3856 : // d.mu must be held when calling this. The function will release and re-acquire the mutex.
3857 : //
3858 : // Does nothing if file deletions are disabled (see disableFileDeletions). A
3859 : // cleanup job will be scheduled when file deletions are re-enabled.
3860 1 : func (d *DB) deleteObsoleteFiles(jobID int) {
3861 1 : if d.mu.disableFileDeletions > 0 {
3862 1 : return
3863 1 : }
3864 :
3865 1 : var obsoleteLogs []fileInfo
3866 1 : for i := range d.mu.log.queue {
3867 1 : // NB: d.mu.versions.minUnflushedLogNum is the log number of the earliest
3868 1 : // log that has not had its contents flushed to an sstable. We can recycle
3869 1 : // the prefix of d.mu.log.queue with log numbers less than
3870 1 : // minUnflushedLogNum.
3871 1 : if d.mu.log.queue[i].fileNum >= d.mu.versions.minUnflushedLogNum {
3872 1 : obsoleteLogs = d.mu.log.queue[:i]
3873 1 : d.mu.log.queue = d.mu.log.queue[i:]
3874 1 : d.mu.versions.metrics.WAL.Files -= int64(len(obsoleteLogs))
3875 1 : break
3876 : }
3877 : }
3878 :
3879 1 : obsoleteTables := append([]fileInfo(nil), d.mu.versions.obsoleteTables...)
3880 1 : d.mu.versions.obsoleteTables = nil
3881 1 :
3882 1 : for _, tbl := range obsoleteTables {
3883 1 : delete(d.mu.versions.zombieTables, tbl.fileNum)
3884 1 : }
3885 :
3886 : // Sort the manifests because we want to delete a contiguous prefix
3887 : // of the older manifests.
3888 1 : slices.SortFunc(d.mu.versions.obsoleteManifests, func(a, b fileInfo) int {
3889 1 : return cmp.Compare(a.fileNum, b.fileNum)
3890 1 : })
3891 :
3892 1 : var obsoleteManifests []fileInfo
3893 1 : manifestsToDelete := len(d.mu.versions.obsoleteManifests) - d.opts.NumPrevManifest
3894 1 : if manifestsToDelete > 0 {
3895 1 : obsoleteManifests = d.mu.versions.obsoleteManifests[:manifestsToDelete]
3896 1 : d.mu.versions.obsoleteManifests = d.mu.versions.obsoleteManifests[manifestsToDelete:]
3897 1 : if len(d.mu.versions.obsoleteManifests) == 0 {
3898 0 : d.mu.versions.obsoleteManifests = nil
3899 0 : }
3900 : }
3901 :
3902 1 : obsoleteOptions := d.mu.versions.obsoleteOptions
3903 1 : d.mu.versions.obsoleteOptions = nil
3904 1 :
3905 1 : // Release d.mu while preparing the cleanup job and possibly waiting.
3906 1 : // Note the unusual order: Unlock and then Lock.
3907 1 : d.mu.Unlock()
3908 1 : defer d.mu.Lock()
3909 1 :
3910 1 : files := [4]struct {
3911 1 : fileType fileType
3912 1 : obsolete []fileInfo
3913 1 : }{
3914 1 : {fileTypeLog, obsoleteLogs},
3915 1 : {fileTypeTable, obsoleteTables},
3916 1 : {fileTypeManifest, obsoleteManifests},
3917 1 : {fileTypeOptions, obsoleteOptions},
3918 1 : }
3919 1 : _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents)
3920 1 : filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions))
3921 1 : for _, f := range files {
3922 1 : // We sort to make the order of deletions deterministic, which is nice for
3923 1 : // tests.
3924 1 : slices.SortFunc(f.obsolete, func(a, b fileInfo) int {
3925 1 : return cmp.Compare(a.fileNum, b.fileNum)
3926 1 : })
3927 1 : for _, fi := range f.obsolete {
3928 1 : dir := d.dirname
3929 1 : switch f.fileType {
3930 1 : case fileTypeLog:
3931 1 : if !noRecycle && d.logRecycler.add(fi) {
3932 1 : continue
3933 : }
3934 1 : dir = d.walDirname
3935 1 : case fileTypeTable:
3936 1 : d.tableCache.evict(fi.fileNum)
3937 : }
3938 :
3939 1 : filesToDelete = append(filesToDelete, obsoleteFile{
3940 1 : dir: dir,
3941 1 : fileNum: fi.fileNum,
3942 1 : fileType: f.fileType,
3943 1 : fileSize: fi.fileSize,
3944 1 : })
3945 : }
3946 : }
3947 1 : if len(filesToDelete) > 0 {
3948 1 : d.cleanupManager.EnqueueJob(jobID, filesToDelete)
3949 1 : }
3950 1 : if d.opts.private.testingAlwaysWaitForCleanup {
3951 1 : d.cleanupManager.Wait()
3952 1 : }
3953 : }
3954 :
3955 1 : func (d *DB) maybeScheduleObsoleteTableDeletion() {
3956 1 : d.mu.Lock()
3957 1 : defer d.mu.Unlock()
3958 1 : d.maybeScheduleObsoleteTableDeletionLocked()
3959 1 : }
3960 :
3961 1 : func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
3962 1 : if len(d.mu.versions.obsoleteTables) > 0 {
3963 1 : jobID := d.mu.nextJobID
3964 1 : d.mu.nextJobID++
3965 1 : d.deleteObsoleteFiles(jobID)
3966 1 : }
3967 : }
3968 :
3969 1 : func merge(a, b []fileInfo) []fileInfo {
3970 1 : if len(b) == 0 {
3971 1 : return a
3972 1 : }
3973 :
3974 1 : a = append(a, b...)
3975 1 : slices.SortFunc(a, func(a, b fileInfo) int {
3976 1 : return cmp.Compare(a.fileNum, b.fileNum)
3977 1 : })
3978 1 : return slices.CompactFunc(a, func(a, b fileInfo) bool {
3979 1 : return a.fileNum == b.fileNum
3980 1 : })
3981 : }
|