Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 : "sort"
12 : "time"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/cache"
17 : "github.com/cockroachdb/pebble/internal/invariants"
18 : "github.com/cockroachdb/pebble/internal/keyspan"
19 : "github.com/cockroachdb/pebble/internal/manifest"
20 : "github.com/cockroachdb/pebble/internal/overlap"
21 : "github.com/cockroachdb/pebble/internal/sstableinternal"
22 : "github.com/cockroachdb/pebble/objstorage"
23 : "github.com/cockroachdb/pebble/objstorage/remote"
24 : "github.com/cockroachdb/pebble/sstable"
25 : "github.com/cockroachdb/pebble/sstable/block"
26 : )
27 :
28 1 : func sstableKeyCompare(userCmp Compare, a, b InternalKey) int {
29 1 : c := userCmp(a.UserKey, b.UserKey)
30 1 : if c != 0 {
31 1 : return c
32 1 : }
33 1 : if a.IsExclusiveSentinel() {
34 1 : if !b.IsExclusiveSentinel() {
35 1 : return -1
36 1 : }
37 1 : } else if b.IsExclusiveSentinel() {
38 1 : return +1
39 1 : }
40 1 : return 0
41 : }
42 :
43 1 : func ingestValidateKey(opts *Options, key *InternalKey) error {
44 1 : if key.Kind() == InternalKeyKindInvalid {
45 1 : return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s",
46 1 : key.Pretty(opts.Comparer.FormatKey))
47 1 : }
48 1 : if key.SeqNum() != 0 {
49 1 : return base.CorruptionErrorf("pebble: external sstable has non-zero seqnum: %s",
50 1 : key.Pretty(opts.Comparer.FormatKey))
51 1 : }
52 1 : if err := opts.Comparer.ValidateKey.Validate(key.UserKey); err != nil {
53 0 : return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s, %w",
54 0 : key.Pretty(opts.Comparer.FormatKey), err)
55 0 : }
56 1 : return nil
57 : }
58 :
59 : // ingestSynthesizeShared constructs a fileMetadata for one shared sstable owned
60 : // or shared by another node.
61 : func ingestSynthesizeShared(
62 : opts *Options, sm SharedSSTMeta, tableNum base.TableNum,
63 1 : ) (*manifest.TableMetadata, error) {
64 1 : if sm.Size == 0 {
65 0 : // Disallow 0 file sizes
66 0 : return nil, errors.New("pebble: cannot ingest shared file with size 0")
67 0 : }
68 : // Don't load table stats. Doing a round trip to shared storage, one SST
69 : // at a time is not worth it as it slows down ingestion.
70 1 : meta := &manifest.TableMetadata{
71 1 : TableNum: tableNum,
72 1 : CreationTime: time.Now().Unix(),
73 1 : Virtual: true,
74 1 : Size: sm.Size,
75 1 : }
76 1 : if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil {
77 1 : // Initialize meta.{HasPointKeys,Smallest,Largest}, etc.
78 1 : //
79 1 : // NB: We create new internal keys and pass them into ExtendPointKeyBounds
80 1 : // so that we can sub a zero sequence number into the bounds. We can set
81 1 : // the sequence number to anything here; it'll be reset in ingestUpdateSeqNum
82 1 : // anyway. However, we do need to use the same sequence number across all
83 1 : // bound keys at this step so that we end up with bounds that are consistent
84 1 : // across point/range keys.
85 1 : //
86 1 : // Because of the sequence number rewriting, we cannot use the Kind of
87 1 : // sm.SmallestPointKey. For example, the original SST might start with
88 1 : // a.SET.2 and a.RANGEDEL.1 (with a.SET.2 being the smallest key); after
89 1 : // rewriting the sequence numbers, these keys become a.SET.100 and
90 1 : // a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a
91 1 : // correct bound, we just use the maximum key kind (which sorts first).
92 1 : // Similarly, we use the smallest key kind for the largest key.
93 1 : smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMaxForSSTable)
94 1 : largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0)
95 1 : if sm.LargestPointKey.IsExclusiveSentinel() {
96 1 : largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey)
97 1 : }
98 1 : if opts.Comparer.Equal(smallestPointKey.UserKey, largestPointKey.UserKey) &&
99 1 : smallestPointKey.Trailer < largestPointKey.Trailer {
100 0 : // We get kinds from the sender, however we substitute our own sequence
101 0 : // numbers. This can result in cases where an sstable [b#5,SET-b#4,DELSIZED]
102 0 : // becomes [b#0,SET-b#0,DELSIZED] when we synthesize it here, but the
103 0 : // kinds need to be reversed now because DelSized > Set.
104 0 : smallestPointKey, largestPointKey = largestPointKey, smallestPointKey
105 0 : }
106 1 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallestPointKey, largestPointKey)
107 : }
108 1 : if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil {
109 1 : // Initialize meta.{HasRangeKeys,Smallest,Largest}, etc.
110 1 : //
111 1 : // See comment above on why we use a zero sequence number and these key
112 1 : // kinds here.
113 1 : smallestRangeKey := base.MakeInternalKey(sm.SmallestRangeKey.UserKey, 0, base.InternalKeyKindRangeKeyMax)
114 1 : largestRangeKey := base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeyMin, sm.LargestRangeKey.UserKey)
115 1 : meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallestRangeKey, largestRangeKey)
116 1 : }
117 :
118 : // For simplicity, we use the same number for both the FileNum and the
119 : // DiskFileNum (even though this is a virtual sstable). Pass the underlying
120 : // TableBacking's size to the same size as the virtualized view of the sstable.
121 : // This ensures that we don't over-prioritize this sstable for compaction just
122 : // yet, as we do not have a clear sense of what parts of this sstable are
123 : // referenced by other nodes.
124 1 : meta.InitVirtualBacking(base.DiskFileNum(tableNum), sm.Size)
125 1 :
126 1 : if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
127 0 : return nil, err
128 0 : }
129 1 : return meta, nil
130 : }
131 :
132 : // ingestLoad1External loads the fileMetadata for one external sstable.
133 : // Sequence number and target level calculation happens during prepare/apply.
134 : func ingestLoad1External(
135 : opts *Options, e ExternalFile, tableNum base.TableNum,
136 1 : ) (*manifest.TableMetadata, error) {
137 1 : if e.Size == 0 {
138 0 : return nil, errors.New("pebble: cannot ingest external file with size 0")
139 0 : }
140 1 : if !e.HasRangeKey && !e.HasPointKey {
141 0 : return nil, errors.New("pebble: cannot ingest external file with no point or range keys")
142 0 : }
143 :
144 1 : if opts.Comparer.Compare(e.StartKey, e.EndKey) > 0 {
145 1 : return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
146 1 : }
147 1 : if opts.Comparer.Compare(e.StartKey, e.EndKey) == 0 && !e.EndKeyIsInclusive {
148 0 : return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
149 0 : }
150 1 : if n := opts.Comparer.Split(e.StartKey); n != len(e.StartKey) {
151 1 : return nil, errors.Newf("pebble: external file bounds start key %q has suffix", e.StartKey)
152 1 : }
153 1 : if n := opts.Comparer.Split(e.EndKey); n != len(e.EndKey) {
154 1 : return nil, errors.Newf("pebble: external file bounds end key %q has suffix", e.EndKey)
155 1 : }
156 :
157 : // Don't load table stats. Doing a round trip to shared storage, one SST
158 : // at a time is not worth it as it slows down ingestion.
159 1 : meta := &manifest.TableMetadata{
160 1 : TableNum: tableNum,
161 1 : CreationTime: time.Now().Unix(),
162 1 : Size: e.Size,
163 1 : Virtual: true,
164 1 : }
165 1 : // In the name of keeping this ingestion as fast as possible, we avoid *all*
166 1 : // existence checks and synthesize a table metadata with smallest/largest
167 1 : // keys that overlap whatever the passed-in span was.
168 1 : smallestCopy := slices.Clone(e.StartKey)
169 1 : largestCopy := slices.Clone(e.EndKey)
170 1 : if e.HasPointKey {
171 1 : // Sequence numbers are updated later by
172 1 : // ingestUpdateSeqNum, applying a squence number that
173 1 : // is applied to all keys in the sstable.
174 1 : if e.EndKeyIsInclusive {
175 1 : meta.ExtendPointKeyBounds(
176 1 : opts.Comparer.Compare,
177 1 : base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable),
178 1 : base.MakeInternalKey(largestCopy, 0, 0))
179 1 : } else {
180 1 : meta.ExtendPointKeyBounds(
181 1 : opts.Comparer.Compare,
182 1 : base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable),
183 1 : base.MakeRangeDeleteSentinelKey(largestCopy))
184 1 : }
185 : }
186 1 : if e.HasRangeKey {
187 1 : meta.ExtendRangeKeyBounds(
188 1 : opts.Comparer.Compare,
189 1 : base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeyMax),
190 1 : base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyMin, largestCopy),
191 1 : )
192 1 : }
193 :
194 1 : meta.SyntheticPrefixAndSuffix = sstable.MakeSyntheticPrefixAndSuffix(e.SyntheticPrefix, e.SyntheticSuffix)
195 1 :
196 1 : return meta, nil
197 : }
198 :
199 : type rangeKeyIngestValidator struct {
200 : // lastRangeKey is the last range key seen in the previous file.
201 : lastRangeKey keyspan.Span
202 : // comparer, if unset, disables range key validation.
203 : comparer *base.Comparer
204 : }
205 :
206 1 : func disableRangeKeyChecks() rangeKeyIngestValidator {
207 1 : return rangeKeyIngestValidator{}
208 1 : }
209 :
210 : func validateSuffixedBoundaries(
211 : cmp *base.Comparer, lastRangeKey keyspan.Span,
212 1 : ) rangeKeyIngestValidator {
213 1 : return rangeKeyIngestValidator{
214 1 : lastRangeKey: lastRangeKey,
215 1 : comparer: cmp,
216 1 : }
217 1 : }
218 :
219 : // Validate valides if the stored state of this rangeKeyIngestValidator allows for
220 : // a file with the given nextFileSmallestKey to be ingested, such that the stored
221 : // last file's largest range key defragments cleanly with the next file's smallest
222 : // key if it was suffixed. If a value of nil is passed in for nextFileSmallestKey,
223 : // that denotes the next file does not have a range key or there is no next file.
224 1 : func (r *rangeKeyIngestValidator) Validate(nextFileSmallestKey *keyspan.Span) error {
225 1 : if r.comparer == nil {
226 1 : return nil
227 1 : }
228 1 : if r.lastRangeKey.Valid() {
229 1 : if r.comparer.Split.HasSuffix(r.lastRangeKey.End) {
230 1 : if nextFileSmallestKey == nil || !r.comparer.Equal(r.lastRangeKey.End, nextFileSmallestKey.Start) {
231 1 : // The last range key has a suffix, and it doesn't defragment cleanly with this range key.
232 1 : return errors.AssertionFailedf("pebble: ingest sstable has suffixed largest range key that does not match the start key of the next sstable: %s",
233 1 : r.comparer.FormatKey(r.lastRangeKey.End))
234 1 : } else if !keyspan.DefragmentInternal.ShouldDefragment(r.comparer.CompareRangeSuffixes, &r.lastRangeKey, nextFileSmallestKey) {
235 0 : // The last range key has a suffix, and it doesn't defragment cleanly with this range key.
236 0 : return errors.AssertionFailedf("pebble: ingest sstable has suffixed range key that won't defragment with next sstable: %s",
237 0 : r.comparer.FormatKey(r.lastRangeKey.End))
238 0 : }
239 : }
240 1 : } else if nextFileSmallestKey != nil && r.comparer.Split.HasSuffix(nextFileSmallestKey.Start) {
241 0 : return errors.Newf("pebble: ingest sstable has suffixed range key start that won't defragment: %s",
242 0 : r.comparer.FormatKey(nextFileSmallestKey.Start))
243 0 : }
244 1 : return nil
245 : }
246 :
247 : // ingestLoad1 creates the TableMetadata for one file. This file will be owned
248 : // by this store.
249 : //
250 : // prevLastRangeKey is the last range key from the previous file. It is used to
251 : // ensure that the range keys defragment cleanly across files. These checks
252 : // are disabled if disableRangeKeyChecks is true.
253 : func ingestLoad1(
254 : ctx context.Context,
255 : opts *Options,
256 : fmv FormatMajorVersion,
257 : readable objstorage.Readable,
258 : cacheHandle *cache.Handle,
259 : tableNum base.TableNum,
260 : rangeKeyValidator rangeKeyIngestValidator,
261 1 : ) (meta *manifest.TableMetadata, lastRangeKey keyspan.Span, err error) {
262 1 : o := opts.MakeReaderOptions()
263 1 : o.CacheOpts = sstableinternal.CacheOptions{
264 1 : CacheHandle: cacheHandle,
265 1 : FileNum: base.PhysicalTableDiskFileNum(tableNum),
266 1 : }
267 1 : r, err := sstable.NewReader(ctx, readable, o)
268 1 : if err != nil {
269 1 : return nil, keyspan.Span{}, errors.CombineErrors(err, readable.Close())
270 1 : }
271 1 : defer func() { _ = r.Close() }()
272 :
273 : // Avoid ingesting tables with format versions this DB doesn't support.
274 1 : tf, err := r.TableFormat()
275 1 : if err != nil {
276 0 : return nil, keyspan.Span{}, err
277 0 : }
278 1 : if tf < fmv.MinTableFormat() || tf > fmv.MaxTableFormat() {
279 1 : return nil, keyspan.Span{}, errors.Newf(
280 1 : "pebble: table format %s is not within range supported at DB format major version %d, (%s,%s)",
281 1 : tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
282 1 : )
283 1 : }
284 :
285 1 : if r.Attributes.Has(sstable.AttributeBlobValues) {
286 1 : return nil, keyspan.Span{}, errors.Newf(
287 1 : "pebble: ingesting tables with blob references is not supported")
288 1 : }
289 :
290 1 : props, err := r.ReadPropertiesBlock(ctx, nil /* buffer pool */)
291 1 : if err != nil {
292 1 : return nil, keyspan.Span{}, err
293 1 : }
294 :
295 : // If this is a columnar block, read key schema name from properties block.
296 1 : if tf.BlockColumnar() {
297 1 : if _, ok := opts.KeySchemas[props.KeySchemaName]; !ok {
298 0 : return nil, keyspan.Span{}, errors.Newf(
299 0 : "pebble: table uses key schema %q unknown to the database",
300 0 : props.KeySchemaName)
301 0 : }
302 : }
303 :
304 1 : meta = &manifest.TableMetadata{}
305 1 : meta.TableNum = tableNum
306 1 : meta.Size = max(uint64(readable.Size()), 1)
307 1 : meta.CreationTime = time.Now().Unix()
308 1 : meta.InitPhysicalBacking()
309 1 :
310 1 : // Avoid loading into the file cache for collecting stats if we
311 1 : // don't need to. If there are no range deletions, we have all the
312 1 : // information to compute the stats here.
313 1 : //
314 1 : // This is helpful in tests for avoiding awkwardness around deletion of
315 1 : // ingested files from MemFS. MemFS implements the Windows semantics of
316 1 : // disallowing removal of an open file. Under MemFS, if we don't populate
317 1 : // meta.Stats here, the file will be loaded into the file cache for
318 1 : // calculating stats before we can remove the original link.
319 1 : maybeSetStatsFromProperties(meta.PhysicalMeta(), &props.CommonProperties, opts.Logger)
320 1 :
321 1 : {
322 1 : iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */, sstable.AssertNoBlobHandles)
323 1 : if err != nil {
324 1 : return nil, keyspan.Span{}, err
325 1 : }
326 1 : defer func() { _ = iter.Close() }()
327 1 : var smallest InternalKey
328 1 : if kv := iter.First(); kv != nil {
329 1 : if err := ingestValidateKey(opts, &kv.K); err != nil {
330 1 : return nil, keyspan.Span{}, err
331 1 : }
332 1 : smallest = kv.K.Clone()
333 : }
334 1 : if err := iter.Error(); err != nil {
335 1 : return nil, keyspan.Span{}, err
336 1 : }
337 1 : if kv := iter.Last(); kv != nil {
338 1 : if err := ingestValidateKey(opts, &kv.K); err != nil {
339 0 : return nil, keyspan.Span{}, err
340 0 : }
341 1 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, kv.K.Clone())
342 : }
343 1 : if err := iter.Error(); err != nil {
344 1 : return nil, keyspan.Span{}, err
345 1 : }
346 : }
347 :
348 1 : iter, err := r.NewRawRangeDelIter(ctx, sstable.NoFragmentTransforms, sstable.NoReadEnv)
349 1 : if err != nil {
350 0 : return nil, keyspan.Span{}, err
351 0 : }
352 1 : if iter != nil {
353 1 : defer iter.Close()
354 1 : var smallest InternalKey
355 1 : if s, err := iter.First(); err != nil {
356 0 : return nil, keyspan.Span{}, err
357 1 : } else if s != nil {
358 1 : key := s.SmallestKey()
359 1 : if err := ingestValidateKey(opts, &key); err != nil {
360 0 : return nil, keyspan.Span{}, err
361 0 : }
362 1 : smallest = key.Clone()
363 : }
364 1 : if s, err := iter.Last(); err != nil {
365 0 : return nil, keyspan.Span{}, err
366 1 : } else if s != nil {
367 1 : k := s.SmallestKey()
368 1 : if err := ingestValidateKey(opts, &k); err != nil {
369 0 : return nil, keyspan.Span{}, err
370 0 : }
371 1 : largest := s.LargestKey().Clone()
372 1 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest)
373 : }
374 : }
375 :
376 : // Update the range-key bounds for the table.
377 1 : {
378 1 : iter, err := r.NewRawRangeKeyIter(ctx, sstable.NoFragmentTransforms, sstable.NoReadEnv)
379 1 : if err != nil {
380 0 : return nil, keyspan.Span{}, err
381 0 : }
382 1 : if iter != nil {
383 1 : defer iter.Close()
384 1 : var smallest InternalKey
385 1 : if s, err := iter.First(); err != nil {
386 0 : return nil, keyspan.Span{}, err
387 1 : } else if s != nil {
388 1 : key := s.SmallestKey()
389 1 : if err := ingestValidateKey(opts, &key); err != nil {
390 0 : return nil, keyspan.Span{}, err
391 0 : }
392 1 : smallest = key.Clone()
393 1 : // Range keys need some additional validation as we need to ensure they
394 1 : // defragment cleanly with the lastRangeKey from the previous file.
395 1 : if err := rangeKeyValidator.Validate(s); err != nil {
396 0 : return nil, keyspan.Span{}, err
397 0 : }
398 : }
399 1 : lastRangeKey = keyspan.Span{}
400 1 : if s, err := iter.Last(); err != nil {
401 0 : return nil, keyspan.Span{}, err
402 1 : } else if s != nil {
403 1 : k := s.SmallestKey()
404 1 : if err := ingestValidateKey(opts, &k); err != nil {
405 0 : return nil, keyspan.Span{}, err
406 0 : }
407 : // As range keys are fragmented, the end key of the last range key in
408 : // the table provides the upper bound for the table.
409 1 : largest := s.LargestKey().Clone()
410 1 : meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallest, largest)
411 1 : lastRangeKey = s.Clone()
412 0 : } else {
413 0 : // s == nil.
414 0 : if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
415 0 : return nil, keyspan.Span{}, err
416 0 : }
417 : }
418 1 : } else {
419 1 : if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
420 0 : return nil, keyspan.Span{}, err
421 0 : }
422 1 : lastRangeKey = keyspan.Span{}
423 : }
424 : }
425 :
426 1 : if !meta.HasPointKeys && !meta.HasRangeKeys {
427 1 : return nil, keyspan.Span{}, nil
428 1 : }
429 :
430 : // Sanity check that the various bounds on the file were set consistently.
431 1 : if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
432 0 : return nil, keyspan.Span{}, err
433 0 : }
434 :
435 1 : return meta, lastRangeKey, nil
436 : }
437 :
438 : type ingestLoadResult struct {
439 : local []ingestLocalMeta
440 : shared []ingestSharedMeta
441 : external []ingestExternalMeta
442 :
443 : externalFilesHaveLevel bool
444 : }
445 :
446 : type ingestLocalMeta struct {
447 : *manifest.TableMetadata
448 : path string
449 : }
450 :
451 : type ingestSharedMeta struct {
452 : *manifest.TableMetadata
453 : shared SharedSSTMeta
454 : }
455 :
456 : type ingestExternalMeta struct {
457 : *manifest.TableMetadata
458 : external ExternalFile
459 : // usedExistingBacking is true if the external file is reusing a backing
460 : // that existed before this ingestion. In this case, we called
461 : // VirtualBackings.Protect() on that backing; we will need to call
462 : // Unprotect() after the ingestion.
463 : usedExistingBacking bool
464 : }
465 :
466 1 : func (r *ingestLoadResult) fileCount() int {
467 1 : return len(r.local) + len(r.shared) + len(r.external)
468 1 : }
469 :
470 : func ingestLoad(
471 : ctx context.Context,
472 : opts *Options,
473 : fmv FormatMajorVersion,
474 : paths []string,
475 : shared []SharedSSTMeta,
476 : external []ExternalFile,
477 : cacheHandle *cache.Handle,
478 : pending []base.TableNum,
479 1 : ) (ingestLoadResult, error) {
480 1 : localFileNums := pending[:len(paths)]
481 1 : sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
482 1 : externalFileNums := pending[len(paths)+len(shared) : len(paths)+len(shared)+len(external)]
483 1 :
484 1 : var result ingestLoadResult
485 1 : result.local = make([]ingestLocalMeta, 0, len(paths))
486 1 : var lastRangeKey keyspan.Span
487 1 : // NB: we disable range key boundary assertions if we have shared or external files
488 1 : // present in this ingestion. This is because a suffixed range key in a local file
489 1 : // can possibly defragment with a suffixed range key in a shared or external file.
490 1 : // We also disable range key boundary assertions if we have CreateOnShared set to
491 1 : // true, as that means we could have suffixed RangeKeyDels or Unsets in the local
492 1 : // files that won't ever be surfaced, even if there are no shared or external files
493 1 : // in the ingestion.
494 1 : shouldDisableRangeKeyChecks := len(shared) > 0 || len(external) > 0 || opts.Experimental.CreateOnShared != remote.CreateOnSharedNone
495 1 : for i := range paths {
496 1 : f, err := opts.FS.Open(paths[i])
497 1 : if err != nil {
498 1 : return ingestLoadResult{}, err
499 1 : }
500 :
501 1 : readable, err := sstable.NewSimpleReadable(f)
502 1 : if err != nil {
503 1 : return ingestLoadResult{}, err
504 1 : }
505 1 : var m *manifest.TableMetadata
506 1 : rangeKeyValidator := disableRangeKeyChecks()
507 1 : if !shouldDisableRangeKeyChecks {
508 1 : rangeKeyValidator = validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
509 1 : }
510 1 : m, lastRangeKey, err = ingestLoad1(ctx, opts, fmv, readable, cacheHandle, localFileNums[i], rangeKeyValidator)
511 1 : if err != nil {
512 1 : return ingestLoadResult{}, err
513 1 : }
514 1 : if m != nil {
515 1 : result.local = append(result.local, ingestLocalMeta{
516 1 : TableMetadata: m,
517 1 : path: paths[i],
518 1 : })
519 1 : }
520 : }
521 :
522 1 : if !shouldDisableRangeKeyChecks {
523 1 : rangeKeyValidator := validateSuffixedBoundaries(opts.Comparer, lastRangeKey)
524 1 : if err := rangeKeyValidator.Validate(nil /* nextFileSmallestKey */); err != nil {
525 1 : return ingestLoadResult{}, err
526 1 : }
527 : }
528 :
529 : // Sort the shared files according to level.
530 1 : sort.Sort(sharedByLevel(shared))
531 1 :
532 1 : result.shared = make([]ingestSharedMeta, 0, len(shared))
533 1 : for i := range shared {
534 1 : m, err := ingestSynthesizeShared(opts, shared[i], sharedFileNums[i])
535 1 : if err != nil {
536 0 : return ingestLoadResult{}, err
537 0 : }
538 1 : if shared[i].Level < sharedLevelsStart {
539 0 : return ingestLoadResult{}, errors.New("cannot ingest shared file in level below sharedLevelsStart")
540 0 : }
541 1 : result.shared = append(result.shared, ingestSharedMeta{
542 1 : TableMetadata: m,
543 1 : shared: shared[i],
544 1 : })
545 : }
546 1 : result.external = make([]ingestExternalMeta, 0, len(external))
547 1 : for i := range external {
548 1 : m, err := ingestLoad1External(opts, external[i], externalFileNums[i])
549 1 : if err != nil {
550 1 : return ingestLoadResult{}, err
551 1 : }
552 1 : result.external = append(result.external, ingestExternalMeta{
553 1 : TableMetadata: m,
554 1 : external: external[i],
555 1 : })
556 1 : if external[i].Level > 0 {
557 1 : if i != 0 && !result.externalFilesHaveLevel {
558 0 : return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
559 0 : }
560 1 : result.externalFilesHaveLevel = true
561 1 : } else if result.externalFilesHaveLevel {
562 0 : return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
563 0 : }
564 : }
565 1 : return result, nil
566 : }
567 :
568 1 : func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error {
569 1 : // Verify that all the shared files (i.e. files in sharedMeta)
570 1 : // fit within the exciseSpan.
571 1 : for _, f := range lr.shared {
572 1 : if !exciseSpan.Contains(cmp, f.Smallest()) || !exciseSpan.Contains(cmp, f.Largest()) {
573 0 : return errors.Newf("pebble: shared file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
574 0 : }
575 : }
576 :
577 1 : if lr.externalFilesHaveLevel {
578 1 : for _, f := range lr.external {
579 1 : if !exciseSpan.Contains(cmp, f.Smallest()) || !exciseSpan.Contains(cmp, f.Largest()) {
580 0 : return base.AssertionFailedf("pebble: external file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
581 0 : }
582 : }
583 : }
584 :
585 1 : if len(lr.external) > 0 {
586 1 : if len(lr.shared) > 0 {
587 0 : // If external files are present alongside shared files,
588 0 : // return an error.
589 0 : return base.AssertionFailedf("pebble: external files cannot be ingested atomically alongside shared files")
590 0 : }
591 :
592 : // Sort according to the smallest key.
593 1 : slices.SortFunc(lr.external, func(a, b ingestExternalMeta) int {
594 1 : return cmp(a.Smallest().UserKey, b.Smallest().UserKey)
595 1 : })
596 1 : for i := 1; i < len(lr.external); i++ {
597 1 : if sstableKeyCompare(cmp, lr.external[i-1].Largest(), lr.external[i].Smallest()) >= 0 {
598 1 : return errors.Newf("pebble: external sstables have overlapping ranges")
599 1 : }
600 : }
601 1 : return nil
602 : }
603 1 : if len(lr.local) <= 1 {
604 1 : return nil
605 1 : }
606 :
607 : // Sort according to the smallest key.
608 1 : slices.SortFunc(lr.local, func(a, b ingestLocalMeta) int {
609 1 : return cmp(a.Smallest().UserKey, b.Smallest().UserKey)
610 1 : })
611 :
612 1 : for i := 1; i < len(lr.local); i++ {
613 1 : if sstableKeyCompare(cmp, lr.local[i-1].Largest(), lr.local[i].Smallest()) >= 0 {
614 1 : return errors.Newf("pebble: local ingestion sstables have overlapping ranges")
615 1 : }
616 : }
617 1 : if len(lr.shared) == 0 {
618 1 : return nil
619 1 : }
620 0 : filesInLevel := make([]*manifest.TableMetadata, 0, len(lr.shared))
621 0 : for l := sharedLevelsStart; l < numLevels; l++ {
622 0 : filesInLevel = filesInLevel[:0]
623 0 : for i := range lr.shared {
624 0 : if lr.shared[i].shared.Level == uint8(l) {
625 0 : filesInLevel = append(filesInLevel, lr.shared[i].TableMetadata)
626 0 : }
627 : }
628 0 : for i := range lr.external {
629 0 : if lr.external[i].external.Level == uint8(l) {
630 0 : filesInLevel = append(filesInLevel, lr.external[i].TableMetadata)
631 0 : }
632 : }
633 0 : slices.SortFunc(filesInLevel, func(a, b *manifest.TableMetadata) int {
634 0 : return cmp(a.Smallest().UserKey, b.Smallest().UserKey)
635 0 : })
636 0 : for i := 1; i < len(filesInLevel); i++ {
637 0 : if sstableKeyCompare(cmp, filesInLevel[i-1].Largest(), filesInLevel[i].Smallest()) >= 0 {
638 0 : return base.AssertionFailedf("pebble: external shared sstables have overlapping ranges")
639 0 : }
640 : }
641 : }
642 0 : return nil
643 : }
644 :
645 1 : func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) error {
646 1 : var firstErr error
647 1 : for i := range meta {
648 1 : if err := objProvider.Remove(base.FileTypeTable, meta[i].TableBacking.DiskFileNum); err != nil {
649 1 : firstErr = firstError(firstErr, err)
650 1 : }
651 : }
652 1 : return firstErr
653 : }
654 :
655 : // ingestLinkLocal creates new objects which are backed by either hardlinks to or
656 : // copies of the ingested files.
657 : func ingestLinkLocal(
658 : ctx context.Context,
659 : jobID JobID,
660 : opts *Options,
661 : objProvider objstorage.Provider,
662 : localMetas []ingestLocalMeta,
663 1 : ) error {
664 1 : for i := range localMetas {
665 1 : objMeta, err := objProvider.LinkOrCopyFromLocal(
666 1 : ctx, opts.FS, localMetas[i].path, base.FileTypeTable, localMetas[i].TableBacking.DiskFileNum,
667 1 : objstorage.CreateOptions{PreferSharedStorage: true},
668 1 : )
669 1 : if err != nil {
670 1 : if err2 := ingestCleanup(objProvider, localMetas[:i]); err2 != nil {
671 0 : opts.Logger.Errorf("ingest cleanup failed: %v", err2)
672 0 : }
673 1 : return err
674 : }
675 1 : if opts.EventListener.TableCreated != nil {
676 1 : opts.EventListener.TableCreated(TableCreateInfo{
677 1 : JobID: int(jobID),
678 1 : Reason: "ingesting",
679 1 : Path: objProvider.Path(objMeta),
680 1 : FileNum: base.PhysicalTableDiskFileNum(localMetas[i].TableNum),
681 1 : })
682 1 : }
683 : }
684 1 : return nil
685 : }
686 :
687 : // ingestAttachRemote attaches remote objects to the storage provider.
688 : //
689 : // For external objects, we reuse existing FileBackings from the current version
690 : // when possible.
691 : //
692 : // ingestUnprotectExternalBackings() must be called after this function (even in
693 : // error cases).
694 1 : func (d *DB) ingestAttachRemote(jobID JobID, lr ingestLoadResult) error {
695 1 : remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(lr.shared)+len(lr.external))
696 1 : for i := range lr.shared {
697 1 : backing, err := lr.shared[i].shared.Backing.Get()
698 1 : if err != nil {
699 0 : return err
700 0 : }
701 1 : remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
702 1 : FileNum: lr.shared[i].TableBacking.DiskFileNum,
703 1 : FileType: base.FileTypeTable,
704 1 : Backing: backing,
705 1 : })
706 : }
707 :
708 1 : d.findExistingBackingsForExternalObjects(lr.external)
709 1 :
710 1 : newTableBackings := make(map[remote.ObjectKey]*manifest.TableBacking, len(lr.external))
711 1 : for i := range lr.external {
712 1 : meta := lr.external[i].TableMetadata
713 1 : if meta.TableBacking != nil {
714 1 : // The backing was filled in by findExistingBackingsForExternalObjects().
715 1 : continue
716 : }
717 1 : key := remote.MakeObjectKey(lr.external[i].external.Locator, lr.external[i].external.ObjName)
718 1 : if backing, ok := newTableBackings[key]; ok {
719 1 : // We already created the same backing in this loop. Update its size.
720 1 : backing.Size += lr.external[i].external.Size
721 1 : meta.AttachVirtualBacking(backing)
722 1 : continue
723 : }
724 1 : providerBacking, err := d.objProvider.CreateExternalObjectBacking(key.Locator, key.ObjectName)
725 1 : if err != nil {
726 0 : return err
727 0 : }
728 : // We have to attach the remote object (and assign it a DiskFileNum). For
729 : // simplicity, we use the same number for both the FileNum and the
730 : // DiskFileNum (even though this is a virtual sstable).
731 1 : size := max(lr.external[i].external.Size, 1)
732 1 : meta.InitVirtualBacking(base.DiskFileNum(meta.TableNum), size)
733 1 :
734 1 : // Set the underlying TableBacking's size to the same size as the virtualized
735 1 : // view of the sstable. This ensures that we don't over-prioritize this
736 1 : // sstable for compaction just yet, as we do not have a clear sense of
737 1 : // what parts of this sstable are referenced by other nodes.
738 1 : meta.TableBacking.Size = size
739 1 : newTableBackings[key] = meta.TableBacking
740 1 :
741 1 : remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
742 1 : FileNum: meta.TableBacking.DiskFileNum,
743 1 : FileType: base.FileTypeTable,
744 1 : Backing: providerBacking,
745 1 : })
746 : }
747 :
748 1 : for i := range lr.external {
749 1 : if err := lr.external[i].Validate(d.opts.Comparer.Compare, d.opts.Comparer.FormatKey); err != nil {
750 0 : return err
751 0 : }
752 : }
753 :
754 1 : remoteObjMetas, err := d.objProvider.AttachRemoteObjects(remoteObjs)
755 1 : if err != nil {
756 0 : return err
757 0 : }
758 :
759 1 : for i := range lr.shared {
760 1 : // One corner case around file sizes we need to be mindful of, is that
761 1 : // if one of the shareObjs was initially created by us (and has boomeranged
762 1 : // back from another node), we'll need to update the TableBacking's size
763 1 : // to be the true underlying size. Otherwise, we could hit errors when we
764 1 : // open the db again after a crash/restart (see checkConsistency in open.go),
765 1 : // plus it more accurately allows us to prioritize compactions of files
766 1 : // that were originally created by us.
767 1 : if remoteObjMetas[i].IsShared() && !d.objProvider.IsSharedForeign(remoteObjMetas[i]) {
768 1 : size, err := d.objProvider.Size(remoteObjMetas[i])
769 1 : if err != nil {
770 0 : return err
771 0 : }
772 1 : lr.shared[i].TableBacking.Size = max(uint64(size), 1)
773 : }
774 : }
775 :
776 1 : if d.opts.EventListener.TableCreated != nil {
777 1 : for i := range remoteObjMetas {
778 1 : d.opts.EventListener.TableCreated(TableCreateInfo{
779 1 : JobID: int(jobID),
780 1 : Reason: "ingesting",
781 1 : Path: d.objProvider.Path(remoteObjMetas[i]),
782 1 : FileNum: remoteObjMetas[i].DiskFileNum,
783 1 : })
784 1 : }
785 : }
786 :
787 1 : return nil
788 : }
789 :
790 : // findExistingBackingsForExternalObjects populates the TableBacking for external
791 : // files which are already in use by the current version.
792 : //
793 : // We take a Ref and LatestRef on populated backings.
794 1 : func (d *DB) findExistingBackingsForExternalObjects(metas []ingestExternalMeta) {
795 1 : d.mu.Lock()
796 1 : defer d.mu.Unlock()
797 1 :
798 1 : for i := range metas {
799 1 : diskFileNums := d.objProvider.GetExternalObjects(metas[i].external.Locator, metas[i].external.ObjName)
800 1 : // We cross-check against fileBackings in the current version because it is
801 1 : // possible that the external object is referenced by an sstable which only
802 1 : // exists in a previous version. In that case, that object could be removed
803 1 : // at any time so we cannot reuse it.
804 1 : for _, n := range diskFileNums {
805 1 : if backing, ok := d.mu.versions.latest.virtualBackings.Get(n); ok {
806 1 : // Protect this backing from being removed from the latest version. We
807 1 : // will unprotect in ingestUnprotectExternalBackings.
808 1 : d.mu.versions.latest.virtualBackings.Protect(n)
809 1 : metas[i].usedExistingBacking = true
810 1 : metas[i].AttachVirtualBacking(backing)
811 1 :
812 1 : // We can't update the size of the backing here, so make sure the
813 1 : // virtual size is sane.
814 1 : // TODO(radu): investigate what would it take to update the backing size.
815 1 : metas[i].Size = min(metas[i].Size, backing.Size)
816 1 : break
817 : }
818 : }
819 : }
820 : }
821 :
822 : // ingestUnprotectExternalBackings unprotects the file backings that were reused
823 : // for external objects when the ingestion fails.
824 1 : func (d *DB) ingestUnprotectExternalBackings(lr ingestLoadResult) {
825 1 : d.mu.Lock()
826 1 : defer d.mu.Unlock()
827 1 :
828 1 : for _, meta := range lr.external {
829 1 : if meta.usedExistingBacking {
830 1 : // If the backing is not use anywhere else and the ingest failed (or the
831 1 : // ingested tables were already compacted away), this call will cause in
832 1 : // the next version update to remove the backing.
833 1 : d.mu.versions.latest.virtualBackings.Unprotect(meta.TableBacking.DiskFileNum)
834 1 : }
835 : }
836 : }
837 :
838 : func setSeqNumInMetadata(
839 : m *manifest.TableMetadata, seqNum base.SeqNum, cmp Compare, format base.FormatKey,
840 1 : ) error {
841 1 : setSeqFn := func(k base.InternalKey) base.InternalKey {
842 1 : return base.MakeInternalKey(k.UserKey, seqNum, k.Kind())
843 1 : }
844 : // NB: we set the fields directly here, rather than via their Extend*
845 : // methods, as we are updating sequence numbers.
846 1 : if m.HasPointKeys {
847 1 : m.PointKeyBounds.SetSmallest(setSeqFn(m.PointKeyBounds.Smallest()))
848 1 : }
849 1 : if m.HasRangeKeys {
850 1 : m.RangeKeyBounds.SetSmallest(setSeqFn(m.RangeKeyBounds.Smallest()))
851 1 : }
852 : // Only update the seqnum for the largest key if that key is not an
853 : // "exclusive sentinel" (i.e. a range deletion sentinel or a range key
854 : // boundary), as doing so effectively drops the exclusive sentinel (by
855 : // lowering the seqnum from the max value), and extends the bounds of the
856 : // table.
857 : // NB: as the largest range key is always an exclusive sentinel, it is never
858 : // updated.
859 1 : if m.HasPointKeys && !m.PointKeyBounds.Largest().IsExclusiveSentinel() {
860 1 : m.PointKeyBounds.SetLargest(setSeqFn(m.PointKeyBounds.Largest()))
861 1 : }
862 : // Setting smallestSeqNum == largestSeqNum triggers the setting of
863 : // Properties.GlobalSeqNum when an sstable is loaded.
864 1 : m.SmallestSeqNum = seqNum
865 1 : m.LargestSeqNum = seqNum
866 1 : m.LargestSeqNumAbsolute = seqNum
867 1 : // Ensure the new bounds are consistent.
868 1 : if err := m.Validate(cmp, format); err != nil {
869 0 : return err
870 0 : }
871 1 : return nil
872 : }
873 :
874 : func ingestUpdateSeqNum(
875 : cmp Compare, format base.FormatKey, seqNum base.SeqNum, loadResult ingestLoadResult,
876 1 : ) error {
877 1 : // Shared sstables are required to be sorted by level ascending. We then
878 1 : // iterate the shared sstables in reverse, assigning the lower sequence
879 1 : // numbers to the shared sstables that will be ingested into the lower
880 1 : // (larger numbered) levels first. This ensures sequence number shadowing is
881 1 : // correct.
882 1 : for i := len(loadResult.shared) - 1; i >= 0; i-- {
883 1 : if i-1 >= 0 && loadResult.shared[i-1].shared.Level > loadResult.shared[i].shared.Level {
884 0 : panic(errors.AssertionFailedf("shared files %s, %s out of order", loadResult.shared[i-1], loadResult.shared[i]))
885 : }
886 1 : if err := setSeqNumInMetadata(loadResult.shared[i].TableMetadata, seqNum, cmp, format); err != nil {
887 0 : return err
888 0 : }
889 1 : seqNum++
890 : }
891 1 : for i := range loadResult.external {
892 1 : if err := setSeqNumInMetadata(loadResult.external[i].TableMetadata, seqNum, cmp, format); err != nil {
893 0 : return err
894 0 : }
895 1 : seqNum++
896 : }
897 1 : for i := range loadResult.local {
898 1 : if err := setSeqNumInMetadata(loadResult.local[i].TableMetadata, seqNum, cmp, format); err != nil {
899 0 : return err
900 0 : }
901 1 : seqNum++
902 : }
903 1 : return nil
904 : }
905 :
// ingestTargetLevel returns the target level for a file being ingested.
// If suggestSplit is true, it accounts for ingest-time splitting as part of
// its target level calculation, and if a split candidate is found, that file
// is returned as the splitFile.
func ingestTargetLevel(
	ctx context.Context,
	cmp base.Compare,
	lsmOverlap overlap.WithLSM,
	baseLevel int,
	compactions map[compaction]struct{},
	meta *manifest.TableMetadata,
	suggestSplit bool,
) (targetLevel int, splitFile *manifest.TableMetadata, err error) {
	// Find the lowest level which does not have any files which overlap meta. We
	// search from L0 to L6 looking for whether there are any files in the level
	// which overlap meta. We want the "lowest" level (where lower means
	// increasing level number) in order to reduce write amplification.
	//
	// There are 2 kinds of overlap we need to check for: file boundary overlap
	// and data overlap. Data overlap implies file boundary overlap. Note that it
	// is always possible to ingest into L0.
	//
	// To place meta at level i where i > 0:
	// - there must not be any data overlap with levels <= i, since that will
	//   violate the sequence number invariant.
	// - no file boundary overlap with level i, since that will violate the
	//   invariant that files do not overlap in levels i > 0.
	// - if there is only a file overlap at a given level, and no data overlap,
	//   we can still slot a file at that level. We return the fileMetadata with
	//   which we have file boundary overlap (must be only one file, as sstable
	//   bounds are usually tight on user keys) and the caller is expected to split
	//   that sstable into two virtual sstables, allowing this file to go into that
	//   level. Note that if we have file boundary overlap with two files, which
	//   should only happen on rare occasions, we treat it as data overlap and
	//   don't use this optimization.
	//
	// The file boundary overlap check is simpler to conceptualize. Consider the
	// following example, in which the ingested file lies completely before or
	// after the file being considered.
	//
	//   |--|           |--|  ingested file: [a,b] or [f,g]
	//         |-----|         existing file: [c,e]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In both cases the ingested file can move to considering the next level.
	//
	// File boundary overlap does not necessarily imply data overlap. The check
	// for data overlap is a little more nuanced. Consider the following examples:
	//
	//  1. No data overlap:
	//
	//          |-|   |--|    ingested file: [cc-d] or [ee-ff]
	//  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In this case the ingested files can "fall through" this level. The checks
	// continue at the next level.
	//
	//  2. Data overlap:
	//
	//            |--|        ingested file: [d-e]
	//  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In this case the file cannot be ingested into this level as the point 'dd'
	// is in the way.
	//
	// It is worth noting that the check for data overlap is only approximate. In
	// the previous example, the ingested table [d-e] could contain only the
	// points 'd' and 'e', in which case the table would be eligible for
	// considering lower levels. However, such a fine-grained check would need to
	// be exhaustive (comparing points and ranges in both the ingested and
	// existing tables) and such a check is prohibitively expensive. Thus Pebble
	// treats any existing point that falls within the ingested table bounds as
	// being "data overlap".

	// Data overlap with L0 means we can only place the file in L0.
	if lsmOverlap[0].Result == overlap.Data {
		return 0, nil, nil
	}
	targetLevel = 0
	splitFile = nil
	metaBounds := meta.UserKeyBounds()
	for level := baseLevel; level < numLevels; level++ {
		var candidateSplitFile *manifest.TableMetadata
		switch lsmOverlap[level].Result {
		case overlap.Data:
			// We cannot ingest into or under this level; return the best target level
			// so far.
			return targetLevel, splitFile, nil

		case overlap.OnlyBoundary:
			if !suggestSplit || lsmOverlap[level].SplitFile == nil {
				// We can ingest under this level, but not into this level.
				continue
			}
			// We can ingest into this level if we split this file.
			candidateSplitFile = lsmOverlap[level].SplitFile

		case overlap.None:
			// We can ingest into this level.

		default:
			return 0, nil, base.AssertionFailedf("unexpected WithLevel.Result: %v", lsmOverlap[level].Result)
		}

		// Check boundary overlap with any ongoing compactions. We consider an
		// overlapping compaction that's writing files to an output level as
		// equivalent to boundary overlap with files in that output level.
		//
		// We cannot check for data overlap with the new SSTs compaction will produce
		// since compaction hasn't been done yet. However, there's no need to check
		// since all keys in them will be from levels in [c.startLevel,
		// c.outputLevel], and all those levels have already had their data overlap
		// tested negative (else we'd have returned earlier).
		//
		// An alternative approach would be to cancel these compactions and proceed
		// with an ingest-time split on this level if necessary. However, compaction
		// cancellation can result in significant wasted effort and is best avoided
		// unless necessary.
		overlaps := false
		for c := range compactions {
			tblCompaction, ok := c.(*tableCompaction)
			if !ok {
				continue
			}
			if tblCompaction.outputLevel == nil || level != tblCompaction.outputLevel.level {
				continue
			}
			bounds := tblCompaction.Bounds()
			if bounds != nil && metaBounds.Overlaps(cmp, bounds) {
				overlaps = true
				break
			}
		}
		if !overlaps {
			targetLevel = level
			splitFile = candidateSplitFile
		}
	}
	return targetLevel, splitFile, nil
}
1050 :
1051 : // Ingest ingests a set of sstables into the DB. Ingestion of the files is
1052 : // atomic and semantically equivalent to creating a single batch containing all
1053 : // of the mutations in the sstables. Ingestion may require the memtable to be
1054 : // flushed. The ingested sstable files are moved into the DB and must reside on
1055 : // the same filesystem as the DB. Sstables can be created for ingestion using
1056 : // sstable.Writer. On success, Ingest removes the input paths.
1057 : //
1058 : // Ingested sstables must have been created with a known KeySchema (when written
1059 : // with columnar blocks) and Comparer. They must not contain any references to
1060 : // external blob files.
1061 : //
1062 : // Two types of sstables are accepted for ingestion(s): one is sstables present
1063 : // in the instance's vfs.FS and can be referenced locally. The other is sstables
1064 : // present in remote.Storage, referred to as shared or foreign sstables. These
1065 : // shared sstables can be linked through objstorageprovider.Provider, and do not
1066 : // need to already be present on the local vfs.FS. Foreign sstables must all fit
1067 : // in an excise span, and are destined for a level specified in SharedSSTMeta.
1068 : //
1069 : // All sstables *must* be Sync()'d by the caller after all bytes are written
1070 : // and before its file handle is closed; failure to do so could violate
1071 : // durability or lead to corrupted on-disk state. This method cannot, in a
1072 : // platform-and-FS-agnostic way, ensure that all sstables in the input are
1073 : // properly synced to disk. Opening new file handles and Sync()-ing them
1074 : // does not always guarantee durability; see the discussion here on that:
1075 : // https://github.com/cockroachdb/pebble/pull/835#issuecomment-663075379
1076 : //
1077 : // Ingestion loads each sstable into the lowest level of the LSM which it
1078 : // doesn't overlap (see ingestTargetLevel). If an sstable overlaps a memtable,
1079 : // ingestion forces the memtable to flush, and then waits for the flush to
1080 : // occur. In some cases, such as with no foreign sstables and no excise span,
1081 : // ingestion that gets blocked on a memtable can join the flushable queue and
1082 : // finish even before the memtable has been flushed.
1083 : //
1084 : // The steps for ingestion are:
1085 : //
1086 : // 1. Allocate table numbers for every sstable being ingested.
1087 : // 2. Load the metadata for all sstables being ingested.
1088 : // 3. Sort the sstables by smallest key, verifying non overlap (for local
1089 : // sstables).
1090 : // 4. Hard link (or copy) the local sstables into the DB directory.
1091 : // 5. Allocate a sequence number to use for all of the entries in the
1092 : // local sstables. This is the step where overlap with memtables is
1093 : // determined. If there is overlap, we remember the most recent memtable
1094 : // that overlaps.
1095 : // 6. Update the sequence number in the ingested local sstables. (Remote
1096 : // sstables get fixed sequence numbers that were determined at load time.)
1097 : // 7. Wait for the most recent memtable that overlaps to flush (if any).
1098 : // 8. Add the ingested sstables to the version (DB.ingestApply).
1099 : // 8.1. If an excise span was specified, figure out what sstables in the
1100 : // current version overlap with the excise span, and create new virtual
1101 : // sstables out of those sstables that exclude the excised span (DB.excise).
1102 : // 9. Publish the ingestion sequence number.
1103 : //
1104 : // Note that if the mutable memtable overlaps with ingestion, a flush of the
1105 : // memtable is forced equivalent to DB.Flush. Additionally, subsequent
1106 : // mutations that get sequence numbers larger than the ingestion sequence
1107 : // number get queued up behind the ingestion waiting for it to complete. This
1108 : // can produce a noticeable hiccup in performance. See
1109 : // https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
1110 : // this hiccup.
1111 1 : func (d *DB) Ingest(ctx context.Context, paths []string) error {
1112 1 : if err := d.closed.Load(); err != nil {
1113 1 : panic(err)
1114 : }
1115 1 : if d.opts.ReadOnly {
1116 1 : return ErrReadOnly
1117 1 : }
1118 1 : _, err := d.ingest(ctx, ingestArgs{Local: paths})
1119 1 : return err
1120 : }
1121 :
// IngestOperationStats provides some information about where in the LSM the
// bytes were ingested. It is returned by IngestWithStats, IngestExternalFiles
// and IngestAndExcise.
type IngestOperationStats struct {
	// Bytes is the total bytes in the ingested sstables.
	Bytes uint64
	// ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
	// into L0. This value is approximate when flushable ingests are active and
	// an ingest overlaps an entry in the flushable queue. Currently, this
	// approximation is very rough, only including tables that overlapped the
	// memtable. This estimate may be improved with #2112.
	ApproxIngestedIntoL0Bytes uint64
	// MemtableOverlappingFiles is the count of ingested sstables
	// that overlapped keys in the memtables.
	MemtableOverlappingFiles int
}
1137 :
// ExternalFile are external sstables that can be referenced through
// objprovider and ingested as remote files that will not be refcounted or
// cleaned up. For use with online restore. Note that the underlying sstable
// could contain keys outside the [Smallest,Largest) bounds; however Pebble
// is expected to only read the keys within those bounds.
type ExternalFile struct {
	// Locator is the shared.Locator that can be used with objProvider to
	// resolve a reference to this external sstable.
	Locator remote.Locator

	// ObjName is the unique name of this sstable on Locator.
	ObjName string

	// Size of the referenced proportion of the virtualized sstable. An estimate
	// is acceptable in lieu of the backing file size.
	Size uint64

	// StartKey and EndKey define the bounds of the sstable; the ingestion
	// of this file will only result in keys within [StartKey, EndKey) if
	// EndKeyIsInclusive is false or [StartKey, EndKey] if it is true.
	// These bounds are loose i.e. it's possible for keys to not span the
	// entirety of this range.
	//
	// StartKey and EndKey user keys must not have suffixes.
	//
	// Multiple ExternalFiles in one ingestion must all have non-overlapping
	// bounds.
	StartKey, EndKey []byte

	// EndKeyIsInclusive is true if EndKey should be treated as inclusive.
	EndKeyIsInclusive bool

	// HasPointKey and HasRangeKey denote whether this file contains point keys
	// or range keys. If both structs are false, an error is returned during
	// ingestion.
	HasPointKey, HasRangeKey bool

	// SyntheticPrefix will prepend this prefix to all keys in the file during
	// iteration. Note that the backing file itself is not modified.
	//
	// SyntheticPrefix must be a prefix of both StartKey and EndKey.
	SyntheticPrefix []byte

	// SyntheticSuffix will replace the suffix of every key in the file during
	// iteration. Note that the file itself is not modified, rather, every key
	// returned by an iterator will have the synthetic suffix.
	//
	// SyntheticSuffix can only be used under the following conditions:
	//  - the synthetic suffix must sort before any non-empty suffixes in the
	//    backing sst (the entire sst, not just the part restricted to Bounds).
	//  - the backing sst must not contain multiple keys with the same prefix.
	SyntheticSuffix []byte

	// Level denotes the level at which this file was present at read time
	// if the external file was returned by a scan of an existing Pebble
	// instance. If Level is 0, this field is ignored.
	Level uint8
}
1196 :
1197 : // IngestWithStats does the same as Ingest, and additionally returns
1198 : // IngestOperationStats.
1199 1 : func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperationStats, error) {
1200 1 : if err := d.closed.Load(); err != nil {
1201 0 : panic(err)
1202 : }
1203 1 : if d.opts.ReadOnly {
1204 0 : return IngestOperationStats{}, ErrReadOnly
1205 0 : }
1206 1 : return d.ingest(ctx, ingestArgs{Local: paths})
1207 : }
1208 :
1209 : // IngestExternalFiles does the same as IngestWithStats, and additionally
1210 : // accepts external files (with locator info that can be resolved using
1211 : // d.opts.SharedStorage). These files must also be non-overlapping with
1212 : // each other, and must be resolvable through d.objProvider.
1213 : func (d *DB) IngestExternalFiles(
1214 : ctx context.Context, external []ExternalFile,
1215 1 : ) (IngestOperationStats, error) {
1216 1 : if err := d.closed.Load(); err != nil {
1217 0 : panic(err)
1218 : }
1219 :
1220 1 : if d.opts.ReadOnly {
1221 0 : return IngestOperationStats{}, ErrReadOnly
1222 0 : }
1223 1 : if d.opts.Experimental.RemoteStorage == nil {
1224 0 : return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
1225 0 : }
1226 1 : return d.ingest(ctx, ingestArgs{External: external})
1227 : }
1228 :
1229 : // IngestAndExcise does the same as IngestWithStats, and additionally accepts a
1230 : // list of shared files to ingest that can be read from a remote.Storage through
1231 : // a Provider. All the shared files must live within exciseSpan, and any existing
1232 : // keys in exciseSpan are deleted by turning existing sstables into virtual
1233 : // sstables (if not virtual already) and shrinking their spans to exclude
1234 : // exciseSpan. See the comment at Ingest for a more complete picture of the
1235 : // ingestion process.
1236 : //
1237 : // Panics if this DB instance was not instantiated with a remote.Storage and
1238 : // shared sstables are present.
1239 : func (d *DB) IngestAndExcise(
1240 : ctx context.Context,
1241 : paths []string,
1242 : shared []SharedSSTMeta,
1243 : external []ExternalFile,
1244 : exciseSpan KeyRange,
1245 1 : ) (IngestOperationStats, error) {
1246 1 : if err := d.closed.Load(); err != nil {
1247 0 : panic(err)
1248 : }
1249 1 : if d.opts.ReadOnly {
1250 0 : return IngestOperationStats{}, ErrReadOnly
1251 0 : }
1252 : // Excise is only supported on prefix keys.
1253 1 : if d.opts.Comparer.Split(exciseSpan.Start) != len(exciseSpan.Start) {
1254 0 : return IngestOperationStats{}, errors.New("IngestAndExcise called with suffixed start key")
1255 0 : }
1256 1 : if d.opts.Comparer.Split(exciseSpan.End) != len(exciseSpan.End) {
1257 0 : return IngestOperationStats{}, errors.New("IngestAndExcise called with suffixed end key")
1258 0 : }
1259 1 : if v := d.FormatMajorVersion(); v < FormatMinForSharedObjects {
1260 0 : return IngestOperationStats{}, errors.Newf(
1261 0 : "store has format major version %d; IngestAndExcise requires at least %d",
1262 0 : v, FormatMinForSharedObjects,
1263 0 : )
1264 0 : }
1265 1 : args := ingestArgs{
1266 1 : Local: paths,
1267 1 : Shared: shared,
1268 1 : External: external,
1269 1 : ExciseSpan: exciseSpan,
1270 1 : ExciseBoundsPolicy: tightExciseBounds,
1271 1 : }
1272 1 : return d.ingest(ctx, args)
1273 : }
1274 :
// newIngestedFlushableEntry wraps the ingested tables (and optional excise
// span) in a flushableEntry so they can be placed in the flushable queue.
//
// Both DB.mu and commitPipeline.mu must be held while this is called.
func (d *DB) newIngestedFlushableEntry(
	meta []*manifest.TableMetadata, seqNum base.SeqNum, logNum base.DiskFileNum, exciseSpan KeyRange,
) (*flushableEntry, error) {
	// If there's an excise being done atomically with the same ingest, we
	// assign the lowest sequence number in the set of sequence numbers for this
	// ingestion to the excise. Note that we've already allocated fileCount+1
	// sequence numbers in this case.
	//
	// This mimics the behaviour in the non-flushable ingest case (see the callsite
	// for ingestUpdateSeqNum).
	fileSeqNumStart := seqNum
	if exciseSpan.Valid() {
		fileSeqNumStart = seqNum + 1 // the first seqNum is reserved for the excise.
		// The excise span will be retained by the flushable, outliving the
		// caller's ingestion call. Copy it.
		exciseSpan = KeyRange{
			Start: slices.Clone(exciseSpan.Start),
			End:   slices.Clone(exciseSpan.End),
		}
	}
	// Update the sequence number for all of the sstables in the
	// metadata. Writing the metadata to the manifest when the
	// version edit is applied is the mechanism that persists the
	// sequence number. The sstables themselves are left unmodified.
	// In this case, a version edit will only be written to the manifest
	// when the flushable is eventually flushed. If Pebble restarts in that
	// time, then we'll lose the ingest sequence number information. But this
	// information will also be reconstructed on node restart.
	for i, m := range meta {
		if err := setSeqNumInMetadata(m, fileSeqNumStart+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil {
			return nil, err
		}
	}

	f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, seqNum)

	// NB: The logNum/seqNum are the WAL number which we're writing this entry
	// to and the sequence number within the WAL which we'll write this entry
	// to.
	entry := d.newFlushableEntry(f, logNum, seqNum)
	// The flushable entry starts off with a single reader ref, so increment
	// the TableMetadata.Refs.
	for _, file := range f.files {
		file.Ref()
	}
	entry.unrefFiles = func(of *manifest.ObsoleteFiles) {
		// Invoke Unref on each table. If any files become obsolete, they'll be
		// added to the set of obsolete files.
		for _, file := range f.files {
			file.Unref(of)
		}
	}

	// Force a flush so the ingested tables move into the LSM proper; no
	// memory accounting is needed since the data already lives in sstables
	// rather than in an arena-backed memtable.
	entry.flushForced = true
	entry.releaseMemAccounting = func() {}
	return entry, nil
}
1333 :
// handleIngestAsFlushable records the ingestion in the WAL (unless disabled),
// appends an ingestedFlushable entry to the flushable queue, and rotates the
// memtable so the ingested tables sit above any overlapping memtable data.
//
// Both DB.mu and commitPipeline.mu must be held while this is called. Since
// we're holding both locks, the order in which we rotate the memtable or
// recycle the WAL in this function is irrelevant as long as the correct log
// numbers are assigned to the appropriate flushable.
func (d *DB) handleIngestAsFlushable(
	meta []*manifest.TableMetadata, seqNum base.SeqNum, exciseSpan KeyRange,
) error {
	// Build a batch recording the excise (if any) and each ingested sstable,
	// so the operation can be replayed from the WAL after a restart.
	b := d.NewBatch()
	if exciseSpan.Valid() {
		b.excise(exciseSpan.Start, exciseSpan.End)
	}
	for _, m := range meta {
		b.ingestSST(m.TableNum)
	}
	b.setSeqNum(seqNum)

	// If the WAL is disabled, then the logNum used to create the flushable
	// entry doesn't matter. We just use the logNum assigned to the current
	// mutable memtable. If the WAL is enabled, then this logNum will be
	// overwritten by the logNum of the log which will contain the log entry
	// for the ingestedFlushable.
	logNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
	if !d.opts.DisableWAL {
		// We create a new WAL for the flushable instead of reusing the end of
		// the previous WAL. This simplifies the increment of the minimum
		// unflushed log number, and also simplifies WAL replay.
		var prevLogSize uint64
		logNum, prevLogSize = d.rotateWAL()
		// As the rotator of the WAL, we're responsible for updating the
		// previous flushable queue tail's log size.
		d.mu.mem.queue[len(d.mu.mem.queue)-1].logSize = prevLogSize

		// DB.mu is dropped while writing the batch record to the new WAL;
		// commitPipeline.mu is still held, per this function's contract.
		d.mu.Unlock()
		err := d.commit.directWrite(b)
		if err != nil {
			d.opts.Logger.Fatalf("%v", err)
		}
		d.mu.Lock()
	}

	entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
	if err != nil {
		return err
	}
	nextSeqNum := seqNum + base.SeqNum(b.Count())

	// Set newLogNum to the logNum of the previous flushable. This value is
	// irrelevant if the WAL is disabled. If the WAL is enabled, then we set
	// the appropriate value below.
	newLogNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
	if !d.opts.DisableWAL {
		// newLogNum will be the WAL num of the next mutable memtable which
		// comes after the ingestedFlushable in the flushable queue. The mutable
		// memtable will be created below.
		//
		// The prevLogSize returned by rotateWAL is the WAL to which the
		// flushable ingest keys were appended. This intermediary WAL is only
		// used to record the flushable ingest and nothing else.
		newLogNum, entry.logSize = d.rotateWAL()
	}

	d.mu.versions.metrics.Ingest.Count++
	currMem := d.mu.mem.mutable
	// NB: Placing ingested sstables above the current memtables
	// requires rotating of the existing memtables/WAL. There is
	// some concern of churning through tiny memtables due to
	// ingested sstables being placed on top of them, but those
	// memtables would have to be flushed anyways.
	d.mu.mem.queue = append(d.mu.mem.queue, entry)
	d.rotateMemtable(newLogNum, nextSeqNum, currMem, 0 /* minSize */)
	d.updateReadStateLocked(d.opts.DebugCheck)
	// TODO(aaditya): is this necessary? we call this already in rotateMemtable above
	d.maybeScheduleFlush()
	return nil
}
1409 :
// ingestArgs bundles the inputs for a single call to DB.ingest.
type ingestArgs struct {
	// Local sstables to ingest.
	Local []string
	// Shared sstables to ingest.
	Shared []SharedSSTMeta
	// External sstables to ingest.
	External []ExternalFile
	// ExciseSpan, when valid, is the key range excised atomically with the
	// ingestion (unset if not excising).
	ExciseSpan KeyRange
	// ExciseBoundsPolicy is the exciseBoundsPolicy applied when ExciseSpan is
	// set (e.g. tightExciseBounds for IngestAndExcise).
	ExciseBoundsPolicy exciseBoundsPolicy
}
1421 :
 : // ingest performs the ingestion described by args. It loads and verifies the
 : // sstables (eliding empty ones), hard-links or copies local files into the
 : // store, attaches remote backings, syncs the object provider, and then runs
 : // prepare/apply callbacks through the commit pipeline to assign sequence
 : // numbers. Depending on memtable overlap and format version, the sstables are
 : // either enqueued as a flushable on the memtable queue or applied directly to
 : // the LSM via ingestApply. On success the original local paths are removed.
 : // Returns approximate ingestion stats and the first error encountered.
