Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 : "sort"
12 : "time"
13 :
14 : "github.com/cockroachdb/crlib/crtime"
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble/internal/base"
17 : "github.com/cockroachdb/pebble/internal/cache"
18 : "github.com/cockroachdb/pebble/internal/invariants"
19 : "github.com/cockroachdb/pebble/internal/keyspan"
20 : "github.com/cockroachdb/pebble/internal/manifest"
21 : "github.com/cockroachdb/pebble/internal/overlap"
22 : "github.com/cockroachdb/pebble/internal/sstableinternal"
23 : "github.com/cockroachdb/pebble/objstorage"
24 : "github.com/cockroachdb/pebble/objstorage/remote"
25 : "github.com/cockroachdb/pebble/sstable"
26 : "github.com/cockroachdb/pebble/sstable/block"
27 : )
28 :
29 2 : func sstableKeyCompare(userCmp Compare, a, b InternalKey) int {
30 2 : c := userCmp(a.UserKey, b.UserKey)
31 2 : if c != 0 {
32 2 : return c
33 2 : }
34 2 : if a.IsExclusiveSentinel() {
35 2 : if !b.IsExclusiveSentinel() {
36 2 : return -1
37 2 : }
38 2 : } else if b.IsExclusiveSentinel() {
39 2 : return +1
40 2 : }
41 2 : return 0
42 : }
43 :
44 2 : func ingestValidateKey(opts *Options, key *InternalKey) error {
45 2 : if key.Kind() == InternalKeyKindInvalid {
46 1 : return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s",
47 1 : key.Pretty(opts.Comparer.FormatKey))
48 1 : }
49 2 : if key.SeqNum() != 0 {
50 1 : return base.CorruptionErrorf("pebble: external sstable has non-zero seqnum: %s",
51 1 : key.Pretty(opts.Comparer.FormatKey))
52 1 : }
53 2 : if err := opts.Comparer.ValidateKey.Validate(key.UserKey); err != nil {
54 0 : return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s, %w",
55 0 : key.Pretty(opts.Comparer.FormatKey), err)
56 0 : }
57 2 : return nil
58 : }
59 :
60 : // ingestSynthesizeShared constructs a fileMetadata for one shared sstable owned
61 : // or shared by another node.
62 : func ingestSynthesizeShared(
63 : opts *Options, sm SharedSSTMeta, tableNum base.TableNum,
64 2 : ) (*manifest.TableMetadata, error) {
65 2 : if sm.Size == 0 {
66 0 : // Disallow 0 file sizes
67 0 : return nil, errors.New("pebble: cannot ingest shared file with size 0")
68 0 : }
69 : // Don't load table stats. Doing a round trip to shared storage, one SST
70 : // at a time is not worth it as it slows down ingestion.
71 2 : meta := &manifest.TableMetadata{
72 2 : TableNum: tableNum,
73 2 : CreationTime: time.Now().Unix(),
74 2 : Virtual: true,
75 2 : Size: sm.Size,
76 2 : }
77 2 : if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil {
78 2 : // Initialize meta.{HasPointKeys,Smallest,Largest}, etc.
79 2 : //
80 2 : // NB: We create new internal keys and pass them into ExtendPointKeyBounds
81 2 : // so that we can sub a zero sequence number into the bounds. We can set
82 2 : // the sequence number to anything here; it'll be reset in ingestUpdateSeqNum
83 2 : // anyway. However, we do need to use the same sequence number across all
84 2 : // bound keys at this step so that we end up with bounds that are consistent
85 2 : // across point/range keys.
86 2 : //
87 2 : // Because of the sequence number rewriting, we cannot use the Kind of
88 2 : // sm.SmallestPointKey. For example, the original SST might start with
89 2 : // a.SET.2 and a.RANGEDEL.1 (with a.SET.2 being the smallest key); after
90 2 : // rewriting the sequence numbers, these keys become a.SET.100 and
91 2 : // a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a
92 2 : // correct bound, we just use the maximum key kind (which sorts first).
93 2 : // Similarly, we use the smallest key kind for the largest key.
94 2 : smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMaxForSSTable)
95 2 : largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0)
96 2 : if sm.LargestPointKey.IsExclusiveSentinel() {
97 2 : largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey)
98 2 : }
99 2 : if opts.Comparer.Equal(smallestPointKey.UserKey, largestPointKey.UserKey) &&
100 2 : smallestPointKey.Trailer < largestPointKey.Trailer {
101 0 : // We get kinds from the sender, however we substitute our own sequence
102 0 : // numbers. This can result in cases where an sstable [b#5,SET-b#4,DELSIZED]
103 0 : // becomes [b#0,SET-b#0,DELSIZED] when we synthesize it here, but the
104 0 : // kinds need to be reversed now because DelSized > Set.
105 0 : smallestPointKey, largestPointKey = largestPointKey, smallestPointKey
106 0 : }
107 2 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallestPointKey, largestPointKey)
108 : }
109 2 : if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil {
110 2 : // Initialize meta.{HasRangeKeys,Smallest,Largest}, etc.
111 2 : //
112 2 : // See comment above on why we use a zero sequence number and these key
113 2 : // kinds here.
114 2 : smallestRangeKey := base.MakeInternalKey(sm.SmallestRangeKey.UserKey, 0, base.InternalKeyKindRangeKeyMax)
115 2 : largestRangeKey := base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeyMin, sm.LargestRangeKey.UserKey)
116 2 : meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallestRangeKey, largestRangeKey)
117 2 : }
118 :
119 : // For simplicity, we use the same number for both the FileNum and the
120 : // DiskFileNum (even though this is a virtual sstable). Pass the underlying
121 : // TableBacking's size to the same size as the virtualized view of the sstable.
122 : // This ensures that we don't over-prioritize this sstable for compaction just
123 : // yet, as we do not have a clear sense of what parts of this sstable are
124 : // referenced by other nodes.
125 2 : meta.InitVirtualBacking(base.DiskFileNum(tableNum), sm.Size)
126 2 :
127 2 : if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
128 0 : return nil, err
129 0 : }
130 2 : return meta, nil
131 : }
132 :
133 : // ingestLoad1External loads the fileMetadata for one external sstable.
134 : // Sequence number and target level calculation happens during prepare/apply.
135 : func ingestLoad1External(
136 : opts *Options, e ExternalFile, tableNum base.TableNum,
137 2 : ) (*manifest.TableMetadata, error) {
138 2 : if e.Size == 0 {
139 0 : return nil, errors.New("pebble: cannot ingest external file with size 0")
140 0 : }
141 2 : if !e.HasRangeKey && !e.HasPointKey {
142 0 : return nil, errors.New("pebble: cannot ingest external file with no point or range keys")
143 0 : }
144 :
145 2 : if opts.Comparer.Compare(e.StartKey, e.EndKey) > 0 {
146 1 : return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
147 1 : }
148 2 : if opts.Comparer.Compare(e.StartKey, e.EndKey) == 0 && !e.EndKeyIsInclusive {
149 0 : return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
150 0 : }
151 2 : if n := opts.Comparer.Split(e.StartKey); n != len(e.StartKey) {
152 1 : return nil, errors.Newf("pebble: external file bounds start key %q has suffix", e.StartKey)
153 1 : }
154 2 : if n := opts.Comparer.Split(e.EndKey); n != len(e.EndKey) {
155 1 : return nil, errors.Newf("pebble: external file bounds end key %q has suffix", e.EndKey)
156 1 : }
157 :
158 : // Don't load table stats. Doing a round trip to shared storage, one SST
159 : // at a time is not worth it as it slows down ingestion.
160 2 : meta := &manifest.TableMetadata{
161 2 : TableNum: tableNum,
162 2 : CreationTime: time.Now().Unix(),
163 2 : Size: e.Size,
164 2 : Virtual: true,
165 2 : }
166 2 : // In the name of keeping this ingestion as fast as possible, we avoid *all*
167 2 : // existence checks and synthesize a table metadata with smallest/largest
168 2 : // keys that overlap whatever the passed-in span was.
169 2 : smallestCopy := slices.Clone(e.StartKey)
170 2 : largestCopy := slices.Clone(e.EndKey)
171 2 : if e.HasPointKey {
172 2 : // Sequence numbers are updated later by
173 2 : // ingestUpdateSeqNum, applying a squence number that
174 2 : // is applied to all keys in the sstable.
175 2 : if e.EndKeyIsInclusive {
176 1 : meta.ExtendPointKeyBounds(
177 1 : opts.Comparer.Compare,
178 1 : base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable),
179 1 : base.MakeInternalKey(largestCopy, 0, 0))
180 2 : } else {
181 2 : meta.ExtendPointKeyBounds(
182 2 : opts.Comparer.Compare,
183 2 : base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable),
184 2 : base.MakeRangeDeleteSentinelKey(largestCopy))
185 2 : }
186 : }
187 2 : if e.HasRangeKey {
188 2 : meta.ExtendRangeKeyBounds(
189 2 : opts.Comparer.Compare,
190 2 : base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeyMax),
191 2 : base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyMin, largestCopy),
192 2 : )
193 2 : }
194 :
195 2 : meta.SyntheticPrefixAndSuffix = sstable.MakeSyntheticPrefixAndSuffix(e.SyntheticPrefix, e.SyntheticSuffix)
196 2 :
197 2 : return meta, nil
198 : }
199 :
// rangeKeyIngestValidator carries the state needed to check that the range
// keys of consecutive ingested files line up at file boundaries. Its zero
// value performs no checks (see Validate).
type rangeKeyIngestValidator struct {
	// lastRangeKey is the last range key seen in the previous file.
	lastRangeKey keyspan.Span
	// comparer, if unset, disables range key validation.
	comparer *base.Comparer
}
206 :
207 2 : func disableRangeKeyChecks() rangeKeyIngestValidator {
208 2 : return rangeKeyIngestValidator{}
209 2 : }
210 :
211 : func validateSuffixedBoundaries(
212 : cmp *base.Comparer, lastRangeKey keyspan.Span,
213 2 : ) rangeKeyIngestValidator {
214 2 : return rangeKeyIngestValidator{
215 2 : lastRangeKey: lastRangeKey,
216 2 : comparer: cmp,
217 2 : }
218 2 : }
219 :
220 : // Validate valides if the stored state of this rangeKeyIngestValidator allows for
221 : // a file with the given nextFileSmallestKey to be ingested, such that the stored
222 : // last file's largest range key defragments cleanly with the next file's smallest
223 : // key if it was suffixed. If a value of nil is passed in for nextFileSmallestKey,
224 : // that denotes the next file does not have a range key or there is no next file.
225 2 : func (r *rangeKeyIngestValidator) Validate(nextFileSmallestKey *keyspan.Span) error {
226 2 : if r.comparer == nil {
227 2 : return nil
228 2 : }
229 2 : if r.lastRangeKey.Valid() {
230 2 : if r.comparer.Split.HasSuffix(r.lastRangeKey.End) {
231 1 : if nextFileSmallestKey == nil || !r.comparer.Equal(r.lastRangeKey.End, nextFileSmallestKey.Start) {
232 1 : // The last range key has a suffix, and it doesn't defragment cleanly with this range key.
233 1 : return errors.AssertionFailedf("pebble: ingest sstable has suffixed largest range key that does not match the start key of the next sstable: %s",
234 1 : r.comparer.FormatKey(r.lastRangeKey.End))
235 1 : } else if !keyspan.DefragmentInternal.ShouldDefragment(r.comparer.CompareRangeSuffixes, &r.lastRangeKey, nextFileSmallestKey) {
236 0 : // The last range key has a suffix, and it doesn't defragment cleanly with this range key.
237 0 : return errors.AssertionFailedf("pebble: ingest sstable has suffixed range key that won't defragment with next sstable: %s",
238 0 : r.comparer.FormatKey(r.lastRangeKey.End))
239 0 : }
240 : }
241 2 : } else if nextFileSmallestKey != nil && r.comparer.Split.HasSuffix(nextFileSmallestKey.Start) {
242 0 : return errors.Newf("pebble: ingest sstable has suffixed range key start that won't defragment: %s",
243 0 : r.comparer.FormatKey(nextFileSmallestKey.Start))
244 0 : }
245 2 : return nil
246 : }
247 :
// ingestLoad1 creates the TableMetadata for one file. This file will be owned
// by this store.
//
// The sstable is opened from readable and its bounds are computed by reading
// the first and last entries of its point, range deletion and range key
// iterators; every boundary key read is checked with ingestValidateKey.
//
// rangeKeyValidator is used to ensure that range keys defragment cleanly
// across files: it is invoked with the first range key span of this file, or
// with nil when the file has no range key iterator or no last range key span.
// A validator obtained from disableRangeKeyChecks() performs no checks.
//
// Returns:
//   - meta: the metadata for the table, or nil (with a nil error) when the
//     file contains neither point keys nor range keys.
//   - lastRangeKey: a clone of the last range key span in the file, or an
//     empty span if there is none.
//   - blockReadStats: the total block reads recorded while iterating.
//   - err: non-nil if the reader cannot be created, the table format is
//     outside the range supported by fmv, the table has blob values, a
//     columnar table uses a key schema not present in opts.KeySchemas, a
//     boundary key or range key boundary fails validation, or the resulting
//     metadata fails its consistency check.
func ingestLoad1(
	ctx context.Context,
	opts *Options,
	fmv FormatMajorVersion,
	readable objstorage.Readable,
	cacheHandle *cache.Handle,
	tableNum base.TableNum,
	rangeKeyValidator rangeKeyIngestValidator,
) (
	meta *manifest.TableMetadata,
	lastRangeKey keyspan.Span,
	blockReadStats base.BlockReadStats,
	err error,
) {
	o := opts.MakeReaderOptions()
	o.CacheOpts = sstableinternal.CacheOptions{
		CacheHandle: cacheHandle,
		FileNum:     base.PhysicalTableDiskFileNum(tableNum),
	}
	r, err := sstable.NewReader(ctx, readable, o)
	if err != nil {
		// The reader could not be created; close readable here and report
		// both errors.
		return nil, keyspan.Span{}, base.BlockReadStats{}, errors.CombineErrors(err, readable.Close())
	}
	// The reader is closed when this function returns; its Close error is
	// deliberately ignored.
	defer func() { _ = r.Close() }()

	// Avoid ingesting tables with format versions this DB doesn't support.
	tf, err := r.TableFormat()
	if err != nil {
		return nil, keyspan.Span{}, base.BlockReadStats{}, err
	}
	if tf < fmv.MinTableFormat() || tf > fmv.MaxTableFormat() {
		return nil, keyspan.Span{}, base.BlockReadStats{}, errors.Newf(
			"pebble: table format %s is not within range supported at DB format major version %d, (%s,%s)",
			tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
		)
	}

	// Tables carrying the blob-values attribute are rejected.
	if r.Attributes.Has(sstable.AttributeBlobValues) {
		return nil, keyspan.Span{}, base.BlockReadStats{}, errors.Newf(
			"pebble: ingesting tables with blob references is not supported")
	}

	props, err := r.ReadPropertiesBlock(ctx, nil /* buffer pool */)
	if err != nil {
		return nil, keyspan.Span{}, base.BlockReadStats{}, err
	}

	// If this is a columnar block, read key schema name from properties block
	// and require that it is one of the schemas configured in opts.KeySchemas.
	if tf.BlockColumnar() {
		if _, ok := opts.KeySchemas[props.KeySchemaName]; !ok {
			return nil, keyspan.Span{}, base.BlockReadStats{}, errors.Newf(
				"pebble: table uses key schema %q unknown to the database",
				props.KeySchemaName)
		}
	}

	meta = &manifest.TableMetadata{}
	meta.TableNum = tableNum
	// The recorded size is never zero: it is clamped to at least 1.
	meta.Size = max(uint64(readable.Size()), 1)
	meta.CreationTime = time.Now().Unix()
	meta.InitPhysicalBacking()

	// Avoid loading into the file cache for collecting stats if we
	// don't need to. If there are no range deletions, we have all the
	// information to compute the stats here.
	//
	// This is helpful in tests for avoiding awkwardness around deletion of
	// ingested files from MemFS. MemFS implements the Windows semantics of
	// disallowing removal of an open file. Under MemFS, if we don't populate
	// meta.Stats here, the file will be loaded into the file cache for
	// calculating stats before we can remove the original link.
	maybeSetStatsFromProperties(meta.PhysicalMeta(), &props)

	// iterStats is shared by all iterators created below through env; its
	// total block reads are returned to the caller on success.
	var iterStats base.InternalIteratorStats
	env := sstable.ReadEnv{
		Block: block.ReadEnv{
			Stats: &iterStats,
		},
	}
	// Extend the point-key bounds using the first and last point keys.
	{
		iterOpts := sstable.IterOptions{
			Lower:                nil,
			Upper:                nil,
			Transforms:           sstable.NoTransforms,
			Filterer:             nil,
			FilterBlockSizeLimit: sstable.AlwaysUseFilterBlock,
			Env:                  env,
			ReaderProvider:       sstable.MakeTrivialReaderProvider(r),
			BlobContext:          sstable.AssertNoBlobHandles,
		}
		iter, err := r.NewPointIter(ctx, iterOpts)
		if err != nil {
			return nil, keyspan.Span{}, base.BlockReadStats{}, err
		}
		defer func() { _ = iter.Close() }()
		var smallest InternalKey
		if kv := iter.First(); kv != nil {
			if err := ingestValidateKey(opts, &kv.K); err != nil {
				return nil, keyspan.Span{}, base.BlockReadStats{}, err
			}
			// Clone the key before the iterator is repositioned.
			smallest = kv.K.Clone()
		}
		// A nil result from First may also indicate an iterator error.
		if err := iter.Error(); err != nil {
			return nil, keyspan.Span{}, base.BlockReadStats{}, err
		}
		if kv := iter.Last(); kv != nil {
			if err := ingestValidateKey(opts, &kv.K); err != nil {
				return nil, keyspan.Span{}, base.BlockReadStats{}, err
			}
			meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, kv.K.Clone())
		}
		if err := iter.Error(); err != nil {
			return nil, keyspan.Span{}, base.BlockReadStats{}, err
		}
	}

	// Extend the point-key bounds using the first and last range deletion
	// spans, if the table has a range deletion iterator.
	iter, err := r.NewRawRangeDelIter(ctx, sstable.NoFragmentTransforms, env)
	if err != nil {
		return nil, keyspan.Span{}, base.BlockReadStats{}, err
	}
	if iter != nil {
		defer iter.Close()
		var smallest InternalKey
		if s, err := iter.First(); err != nil {
			return nil, keyspan.Span{}, base.BlockReadStats{}, err
		} else if s != nil {
			key := s.SmallestKey()
			if err := ingestValidateKey(opts, &key); err != nil {
				return nil, keyspan.Span{}, base.BlockReadStats{}, err
			}
			smallest = key.Clone()
		}
		if s, err := iter.Last(); err != nil {
			return nil, keyspan.Span{}, base.BlockReadStats{}, err
		} else if s != nil {
			// The span's smallest key is the one validated; its largest key
			// is the one used as the upper bound.
			k := s.SmallestKey()
			if err := ingestValidateKey(opts, &k); err != nil {
				return nil, keyspan.Span{}, base.BlockReadStats{}, err
			}
			largest := s.LargestKey().Clone()
			meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest)
		}
	}

	// Update the range-key bounds for the table.
	{
		iter, err := r.NewRawRangeKeyIter(ctx, sstable.NoFragmentTransforms, env)
		if err != nil {
			return nil, keyspan.Span{}, base.BlockReadStats{}, err
		}
		if iter != nil {
			defer iter.Close()
			var smallest InternalKey
			if s, err := iter.First(); err != nil {
				return nil, keyspan.Span{}, base.BlockReadStats{}, err
			} else if s != nil {
				key := s.SmallestKey()
				if err := ingestValidateKey(opts, &key); err != nil {
					return nil, keyspan.Span{}, base.BlockReadStats{}, err
				}
				smallest = key.Clone()
				// Range keys need some additional validation as we need to ensure they
				// defragment cleanly with the lastRangeKey from the previous file.
				if err := rangeKeyValidator.Validate(s); err != nil {
					return nil, keyspan.Span{}, base.BlockReadStats{}, err
				}
			}
			lastRangeKey = keyspan.Span{}
			if s, err := iter.Last(); err != nil {
				return nil, keyspan.Span{}, base.BlockReadStats{}, err
			} else if s != nil {
				k := s.SmallestKey()
				if err := ingestValidateKey(opts, &k); err != nil {
					return nil, keyspan.Span{}, base.BlockReadStats{}, err
				}
				// As range keys are fragmented, the end key of the last range key in
				// the table provides the upper bound for the table.
				largest := s.LargestKey().Clone()
				meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallest, largest)
				lastRangeKey = s.Clone()
			} else {
				// s == nil.
				if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
					return nil, keyspan.Span{}, base.BlockReadStats{}, err
				}
			}
		} else {
			// No range key iterator: validate as if there were no next range
			// key and report an empty last range key.
			if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
				return nil, keyspan.Span{}, base.BlockReadStats{}, err
			}
			lastRangeKey = keyspan.Span{}
		}
	}

	// A file with neither point keys nor range keys yields no metadata and no
	// error.
	if !meta.HasPointKeys && !meta.HasRangeKeys {
		return nil, keyspan.Span{}, base.BlockReadStats{}, nil
	}

	// Sanity check that the various bounds on the file were set consistently.
	if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
		return nil, keyspan.Span{}, base.BlockReadStats{}, err
	}

	return meta, lastRangeKey, iterStats.TotalBlockReads(), nil
}
459 :
// ingestLoadResult is the result of ingestLoad: the table metadata for every
// file in an ingestion, grouped by the kind of file.
type ingestLoadResult struct {
	// local holds the metadata for sstables loaded from local paths.
	local []ingestLocalMeta
	// shared holds the metadata synthesized for shared sstables.
	shared []ingestSharedMeta
	// external holds the metadata synthesized for external sstables.
	external []ingestExternalMeta

	// externalFilesHaveLevel is set by ingestLoad when the external files
	// specify a Level greater than zero.
	externalFilesHaveLevel bool
	// blockReadStats holds block read stats reported by ingestLoad1 while
	// loading local files.
	blockReadStats base.BlockReadStats
}
468 :
// ingestLocalMeta pairs the metadata of a locally loaded sstable with the
// path it was loaded from.
type ingestLocalMeta struct {
	*manifest.TableMetadata
	// path is the file path the sstable was opened from.
	path string
}
473 :
// ingestSharedMeta pairs the metadata synthesized for a shared sstable with
// the SharedSSTMeta it was synthesized from.
type ingestSharedMeta struct {
	*manifest.TableMetadata
	// shared is the description of the shared sstable supplied by the caller.
	shared SharedSSTMeta
}
478 :
// ingestExternalMeta pairs the metadata synthesized for an external sstable
// with the ExternalFile it was synthesized from.
type ingestExternalMeta struct {
	*manifest.TableMetadata
	// external is the description of the external file supplied by the caller.
	external ExternalFile
	// usedExistingBacking is true if the external file is reusing a backing
	// that existed before this ingestion. In this case, we called
	// VirtualBackings.Protect() on that backing; we will need to call
	// Unprotect() after the ingestion.
	usedExistingBacking bool
}
488 :
489 2 : func (r *ingestLoadResult) fileCount() int {
490 2 : return len(r.local) + len(r.shared) + len(r.external)
491 2 : }
492 :
493 : func ingestLoad(
494 : ctx context.Context,
495 : opts *Options,
496 : fmv FormatMajorVersion,
497 : paths []string,
498 : shared []SharedSSTMeta,
499 : external []ExternalFile,
500 : cacheHandle *cache.Handle,
501 : pending []base.TableNum,
502 2 : ) (ingestLoadResult, error) {
503 2 : localFileNums := pending[:len(paths)]
504 2 : sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
505 2 : externalFileNums := pending[len(paths)+len(shared) : len(paths)+len(shared)+len(external)]
506 2 :
507 2 : var result ingestLoadResult
508 2 : result.local = make([]ingestLocalMeta, 0, len(paths))
509 2 : var lastRangeKey keyspan.Span
510 2 : var blockReadStats base.BlockReadStats
511 2 : // NB: we disable range key boundary assertions if we have shared or external files
512 2 : // present in this ingestion. This is because a suffixed range key in a local file
513 2 : // can possibly defragment with a suffixed range key in a shared or external file.
514 2 : // We also disable range key boundary assertions if we have CreateOnShared set to
515 2 : // true, as that means we could have suffixed RangeKeyDels or Unsets in the local
516 2 : // files that won't ever be surfaced, even if there are no shared or external files
517 2 : // in the ingestion.
518 2 : shouldDisableRangeKeyChecks := len(shared) > 0 || len(external) > 0 || opts.Experimental.CreateOnShared != remote.CreateOnSharedNone
519 2 : for i := range paths {
520 2 : f, err := opts.FS.Open(paths[i])
521 2 : if err != nil {
522 1 : return ingestLoadResult{}, err
523 1 : }
524 :
525 2 : readable, err := sstable.NewSimpleReadable(f)
526 2 : if err != nil {
527 1 : return ingestLoadResult{}, err
528 1 : }
529 2 : var m *manifest.TableMetadata
530 2 : rangeKeyValidator := disableRangeKeyChecks()
531 2 : if !shouldDisableRangeKeyChecks {
532 2 : rangeKeyValidator = validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
533 2 : }
534 2 : m, lastRangeKey, blockReadStats, err = ingestLoad1(ctx, opts, fmv, readable, cacheHandle, localFileNums[i], rangeKeyValidator)
535 2 : if err != nil {
536 1 : return ingestLoadResult{}, err
537 1 : }
538 2 : if m != nil {
539 2 : result.local = append(result.local, ingestLocalMeta{
540 2 : TableMetadata: m,
541 2 : path: paths[i],
542 2 : })
543 2 : result.blockReadStats = blockReadStats
544 2 : }
545 : }
546 :
547 2 : if !shouldDisableRangeKeyChecks {
548 2 : rangeKeyValidator := validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
549 2 : if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
550 1 : return ingestLoadResult{}, err
551 1 : }
552 : }
553 :
554 : // Sort the shared files according to level.
555 2 : sort.Sort(sharedByLevel(shared))
556 2 :
557 2 : result.shared = make([]ingestSharedMeta, 0, len(shared))
558 2 : for i := range shared {
559 2 : m, err := ingestSynthesizeShared(opts, shared[i], sharedFileNums[i])
560 2 : if err != nil {
561 0 : return ingestLoadResult{}, err
562 0 : }
563 2 : if shared[i].Level < sharedLevelsStart {
564 0 : return ingestLoadResult{}, errors.New("cannot ingest shared file in level below sharedLevelsStart")
565 0 : }
566 2 : result.shared = append(result.shared, ingestSharedMeta{
567 2 : TableMetadata: m,
568 2 : shared: shared[i],
569 2 : })
570 : }
571 2 : result.external = make([]ingestExternalMeta, 0, len(external))
572 2 : for i := range external {
573 2 : m, err := ingestLoad1External(opts, external[i], externalFileNums[i])
574 2 : if err != nil {
575 1 : return ingestLoadResult{}, err
576 1 : }
577 2 : result.external = append(result.external, ingestExternalMeta{
578 2 : TableMetadata: m,
579 2 : external: external[i],
580 2 : })
581 2 : if external[i].Level > 0 {
582 1 : if i != 0 && !result.externalFilesHaveLevel {
583 0 : return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
584 0 : }
585 1 : result.externalFilesHaveLevel = true
586 2 : } else if result.externalFilesHaveLevel {
587 0 : return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
588 0 : }
589 : }
590 2 : return result, nil
591 : }
592 :
593 2 : func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error {
594 2 : // Verify that all the shared files (i.e. files in sharedMeta)
595 2 : // fit within the exciseSpan.
596 2 : for _, f := range lr.shared {
597 2 : if !exciseSpan.Contains(cmp, f.Smallest()) || !exciseSpan.Contains(cmp, f.Largest()) {
598 0 : return errors.Newf("pebble: shared file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
599 0 : }
600 : }
601 :
602 2 : if lr.externalFilesHaveLevel {
603 1 : for _, f := range lr.external {
604 1 : if !exciseSpan.Contains(cmp, f.Smallest()) || !exciseSpan.Contains(cmp, f.Largest()) {
605 0 : return base.AssertionFailedf("pebble: external file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
606 0 : }
607 : }
608 : }
609 :
610 2 : if len(lr.external) > 0 {
611 2 : if len(lr.shared) > 0 {
612 0 : // If external files are present alongside shared files,
613 0 : // return an error.
614 0 : return base.AssertionFailedf("pebble: external files cannot be ingested atomically alongside shared files")
615 0 : }
616 :
617 : // Sort according to the smallest key.
618 2 : slices.SortFunc(lr.external, func(a, b ingestExternalMeta) int {
619 2 : return cmp(a.Smallest().UserKey, b.Smallest().UserKey)
620 2 : })
621 2 : for i := 1; i < len(lr.external); i++ {
622 2 : if sstableKeyCompare(cmp, lr.external[i-1].Largest(), lr.external[i].Smallest()) >= 0 {
623 1 : return errors.Newf("pebble: external sstables have overlapping ranges")
624 1 : }
625 : }
626 2 : return nil
627 : }
628 2 : if len(lr.local) <= 1 {
629 2 : return nil
630 2 : }
631 :
632 : // Sort according to the smallest key.
633 2 : slices.SortFunc(lr.local, func(a, b ingestLocalMeta) int {
634 2 : return cmp(a.Smallest().UserKey, b.Smallest().UserKey)
635 2 : })
636 :
637 2 : for i := 1; i < len(lr.local); i++ {
638 2 : if sstableKeyCompare(cmp, lr.local[i-1].Largest(), lr.local[i].Smallest()) >= 0 {
639 2 : return errors.Newf("pebble: local ingestion sstables have overlapping ranges")
640 2 : }
641 : }
642 2 : if len(lr.shared) == 0 {
643 2 : return nil
644 2 : }
645 0 : filesInLevel := make([]*manifest.TableMetadata, 0, len(lr.shared))
646 0 : for l := sharedLevelsStart; l < numLevels; l++ {
647 0 : filesInLevel = filesInLevel[:0]
648 0 : for i := range lr.shared {
649 0 : if lr.shared[i].shared.Level == uint8(l) {
650 0 : filesInLevel = append(filesInLevel, lr.shared[i].TableMetadata)
651 0 : }
652 : }
653 0 : for i := range lr.external {
654 0 : if lr.external[i].external.Level == uint8(l) {
655 0 : filesInLevel = append(filesInLevel, lr.external[i].TableMetadata)
656 0 : }
657 : }
658 0 : slices.SortFunc(filesInLevel, func(a, b *manifest.TableMetadata) int {
659 0 : return cmp(a.Smallest().UserKey, b.Smallest().UserKey)
660 0 : })
661 0 : for i := 1; i < len(filesInLevel); i++ {
662 0 : if sstableKeyCompare(cmp, filesInLevel[i-1].Largest(), filesInLevel[i].Smallest()) >= 0 {
663 0 : return base.AssertionFailedf("pebble: external shared sstables have overlapping ranges")
664 0 : }
665 : }
666 : }
667 0 : return nil
668 : }
669 :
670 1 : func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) error {
671 1 : var firstErr error
672 1 : for i := range meta {
673 1 : if err := objProvider.Remove(base.FileTypeTable, meta[i].TableBacking.DiskFileNum); err != nil {
674 1 : firstErr = firstError(firstErr, err)
675 1 : }
676 : }
677 1 : return firstErr
678 : }
679 :
680 : // ingestLinkLocal creates new objects which are backed by either hardlinks to or
681 : // copies of the ingested files.
682 : func ingestLinkLocal(
683 : ctx context.Context,
684 : jobID JobID,
685 : opts *Options,
686 : objProvider objstorage.Provider,
687 : localMetas []ingestLocalMeta,
688 2 : ) error {
689 2 : for i := range localMetas {
690 2 : objMeta, err := objProvider.LinkOrCopyFromLocal(
691 2 : ctx, opts.FS, localMetas[i].path, base.FileTypeTable, localMetas[i].TableBacking.DiskFileNum,
692 2 : objstorage.CreateOptions{PreferSharedStorage: true},
693 2 : )
694 2 : if err != nil {
695 1 : if err2 := ingestCleanup(objProvider, localMetas[:i]); err2 != nil {
696 0 : opts.Logger.Errorf("ingest cleanup failed: %v", err2)
697 0 : }
698 1 : return err
699 : }
700 2 : if opts.EventListener.TableCreated != nil {
701 2 : opts.EventListener.TableCreated(TableCreateInfo{
702 2 : JobID: int(jobID),
703 2 : Reason: "ingesting",
704 2 : Path: objProvider.Path(objMeta),
705 2 : FileNum: base.PhysicalTableDiskFileNum(localMetas[i].TableNum),
706 2 : })
707 2 : }
708 : }
709 2 : return nil
710 : }
711 :
712 : // ingestAttachRemote attaches remote objects to the storage provider.
713 : //
714 : // For external objects, we reuse existing FileBackings from the current version
715 : // when possible.
716 : //
717 : // ingestUnprotectExternalBackings() must be called after this function (even in
718 : // error cases).
719 2 : func (d *DB) ingestAttachRemote(jobID JobID, lr ingestLoadResult) error {
720 2 : remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(lr.shared)+len(lr.external))
721 2 : for i := range lr.shared {
722 2 : backing, err := lr.shared[i].shared.Backing.Get()
723 2 : if err != nil {
724 0 : return err
725 0 : }
726 2 : remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
727 2 : FileNum: lr.shared[i].TableBacking.DiskFileNum,
728 2 : FileType: base.FileTypeTable,
729 2 : Backing: backing,
730 2 : })
731 : }
732 :
733 2 : d.findExistingBackingsForExternalObjects(lr.external)
734 2 :
735 2 : newTableBackings := make(map[remote.ObjectKey]*manifest.TableBacking, len(lr.external))
736 2 : for i := range lr.external {
737 2 : meta := lr.external[i].TableMetadata
738 2 : if meta.TableBacking != nil {
739 2 : // The backing was filled in by findExistingBackingsForExternalObjects().
740 2 : continue
741 : }
742 2 : key := remote.MakeObjectKey(lr.external[i].external.Locator, lr.external[i].external.ObjName)
743 2 : if backing, ok := newTableBackings[key]; ok {
744 2 : // We already created the same backing in this loop. Update its size.
745 2 : backing.Size += lr.external[i].external.Size
746 2 : meta.AttachVirtualBacking(backing)
747 2 : continue
748 : }
749 2 : providerBacking, err := d.objProvider.CreateExternalObjectBacking(key.Locator, key.ObjectName)
750 2 : if err != nil {
751 0 : return err
752 0 : }
753 : // We have to attach the remote object (and assign it a DiskFileNum). For
754 : // simplicity, we use the same number for both the FileNum and the
755 : // DiskFileNum (even though this is a virtual sstable).
756 2 : size := max(lr.external[i].external.Size, 1)
757 2 : meta.InitVirtualBacking(base.DiskFileNum(meta.TableNum), size)
758 2 :
759 2 : // Set the underlying TableBacking's size to the same size as the virtualized
760 2 : // view of the sstable. This ensures that we don't over-prioritize this
761 2 : // sstable for compaction just yet, as we do not have a clear sense of
762 2 : // what parts of this sstable are referenced by other nodes.
763 2 : meta.TableBacking.Size = size
764 2 : newTableBackings[key] = meta.TableBacking
765 2 :
766 2 : remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
767 2 : FileNum: meta.TableBacking.DiskFileNum,
768 2 : FileType: base.FileTypeTable,
769 2 : Backing: providerBacking,
770 2 : })
771 : }
772 :
773 2 : for i := range lr.external {
774 2 : if err := lr.external[i].Validate(d.opts.Comparer.Compare, d.opts.Comparer.FormatKey); err != nil {
775 0 : return err
776 0 : }
777 : }
778 :
779 2 : remoteObjMetas, err := d.objProvider.AttachRemoteObjects(remoteObjs)
780 2 : if err != nil {
781 0 : return err
782 0 : }
783 :
784 2 : for i := range lr.shared {
785 2 : // One corner case around file sizes we need to be mindful of, is that
786 2 : // if one of the shareObjs was initially created by us (and has boomeranged
787 2 : // back from another node), we'll need to update the TableBacking's size
788 2 : // to be the true underlying size. Otherwise, we could hit errors when we
789 2 : // open the db again after a crash/restart (see checkConsistency in open.go),
790 2 : // plus it more accurately allows us to prioritize compactions of files
791 2 : // that were originally created by us.
792 2 : if remoteObjMetas[i].IsShared() && !d.objProvider.IsSharedForeign(remoteObjMetas[i]) {
793 2 : size, err := d.objProvider.Size(remoteObjMetas[i])
794 2 : if err != nil {
795 0 : return err
796 0 : }
797 2 : lr.shared[i].TableBacking.Size = max(uint64(size), 1)
798 : }
799 : }
800 :
801 2 : if d.opts.EventListener.TableCreated != nil {
802 2 : for i := range remoteObjMetas {
803 2 : d.opts.EventListener.TableCreated(TableCreateInfo{
804 2 : JobID: int(jobID),
805 2 : Reason: "ingesting",
806 2 : Path: d.objProvider.Path(remoteObjMetas[i]),
807 2 : FileNum: remoteObjMetas[i].DiskFileNum,
808 2 : })
809 2 : }
810 : }
811 :
812 2 : return nil
813 : }
814 :
815 : // findExistingBackingsForExternalObjects populates the TableBacking for external
816 : // files which are already in use by the current version.
817 : //
818 : // We take a Ref and LatestRef on populated backings.
819 2 : func (d *DB) findExistingBackingsForExternalObjects(metas []ingestExternalMeta) {
820 2 : d.mu.Lock()
821 2 : defer d.mu.Unlock()
822 2 :
823 2 : for i := range metas {
824 2 : diskFileNums := d.objProvider.GetExternalObjects(metas[i].external.Locator, metas[i].external.ObjName)
825 2 : // We cross-check against fileBackings in the current version because it is
826 2 : // possible that the external object is referenced by an sstable which only
827 2 : // exists in a previous version. In that case, that object could be removed
828 2 : // at any time so we cannot reuse it.
829 2 : for _, n := range diskFileNums {
830 2 : if backing, ok := d.mu.versions.latest.virtualBackings.Get(n); ok {
831 2 : // Protect this backing from being removed from the latest version. We
832 2 : // will unprotect in ingestUnprotectExternalBackings.
833 2 : d.mu.versions.latest.virtualBackings.Protect(n)
834 2 : metas[i].usedExistingBacking = true
835 2 : metas[i].AttachVirtualBacking(backing)
836 2 :
837 2 : // We can't update the size of the backing here, so make sure the
838 2 : // virtual size is sane.
839 2 : // TODO(radu): investigate what would it take to update the backing size.
840 2 : metas[i].Size = min(metas[i].Size, backing.Size)
841 2 : break
842 : }
843 : }
844 : }
845 : }
846 :
847 : // ingestUnprotectExternalBackings unprotects the file backings that were reused
848 : // for external objects when the ingestion fails.
849 2 : func (d *DB) ingestUnprotectExternalBackings(lr ingestLoadResult) {
850 2 : d.mu.Lock()
851 2 : defer d.mu.Unlock()
852 2 :
853 2 : for _, meta := range lr.external {
854 2 : if meta.usedExistingBacking {
855 2 : // If the backing is not use anywhere else and the ingest failed (or the
856 2 : // ingested tables were already compacted away), this call will cause in
857 2 : // the next version update to remove the backing.
858 2 : d.mu.versions.latest.virtualBackings.Unprotect(meta.TableBacking.DiskFileNum)
859 2 : }
860 : }
861 : }
862 :
863 : func setSeqNumInMetadata(
864 : m *manifest.TableMetadata, seqNum base.SeqNum, cmp Compare, format base.FormatKey,
865 2 : ) error {
866 2 : setSeqFn := func(k base.InternalKey) base.InternalKey {
867 2 : return base.MakeInternalKey(k.UserKey, seqNum, k.Kind())
868 2 : }
869 : // NB: we set the fields directly here, rather than via their Extend*
870 : // methods, as we are updating sequence numbers.
871 2 : if m.HasPointKeys {
872 2 : m.PointKeyBounds.SetSmallest(setSeqFn(m.PointKeyBounds.Smallest()))
873 2 : }
874 2 : if m.HasRangeKeys {
875 2 : m.RangeKeyBounds.SetSmallest(setSeqFn(m.RangeKeyBounds.Smallest()))
876 2 : }
877 : // Only update the seqnum for the largest key if that key is not an
878 : // "exclusive sentinel" (i.e. a range deletion sentinel or a range key
879 : // boundary), as doing so effectively drops the exclusive sentinel (by
880 : // lowering the seqnum from the max value), and extends the bounds of the
881 : // table.
882 : // NB: as the largest range key is always an exclusive sentinel, it is never
883 : // updated.
884 2 : if m.HasPointKeys && !m.PointKeyBounds.Largest().IsExclusiveSentinel() {
885 2 : m.PointKeyBounds.SetLargest(setSeqFn(m.PointKeyBounds.Largest()))
886 2 : }
887 : // Setting smallestSeqNum == largestSeqNum triggers the setting of
888 : // Properties.GlobalSeqNum when an sstable is loaded.
889 2 : m.SmallestSeqNum = seqNum
890 2 : m.LargestSeqNum = seqNum
891 2 : m.LargestSeqNumAbsolute = seqNum
892 2 : // Ensure the new bounds are consistent.
893 2 : if err := m.Validate(cmp, format); err != nil {
894 0 : return err
895 0 : }
896 2 : return nil
897 : }
898 :
899 : func ingestUpdateSeqNum(
900 : cmp Compare, format base.FormatKey, seqNum base.SeqNum, loadResult ingestLoadResult,
901 2 : ) error {
902 2 : // Shared sstables are required to be sorted by level ascending. We then
903 2 : // iterate the shared sstables in reverse, assigning the lower sequence
904 2 : // numbers to the shared sstables that will be ingested into the lower
905 2 : // (larger numbered) levels first. This ensures sequence number shadowing is
906 2 : // correct.
907 2 : for i := len(loadResult.shared) - 1; i >= 0; i-- {
908 2 : if i-1 >= 0 && loadResult.shared[i-1].shared.Level > loadResult.shared[i].shared.Level {
909 0 : panic(errors.AssertionFailedf("shared files %s, %s out of order", loadResult.shared[i-1], loadResult.shared[i]))
910 : }
911 2 : if err := setSeqNumInMetadata(loadResult.shared[i].TableMetadata, seqNum, cmp, format); err != nil {
912 0 : return err
913 0 : }
914 2 : seqNum++
915 : }
916 2 : for i := range loadResult.external {
917 2 : if err := setSeqNumInMetadata(loadResult.external[i].TableMetadata, seqNum, cmp, format); err != nil {
918 0 : return err
919 0 : }
920 2 : seqNum++
921 : }
922 2 : for i := range loadResult.local {
923 2 : if err := setSeqNumInMetadata(loadResult.local[i].TableMetadata, seqNum, cmp, format); err != nil {
924 0 : return err
925 0 : }
926 2 : seqNum++
927 : }
928 2 : return nil
929 : }
930 :
931 : // ingestTargetLevel returns the target level for a file being ingested.
932 : // If suggestSplit is true, it accounts for ingest-time splitting as part of
933 : // its target level calculation, and if a split candidate is found, that file
934 : // is returned as the splitFile.
935 : func ingestTargetLevel(
936 : ctx context.Context,
937 : cmp base.Compare,
938 : lsmOverlap overlap.WithLSM,
939 : baseLevel int,
940 : compactions map[compaction]struct{},
941 : meta *manifest.TableMetadata,
942 : suggestSplit bool,
943 2 : ) (targetLevel int, splitFile *manifest.TableMetadata, err error) {
944 2 : // Find the lowest level which does not have any files which overlap meta. We
945 2 : // search from L0 to L6 looking for whether there are any files in the level
946 2 : // which overlap meta. We want the "lowest" level (where lower means
947 2 : // increasing level number) in order to reduce write amplification.
948 2 : //
949 2 : // There are 2 kinds of overlap we need to check for: file boundary overlap
950 2 : // and data overlap. Data overlap implies file boundary overlap. Note that it
951 2 : // is always possible to ingest into L0.
952 2 : //
953 2 : // To place meta at level i where i > 0:
954 2 : // - there must not be any data overlap with levels <= i, since that will
955 2 : // violate the sequence number invariant.
956 2 : // - no file boundary overlap with level i, since that will violate the
957 2 : // invariant that files do not overlap in levels i > 0.
958 2 : // - if there is only a file overlap at a given level, and no data overlap,
959 2 : // we can still slot a file at that level. We return the fileMetadata with
960 2 : // which we have file boundary overlap (must be only one file, as sstable
961 2 : // bounds are usually tight on user keys) and the caller is expected to split
962 2 : // that sstable into two virtual sstables, allowing this file to go into that
963 2 : // level. Note that if we have file boundary overlap with two files, which
964 2 : // should only happen on rare occasions, we treat it as data overlap and
965 2 : // don't use this optimization.
966 2 : //
967 2 : // The file boundary overlap check is simpler to conceptualize. Consider the
968 2 : // following example, in which the ingested file lies completely before or
969 2 : // after the file being considered.
970 2 : //
971 2 : // |--| |--| ingested file: [a,b] or [f,g]
972 2 : // |-----| existing file: [c,e]
973 2 : // _____________________
974 2 : // a b c d e f g
975 2 : //
976 2 : // In both cases the ingested file can move to considering the next level.
977 2 : //
978 2 : // File boundary overlap does not necessarily imply data overlap. The check
979 2 : // for data overlap is a little more nuanced. Consider the following examples:
980 2 : //
981 2 : // 1. No data overlap:
982 2 : //
983 2 : // |-| |--| ingested file: [cc-d] or [ee-ff]
984 2 : // |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
985 2 : // _____________________
986 2 : // a b c d e f g
987 2 : //
988 2 : // In this case the ingested files can "fall through" this level. The checks
989 2 : // continue at the next level.
990 2 : //
991 2 : // 2. Data overlap:
992 2 : //
993 2 : // |--| ingested file: [d-e]
994 2 : // |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
995 2 : // _____________________
996 2 : // a b c d e f g
997 2 : //
998 2 : // In this case the file cannot be ingested into this level as the point 'dd'
999 2 : // is in the way.
1000 2 : //
1001 2 : // It is worth noting that the check for data overlap is only approximate. In
1002 2 : // the previous example, the ingested table [d-e] could contain only the
1003 2 : // points 'd' and 'e', in which case the table would be eligible for
1004 2 : // considering lower levels. However, such a fine-grained check would need to
1005 2 : // be exhaustive (comparing points and ranges in both the ingested existing
1006 2 : // tables) and such a check is prohibitively expensive. Thus Pebble treats any
1007 2 : // existing point that falls within the ingested table bounds as being "data
1008 2 : // overlap".
1009 2 :
1010 2 : if lsmOverlap[0].Result == overlap.Data {
1011 2 : return 0, nil, nil
1012 2 : }
1013 2 : targetLevel = 0
1014 2 : splitFile = nil
1015 2 : metaBounds := meta.UserKeyBounds()
1016 2 : for level := baseLevel; level < numLevels; level++ {
1017 2 : var candidateSplitFile *manifest.TableMetadata
1018 2 : switch lsmOverlap[level].Result {
1019 2 : case overlap.Data:
1020 2 : // We cannot ingest into or under this level; return the best target level
1021 2 : // so far.
1022 2 : return targetLevel, splitFile, nil
1023 :
1024 2 : case overlap.OnlyBoundary:
1025 2 : if !suggestSplit || lsmOverlap[level].SplitFile == nil {
1026 2 : // We can ingest under this level, but not into this level.
1027 2 : continue
1028 : }
1029 : // We can ingest into this level if we split this file.
1030 2 : candidateSplitFile = lsmOverlap[level].SplitFile
1031 :
1032 2 : case overlap.None:
1033 : // We can ingest into this level.
1034 :
1035 0 : default:
1036 0 : return 0, nil, base.AssertionFailedf("unexpected WithLevel.Result: %v", lsmOverlap[level].Result)
1037 : }
1038 :
1039 : // Check boundary overlap with any ongoing compactions. We consider an
1040 : // overlapping compaction that's writing files to an output level as
1041 : // equivalent to boundary overlap with files in that output level.
1042 : //
1043 : // We cannot check for data overlap with the new SSTs compaction will produce
1044 : // since compaction hasn't been done yet. However, there's no need to check
1045 : // since all keys in them will be from levels in [c.startLevel,
1046 : // c.outputLevel], and all those levels have already had their data overlap
1047 : // tested negative (else we'd have returned earlier).
1048 : //
1049 : // An alternative approach would be to cancel these compactions and proceed
1050 : // with an ingest-time split on this level if necessary. However, compaction
1051 : // cancellation can result in significant wasted effort and is best avoided
1052 : // unless necessary.
1053 2 : overlaps := false
1054 2 : for c := range compactions {
1055 2 : tblCompaction, ok := c.(*tableCompaction)
1056 2 : if !ok {
1057 1 : continue
1058 : }
1059 2 : if tblCompaction.outputLevel == nil || level != tblCompaction.outputLevel.level {
1060 2 : continue
1061 : }
1062 2 : bounds := tblCompaction.Bounds()
1063 2 : if bounds != nil && metaBounds.Overlaps(cmp, bounds) {
1064 2 : overlaps = true
1065 2 : break
1066 : }
1067 : }
1068 2 : if !overlaps {
1069 2 : targetLevel = level
1070 2 : splitFile = candidateSplitFile
1071 2 : }
1072 : }
1073 2 : return targetLevel, splitFile, nil
1074 : }
1075 :
1076 : // Ingest ingests a set of sstables into the DB. Ingestion of the files is
1077 : // atomic and semantically equivalent to creating a single batch containing all
1078 : // of the mutations in the sstables. Ingestion may require the memtable to be
1079 : // flushed. The ingested sstable files are moved into the DB and must reside on
1080 : // the same filesystem as the DB. Sstables can be created for ingestion using
1081 : // sstable.Writer. On success, Ingest removes the input paths.
1082 : //
1083 : // Ingested sstables must have been created with a known KeySchema (when written
1084 : // with columnar blocks) and Comparer. They must not contain any references to
1085 : // external blob files.
1086 : //
1087 : // Two types of sstables are accepted for ingestion(s): one is sstables present
1088 : // in the instance's vfs.FS and can be referenced locally. The other is sstables
1089 : // present in remote.Storage, referred to as shared or foreign sstables. These
1090 : // shared sstables can be linked through objstorageprovider.Provider, and do not
1091 : // need to already be present on the local vfs.FS. Foreign sstables must all fit
1092 : // in an excise span, and are destined for a level specified in SharedSSTMeta.
1093 : //
1094 : // All sstables *must* be Sync()'d by the caller after all bytes are written
1095 : // and before its file handle is closed; failure to do so could violate
1096 : // durability or lead to corrupted on-disk state. This method cannot, in a
1097 : // platform-and-FS-agnostic way, ensure that all sstables in the input are
1098 : // properly synced to disk. Opening new file handles and Sync()-ing them
1099 : // does not always guarantee durability; see the discussion here on that:
1100 : // https://github.com/cockroachdb/pebble/pull/835#issuecomment-663075379
1101 : //
1102 : // Ingestion loads each sstable into the lowest level of the LSM which it
1103 : // doesn't overlap (see ingestTargetLevel). If an sstable overlaps a memtable,
1104 : // ingestion forces the memtable to flush, and then waits for the flush to
1105 : // occur. In some cases, such as with no foreign sstables and no excise span,
1106 : // ingestion that gets blocked on a memtable can join the flushable queue and
1107 : // finish even before the memtable has been flushed.
1108 : //
1109 : // The steps for ingestion are:
1110 : //
1111 : // 1. Allocate table numbers for every sstable being ingested.
1112 : // 2. Load the metadata for all sstables being ingested.
1113 : // 3. Sort the sstables by smallest key, verifying non overlap (for local
1114 : // sstables).
1115 : // 4. Hard link (or copy) the local sstables into the DB directory.
1116 : // 5. Allocate a sequence number to use for all of the entries in the
1117 : // local sstables. This is the step where overlap with memtables is
1118 : // determined. If there is overlap, we remember the most recent memtable
1119 : // that overlaps.
1120 : // 6. Update the sequence number in the ingested local sstables. (Remote
1121 : // sstables get fixed sequence numbers that were determined at load time.)
1122 : // 7. Wait for the most recent memtable that overlaps to flush (if any).
1123 : // 8. Add the ingested sstables to the version (DB.ingestApply).
1124 : // 8.1. If an excise span was specified, figure out what sstables in the
1125 : // current version overlap with the excise span, and create new virtual
1126 : // sstables out of those sstables that exclude the excised span (DB.excise).
1127 : // 9. Publish the ingestion sequence number.
1128 : //
1129 : // Note that if the mutable memtable overlaps with ingestion, a flush of the
1130 : // memtable is forced equivalent to DB.Flush. Additionally, subsequent
1131 : // mutations that get sequence numbers larger than the ingestion sequence
1132 : // number get queued up behind the ingestion waiting for it to complete. This
1133 : // can produce a noticeable hiccup in performance. See
1134 : // https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
1135 : // this hiccup.
1136 2 : func (d *DB) Ingest(ctx context.Context, paths []string) error {
1137 2 : if err := d.closed.Load(); err != nil {
1138 1 : panic(err)
1139 : }
1140 2 : if d.opts.ReadOnly {
1141 1 : return ErrReadOnly
1142 1 : }
1143 2 : _, err := d.ingest(ctx, ingestArgs{Local: paths})
1144 2 : return err
1145 : }
1146 :
// IngestOperationStats provides some information about where in the LSM the
// bytes were ingested. It is the first result of IngestWithStats,
// IngestExternalFiles and IngestAndExcise.
type IngestOperationStats struct {
	// Bytes is the total bytes in the ingested sstables.
	Bytes uint64
	// ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
	// into L0. This value is approximate when flushable ingests are active and
	// an ingest overlaps an entry in the flushable queue. Currently, this
	// approximation is very rough, only including tables that overlapped the
	// memtable. This estimate may be improved with #2112.
	ApproxIngestedIntoL0Bytes uint64
	// MemtableOverlappingFiles is the count of ingested sstables
	// that overlapped keys in the memtables.
	MemtableOverlappingFiles int
}
1162 :
// ExternalFile describes an external sstable that can be referenced through
// objprovider and ingested as a remote file that will not be refcounted or
// cleaned up. For use with online restore. Note that the underlying sstable
// could contain keys outside the [StartKey, EndKey) bounds; however Pebble
// is expected to only read the keys within those bounds.
type ExternalFile struct {
	// Locator is the shared.Locator that can be used with objProvider to
	// resolve a reference to this external sstable.
	Locator remote.Locator

	// ObjName is the unique name of this sstable on Locator.
	ObjName string

	// Size of the referenced portion of the virtualized sstable. An estimate
	// is acceptable in lieu of the backing file size.
	Size uint64

	// StartKey and EndKey define the bounds of the sstable; the ingestion
	// of this file will only result in keys within [StartKey, EndKey) if
	// EndKeyIsInclusive is false or [StartKey, EndKey] if it is true.
	// These bounds are loose, i.e. it's possible for keys to not span the
	// entirety of this range.
	//
	// StartKey and EndKey user keys must not have suffixes.
	//
	// Multiple ExternalFiles in one ingestion must all have non-overlapping
	// bounds.
	StartKey, EndKey []byte

	// EndKeyIsInclusive is true if EndKey should be treated as inclusive.
	EndKeyIsInclusive bool

	// HasPointKey and HasRangeKey denote whether this file contains point keys
	// or range keys. If both fields are false, an error is returned during
	// ingestion.
	HasPointKey, HasRangeKey bool

	// SyntheticPrefix will prepend this prefix to all keys in the file during
	// iteration. Note that the backing file itself is not modified.
	//
	// SyntheticPrefix must be a prefix of both StartKey and EndKey.
	// NOTE(review): this comment previously named Bounds.Start and Bounds.End,
	// which are not fields of this struct — confirm StartKey/EndKey is meant.
	SyntheticPrefix []byte

	// SyntheticSuffix will replace the suffix of every key in the file during
	// iteration. Note that the file itself is not modified, rather, every key
	// returned by an iterator will have the synthetic suffix.
	//
	// SyntheticSuffix can only be used under the following conditions:
	//  - the synthetic suffix must sort before any non-empty suffixes in the
	//    backing sst (the entire sst, not just the part restricted to the
	//    bounds).
	//  - the backing sst must not contain multiple keys with the same prefix.
	SyntheticSuffix []byte

	// Level denotes the level at which this file was present at read time
	// if the external file was returned by a scan of an existing Pebble
	// instance. If Level is 0, this field is ignored.
	Level uint8
}
1221 :
1222 : // IngestWithStats does the same as Ingest, and additionally returns
1223 : // IngestOperationStats.
1224 1 : func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperationStats, error) {
1225 1 : if err := d.closed.Load(); err != nil {
1226 0 : panic(err)
1227 : }
1228 1 : if d.opts.ReadOnly {
1229 0 : return IngestOperationStats{}, ErrReadOnly
1230 0 : }
1231 1 : return d.ingest(ctx, ingestArgs{Local: paths})
1232 : }
1233 :
1234 : // IngestExternalFiles does the same as IngestWithStats, and additionally
1235 : // accepts external files (with locator info that can be resolved using
1236 : // d.opts.SharedStorage). These files must also be non-overlapping with
1237 : // each other, and must be resolvable through d.objProvider.
1238 : func (d *DB) IngestExternalFiles(
1239 : ctx context.Context, external []ExternalFile,
1240 2 : ) (IngestOperationStats, error) {
1241 2 : if err := d.closed.Load(); err != nil {
1242 0 : panic(err)
1243 : }
1244 :
1245 2 : if d.opts.ReadOnly {
1246 0 : return IngestOperationStats{}, ErrReadOnly
1247 0 : }
1248 2 : if d.opts.Experimental.RemoteStorage == nil {
1249 0 : return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
1250 0 : }
1251 2 : return d.ingest(ctx, ingestArgs{External: external})
1252 : }
1253 :
1254 : // IngestAndExcise does the same as IngestWithStats, and additionally accepts a
1255 : // list of shared files to ingest that can be read from a remote.Storage through
1256 : // a Provider. All the shared files must live within exciseSpan, and any existing
1257 : // keys in exciseSpan are deleted by turning existing sstables into virtual
1258 : // sstables (if not virtual already) and shrinking their spans to exclude
1259 : // exciseSpan. See the comment at Ingest for a more complete picture of the
1260 : // ingestion process.
1261 : //
1262 : // Panics if this DB instance was not instantiated with a remote.Storage and
1263 : // shared sstables are present.
1264 : func (d *DB) IngestAndExcise(
1265 : ctx context.Context,
1266 : paths []string,
1267 : shared []SharedSSTMeta,
1268 : external []ExternalFile,
1269 : exciseSpan KeyRange,
1270 2 : ) (IngestOperationStats, error) {
1271 2 : if err := d.closed.Load(); err != nil {
1272 0 : panic(err)
1273 : }
1274 2 : if d.opts.ReadOnly {
1275 0 : return IngestOperationStats{}, ErrReadOnly
1276 0 : }
1277 : // Excise is only supported on prefix keys.
1278 2 : if d.opts.Comparer.Split(exciseSpan.Start) != len(exciseSpan.Start) {
1279 0 : return IngestOperationStats{}, errors.New("IngestAndExcise called with suffixed start key")
1280 0 : }
1281 2 : if d.opts.Comparer.Split(exciseSpan.End) != len(exciseSpan.End) {
1282 0 : return IngestOperationStats{}, errors.New("IngestAndExcise called with suffixed end key")
1283 0 : }
1284 2 : if v := d.FormatMajorVersion(); v < FormatMinForSharedObjects {
1285 0 : return IngestOperationStats{}, errors.Newf(
1286 0 : "store has format major version %d; IngestAndExcise requires at least %d",
1287 0 : v, FormatMinForSharedObjects,
1288 0 : )
1289 0 : }
1290 2 : args := ingestArgs{
1291 2 : Local: paths,
1292 2 : Shared: shared,
1293 2 : External: external,
1294 2 : ExciseSpan: exciseSpan,
1295 2 : ExciseBoundsPolicy: tightExciseBounds,
1296 2 : }
1297 2 : return d.ingest(ctx, args)
1298 : }
1299 :
1300 : // Both DB.mu and commitPipeline.mu must be held while this is called.
1301 : func (d *DB) newIngestedFlushableEntry(
1302 : meta []*manifest.TableMetadata, seqNum base.SeqNum, logNum base.DiskFileNum, exciseSpan KeyRange,
1303 2 : ) (*flushableEntry, error) {
1304 2 : // If there's an excise being done atomically with the same ingest, we
1305 2 : // assign the lowest sequence number in the set of sequence numbers for this
1306 2 : // ingestion to the excise. Note that we've already allocated fileCount+1
1307 2 : // sequence numbers in this case.
1308 2 : //
1309 2 : // This mimics the behaviour in the non-flushable ingest case (see the callsite
1310 2 : // for ingestUpdateSeqNum).
1311 2 : fileSeqNumStart := seqNum
1312 2 : if exciseSpan.Valid() {
1313 2 : fileSeqNumStart = seqNum + 1 // the first seqNum is reserved for the excise.
1314 2 : // The excise span will be retained by the flushable, outliving the
1315 2 : // caller's ingestion call. Copy it.
1316 2 : exciseSpan = KeyRange{
1317 2 : Start: slices.Clone(exciseSpan.Start),
1318 2 : End: slices.Clone(exciseSpan.End),
1319 2 : }
1320 2 : }
1321 : // Update the sequence number for all of the sstables in the
1322 : // metadata. Writing the metadata to the manifest when the
1323 : // version edit is applied is the mechanism that persists the
1324 : // sequence number. The sstables themselves are left unmodified.
1325 : // In this case, a version edit will only be written to the manifest
1326 : // when the flushable is eventually flushed. If Pebble restarts in that
1327 : // time, then we'll lose the ingest sequence number information. But this
1328 : // information will also be reconstructed on node restart.
1329 2 : for i, m := range meta {
1330 2 : if err := setSeqNumInMetadata(m, fileSeqNumStart+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil {
1331 0 : return nil, err
1332 0 : }
1333 : }
1334 :
1335 2 : f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, seqNum)
1336 2 :
1337 2 : // NB: The logNum/seqNum are the WAL number which we're writing this entry
1338 2 : // to and the sequence number within the WAL which we'll write this entry
1339 2 : // to.
1340 2 : entry := d.newFlushableEntry(f, logNum, seqNum)
1341 2 : // The flushable entry starts off with a single reader ref, so increment
1342 2 : // the TableMetadata.Refs.
1343 2 : for _, file := range f.files {
1344 2 : file.Ref()
1345 2 : }
1346 2 : entry.unrefFiles = func(of *manifest.ObsoleteFiles) {
1347 2 : // Invoke Unref on each table. If any files become obsolete, they'll be
1348 2 : // added to the set of obsolete files.
1349 2 : for _, file := range f.files {
1350 2 : file.Unref(of)
1351 2 : }
1352 : }
1353 :
1354 2 : entry.flushForced = true
1355 2 : entry.releaseMemAccounting = func() {}
1356 2 : return entry, nil
1357 : }
1358 :
// handleIngestAsFlushable adds the ingested tables in meta (and the excise of
// exciseSpan, if it is valid) to the flushable queue as a single flushable
// entry, then rotates the memtable so that a new mutable memtable sits on top
// of it. When the WAL is enabled, a batch describing the ingestion is written
// to a freshly rotated WAL first.
//
// Both DB.mu and commitPipeline.mu must be held while this is called. Since
// we're holding both locks, the order in which we rotate the memtable or
// recycle the WAL in this function is irrelevant as long as the correct log
// numbers are assigned to the appropriate flushable.
//
// Note that when the WAL is enabled, DB.mu is released and re-acquired around
// the WAL write.
func (d *DB) handleIngestAsFlushable(
	meta []*manifest.TableMetadata, seqNum base.SeqNum, exciseSpan KeyRange,
) error {
	// Build the batch that records this ingestion: an optional excise entry
	// followed by one entry per ingested table, starting at seqNum.
	b := d.NewBatch()
	if exciseSpan.Valid() {
		b.excise(exciseSpan.Start, exciseSpan.End)
	}
	for _, m := range meta {
		b.ingestSST(m.TableNum)
	}
	b.setSeqNum(seqNum)

	// If the WAL is disabled, then the logNum used to create the flushable
	// entry doesn't matter. We just use the logNum assigned to the current
	// mutable memtable. If the WAL is enabled, then this logNum will be
	// overwritten by the logNum of the log which will contain the log entry
	// for the ingestedFlushable.
	logNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
	if !d.opts.DisableWAL {
		// We create a new WAL for the flushable instead of reusing the end of
		// the previous WAL. This simplifies the increment of the minimum
		// unflushed log number, and also simplifies WAL replay.
		var prevLogSize uint64
		logNum, prevLogSize = d.rotateWAL()
		// As the rotator of the WAL, we're responsible for updating the
		// previous flushable queue tail's log size.
		d.mu.mem.queue[len(d.mu.mem.queue)-1].logSize = prevLogSize

		// DB.mu is dropped for the duration of the WAL write. A write failure
		// is reported through Logger.Fatalf rather than returned.
		d.mu.Unlock()
		err := d.commit.directWrite(b)
		if err != nil {
			d.opts.Logger.Fatalf("%v", err)
		}
		d.mu.Lock()
	}

	entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
	if err != nil {
		return err
	}
	// The batch consumed one sequence number per entry; the next mutable
	// memtable starts right after them.
	nextSeqNum := seqNum + base.SeqNum(b.Count())

	// Set newLogNum to the logNum of the previous flushable. This value is
	// irrelevant if the WAL is disabled. If the WAL is enabled, then we set
	// the appropriate value below.
	newLogNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
	if !d.opts.DisableWAL {
		// newLogNum will be the WAL num of the next mutable memtable which
		// comes after the ingestedFlushable in the flushable queue. The mutable
		// memtable will be created below.
		//
		// The size returned by rotateWAL here is that of the WAL to which the
		// flushable ingest keys were appended. This intermediary WAL is only
		// used to record the flushable ingest and nothing else.
		newLogNum, entry.logSize = d.rotateWAL()
	}

	d.mu.versions.metrics.Ingest.Count++
	currMem := d.mu.mem.mutable
	// NB: Placing ingested sstables above the current memtables
	// requires rotating of the existing memtables/WAL. There is
	// some concern of churning through tiny memtables due to
	// ingested sstables being placed on top of them, but those
	// memtables would have to be flushed anyways.
	d.mu.mem.queue = append(d.mu.mem.queue, entry)
	d.rotateMemtable(newLogNum, nextSeqNum, currMem, 0 /* minSize */)
	d.updateReadStateLocked(d.opts.DebugCheck)
	// TODO(aaditya): is this necessary? we call this already in rotateMemtable above
	d.maybeScheduleFlush()
	return nil
}
1434 :
// ingestArgs bundles the inputs of DB.ingest. The exported entry points
// (Ingest, IngestWithStats, IngestExternalFiles, IngestAndExcise) each fill in
// the subset of fields they support and leave the rest at their zero values.
type ingestArgs struct {
	// Local sstables to ingest.
	Local []string
	// Shared sstables to ingest.
	Shared []SharedSSTMeta
	// External sstables to ingest.
	External []ExternalFile
	// ExciseSpan (unset if not excising).
	ExciseSpan KeyRange
	// ExciseBoundsPolicy accompanies ExciseSpan; IngestAndExcise sets it to
	// tightExciseBounds.
	// NOTE(review): the policy's semantics are defined elsewhere — confirm
	// what the zero value means when no excise is requested.
	ExciseBoundsPolicy exciseBoundsPolicy
}
1446 :
1447 : // See comment at Ingest() for details on how this works.
1448 2 : func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats, error) {
1449 2 : paths := args.Local
1450 2 : shared := args.Shared
1451 2 : external := args.External
1452 2 : if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
1453 0 : panic("cannot ingest shared sstables with nil SharedStorage")
1454 : }
1455 2 : if (args.ExciseSpan.Valid() || len(shared) > 0 || len(external) > 0) && d.FormatMajorVersion() < FormatVirtualSSTables {
1456 0 : return IngestOperationStats{}, errors.New("pebble: format major version too old for excise, shared or external sstable ingestion")
1457 0 : }
1458 2 : if len(external) > 0 && d.FormatMajorVersion() < FormatSyntheticPrefixSuffix {
1459 1 : for i := range external {
1460 1 : if len(external[i].SyntheticPrefix) > 0 {
1461 1 : return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic prefix ingestion")
1462 1 : }
1463 1 : if len(external[i].SyntheticSuffix) > 0 {
1464 1 : return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic suffix ingestion")
1465 1 : }
1466 : }
1467 : }
1468 : // Allocate table numbers for all files being ingested and mark them as
1469 : // pending in order to prevent them from being deleted. Note that this causes
1470 : // the file number ordering to be out of alignment with sequence number
1471 : // ordering. The sorting of L0 tables by sequence number avoids relying on
1472 : // that (busted) invariant.
1473 2 : pendingOutputs := make([]base.TableNum, len(paths)+len(shared)+len(external))
1474 2 : for i := 0; i < len(paths)+len(shared)+len(external); i++ {
1475 2 : pendingOutputs[i] = d.mu.versions.getNextTableNum()
1476 2 : }
1477 :
1478 2 : jobID := d.newJobID()
1479 2 :
1480 2 : // Load the metadata for all the files being ingested. This step detects
1481 2 : // and elides empty sstables.
1482 2 : loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external,
1483 2 : d.cacheHandle, pendingOutputs)
1484 2 : if err != nil {
1485 1 : return IngestOperationStats{}, err
1486 1 : }
1487 :
1488 2 : if loadResult.fileCount() == 0 && !args.ExciseSpan.Valid() {
1489 2 : // All of the sstables to be ingested were empty. Nothing to do.
1490 2 : return IngestOperationStats{}, nil
1491 2 : }
1492 :
1493 : // Verify the sstables do not overlap.
1494 2 : if err := ingestSortAndVerify(d.cmp, loadResult, args.ExciseSpan); err != nil {
1495 2 : return IngestOperationStats{}, err
1496 2 : }
1497 :
1498 : // Hard link the sstables into the DB directory. Since the sstables aren't
1499 : // referenced by a version, they won't be used. If the hard linking fails
1500 : // (e.g. because the files reside on a different filesystem), ingestLinkLocal
1501 : // will fall back to copying, and if that fails we undo our work and return an
1502 : // error.
1503 2 : if err := ingestLinkLocal(ctx, jobID, d.opts, d.objProvider, loadResult.local); err != nil {
1504 0 : return IngestOperationStats{}, err
1505 0 : }
1506 :
1507 2 : err = d.ingestAttachRemote(jobID, loadResult)
1508 2 : defer d.ingestUnprotectExternalBackings(loadResult)
1509 2 : if err != nil {
1510 0 : return IngestOperationStats{}, err
1511 0 : }
1512 :
1513 : // Make the new tables durable. We need to do this at some point before we
1514 : // update the MANIFEST (via UpdateVersionLocked), otherwise a crash can have
1515 : // the tables referenced in the MANIFEST, but not present in the provider.
1516 2 : if err := d.objProvider.Sync(); err != nil {
1517 1 : return IngestOperationStats{}, err
1518 1 : }
1519 :
1520 : // metaFlushableOverlaps is a map indicating which of the ingested sstables
1521 : // overlap some table in the flushable queue. It's used to approximate
1522 : // ingest-into-L0 stats when using flushable ingests.
1523 2 : metaFlushableOverlaps := make(map[base.TableNum]bool, loadResult.fileCount())
1524 2 : var mem *flushableEntry
1525 2 : var mut *memTable
1526 2 : // asFlushable indicates whether the sstable was ingested as a flushable.
1527 2 : var asFlushable bool
1528 2 : var waitFlushStart crtime.Mono
1529 2 : prepare := func(seqNum base.SeqNum) {
1530 2 : // Note that d.commit.mu is held by commitPipeline when calling prepare.
1531 2 :
1532 2 : // Determine the set of bounds we care about for the purpose of checking
1533 2 : // for overlap among the flushables. If there's an excise span, we need
1534 2 : // to check for overlap with its bounds as well.
1535 2 : overlapBounds := make([]bounded, 0, loadResult.fileCount()+1)
1536 2 : for _, m := range loadResult.local {
1537 2 : overlapBounds = append(overlapBounds, m.TableMetadata)
1538 2 : }
1539 2 : for _, m := range loadResult.shared {
1540 2 : overlapBounds = append(overlapBounds, m.TableMetadata)
1541 2 : }
1542 2 : for _, m := range loadResult.external {
1543 2 : overlapBounds = append(overlapBounds, m.TableMetadata)
1544 2 : }
1545 2 : if args.ExciseSpan.Valid() {
1546 2 : overlapBounds = append(overlapBounds, &args.ExciseSpan)
1547 2 : }
1548 :
1549 2 : d.mu.Lock()
1550 2 : defer d.mu.Unlock()
1551 2 :
1552 2 : if args.ExciseSpan.Valid() {
1553 2 : // Check if any of the currently-open EventuallyFileOnlySnapshots
1554 2 : // overlap in key ranges with the excise span. If so, we need to
1555 2 : // check for memtable overlaps with all bounds of that
1556 2 : // EventuallyFileOnlySnapshot in addition to the ingestion's own
1557 2 : // bounds too.
1558 2 : overlapBounds = append(overlapBounds, exciseOverlapBounds(
1559 2 : d.cmp, &d.mu.snapshots.snapshotList, args.ExciseSpan, seqNum)...)
1560 2 : }
1561 :
1562 : // Check to see if any files overlap with any of the memtables. The queue
1563 : // is ordered from oldest to newest with the mutable memtable being the
1564 : // last element in the slice. We want to wait for the newest table that
1565 : // overlaps.
1566 :
1567 2 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1568 2 : m := d.mu.mem.queue[i]
1569 2 : m.computePossibleOverlaps(func(b bounded) shouldContinue {
1570 2 : // If this is the first table to overlap a flushable, save
1571 2 : // the flushable. This ingest must be ingested or flushed
1572 2 : // after it.
1573 2 : if mem == nil {
1574 2 : mem = m
1575 2 : }
1576 :
1577 2 : switch v := b.(type) {
1578 2 : case *manifest.TableMetadata:
1579 2 : // NB: False positives are possible if `m` is a flushable
1580 2 : // ingest that overlaps the file `v` in bounds but doesn't
1581 2 : // contain overlapping data. This is considered acceptable
1582 2 : // because it's rare (in CockroachDB a bound overlap likely
1583 2 : // indicates a data overlap), and blocking the commit
1584 2 : // pipeline while we perform I/O to check for overlap may be
1585 2 : // more disruptive than enqueueing this ingestion on the
1586 2 : // flushable queue and switching to a new memtable.
1587 2 : metaFlushableOverlaps[v.TableNum] = true
1588 2 : case *KeyRange:
1589 : // An excise span or an EventuallyFileOnlySnapshot protected range;
1590 : // not a file.
1591 0 : default:
1592 0 : panic("unreachable")
1593 : }
1594 2 : return continueIteration
1595 : }, overlapBounds...)
1596 : }
1597 :
1598 2 : if mem == nil {
1599 2 : // No overlap with any of the queued flushables, so no need to queue
1600 2 : // after them.
1601 2 :
1602 2 : // New writes with higher sequence numbers may be concurrently
1603 2 : // committed. We must ensure they don't flush before this ingest
1604 2 : // completes. To do that, we ref the mutable memtable as a writer,
1605 2 : // preventing its flushing (and the flushing of all subsequent
1606 2 : // flushables in the queue). Once we've acquired the manifest lock
1607 2 : // to add the ingested sstables to the LSM, we can unref as we're
1608 2 : // guaranteed that the flush won't edit the LSM before this ingest.
1609 2 : mut = d.mu.mem.mutable
1610 2 : mut.writerRef()
1611 2 : return
1612 2 : }
1613 :
1614 : // The ingestion overlaps with some entry in the flushable queue. If the
1615 : // pre-conditions are met below, we can treat this ingestion as a flushable
1616 : // ingest, otherwise we wait on the memtable flush before ingestion.
1617 : //
1618 : // TODO(aaditya): We should make flushableIngest compatible with remote
1619 : // files.
1620 2 : hasRemoteFiles := len(shared) > 0 || len(external) > 0
1621 2 : canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest &&
1622 2 : // We require that either the queue of flushables is below the
1623 2 : // stop-writes threshold (note that this is typically a conservative
1624 2 : // check, since not every element of this queue will contribute the full
1625 2 : // memtable memory size that could result in a write stall), or WAL
1626 2 : // failover is permitting an unlimited queue without causing a write
1627 2 : // stall. The latter condition is important to avoid delays in
1628 2 : // visibility of concurrent writes that happen to get a sequence number
1629 2 : // after this ingest and then must wait for this ingest that is itself
1630 2 : // waiting on a large flush. See
1631 2 : // https://github.com/cockroachdb/pebble/issues/4944 for an illustration
1632 2 : // of this problem.
1633 2 : (len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold ||
1634 2 : d.mu.log.manager.ElevateWriteStallThresholdForFailover()) &&
1635 2 : !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles &&
1636 2 : (!args.ExciseSpan.Valid() || d.FormatMajorVersion() >= FormatFlushableIngestExcises)
1637 2 : if !canIngestFlushable {
1638 2 : // We're not able to ingest as a flushable,
1639 2 : // so we must synchronously flush.
1640 2 : //
1641 2 : // TODO(bilal): Currently, if any of the files being ingested are shared,
1642 2 : // we cannot use flushable ingests and need
1643 2 : // to wait synchronously.
1644 2 : if mem.flushable == d.mu.mem.mutable {
1645 2 : err = d.makeRoomForWrite(nil)
1646 2 : }
1647 : // New writes with higher sequence numbers may be concurrently
1648 : // committed. We must ensure they don't flush before this ingest
1649 : // completes. To do that, we ref the mutable memtable as a writer,
1650 : // preventing its flushing (and the flushing of all subsequent
1651 : // flushables in the queue). Once we've acquired the manifest lock
1652 : // to add the ingested sstables to the LSM, we can unref as we're
1653 : // guaranteed that the flush won't edit the LSM before this ingest.
1654 2 : mut = d.mu.mem.mutable
1655 2 : mut.writerRef()
1656 2 : mem.flushForced = true
1657 2 : waitFlushStart = crtime.NowMono()
1658 2 : d.maybeScheduleFlush()
1659 2 : return
1660 : }
1661 : // Since there aren't too many memtables already queued up, we can
1662 : // slide the ingested sstables on top of the existing memtables.
1663 2 : asFlushable = true
1664 2 : fileMetas := make([]*manifest.TableMetadata, len(loadResult.local))
1665 2 : for i := range fileMetas {
1666 2 : fileMetas[i] = loadResult.local[i].TableMetadata
1667 2 : }
1668 2 : err = d.handleIngestAsFlushable(fileMetas, seqNum, args.ExciseSpan)
1669 : }
1670 :
1671 2 : var ve *manifest.VersionEdit
1672 2 : var waitFlushDuration time.Duration
1673 2 : var manifestUpdateDuration time.Duration
1674 2 : apply := func(seqNum base.SeqNum) {
1675 2 : if err != nil || asFlushable {
1676 2 : // An error occurred during prepare.
1677 2 : if mut != nil {
1678 0 : if mut.writerUnref() {
1679 0 : d.mu.Lock()
1680 0 : d.maybeScheduleFlush()
1681 0 : d.mu.Unlock()
1682 0 : }
1683 : }
1684 2 : return
1685 : }
1686 :
1687 : // If there's an excise being done atomically with the same ingest, we
1688 : // assign the lowest sequence number in the set of sequence numbers for this
1689 : // ingestion to the excise. Note that we've already allocated fileCount+1
1690 : // sequence numbers in this case.
1691 2 : if args.ExciseSpan.Valid() {
1692 2 : seqNum++ // the first seqNum is reserved for the excise.
1693 2 : }
1694 : // Update the sequence numbers for all ingested sstables'
1695 : // metadata. When the version edit is applied, the metadata is
1696 : // written to the manifest, persisting the sequence number.
1697 : // The sstables themselves are left unmodified.
1698 2 : if err = ingestUpdateSeqNum(
1699 2 : d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult,
1700 2 : ); err != nil {
1701 0 : if mut != nil {
1702 0 : if mut.writerUnref() {
1703 0 : d.mu.Lock()
1704 0 : d.maybeScheduleFlush()
1705 0 : d.mu.Unlock()
1706 0 : }
1707 : }
1708 0 : return
1709 : }
1710 :
1711 : // If we overlapped with a memtable in prepare wait for the flush to
1712 : // finish.
1713 2 : if mem != nil {
1714 2 : <-mem.flushed
1715 2 : waitFlushDuration = waitFlushStart.Elapsed()
1716 2 : }
1717 :
1718 : // Assign the sstables to the correct level in the LSM and apply the
1719 : // version edit.
1720 2 : ve, manifestUpdateDuration, err = d.ingestApply(ctx, jobID, loadResult, mut, args.ExciseSpan, args.ExciseBoundsPolicy, seqNum)
1721 : }
1722 :
1723 : // Only one ingest can occur at a time because if not, one would block waiting
1724 : // for the other to finish applying. This blocking would happen while holding
1725 : // the commit mutex which would prevent unrelated batches from writing their
1726 : // changes to the WAL and memtable. This will cause a bigger commit hiccup
1727 : // during ingestion.
1728 2 : seqNumCount := loadResult.fileCount()
1729 2 : if args.ExciseSpan.Valid() {
1730 2 : seqNumCount++
1731 2 : }
1732 2 : d.commit.ingestSem <- struct{}{}
1733 2 : d.commit.AllocateSeqNum(seqNumCount, prepare, apply)
1734 2 : <-d.commit.ingestSem
1735 2 :
1736 2 : if err != nil {
1737 1 : if err2 := ingestCleanup(d.objProvider, loadResult.local); err2 != nil {
1738 0 : d.opts.Logger.Errorf("ingest cleanup failed: %v", err2)
1739 0 : }
1740 2 : } else {
1741 2 : // Since we either created a hard link to the ingesting files, or copied
1742 2 : // them over, it is safe to remove the originals paths.
1743 2 : for i := range loadResult.local {
1744 2 : path := loadResult.local[i].path
1745 2 : if err2 := d.opts.FS.Remove(path); err2 != nil {
1746 1 : d.opts.Logger.Errorf("ingest failed to remove original file: %s", err2)
1747 1 : }
1748 : }
1749 : }
1750 :
1751 : // TODO(jackson): Refactor this so that the case where there are no files
1752 : // but a valid excise span is not so exceptional.
1753 :
1754 2 : var stats IngestOperationStats
1755 2 : if loadResult.fileCount() > 0 {
1756 2 : info := TableIngestInfo{
1757 2 : JobID: int(jobID),
1758 2 : Err: err,
1759 2 : flushable: asFlushable,
1760 2 : WaitFlushDuration: waitFlushDuration,
1761 2 : ManifestUpdateDuration: manifestUpdateDuration,
1762 2 : BlockReadDuration: loadResult.blockReadStats.BlockReadDuration,
1763 2 : BlockReadBytes: loadResult.blockReadStats.BlockBytes - loadResult.blockReadStats.BlockBytesInCache,
1764 2 : }
1765 2 : if len(loadResult.local) > 0 {
1766 2 : info.GlobalSeqNum = loadResult.local[0].SmallestSeqNum
1767 2 : } else if len(loadResult.shared) > 0 {
1768 2 : info.GlobalSeqNum = loadResult.shared[0].SmallestSeqNum
1769 2 : } else {
1770 2 : info.GlobalSeqNum = loadResult.external[0].SmallestSeqNum
1771 2 : }
1772 2 : if ve != nil {
1773 2 : info.Tables = make([]struct {
1774 2 : TableInfo
1775 2 : Level int
1776 2 : }, len(ve.NewTables))
1777 2 : for i := range ve.NewTables {
1778 2 : e := &ve.NewTables[i]
1779 2 : info.Tables[i].Level = e.Level
1780 2 : info.Tables[i].TableInfo = e.Meta.TableInfo()
1781 2 : stats.Bytes += e.Meta.Size
1782 2 : if e.Level == 0 {
1783 2 : stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
1784 2 : }
1785 2 : if metaFlushableOverlaps[e.Meta.TableNum] {
1786 2 : stats.MemtableOverlappingFiles++
1787 2 : }
1788 : }
1789 2 : } else if asFlushable {
1790 2 : // NB: If asFlushable == true, there are no shared sstables.
1791 2 : info.Tables = make([]struct {
1792 2 : TableInfo
1793 2 : Level int
1794 2 : }, len(loadResult.local))
1795 2 : for i, f := range loadResult.local {
1796 2 : info.Tables[i].Level = -1
1797 2 : info.Tables[i].TableInfo = f.TableInfo()
1798 2 : stats.Bytes += f.Size
1799 2 : // We don't have exact stats on which files will be ingested into
1800 2 : // L0, because actual ingestion into the LSM has been deferred until
1801 2 : // flush time. Instead, we infer based on memtable overlap.
1802 2 : //
1803 2 : // TODO(jackson): If we optimistically compute data overlap (#2112)
1804 2 : // before entering the commit pipeline, we can use that overlap to
1805 2 : // improve our approximation by incorporating overlap with L0, not
1806 2 : // just memtables.
1807 2 : if metaFlushableOverlaps[f.TableNum] {
1808 2 : stats.ApproxIngestedIntoL0Bytes += f.Size
1809 2 : stats.MemtableOverlappingFiles++
1810 2 : }
1811 : }
1812 : }
1813 2 : d.opts.EventListener.TableIngested(info)
1814 : }
1815 :
1816 2 : return stats, err
1817 : }
1818 :
1819 : type ingestSplitFile struct {
1820 : // ingestFile is the file being ingested.
1821 : ingestFile *manifest.TableMetadata
1822 : // splitFile is the file that needs to be split to allow ingestFile to slot
1823 : // into `level` level.
1824 : splitFile *manifest.TableMetadata
1825 : // The level where ingestFile will go (and where splitFile already is).
1826 : level int
1827 : }
1828 :
1829 : // ingestSplit splits files specified in `files` and updates ve in-place to
1830 : // account for existing files getting split into two virtual sstables. The map
1831 : // `replacedFiles` contains an in-progress map of all files that have been
1832 : // replaced with new virtual sstables in this version edit so far, which is also
1833 : // updated in-place.
1834 : //
1835 : // d.mu as well as the manifest lock must be held when calling this method.
1836 : func (d *DB) ingestSplit(
1837 : ctx context.Context,
1838 : ve *manifest.VersionEdit,
1839 : updateMetrics func(*manifest.TableMetadata, int, []manifest.NewTableEntry),
1840 : files []ingestSplitFile,
1841 : replacedTables map[base.TableNum][]manifest.NewTableEntry,
1842 2 : ) error {
1843 2 : for _, s := range files {
1844 2 : ingestFileBounds := s.ingestFile.UserKeyBounds()
1845 2 : // replacedFiles can be thought of as a tree, where we start iterating with
1846 2 : // s.splitFile and run its fileNum through replacedFiles, then find which of
1847 2 : // the replaced files overlaps with s.ingestFile, which becomes the new
1848 2 : // splitFile, then we check splitFile's replacements in replacedFiles again
1849 2 : // for overlap with s.ingestFile, and so on until we either can't find the
1850 2 : // current splitFile in replacedFiles (i.e. that's the file that now needs to
1851 2 : // be split), or we don't find a file that overlaps with s.ingestFile, which
1852 2 : // means a prior ingest split already produced enough room for s.ingestFile
1853 2 : // to go into this level without necessitating another ingest split.
1854 2 : splitFile := s.splitFile
1855 2 : for splitFile != nil {
1856 2 : replaced, ok := replacedTables[splitFile.TableNum]
1857 2 : if !ok {
1858 2 : break
1859 : }
1860 2 : updatedSplitFile := false
1861 2 : for i := range replaced {
1862 2 : if replaced[i].Meta.Overlaps(d.cmp, &ingestFileBounds) {
1863 2 : if updatedSplitFile {
1864 0 : // This should never happen because the earlier ingestTargetLevel
1865 0 : // function only finds split file candidates that are guaranteed to
1866 0 : // have no data overlap, only boundary overlap. See the comments
1867 0 : // in that method to see the definitions of data vs boundary
1868 0 : // overlap. That, plus the fact that files in `replaced` are
1869 0 : // guaranteed to have file bounds that are tight on user keys
1870 0 : // (as that's what `d.excise` produces), means that the only case
1871 0 : // where we overlap with two or more files in `replaced` is if we
1872 0 : // actually had data overlap all along, or if the ingestion files
1873 0 : // were overlapping, either of which is an invariant violation.
1874 0 : panic("updated with two files in ingestSplit")
1875 : }
1876 2 : splitFile = replaced[i].Meta
1877 2 : updatedSplitFile = true
1878 : }
1879 : }
1880 2 : if !updatedSplitFile {
1881 2 : // None of the replaced files overlapped with the file being ingested.
1882 2 : // This can happen if we've already excised a span overlapping with
1883 2 : // this file, or if we have consecutive ingested files that can slide
1884 2 : // within the same gap between keys in an existing file. For instance,
1885 2 : // if an existing file has keys a and g and we're ingesting b-c, d-e,
1886 2 : // the first loop iteration will split the existing file into one that
1887 2 : // ends in a and another that starts at g, and the second iteration will
1888 2 : // fall into this case and require no splitting.
1889 2 : //
1890 2 : // No splitting necessary.
1891 2 : splitFile = nil
1892 2 : }
1893 : }
1894 2 : if splitFile == nil {
1895 2 : continue
1896 : }
1897 : // NB: excise operates on [start, end). We're splitting at [start, end]
1898 : // (assuming !s.ingestFile.Largest.IsExclusiveSentinel()). The conflation
1899 : // of exclusive vs inclusive end bounds should not make a difference here
1900 : // as we're guaranteed to not have any data overlap between splitFile and
1901 : // s.ingestFile. d.excise will return an error if we pass an inclusive user
1902 : // key bound _and_ we end up seeing data overlap at the end key.
1903 2 : exciseBounds := base.UserKeyBoundsFromInternal(s.ingestFile.Smallest(), s.ingestFile.Largest())
1904 2 : leftTable, rightTable, err := d.exciseTable(ctx, exciseBounds, splitFile, s.level, tightExciseBounds)
1905 2 : if err != nil {
1906 0 : return err
1907 0 : }
1908 2 : added := applyExciseToVersionEdit(ve, splitFile, leftTable, rightTable, s.level)
1909 2 : replacedTables[splitFile.TableNum] = added
1910 2 : for i := range added {
1911 2 : addedBounds := added[i].Meta.UserKeyBounds()
1912 2 : if s.ingestFile.Overlaps(d.cmp, &addedBounds) {
1913 0 : panic("ingest-time split produced a file that overlaps with ingested file")
1914 : }
1915 : }
1916 2 : updateMetrics(splitFile, s.level, added)
1917 : }
1918 : // Flatten the version edit by removing any entries from ve.NewFiles that
1919 : // are also in ve.DeletedFiles.
1920 2 : newNewFiles := ve.NewTables[:0]
1921 2 : for i := range ve.NewTables {
1922 2 : fn := ve.NewTables[i].Meta.TableNum
1923 2 : deEntry := manifest.DeletedTableEntry{Level: ve.NewTables[i].Level, FileNum: fn}
1924 2 : if _, ok := ve.DeletedTables[deEntry]; ok {
1925 2 : delete(ve.DeletedTables, deEntry)
1926 2 : } else {
1927 2 : newNewFiles = append(newNewFiles, ve.NewTables[i])
1928 2 : }
1929 : }
1930 2 : ve.NewTables = newNewFiles
1931 2 : return nil
1932 : }
1933 :
1934 : func (d *DB) ingestApply(
1935 : ctx context.Context,
1936 : jobID JobID,
1937 : lr ingestLoadResult,
1938 : mut *memTable,
1939 : exciseSpan KeyRange,
1940 : exciseBoundsPolicy exciseBoundsPolicy,
1941 : exciseSeqNum base.SeqNum,
1942 2 : ) (*manifest.VersionEdit, time.Duration, error) {
1943 2 : d.mu.Lock()
1944 2 : defer d.mu.Unlock()
1945 2 :
1946 2 : ve := &manifest.VersionEdit{
1947 2 : NewTables: make([]manifest.NewTableEntry, lr.fileCount()),
1948 2 : }
1949 2 : if exciseSpan.Valid() || (d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit()) {
1950 2 : ve.DeletedTables = map[manifest.DeletedTableEntry]*manifest.TableMetadata{}
1951 2 : }
1952 2 : var metrics levelMetricsDelta
1953 2 :
1954 2 : // Determine the target level inside UpdateVersionLocked. This prevents two
1955 2 : // concurrent ingestion jobs from using the same version to determine the
1956 2 : // target level, and also provides serialization with concurrent compaction
1957 2 : // and flush jobs.
1958 2 : manifestUpdateDuration, err := d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
1959 2 : if mut != nil {
1960 2 : // Unref the mutable memtable to allows its flush to proceed. Now that we've
1961 2 : // acquired the manifest lock, we can be certain that if the mutable
1962 2 : // memtable has received more recent conflicting writes, the flush won't
1963 2 : // beat us to applying to the manifest resulting in sequence number
1964 2 : // inversion. Even though we call maybeScheduleFlush right now, this flush
1965 2 : // will apply after our ingestion.
1966 2 : if mut.writerUnref() {
1967 1 : d.maybeScheduleFlush()
1968 1 : }
1969 : }
1970 :
1971 2 : current := d.mu.versions.currentVersion()
1972 2 : overlapChecker := &overlapChecker{
1973 2 : comparer: d.opts.Comparer,
1974 2 : newIters: d.newIters,
1975 2 : opts: IterOptions{
1976 2 : logger: d.opts.Logger,
1977 2 : Category: categoryIngest,
1978 2 : },
1979 2 : v: current,
1980 2 : }
1981 2 : shouldIngestSplit := d.opts.Experimental.IngestSplit != nil &&
1982 2 : d.opts.Experimental.IngestSplit() && d.FormatMajorVersion() >= FormatVirtualSSTables
1983 2 : baseLevel := d.mu.versions.picker.getBaseLevel()
1984 2 : // filesToSplit is a list where each element is a pair consisting of a file
1985 2 : // being ingested and a file being split to make room for an ingestion into
1986 2 : // that level. Each ingested file will appear at most once in this list. It
1987 2 : // is possible for split files to appear twice in this list.
1988 2 : filesToSplit := make([]ingestSplitFile, 0)
1989 2 : checkCompactions := false
1990 2 : for i := 0; i < lr.fileCount(); i++ {
1991 2 : // Determine the lowest level in the LSM for which the sstable doesn't
1992 2 : // overlap any existing files in the level.
1993 2 : var m *manifest.TableMetadata
1994 2 : specifiedLevel := -1
1995 2 : isShared := false
1996 2 : isExternal := false
1997 2 : if i < len(lr.local) {
1998 2 : // local file.
1999 2 : m = lr.local[i].TableMetadata
2000 2 : } else if (i - len(lr.local)) < len(lr.shared) {
2001 2 : // shared file.
2002 2 : isShared = true
2003 2 : sharedIdx := i - len(lr.local)
2004 2 : m = lr.shared[sharedIdx].TableMetadata
2005 2 : specifiedLevel = int(lr.shared[sharedIdx].shared.Level)
2006 2 : } else {
2007 2 : // external file.
2008 2 : isExternal = true
2009 2 : externalIdx := i - (len(lr.local) + len(lr.shared))
2010 2 : m = lr.external[externalIdx].TableMetadata
2011 2 : if lr.externalFilesHaveLevel {
2012 1 : specifiedLevel = int(lr.external[externalIdx].external.Level)
2013 1 : }
2014 : }
2015 :
2016 : // Add to CreatedBackingTables if this is a new backing.
2017 : //
2018 : // Shared files always have a new backing. External files have new backings
2019 : // iff the backing disk file num and the file num match (see ingestAttachRemote).
2020 2 : if isShared || (isExternal && m.TableBacking.DiskFileNum == base.DiskFileNum(m.TableNum)) {
2021 2 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.TableBacking)
2022 2 : }
2023 :
2024 2 : f := &ve.NewTables[i]
2025 2 : var err error
2026 2 : if specifiedLevel != -1 {
2027 2 : f.Level = specifiedLevel
2028 2 : } else {
2029 2 : var splitTable *manifest.TableMetadata
2030 2 : if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest()) && exciseSpan.Contains(d.cmp, m.Largest()) {
2031 2 : // This file fits perfectly within the excise span. We can slot it at
2032 2 : // L6, or sharedLevelsStart - 1 if we have shared files.
2033 2 : if len(lr.shared) > 0 || lr.externalFilesHaveLevel {
2034 2 : f.Level = sharedLevelsStart - 1
2035 2 : if baseLevel > f.Level {
2036 2 : f.Level = 0
2037 2 : }
2038 2 : } else {
2039 2 : f.Level = 6
2040 2 : }
2041 2 : } else {
2042 2 : // We check overlap against the LSM without holding DB.mu. Note that we
2043 2 : // are still holding the log lock, so the version cannot change.
2044 2 : // TODO(radu): perform this check optimistically outside of the log lock.
2045 2 : var lsmOverlap overlap.WithLSM
2046 2 : lsmOverlap, err = func() (overlap.WithLSM, error) {
2047 2 : d.mu.Unlock()
2048 2 : defer d.mu.Lock()
2049 2 : return overlapChecker.DetermineLSMOverlap(ctx, m.UserKeyBounds())
2050 2 : }()
2051 2 : if err == nil {
2052 2 : f.Level, splitTable, err = ingestTargetLevel(
2053 2 : ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit,
2054 2 : )
2055 2 : }
2056 : }
2057 :
2058 2 : if splitTable != nil {
2059 2 : if invariants.Enabled {
2060 2 : if lf := current.Levels[f.Level].Find(d.cmp, splitTable); lf.Empty() {
2061 0 : panic("splitFile returned is not in level it should be")
2062 : }
2063 : }
2064 : // We take advantage of the fact that we won't drop the db mutex
2065 : // between now and the call to UpdateVersionLocked. So, no files should
2066 : // get added to a new in-progress compaction at this point. We can
2067 : // avoid having to iterate on in-progress compactions to cancel them
2068 : // if none of the files being split have a compacting state.
2069 2 : if splitTable.IsCompacting() {
2070 1 : checkCompactions = true
2071 1 : }
2072 2 : filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitTable, level: f.Level})
2073 : }
2074 : }
2075 2 : if err != nil {
2076 0 : return versionUpdate{}, err
2077 0 : }
2078 2 : if isShared && f.Level < sharedLevelsStart {
2079 0 : panic(fmt.Sprintf("cannot slot a shared file higher than the highest shared level: %d < %d",
2080 0 : f.Level, sharedLevelsStart))
2081 : }
2082 2 : f.Meta = m
2083 2 : levelMetrics := metrics[f.Level]
2084 2 : if levelMetrics == nil {
2085 2 : levelMetrics = &LevelMetrics{}
2086 2 : metrics[f.Level] = levelMetrics
2087 2 : }
2088 2 : levelMetrics.TablesCount++
2089 2 : levelMetrics.TablesSize += int64(m.Size)
2090 2 : levelMetrics.EstimatedReferencesSize += m.EstimatedReferenceSize()
2091 2 : levelMetrics.TableBytesIngested += m.Size
2092 2 : levelMetrics.TablesIngested++
2093 : }
2094 : // replacedTables maps files excised due to exciseSpan (or splitFiles returned
2095 : // by ingestTargetLevel), to files that were created to replace it. This map
2096 : // is used to resolve references to split files in filesToSplit, as it is
2097 : // possible for a file that we want to split to no longer exist or have a
2098 : // newer fileMetadata due to a split induced by another ingestion file, or an
2099 : // excise.
2100 2 : replacedTables := make(map[base.TableNum][]manifest.NewTableEntry)
2101 2 : updateLevelMetricsOnExcise := func(m *manifest.TableMetadata, level int, added []manifest.NewTableEntry) {
2102 2 : levelMetrics := metrics[level]
2103 2 : if levelMetrics == nil {
2104 2 : levelMetrics = &LevelMetrics{}
2105 2 : metrics[level] = levelMetrics
2106 2 : }
2107 2 : levelMetrics.TablesCount--
2108 2 : levelMetrics.TablesSize -= int64(m.Size)
2109 2 : levelMetrics.EstimatedReferencesSize -= m.EstimatedReferenceSize()
2110 2 : for i := range added {
2111 2 : levelMetrics.TablesCount++
2112 2 : levelMetrics.TablesSize += int64(added[i].Meta.Size)
2113 2 : levelMetrics.EstimatedReferencesSize += added[i].Meta.EstimatedReferenceSize()
2114 2 : }
2115 : }
2116 2 : var exciseBounds base.UserKeyBounds
2117 2 : if exciseSpan.Valid() {
2118 2 : exciseBounds = exciseSpan.UserKeyBounds()
2119 2 : d.mu.versions.metrics.Ingest.ExciseIngestCount++
2120 2 : // Iterate through all levels and find files that intersect with exciseSpan.
2121 2 : //
2122 2 : // TODO(bilal): We could drop the DB mutex here as we don't need it for
2123 2 : // excises; we only need to hold the version lock which we already are
2124 2 : // holding. However releasing the DB mutex could mess with the
2125 2 : // ingestTargetLevel calculation that happened above, as it assumed that it
2126 2 : // had a complete view of in-progress compactions that wouldn't change
2127 2 : // until UpdateVersionLocked is called. If we were to drop the mutex now,
2128 2 : // we could schedule another in-progress compaction that would go into the
2129 2 : // chosen target level and lead to file overlap within level (which would
2130 2 : // panic in UpdateVersionLocked). We should drop the db mutex here, do the
2131 2 : // excise, then re-grab the DB mutex and rerun just the in-progress
2132 2 : // compaction check to see if any new compactions are conflicting with our
2133 2 : // chosen target levels for files, and if they are, we should signal those
2134 2 : // compactions to error out.
2135 2 : for layer, ls := range current.AllLevelsAndSublevels() {
2136 2 : for m := range ls.Overlaps(d.cmp, exciseSpan.UserKeyBounds()).All() {
2137 2 : leftTable, rightTable, err := d.exciseTable(ctx, exciseBounds, m, layer.Level(), exciseBoundsPolicy)
2138 2 : if err != nil {
2139 0 : return versionUpdate{}, err
2140 0 : }
2141 2 : newFiles := applyExciseToVersionEdit(ve, m, leftTable, rightTable, layer.Level())
2142 2 : replacedTables[m.TableNum] = newFiles
2143 2 : updateLevelMetricsOnExcise(m, layer.Level(), newFiles)
2144 : }
2145 : }
2146 2 : if d.FormatMajorVersion() >= FormatExciseBoundsRecord {
2147 2 : ve.ExciseBoundsRecord = append(ve.ExciseBoundsRecord, manifest.ExciseOpEntry{
2148 2 : Bounds: exciseBounds,
2149 2 : SeqNum: exciseSeqNum,
2150 2 : })
2151 2 : }
2152 : }
2153 2 : if len(filesToSplit) > 0 {
2154 2 : // For the same reasons as the above call to excise, we hold the db mutex
2155 2 : // while calling this method.
2156 2 : if err := d.ingestSplit(ctx, ve, updateLevelMetricsOnExcise, filesToSplit, replacedTables); err != nil {
2157 0 : return versionUpdate{}, err
2158 0 : }
2159 : }
2160 2 : if len(filesToSplit) > 0 || exciseSpan.Valid() {
2161 2 : for c := range d.mu.compact.inProgress {
2162 2 : if c.VersionEditApplied() {
2163 2 : continue
2164 : }
2165 : // Check if this compaction overlaps with the excise span. Note that just
2166 : // checking if the inputs individually overlap with the excise span
2167 : // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
2168 : // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
2169 : // doing a [c,d) excise at the same time as this compaction, we will have
2170 : // to error out the whole compaction as we can't guarantee it hasn't/won't
2171 : // write a file overlapping with the excise span.
2172 2 : bounds := c.Bounds()
2173 2 : if bounds != nil && bounds.Overlaps(d.cmp, &exciseBounds) {
2174 2 : c.Cancel()
2175 2 : }
2176 : // Check if this compaction's inputs have been replaced due to an
2177 : // ingest-time split. In that case, cancel the compaction as a newly picked
2178 : // compaction would need to include any new files that slid in between
2179 : // previously-existing files. Note that we cancel any compaction that has a
2180 : // file that was ingest-split as an input, even if it started before this
2181 : // ingestion.
2182 2 : if checkCompactions {
2183 1 : for _, table := range c.Tables() {
2184 1 : if _, ok := replacedTables[table.TableNum]; ok {
2185 1 : c.Cancel()
2186 1 : break
2187 : }
2188 : }
2189 : }
2190 : }
2191 : }
2192 :
2193 2 : return versionUpdate{
2194 2 : VE: ve,
2195 2 : JobID: jobID,
2196 2 : Metrics: metrics,
2197 2 : InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) },
2198 : }, nil
2199 : })
2200 2 : if err != nil {
2201 1 : return nil, 0, err
2202 1 : }
2203 : // Check for any EventuallyFileOnlySnapshots that could be watching for
2204 : // an excise on this span. There should be none as the
2205 : // computePossibleOverlaps steps should have forced these EFOS to transition
2206 : // to file-only snapshots by now. If we see any that conflict with this
2207 : // excise, panic.
2208 2 : if exciseSpan.Valid() {
2209 2 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
2210 1 : // Skip non-EFOS snapshots, and also skip any EFOS that were created
2211 1 : // *after* the excise.
2212 1 : if s.efos == nil || base.Visible(exciseSeqNum, s.efos.seqNum, base.SeqNumMax) {
2213 0 : continue
2214 : }
2215 1 : efos := s.efos
2216 1 : // TODO(bilal): We can make this faster by taking advantage of the sorted
2217 1 : // nature of protectedRanges to do a sort.Search, or even maintaining a
2218 1 : // global list of all protected ranges instead of having to peer into every
2219 1 : // snapshot.
2220 1 : for i := range efos.protectedRanges {
2221 1 : if efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
2222 0 : panic("unexpected excise of an EventuallyFileOnlySnapshot's bounds")
2223 : }
2224 : }
2225 : }
2226 : }
2227 :
2228 2 : d.mu.versions.metrics.Ingest.Count++
2229 2 :
2230 2 : d.updateReadStateLocked(d.opts.DebugCheck)
2231 2 : // updateReadStateLocked could have generated obsolete tables, schedule a
2232 2 : // cleanup job if necessary.
2233 2 : d.deleteObsoleteFiles(jobID)
2234 2 : d.updateTableStatsLocked(ve.NewTables)
2235 2 : // The ingestion may have pushed a level over the threshold for compaction,
2236 2 : // so check to see if one is necessary and schedule it.
2237 2 : d.maybeScheduleCompaction()
2238 2 : var toValidate []manifest.NewTableEntry
2239 2 : dedup := make(map[base.DiskFileNum]struct{})
2240 2 : for _, entry := range ve.NewTables {
2241 2 : if _, ok := dedup[entry.Meta.TableBacking.DiskFileNum]; !ok {
2242 2 : toValidate = append(toValidate, entry)
2243 2 : dedup[entry.Meta.TableBacking.DiskFileNum] = struct{}{}
2244 2 : }
2245 : }
2246 2 : d.maybeValidateSSTablesLocked(toValidate)
2247 2 :
2248 2 : return ve, manifestUpdateDuration, nil
2249 : }
2250 :
2251 : // maybeValidateSSTablesLocked appends newFiles to the pending queue of tables
2252 : // to be validated and, if shouldValidateSSTablesLocked reports true, starts
2253 : // validateSSTables in a new goroutine. It returns without queuing anything
2254 : // when d.opts.Experimental.ValidateOnIngest is false. Entries are not deduplicated here: if two
2255 : // entries share a backing file, that file's block checksums will be validated twice.
2256 : //
2257 : // DB.mu must be locked when calling.
2258 2 : func (d *DB) maybeValidateSSTablesLocked(newFiles []manifest.NewTableEntry) {
2259 2 : // Only add to the validation queue when the feature is enabled.
2260 2 : if !d.opts.Experimental.ValidateOnIngest {
2261 2 : return
2262 2 : }
2263 : // Enqueue the tables, then start a validation round unless one is already in progress.
2264 2 : d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...)
2265 2 : if d.shouldValidateSSTablesLocked() {
2266 2 : go d.validateSSTables()
2267 2 : }
2268 : }
2269 :
2270 : // shouldValidateSSTablesLocked reports whether a round of sstable validation should start now:
2271 : // no round is in progress, d.closed.Load() is nil, ValidateOnIngest is set, and tables are pending. DB.mu must be locked when calling.
2272 2 : func (d *DB) shouldValidateSSTablesLocked() bool {
2273 2 : return !d.mu.tableValidation.validating &&
2274 2 : d.closed.Load() == nil &&
2275 2 : d.opts.Experimental.ValidateOnIngest &&
2276 2 : len(d.mu.tableValidation.pending) > 0
2277 2 : }
2278 :
2279 : // validateSSTables runs one round of block-checksum validation over the tables in the pending
2280 : // queue, re-queuing any table whose validation fails with a non-corruption error. It acquires DB.mu itself.
2281 2 : func (d *DB) validateSSTables() {
2282 2 : d.mu.Lock()
2283 2 : if !d.shouldValidateSSTablesLocked() {
2284 2 : d.mu.Unlock()
2285 2 : return
2286 2 : }
2287 : // Claim the whole pending queue and mark a round as in progress, so concurrent callers return early above.
2288 2 : pending := d.mu.tableValidation.pending
2289 2 : d.mu.tableValidation.pending = nil
2290 2 : d.mu.tableValidation.validating = true
2291 2 : jobID := d.newJobIDLocked()
2292 2 : rs := d.loadReadState()
2293 2 :
2294 2 : // Drop DB.mu before performing IO.
2295 2 : d.mu.Unlock()
2296 2 :
2297 2 : // Validate all tables in the pending queue. This could lead to a situation
2298 2 : // where we are starving IO from other tasks due to having to page through
2299 2 : // all the blocks in all the sstables in the queue.
2300 2 : // TODO(travers): Add some form of pacing to avoid IO starvation.
2301 2 :
2302 2 : // If we fail to validate any files due to reasons other than uncovered
2303 2 : // corruption, accumulate them and re-queue them for another attempt.
2304 2 : var retry []manifest.NewTableEntry
2305 2 :
2306 2 : for _, f := range pending {
2307 2 : // The file may have been moved or deleted since it was ingested. Skip it only if
2308 2 : // it is found neither at its ingested level nor at any higher-numbered level.
2309 2 : if !rs.current.Contains(f.Level, f.Meta) {
2310 2 : // Assume the file was moved to a lower level. It is rare enough
2311 2 : // that a table is moved or deleted between the time it was ingested
2312 2 : // and the time the validation routine runs that the overall cost of
2313 2 : // this inner loop is tolerably low, when amortized over all
2314 2 : // ingested tables.
2315 2 : found := false
2316 2 : for i := f.Level + 1; i < numLevels; i++ {
2317 2 : if rs.current.Contains(i, f.Meta) {
2318 1 : found = true
2319 1 : break
2320 : }
2321 : }
2322 2 : if !found {
2323 2 : continue
2324 : }
2325 : }
2326 :
2327 : // TODO(radu): plumb a ReadEnv with a CategoryIngest stats collector through
2328 : // to ValidateBlockChecksums.
2329 2 : err := d.fileCache.withReader(context.TODO(), block.NoReadEnv,
2330 2 : f.Meta, func(r *sstable.Reader, _ sstable.ReadEnv) error {
2331 2 : return r.ValidateBlockChecksums()
2332 2 : })
2333 :
2334 2 : if err != nil {
2335 1 : if IsCorruptionError(err) {
2336 1 : // TODO(travers): Hook into the corruption reporting pipeline, once
2337 1 : // available. See pebble#1192.
2338 1 : d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err)
2339 1 : } else {
2340 1 : // If there was some other, possibly transient, error that
2341 1 : // caused table validation to fail inform the EventListener and
2342 1 : // move on. We remember the table so that we can retry it in a
2343 1 : // subsequent table validation job.
2344 1 : //
2345 1 : // TODO(jackson): If the error is not transient, this will retry
2346 1 : // validation indefinitely. While not great, it's the same
2347 1 : // behavior as erroring flushes and compactions. We should
2348 1 : // address this as a part of #270.
2349 1 : d.opts.EventListener.BackgroundError(err)
2350 1 : retry = append(retry, f)
2351 1 : continue
2352 : }
2353 : }
2354 : // NOTE(review): this is also reached after a corruption error if Logger.Fatalf returns — confirm that is intended.
2355 2 : d.opts.EventListener.TableValidated(TableValidatedInfo{
2356 2 : JobID: int(jobID),
2357 2 : Meta: f.Meta,
2358 2 : })
2359 : }
2360 2 : rs.unref()
2361 2 : d.mu.Lock()
2362 2 : defer d.mu.Unlock()
2363 2 : d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, retry...)
2364 2 : d.mu.tableValidation.validating = false
2365 2 : d.mu.tableValidation.cond.Broadcast()
2366 2 : if d.shouldValidateSSTablesLocked() {
2367 2 : go d.validateSSTables()
2368 2 : }
2369 : }
|