Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package compact
6 :
7 : import (
8 : "sort"
9 : "time"
10 :
11 : "github.com/cockroachdb/errors"
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/pebble/internal/keyspan"
14 : "github.com/cockroachdb/pebble/internal/manifest"
15 : "github.com/cockroachdb/pebble/objstorage"
16 : "github.com/cockroachdb/pebble/sstable"
17 : )
18 :
19 : // Result stores the result of a compaction - more specifically, the "data" part
20 : // where we use the compaction iterator to write output tables.
21 : type Result struct {
22 : // Err is the result of the compaction. On success, Err is nil and Tables
23 : // stores the output tables. On failure, Err is set and Tables stores the
24 : // tables created so far (and which need to be cleaned up).
25 : Err error
26 : Tables []OutputTable
27 : Stats Stats
28 : }
29 :
30 : // WithError returns a modified Result which has the Err field set.
31 1 : func (r Result) WithError(err error) Result {
32 1 : return Result{
33 1 : Err: errors.CombineErrors(r.Err, err),
34 1 : Tables: r.Tables,
35 1 : Stats: r.Stats,
36 1 : }
37 1 : }
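
// A minimal sketch of how a caller might consume a Result; removeTable is a
// hypothetical cleanup helper, not part of this package. On failure, Tables
// still lists the outputs written so far, and they must be cleaned up.
//
//	result := r.Finish()
//	if result.Err != nil {
//		for _, t := range result.Tables {
//			removeTable(t.ObjMeta) // hypothetical cleanup of a partial output
//		}
//		return result.Err
//	}
//	// On success, result.Tables and result.Stats describe the new outputs.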
38 :
39 : // OutputTable contains metadata about a table that was created during a compaction.
40 : type OutputTable struct {
41 : CreationTime time.Time
42 : // ObjMeta is metadata for the object backing the table.
43 : ObjMeta objstorage.ObjectMetadata
44 : // WriterMeta is populated once the table is fully written. On compaction
45 : // failure (see Result), WriterMeta might not be set.
46 : WriterMeta sstable.WriterMetadata
47 : }
48 :
49 : // Stats describes stats collected during the compaction.
50 : type Stats struct {
51 : CumulativePinnedKeys uint64
52 : CumulativePinnedSize uint64
53 : CountMissizedDels uint64
54 : }
55 :
56 : // RunnerConfig contains the parameters needed for the Runner.
57 : type RunnerConfig struct {
58 : // CompactionBounds are the bounds containing all the input tables. All output
59 : // tables must fall within these bounds as well.
60 : CompactionBounds base.UserKeyBounds
61 :
62 : // L0SplitKeys is only set for flushes and it contains the flush split keys
63 : // (see L0Sublevels.FlushSplitKeys). These are split points enforced for the
64 : // output tables.
65 : L0SplitKeys [][]byte
66 :
67 : // Grandparents are the tables in level+2 that overlap with the files being
68 : // compacted. They are used to determine output table boundaries. Do not
69 : // assume that the grandparent level will still contain the same files by the
70 : // time this compaction finishes.
71 : Grandparents manifest.LevelSlice
72 :
73 : // MaxGrandparentOverlapBytes is the maximum number of bytes of overlap
74 : // allowed for a single output table with the tables in the grandparent level.
75 : MaxGrandparentOverlapBytes uint64
76 :
77 : // TargetOutputFileSize is the desired size of an individual table created
78 : // during compaction. In practice, the sizes can vary between 50%-200% of this
79 : // value.
80 : TargetOutputFileSize uint64
81 : }
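
// A minimal sketch of a RunnerConfig for a regular compaction; bounds,
// grandparents and targetFileSize are assumed placeholders, and the 10x factor
// simply mirrors the grandparent-overlap heuristic described in
// TableSplitLimit below.
//
//	cfg := compact.RunnerConfig{
//		CompactionBounds:           bounds,       // covers all input tables
//		Grandparents:               grandparents, // level+2 tables overlapping the inputs
//		MaxGrandparentOverlapBytes: 10 * targetFileSize,
//		TargetOutputFileSize:       targetFileSize,
//		// L0SplitKeys stays nil here; it is only set for flushes.
//	}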
82 :
83 : // Runner is a helper for running the "data" part of a compaction (where we use
84 : // the compaction iterator to write output tables).
85 : //
86 : // Sample usage:
87 : //
88 : // r := NewRunner(cfg, iter)
89 : // for r.MoreDataToWrite() {
90 : // objMeta, tw := ... // Create object and table writer.
91 : // r.WriteTable(objMeta, tw)
92 : // }
93 : // result := r.Finish()
94 : type Runner struct {
95 : cmp base.Compare
96 : cfg RunnerConfig
97 : iter *Iter
98 :
99 : tables []OutputTable
100 : // Stores any error encountered.
101 : err error
102 : // Last key/value returned by the compaction iterator.
103 : key *base.InternalKey
104 : value []byte
105 : // Last RANGEDEL span (or portion of it) that was not yet written to a table.
106 : lastRangeDelSpan keyspan.Span
107 : // Last range key span (or portion of it) that was not yet written to a table.
108 : lastRangeKeySpan keyspan.Span
109 : stats Stats
110 : }
111 :
112 : // NewRunner creates a new Runner.
113 1 : func NewRunner(cfg RunnerConfig, iter *Iter) *Runner {
114 1 : r := &Runner{
115 1 : cmp: iter.cmp,
116 1 : cfg: cfg,
117 1 : iter: iter,
118 1 : }
119 1 : r.key, r.value = r.iter.First()
120 1 : return r
121 1 : }
122 :
123 : // MoreDataToWrite returns true if there is more data to be written.
124 1 : func (r *Runner) MoreDataToWrite() bool {
125 1 : if r.err != nil {
126 1 : return false
127 1 : }
128 1 : return r.key != nil || !r.lastRangeDelSpan.Empty() || !r.lastRangeKeySpan.Empty()
129 : }
130 :
131 : // WriteTable writes a new output table. This table will be part of
132 : // Result.Tables. Should only be called if MoreDataToWrite() returned true.
133 : //
134 : // WriteTable always closes the Writer.
135 1 : func (r *Runner) WriteTable(objMeta objstorage.ObjectMetadata, tw sstable.RawWriter) {
136 1 : if r.err != nil {
137 0 : panic("error already encountered")
138 : }
139 1 : r.tables = append(r.tables, OutputTable{
140 1 : CreationTime: time.Now(),
141 1 : ObjMeta: objMeta,
142 1 : })
143 1 : splitKey, err := r.writeKeysToTable(tw)
144 1 : err = errors.CombineErrors(err, tw.Close())
145 1 : if err != nil {
146 1 : r.err = err
147 1 : r.key, r.value = nil, nil
148 1 : return
149 1 : }
150 1 : writerMeta, err := tw.Metadata()
151 1 : if err != nil {
152 0 : r.err = err
153 0 : return
154 0 : }
155 1 : if err := r.validateWriterMeta(writerMeta, splitKey); err != nil {
156 0 : r.err = err
157 0 : return
158 0 : }
159 1 : r.tables[len(r.tables)-1].WriterMeta = *writerMeta
160 : }
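
// A minimal sketch of the per-table loop from the Sample usage above;
// newTableObject and newTableWriter are hypothetical helpers that create the
// output object and its sstable writer. WriteTable always closes the writer,
// and any error it encounters is surfaced by Finish.
//
//	for r.MoreDataToWrite() {
//		objMeta, writable := newTableObject() // hypothetical
//		tw := newTableWriter(writable)        // hypothetical
//		r.WriteTable(objMeta, tw)
//	}
//	result := r.Finish()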
161 :
162 1 : func (r *Runner) writeKeysToTable(tw sstable.RawWriter) (splitKey []byte, _ error) {
163 1 : firstKey := base.MinUserKey(r.cmp, spanStartOrNil(&r.lastRangeDelSpan), spanStartOrNil(&r.lastRangeKeySpan))
164 1 : if r.key != nil && firstKey == nil {
165 1 : firstKey = r.key.UserKey
166 1 : }
167 1 : if firstKey == nil {
168 0 : return nil, base.AssertionFailedf("no data to write")
169 0 : }
170 1 : splitter := NewOutputSplitter(
171 1 : r.cmp, firstKey, r.TableSplitLimit(firstKey),
172 1 : r.cfg.TargetOutputFileSize, r.cfg.Grandparents.Iter(), r.iter.Frontiers(),
173 1 : )
174 1 : lastUserKeyFn := func() []byte {
175 1 : return tw.UnsafeLastPointUserKey()
176 1 : }
177 1 : var pinnedKeySize, pinnedValueSize, pinnedCount uint64
178 1 : key, value := r.key, r.value
179 1 : for ; key != nil; key, value = r.iter.Next() {
180 1 : if splitter.ShouldSplitBefore(key.UserKey, tw.EstimatedSize(), lastUserKeyFn) {
181 1 : break
182 : }
183 :
184 1 : switch key.Kind() {
185 1 : case base.InternalKeyKindRangeDelete:
186 1 : // The previous span (if any) must end at or before this key, since the
187 1 : // spans we receive are non-overlapping.
188 1 : if err := tw.EncodeSpan(r.lastRangeDelSpan); err != nil {
189 0 : return nil, err
190 0 : }
191 1 : r.lastRangeDelSpan.CopyFrom(r.iter.Span())
192 1 : continue
193 :
194 1 : case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
195 1 : // The previous span (if any) must end at or before this key, since the
196 1 : // spans we receive are non-overlapping.
197 1 : if err := tw.EncodeSpan(r.lastRangeKeySpan); err != nil {
198 0 : return nil, err
199 0 : }
200 1 : r.lastRangeKeySpan.CopyFrom(r.iter.Span())
201 1 : continue
202 : }
203 1 : if err := tw.AddWithForceObsolete(*key, value, r.iter.ForceObsoleteDueToRangeDel()); err != nil {
204 0 : return nil, err
205 0 : }
206 1 : if r.iter.SnapshotPinned() {
207 1 : // The kv pair we just added to the sstable was only surfaced by
208 1 : // the compaction iterator because an open snapshot prevented
209 1 : // its elision. Increment the stats.
210 1 : pinnedCount++
211 1 : pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
212 1 : pinnedValueSize += uint64(len(value))
213 1 : }
214 : }
215 1 : r.key, r.value = key, value
216 1 : splitKey = splitter.SplitKey()
217 1 : if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeDelSpan, splitKey, tw); err != nil {
218 0 : return nil, err
219 0 : }
220 1 : if err := SplitAndEncodeSpan(r.cmp, &r.lastRangeKeySpan, splitKey, tw); err != nil {
221 0 : return nil, err
222 0 : }
223 : // Set internal sstable properties.
224 1 : tw.SetSnapshotPinnedProperties(pinnedCount, pinnedKeySize, pinnedValueSize)
225 1 : r.stats.CumulativePinnedKeys += pinnedCount
226 1 : r.stats.CumulativePinnedSize += pinnedKeySize + pinnedValueSize
227 1 : return splitKey, nil
228 : }
229 :
230 : // Finish closes the compaction iterator and returns the result of the
231 : // compaction.
232 1 : func (r *Runner) Finish() Result {
233 1 : r.err = errors.CombineErrors(r.err, r.iter.Close())
234 1 : // The compaction iterator keeps a count of the DELSIZED keys that encoded
235 1 : // an incorrect size.
236 1 : r.stats.CountMissizedDels = r.iter.Stats().CountMissizedDels
237 1 : return Result{
238 1 : Err: r.err,
239 1 : Tables: r.tables,
240 1 : Stats: r.stats,
241 1 : }
242 1 : }
243 :
244 : // TableSplitLimit returns a hard split limit for an output table that starts
245 : // at startKey, or nil if there is no limit. The returned limit (if not nil) is
246 : // strictly greater than startKey.
247 1 : func (r *Runner) TableSplitLimit(startKey []byte) []byte {
248 1 : var limitKey []byte
249 1 :
250 1 : // Enforce the MaxGrandparentOverlapBytes limit: find the user key to which
251 1 : // that table can extend without excessively overlapping the grandparent
252 1 : // level. If no limit is needed considering the grandparent, limitKey stays
253 1 : // nil.
254 1 : //
255 1 : // This is done in order to prevent a table at level N from overlapping too
256 1 : // much data at level N+1. We want to avoid such large overlaps because they
257 1 : // translate into large compactions. The current heuristic stops output of a
258 1 : // table if the addition of another key would cause the table to overlap more
259 1 : // than 10x the target file size at level N. See
260 1 : // compaction.maxGrandparentOverlapBytes.
261 1 : iter := r.cfg.Grandparents.Iter()
262 1 : var overlappedBytes uint64
263 1 : f := iter.SeekGE(r.cmp, startKey)
264 1 : // Handle an overlapping table.
265 1 : if f != nil && r.cmp(f.Smallest.UserKey, startKey) <= 0 {
266 1 : overlappedBytes += f.Size
267 1 : f = iter.Next()
268 1 : }
269 1 : for ; f != nil; f = iter.Next() {
270 1 : overlappedBytes += f.Size
271 1 : if overlappedBytes > r.cfg.MaxGrandparentOverlapBytes {
272 1 : limitKey = f.Smallest.UserKey
273 1 : break
274 : }
275 : }
276 :
277 1 : if len(r.cfg.L0SplitKeys) != 0 {
278 1 : // Find the first split key that is greater than startKey.
279 1 : index := sort.Search(len(r.cfg.L0SplitKeys), func(i int) bool {
280 1 : return r.cmp(r.cfg.L0SplitKeys[i], startKey) > 0
281 1 : })
282 1 : if index < len(r.cfg.L0SplitKeys) {
283 1 : limitKey = base.MinUserKey(r.cmp, limitKey, r.cfg.L0SplitKeys[index])
284 1 : }
285 : }
286 :
287 1 : return limitKey
288 : }
289 :
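// A worked example of the limit computation above, with assumed sizes: suppose
// MaxGrandparentOverlapBytes is 20 MB and the grandparent tables at or after
// startKey have sizes 8 MB, 8 MB and 8 MB. The running total reaches 24 MB at
// the third table, so its smallest user key becomes the hard limit. If
// L0SplitKeys were also set (flushes only) and contained a key greater than
// startKey but smaller than that grandparent key, base.MinUserKey would pick
// the L0 split key instead.
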
290 : // validateWriterMeta runs some sanity checks on the WriterMetadata of an output
291 : // table that was just finished. splitKey is the key where the table must have
292 : // ended (or nil).
293 1 : func (r *Runner) validateWriterMeta(meta *sstable.WriterMetadata, splitKey []byte) error {
294 1 : if !meta.HasPointKeys && !meta.HasRangeDelKeys && !meta.HasRangeKeys {
295 0 : return base.AssertionFailedf("output table has no keys")
296 0 : }
297 :
298 1 : var err error
299 1 : checkBounds := func(smallest, largest base.InternalKey, description string) {
300 1 : bounds := base.UserKeyBoundsFromInternal(smallest, largest)
301 1 : if !r.cfg.CompactionBounds.ContainsBounds(r.cmp, &bounds) {
302 0 : err = errors.CombineErrors(err, base.AssertionFailedf(
303 0 : "output table %s bounds %s extend beyond compaction bounds %s",
304 0 : description, bounds, r.cfg.CompactionBounds,
305 0 : ))
306 0 : }
307 1 : if splitKey != nil && bounds.End.IsUpperBoundFor(r.cmp, splitKey) {
308 0 : err = errors.CombineErrors(err, base.AssertionFailedf(
309 0 : "output table %s bounds %s extend beyond split key %s",
310 0 : description, bounds, splitKey,
311 0 : ))
312 0 : }
313 : }
314 :
315 1 : if meta.HasPointKeys {
316 1 : checkBounds(meta.SmallestPoint, meta.LargestPoint, "point key")
317 1 : }
318 1 : if meta.HasRangeDelKeys {
319 1 : checkBounds(meta.SmallestRangeDel, meta.LargestRangeDel, "range del")
320 1 : }
321 1 : if meta.HasRangeKeys {
322 1 : checkBounds(meta.SmallestRangeKey, meta.LargestRangeKey, "range key")
323 1 : }
324 1 : return err
325 : }
326 :
327 1 : func spanStartOrNil(s *keyspan.Span) []byte {
328 1 : if s.Empty() {
329 1 : return nil
330 1 : }
331 1 : return s.Start
332 : }