1422 : // See comment at Ingest() for details on how this works.
1423 1 : func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats, error) {
1424 1 : paths := args.Local
1425 1 : shared := args.Shared
1426 1 : external := args.External
1427 1 : if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
1428 0 : panic("cannot ingest shared sstables with nil SharedStorage")
1429 : }
 : // Shared/external ingestion and excises require newer format major
 : // versions; reject early before any work is done.
1430 1 : if (args.ExciseSpan.Valid() || len(shared) > 0 || len(external) > 0) && d.FormatMajorVersion() < FormatVirtualSSTables {
1431 0 : return IngestOperationStats{}, errors.New("pebble: format major version too old for excise, shared or external sstable ingestion")
1432 0 : }
1433 1 : if len(external) > 0 && d.FormatMajorVersion() < FormatSyntheticPrefixSuffix {
1434 1 : for i := range external {
1435 1 : if len(external[i].SyntheticPrefix) > 0 {
1436 1 : return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic prefix ingestion")
1437 1 : }
1438 1 : if len(external[i].SyntheticSuffix) > 0 {
1439 1 : return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic suffix ingestion")
1440 1 : }
1441 : }
1442 : }
1443 : // Allocate table numbers for all files being ingested and mark them as
1444 : // pending in order to prevent them from being deleted. Note that this causes
1445 : // the file number ordering to be out of alignment with sequence number
1446 : // ordering. The sorting of L0 tables by sequence number avoids relying on
1447 : // that (busted) invariant.
1448 1 : pendingOutputs := make([]base.TableNum, len(paths)+len(shared)+len(external))
1449 1 : for i := 0; i < len(paths)+len(shared)+len(external); i++ {
1450 1 : pendingOutputs[i] = d.mu.versions.getNextTableNum()
1451 1 : }
1452 :
1453 1 : jobID := d.newJobID()
1454 1 :
1455 1 : // Load the metadata for all the files being ingested. This step detects
1456 1 : // and elides empty sstables.
1457 1 : loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheHandle, pendingOutputs)
1458 1 : if err != nil {
1459 1 : return IngestOperationStats{}, err
1460 1 : }
1461 :
1462 1 : if loadResult.fileCount() == 0 && !args.ExciseSpan.Valid() {
1463 1 : // All of the sstables to be ingested were empty. Nothing to do.
1464 1 : return IngestOperationStats{}, nil
1465 1 : }
1466 :
1467 : // Verify the sstables do not overlap.
1468 1 : if err := ingestSortAndVerify(d.cmp, loadResult, args.ExciseSpan); err != nil {
1469 1 : return IngestOperationStats{}, err
1470 1 : }
1471 :
1472 : // Hard link the sstables into the DB directory. Since the sstables aren't
1473 : // referenced by a version, they won't be used. If the hard linking fails
1474 : // (e.g. because the files reside on a different filesystem), ingestLinkLocal
1475 : // will fall back to copying, and if that fails we undo our work and return an
1476 : // error.
1477 1 : if err := ingestLinkLocal(ctx, jobID, d.opts, d.objProvider, loadResult.local); err != nil {
1478 0 : return IngestOperationStats{}, err
1479 0 : }
1480 :
 : // NOTE: the unprotect is deferred unconditionally, even if
 : // ingestAttachRemote errored — external backings must be unprotected
 : // on every exit path from here on.
