Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 : "sort"
12 : "time"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/internal/manifest"
18 : "github.com/cockroachdb/pebble/internal/overlap"
19 : "github.com/cockroachdb/pebble/internal/sstableinternal"
20 : "github.com/cockroachdb/pebble/objstorage"
21 : "github.com/cockroachdb/pebble/objstorage/remote"
22 : "github.com/cockroachdb/pebble/sstable"
23 : )
24 :
25 1 : func sstableKeyCompare(userCmp Compare, a, b InternalKey) int {
26 1 : c := userCmp(a.UserKey, b.UserKey)
27 1 : if c != 0 {
28 1 : return c
29 1 : }
30 1 : if a.IsExclusiveSentinel() {
31 1 : if !b.IsExclusiveSentinel() {
32 1 : return -1
33 1 : }
34 1 : } else if b.IsExclusiveSentinel() {
35 1 : return +1
36 1 : }
37 1 : return 0
38 : }
39 :
// KeyRange encodes a key range in user key space. A KeyRange's Start is
// inclusive while its End is exclusive.
//
// KeyRange is equivalent to base.UserKeyBounds with exclusive end.
type KeyRange struct {
	// Start is the inclusive lower bound; End is the exclusive upper bound.
	Start, End []byte
}
47 :
48 : // Valid returns true if the KeyRange is defined.
49 1 : func (k *KeyRange) Valid() bool {
50 1 : return k.Start != nil && k.End != nil
51 1 : }
52 :
53 : // Contains returns whether the specified key exists in the KeyRange.
54 1 : func (k *KeyRange) Contains(cmp base.Compare, key InternalKey) bool {
55 1 : v := cmp(key.UserKey, k.End)
56 1 : return (v < 0 || (v == 0 && key.IsExclusiveSentinel())) && cmp(k.Start, key.UserKey) <= 0
57 1 : }
58 :
59 : // UserKeyBounds returns the KeyRange as UserKeyBounds. Also implements the internal `bounded` interface.
60 1 : func (k KeyRange) UserKeyBounds() base.UserKeyBounds {
61 1 : return base.UserKeyBoundsEndExclusive(k.Start, k.End)
62 1 : }
63 :
64 : // OverlapsInternalKeyRange checks if the specified internal key range has an
65 : // overlap with the KeyRange. Note that we aren't checking for full containment
66 : // of smallest-largest within k, rather just that there's some intersection
67 : // between the two ranges.
68 1 : func (k *KeyRange) OverlapsInternalKeyRange(cmp base.Compare, smallest, largest InternalKey) bool {
69 1 : ukb := k.UserKeyBounds()
70 1 : b := base.UserKeyBoundsFromInternal(smallest, largest)
71 1 : return ukb.Overlaps(cmp, &b)
72 1 : }
73 :
74 : // Overlaps checks if the specified file has an overlap with the KeyRange.
75 : // Note that we aren't checking for full containment of m within k, rather just
76 : // that there's some intersection between m and k's bounds.
77 1 : func (k *KeyRange) Overlaps(cmp base.Compare, m *fileMetadata) bool {
78 1 : b := k.UserKeyBounds()
79 1 : return m.Overlaps(cmp, &b)
80 1 : }
81 :
82 : // OverlapsKeyRange checks if this span overlaps with the provided KeyRange.
83 : // Note that we aren't checking for full containment of either span in the other,
84 : // just that there's a key x that is in both key ranges.
85 1 : func (k *KeyRange) OverlapsKeyRange(cmp Compare, span KeyRange) bool {
86 1 : return cmp(k.Start, span.End) < 0 && cmp(k.End, span.Start) > 0
87 1 : }
88 :
89 1 : func ingestValidateKey(opts *Options, key *InternalKey) error {
90 1 : if key.Kind() == InternalKeyKindInvalid {
91 1 : return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s",
92 1 : key.Pretty(opts.Comparer.FormatKey))
93 1 : }
94 1 : if key.SeqNum() != 0 {
95 1 : return base.CorruptionErrorf("pebble: external sstable has non-zero seqnum: %s",
96 1 : key.Pretty(opts.Comparer.FormatKey))
97 1 : }
98 1 : return nil
99 : }
100 :
// ingestSynthesizeShared constructs a fileMetadata for one shared sstable owned
// or shared by another node. No I/O is performed; the bounds are synthesized
// entirely from the SharedSSTMeta.
func ingestSynthesizeShared(
	opts *Options, sm SharedSSTMeta, fileNum base.FileNum,
) (*fileMetadata, error) {
	if sm.Size == 0 {
		// Disallow 0 file sizes
		return nil, errors.New("pebble: cannot ingest shared file with size 0")
	}
	// Don't load table stats. Doing a round trip to shared storage, one SST
	// at a time is not worth it as it slows down ingestion.
	meta := &fileMetadata{
		FileNum:      fileNum,
		CreationTime: time.Now().Unix(),
		Virtual:      true,
		Size:         sm.Size,
	}
	// For simplicity, we use the same number for both the FileNum and the
	// DiskFileNum (even though this is a virtual sstable). Set the underlying
	// FileBacking's size to the same size as the virtualized view of the sstable.
	// This ensures that we don't over-prioritize this sstable for compaction just
	// yet, as we do not have a clear sense of what parts of this sstable are
	// referenced by other nodes.
	meta.InitProviderBacking(base.DiskFileNum(fileNum), sm.Size)

	if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil {
		// Initialize meta.{HasPointKeys,Smallest,Largest}, etc.
		//
		// NB: We create new internal keys and pass them into ExtendPointKeyBounds
		// so that we can sub a zero sequence number into the bounds. We can set
		// the sequence number to anything here; it'll be reset in ingestUpdateSeqNum
		// anyway. However, we do need to use the same sequence number across all
		// bound keys at this step so that we end up with bounds that are consistent
		// across point/range keys.
		//
		// Because of the sequence number rewriting, we cannot use the Kind of
		// sm.SmallestPointKey. For example, the original SST might start with
		// a.SET.2 and a.RANGEDEL.1 (with a.SET.2 being the smallest key); after
		// rewriting the sequence numbers, these keys become a.SET.100 and
		// a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a
		// correct bound, we just use the maximum key kind (which sorts first).
		// Similarly, we use the smallest key kind for the largest key.
		smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMax)
		largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0)
		if sm.LargestPointKey.IsExclusiveSentinel() {
			// Preserve the exclusive upper bound rather than converting it into
			// an inclusive key at the same user key.
			largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey)
		}
		if opts.Comparer.Equal(smallestPointKey.UserKey, largestPointKey.UserKey) &&
			smallestPointKey.Trailer < largestPointKey.Trailer {
			// We get kinds from the sender, however we substitute our own sequence
			// numbers. This can result in cases where an sstable [b#5,SET-b#4,DELSIZED]
			// becomes [b#0,SET-b#0,DELSIZED] when we synthesize it here, but the
			// kinds need to be reversed now because DelSized > Set.
			smallestPointKey, largestPointKey = largestPointKey, smallestPointKey
		}
		meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallestPointKey, largestPointKey)
	}
	if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil {
		// Initialize meta.{HasRangeKeys,Smallest,Largest}, etc.
		//
		// See comment above on why we use a zero sequence number and these key
		// kinds here.
		smallestRangeKey := base.MakeInternalKey(sm.SmallestRangeKey.UserKey, 0, base.InternalKeyKindRangeKeyMax)
		largestRangeKey := base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeyMin, sm.LargestRangeKey.UserKey)
		meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallestRangeKey, largestRangeKey)
	}
	// Sanity-check that the synthesized bounds are internally consistent.
	if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
		return nil, err
	}
	return meta, nil
}
172 :
// ingestLoad1External loads the fileMetadata for one external sstable.
// Sequence number and target level calculation happens during prepare/apply.
// No I/O is performed; bounds are synthesized from the ExternalFile span.
func ingestLoad1External(
	opts *Options, e ExternalFile, fileNum base.FileNum,
) (*fileMetadata, error) {
	if e.Size == 0 {
		return nil, errors.New("pebble: cannot ingest external file with size 0")
	}
	if !e.HasRangeKey && !e.HasPointKey {
		return nil, errors.New("pebble: cannot ingest external file with no point or range keys")
	}

	// Reject inverted bounds, and empty [k, k) spans with an exclusive end.
	if opts.Comparer.Compare(e.StartKey, e.EndKey) > 0 {
		return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
	}
	if opts.Comparer.Compare(e.StartKey, e.EndKey) == 0 && !e.EndKeyIsInclusive {
		return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
	}
	// Bounds must be suffix-less user keys (Split must consume the whole key).
	if n := opts.Comparer.Split(e.StartKey); n != len(e.StartKey) {
		return nil, errors.Newf("pebble: external file bounds start key %q has suffix", e.StartKey)
	}
	if n := opts.Comparer.Split(e.EndKey); n != len(e.EndKey) {
		return nil, errors.Newf("pebble: external file bounds end key %q has suffix", e.EndKey)
	}

	// Don't load table stats. Doing a round trip to shared storage, one SST
	// at a time is not worth it as it slows down ingestion.
	meta := &fileMetadata{
		FileNum:      fileNum,
		CreationTime: time.Now().Unix(),
		Virtual:      true,
		Size:         e.Size,
	}

	// In the name of keeping this ingestion as fast as possible, we avoid
	// *all* existence checks and synthesize a file metadata with smallest/largest
	// keys that overlap whatever the passed-in span was.
	smallestCopy := slices.Clone(e.StartKey)
	largestCopy := slices.Clone(e.EndKey)
	if e.HasPointKey {
		// Sequence numbers are updated later by
		// ingestUpdateSeqNum, applying a sequence number that
		// is applied to all keys in the sstable.
		if e.EndKeyIsInclusive {
			meta.ExtendPointKeyBounds(
				opts.Comparer.Compare,
				base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax),
				base.MakeInternalKey(largestCopy, 0, 0))
		} else {
			meta.ExtendPointKeyBounds(
				opts.Comparer.Compare,
				base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax),
				base.MakeRangeDeleteSentinelKey(largestCopy))
		}
	}
	if e.HasRangeKey {
		meta.ExtendRangeKeyBounds(
			opts.Comparer.Compare,
			base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeyMax),
			base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyMin, largestCopy),
		)
	}

	meta.SyntheticPrefix = e.SyntheticPrefix
	meta.SyntheticSuffix = e.SyntheticSuffix

	return meta, nil
}
241 :
// ingestLoad1 creates the FileMetadata for one file. This file will be owned
// by this store. It opens the sstable, verifies its table format is supported
// by the current format major version, and derives point-key, range-del, and
// range-key bounds by iterating the table. Returns (nil, nil) if the table
// contains no keys at all.
func ingestLoad1(
	ctx context.Context,
	opts *Options,
	fmv FormatMajorVersion,
	readable objstorage.Readable,
	cacheID uint64,
	fileNum base.FileNum,
) (*fileMetadata, error) {
	o := opts.MakeReaderOptions()
	o.SetInternalCacheOpts(sstableinternal.CacheOptions{
		Cache:   opts.Cache,
		CacheID: cacheID,
		FileNum: base.PhysicalTableDiskFileNum(fileNum),
	})
	r, err := sstable.NewReader(ctx, readable, o)
	if err != nil {
		return nil, err
	}
	defer r.Close()

	// Avoid ingesting tables with format versions this DB doesn't support.
	tf, err := r.TableFormat()
	if err != nil {
		return nil, err
	}
	if tf < fmv.MinTableFormat() || tf > fmv.MaxTableFormat() {
		return nil, errors.Newf(
			"pebble: table format %s is not within range supported at DB format major version %d, (%s,%s)",
			tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
		)
	}

	meta := &fileMetadata{}
	meta.FileNum = fileNum
	meta.Size = uint64(readable.Size())
	meta.CreationTime = time.Now().Unix()
	meta.InitPhysicalBacking()

	// Avoid loading into the table cache for collecting stats if we
	// don't need to. If there are no range deletions, we have all the
	// information to compute the stats here.
	//
	// This is helpful in tests for avoiding awkwardness around deletion of
	// ingested files from MemFS. MemFS implements the Windows semantics of
	// disallowing removal of an open file. Under MemFS, if we don't populate
	// meta.Stats here, the file will be loaded into the table cache for
	// calculating stats before we can remove the original link.
	maybeSetStatsFromProperties(meta.PhysicalMeta(), &r.Properties)

	// Derive point-key bounds from the first and last point keys in the table.
	{
		iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
		if err != nil {
			return nil, err
		}
		defer iter.Close()
		var smallest InternalKey
		if kv := iter.First(); kv != nil {
			if err := ingestValidateKey(opts, &kv.K); err != nil {
				return nil, err
			}
			smallest = kv.K.Clone()
		}
		if err := iter.Error(); err != nil {
			return nil, err
		}
		if kv := iter.Last(); kv != nil {
			if err := ingestValidateKey(opts, &kv.K); err != nil {
				return nil, err
			}
			meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, kv.K.Clone())
		}
		if err := iter.Error(); err != nil {
			return nil, err
		}
	}

	// Range deletions also contribute to the point-key bounds.
	iter, err := r.NewRawRangeDelIter(ctx, sstable.NoFragmentTransforms)
	if err != nil {
		return nil, err
	}
	if iter != nil {
		defer iter.Close()
		var smallest InternalKey
		if s, err := iter.First(); err != nil {
			return nil, err
		} else if s != nil {
			key := s.SmallestKey()
			if err := ingestValidateKey(opts, &key); err != nil {
				return nil, err
			}
			smallest = key.Clone()
		}
		if s, err := iter.Last(); err != nil {
			return nil, err
		} else if s != nil {
			k := s.SmallestKey()
			if err := ingestValidateKey(opts, &k); err != nil {
				return nil, err
			}
			largest := s.LargestKey().Clone()
			meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest)
		}
	}

	// Update the range-key bounds for the table.
	{
		iter, err := r.NewRawRangeKeyIter(ctx, sstable.NoFragmentTransforms)
		if err != nil {
			return nil, err
		}
		if iter != nil {
			defer iter.Close()
			var smallest InternalKey
			if s, err := iter.First(); err != nil {
				return nil, err
			} else if s != nil {
				key := s.SmallestKey()
				if err := ingestValidateKey(opts, &key); err != nil {
					return nil, err
				}
				smallest = key.Clone()
			}
			if s, err := iter.Last(); err != nil {
				return nil, err
			} else if s != nil {
				k := s.SmallestKey()
				if err := ingestValidateKey(opts, &k); err != nil {
					return nil, err
				}
				// As range keys are fragmented, the end key of the last range key in
				// the table provides the upper bound for the table.
				largest := s.LargestKey().Clone()
				meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallest, largest)
			}
		}
	}

	// An empty table is skipped entirely (the caller drops nil metadata).
	if !meta.HasPointKeys && !meta.HasRangeKeys {
		return nil, nil
	}

	// Sanity check that the various bounds on the file were set consistently.
	if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
		return nil, err
	}

	return meta, nil
}
392 :
// ingestLoadResult holds the metadata loaded by ingestLoad for all files being
// ingested, grouped by provenance: local paths, shared sstables, and external
// files.
type ingestLoadResult struct {
	local    []ingestLocalMeta
	shared   []ingestSharedMeta
	external []ingestExternalMeta

	// externalFilesHaveLevel is true if the external files carry a target
	// level; ingestLoad enforces that all external files agree on whether a
	// level is set.
	externalFilesHaveLevel bool
}

