Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package compact
6 :
7 : import (
8 : "sort"
9 : "time"
10 :
11 : "github.com/cockroachdb/errors"
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/pebble/internal/keyspan"
14 : "github.com/cockroachdb/pebble/internal/manifest"
15 : "github.com/cockroachdb/pebble/objstorage"
16 : "github.com/cockroachdb/pebble/sstable"
17 : )
18 :
19 : // Result stores the result of a compaction - more specifically, the "data" part
20 : // where we use the compaction iterator to write output tables.
21 : type Result struct {
22 : // Err is the result of the compaction. On success, Err is nil and Tables
23 : // stores the output tables. On failure, Err is set and Tables stores the
24 : // tables created so far (and which need to be cleaned up).
25 : Err error
26 : Tables []OutputTable
27 : Stats Stats
28 : }
29 :
30 : // WithError returns a modified Result which has the Err field set.
31 2 : func (r Result) WithError(err error) Result {
32 2 : return Result{
33 2 : Err: errors.CombineErrors(r.Err, err),
34 2 : Tables: r.Tables,
35 2 : Stats: r.Stats,
36 2 : }
37 2 : }
38 :
39 : // OutputTable contains metadata about a table that was created during a compaction.
40 : type OutputTable struct {
41 : CreationTime time.Time
42 : // ObjMeta is metadata for the object backing the table.
43 : ObjMeta objstorage.ObjectMetadata
44 : // WriterMeta is populated once the table is fully written. On compaction
45 : // failure (see Result), WriterMeta might not be set.
46 : WriterMeta sstable.WriterMetadata
47 : }
48 :
49 : // Stats describes stats collected during the compaction.
50 : type Stats struct {
51 : CumulativePinnedKeys uint64
52 : CumulativePinnedSize uint64
53 : CumulativeWrittenSize uint64
54 : CountMissizedDels uint64
55 : }
56 :
57 : // RunnerConfig contains the parameters needed for the Runner.
58 : type RunnerConfig struct {
59 : // CompactionBounds are the bounds containing all the input tables. All output
60 : // tables must fall within these bounds as well.
61 : CompactionBounds base.UserKeyBounds
62 :
63 : // L0SplitKeys is only set for flushes and it contains the flush split keys
64 : // (see L0Sublevels.FlushSplitKeys). These are split points enforced for the
65 : // output tables.
66 : L0SplitKeys [][]byte
67 :
68 : // Grandparents are the tables in level+2 that overlap with the files being
69 : // compacted. Used to determine output table boundaries. Do not assume that
70 : // the actual files in the grandparent when this compaction finishes will be
71 : // the same.
72 : Grandparents manifest.LevelSlice
73 :
74 : // MaxGrandparentOverlapBytes is the maximum number of bytes of overlap
75 : // allowed for a single output table with the tables in the grandparent level.
76 : MaxGrandparentOverlapBytes uint64
77 :
78 : // TargetOutputFileSize is the desired size of an individual table created
79 : // during compaction. In practice, the sizes can vary between 50%-200% of this
80 : // value.
81 : TargetOutputFileSize uint64
82 :
83 : // Slot is the compaction slot taken up by this compaction. Used to perform
84 : // pacing or account for concurrency limits.
85 : Slot base.CompactionSlot
86 :
87 : // IteratorStats is the stats collected by the compaction iterator.
88 : IteratorStats *base.InternalIteratorStats
89 : }
90 :
91 : // Runner is a helper for running the "data" part of a compaction (where we use
92 : // the compaction iterator to write output tables).
93 : //
94 : // Sample usage:
95 : //
96 : // r := NewRunner(cfg, iter)
97 : // for r.MoreDataToWrite() {
98 : // objMeta, tw := ... // Create object and table writer.
99 : // r.WriteTable(objMeta, tw)
100 : // }
101 : // result := r.Finish()
102 : type Runner struct {
103 : cmp base.Compare
104 : cfg RunnerConfig
105 : iter *Iter
106 :
107 : tables []OutputTable
108 : // Stores any error encountered.
109 : err error
110 : // Last key/value returned by the compaction iterator.
111 : key *base.InternalKey
112 : value base.LazyValue
113 : // Last RANGEDEL span (or portion of it) that was not yet written to a table.
114 : lastRangeDelSpan keyspan.Span
115 : // Last range key span (or portion of it) that was not yet written to a table.
116 : lastRangeKeySpan keyspan.Span
117 : stats Stats
118 : }
119 :
120 : // NewRunner creates a new Runner.
121 2 : func NewRunner(cfg RunnerConfig, iter *Iter) *Runner {
122 2 : r := &Runner{
123 2 : cmp: iter.cmp,
124 2 : cfg: cfg,
125 2 : iter: iter,
126 2 : }
127 2 : r.key, r.value = r.iter.First()
128 2 : return r
129 2 : }
130 :
131 : // MoreDataToWrite returns true if there is more data to be written.
132 2 : func (r *Runner) MoreDataToWrite() bool {
133 2 : if r.err != nil {
134 1 : return false
135 1 : }
136 2 : return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty()
137 : }
138 :
139 : // WriteTable writes a new output table. This table will be part of
140 : // Result.Tables. Should only be called if MoreDataToWrite() returned true.
141 : //
142 : // WriteTable always closes the Writer.
143 2 : func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw sstable.RawWriter) {
144 2 : if r.err != nil {
145 0 : panic("error already encountered")
146 : }
147 2 : r.tables = append(r.tables, OutputTable{
148 2 : CreationTime: time.Now(),
149 2 : ObjMeta: objMeta,
150 2 : })
151 2 : splitKey, err := r.writeKeysToTable(tw)
152 2 : err = errors.CombineErrors(err, tw.Close())
153 2 : if err != nil {
154 1 : r.err = err
155 1 : r.key, r.value = nil, base.LazyValue{}
156 1 : return
157 1 : }
158 2 : writerMeta, err := tw.Metadata()
159 2 : if err != nil {
160 0 : r.err = err
161 0 : return
162 0 : }
163 2 : if err := r.validateWriterMeta(writerMeta, splitKey); err != nil {
164 0 : r.err = err
165 0 : return
166 0 : }
167 2 : r.tables[len(r.tables)-1].WriterMeta = *writerMeta
168 2 : r.stats.CumulativeWrittenSize += writerMeta.Size
169 : }
170 :
171 2 : func (r *Runner) writeKeysToTable(tw sstable.RawWriter) (splitKey []byte, _ error) {
172 2 : const updateSlotEveryNKeys = 1024
173 2 : firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
174 2 : if r.key != nil && firstKey == nil {
175 2 : firstKey = r.key.UserKey
176 2 : }
177 2 : if firstKey == nil {
178 0 : return nil, base.AssertionFailedf("no data to write")
179 0 : }
180 2 : splitter := NewOutputSplitter(
181 2 : r.cmp, firstKey, r.TableSplitLimit(firstKey),
182 2 : r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(),
183 2 : )
184 2 : equalPrev := func(k []byte) bool {
185 2 : return tw.ComparePrev(k) == 0
186 2 : }
187 2 : var pinnedKeySize, pinnedValueSize, pinnedCount uint64
188 2 : var iteratedKeys uint64
189 2 : key, value := r.key, r.value
190 2 : for ; key != nil; key, value = r.iter.Next() {
191 2 : iteratedKeys++
192 2 : if iteratedKeys%updateSlotEveryNKeys == 0 {
193 1 : r.cfg.Slot.UpdateMetrics(r.cfg.IteratorStats.BlockBytes, r.stats.CumulativeWrittenSize+tw.EstimatedSize())
194 1 : }
195 2 : if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), equalPrev) {
196 2 : break
197 : }
198 :
199 2 : switch key.Kind() {
200 2 : case base.InternalKeyKindRangeDelete:
201 2 : // The previous span (if any) must end at or before this key, since the
202 2 : // spans we receive are non-overlapping.
203 2 : if err := tw.EncodeSpan(r.lastRangeDelSpan); r.err != nil {
204 0 : return nil, err
205 0 : }
206 2 : r.lastRangeDelSpan.CopyFrom(r.iter.Span())
207 2 : continue
208 :
209 2 : case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
210 2 : // The previous span (if any) must end at or before this key, since the
211 2 : // spans we receive are non-overlapping.
212 2 : if err := tw.EncodeSpan(r.lastRangeKeySpan); err != nil {
213 0 : return nil, err
214 0 : }
215 2 : r.lastRangeKeySpan.CopyFrom(r.iter.Span())
216 2 : continue
217 : }
218 2 : v, _, err := value.Value(nil)
219 2 : if err != nil {
220 0 : return nil, err
221 0 : }
222 2 : if err := tw.AddWithForceObsolete(*key, v, r.iter.ForceObsoleteDueToRangeDel()); err != nil {
223 0 : return nil, err
224 0 : }
225 2 : if r.iter.SnapshotPinned() {
226 2 : // The kv pair we just added to the sstable was only surfaced by
227 2 : // the compaction iterator because an open snapshot prevented
228 2 : // its elision. Increment the stats.
229 2 : pinnedCount++
230 2 : pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
231 2 : pinnedValueSize += uint64(len(v))
232 2 : }
233 : }
234 2 : r.key, r.value = key, value
235 2 : splitKey = splitter.SplitKey()
236 2 : if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeDelSpan, splitKey, tw); err != nil {
237 0 : return nil, err
238 0 : }
239 2 : if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeKeySpan, splitKey, tw); err != nil {
240 0 : return nil, err
241 0 : }
242 : // Set internal sstable properties.
243 2 : tw.SetSnapshotPinnedProperties(pinnedCount, pinnedKeySize, pinnedValueSize)
244 2 : r.stats.CumulativePinnedKeys += pinnedCount
245 2 : r.stats.CumulativePinnedSize += pinnedKeySize + pinnedValueSize
246 2 : r.cfg.Slot.UpdateMetrics(r.cfg.IteratorStats.BlockBytes, r.stats.CumulativeWrittenSize+tw.EstimatedSize())
247 2 : return splitKey, nil
248 : }
249 :
250 : // Finish closes the compaction iterator and returns the result of the
251 : // compaction.
252 2 : func (r *Runner) Finish() Result {
253 2 : r.err = errors.CombineErrors(r.err, r.iter.Close())
254 2 : // The compaction iterator keeps track of a count of the number of DELSIZED
255 2 : // keys that encoded an incorrect size.
256 2 : r.stats.CountMissizedDels = r.iter.Stats().CountMissizedDels
257 2 : r.cfg.Slot.Release(r.stats.CumulativeWrittenSize)
258 2 : return Result{
259 2 : Err: r.err,
260 2 : Tables: r.tables,
261 2 : Stats: r.stats,
262 2 : }
263 2 : }
264 :
265 : // TableSplitLimit returns a hard split limit for an output table that starts at
266 : // startKey (which must be strictly greater than startKey), or nil if there is
267 : // no limit.
268 2 : func (r *Runner) TableSplitLimit(startKey []byte) []byte {
269 2 : var limitKey []byte
270 2 :
271 2 : // Enforce the MaxGrandparentOverlapBytes limit: find the user key to which
272 2 : // that table can extend without excessively overlapping the grandparent
273 2 : // level. If no limit is needed considering the grandparent, limitKey stays
274 2 : // nil.
275 2 : //
276 2 : // This is done in order to prevent a table at level N from overlapping too
277 2 : // much data at level N+1. We want to avoid such large overlaps because they
278 2 : // translate into large compactions. The current heuristic stops output of a
279 2 : // table if the addition of another key would cause the table to overlap more
280 2 : // than 10x the target file size at level N. See
281 2 : // compaction.maxGrandparentOverlapBytes.
282 2 : iter := r.cfg.Grandparents.Iter()
283 2 : var overlappedBytes uint64
284 2 : f := iter.SeekGE(r.cmp, startKey)
285 2 : // Handle an overlapping table.
286 2 : if f != nil && r.cmp(f.Smallest.UserKey, startKey) <= 0 {
287 2 : overlappedBytes += f.Size
288 2 : f = iter.Next()
289 2 : }
290 2 : for ; f != nil; f = iter.Next() {
291 2 : overlappedBytes += f.Size
292 2 : if overlappedBytes > r.cfg.MaxGrandparentOverlapBytes {
293 2 : limitKey = f.Smallest.UserKey
294 2 : break
295 : }
296 : }
297 :
298 2 : if len(r.cfg.L0SplitKeys) != 0 {
299 2 : // Find the first split key that is greater than startKey.
300 2 : index := sort.Search(len(r.cfg.L0SplitKeys), func(i int) bool {
301 2 : return r.cmp(r.cfg.L0SplitKeys[i], startKey) > 0
302 2 : })
303 2 : if index < len(r.cfg.L0SplitKeys) {
304 2 : limitKey = base.MinUserKey(r.cmp, limitKey, r.cfg.L0SplitKeys[index])
305 2 : }
306 : }
307 :
308 2 : return limitKey
309 : }
310 :
311 : // validateWriterMeta runs some sanity cehcks on the WriterMetadata on an output
312 : // table that was just finished. splitKey is the key where the table must have
313 : // ended (or nil).
314 2 : func (r *Runner) validateWriterMeta(meta *sstable.WriterMetadata, splitKey []byte) error {
315 2 : if !meta.HasPointKeys && !meta.HasRangeDelKeys && !meta.HasRangeKeys {
316 0 : return base.AssertionFailedf("output table has no keys")
317 0 : }
318 :
319 2 : var err error
320 2 : checkBounds := func(smallest, largest base.InternalKey, description string) {
321 2 : bounds := base.UserKeyBoundsFromInternal(smallest, largest)
322 2 : if !r.cfg.CompactionBounds.ContainsBounds(r.cmp, &bounds) {
323 0 : err = errors.CombineErrors(err, base.AssertionFailedf(
324 0 : "output table %s bounds %s extend beyond compaction bounds %s",
325 0 : description, bounds, r.cfg.CompactionBounds,
326 0 : ))
327 0 : }
328 2 : if splitKey != nil && bounds.End.IsUpperBoundFor(r.cmp, splitKey) {
329 0 : err = errors.CombineErrors(err, base.AssertionFailedf(
330 0 : "output table %s bounds %s extend beyond split key %s",
331 0 : description, bounds, splitKey,
332 0 : ))
333 0 : }
334 : }
335 :
336 2 : if meta.HasPointKeys {
337 2 : checkBounds(meta.SmallestPoint, meta.LargestPoint, "point key")
338 2 : }
339 2 : if meta.HasRangeDelKeys {
340 2 : checkBounds(meta.SmallestRangeDel, meta.LargestRangeDel, "range del")
341 2 : }
342 2 : if meta.HasRangeKeys {
343 2 : checkBounds(meta.SmallestRangeKey, meta.LargestRangeKey, "range key")
344 2 : }
345 2 : return err
346 : }
347 :
348 2 : func spanStartOrNil(s *keyspan.Span) []byte {
349 2 : if s.Empty() {
350 2 : return nil
351 2 : }
352 2 : return s.Start
353 : }
|