1481 1 : err = d.ingestAttachRemote(jobID, loadResult)
1482 1 : defer d.ingestUnprotectExternalBackings(loadResult)
1483 1 : if err != nil {
1484 0 : return IngestOperationStats{}, err
1485 0 : }
1486 :
1487 : // Make the new tables durable. We need to do this at some point before we
1488 : // update the MANIFEST (via UpdateVersionLocked), otherwise a crash can have
1489 : // the tables referenced in the MANIFEST, but not present in the provider.
1490 1 : if err := d.objProvider.Sync(); err != nil {
1491 1 : return IngestOperationStats{}, err
1492 1 : }
1493 :
1494 : // metaFlushableOverlaps is a map indicating which of the ingested sstables
1495 : // overlap some table in the flushable queue. It's used to approximate
1496 : // ingest-into-L0 stats when using flushable ingests.
1497 1 : metaFlushableOverlaps := make(map[base.TableNum]bool, loadResult.fileCount())
1498 1 : var mem *flushableEntry
1499 1 : var mut *memTable
1500 1 : // asFlushable indicates whether the sstable was ingested as a flushable.
1501 1 : var asFlushable bool
 : // prepare runs inside the commit pipeline before sequence numbers become
 : // visible. It decides whether the ingest can proceed immediately, must be
 : // queued as a flushable, or must wait for a memtable flush, and takes a
 : // writer ref on the mutable memtable to pin the flush order.
