Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package compact
6 :
7 : import (
8 : "sort"
9 : "time"
10 :
11 : "github.com/cockroachdb/errors"
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/pebble/internal/keyspan"
14 : "github.com/cockroachdb/pebble/internal/manifest"
15 : "github.com/cockroachdb/pebble/internal/private"
16 : "github.com/cockroachdb/pebble/objstorage"
17 : "github.com/cockroachdb/pebble/sstable"
18 : )
19 :
20 : // Result stores the result of a compaction - more specifically, the "data" part
21 : // where we use the compaction iterator to write output tables.
22 : type Result struct {
23 : // Err is the result of the compaction. On success, Err is nil and Tables
24 : // stores the output tables. On failure, Err is set and Tables stores the
25 : // tables created so far (and which need to be cleaned up).
26 : Err error
27 : Tables []OutputTable
28 : Stats Stats
29 : }
30 :
31 : // WithError returns a modified Result which has the Err field set.
32 1 : func (r Result) WithError(err error) Result {
33 1 : return Result{
34 1 : Err: errors.CombineErrors(r.Err, err),
35 1 : Tables: r.Tables,
36 1 : Stats: r.Stats,
37 1 : }
38 1 : }
39 :
40 : // OutputTable contains metadata about a table that was created during a compaction.
41 : type OutputTable struct {
42 : CreationTime time.Time
43 : // ObjMeta is metadata for the object backing the table.
44 : ObjMeta objstorage.ObjectMetadata
45 : // WriterMeta is populated once the table is fully written. On compaction
46 : // failure (see Result), WriterMeta might not be set.
47 : WriterMeta sstable.WriterMetadata
48 : }
49 :
// Stats holds counters gathered while a compaction runs.
type Stats struct {
	// CumulativePinnedKeys and CumulativePinnedSize total, across all output
	// tables, the number of entries kept only because an open snapshot
	// prevented their elision, and the key plus value bytes of those entries.
	CumulativePinnedKeys, CumulativePinnedSize uint64
	// CountMissizedDels is the number of DELSIZED keys that encoded an
	// incorrect size, as reported by the compaction iterator.
	CountMissizedDels uint64
}
56 :
57 : // RunnerConfig contains the parameters needed for the Runner.
58 : type RunnerConfig struct {
59 : // CompactionBounds are the bounds containing all the input tables. All output
60 : // tables must fall within these bounds as well.
61 : CompactionBounds base.UserKeyBounds
62 :
63 : // L0SplitKeys is only set for flushes and it contains the flush split keys
64 : // (see L0Sublevels.FlushSplitKeys). These are split points enforced for the
65 : // output tables.
66 : L0SplitKeys [][]byte
67 :
68 : // Grandparents are the tables in level+2 that overlap with the files being
69 : // compacted. Used to determine output table boundaries. Do not assume that
70 : // the actual files in the grandparent when this compaction finishes will be
71 : // the same.
72 : Grandparents manifest.LevelSlice
73 :
74 : // MaxGrandparentOverlapBytes is the maximum number of bytes of overlap
75 : // allowed for a single output table with the tables in the grandparent level.
76 : MaxGrandparentOverlapBytes uint64
77 :
78 : // TargetOutputFileSize is the desired size of an individual table created
79 : // during compaction. In practice, the sizes can vary between 50%-200% of this
80 : // value.
81 : TargetOutputFileSize uint64
82 : }
83 :
84 : // Runner is a helper for running the "data" part of a compaction (where we use
85 : // the compaction iterator to write output tables).
86 : //
87 : // Sample usage:
88 : //
89 : // r := NewRunner(cfg, iter)
90 : // for r.MoreDataToWrite() {
91 : // objMeta, tw := ... // Create object and table writer.
92 : // r.WriteTable(objMeta, tw)
93 : // }
94 : // result := r.Finish()
95 : type Runner struct {
96 : cmp base.Compare
97 : cfg RunnerConfig
98 : iter *Iter
99 :
100 : tables []OutputTable
101 : // Stores any error encountered.
102 : err error
103 : // Last key/value returned by the compaction iterator.
104 : key *base.InternalKey
105 : value []byte
106 : // Last RANGEDEL span (or portion of it) that was not yet written to a table.
107 : lastRangeDelSpan keyspan.Span
108 : // Last range key span (or portion of it) that was not yet written to a table.
109 : lastRangeKeySpan keyspan.Span
110 : stats Stats
111 : }
112 :
113 : // NewRunner creates a new Runner.
114 1 : func NewRunner(cfg RunnerConfig, iter *Iter) *Runner {
115 1 : r := &Runner{
116 1 : cmp: iter.cmp,
117 1 : cfg: cfg,
118 1 : iter: iter,
119 1 : }
120 1 : r.key, r.value = r.iter.First()
121 1 : return r
122 1 : }
123 :
124 : // MoreDataToWrite returns true if there is more data to be written.
125 1 : func (r *Runner) MoreDataToWrite() bool {
126 1 : if r.err != nil {
127 0 : return false
128 0 : }
129 1 : return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty()
130 : }
131 :
132 : // WriteTable writes a new output table. This table will be part of
133 : // Result.Tables. Should only be called if MoreDataToWrite() returned true.
134 : //
135 : // WriteTable always closes the Writer.
136 1 : func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw *sstable.Writer) {
137 1 : if r.err != nil {
138 0 : panic("error already encountered")
139 : }
140 1 : r.tables = append(r.tables, OutputTable{
141 1 : CreationTime: time.Now(),
142 1 : ObjMeta: objMeta,
143 1 : })
144 1 : splitKey, err := r.writeKeysToTable(tw)
145 1 : err = errors.CombineErrors(err, tw.Close())
146 1 : if err != nil {
147 0 : r.err = err
148 0 : r.key, r.value = nil, nil
149 0 : return
150 0 : }
151 1 : writerMeta, err := tw.Metadata()
152 1 : if err != nil {
153 0 : r.err = err
154 0 : return
155 0 : }
156 1 : if err := r.validateWriterMeta(writerMeta, splitKey); err != nil {
157 0 : r.err = err
158 0 : return
159 0 : }
160 1 : r.tables[len(r.tables)-1].WriterMeta = *writerMeta
161 : }
162 :
163 1 : func (r *Runner) writeKeysToTable(tw *sstable.Writer) (splitKey []byte, _ error) {
164 1 : firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
165 1 : if r.key != nil && firstKey == nil {
166 1 : firstKey = r.key.UserKey
167 1 : }
168 1 : if firstKey == nil {
169 0 : return nil, base.AssertionFailedf("no data to write")
170 0 : }
171 1 : splitter := NewOutputSplitter(
172 1 : r.cmp, firstKey, r.TableSplitLimit(firstKey),
173 1 : r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(),
174 1 : )
175 1 : lastUserKeyFn := func() []byte {
176 1 : return tw.UnsafeLastPointUserKey()
177 1 : }
178 1 : var pinnedKeySize, pinnedValueSize, pinnedCount uint64
179 1 : key, value := r.key, r.value
180 1 : for ; key != nil; key, value = r.iter.Next() {
181 1 : if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), lastUserKeyFn) {
182 1 : break
183 : }
184 :
185 1 : switch key.Kind() {
186 1 : case base.InternalKeyKindRangeDelete:
187 1 : // The previous span (if any) must end at or before this key, since the
188 1 : // spans we receive are non-overlapping.
189 1 : if err := tw.EncodeSpan(&r.lastRangeDelSpan); r.err != nil {
190 0 : return nil, err
191 0 : }
192 1 : r.lastRangeDelSpan.CopyFrom(r.iter.Span())
193 1 : continue
194 :
195 1 : case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
196 1 : // The previous span (if any) must end at or before this key, since the
197 1 : // spans we receive are non-overlapping.
198 1 : if err := tw.EncodeSpan(&r.lastRangeKeySpan); err != nil {
199 0 : return nil, err
200 0 : }
201 1 : r.lastRangeKeySpan.CopyFrom(r.iter.Span())
202 1 : continue
203 : }
204 1 : if err := tw.AddWithForceObsolete(*key, value, r.iter.ForceObsoleteDueToRangeDel()); err != nil {
205 0 : return nil, err
206 0 : }
207 1 : if r.iter.SnapshotPinned() {
208 1 : // The kv pair we just added to the sstable was only surfaced by
209 1 : // the compaction iterator because an open snapshot prevented
210 1 : // its elision. Increment the stats.
211 1 : pinnedCount++
212 1 : pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
213 1 : pinnedValueSize += uint64(len(value))
214 1 : }
215 : }
216 1 : r.key, r.value = key, value
217 1 : splitKey = splitter.SplitKey()
218 1 : if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeDelSpan, splitKey, tw); err != nil {
219 0 : return nil, err
220 0 : }
221 1 : if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeKeySpan, splitKey, tw); err != nil {
222 0 : return nil, err
223 0 : }
224 : // Set internal sstable properties.
225 1 : p := getInternalWriterProperties(tw)
226 1 : // Set the snapshot pinned totals.
227 1 : p.SnapshotPinnedKeys = pinnedCount
228 1 : p.SnapshotPinnedKeySize = pinnedKeySize
229 1 : p.SnapshotPinnedValueSize = pinnedValueSize
230 1 : r.stats.CumulativePinnedKeys += pinnedCount
231 1 : r.stats.CumulativePinnedSize += pinnedKeySize + pinnedValueSize
232 1 : return splitKey, nil
233 : }
234 :
235 : // Finish closes the compaction iterator and returns the result of the
236 : // compaction.
237 1 : func (r *Runner) Finish() Result {
238 1 : r.err = errors.CombineErrors(r.err, r.iter.Close())
239 1 : // The compaction iterator keeps track of a count of the number of DELSIZED
240 1 : // keys that encoded an incorrect size.
241 1 : r.stats.CountMissizedDels = r.iter.Stats().CountMissizedDels
242 1 : return Result{
243 1 : Err: r.err,
244 1 : Tables: r.tables,
245 1 : Stats: r.stats,
246 1 : }
247 1 : }
248 :
249 : // TableSplitLimit returns a hard split limit for an output table that starts at
250 : // startKey (which must be strictly greater than startKey), or nil if there is
251 : // no limit.
252 1 : func (r *Runner) TableSplitLimit(startKey []byte) []byte {
253 1 : var limitKey []byte
254 1 :
255 1 : // Enforce the MaxGrandparentOverlapBytes limit: find the user key to which
256 1 : // that table can extend without excessively overlapping the grandparent
257 1 : // level. If no limit is needed considering the grandparent, limitKey stays
258 1 : // nil.
259 1 : //
260 1 : // This is done in order to prevent a table at level N from overlapping too
261 1 : // much data at level N+1. We want to avoid such large overlaps because they
262 1 : // translate into large compactions. The current heuristic stops output of a
263 1 : // table if the addition of another key would cause the table to overlap more
264 1 : // than 10x the target file size at level N. See
265 1 : // compaction.maxGrandparentOverlapBytes.
266 1 : iter := r.cfg.Grandparents.Iter()
267 1 : var overlappedBytes uint64
268 1 : f := iter.SeekGE(r.cmp, startKey)
269 1 : // Handle an overlapping table.
270 1 : if f != nil && r.cmp(f.Smallest.UserKey, startKey) <= 0 {
271 1 : overlappedBytes += f.Size
272 1 : f = iter.Next()
273 1 : }
274 1 : for ; f != nil; f = iter.Next() {
275 1 : overlappedBytes += f.Size
276 1 : if overlappedBytes > r.cfg.MaxGrandparentOverlapBytes {
277 1 : limitKey = f.Smallest.UserKey
278 1 : break
279 : }
280 : }
281 :
282 1 : if len(r.cfg.L0SplitKeys) != 0 {
283 1 : // Find the first split key that is greater than startKey.
284 1 : index := sort.Search(len(r.cfg.L0SplitKeys), func(i int) bool {
285 1 : return r.cmp(r.cfg.L0SplitKeys[i], startKey) > 0
286 1 : })
287 1 : if index < len(r.cfg.L0SplitKeys) {
288 1 : limitKey = base.MinUserKey(r.cmp, limitKey, r.cfg.L0SplitKeys[index])
289 1 : }
290 : }
291 :
292 1 : return limitKey
293 : }
294 :
295 : // validateWriterMeta runs some sanity cehcks on the WriterMetadata on an output
296 : // table that was just finished. splitKey is the key where the table must have
297 : // ended (or nil).
298 1 : func (r *Runner) validateWriterMeta(meta *sstable.WriterMetadata, splitKey []byte) error {
299 1 : if !meta.HasPointKeys && !meta.HasRangeDelKeys && !meta.HasRangeKeys {
300 0 : return base.AssertionFailedf("output table has no keys")
301 0 : }
302 :
303 1 : var err error
304 1 : checkBounds := func(smallest, largest base.InternalKey, description string) {
305 1 : bounds := base.UserKeyBoundsFromInternal(smallest, largest)
306 1 : if !r.cfg.CompactionBounds.ContainsBounds(r.cmp, &bounds) {
307 0 : err = errors.CombineErrors(err, base.AssertionFailedf(
308 0 : "output table %s bounds %s extend beyond compaction bounds %s",
309 0 : description, bounds, r.cfg.CompactionBounds,
310 0 : ))
311 0 : }
312 1 : if splitKey != nil && bounds.End.IsUpperBoundFor(r.cmp, splitKey) {
313 0 : err = errors.CombineErrors(err, base.AssertionFailedf(
314 0 : "output table %s bounds %s extend beyond split key %s",
315 0 : description, bounds, splitKey,
316 0 : ))
317 0 : }
318 : }
319 :
320 1 : if meta.HasPointKeys {
321 1 : checkBounds(meta.SmallestPoint, meta.LargestPoint, "point key")
322 1 : }
323 1 : if meta.HasRangeDelKeys {
324 1 : checkBounds(meta.SmallestRangeDel, meta.LargestRangeDel, "range del")
325 1 : }
326 1 : if meta.HasRangeKeys {
327 1 : checkBounds(meta.SmallestRangeKey, meta.LargestRangeKey, "range key")
328 1 : }
329 1 : return err
330 : }
331 :
332 : // getInternalWriterProperties accesses a private variable (in the
333 : // internal/private package) initialized by the sstable Writer. This indirection
334 : // is necessary to ensure non-Pebble users constructing sstables for ingestion
335 : // are unable to set internal-only properties.
336 : var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties)
337 :
338 1 : func spanStartOrNil(s *keyspan.Span) []byte {
339 1 : if s.Empty() {
340 1 : return nil
341 1 : }
342 1 : return s.Start
343 : }
|