// ingestLocalMeta pairs the metadata for a local sstable with the path it is
// being ingested from.
type ingestLocalMeta struct {
	*fileMetadata
	path string
}

// ingestSharedMeta pairs the synthesized metadata for a shared sstable with
// the SharedSSTMeta it was synthesized from.
type ingestSharedMeta struct {
	*fileMetadata
	shared SharedSSTMeta
}

// ingestExternalMeta pairs the synthesized metadata for an external file with
// the ExternalFile it was synthesized from.
type ingestExternalMeta struct {
	*fileMetadata
	external ExternalFile
	// usedExistingBacking is true if the external file is reusing a backing
	// that existed before this ingestion. In this case, we called
	// VirtualBackings.Protect() on that backing; we will need to call
	// Unprotect() after the ingestion.
	usedExistingBacking bool
}
420 :
421 1 : func (r *ingestLoadResult) fileCount() int {
422 1 : return len(r.local) + len(r.shared) + len(r.external)
423 1 : }
424 :
// ingestLoad loads metadata for every file in the ingestion: local sstable
// paths, shared sstables, and external files. The pending file numbers are
// consumed in that order (locals first, then shared, then external), so
// len(pending) must be at least len(paths)+len(shared)+len(external).
func ingestLoad(
	opts *Options,
	fmv FormatMajorVersion,
	paths []string,
	shared []SharedSSTMeta,
	external []ExternalFile,
	cacheID uint64,
	pending []base.FileNum,
) (ingestLoadResult, error) {
	ctx := context.TODO()

	// Partition the pending file numbers by file provenance.
	localFileNums := pending[:len(paths)]
	sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
	externalFileNums := pending[len(paths)+len(shared) : len(paths)+len(shared)+len(external)]

	var result ingestLoadResult
	result.local = make([]ingestLocalMeta, 0, len(paths))
	for i := range paths {
		f, err := opts.FS.Open(paths[i])
		if err != nil {
			return ingestLoadResult{}, err
		}

		readable, err := sstable.NewSimpleReadable(f)
		if err != nil {
			return ingestLoadResult{}, err
		}
		m, err := ingestLoad1(ctx, opts, fmv, readable, cacheID, localFileNums[i])
		if err != nil {
			return ingestLoadResult{}, err
		}
		// m is nil when the table contained no keys; such files are skipped.
		if m != nil {
			result.local = append(result.local, ingestLocalMeta{
				fileMetadata: m,
				path:         paths[i],
			})
		}
	}

	// Sort the shared files according to level.
	sort.Sort(sharedByLevel(shared))

	result.shared = make([]ingestSharedMeta, 0, len(shared))
	for i := range shared {
		m, err := ingestSynthesizeShared(opts, shared[i], sharedFileNums[i])
		if err != nil {
			return ingestLoadResult{}, err
		}
		if shared[i].Level < sharedLevelsStart {
			return ingestLoadResult{}, errors.New("cannot ingest shared file in level below sharedLevelsStart")
		}
		result.shared = append(result.shared, ingestSharedMeta{
			fileMetadata: m,
			shared:       shared[i],
		})
	}
	result.external = make([]ingestExternalMeta, 0, len(external))
	for i := range external {
		m, err := ingestLoad1External(opts, external[i], externalFileNums[i])
		if err != nil {
			return ingestLoadResult{}, err
		}
		result.external = append(result.external, ingestExternalMeta{
			fileMetadata: m,
			external:     external[i],
		})
		// Either all external files specify a level (>0) or none do.
		if external[i].Level > 0 {
			if i != 0 && !result.externalFilesHaveLevel {
				return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
			}
			result.externalFilesHaveLevel = true
		} else if result.externalFilesHaveLevel {
			return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
		}
	}
	return result, nil
}
502 :
// ingestSortAndVerify sorts the loaded files by smallest key and verifies that
// files within each group do not overlap one another, and that shared files
// (and level-assigned external files) fall entirely within the excise span.
func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error {
	// Verify that all the shared files (i.e. files in sharedMeta)
	// fit within the exciseSpan.
	for _, f := range lr.shared {
		if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
			return errors.Newf("pebble: shared file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
		}
	}

	if lr.externalFilesHaveLevel {
		for _, f := range lr.external {
			if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
				return base.AssertionFailedf("pebble: external file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
			}
		}
	}

	if len(lr.external) > 0 {
		if len(lr.shared) > 0 {
			// If external files are present alongside shared files,
			// return an error.
			return base.AssertionFailedf("pebble: external files cannot be ingested atomically alongside shared files")
		}

		// Sort according to the smallest key.
		slices.SortFunc(lr.external, func(a, b ingestExternalMeta) int {
			return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
		})
		for i := 1; i < len(lr.external); i++ {
			if sstableKeyCompare(cmp, lr.external[i-1].Largest, lr.external[i].Smallest) >= 0 {
				return errors.Newf("pebble: external sstables have overlapping ranges")
			}
		}
		return nil
	}
	if len(lr.local) <= 1 {
		return nil
	}

	// Sort according to the smallest key.
	slices.SortFunc(lr.local, func(a, b ingestLocalMeta) int {
		return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
	})

	for i := 1; i < len(lr.local); i++ {
		if sstableKeyCompare(cmp, lr.local[i-1].Largest, lr.local[i].Smallest) >= 0 {
			return errors.Newf("pebble: local ingestion sstables have overlapping ranges")
		}
	}
	if len(lr.shared) == 0 {
		return nil
	}
	// Per shared level, collect the files assigned to that level and verify
	// they are pairwise non-overlapping.
	filesInLevel := make([]*fileMetadata, 0, len(lr.shared))
	for l := sharedLevelsStart; l < numLevels; l++ {
		filesInLevel = filesInLevel[:0]
		for i := range lr.shared {
			if lr.shared[i].shared.Level == uint8(l) {
				filesInLevel = append(filesInLevel, lr.shared[i].fileMetadata)
			}
		}
		// NOTE(review): lr.external appears to always be empty on this path
		// (the external case returns earlier above) — confirm whether this
		// loop is intentionally defensive.
		for i := range lr.external {
			if lr.external[i].external.Level == uint8(l) {
				filesInLevel = append(filesInLevel, lr.external[i].fileMetadata)
			}
		}
		slices.SortFunc(filesInLevel, func(a, b *fileMetadata) int {
			return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
		})
		for i := 1; i < len(filesInLevel); i++ {
			if sstableKeyCompare(cmp, filesInLevel[i-1].Largest, filesInLevel[i].Smallest) >= 0 {
				return base.AssertionFailedf("pebble: external shared sstables have overlapping ranges")
			}
		}
	}
	return nil
}
579 :
580 1 : func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) error {
581 1 : var firstErr error
582 1 : for i := range meta {
583 1 : if err := objProvider.Remove(fileTypeTable, meta[i].FileBacking.DiskFileNum); err != nil {
584 1 : firstErr = firstError(firstErr, err)
585 1 : }
586 : }
587 1 : return firstErr
588 : }
589 :
// ingestLinkLocal creates new objects which are backed by either hardlinks to or
// copies of the ingested files. On failure, any objects created by earlier
// iterations are removed before returning the error.
func ingestLinkLocal(
	jobID JobID, opts *Options, objProvider objstorage.Provider, localMetas []ingestLocalMeta,
) error {
	for i := range localMetas {
		objMeta, err := objProvider.LinkOrCopyFromLocal(
			context.TODO(), opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum,
			objstorage.CreateOptions{PreferSharedStorage: true},
		)
		if err != nil {
			// Best-effort rollback of the objects linked so far; the original
			// error is returned regardless.
			if err2 := ingestCleanup(objProvider, localMetas[:i]); err2 != nil {
				opts.Logger.Errorf("ingest cleanup failed: %v", err2)
			}
			return err
		}
		if opts.EventListener.TableCreated != nil {
			opts.EventListener.TableCreated(TableCreateInfo{
				JobID:   int(jobID),
				Reason:  "ingesting",
				Path:    objProvider.Path(objMeta),
				FileNum: base.PhysicalTableDiskFileNum(localMetas[i].FileNum),
			})
		}
	}
	return nil
}
617 :
// ingestAttachRemote attaches remote objects to the storage provider.
//
// For external objects, we reuse existing FileBackings from the current version
// when possible.
//
// ingestUnprotectExternalBackings() must be called after this function (even in
// error cases).
func (d *DB) ingestAttachRemote(jobID JobID, lr ingestLoadResult) error {
	remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(lr.shared)+len(lr.external))
	for i := range lr.shared {
		backing, err := lr.shared[i].shared.Backing.Get()
		if err != nil {
			return err
		}
		remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
			FileNum:  lr.shared[i].FileBacking.DiskFileNum,
			FileType: fileTypeTable,
			Backing:  backing,
		})
	}

	d.findExistingBackingsForExternalObjects(lr.external)

	// newFileBackings deduplicates backings created within this call: multiple
	// external files referencing the same remote object share one backing.
	newFileBackings := make(map[remote.ObjectKey]*fileBacking, len(lr.external))
	for i := range lr.external {
		meta := lr.external[i].fileMetadata
		if meta.FileBacking != nil {
			// The backing was filled in by findExistingBackingsForExternalObjects().
			continue
		}
		key := remote.MakeObjectKey(lr.external[i].external.Locator, lr.external[i].external.ObjName)
		if backing, ok := newFileBackings[key]; ok {
			// We already created the same backing in this loop.
			meta.FileBacking = backing
			continue
		}
		providerBacking, err := d.objProvider.CreateExternalObjectBacking(key.Locator, key.ObjectName)
		if err != nil {
			return err
		}
		// We have to attach the remote object (and assign it a DiskFileNum). For
		// simplicity, we use the same number for both the FileNum and the
		// DiskFileNum (even though this is a virtual sstable).
		meta.InitProviderBacking(base.DiskFileNum(meta.FileNum), lr.external[i].external.Size)

		// Set the underlying FileBacking's size to the same size as the virtualized
		// view of the sstable. This ensures that we don't over-prioritize this
		// sstable for compaction just yet, as we do not have a clear sense of
		// what parts of this sstable are referenced by other nodes.
		meta.FileBacking.Size = lr.external[i].external.Size
		newFileBackings[key] = meta.FileBacking

		remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
			FileNum:  meta.FileBacking.DiskFileNum,
			FileType: fileTypeTable,
			Backing:  providerBacking,
		})
	}

	// Sanity-check every external file's bounds now that backings are set.
	for i := range lr.external {
		if err := lr.external[i].Validate(d.opts.Comparer.Compare, d.opts.Comparer.FormatKey); err != nil {
			return err
		}
	}

	remoteObjMetas, err := d.objProvider.AttachRemoteObjects(remoteObjs)
	if err != nil {
		return err
	}

	for i := range lr.shared {
		// One corner case around file sizes we need to be mindful of, is that
		// if one of the shareObjs was initially created by us (and has boomeranged
		// back from another node), we'll need to update the FileBacking's size
		// to be the true underlying size. Otherwise, we could hit errors when we
		// open the db again after a crash/restart (see checkConsistency in open.go),
		// plus it more accurately allows us to prioritize compactions of files
		// that were originally created by us.
		if remoteObjMetas[i].IsShared() && !d.objProvider.IsSharedForeign(remoteObjMetas[i]) {
			size, err := d.objProvider.Size(remoteObjMetas[i])
			if err != nil {
				return err
			}
			lr.shared[i].FileBacking.Size = uint64(size)
		}
	}

	if d.opts.EventListener.TableCreated != nil {
		for i := range remoteObjMetas {
			d.opts.EventListener.TableCreated(TableCreateInfo{
				JobID:   int(jobID),
				Reason:  "ingesting",
				Path:    d.objProvider.Path(remoteObjMetas[i]),
				FileNum: remoteObjMetas[i].DiskFileNum,
			})
		}
	}

	return nil
}
718 :
// findExistingBackingsForExternalObjects populates the FileBacking for external
// files which are already in use by the current version.
//
// We take a Ref and LatestRef on populated backings.
func (d *DB) findExistingBackingsForExternalObjects(metas []ingestExternalMeta) {
	d.mu.Lock()
	defer d.mu.Unlock()

	for i := range metas {
		diskFileNums := d.objProvider.GetExternalObjects(metas[i].external.Locator, metas[i].external.ObjName)
		// We cross-check against fileBackings in the current version because it is
		// possible that the external object is referenced by an sstable which only
		// exists in a previous version. In that case, that object could be removed
		// at any time so we cannot reuse it.
		for _, n := range diskFileNums {
			if backing, ok := d.mu.versions.virtualBackings.Get(n); ok {
				// Protect this backing from being removed from the latest version. We
				// will unprotect in ingestUnprotectExternalBackings.
				d.mu.versions.virtualBackings.Protect(n)
				metas[i].usedExistingBacking = true
				metas[i].FileBacking = backing
				break
			}
		}
	}
}
745 :
// ingestUnprotectExternalBackings unprotects the file backings that were reused
// for external objects when the ingestion fails.
func (d *DB) ingestUnprotectExternalBackings(lr ingestLoadResult) {
	d.mu.Lock()
	defer d.mu.Unlock()

	for _, meta := range lr.external {
		if meta.usedExistingBacking {
			// If the backing is not used anywhere else and the ingest failed (or
			// the ingested tables were already compacted away), this call will
			// cause the next version update to remove the backing.
			d.mu.versions.virtualBackings.Unprotect(meta.FileBacking.DiskFileNum)
		}
	}
}
761 :
// setSeqNumInMetadata rewrites all of m's bound keys (and its smallest/largest
// sequence numbers) to carry seqNum, then validates the resulting bounds.
// Exclusive sentinel bounds are left untouched so their sentinel status is
// preserved.
func setSeqNumInMetadata(
	m *fileMetadata, seqNum base.SeqNum, cmp Compare, format base.FormatKey,
) error {
	setSeqFn := func(k base.InternalKey) base.InternalKey {
		return base.MakeInternalKey(k.UserKey, seqNum, k.Kind())
	}
	// NB: we set the fields directly here, rather than via their Extend*
	// methods, as we are updating sequence numbers.
	if m.HasPointKeys {
		m.SmallestPointKey = setSeqFn(m.SmallestPointKey)
	}
	if m.HasRangeKeys {
		m.SmallestRangeKey = setSeqFn(m.SmallestRangeKey)
	}
	m.Smallest = setSeqFn(m.Smallest)
	// Only update the seqnum for the largest key if that key is not an
	// "exclusive sentinel" (i.e. a range deletion sentinel or a range key
	// boundary), as doing so effectively drops the exclusive sentinel (by
	// lowering the seqnum from the max value), and extends the bounds of the
	// table.
	// NB: as the largest range key is always an exclusive sentinel, it is never
	// updated.
	if m.HasPointKeys && !m.LargestPointKey.IsExclusiveSentinel() {
		m.LargestPointKey = setSeqFn(m.LargestPointKey)
	}
	if !m.Largest.IsExclusiveSentinel() {
		m.Largest = setSeqFn(m.Largest)
	}
	// Setting smallestSeqNum == largestSeqNum triggers the setting of
	// Properties.GlobalSeqNum when an sstable is loaded.
	m.SmallestSeqNum = seqNum
	m.LargestSeqNum = seqNum
	m.LargestSeqNumAbsolute = seqNum
	// Ensure the new bounds are consistent.
	if err := m.Validate(cmp, format); err != nil {
		return err
	}
	return nil
}
801 :
802 : func ingestUpdateSeqNum(
803 : cmp Compare, format base.FormatKey, seqNum base.SeqNum, loadResult ingestLoadResult,
804 1 : ) error {
805 1 : // Shared sstables are required to be sorted by level ascending. We then
806 1 : // iterate the shared sstables in reverse, assigning the lower sequence
807 1 : // numbers to the shared sstables that will be ingested into the lower
808 1 : // (larger numbered) levels first. This ensures sequence number shadowing is
809 1 : // correct.
810 1 : for i := len(loadResult.shared) - 1; i >= 0; i-- {
811 1 : if i-1 >= 0 && loadResult.shared[i-1].shared.Level > loadResult.shared[i].shared.Level {
812 0 : panic(errors.AssertionFailedf("shared files %s, %s out of order", loadResult.shared[i-1], loadResult.shared[i]))
813 : }
814 1 : if err := setSeqNumInMetadata(loadResult.shared[i].fileMetadata, seqNum, cmp, format); err != nil {
815 0 : return err
816 0 : }
817 1 : seqNum++
818 : }
819 1 : for i := range loadResult.external {
820 1 : if err := setSeqNumInMetadata(loadResult.external[i].fileMetadata, seqNum, cmp, format); err != nil {
821 0 : return err
822 0 : }
823 1 : seqNum++
824 : }
825 1 : for i := range loadResult.local {
826 1 : if err := setSeqNumInMetadata(loadResult.local[i].fileMetadata, seqNum, cmp, format); err != nil {
827 0 : return err
828 0 : }
829 1 : seqNum++
830 : }
831 1 : return nil
832 : }
833 :
// ingestTargetLevel returns the target level for a file being ingested.
// If suggestSplit is true, it accounts for ingest-time splitting as part of
// its target level calculation, and if a split candidate is found, that file
// is returned as the splitFile.
//
// lsmOverlap describes, per level, whether the ingested file has boundary-only
// or data overlap with that level. compactions is the set of in-progress
// compactions, whose output levels are treated as occupied.
func ingestTargetLevel(
	ctx context.Context,
	cmp base.Compare,
	lsmOverlap overlap.WithLSM,
	baseLevel int,
	compactions map[*compaction]struct{},
	meta *fileMetadata,
	suggestSplit bool,
) (targetLevel int, splitFile *fileMetadata, err error) {
	// Find the lowest level which does not have any files which overlap meta. We
	// search from L0 to L6 looking for whether there are any files in the level
	// which overlap meta. We want the "lowest" level (where lower means
	// increasing level number) in order to reduce write amplification.
	//
	// There are 2 kinds of overlap we need to check for: file boundary overlap
	// and data overlap. Data overlap implies file boundary overlap. Note that it
	// is always possible to ingest into L0.
	//
	// To place meta at level i where i > 0:
	// - there must not be any data overlap with levels <= i, since that will
	//   violate the sequence number invariant.
	// - no file boundary overlap with level i, since that will violate the
	//   invariant that files do not overlap in levels i > 0.
	// - if there is only a file overlap at a given level, and no data overlap,
	//   we can still slot a file at that level. We return the fileMetadata with
	//   which we have file boundary overlap (must be only one file, as sstable
	//   bounds are usually tight on user keys) and the caller is expected to split
	//   that sstable into two virtual sstables, allowing this file to go into that
	//   level. Note that if we have file boundary overlap with two files, which
	//   should only happen on rare occasions, we treat it as data overlap and
	//   don't use this optimization.
	//
	// The file boundary overlap check is simpler to conceptualize. Consider the
	// following example, in which the ingested file lies completely before or
	// after the file being considered.
	//
	//   |--|           |--|  ingested file: [a,b] or [f,g]
	//         |-----|         existing file: [c,e]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In both cases the ingested file can move to considering the next level.
	//
	// File boundary overlap does not necessarily imply data overlap. The check
	// for data overlap is a little more nuanced. Consider the following examples:
	//
	//  1. No data overlap:
	//
	//          |-|   |--|    ingested file: [cc-d] or [ee-ff]
	//  |*--*--*----*------*|  existing file: [a-g], points: [a, b, c, dd, g]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In this case the ingested files can "fall through" this level. The checks
	// continue at the next level.
	//
	//  2. Data overlap:
	//
	//            |--|        ingested file: [d-e]
	//  |*--*--*----*------*|  existing file: [a-g], points: [a, b, c, dd, g]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In this case the file cannot be ingested into this level as the point 'dd'
	// is in the way.
	//
	// It is worth noting that the check for data overlap is only approximate. In
	// the previous example, the ingested table [d-e] could contain only the
	// points 'd' and 'e', in which case the table would be eligible for
	// considering lower levels. However, such a fine-grained check would need to
	// be exhaustive (comparing points and ranges in both the ingested and
	// existing tables) and such a check is prohibitively expensive. Thus Pebble
	// treats any existing point that falls within the ingested table bounds as
	// being "data overlap".

	// Data overlap with L0 means the file can only go to L0 itself.
	if lsmOverlap[0].Result == overlap.Data {
		return 0, nil, nil
	}
	targetLevel = 0
	splitFile = nil
	for level := baseLevel; level < numLevels; level++ {
		var candidateSplitFile *fileMetadata
		switch lsmOverlap[level].Result {
		case overlap.Data:
			// We cannot ingest into or under this level; return the best target level
			// so far.
			return targetLevel, splitFile, nil

		case overlap.OnlyBoundary:
			if !suggestSplit || lsmOverlap[level].SplitFile == nil {
				// We can ingest under this level, but not into this level.
				continue
			}
			// We can ingest into this level if we split this file.
			candidateSplitFile = lsmOverlap[level].SplitFile

		case overlap.None:
			// We can ingest into this level.

		default:
			return 0, nil, base.AssertionFailedf("unexpected WithLevel.Result: %v", lsmOverlap[level].Result)
		}

		// Check boundary overlap with any ongoing compactions. We consider an
		// overlapping compaction that's writing files to an output level as
		// equivalent to boundary overlap with files in that output level.
		//
		// We cannot check for data overlap with the new SSTs compaction will produce
		// since compaction hasn't been done yet. However, there's no need to check
		// since all keys in them will be from levels in [c.startLevel,
		// c.outputLevel], and all those levels have already had their data overlap
		// tested negative (else we'd have returned earlier).
		//
		// An alternative approach would be to cancel these compactions and proceed
		// with an ingest-time split on this level if necessary. However, compaction
		// cancellation can result in significant wasted effort and is best avoided
		// unless necessary.
		overlaps := false
		for c := range compactions {
			if c.outputLevel == nil || level != c.outputLevel.level {
				continue
			}
			// Boundary overlap between [meta.Smallest, meta.Largest] and the
			// compaction's output key range.
			if cmp(meta.Smallest.UserKey, c.largest.UserKey) <= 0 &&
				cmp(meta.Largest.UserKey, c.smallest.UserKey) >= 0 {
				overlaps = true
				break
			}
		}
		// This level is viable: remember it and keep descending for a lower one.
		if !overlaps {
			targetLevel = level
			splitFile = candidateSplitFile
		}
	}
	return targetLevel, splitFile, nil
}
973 :
974 : // Ingest ingests a set of sstables into the DB. Ingestion of the files is
975 : // atomic and semantically equivalent to creating a single batch containing all
976 : // of the mutations in the sstables. Ingestion may require the memtable to be
977 : // flushed. The ingested sstable files are moved into the DB and must reside on
978 : // the same filesystem as the DB. Sstables can be created for ingestion using
979 : // sstable.Writer. On success, Ingest removes the input paths.
980 : //
981 : // Two types of sstables are accepted for ingestion(s): one is sstables present
982 : // in the instance's vfs.FS and can be referenced locally. The other is sstables
983 : // present in remote.Storage, referred to as shared or foreign sstables. These
984 : // shared sstables can be linked through objstorageprovider.Provider, and do not
985 : // need to already be present on the local vfs.FS. Foreign sstables must all fit
986 : // in an excise span, and are destined for a level specified in SharedSSTMeta.
987 : //
988 : // All sstables *must* be Sync()'d by the caller after all bytes are written
989 : // and before its file handle is closed; failure to do so could violate
990 : // durability or lead to corrupted on-disk state. This method cannot, in a
991 : // platform-and-FS-agnostic way, ensure that all sstables in the input are
992 : // properly synced to disk. Opening new file handles and Sync()-ing them
993 : // does not always guarantee durability; see the discussion here on that:
994 : // https://github.com/cockroachdb/pebble/pull/835#issuecomment-663075379
995 : //
996 : // Ingestion loads each sstable into the lowest level of the LSM which it
997 : // doesn't overlap (see ingestTargetLevel). If an sstable overlaps a memtable,
998 : // ingestion forces the memtable to flush, and then waits for the flush to
999 : // occur. In some cases, such as with no foreign sstables and no excise span,
1000 : // ingestion that gets blocked on a memtable can join the flushable queue and
1001 : // finish even before the memtable has been flushed.
1002 : //
1003 : // The steps for ingestion are:
1004 : //
1005 : // 1. Allocate file numbers for every sstable being ingested.
1006 : // 2. Load the metadata for all sstables being ingested.
1007 : // 3. Sort the sstables by smallest key, verifying non overlap (for local
1008 : // sstables).
1009 : // 4. Hard link (or copy) the local sstables into the DB directory.
1010 : // 5. Allocate a sequence number to use for all of the entries in the
1011 : // local sstables. This is the step where overlap with memtables is
1012 : // determined. If there is overlap, we remember the most recent memtable
1013 : // that overlaps.
1014 : // 6. Update the sequence number in the ingested local sstables. (Remote
1015 : // sstables get fixed sequence numbers that were determined at load time.)
1016 : // 7. Wait for the most recent memtable that overlaps to flush (if any).
1017 : // 8. Add the ingested sstables to the version (DB.ingestApply).
1018 : // 8.1. If an excise span was specified, figure out what sstables in the
1019 : // current version overlap with the excise span, and create new virtual
1020 : // sstables out of those sstables that exclude the excised span (DB.excise).
1021 : // 9. Publish the ingestion sequence number.
1022 : //
1023 : // Note that if the mutable memtable overlaps with ingestion, a flush of the
1024 : // memtable is forced equivalent to DB.Flush. Additionally, subsequent
1025 : // mutations that get sequence numbers larger than the ingestion sequence
1026 : // number get queued up behind the ingestion waiting for it to complete. This
1027 : // can produce a noticeable hiccup in performance. See
1028 : // https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
1029 : // this hiccup.
1030 1 : func (d *DB) Ingest(paths []string) error {
1031 1 : if err := d.closed.Load(); err != nil {
1032 1 : panic(err)
1033 : }
1034 1 : if d.opts.ReadOnly {
1035 1 : return ErrReadOnly
1036 1 : }
1037 1 : _, err := d.ingest(paths, nil /* shared */, KeyRange{}, false, nil /* external */)
1038 1 : return err
1039 : }
1040 :
// IngestOperationStats provides some information about where in the LSM the
// bytes were ingested.
type IngestOperationStats struct {
	// Bytes is the total bytes in the ingested sstables.
	Bytes uint64
	// ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
	// into L0. This value is approximate when flushable ingests are active and
	// an ingest overlaps an entry in the flushable queue. Currently, this
	// approximation is very rough, only including tables that overlapped the
	// memtable. This estimate may be improved with #2112.
	ApproxIngestedIntoL0Bytes uint64
	// MemtableOverlappingFiles is the count of ingested sstables
	// that overlapped keys in the memtables.
	MemtableOverlappingFiles int
}
1056 :
// ExternalFile are external sstables that can be referenced through
// objprovider and ingested as remote files that will not be refcounted or
// cleaned up. For use with online restore. Note that the underlying sstable
// could contain keys outside the [Smallest,Largest) bounds; however Pebble
// is expected to only read the keys within those bounds.
type ExternalFile struct {
	// Locator is the shared.Locator that can be used with objProvider to
	// resolve a reference to this external sstable.
	Locator remote.Locator

	// ObjName is the unique name of this sstable on Locator.
	ObjName string

	// Size of the referenced proportion of the virtualized sstable. An estimate
	// is acceptable in lieu of the backing file size.
	Size uint64

	// StartKey and EndKey define the bounds of the sstable; the ingestion
	// of this file will only result in keys within [StartKey, EndKey) if
	// EndKeyIsInclusive is false or [StartKey, EndKey] if it is true.
	// These bounds are loose i.e. it's possible for keys to not span the
	// entirety of this range.
	//
	// StartKey and EndKey user keys must not have suffixes.
	//
	// Multiple ExternalFiles in one ingestion must all have non-overlapping
	// bounds.
	StartKey, EndKey []byte

	// EndKeyIsInclusive is true if EndKey should be treated as inclusive.
	EndKeyIsInclusive bool

	// HasPointKey and HasRangeKey denote whether this file contains point keys
	// or range keys. If both fields are false, an error is returned during
	// ingestion.
	HasPointKey, HasRangeKey bool

	// SyntheticPrefix will prepend this prefix to all keys in the file during
	// iteration. Note that the backing file itself is not modified.
	//
	// SyntheticPrefix must be a prefix of both StartKey and EndKey.
	SyntheticPrefix []byte

	// SyntheticSuffix will replace the suffix of every key in the file during
	// iteration. Note that the file itself is not modified, rather, every key
	// returned by an iterator will have the synthetic suffix.
	//
	// SyntheticSuffix can only be used under the following conditions:
	//  - the synthetic suffix must sort before any non-empty suffixes in the
	//    backing sst (the entire sst, not just the part restricted to bounds).
	//  - the backing sst must not contain multiple keys with the same prefix.
	SyntheticSuffix []byte

	// Level denotes the level at which this file was present at read time
	// if the external file was returned by a scan of an existing Pebble
	// instance. If Level is 0, this field is ignored.
	Level uint8
}
1115 :
1116 : // IngestWithStats does the same as Ingest, and additionally returns
1117 : // IngestOperationStats.
1118 1 : func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) {
1119 1 : if err := d.closed.Load(); err != nil {
1120 0 : panic(err)
1121 : }
1122 1 : if d.opts.ReadOnly {
1123 0 : return IngestOperationStats{}, ErrReadOnly
1124 0 : }
1125 1 : return d.ingest(paths, nil, KeyRange{}, false, nil)
1126 : }
1127 :
1128 : // IngestExternalFiles does the same as IngestWithStats, and additionally
1129 : // accepts external files (with locator info that can be resolved using
1130 : // d.opts.SharedStorage). These files must also be non-overlapping with
1131 : // each other, and must be resolvable through d.objProvider.
1132 1 : func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, error) {
1133 1 : if err := d.closed.Load(); err != nil {
1134 0 : panic(err)
1135 : }
1136 :
1137 1 : if d.opts.ReadOnly {
1138 0 : return IngestOperationStats{}, ErrReadOnly
1139 0 : }
1140 1 : if d.opts.Experimental.RemoteStorage == nil {
1141 0 : return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
1142 0 : }
1143 1 : return d.ingest(nil, nil, KeyRange{}, false, external)
1144 : }
1145 :
1146 : // IngestAndExcise does the same as IngestWithStats, and additionally accepts a
1147 : // list of shared files to ingest that can be read from a remote.Storage through
1148 : // a Provider. All the shared files must live within exciseSpan, and any existing
1149 : // keys in exciseSpan are deleted by turning existing sstables into virtual
1150 : // sstables (if not virtual already) and shrinking their spans to exclude
1151 : // exciseSpan. See the comment at Ingest for a more complete picture of the
1152 : // ingestion process.
1153 : //
1154 : // Panics if this DB instance was not instantiated with a remote.Storage and
1155 : // shared sstables are present.
1156 : func (d *DB) IngestAndExcise(
1157 : paths []string,
1158 : shared []SharedSSTMeta,
1159 : external []ExternalFile,
1160 : exciseSpan KeyRange,
1161 : sstsContainExciseTombstone bool,
1162 1 : ) (IngestOperationStats, error) {
1163 1 : if err := d.closed.Load(); err != nil {
1164 0 : panic(err)
1165 : }
1166 1 : if d.opts.ReadOnly {
1167 0 : return IngestOperationStats{}, ErrReadOnly
1168 0 : }
1169 1 : if invariants.Enabled {
1170 1 : // Excise is only supported on prefix keys.
1171 1 : if d.opts.Comparer.Split(exciseSpan.Start) != len(exciseSpan.Start) {
1172 0 : panic("IngestAndExcise called with suffixed start key")
1173 : }
1174 1 : if d.opts.Comparer.Split(exciseSpan.End) != len(exciseSpan.End) {
1175 0 : panic("IngestAndExcise called with suffixed end key")
1176 : }
1177 : }
1178 1 : if v := d.FormatMajorVersion(); v < FormatMinForSharedObjects {
1179 0 : return IngestOperationStats{}, errors.Errorf(
1180 0 : "store has format major version %d; IngestAndExcise requires at least %d",
1181 0 : v, FormatMinForSharedObjects,
1182 0 : )
1183 0 : }
1184 1 : return d.ingest(paths, shared, exciseSpan, sstsContainExciseTombstone, external)
1185 : }
1186 :
// newIngestedFlushableEntry wraps the ingested sstables in a flushableEntry so
// they can be queued behind the memtables instead of being applied to the LSM
// immediately. Each table in meta is assigned a distinct, consecutive sequence
// number starting at seqNum.
//
// Both DB.mu and commitPipeline.mu must be held while this is called.
func (d *DB) newIngestedFlushableEntry(
	meta []*fileMetadata, seqNum base.SeqNum, logNum base.DiskFileNum, exciseSpan KeyRange,
) (*flushableEntry, error) {
	// Update the sequence number for all of the sstables in the
	// metadata. Writing the metadata to the manifest when the
	// version edit is applied is the mechanism that persists the
	// sequence number. The sstables themselves are left unmodified.
	// In this case, a version edit will only be written to the manifest
	// when the flushable is eventually flushed. If Pebble restarts in that
	// time, then we'll lose the ingest sequence number information. But this
	// information will also be reconstructed on node restart.
	for i, m := range meta {
		if err := setSeqNumInMetadata(m, seqNum+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil {
			return nil, err
		}
	}

	f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan)

	// NB: The logNum/seqNum are the WAL number which we're writing this entry
	// to and the sequence number within the WAL which we'll write this entry
	// to.
	entry := d.newFlushableEntry(f, logNum, seqNum)
	// The flushable entry starts off with a single reader ref, so increment
	// the FileMetadata.Refs.
	for _, file := range f.files {
		file.FileBacking.Ref()
	}
	// On the final unref, collect any backings whose refcount dropped to zero
	// so the caller can treat them as obsolete.
	entry.unrefFiles = func() []*fileBacking {
		var obsolete []*fileBacking
		for _, file := range f.files {
			if file.FileBacking.Unref() == 0 {
				obsolete = append(obsolete, file.FileMetadata.FileBacking)
			}
		}
		return obsolete
	}

	// Force a flush so the ingested tables make their way into the LSM.
	// releaseMemAccounting is a no-op for this entry.
	entry.flushForced = true
	entry.releaseMemAccounting = func() {}
	return entry, nil
}
1230 :
// handleIngestAsFlushable records the ingestion in the WAL (unless disabled),
// places the ingested sstables on the flushable queue as an ingestedFlushable
// entry, and rotates to a fresh mutable memtable on top of it.
//
// Both DB.mu and commitPipeline.mu must be held while this is called. Since
// we're holding both locks, the order in which we rotate the memtable or
// recycle the WAL in this function is irrelevant as long as the correct log
// numbers are assigned to the appropriate flushable.
func (d *DB) handleIngestAsFlushable(
	meta []*fileMetadata, seqNum base.SeqNum, exciseSpan KeyRange,
) error {
	// Build a batch containing one ingestSST record per table; this is what
	// gets written to the WAL so replay can reconstruct the flushable ingest.
	b := d.NewBatch()
	for _, m := range meta {
		b.ingestSST(m.FileNum)
	}
	b.setSeqNum(seqNum)

	// If the WAL is disabled, then the logNum used to create the flushable
	// entry doesn't matter. We just use the logNum assigned to the current
	// mutable memtable. If the WAL is enabled, then this logNum will be
	// overwritten by the logNum of the log which will contain the log entry
	// for the ingestedFlushable.
	logNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
	if !d.opts.DisableWAL {
		// We create a new WAL for the flushable instead of reusing the end of
		// the previous WAL. This simplifies the increment of the minimum
		// unflushed log number, and also simplifies WAL replay.
		var prevLogSize uint64
		logNum, prevLogSize = d.rotateWAL()
		// As the rotator of the WAL, we're responsible for updating the
		// previous flushable queue tail's log size.
		d.mu.mem.queue[len(d.mu.mem.queue)-1].logSize = prevLogSize

		// DB.mu is dropped around the WAL write; commitPipeline.mu (held by
		// the caller) still serializes commits. A WAL write failure here is
		// fatal.
		d.mu.Unlock()
		err := d.commit.directWrite(b)
		if err != nil {
			d.opts.Logger.Fatalf("%v", err)
		}
		d.mu.Lock()
	}

	entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
	if err != nil {
		return err
	}
	// The next sequence number follows the batch's records (one per table).
	nextSeqNum := seqNum + base.SeqNum(b.Count())

	// Set newLogNum to the logNum of the previous flushable. This value is
	// irrelevant if the WAL is disabled. If the WAL is enabled, then we set
	// the appropriate value below.
	newLogNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
	if !d.opts.DisableWAL {
		// newLogNum will be the WAL num of the next mutable memtable which
		// comes after the ingestedFlushable in the flushable queue. The mutable
		// memtable will be created below.
		//
		// The prevLogSize returned by rotateWAL is the WAL to which the
		// flushable ingest keys were appended. This intermediary WAL is only
		// used to record the flushable ingest and nothing else.
		newLogNum, entry.logSize = d.rotateWAL()
	}

	d.mu.versions.metrics.Ingest.Count++
	currMem := d.mu.mem.mutable
	// NB: Placing ingested sstables above the current memtables
	// requires rotating of the existing memtables/WAL. There is
	// some concern of churning through tiny memtables due to
	// ingested sstables being placed on top of them, but those
	// memtables would have to be flushed anyways.
	d.mu.mem.queue = append(d.mu.mem.queue, entry)
	d.rotateMemtable(newLogNum, nextSeqNum, currMem, 0 /* minSize */)
	d.updateReadStateLocked(d.opts.DebugCheck)
	// TODO(aaditya): is this necessary? we call this already in rotateMemtable above
	d.maybeScheduleFlush()
	return nil
}
1303 :
1304 : // See comment at Ingest() for details on how this works.
1305 : func (d *DB) ingest(
1306 : paths []string,
1307 : shared []SharedSSTMeta,
1308 : exciseSpan KeyRange,
1309 : sstsContainExciseTombstone bool,
1310 : external []ExternalFile,
1311 1 : ) (IngestOperationStats, error) {
1312 1 : if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
1313 0 : panic("cannot ingest shared sstables with nil SharedStorage")
1314 : }
1315 1 : if (exciseSpan.Valid() || len(shared) > 0 || len(external) > 0) && d.FormatMajorVersion() < FormatVirtualSSTables {
1316 0 : return IngestOperationStats{}, errors.New("pebble: format major version too old for excise, shared or external sstable ingestion")
1317 0 : }
1318 1 : if len(external) > 0 && d.FormatMajorVersion() < FormatSyntheticPrefixSuffix {
1319 1 : for i := range external {
1320 1 : if len(external[i].SyntheticPrefix) > 0 {
1321 1 : return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic prefix ingestion")
1322 1 : }
1323 1 : if len(external[i].SyntheticSuffix) > 0 {
1324 1 : return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic suffix ingestion")
1325 1 : }
1326 : }
1327 : }
1328 1 : ctx := context.Background()
1329 1 : // Allocate file numbers for all of the files being ingested and mark them as
1330 1 : // pending in order to prevent them from being deleted. Note that this causes
1331 1 : // the file number ordering to be out of alignment with sequence number
1332 1 : // ordering. The sorting of L0 tables by sequence number avoids relying on
1333 1 : // that (busted) invariant.
1334 1 : d.mu.Lock()
1335 1 : pendingOutputs := make([]base.FileNum, len(paths)+len(shared)+len(external))
1336 1 : for i := 0; i < len(paths)+len(shared)+len(external); i++ {
1337 1 : pendingOutputs[i] = d.mu.versions.getNextFileNum()
1338 1 : }
1339 :
1340 1 : jobID := d.newJobIDLocked()
1341 1 : d.mu.Unlock()
1342 1 :
1343 1 : // Load the metadata for all the files being ingested. This step detects
1344 1 : // and elides empty sstables.
1345 1 : loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
1346 1 : if err != nil {
1347 1 : return IngestOperationStats{}, err
1348 1 : }
1349 :
1350 1 : if loadResult.fileCount() == 0 {
1351 1 : // All of the sstables to be ingested were empty. Nothing to do.
1352 1 : return IngestOperationStats{}, nil
1353 1 : }
1354 :
1355 : // Verify the sstables do not overlap.
1356 1 : if err := ingestSortAndVerify(d.cmp, loadResult, exciseSpan); err != nil {
1357 1 : return IngestOperationStats{}, err
1358 1 : }
1359 :
1360 : // Hard link the sstables into the DB directory. Since the sstables aren't
1361 : // referenced by a version, they won't be used. If the hard linking fails
1362 : // (e.g. because the files reside on a different filesystem), ingestLinkLocal
1363 : // will fall back to copying, and if that fails we undo our work and return an
1364 : // error.
1365 1 : if err := ingestLinkLocal(jobID, d.opts, d.objProvider, loadResult.local); err != nil {
1366 0 : return IngestOperationStats{}, err
1367 0 : }
1368 :
1369 1 : err = d.ingestAttachRemote(jobID, loadResult)
1370 1 : defer d.ingestUnprotectExternalBackings(loadResult)
1371 1 : if err != nil {
1372 0 : return IngestOperationStats{}, err
1373 0 : }
1374 :
1375 : // Make the new tables durable. We need to do this at some point before we
1376 : // update the MANIFEST (via logAndApply), otherwise a crash can have the
1377 : // tables referenced in the MANIFEST, but not present in the provider.
1378 1 : if err := d.objProvider.Sync(); err != nil {
1379 1 : return IngestOperationStats{}, err
1380 1 : }
1381 :
1382 : // metaFlushableOverlaps is a map indicating which of the ingested sstables
1383 : // overlap some table in the flushable queue. It's used to approximate
1384 : // ingest-into-L0 stats when using flushable ingests.
1385 1 : metaFlushableOverlaps := make(map[FileNum]bool, loadResult.fileCount())
1386 1 : var mem *flushableEntry
1387 1 : var mut *memTable
1388 1 : // asFlushable indicates whether the sstable was ingested as a flushable.
1389 1 : var asFlushable bool
1390 1 : prepare := func(seqNum base.SeqNum) {
1391 1 : // Note that d.commit.mu is held by commitPipeline when calling prepare.
1392 1 :
1393 1 : // Determine the set of bounds we care about for the purpose of checking
1394 1 : // for overlap among the flushables. If there's an excise span, we need
1395 1 : // to check for overlap with its bounds as well.
1396 1 : overlapBounds := make([]bounded, 0, loadResult.fileCount()+1)
1397 1 : for _, m := range loadResult.local {
1398 1 : overlapBounds = append(overlapBounds, m.fileMetadata)
1399 1 : }
1400 1 : for _, m := range loadResult.shared {
1401 1 : overlapBounds = append(overlapBounds, m.fileMetadata)
1402 1 : }
1403 1 : for _, m := range loadResult.external {
1404 1 : overlapBounds = append(overlapBounds, m.fileMetadata)
1405 1 : }
1406 1 : if exciseSpan.Valid() {
1407 1 : overlapBounds = append(overlapBounds, &exciseSpan)
1408 1 : }
1409 :
1410 1 : d.mu.Lock()
1411 1 : defer d.mu.Unlock()
1412 1 :
1413 1 : // Check if any of the currently-open EventuallyFileOnlySnapshots overlap
1414 1 : // in key ranges with the excise span. If so, we need to check for memtable
1415 1 : // overlaps with all bounds of that EventuallyFileOnlySnapshot in addition
1416 1 : // to the ingestion's own bounds too.
1417 1 :
1418 1 : if exciseSpan.Valid() {
1419 1 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
1420 1 : if s.efos == nil {
1421 0 : continue
1422 : }
1423 1 : if base.Visible(seqNum, s.efos.seqNum, base.SeqNumMax) {
1424 0 : // We only worry about snapshots older than the excise. Any snapshots
1425 0 : // created after the excise should see the excised view of the LSM
1426 0 : // anyway.
1427 0 : //
1428 0 : // Since we delay publishing the excise seqnum as visible until after
1429 0 : // the apply step, this case will never be hit in practice until we
1430 0 : // make excises flushable ingests.
1431 0 : continue
1432 : }
1433 1 : if invariants.Enabled {
1434 1 : if s.efos.hasTransitioned() {
1435 0 : panic("unexpected transitioned EFOS in snapshots list")
1436 : }
1437 : }
1438 1 : for i := range s.efos.protectedRanges {
1439 1 : if !s.efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
1440 1 : continue
1441 : }
1442 : // Our excise conflicts with this EFOS. We need to add its protected
1443 : // ranges to our overlapBounds. Grow overlapBounds in one allocation
1444 : // if necesary.
1445 1 : prs := s.efos.protectedRanges
1446 1 : if cap(overlapBounds) < len(overlapBounds)+len(prs) {
1447 1 : oldOverlapBounds := overlapBounds
1448 1 : overlapBounds = make([]bounded, len(oldOverlapBounds), len(oldOverlapBounds)+len(prs))
1449 1 : copy(overlapBounds, oldOverlapBounds)
1450 1 : }
1451 1 : for i := range prs {
1452 1 : overlapBounds = append(overlapBounds, &prs[i])
1453 1 : }
1454 1 : break
1455 : }
1456 : }
1457 : }
1458 :
1459 : // Check to see if any files overlap with any of the memtables. The queue
1460 : // is ordered from oldest to newest with the mutable memtable being the
1461 : // last element in the slice. We want to wait for the newest table that
1462 : // overlaps.
1463 :
1464 1 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1465 1 : m := d.mu.mem.queue[i]
1466 1 : m.computePossibleOverlaps(func(b bounded) shouldContinue {
1467 1 : // If this is the first table to overlap a flushable, save
1468 1 : // the flushable. This ingest must be ingested or flushed
1469 1 : // after it.
1470 1 : if mem == nil {
1471 1 : mem = m
1472 1 : }
1473 :
1474 1 : switch v := b.(type) {
1475 1 : case *fileMetadata:
1476 1 : // NB: False positives are possible if `m` is a flushable
1477 1 : // ingest that overlaps the file `v` in bounds but doesn't
1478 1 : // contain overlapping data. This is considered acceptable
1479 1 : // because it's rare (in CockroachDB a bound overlap likely
1480 1 : // indicates a data overlap), and blocking the commit
1481 1 : // pipeline while we perform I/O to check for overlap may be
1482 1 : // more disruptive than enqueueing this ingestion on the
1483 1 : // flushable queue and switching to a new memtable.
1484 1 : metaFlushableOverlaps[v.FileNum] = true
1485 1 : case *KeyRange:
1486 : // An excise span or an EventuallyFileOnlySnapshot protected range;
1487 : // not a file.
1488 0 : default:
1489 0 : panic("unreachable")
1490 : }
1491 1 : return continueIteration
1492 : }, overlapBounds...)
1493 : }
1494 :
1495 1 : if mem == nil {
1496 1 : // No overlap with any of the queued flushables, so no need to queue
1497 1 : // after them.
1498 1 :
1499 1 : // New writes with higher sequence numbers may be concurrently
1500 1 : // committed. We must ensure they don't flush before this ingest
1501 1 : // completes. To do that, we ref the mutable memtable as a writer,
1502 1 : // preventing its flushing (and the flushing of all subsequent
1503 1 : // flushables in the queue). Once we've acquired the manifest lock
1504 1 : // to add the ingested sstables to the LSM, we can unref as we're
1505 1 : // guaranteed that the flush won't edit the LSM before this ingest.
1506 1 : mut = d.mu.mem.mutable
1507 1 : mut.writerRef()
1508 1 : return
1509 1 : }
1510 :
1511 : // The ingestion overlaps with some entry in the flushable queue. If the
1512 : // pre-conditions are met below, we can treat this ingestion as a flushable
1513 : // ingest, otherwise we wait on the memtable flush before ingestion.
1514 : //
1515 : // TODO(aaditya): We should make flushableIngest compatible with remote
1516 : // files.
1517 1 : hasRemoteFiles := len(shared) > 0 || len(external) > 0
1518 1 : canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest &&
1519 1 : (len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold) &&
1520 1 : !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles
1521 1 :
1522 1 : if !canIngestFlushable || (exciseSpan.Valid() && !sstsContainExciseTombstone) {
1523 1 : // We're not able to ingest as a flushable,
1524 1 : // so we must synchronously flush.
1525 1 : //
1526 1 : // TODO(bilal): Currently, if any of the files being ingested are shared or
1527 1 : // there's an excise span present, we cannot use flushable ingests and need
1528 1 : // to wait synchronously. Either remove this caveat by fleshing out
1529 1 : // flushable ingest logic to also account for these cases, or remove this
1530 1 : // comment. Tracking issue: https://github.com/cockroachdb/pebble/issues/2676
1531 1 : if mem.flushable == d.mu.mem.mutable {
1532 1 : err = d.makeRoomForWrite(nil)
1533 1 : }
1534 : // New writes with higher sequence numbers may be concurrently
1535 : // committed. We must ensure they don't flush before this ingest
1536 : // completes. To do that, we ref the mutable memtable as a writer,
1537 : // preventing its flushing (and the flushing of all subsequent
1538 : // flushables in the queue). Once we've acquired the manifest lock
1539 : // to add the ingested sstables to the LSM, we can unref as we're
1540 : // guaranteed that the flush won't edit the LSM before this ingest.
1541 1 : mut = d.mu.mem.mutable
1542 1 : mut.writerRef()
1543 1 : mem.flushForced = true
1544 1 : d.maybeScheduleFlush()
1545 1 : return
1546 : }
1547 : // Since there aren't too many memtables already queued up, we can
1548 : // slide the ingested sstables on top of the existing memtables.
1549 1 : asFlushable = true
1550 1 : fileMetas := make([]*fileMetadata, len(loadResult.local))
1551 1 : for i := range fileMetas {
1552 1 : fileMetas[i] = loadResult.local[i].fileMetadata
1553 1 : }
1554 1 : err = d.handleIngestAsFlushable(fileMetas, seqNum, exciseSpan)
1555 : }
1556 :
1557 1 : var ve *versionEdit
1558 1 : apply := func(seqNum base.SeqNum) {
1559 1 : if err != nil || asFlushable {
1560 1 : // An error occurred during prepare.
1561 1 : if mut != nil {
1562 0 : if mut.writerUnref() {
1563 0 : d.mu.Lock()
1564 0 : d.maybeScheduleFlush()
1565 0 : d.mu.Unlock()
1566 0 : }
1567 : }
1568 1 : return
1569 : }
1570 :
1571 : // If there's an excise being done atomically with the same ingest, we
1572 : // assign the lowest sequence number in the set of sequence numbers for this
1573 : // ingestion to the excise. Note that we've already allocated fileCount+1
1574 : // sequence numbers in this case.
1575 1 : if exciseSpan.Valid() {
1576 1 : seqNum++ // the first seqNum is reserved for the excise.
1577 1 : }
1578 : // Update the sequence numbers for all ingested sstables'
1579 : // metadata. When the version edit is applied, the metadata is
1580 : // written to the manifest, persisting the sequence number.
1581 : // The sstables themselves are left unmodified.
1582 1 : if err = ingestUpdateSeqNum(
1583 1 : d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult,
1584 1 : ); err != nil {
1585 0 : if mut != nil {
1586 0 : if mut.writerUnref() {
1587 0 : d.mu.Lock()
1588 0 : d.maybeScheduleFlush()
1589 0 : d.mu.Unlock()
1590 0 : }
1591 : }
1592 0 : return
1593 : }
1594 :
1595 : // If we overlapped with a memtable in prepare wait for the flush to
1596 : // finish.
1597 1 : if mem != nil {
1598 1 : <-mem.flushed
1599 1 : }
1600 :
1601 : // Assign the sstables to the correct level in the LSM and apply the
1602 : // version edit.
1603 1 : ve, err = d.ingestApply(ctx, jobID, loadResult, mut, exciseSpan, seqNum)
1604 : }
1605 :
1606 : // Only one ingest can occur at a time because if not, one would block waiting
1607 : // for the other to finish applying. This blocking would happen while holding
1608 : // the commit mutex which would prevent unrelated batches from writing their
1609 : // changes to the WAL and memtable. This will cause a bigger commit hiccup
1610 : // during ingestion.
1611 1 : seqNumCount := loadResult.fileCount()
1612 1 : if exciseSpan.Valid() {
1613 1 : seqNumCount++
1614 1 : }
1615 1 : d.commit.ingestSem <- struct{}{}
1616 1 : d.commit.AllocateSeqNum(seqNumCount, prepare, apply)
1617 1 : <-d.commit.ingestSem
1618 1 :
1619 1 : if err != nil {
1620 1 : if err2 := ingestCleanup(d.objProvider, loadResult.local); err2 != nil {
1621 0 : d.opts.Logger.Errorf("ingest cleanup failed: %v", err2)
1622 0 : }
1623 1 : } else {
1624 1 : // Since we either created a hard link to the ingesting files, or copied
1625 1 : // them over, it is safe to remove the originals paths.
1626 1 : for i := range loadResult.local {
1627 1 : path := loadResult.local[i].path
1628 1 : if err2 := d.opts.FS.Remove(path); err2 != nil {
1629 1 : d.opts.Logger.Errorf("ingest failed to remove original file: %s", err2)
1630 1 : }
1631 : }
1632 : }
1633 :
1634 1 : info := TableIngestInfo{
1635 1 : JobID: int(jobID),
1636 1 : Err: err,
1637 1 : flushable: asFlushable,
1638 1 : }
1639 1 : if len(loadResult.local) > 0 {
1640 1 : info.GlobalSeqNum = loadResult.local[0].SmallestSeqNum
1641 1 : } else if len(loadResult.shared) > 0 {
1642 1 : info.GlobalSeqNum = loadResult.shared[0].SmallestSeqNum
1643 1 : } else {
1644 1 : info.GlobalSeqNum = loadResult.external[0].SmallestSeqNum
1645 1 : }
1646 1 : var stats IngestOperationStats
1647 1 : if ve != nil {
1648 1 : info.Tables = make([]struct {
1649 1 : TableInfo
1650 1 : Level int
1651 1 : }, len(ve.NewFiles))
1652 1 : for i := range ve.NewFiles {
1653 1 : e := &ve.NewFiles[i]
1654 1 : info.Tables[i].Level = e.Level
1655 1 : info.Tables[i].TableInfo = e.Meta.TableInfo()
1656 1 : stats.Bytes += e.Meta.Size
1657 1 : if e.Level == 0 {
1658 1 : stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
1659 1 : }
1660 1 : if metaFlushableOverlaps[e.Meta.FileNum] {
1661 1 : stats.MemtableOverlappingFiles++
1662 1 : }
1663 : }
1664 1 : } else if asFlushable {
1665 1 : // NB: If asFlushable == true, there are no shared sstables.
1666 1 : info.Tables = make([]struct {
1667 1 : TableInfo
1668 1 : Level int
1669 1 : }, len(loadResult.local))
1670 1 : for i, f := range loadResult.local {
1671 1 : info.Tables[i].Level = -1
1672 1 : info.Tables[i].TableInfo = f.TableInfo()
1673 1 : stats.Bytes += f.Size
1674 1 : // We don't have exact stats on which files will be ingested into
1675 1 : // L0, because actual ingestion into the LSM has been deferred until
1676 1 : // flush time. Instead, we infer based on memtable overlap.
1677 1 : //
1678 1 : // TODO(jackson): If we optimistically compute data overlap (#2112)
1679 1 : // before entering the commit pipeline, we can use that overlap to
1680 1 : // improve our approximation by incorporating overlap with L0, not
1681 1 : // just memtables.
1682 1 : if metaFlushableOverlaps[f.FileNum] {
1683 1 : stats.ApproxIngestedIntoL0Bytes += f.Size
1684 1 : stats.MemtableOverlappingFiles++
1685 1 : }
1686 : }
1687 : }
1688 1 : d.opts.EventListener.TableIngested(info)
1689 1 :
1690 1 : return stats, err
1691 : }
1692 :
1693 : // excise updates ve to include a replacement of the file m with new virtual
1694 : // sstables that exclude exciseSpan, returning a slice of newly-created files if
1695 : // any. If the entirety of m is deleted by exciseSpan, no new sstables are added
1696 : // and m is deleted. Note that ve is updated in-place.
1697 : //
1698 : // The manifest lock must be held when calling this method.
1699 : func (d *DB) excise(
1700 : exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int,
1701 1 : ) ([]manifest.NewFileEntry, error) {
1702 1 : numCreatedFiles := 0
1703 1 : // Check if there's actually an overlap between m and exciseSpan.
1704 1 : mBounds := base.UserKeyBoundsFromInternal(m.Smallest, m.Largest)
1705 1 : if !exciseSpan.Overlaps(d.cmp, &mBounds) {
1706 1 : return nil, nil
1707 1 : }
1708 1 : ve.DeletedFiles[deletedFileEntry{
1709 1 : Level: level,
1710 1 : FileNum: m.FileNum,
1711 1 : }] = m
1712 1 : // Fast path: m sits entirely within the exciseSpan, so just delete it.
1713 1 : if exciseSpan.ContainsInternalKey(d.cmp, m.Smallest) && exciseSpan.ContainsInternalKey(d.cmp, m.Largest) {
1714 1 : return nil, nil
1715 1 : }
1716 :
1717 1 : var iters iterSet
1718 1 : var itersLoaded bool
1719 1 : defer iters.CloseAll()
1720 1 : // Lazily open point/range-del/range-key iterators on m; several of the
1721 1 : // bound computations below may not need them at all.
1722 1 : loadItersIfNecessary := func() error {
1723 1 : if itersLoaded {
1724 1 : return nil
1725 1 : }
1726 1 : var err error
1727 1 : iters, err = d.newIters(context.TODO(), m, &IterOptions{
1728 1 : CategoryAndQoS: sstable.CategoryAndQoS{
1729 1 : Category: "pebble-ingest",
1730 1 : QoSLevel: sstable.LatencySensitiveQoSLevel,
1731 1 : },
1732 1 : level: manifest.Level(level),
1733 1 : }, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
1734 1 : itersLoaded = true
1735 1 : return err
1736 : }
1737 :
1738 1 : needsBacking := false
1739 1 : // Create a file to the left of the excise span, if necessary.
1740 1 : // The bounds of this file will be [m.Smallest, lastKeyBefore(exciseSpan.Start)].
1741 1 : //
1742 1 : // We create bounds that are tight on user keys, and we make the effort to find
1743 1 : // the last key in the original sstable that's smaller than exciseSpan.Start
1744 1 : // even though it requires some sstable reads. We could choose to create
1745 1 : // virtual sstables on loose userKey bounds, in which case we could just set
1746 1 : // leftFile.Largest to an exclusive sentinel at exciseSpan.Start. The biggest
1747 1 : // issue with that approach would be that it'd lead to lots of small virtual
1748 1 : // sstables in the LSM that have no guarantee on containing even a single user
1749 1 : // key within the file bounds. This has the potential to increase both read and
1750 1 : // write-amp as we will be opening up these sstables only to find no relevant
1751 1 : // keys in the read path, and compacting sstables on top of them instead of
1752 1 : // directly into the space occupied by them. We choose to incur the cost of
1753 1 : // calculating tight bounds at this time instead of creating more work in the
1754 1 : // future.
1755 1 : //
1756 1 : // TODO(bilal): Some of this work can happen without grabbing the manifest
1757 1 : // lock; we could grab one currentVersion, release the lock, calculate excised
1758 1 : // files, then grab the lock again and recalculate for just the files that
1759 1 : // have changed since our previous calculation. Do this optimization as part of
1760 1 : // https://github.com/cockroachdb/pebble/issues/2112 .
1761 1 : if d.cmp(m.Smallest.UserKey, exciseSpan.Start) < 0 {
1762 1 : leftFile := &fileMetadata{
1763 1 : Virtual: true,
1764 1 : FileBacking: m.FileBacking,
1765 1 : FileNum: d.mu.versions.getNextFileNum(),
1766 1 : // Note that these are loose bounds for smallest/largest seqnums, but they're
1767 1 : // sufficient for maintaining correctness.
1768 1 : SmallestSeqNum: m.SmallestSeqNum,
1769 1 : LargestSeqNum: m.LargestSeqNum,
1770 1 : LargestSeqNumAbsolute: m.LargestSeqNumAbsolute,
1771 1 : SyntheticPrefix: m.SyntheticPrefix,
1772 1 : SyntheticSuffix: m.SyntheticSuffix,
1773 1 : }
1774 1 : if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.SmallestPointKey) {
1775 1 : // This file will probably contain point keys.
1776 1 : if err := loadItersIfNecessary(); err != nil {
1777 0 : return nil, err
1778 0 : }
1779 1 : smallestPointKey := m.SmallestPointKey
1780 1 : if kv := iters.Point().SeekLT(exciseSpan.Start, base.SeekLTFlagsNone); kv != nil {
1781 1 : leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, kv.K.Clone())
1782 1 : }
1783 : // Store the min of (exciseSpan.Start, rdel.End) in lastRangeDel. This
1784 : // needs to be a copy if the key is owned by the range del iter.
1785 1 : var lastRangeDel []byte
1786 1 : if rdel, err := iters.RangeDeletion().SeekLT(exciseSpan.Start); err != nil {
1787 0 : return nil, err
1788 1 : } else if rdel != nil {
1789 1 : lastRangeDel = append(lastRangeDel[:0], rdel.End...)
1790 1 : if d.cmp(lastRangeDel, exciseSpan.Start) > 0 {
1791 1 : lastRangeDel = exciseSpan.Start
1792 1 : }
1793 : }
1794 1 : if lastRangeDel != nil {
1795 1 : leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, lastRangeDel))
1796 1 : }
1797 : }
1798 1 : if m.HasRangeKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.SmallestRangeKey) {
1799 1 : // This file will probably contain range keys.
1800 1 : if err := loadItersIfNecessary(); err != nil {
1801 0 : return nil, err
1802 0 : }
1803 1 : smallestRangeKey := m.SmallestRangeKey
1804 1 : // Store the min of (exciseSpan.Start, rkey.End) in lastRangeKey. This
1805 1 : // needs to be a copy if the key is owned by the range key iter.
1806 1 : var lastRangeKey []byte
1807 1 : var lastRangeKeyKind InternalKeyKind
1808 1 : if rkey, err := iters.RangeKey().SeekLT(exciseSpan.Start); err != nil {
1809 0 : return nil, err
1810 1 : } else if rkey != nil {
1811 1 : lastRangeKey = append(lastRangeKey[:0], rkey.End...)
1812 1 : if d.cmp(lastRangeKey, exciseSpan.Start) > 0 {
1813 1 : lastRangeKey = exciseSpan.Start
1814 1 : }
1815 1 : lastRangeKeyKind = rkey.Keys[0].Kind()
1816 : }
1817 1 : if lastRangeKey != nil {
1818 1 : leftFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, base.MakeExclusiveSentinelKey(lastRangeKeyKind, lastRangeKey))
1819 1 : }
1820 : }
1821 1 : if leftFile.HasRangeKeys || leftFile.HasPointKeys {
1822 1 : var err error
1823 1 : leftFile.Size, err = d.tableCache.estimateSize(m, leftFile.Smallest.UserKey, leftFile.Largest.UserKey)
1824 1 : if err != nil {
1825 0 : return nil, err
1826 0 : }
1827 1 : if leftFile.Size == 0 {
1828 1 : // On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
1829 1 : // such as if the excised file only has range keys/dels and no point
1830 1 : // keys. This can cause panics in places where we divide by file sizes.
1831 1 : // Correct for it here.
1832 1 : leftFile.Size = 1
1833 1 : }
1834 1 : if err := leftFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
1835 0 : return nil, err
1836 0 : }
1837 1 : leftFile.ValidateVirtual(m)
1838 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: leftFile})
1839 1 : needsBacking = true
1840 1 : numCreatedFiles++
1841 : }
1842 : }
1843 : // Create a file to the right, if necessary.
1844 1 : if exciseSpan.ContainsInternalKey(d.cmp, m.Largest) {
1845 1 : // No key exists to the right of the excise span in this file.
1846 1 : if needsBacking && !m.Virtual {
1847 1 : // If m is virtual, then its file backing is already known to the manifest.
1848 1 : // We don't need to create another file backing. Note that there must be
1849 1 : // only one CreatedBackingTables entry per backing sstable. This is
1850 1 : // indicated by the VersionEdit.CreatedBackingTables invariant.
1851 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
1852 1 : }
1853 1 : return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
1854 : }
1855 : // Create a new file, rightFile, between [firstKeyAfter(exciseSpan.End), m.Largest].
1856 : //
1857 : // See comment before the definition of leftFile for the motivation behind
1858 : // calculating tight user-key bounds.
1859 1 : rightFile := &fileMetadata{
1860 1 : Virtual: true,
1861 1 : FileBacking: m.FileBacking,
1862 1 : FileNum: d.mu.versions.getNextFileNum(),
1863 1 : // Note that these are loose bounds for smallest/largest seqnums, but they're
1864 1 : // sufficient for maintaining correctness.
1865 1 : SmallestSeqNum: m.SmallestSeqNum,
1866 1 : LargestSeqNum: m.LargestSeqNum,
1867 1 : LargestSeqNumAbsolute: m.LargestSeqNumAbsolute,
1868 1 : SyntheticPrefix: m.SyntheticPrefix,
1869 1 : SyntheticSuffix: m.SyntheticSuffix,
1870 1 : }
1871 1 : if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.LargestPointKey) {
1872 1 : // This file will probably contain point keys.
1873 1 : if err := loadItersIfNecessary(); err != nil {
1874 0 : return nil, err
1875 0 : }
1876 1 : largestPointKey := m.LargestPointKey
1877 1 : if kv := iters.Point().SeekGE(exciseSpan.End.Key, base.SeekGEFlagsNone); kv != nil {
1878 1 : if exciseSpan.End.Kind == base.Inclusive && d.equal(exciseSpan.End.Key, kv.K.UserKey) {
1879 0 : return nil, base.AssertionFailedf("cannot excise with an inclusive end key and data overlap at end key")
1880 0 : }
1881 1 : rightFile.ExtendPointKeyBounds(d.cmp, kv.K.Clone(), largestPointKey)
1882 : }
1883 : // Store the max of (exciseSpan.End, rdel.Start) in firstRangeDel. This
1884 : // needs to be a copy if the key is owned by the range del iter.
1885 1 : var firstRangeDel []byte
1886 1 : rdel, err := iters.RangeDeletion().SeekGE(exciseSpan.End.Key)
1887 1 : if err != nil {
1888 0 : return nil, err
1889 1 : } else if rdel != nil {
1890 1 : firstRangeDel = append(firstRangeDel[:0], rdel.Start...)
1891 1 : if d.cmp(firstRangeDel, exciseSpan.End.Key) < 0 {
1892 1 : // NB: This can only be done if the end bound is exclusive.
1893 1 : if exciseSpan.End.Kind != base.Exclusive {
1894 0 : return nil, base.AssertionFailedf("cannot truncate rangedel during excise with an inclusive upper bound")
1895 0 : }
1896 1 : firstRangeDel = exciseSpan.End.Key
1897 : }
1898 : }
1899 1 : if firstRangeDel != nil {
1900 1 : smallestPointKey := rdel.SmallestKey()
1901 1 : smallestPointKey.UserKey = firstRangeDel
1902 1 : rightFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, largestPointKey)
1903 1 : }
1904 : }
1905 1 : if m.HasRangeKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.LargestRangeKey) {
1906 1 : // This file will probably contain range keys.
1907 1 : if err := loadItersIfNecessary(); err != nil {
1908 0 : return nil, err
1909 0 : }
1910 1 : largestRangeKey := m.LargestRangeKey
1911 1 : // Store the max of (exciseSpan.End, rkey.Start) in firstRangeKey. This
1912 1 : // needs to be a copy if the key is owned by the range key iter.
1913 1 : var firstRangeKey []byte
1914 1 : rkey, err := iters.RangeKey().SeekGE(exciseSpan.End.Key)
1915 1 : if err != nil {
1916 0 : return nil, err
1917 1 : } else if rkey != nil {
1918 1 : firstRangeKey = append(firstRangeKey[:0], rkey.Start...)
1919 1 : if d.cmp(firstRangeKey, exciseSpan.End.Key) < 0 {
1920 1 : if exciseSpan.End.Kind != base.Exclusive {
1921 0 : return nil, base.AssertionFailedf("cannot truncate range key during excise with an inclusive upper bound")
1922 0 : }
1923 1 : firstRangeKey = exciseSpan.End.Key
1924 : }
1925 : }
1926 1 : if firstRangeKey != nil {
1927 1 : smallestRangeKey := rkey.SmallestKey()
1928 1 : smallestRangeKey.UserKey = firstRangeKey
1929 1 : // We call ExtendRangeKeyBounds so any internal boundType fields are
1930 1 : // set correctly. Note that this is mildly wasteful as we'll be comparing
1931 1 : // rightFile.{Smallest,Largest}RangeKey with themselves, which can be
1932 1 : // avoided if we exported ExtendOverallKeyBounds or so.
1933 1 : rightFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, largestRangeKey)
1934 1 : }
1935 : }
1936 1 : if rightFile.HasRangeKeys || rightFile.HasPointKeys {
1937 1 : var err error
1938 1 : rightFile.Size, err = d.tableCache.estimateSize(m, rightFile.Smallest.UserKey, rightFile.Largest.UserKey)
1939 1 : if err != nil {
1940 0 : return nil, err
1941 0 : }
1942 1 : if rightFile.Size == 0 {
1943 1 : // On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
1944 1 : // such as if the excised file only has range keys/dels and no point keys.
1945 1 : // This can cause panics in places where we divide by file sizes. Correct
1946 1 : // for it here.
1947 1 : rightFile.Size = 1
1948 1 : }
1949 1 : if err := rightFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
1950 0 : return nil, err
1951 0 : }
1952 1 : rightFile.ValidateVirtual(m)
1953 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: rightFile})
1954 1 : needsBacking = true
1955 1 : numCreatedFiles++
1956 : }
1957 :
1958 1 : if needsBacking && !m.Virtual {
1959 1 : // If m is virtual, then its file backing is already known to the manifest.
1960 1 : // We don't need to create another file backing. Note that there must be
1961 1 : // only one CreatedBackingTables entry per backing sstable. This is
1962 1 : // indicated by the VersionEdit.CreatedBackingTables invariant.
1963 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
1964 1 : }
1965 :
1966 1 : return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
1967 : }
1966 :
1967 : type ingestSplitFile struct {
1968 : // ingestFile is the file being ingested.
1969 : ingestFile *fileMetadata
1970 : // splitFile is the file that needs to be split to allow ingestFile to slot
1971 : // into the level indicated by `level`.
1972 : splitFile *fileMetadata
1973 : // level is the level where ingestFile will go (and where splitFile already is).
1974 : level int
1975 : }
1976 :
1977 : // ingestSplit splits files specified in `files` and updates ve in-place to
1978 : // account for existing files getting split into two virtual sstables. The map
1979 : // `replacedFiles` contains an in-progress map of all files that have been
1980 : // replaced with new virtual sstables in this version edit so far, which is also
1981 : // updated in-place.
1982 : //
1983 : // d.mu as well as the manifest lock must be held when calling this method.
1984 : func (d *DB) ingestSplit(
1985 : ve *versionEdit,
1986 : updateMetrics func(*fileMetadata, int, []newFileEntry),
1987 : files []ingestSplitFile,
1988 : replacedFiles map[base.FileNum][]newFileEntry,
1989 1 : ) error {
1990 1 : for _, s := range files {
1991 1 : ingestFileBounds := s.ingestFile.UserKeyBounds()
1992 1 : // replacedFiles can be thought of as a tree, where we start iterating with
1993 1 : // s.splitFile and run its fileNum through replacedFiles, then find which of
1994 1 : // the replaced files overlaps with s.ingestFile, which becomes the new
1995 1 : // splitFile, then we check splitFile's replacements in replacedFiles again
1996 1 : // for overlap with s.ingestFile, and so on until we either can't find the
1997 1 : // current splitFile in replacedFiles (i.e. that's the file that now needs to
1998 1 : // be split), or we don't find a file that overlaps with s.ingestFile, which
1999 1 : // means a prior ingest split already produced enough room for s.ingestFile
2000 1 : // to go into this level without necessitating another ingest split.
2001 1 : splitFile := s.splitFile
2002 1 : for splitFile != nil {
2003 1 : replaced, ok := replacedFiles[splitFile.FileNum]
2004 1 : if !ok {
2005 1 : break // splitFile has not been replaced yet; it's the one to split.
2006 : }
2007 1 : updatedSplitFile := false
2008 1 : for i := range replaced {
2009 1 : if replaced[i].Meta.Overlaps(d.cmp, &ingestFileBounds) {
2010 1 : if updatedSplitFile {
2011 0 : // This should never happen because the earlier ingestTargetLevel
2012 0 : // function only finds split file candidates that are guaranteed to
2013 0 : // have no data overlap, only boundary overlap. See the comments
2014 0 : // in that method to see the definitions of data vs boundary
2015 0 : // overlap. That, plus the fact that files in `replaced` are
2016 0 : // guaranteed to have file bounds that are tight on user keys
2017 0 : // (as that's what `d.excise` produces), means that the only case
2018 0 : // where we overlap with two or more files in `replaced` is if we
2019 0 : // actually had data overlap all along, or if the ingestion files
2020 0 : // were overlapping, either of which is an invariant violation.
2021 0 : panic("updated with two files in ingestSplit")
2022 : }
2023 1 : splitFile = replaced[i].Meta
2024 1 : updatedSplitFile = true
2025 : }
2026 : }
2027 1 : if !updatedSplitFile {
2028 1 : // None of the replaced files overlapped with the file being ingested.
2029 1 : // This can happen if we've already excised a span overlapping with
2030 1 : // this file, or if we have consecutive ingested files that can slide
2031 1 : // within the same gap between keys in an existing file. For instance,
2032 1 : // if an existing file has keys a and g and we're ingesting b-c, d-e,
2033 1 : // the first loop iteration will split the existing file into one that
2034 1 : // ends in a and another that starts at g, and the second iteration will
2035 1 : // fall into this case and require no splitting.
2036 1 : //
2037 1 : // No splitting necessary.
2038 1 : splitFile = nil
2039 1 : }
2040 : }
2041 1 : if splitFile == nil {
2042 1 : continue
2043 : }
2044 : // NB: excise operates on [start, end). We're splitting at [start, end]
2045 : // (assuming !s.ingestFile.Largest.IsExclusiveSentinel()). The conflation
2046 : // of exclusive vs inclusive end bounds should not make a difference here
2047 : // as we're guaranteed to not have any data overlap between splitFile and
2048 : // s.ingestFile. d.excise will return an error if we pass an inclusive user
2049 : // key bound _and_ we end up seeing data overlap at the end key.
2050 1 : added, err := d.excise(base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level)
2051 1 : if err != nil {
2052 0 : return err
2053 0 : }
2054 1 : if _, ok := ve.DeletedFiles[deletedFileEntry{
2055 1 : Level: s.level,
2056 1 : FileNum: splitFile.FileNum,
2057 1 : }]; !ok {
2058 0 : panic("did not split file that was expected to be split")
2059 : }
2060 1 : replacedFiles[splitFile.FileNum] = added
2061 1 : for i := range added {
2062 1 : addedBounds := added[i].Meta.UserKeyBounds()
2063 1 : if s.ingestFile.Overlaps(d.cmp, &addedBounds) {
2064 0 : panic("ingest-time split produced a file that overlaps with ingested file")
2065 : }
2066 : }
2067 1 : updateMetrics(splitFile, s.level, added)
2068 : }
2069 : // Flatten the version edit by removing any entries from ve.NewFiles that
2070 : // are also in ve.DeletedFiles.
2071 1 : newNewFiles := ve.NewFiles[:0] // filter in place, reusing NewFiles' backing array.
2072 1 : for i := range ve.NewFiles {
2073 1 : fn := ve.NewFiles[i].Meta.FileNum
2074 1 : deEntry := deletedFileEntry{Level: ve.NewFiles[i].Level, FileNum: fn}
2075 1 : if _, ok := ve.DeletedFiles[deEntry]; ok {
2076 1 : delete(ve.DeletedFiles, deEntry)
2077 1 : } else {
2078 1 : newNewFiles = append(newNewFiles, ve.NewFiles[i])
2079 1 : }
2080 : }
2081 1 : ve.NewFiles = newNewFiles
2082 1 : return nil
2083 : }
2084 :
2085 : func (d *DB) ingestApply(
2086 : ctx context.Context,
2087 : jobID JobID,
2088 : lr ingestLoadResult,
2089 : mut *memTable,
2090 : exciseSpan KeyRange,
2091 : exciseSeqNum base.SeqNum,
2092 1 : ) (*versionEdit, error) {
2093 1 : d.mu.Lock()
2094 1 : defer d.mu.Unlock()
2095 1 :
2096 1 : ve := &versionEdit{
2097 1 : NewFiles: make([]newFileEntry, lr.fileCount()),
2098 1 : }
2099 1 : if exciseSpan.Valid() || (d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit()) {
2100 1 : ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{}
2101 1 : }
2102 1 : metrics := make(map[int]*LevelMetrics)
2103 1 :
2104 1 : // Lock the manifest for writing before we use the current version to
2105 1 : // determine the target level. This prevents two concurrent ingestion jobs
2106 1 : // from using the same version to determine the target level, and also
2107 1 : // provides serialization with concurrent compaction and flush jobs.
2108 1 : // logAndApply unconditionally releases the manifest lock, but any earlier
2109 1 : // returns must unlock the manifest.
2110 1 : d.mu.versions.logLock()
2111 1 :
2112 1 : if mut != nil {
2113 1 : // Unref the mutable memtable to allows its flush to proceed. Now that we've
2114 1 : // acquired the manifest lock, we can be certain that if the mutable
2115 1 : // memtable has received more recent conflicting writes, the flush won't
2116 1 : // beat us to applying to the manifest resulting in sequence number
2117 1 : // inversion. Even though we call maybeScheduleFlush right now, this flush
2118 1 : // will apply after our ingestion.
2119 1 : if mut.writerUnref() {
2120 1 : d.maybeScheduleFlush()
2121 1 : }
2122 : }
2123 :
2124 1 : current := d.mu.versions.currentVersion()
2125 1 : overlapChecker := &overlapChecker{
2126 1 : comparer: d.opts.Comparer,
2127 1 : newIters: d.newIters,
2128 1 : opts: IterOptions{
2129 1 : logger: d.opts.Logger,
2130 1 : CategoryAndQoS: sstable.CategoryAndQoS{
2131 1 : Category: "pebble-ingest",
2132 1 : QoSLevel: sstable.LatencySensitiveQoSLevel,
2133 1 : },
2134 1 : },
2135 1 : v: current,
2136 1 : }
2137 1 : shouldIngestSplit := d.opts.Experimental.IngestSplit != nil &&
2138 1 : d.opts.Experimental.IngestSplit() && d.FormatMajorVersion() >= FormatVirtualSSTables
2139 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
2140 1 : // filesToSplit is a list where each element is a pair consisting of a file
2141 1 : // being ingested and a file being split to make room for an ingestion into
2142 1 : // that level. Each ingested file will appear at most once in this list. It
2143 1 : // is possible for split files to appear twice in this list.
2144 1 : filesToSplit := make([]ingestSplitFile, 0)
2145 1 : checkCompactions := false
2146 1 : for i := 0; i < lr.fileCount(); i++ {
2147 1 : // Determine the lowest level in the LSM for which the sstable doesn't
2148 1 : // overlap any existing files in the level.
2149 1 : var m *fileMetadata
2150 1 : specifiedLevel := -1
2151 1 : isShared := false
2152 1 : isExternal := false
2153 1 : if i < len(lr.local) {
2154 1 : // local file.
2155 1 : m = lr.local[i].fileMetadata
2156 1 : } else if (i - len(lr.local)) < len(lr.shared) {
2157 1 : // shared file.
2158 1 : isShared = true
2159 1 : sharedIdx := i - len(lr.local)
2160 1 : m = lr.shared[sharedIdx].fileMetadata
2161 1 : specifiedLevel = int(lr.shared[sharedIdx].shared.Level)
2162 1 : } else {
2163 1 : // external file.
2164 1 : isExternal = true
2165 1 : externalIdx := i - (len(lr.local) + len(lr.shared))
2166 1 : m = lr.external[externalIdx].fileMetadata
2167 1 : if lr.externalFilesHaveLevel {
2168 1 : specifiedLevel = int(lr.external[externalIdx].external.Level)
2169 1 : }
2170 : }
2171 :
2172 : // Add to CreatedBackingTables if this is a new backing.
2173 : //
2174 : // Shared files always have a new backing. External files have new backings
2175 : // iff the backing disk file num and the file num match (see ingestAttachRemote).
2176 1 : if isShared || (isExternal && m.FileBacking.DiskFileNum == base.DiskFileNum(m.FileNum)) {
2177 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
2178 1 : }
2179 :
2180 1 : f := &ve.NewFiles[i]
2181 1 : var err error
2182 1 : if specifiedLevel != -1 {
2183 1 : f.Level = specifiedLevel
2184 1 : } else {
2185 1 : var splitFile *fileMetadata
2186 1 : if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
2187 1 : // This file fits perfectly within the excise span. We can slot it at
2188 1 : // L6, or sharedLevelsStart - 1 if we have shared files.
2189 1 : if len(lr.shared) > 0 || lr.externalFilesHaveLevel {
2190 1 : f.Level = sharedLevelsStart - 1
2191 1 : if baseLevel > f.Level {
2192 1 : f.Level = 0
2193 1 : }
2194 1 : } else {
2195 1 : f.Level = 6
2196 1 : }
2197 1 : } else {
2198 1 : // We check overlap against the LSM without holding DB.mu. Note that we
2199 1 : // are still holding the log lock, so the version cannot change.
2200 1 : // TODO(radu): perform this check optimistically outside of the log lock.
2201 1 : var lsmOverlap overlap.WithLSM
2202 1 : lsmOverlap, err = func() (overlap.WithLSM, error) {
2203 1 : d.mu.Unlock()
2204 1 : defer d.mu.Lock()
2205 1 : return overlapChecker.DetermineLSMOverlap(ctx, m.UserKeyBounds())
2206 1 : }()
2207 1 : if err == nil {
2208 1 : f.Level, splitFile, err = ingestTargetLevel(
2209 1 : ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit,
2210 1 : )
2211 1 : }
2212 : }
2213 :
2214 1 : if splitFile != nil {
2215 1 : if invariants.Enabled {
2216 1 : if lf := current.Levels[f.Level].Find(d.cmp, splitFile); lf.Empty() {
2217 0 : panic("splitFile returned is not in level it should be")
2218 : }
2219 : }
2220 : // We take advantage of the fact that we won't drop the db mutex
2221 : // between now and the call to logAndApply. So, no files should
2222 : // get added to a new in-progress compaction at this point. We can
2223 : // avoid having to iterate on in-progress compactions to cancel them
2224 : // if none of the files being split have a compacting state.
2225 1 : if splitFile.IsCompacting() {
2226 0 : checkCompactions = true
2227 0 : }
2228 1 : filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitFile, level: f.Level})
2229 : }
2230 : }
2231 1 : if err != nil {
2232 0 : d.mu.versions.logUnlock()
2233 0 : return nil, err
2234 0 : }
2235 1 : if isShared && f.Level < sharedLevelsStart {
2236 0 : panic(fmt.Sprintf("cannot slot a shared file higher than the highest shared level: %d < %d",
2237 0 : f.Level, sharedLevelsStart))
2238 : }
2239 1 : f.Meta = m
2240 1 : levelMetrics := metrics[f.Level]
2241 1 : if levelMetrics == nil {
2242 1 : levelMetrics = &LevelMetrics{}
2243 1 : metrics[f.Level] = levelMetrics
2244 1 : }
2245 1 : levelMetrics.NumFiles++
2246 1 : levelMetrics.Size += int64(m.Size)
2247 1 : levelMetrics.BytesIngested += m.Size
2248 1 : levelMetrics.TablesIngested++
2249 : }
2250 : // replacedFiles maps files excised due to exciseSpan (or splitFiles returned
2251 : // by ingestTargetLevel), to files that were created to replace it. This map
2252 : // is used to resolve references to split files in filesToSplit, as it is
2253 : // possible for a file that we want to split to no longer exist or have a
2254 : // newer fileMetadata due to a split induced by another ingestion file, or an
2255 : // excise.
2256 1 : replacedFiles := make(map[base.FileNum][]newFileEntry)
2257 1 : updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
2258 1 : levelMetrics := metrics[level]
2259 1 : if levelMetrics == nil {
2260 1 : levelMetrics = &LevelMetrics{}
2261 1 : metrics[level] = levelMetrics
2262 1 : }
2263 1 : levelMetrics.NumFiles--
2264 1 : levelMetrics.Size -= int64(m.Size)
2265 1 : for i := range added {
2266 1 : levelMetrics.NumFiles++
2267 1 : levelMetrics.Size += int64(added[i].Meta.Size)
2268 1 : }
2269 : }
2270 1 : if exciseSpan.Valid() {
2271 1 : // Iterate through all levels and find files that intersect with exciseSpan.
2272 1 : //
2273 1 : // TODO(bilal): We could drop the DB mutex here as we don't need it for
2274 1 : // excises; we only need to hold the version lock which we already are
2275 1 : // holding. However releasing the DB mutex could mess with the
2276 1 : // ingestTargetLevel calculation that happened above, as it assumed that it
2277 1 : // had a complete view of in-progress compactions that wouldn't change
2278 1 : // until logAndApply is called. If we were to drop the mutex now, we could
2279 1 : // schedule another in-progress compaction that would go into the chosen target
2280 1 : // level and lead to file overlap within level (which would panic in
2281 1 : // logAndApply). We should drop the db mutex here, do the excise, then
2282 1 : // re-grab the DB mutex and rerun just the in-progress compaction check to
2283 1 : // see if any new compactions are conflicting with our chosen target levels
2284 1 : // for files, and if they are, we should signal those compactions to error
2285 1 : // out.
2286 1 : for level := range current.Levels {
2287 1 : overlaps := current.Overlaps(level, exciseSpan.UserKeyBounds())
2288 1 : iter := overlaps.Iter()
2289 1 :
2290 1 : for m := iter.First(); m != nil; m = iter.Next() {
2291 1 : newFiles, err := d.excise(exciseSpan.UserKeyBounds(), m, ve, level)
2292 1 : if err != nil {
2293 0 : return nil, err
2294 0 : }
2295 :
2296 1 : if _, ok := ve.DeletedFiles[deletedFileEntry{
2297 1 : Level: level,
2298 1 : FileNum: m.FileNum,
2299 1 : }]; !ok {
2300 1 : // We did not excise this file.
2301 1 : continue
2302 : }
2303 1 : replacedFiles[m.FileNum] = newFiles
2304 1 : updateLevelMetricsOnExcise(m, level, newFiles)
2305 : }
2306 : }
2307 : }
2308 1 : if len(filesToSplit) > 0 {
2309 1 : // For the same reasons as the above call to excise, we hold the db mutex
2310 1 : // while calling this method.
2311 1 : if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
2312 0 : return nil, err
2313 0 : }
2314 : }
2315 1 : if len(filesToSplit) > 0 || exciseSpan.Valid() {
2316 1 : for c := range d.mu.compact.inProgress {
2317 1 : if c.versionEditApplied {
2318 0 : continue
2319 : }
2320 : // Check if this compaction overlaps with the excise span. Note that just
2321 : // checking if the inputs individually overlap with the excise span
2322 : // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
2323 : // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
2324 : // doing a [c,d) excise at the same time as this compaction, we will have
2325 : // to error out the whole compaction as we can't guarantee it hasn't/won't
2326 : // write a file overlapping with the excise span.
2327 1 : if exciseSpan.OverlapsInternalKeyRange(d.cmp, c.smallest, c.largest) {
2328 1 : c.cancel.Store(true)
2329 1 : }
2330 : // Check if this compaction's inputs have been replaced due to an
2331 : // ingest-time split. In that case, cancel the compaction as a newly picked
2332 : // compaction would need to include any new files that slid in between
2333 : // previously-existing files. Note that we cancel any compaction that has a
2334 : // file that was ingest-split as an input, even if it started before this
2335 : // ingestion.
2336 1 : if checkCompactions {
2337 0 : for i := range c.inputs {
2338 0 : iter := c.inputs[i].files.Iter()
2339 0 : for f := iter.First(); f != nil; f = iter.Next() {
2340 0 : if _, ok := replacedFiles[f.FileNum]; ok {
2341 0 : c.cancel.Store(true)
2342 0 : break
2343 : }
2344 : }
2345 : }
2346 : }
2347 : }
2348 : }
2349 :
2350 1 : if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
2351 1 : return d.getInProgressCompactionInfoLocked(nil)
2352 1 : }); err != nil {
2353 1 : // An error from logAndApply is returned to the caller; the version edit was not applied.
2354 1 : return nil, err
2355 1 : }
2356 :
2357 : // Check for any EventuallyFileOnlySnapshots that could be watching for
2358 : // an excise on this span. There should be none as the
2359 : // computePossibleOverlaps steps should have forced these EFOS to transition
2360 : // to file-only snapshots by now. If we see any that conflict with this
2361 : // excise, panic.
2362 1 : if exciseSpan.Valid() {
2363 1 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
2364 0 : // Skip non-EFOS snapshots, and also skip any EFOS that were created
2365 0 : // *after* the excise.
2366 0 : if s.efos == nil || base.Visible(exciseSeqNum, s.efos.seqNum, base.SeqNumMax) {
2367 0 : continue
2368 : }
2369 0 : efos := s.efos
2370 0 : // TODO(bilal): We can make this faster by taking advantage of the sorted
2371 0 : // nature of protectedRanges to do a sort.Search, or even maintaining a
2372 0 : // global list of all protected ranges instead of having to peer into every
2373 0 : // snapshot.
2374 0 : for i := range efos.protectedRanges {
2375 0 : if efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
2376 0 : panic("unexpected excise of an EventuallyFileOnlySnapshot's bounds")
2377 : }
2378 : }
2379 : }
2380 : }
2381 :
2382 1 : d.mu.versions.metrics.Ingest.Count++
2383 1 :
2384 1 : d.updateReadStateLocked(d.opts.DebugCheck)
2385 1 : // updateReadStateLocked could have generated obsolete tables, schedule a
2386 1 : // cleanup job if necessary.
2387 1 : d.deleteObsoleteFiles(jobID)
2388 1 : d.updateTableStatsLocked(ve.NewFiles)
2389 1 : // The ingestion may have pushed a level over the threshold for compaction,
2390 1 : // so check to see if one is necessary and schedule it.
2391 1 : d.maybeScheduleCompaction()
2392 1 : var toValidate []manifest.NewFileEntry
2393 1 : dedup := make(map[base.DiskFileNum]struct{})
2394 1 : for _, entry := range ve.NewFiles {
2395 1 : if _, ok := dedup[entry.Meta.FileBacking.DiskFileNum]; !ok {
2396 1 : toValidate = append(toValidate, entry)
2397 1 : dedup[entry.Meta.FileBacking.DiskFileNum] = struct{}{}
2398 1 : }
2399 : }
2400 1 : d.maybeValidateSSTablesLocked(toValidate)
2401 1 : return ve, nil
2402 : }
2403 :
2404 : // maybeValidateSSTablesLocked adds the slice of newFileEntrys to the pending
2405 : // queue of files to be validated, when the feature is enabled.
2406 : //
2407 : // Note that if two entries with the same backing file are added twice, then the
2408 : // block checksums for the backing file will be validated twice.
2409 : //
2410 : // DB.mu must be locked when calling.
2411 1 : func (d *DB) maybeValidateSSTablesLocked(newFiles []newFileEntry) {
2412 1 : // Only add to the validation queue when the feature is enabled.
2413 1 : if !d.opts.Experimental.ValidateOnIngest {
2414 1 : return
2415 1 : }
2416 :
2417 1 : d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...)
2418 1 : if d.shouldValidateSSTablesLocked() {
2419 1 : go d.validateSSTables()
2420 1 : }
2421 : }
2422 :
2423 : // shouldValidateSSTablesLocked returns true if SSTable validation should run.
2424 : // DB.mu must be locked when calling.
2425 1 : func (d *DB) shouldValidateSSTablesLocked() bool {
2426 1 : return !d.mu.tableValidation.validating &&
2427 1 : d.closed.Load() == nil &&
2428 1 : d.opts.Experimental.ValidateOnIngest &&
2429 1 : len(d.mu.tableValidation.pending) > 0
2430 1 : }
2431 :
// validateSSTables runs a round of block-checksum validation on the tables in
// the pending queue. It acquires DB.mu only to swap out the queue and to
// publish results; the IO itself is performed without the mutex held. On a
// corruption error it calls Logger.Fatalf; other errors re-queue the table
// for a later attempt. If more work arrives while this round runs, a fresh
// goroutine is spawned at the end to process it.
func (d *DB) validateSSTables() {
	d.mu.Lock()
	// Re-check under the mutex: another job may have started, the DB may have
	// closed, or the queue may have been drained since this goroutine was
	// spawned.
	if !d.shouldValidateSSTablesLocked() {
		d.mu.Unlock()
		return
	}

	// Take ownership of the entire pending queue and mark a job as running so
	// no concurrent validateSSTables pass starts.
	pending := d.mu.tableValidation.pending
	d.mu.tableValidation.pending = nil
	d.mu.tableValidation.validating = true
	jobID := d.newJobIDLocked()
	// Pin the current read state so the version (and its file metadata) stays
	// alive while we do IO; released via rs.unref() below.
	rs := d.loadReadState()

	// Drop DB.mu before performing IO.
	d.mu.Unlock()

	// Validate all tables in the pending queue. This could lead to a situation
	// where we are starving IO from other tasks due to having to page through
	// all the blocks in all the sstables in the queue.
	// TODO(travers): Add some form of pacing to avoid IO starvation.

	// If we fail to validate any files due to reasons other than uncovered
	// corruption, accumulate them and re-queue them for another attempt.
	var retry []manifest.NewFileEntry

	for _, f := range pending {
		// The file may have been moved or deleted since it was ingested, in
		// which case we skip.
		if !rs.current.Contains(f.Level, f.Meta) {
			// Assume the file was moved to a lower level. It is rare enough
			// that a table is moved or deleted between the time it was ingested
			// and the time the validation routine runs that the overall cost of
			// this inner loop is tolerably low, when amortized over all
			// ingested tables.
			found := false
			for i := f.Level + 1; i < numLevels; i++ {
				if rs.current.Contains(i, f.Meta) {
					found = true
					break
				}
			}
			if !found {
				// Not present at the recorded level or any lower one: the
				// table was deleted (or excised); nothing to validate.
				continue
			}
		}

		// Virtual tables validate checksums against their backing file;
		// physical tables validate the reader directly.
		var err error
		if f.Meta.Virtual {
			err = d.tableCache.withVirtualReader(
				f.Meta.VirtualMeta(), func(v sstable.VirtualReader) error {
					return v.ValidateBlockChecksumsOnBacking()
				})
		} else {
			err = d.tableCache.withReader(
				f.Meta.PhysicalMeta(), func(r *sstable.Reader) error {
					return r.ValidateBlockChecksums()
				})
		}

		if err != nil {
			if IsCorruptionError(err) {
				// TODO(travers): Hook into the corruption reporting pipeline, once
				// available. See pebble#1192.
				d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err)
			} else {
				// If there was some other, possibly transient, error that
				// caused table validation to fail inform the EventListener and
				// move on. We remember the table so that we can retry it in a
				// subsequent table validation job.
				//
				// TODO(jackson): If the error is not transient, this will retry
				// validation indefinitely. While not great, it's the same
				// behavior as erroring flushes and compactions. We should
				// address this as a part of #270.
				d.opts.EventListener.BackgroundError(err)
				retry = append(retry, f)
				continue
			}
		}

		// Validation succeeded; notify listeners.
		d.opts.EventListener.TableValidated(TableValidatedInfo{
			JobID: int(jobID),
			Meta:  f.Meta,
		})
	}
	// Release the pinned read state before re-acquiring DB.mu.
	rs.unref()
	d.mu.Lock()
	defer d.mu.Unlock()
	// Re-queue any tables that failed transiently, clear the running flag, and
	// wake anyone waiting on validation completion (e.g. tests, Close).
	d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, retry...)
	d.mu.tableValidation.validating = false
	d.mu.tableValidation.cond.Broadcast()
	// More work may have arrived while we were validating; start another pass.
	if d.shouldValidateSSTablesLocked() {
		go d.validateSSTables()
	}
}
|