1502 1 : prepare := func(seqNum base.SeqNum) {
1503 1 : // Note that d.commit.mu is held by commitPipeline when calling prepare.
1504 1 :
1505 1 : // Determine the set of bounds we care about for the purpose of checking
1506 1 : // for overlap among the flushables. If there's an excise span, we need
1507 1 : // to check for overlap with its bounds as well.
1508 1 : overlapBounds := make([]bounded, 0, loadResult.fileCount()+1)
1509 1 : for _, m := range loadResult.local {
1510 1 : overlapBounds = append(overlapBounds, m.TableMetadata)
1511 1 : }
1512 1 : for _, m := range loadResult.shared {
1513 1 : overlapBounds = append(overlapBounds, m.TableMetadata)
1514 1 : }
1515 1 : for _, m := range loadResult.external {
1516 1 : overlapBounds = append(overlapBounds, m.TableMetadata)
1517 1 : }
1518 1 : if args.ExciseSpan.Valid() {
1519 1 : overlapBounds = append(overlapBounds, &args.ExciseSpan)
1520 1 : }
1521 :
1522 1 : d.mu.Lock()
1523 1 : defer d.mu.Unlock()
1524 1 :
1525 1 : if args.ExciseSpan.Valid() {
1526 1 : // Check if any of the currently-open EventuallyFileOnlySnapshots
1527 1 : // overlap in key ranges with the excise span. If so, we need to
1528 1 : // check for memtable overlaps with all bounds of that
1529 1 : // EventuallyFileOnlySnapshot in addition to the ingestion's own
1530 1 : // bounds too.
1531 1 : overlapBounds = append(overlapBounds, exciseOverlapBounds(
1532 1 : d.cmp, &d.mu.snapshots.snapshotList, args.ExciseSpan, seqNum)...)
1533 1 : }
1534 :
1535 : // Check to see if any files overlap with any of the memtables. The queue
1536 : // is ordered from oldest to newest with the mutable memtable being the
1537 : // last element in the slice. We want to wait for the newest table that
1538 : // overlaps.
1539 :
1540 1 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1541 1 : m := d.mu.mem.queue[i]
1542 1 : m.computePossibleOverlaps(func(b bounded) shouldContinue {
1543 1 : // If this is the first table to overlap a flushable, save
1544 1 : // the flushable. This ingest must be ingested or flushed
1545 1 : // after it.
1546 1 : if mem == nil {
1547 1 : mem = m
1548 1 : }
1549 :
1550 1 : switch v := b.(type) {
1551 1 : case *manifest.TableMetadata:
1552 1 : // NB: False positives are possible if `m` is a flushable
1553 1 : // ingest that overlaps the file `v` in bounds but doesn't
1554 1 : // contain overlapping data. This is considered acceptable
1555 1 : // because it's rare (in CockroachDB a bound overlap likely
1556 1 : // indicates a data overlap), and blocking the commit
1557 1 : // pipeline while we perform I/O to check for overlap may be
1558 1 : // more disruptive than enqueueing this ingestion on the
1559 1 : // flushable queue and switching to a new memtable.
1560 1 : metaFlushableOverlaps[v.TableNum] = true
1561 1 : case *KeyRange:
1562 : // An excise span or an EventuallyFileOnlySnapshot protected range;
1563 : // not a file.
1564 0 : default:
1565 0 : panic("unreachable")
1566 : }
1567 1 : return continueIteration
1568 : }, overlapBounds...)
1569 : }
1570 :
1571 1 : if mem == nil {
1572 1 : // No overlap with any of the queued flushables, so no need to queue
1573 1 : // after them.
1574 1 :
1575 1 : // New writes with higher sequence numbers may be concurrently
1576 1 : // committed. We must ensure they don't flush before this ingest
1577 1 : // completes. To do that, we ref the mutable memtable as a writer,
1578 1 : // preventing its flushing (and the flushing of all subsequent
1579 1 : // flushables in the queue). Once we've acquired the manifest lock
1580 1 : // to add the ingested sstables to the LSM, we can unref as we're
1581 1 : // guaranteed that the flush won't edit the LSM before this ingest.
1582 1 : mut = d.mu.mem.mutable
1583 1 : mut.writerRef()
1584 1 : return
1585 1 : }
1586 :
1587 : // The ingestion overlaps with some entry in the flushable queue. If the
1588 : // pre-conditions are met below, we can treat this ingestion as a flushable
1589 : // ingest, otherwise we wait on the memtable flush before ingestion.
1590 : //
1591 : // TODO(aaditya): We should make flushableIngest compatible with remote
1592 : // files.
1593 1 : hasRemoteFiles := len(shared) > 0 || len(external) > 0
1594 1 : canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest &&
1595 1 : // We require that either the queue of flushables is below the
1596 1 : // stop-writes threshold (note that this is typically a conservative
1597 1 : // check, since not every element of this queue will contribute the full
1598 1 : // memtable memory size that could result in a write stall), or WAL
1599 1 : // failover is permitting an unlimited queue without causing a write
1600 1 : // stall. The latter condition is important to avoid delays in
1601 1 : // visibility of concurrent writes that happen to get a sequence number
1602 1 : // after this ingest and then must wait for this ingest that is itself
1603 1 : // waiting on a large flush. See
1604 1 : // https://github.com/cockroachdb/pebble/issues/4944 for an illustration
1605 1 : // of this problem.
1606 1 : (len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold ||
1607 1 : d.mu.log.manager.ElevateWriteStallThresholdForFailover()) &&
1608 1 : !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles &&
1609 1 : (!args.ExciseSpan.Valid() || d.FormatMajorVersion() >= FormatFlushableIngestExcises)
1610 1 :
1611 1 : if !canIngestFlushable {
1612 1 : // We're not able to ingest as a flushable,
1613 1 : // so we must synchronously flush.
1614 1 : //
1615 1 : // TODO(bilal): Currently, if any of the files being ingested are shared,
1616 1 : // we cannot use flushable ingests and need
1617 1 : // to wait synchronously.
1618 1 : if mem.flushable == d.mu.mem.mutable {
1619 1 : err = d.makeRoomForWrite(nil)
1620 1 : }
1621 : // New writes with higher sequence numbers may be concurrently
1622 : // committed. We must ensure they don't flush before this ingest
1623 : // completes. To do that, we ref the mutable memtable as a writer,
1624 : // preventing its flushing (and the flushing of all subsequent
1625 : // flushables in the queue). Once we've acquired the manifest lock
1626 : // to add the ingested sstables to the LSM, we can unref as we're
1627 : // guaranteed that the flush won't edit the LSM before this ingest.
1628 1 : mut = d.mu.mem.mutable
1629 1 : mut.writerRef()
1630 1 : mem.flushForced = true
1631 1 : d.maybeScheduleFlush()
1632 1 : return
1633 : }
1634 : // Since there aren't too many memtables already queued up, we can
1635 : // slide the ingested sstables on top of the existing memtables.
1636 1 : asFlushable = true
1637 1 : fileMetas := make([]*manifest.TableMetadata, len(loadResult.local))
1638 1 : for i := range fileMetas {
1639 1 : fileMetas[i] = loadResult.local[i].TableMetadata
1640 1 : }
1641 1 : err = d.handleIngestAsFlushable(fileMetas, seqNum, args.ExciseSpan)
1642 : }
1643 :
1644 1 : var ve *manifest.VersionEdit
 : // apply runs after prepare in the commit pipeline. It assigns the
 : // allocated sequence numbers to the ingested tables, waits for any
 : // overlapping memtable flush, and applies the version edit via
 : // ingestApply. It is a no-op when prepare errored or queued the ingest
 : // as a flushable.
1645 1 : apply := func(seqNum base.SeqNum) {
1646 1 : if err != nil || asFlushable {
1647 1 : // An error occurred during prepare.
1648 1 : if mut != nil {
1649 0 : if mut.writerUnref() {
1650 0 : d.mu.Lock()
1651 0 : d.maybeScheduleFlush()
1652 0 : d.mu.Unlock()
1653 0 : }
1654 : }
1655 1 : return
1656 : }
1657 :
1658 : // If there's an excise being done atomically with the same ingest, we
1659 : // assign the lowest sequence number in the set of sequence numbers for this
1660 : // ingestion to the excise. Note that we've already allocated fileCount+1
1661 : // sequence numbers in this case.
1662 1 : if args.ExciseSpan.Valid() {
1663 1 : seqNum++ // the first seqNum is reserved for the excise.
1664 1 : }
1665 : // Update the sequence numbers for all ingested sstables'
1666 : // metadata. When the version edit is applied, the metadata is
1667 : // written to the manifest, persisting the sequence number.
1668 : // The sstables themselves are left unmodified.
1669 1 : if err = ingestUpdateSeqNum(
1670 1 : d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult,
1671 1 : ); err != nil {
1672 0 : if mut != nil {
1673 0 : if mut.writerUnref() {
1674 0 : d.mu.Lock()
1675 0 : d.maybeScheduleFlush()
1676 0 : d.mu.Unlock()
1677 0 : }
1678 : }
1679 0 : return
1680 : }
1681 :
1682 : // If we overlapped with a memtable in prepare wait for the flush to
1683 : // finish.
1684 1 : if mem != nil {
1685 1 : <-mem.flushed
1686 1 : }
1687 :
1688 : // Assign the sstables to the correct level in the LSM and apply the
1689 : // version edit.
1690 1 : ve, err = d.ingestApply(ctx, jobID, loadResult, mut, args.ExciseSpan, args.ExciseBoundsPolicy, seqNum)
1691 : }
1692 :
1693 : // Only one ingest can occur at a time because if not, one would block waiting
1694 : // for the other to finish applying. This blocking would happen while holding
1695 : // the commit mutex which would prevent unrelated batches from writing their
1696 : // changes to the WAL and memtable. This will cause a bigger commit hiccup
1697 : // during ingestion.
1698 1 : seqNumCount := loadResult.fileCount()
1699 1 : if args.ExciseSpan.Valid() {
1700 1 : seqNumCount++
1701 1 : }
1702 1 : d.commit.ingestSem <- struct{}{}
1703 1 : d.commit.AllocateSeqNum(seqNumCount, prepare, apply)
1704 1 : <-d.commit.ingestSem
1705 1 :
1706 1 : if err != nil {
1707 1 : if err2 := ingestCleanup(d.objProvider, loadResult.local); err2 != nil {
1708 0 : d.opts.Logger.Errorf("ingest cleanup failed: %v", err2)
1709 0 : }
1710 1 : } else {
1711 1 : // Since we either created a hard link to the ingesting files, or copied
1712 1 : // them over, it is safe to remove the originals paths.
1713 1 : for i := range loadResult.local {
1714 1 : path := loadResult.local[i].path
1715 1 : if err2 := d.opts.FS.Remove(path); err2 != nil {
1716 1 : d.opts.Logger.Errorf("ingest failed to remove original file: %s", err2)
1717 1 : }
1718 : }
1719 : }
1720 :
1721 : // TODO(jackson): Refactor this so that the case where there are no files
1722 : // but a valid excise span is not so exceptional.
1723 :
 : // Assemble stats and the event-listener payload. The stats are exact for
 : // direct ingests (from ve) and approximate for flushable ingests (from
 : // memtable overlap, as noted below).
1724 1 : var stats IngestOperationStats
1725 1 : if loadResult.fileCount() > 0 {
1726 1 : info := TableIngestInfo{
1727 1 : JobID: int(jobID),
1728 1 : Err: err,
1729 1 : flushable: asFlushable,
1730 1 : }
1731 1 : if len(loadResult.local) > 0 {
1732 1 : info.GlobalSeqNum = loadResult.local[0].SmallestSeqNum
1733 1 : } else if len(loadResult.shared) > 0 {
1734 1 : info.GlobalSeqNum = loadResult.shared[0].SmallestSeqNum
1735 1 : } else {
1736 1 : info.GlobalSeqNum = loadResult.external[0].SmallestSeqNum
1737 1 : }
1738 1 : if ve != nil {
1739 1 : info.Tables = make([]struct {
1740 1 : TableInfo
1741 1 : Level int
1742 1 : }, len(ve.NewTables))
1743 1 : for i := range ve.NewTables {
1744 1 : e := &ve.NewTables[i]
1745 1 : info.Tables[i].Level = e.Level
1746 1 : info.Tables[i].TableInfo = e.Meta.TableInfo()
1747 1 : stats.Bytes += e.Meta.Size
1748 1 : if e.Level == 0 {
1749 1 : stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
1750 1 : }
1751 1 : if metaFlushableOverlaps[e.Meta.TableNum] {
1752 1 : stats.MemtableOverlappingFiles++
1753 1 : }
1754 : }
1755 1 : } else if asFlushable {
1756 1 : // NB: If asFlushable == true, there are no shared sstables.
1757 1 : info.Tables = make([]struct {
1758 1 : TableInfo
1759 1 : Level int
1760 1 : }, len(loadResult.local))
1761 1 : for i, f := range loadResult.local {
1762 1 : info.Tables[i].Level = -1
1763 1 : info.Tables[i].TableInfo = f.TableInfo()
1764 1 : stats.Bytes += f.Size
1765 1 : // We don't have exact stats on which files will be ingested into
1766 1 : // L0, because actual ingestion into the LSM has been deferred until
1767 1 : // flush time. Instead, we infer based on memtable overlap.
1768 1 : //
1769 1 : // TODO(jackson): If we optimistically compute data overlap (#2112)
1770 1 : // before entering the commit pipeline, we can use that overlap to
1771 1 : // improve our approximation by incorporating overlap with L0, not
1772 1 : // just memtables.
1773 1 : if metaFlushableOverlaps[f.TableNum] {
1774 1 : stats.ApproxIngestedIntoL0Bytes += f.Size
1775 1 : stats.MemtableOverlappingFiles++
1776 1 : }
1777 : }
1778 : }
1779 1 : d.opts.EventListener.TableIngested(info)
1780 : }
1781 :
1782 1 : return stats, err
1783 : }
1784 :
 : // ingestSplitFile pairs a file being ingested with an existing file that
 : // must be split (via excise) so that the ingested file can be placed at
 : // `level` without overlap. Produced by ingestApply and consumed by
 : // (*DB).ingestSplit.
1785 : type ingestSplitFile struct {
1786 : // ingestFile is the file being ingested.
1787 : ingestFile *manifest.TableMetadata
1788 : // splitFile is the file that needs to be split to allow ingestFile to slot
1789 : // into `level` level.
1790 : splitFile *manifest.TableMetadata
1791 : // The level where ingestFile will go (and where splitFile already is).
1792 : level int
1793 : }
1794 :
1795 : // ingestSplit splits files specified in `files` and updates ve in-place to
1796 : // account for existing files getting split into two virtual sstables. The map
1797 : // `replacedTables` contains an in-progress map of all files that have been
1798 : // replaced with new virtual sstables in this version edit so far, which is also
1799 : // updated in-place.
 : //
 : // updateMetrics is invoked for each split performed, with the file that was
 : // split, its level, and the entries that replaced it.
1800 : //
1801 : // d.mu as well as the manifest lock must be held when calling this method.
1802 : func (d *DB) ingestSplit(
1803 : ctx context.Context,
1804 : ve *manifest.VersionEdit,
1805 : updateMetrics func(*manifest.TableMetadata, int, []manifest.NewTableEntry),
1806 : files []ingestSplitFile,
1807 : replacedTables map[base.TableNum][]manifest.NewTableEntry,
1808 1 : ) error {
1809 1 : for _, s := range files {
1810 1 : ingestFileBounds := s.ingestFile.UserKeyBounds()
1811 1 : // replacedFiles can be thought of as a tree, where we start iterating with
1812 1 : // s.splitFile and run its fileNum through replacedFiles, then find which of
1813 1 : // the replaced files overlaps with s.ingestFile, which becomes the new
1814 1 : // splitFile, then we check splitFile's replacements in replacedFiles again
1815 1 : // for overlap with s.ingestFile, and so on until we either can't find the
1816 1 : // current splitFile in replacedFiles (i.e. that's the file that now needs to
1817 1 : // be split), or we don't find a file that overlaps with s.ingestFile, which
1818 1 : // means a prior ingest split already produced enough room for s.ingestFile
1819 1 : // to go into this level without necessitating another ingest split.
1820 1 : splitFile := s.splitFile
1821 1 : for splitFile != nil {
1822 1 : replaced, ok := replacedTables[splitFile.TableNum]
1823 1 : if !ok {
 : // splitFile has not been replaced; it is the file to split.
1824 1 : break
1825 : }
1826 1 : updatedSplitFile := false
1827 1 : for i := range replaced {
1828 1 : if replaced[i].Meta.Overlaps(d.cmp, &ingestFileBounds) {
1829 1 : if updatedSplitFile {
1830 0 : // This should never happen because the earlier ingestTargetLevel
1831 0 : // function only finds split file candidates that are guaranteed to
1832 0 : // have no data overlap, only boundary overlap. See the comments
1833 0 : // in that method to see the definitions of data vs boundary
1834 0 : // overlap. That, plus the fact that files in `replaced` are
1835 0 : // guaranteed to have file bounds that are tight on user keys
1836 0 : // (as that's what `d.excise` produces), means that the only case
1837 0 : // where we overlap with two or more files in `replaced` is if we
1838 0 : // actually had data overlap all along, or if the ingestion files
1839 0 : // were overlapping, either of which is an invariant violation.
1840 0 : panic("updated with two files in ingestSplit")
1841 : }
1842 1 : splitFile = replaced[i].Meta
1843 1 : updatedSplitFile = true
1844 : }
1845 : }
1846 1 : if !updatedSplitFile {
1847 1 : // None of the replaced files overlapped with the file being ingested.
1848 1 : // This can happen if we've already excised a span overlapping with
1849 1 : // this file, or if we have consecutive ingested files that can slide
1850 1 : // within the same gap between keys in an existing file. For instance,
1851 1 : // if an existing file has keys a and g and we're ingesting b-c, d-e,
1852 1 : // the first loop iteration will split the existing file into one that
1853 1 : // ends in a and another that starts at g, and the second iteration will
1854 1 : // fall into this case and require no splitting.
1855 1 : //
1856 1 : // No splitting necessary.
1857 1 : splitFile = nil
1858 1 : }
1859 : }
1860 1 : if splitFile == nil {
1861 1 : continue
1862 : }
1863 : // NB: excise operates on [start, end). We're splitting at [start, end]
1864 : // (assuming !s.ingestFile.Largest.IsExclusiveSentinel()). The conflation
1865 : // of exclusive vs inclusive end bounds should not make a difference here
1866 : // as we're guaranteed to not have any data overlap between splitFile and
1867 : // s.ingestFile. d.excise will return an error if we pass an inclusive user
1868 : // key bound _and_ we end up seeing data overlap at the end key.
1869 1 : exciseBounds := base.UserKeyBoundsFromInternal(s.ingestFile.Smallest(), s.ingestFile.Largest())
1870 1 : leftTable, rightTable, err := d.exciseTable(ctx, exciseBounds, splitFile, s.level, tightExciseBounds)
1871 1 : if err != nil {
1872 0 : return err
1873 0 : }
1874 1 : added := applyExciseToVersionEdit(ve, splitFile, leftTable, rightTable, s.level)
1875 1 : replacedTables[splitFile.TableNum] = added
 : // Sanity check: none of the pieces produced by the split may overlap
 : // the ingested file, otherwise the split failed to make room.
1876 1 : for i := range added {
1877 1 : addedBounds := added[i].Meta.UserKeyBounds()
1878 1 : if s.ingestFile.Overlaps(d.cmp, &addedBounds) {
1879 0 : panic("ingest-time split produced a file that overlaps with ingested file")
1880 : }
1881 : }
1882 1 : updateMetrics(splitFile, s.level, added)
1883 : }
1884 : // Flatten the version edit by removing any entries from ve.NewFiles that
1885 : // are also in ve.DeletedFiles.
1886 1 : newNewFiles := ve.NewTables[:0]
1887 1 : for i := range ve.NewTables {
1888 1 : fn := ve.NewTables[i].Meta.TableNum
1889 1 : deEntry := manifest.DeletedTableEntry{Level: ve.NewTables[i].Level, FileNum: fn}
1890 1 : if _, ok := ve.DeletedTables[deEntry]; ok {
 : // The file was added and then deleted within this same version
 : // edit; drop both entries.
1891 1 : delete(ve.DeletedTables, deEntry)
1892 1 : } else {
1893 1 : newNewFiles = append(newNewFiles, ve.NewTables[i])
1894 1 : }
1895 : }
1896 1 : ve.NewTables = newNewFiles
1897 1 : return nil
1898 : }
1899 :
1900 : func (d *DB) ingestApply(
1901 : ctx context.Context,
1902 : jobID JobID,
1903 : lr ingestLoadResult,
1904 : mut *memTable,
1905 : exciseSpan KeyRange,
1906 : exciseBoundsPolicy exciseBoundsPolicy,
1907 : exciseSeqNum base.SeqNum,
1908 1 : ) (*manifest.VersionEdit, error) {
1909 1 : d.mu.Lock()
1910 1 : defer d.mu.Unlock()
1911 1 :
1912 1 : ve := &manifest.VersionEdit{
1913 1 : NewTables: make([]manifest.NewTableEntry, lr.fileCount()),
1914 1 : }
1915 1 : if exciseSpan.Valid() || (d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit()) {
1916 1 : ve.DeletedTables = map[manifest.DeletedTableEntry]*manifest.TableMetadata{}
1917 1 : }
1918 1 : var metrics levelMetricsDelta
1919 1 :
1920 1 : // Determine the target level inside UpdateVersionLocked. This prevents two
1921 1 : // concurrent ingestion jobs from using the same version to determine the
1922 1 : // target level, and also provides serialization with concurrent compaction
1923 1 : // and flush jobs.
1924 1 : err := d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
1925 1 : if mut != nil {
1926 1 : // Unref the mutable memtable to allows its flush to proceed. Now that we've
1927 1 : // acquired the manifest lock, we can be certain that if the mutable
1928 1 : // memtable has received more recent conflicting writes, the flush won't
1929 1 : // beat us to applying to the manifest resulting in sequence number
1930 1 : // inversion. Even though we call maybeScheduleFlush right now, this flush
1931 1 : // will apply after our ingestion.
1932 1 : if mut.writerUnref() {
1933 1 : d.maybeScheduleFlush()
1934 1 : }
1935 : }
1936 :
1937 1 : current := d.mu.versions.currentVersion()
1938 1 : overlapChecker := &overlapChecker{
1939 1 : comparer: d.opts.Comparer,
1940 1 : newIters: d.newIters,
1941 1 : opts: IterOptions{
1942 1 : logger: d.opts.Logger,
1943 1 : Category: categoryIngest,
1944 1 : },
1945 1 : v: current,
1946 1 : }
1947 1 : shouldIngestSplit := d.opts.Experimental.IngestSplit != nil &&
1948 1 : d.opts.Experimental.IngestSplit() && d.FormatMajorVersion() >= FormatVirtualSSTables
1949 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
1950 1 : // filesToSplit is a list where each element is a pair consisting of a file
1951 1 : // being ingested and a file being split to make room for an ingestion into
1952 1 : // that level. Each ingested file will appear at most once in this list. It
1953 1 : // is possible for split files to appear twice in this list.
1954 1 : filesToSplit := make([]ingestSplitFile, 0)
1955 1 : checkCompactions := false
1956 1 : for i := 0; i < lr.fileCount(); i++ {
1957 1 : // Determine the lowest level in the LSM for which the sstable doesn't
1958 1 : // overlap any existing files in the level.
1959 1 : var m *manifest.TableMetadata
1960 1 : specifiedLevel := -1
1961 1 : isShared := false
1962 1 : isExternal := false
1963 1 : if i < len(lr.local) {
1964 1 : // local file.
1965 1 : m = lr.local[i].TableMetadata
1966 1 : } else if (i - len(lr.local)) < len(lr.shared) {
1967 1 : // shared file.
1968 1 : isShared = true
1969 1 : sharedIdx := i - len(lr.local)
1970 1 : m = lr.shared[sharedIdx].TableMetadata
1971 1 : specifiedLevel = int(lr.shared[sharedIdx].shared.Level)
1972 1 : } else {
1973 1 : // external file.
1974 1 : isExternal = true
1975 1 : externalIdx := i - (len(lr.local) + len(lr.shared))
1976 1 : m = lr.external[externalIdx].TableMetadata
1977 1 : if lr.externalFilesHaveLevel {
1978 1 : specifiedLevel = int(lr.external[externalIdx].external.Level)
1979 1 : }
1980 : }
1981 :
1982 : // Add to CreatedBackingTables if this is a new backing.
1983 : //
1984 : // Shared files always have a new backing. External files have new backings
1985 : // iff the backing disk file num and the file num match (see ingestAttachRemote).
1986 1 : if isShared || (isExternal && m.TableBacking.DiskFileNum == base.DiskFileNum(m.TableNum)) {
1987 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.TableBacking)
1988 1 : }
1989 :
1990 1 : f := &ve.NewTables[i]
1991 1 : var err error
1992 1 : if specifiedLevel != -1 {
1993 1 : f.Level = specifiedLevel
1994 1 : } else {
1995 1 : var splitTable *manifest.TableMetadata
1996 1 : if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest()) && exciseSpan.Contains(d.cmp, m.Largest()) {
1997 1 : // This file fits perfectly within the excise span. We can slot it at
1998 1 : // L6, or sharedLevelsStart - 1 if we have shared files.
1999 1 : if len(lr.shared) > 0 || lr.externalFilesHaveLevel {
2000 1 : f.Level = sharedLevelsStart - 1
2001 1 : if baseLevel > f.Level {
2002 1 : f.Level = 0
2003 1 : }
2004 1 : } else {
2005 1 : f.Level = 6
2006 1 : }
2007 1 : } else {
2008 1 : // We check overlap against the LSM without holding DB.mu. Note that we
2009 1 : // are still holding the log lock, so the version cannot change.
2010 1 : // TODO(radu): perform this check optimistically outside of the log lock.
2011 1 : var lsmOverlap overlap.WithLSM
2012 1 : lsmOverlap, err = func() (overlap.WithLSM, error) {
2013 1 : d.mu.Unlock()
2014 1 : defer d.mu.Lock()
2015 1 : return overlapChecker.DetermineLSMOverlap(ctx, m.UserKeyBounds())
2016 1 : }()
2017 1 : if err == nil {
2018 1 : f.Level, splitTable, err = ingestTargetLevel(
2019 1 : ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit,
2020 1 : )
2021 1 : }
2022 : }
2023 :
2024 1 : if splitTable != nil {
2025 1 : if invariants.Enabled {
2026 1 : if lf := current.Levels[f.Level].Find(d.cmp, splitTable); lf.Empty() {
2027 0 : panic("splitFile returned is not in level it should be")
2028 : }
2029 : }
2030 : // We take advantage of the fact that we won't drop the db mutex
2031 : // between now and the call to UpdateVersionLocked. So, no files should
2032 : // get added to a new in-progress compaction at this point. We can
2033 : // avoid having to iterate on in-progress compactions to cancel them
2034 : // if none of the files being split have a compacting state.
2035 1 : if splitTable.IsCompacting() {
2036 0 : checkCompactions = true
2037 0 : }
2038 1 : filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitTable, level: f.Level})
2039 : }
2040 : }
2041 1 : if err != nil {
2042 0 : return versionUpdate{}, err
2043 0 : }
2044 1 : if isShared && f.Level < sharedLevelsStart {
2045 0 : panic(fmt.Sprintf("cannot slot a shared file higher than the highest shared level: %d < %d",
2046 0 : f.Level, sharedLevelsStart))
2047 : }
2048 1 : f.Meta = m
2049 1 : levelMetrics := metrics[f.Level]
2050 1 : if levelMetrics == nil {
2051 1 : levelMetrics = &LevelMetrics{}
2052 1 : metrics[f.Level] = levelMetrics
2053 1 : }
2054 1 : levelMetrics.TablesCount++
2055 1 : levelMetrics.TablesSize += int64(m.Size)
2056 1 : levelMetrics.EstimatedReferencesSize += m.EstimatedReferenceSize()
2057 1 : levelMetrics.TableBytesIngested += m.Size
2058 1 : levelMetrics.TablesIngested++
2059 : }
2060 : // replacedTables maps files excised due to exciseSpan (or splitFiles returned
2061 : // by ingestTargetLevel), to files that were created to replace it. This map
2062 : // is used to resolve references to split files in filesToSplit, as it is
2063 : // possible for a file that we want to split to no longer exist or have a
2064 : // newer fileMetadata due to a split induced by another ingestion file, or an
2065 : // excise.
2066 1 : replacedTables := make(map[base.TableNum][]manifest.NewTableEntry)
2067 1 : updateLevelMetricsOnExcise := func(m *manifest.TableMetadata, level int, added []manifest.NewTableEntry) {
2068 1 : levelMetrics := metrics[level]
2069 1 : if levelMetrics == nil {
2070 1 : levelMetrics = &LevelMetrics{}
2071 1 : metrics[level] = levelMetrics
2072 1 : }
2073 1 : levelMetrics.TablesCount--
2074 1 : levelMetrics.TablesSize -= int64(m.Size)
2075 1 : levelMetrics.EstimatedReferencesSize -= m.EstimatedReferenceSize()
2076 1 : for i := range added {
2077 1 : levelMetrics.TablesCount++
2078 1 : levelMetrics.TablesSize += int64(added[i].Meta.Size)
2079 1 : levelMetrics.EstimatedReferencesSize += added[i].Meta.EstimatedReferenceSize()
2080 1 : }
2081 : }
2082 1 : var exciseBounds base.UserKeyBounds
2083 1 : if exciseSpan.Valid() {
2084 1 : exciseBounds = exciseSpan.UserKeyBounds()
2085 1 : // Iterate through all levels and find files that intersect with exciseSpan.
2086 1 : //
2087 1 : // TODO(bilal): We could drop the DB mutex here as we don't need it for
2088 1 : // excises; we only need to hold the version lock which we already are
2089 1 : // holding. However releasing the DB mutex could mess with the
2090 1 : // ingestTargetLevel calculation that happened above, as it assumed that it
2091 1 : // had a complete view of in-progress compactions that wouldn't change
2092 1 : // until UpdateVersionLocked is called. If we were to drop the mutex now,
2093 1 : // we could schedule another in-progress compaction that would go into the
2094 1 : // chosen target level and lead to file overlap within level (which would
2095 1 : // panic in UpdateVersionLocked). We should drop the db mutex here, do the
2096 1 : // excise, then re-grab the DB mutex and rerun just the in-progress
2097 1 : // compaction check to see if any new compactions are conflicting with our
2098 1 : // chosen target levels for files, and if they are, we should signal those
2099 1 : // compactions to error out.
2100 1 : for layer, ls := range current.AllLevelsAndSublevels() {
2101 1 : for m := range ls.Overlaps(d.cmp, exciseSpan.UserKeyBounds()).All() {
2102 1 : leftTable, rightTable, err := d.exciseTable(ctx, exciseBounds, m, layer.Level(), exciseBoundsPolicy)
2103 1 : if err != nil {
2104 0 : return versionUpdate{}, err
2105 0 : }
2106 1 : newFiles := applyExciseToVersionEdit(ve, m, leftTable, rightTable, layer.Level())
2107 1 : replacedTables[m.TableNum] = newFiles
2108 1 : updateLevelMetricsOnExcise(m, layer.Level(), newFiles)
2109 : }
2110 : }
2111 : }
2112 1 : if len(filesToSplit) > 0 {
2113 1 : // For the same reasons as the above call to excise, we hold the db mutex
2114 1 : // while calling this method.
2115 1 : if err := d.ingestSplit(ctx, ve, updateLevelMetricsOnExcise, filesToSplit, replacedTables); err != nil {
2116 0 : return versionUpdate{}, err
2117 0 : }
2118 : }
2119 1 : if len(filesToSplit) > 0 || exciseSpan.Valid() {
2120 1 : for c := range d.mu.compact.inProgress {
2121 1 : if c.VersionEditApplied() {
2122 0 : continue
2123 : }
2124 : // Check if this compaction overlaps with the excise span. Note that just
2125 : // checking if the inputs individually overlap with the excise span
2126 : // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
2127 : // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
2128 : // doing a [c,d) excise at the same time as this compaction, we will have
2129 : // to error out the whole compaction as we can't guarantee it hasn't/won't
2130 : // write a file overlapping with the excise span.
2131 1 : bounds := c.Bounds()
2132 1 : if bounds != nil && bounds.Overlaps(d.cmp, &exciseBounds) {
2133 1 : c.Cancel()
2134 1 : }
2135 : // Check if this compaction's inputs have been replaced due to an
2136 : // ingest-time split. In that case, cancel the compaction as a newly picked
2137 : // compaction would need to include any new files that slid in between
2138 : // previously-existing files. Note that we cancel any compaction that has a
2139 : // file that was ingest-split as an input, even if it started before this
2140 : // ingestion.
2141 1 : if checkCompactions {
2142 0 : for _, table := range c.Tables() {
2143 0 : if _, ok := replacedTables[table.TableNum]; ok {
2144 0 : c.Cancel()
2145 0 : break
2146 : }
2147 : }
2148 : }
2149 : }
2150 : }
2151 :
2152 1 : return versionUpdate{
2153 1 : VE: ve,
2154 1 : JobID: jobID,
2155 1 : Metrics: metrics,
2156 1 : InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) },
2157 : }, nil
2158 : })
2159 1 : if err != nil {
2160 1 : return nil, err
2161 1 : }
2162 :
2163 : // Check for any EventuallyFileOnlySnapshots that could be watching for
2164 : // an excise on this span. There should be none as the
2165 : // computePossibleOverlaps steps should have forced these EFOS to transition
2166 : // to file-only snapshots by now. If we see any that conflict with this
2167 : // excise, panic.
2168 1 : if exciseSpan.Valid() {
2169 1 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
2170 0 : // Skip non-EFOS snapshots, and also skip any EFOS that were created
2171 0 : // *after* the excise.
2172 0 : if s.efos == nil || base.Visible(exciseSeqNum, s.efos.seqNum, base.SeqNumMax) {
2173 0 : continue
2174 : }
2175 0 : efos := s.efos
2176 0 : // TODO(bilal): We can make this faster by taking advantage of the sorted
2177 0 : // nature of protectedRanges to do a sort.Search, or even maintaining a
2178 0 : // global list of all protected ranges instead of having to peer into every
2179 0 : // snapshot.
2180 0 : for i := range efos.protectedRanges {
2181 0 : if efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
2182 0 : panic("unexpected excise of an EventuallyFileOnlySnapshot's bounds")
2183 : }
2184 : }
2185 : }
2186 : }
2187 :
2188 1 : d.mu.versions.metrics.Ingest.Count++
2189 1 :
2190 1 : d.updateReadStateLocked(d.opts.DebugCheck)
2191 1 : // updateReadStateLocked could have generated obsolete tables, schedule a
2192 1 : // cleanup job if necessary.
2193 1 : d.deleteObsoleteFiles(jobID)
2194 1 : d.updateTableStatsLocked(ve.NewTables)
2195 1 : // The ingestion may have pushed a level over the threshold for compaction,
2196 1 : // so check to see if one is necessary and schedule it.
2197 1 : d.maybeScheduleCompaction()
2198 1 : var toValidate []manifest.NewTableEntry
2199 1 : dedup := make(map[base.DiskFileNum]struct{})
2200 1 : for _, entry := range ve.NewTables {
2201 1 : if _, ok := dedup[entry.Meta.TableBacking.DiskFileNum]; !ok {
2202 1 : toValidate = append(toValidate, entry)
2203 1 : dedup[entry.Meta.TableBacking.DiskFileNum] = struct{}{}
2204 1 : }
2205 : }
2206 1 : d.maybeValidateSSTablesLocked(toValidate)
2207 1 : return ve, nil
2208 : }
2209 :
2210 : // maybeValidateSSTablesLocked adds the slice of newTableEntrys to the pending
2211 : // queue of files to be validated, when the feature is enabled.
2212 : //
2213 : // Note that if two entries with the same backing file are added twice, then the
2214 : // block checksums for the backing file will be validated twice.
2215 : //
2216 : // DB.mu must be locked when calling.
2217 1 : func (d *DB) maybeValidateSSTablesLocked(newFiles []manifest.NewTableEntry) {
2218 1 : // Only add to the validation queue when the feature is enabled.
2219 1 : if !d.opts.Experimental.ValidateOnIngest {
2220 1 : return
2221 1 : }
2222 :
2223 1 : d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...)
2224 1 : if d.shouldValidateSSTablesLocked() {
2225 1 : go d.validateSSTables()
2226 1 : }
2227 : }
2228 :
2229 : // shouldValidateSSTablesLocked returns true if SSTable validation should run.
2230 : // DB.mu must be locked when calling.
2231 1 : func (d *DB) shouldValidateSSTablesLocked() bool {
2232 1 : return !d.mu.tableValidation.validating &&
2233 1 : d.closed.Load() == nil &&
2234 1 : d.opts.Experimental.ValidateOnIngest &&
2235 1 : len(d.mu.tableValidation.pending) > 0
2236 1 : }
2237 :
// validateSSTables runs a round of validation on the tables in the pending
// queue.
//
// It runs in its own goroutine (spawned by maybeValidateSSTablesLocked or by a
// previous pass). The d.mu.tableValidation.validating flag ensures at most one
// pass runs at a time. Tables that fail validation for reasons other than
// corruption are re-queued for a later attempt; if more work exists when this
// pass finishes, a follow-up pass is spawned.
func (d *DB) validateSSTables() {
	d.mu.Lock()
	// Re-check the preconditions under the lock: the DB may have begun
	// closing, or another pass may have drained the queue, since this
	// goroutine was spawned.
	if !d.shouldValidateSSTablesLocked() {
		d.mu.Unlock()
		return
	}

	// Claim the entire pending queue and mark this pass as in progress so
	// that concurrent enqueuers do not start a second pass.
	pending := d.mu.tableValidation.pending
	d.mu.tableValidation.pending = nil
	d.mu.tableValidation.validating = true
	jobID := d.newJobIDLocked()
	// Hold a reference on the current read state so the version we consult
	// below stays valid while DB.mu is dropped; released via rs.unref().
	rs := d.loadReadState()

	// Drop DB.mu before performing IO.
	d.mu.Unlock()

	// Validate all tables in the pending queue. This could lead to a situation
	// where we are starving IO from other tasks due to having to page through
	// all the blocks in all the sstables in the queue.
	// TODO(travers): Add some form of pacing to avoid IO starvation.

	// If we fail to validate any files due to reasons other than uncovered
	// corruption, accumulate them and re-queue them for another attempt.
	var retry []manifest.NewTableEntry

	for _, f := range pending {
		// The file may have been moved or deleted since it was ingested, in
		// which case we skip.
		if !rs.current.Contains(f.Level, f.Meta) {
			// Assume the file was moved to a lower level. It is rare enough
			// that a table is moved or deleted between the time it was ingested
			// and the time the validation routine runs that the overall cost of
			// this inner loop is tolerably low, when amortized over all
			// ingested tables.
			found := false
			for i := f.Level + 1; i < numLevels; i++ {
				if rs.current.Contains(i, f.Meta) {
					found = true
					break
				}
			}
			if !found {
				// Not found in any lower level either; the table must have
				// been deleted, so there is nothing to validate.
				continue
			}
		}

		// TODO(radu): plumb a ReadEnv with a CategoryIngest stats collector through
		// to ValidateBlockChecksums.
		err := d.fileCache.withReader(context.TODO(), block.NoReadEnv,
			f.Meta, func(r *sstable.Reader, _ sstable.ReadEnv) error {
				return r.ValidateBlockChecksums()
			})

		if err != nil {
			if IsCorruptionError(err) {
				// TODO(travers): Hook into the corruption reporting pipeline, once
				// available. See pebble#1192.
				d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err)
			} else {
				// If there was some other, possibly transient, error that
				// caused table validation to fail inform the EventListener and
				// move on. We remember the table so that we can retry it in a
				// subsequent table validation job.
				//
				// TODO(jackson): If the error is not transient, this will retry
				// validation indefinitely. While not great, it's the same
				// behavior as erroring flushes and compactions. We should
				// address this as a part of #270.
				d.opts.EventListener.BackgroundError(err)
				retry = append(retry, f)
				continue
			}
		}

		// Validation succeeded; notify listeners.
		d.opts.EventListener.TableValidated(TableValidatedInfo{
			JobID: int(jobID),
			Meta:  f.Meta,
		})
	}
	// Release the read-state reference taken above, then publish results
	// under DB.mu.
	rs.unref()
	d.mu.Lock()
	defer d.mu.Unlock()
	// Re-queue tables that hit transient errors so a later pass retries them.
	d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, retry...)
	d.mu.tableValidation.validating = false
	// Wake any goroutines waiting on the validation state to change.
	d.mu.tableValidation.cond.Broadcast()
	// If new work arrived while this pass ran (or retries remain), spawn a
	// follow-up pass.
	if d.shouldValidateSSTablesLocked() {
		go d.validateSSTables()
	}
}
|