Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 : "sort"
12 : "time"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/cache"
17 : "github.com/cockroachdb/pebble/internal/invariants"
18 : "github.com/cockroachdb/pebble/internal/manifest"
19 : "github.com/cockroachdb/pebble/internal/overlap"
20 : "github.com/cockroachdb/pebble/internal/sstableinternal"
21 : "github.com/cockroachdb/pebble/objstorage"
22 : "github.com/cockroachdb/pebble/objstorage/remote"
23 : "github.com/cockroachdb/pebble/sstable"
24 : )
25 :
26 2 : func sstableKeyCompare(userCmp Compare, a, b InternalKey) int {
27 2 : c := userCmp(a.UserKey, b.UserKey)
28 2 : if c != 0 {
29 2 : return c
30 2 : }
31 2 : if a.IsExclusiveSentinel() {
32 2 : if !b.IsExclusiveSentinel() {
33 2 : return -1
34 2 : }
35 2 : } else if b.IsExclusiveSentinel() {
36 2 : return +1
37 2 : }
38 2 : return 0
39 : }
40 :
// KeyRange encodes a key range in user key space. A KeyRange's Start is
// inclusive while its End is exclusive.
//
// KeyRange is equivalent to base.UserKeyBounds with exclusive end.
type KeyRange struct {
	// Start is the inclusive lower bound; End is the exclusive upper bound.
	// Both must be non-nil for the range to be Valid().
	Start, End []byte
}
48 :
49 : // Valid returns true if the KeyRange is defined.
50 2 : func (k *KeyRange) Valid() bool {
51 2 : return k.Start != nil && k.End != nil
52 2 : }
53 :
54 : // Contains returns whether the specified key exists in the KeyRange.
55 2 : func (k *KeyRange) Contains(cmp base.Compare, key InternalKey) bool {
56 2 : v := cmp(key.UserKey, k.End)
57 2 : return (v < 0 || (v == 0 && key.IsExclusiveSentinel())) && cmp(k.Start, key.UserKey) <= 0
58 2 : }
59 :
60 : // UserKeyBounds returns the KeyRange as UserKeyBounds. Also implements the internal `bounded` interface.
61 2 : func (k KeyRange) UserKeyBounds() base.UserKeyBounds {
62 2 : return base.UserKeyBoundsEndExclusive(k.Start, k.End)
63 2 : }
64 :
65 : // OverlapsInternalKeyRange checks if the specified internal key range has an
66 : // overlap with the KeyRange. Note that we aren't checking for full containment
67 : // of smallest-largest within k, rather just that there's some intersection
68 : // between the two ranges.
69 2 : func (k *KeyRange) OverlapsInternalKeyRange(cmp base.Compare, smallest, largest InternalKey) bool {
70 2 : ukb := k.UserKeyBounds()
71 2 : b := base.UserKeyBoundsFromInternal(smallest, largest)
72 2 : return ukb.Overlaps(cmp, &b)
73 2 : }
74 :
75 : // Overlaps checks if the specified file has an overlap with the KeyRange.
76 : // Note that we aren't checking for full containment of m within k, rather just
77 : // that there's some intersection between m and k's bounds.
78 1 : func (k *KeyRange) Overlaps(cmp base.Compare, m *fileMetadata) bool {
79 1 : b := k.UserKeyBounds()
80 1 : return m.Overlaps(cmp, &b)
81 1 : }
82 :
83 : // OverlapsKeyRange checks if this span overlaps with the provided KeyRange.
84 : // Note that we aren't checking for full containment of either span in the other,
85 : // just that there's a key x that is in both key ranges.
86 2 : func (k *KeyRange) OverlapsKeyRange(cmp Compare, span KeyRange) bool {
87 2 : return cmp(k.Start, span.End) < 0 && cmp(k.End, span.Start) > 0
88 2 : }
89 :
90 2 : func ingestValidateKey(opts *Options, key *InternalKey) error {
91 2 : if key.Kind() == InternalKeyKindInvalid {
92 1 : return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s",
93 1 : key.Pretty(opts.Comparer.FormatKey))
94 1 : }
95 2 : if key.SeqNum() != 0 {
96 1 : return base.CorruptionErrorf("pebble: external sstable has non-zero seqnum: %s",
97 1 : key.Pretty(opts.Comparer.FormatKey))
98 1 : }
99 2 : return nil
100 : }
101 :
// ingestSynthesizeShared constructs a fileMetadata for one shared sstable owned
// or shared by another node. The returned metadata is a virtual sstable whose
// bounds are synthesized from the SharedSSTMeta rather than read from the file.
func ingestSynthesizeShared(
	opts *Options, sm SharedSSTMeta, fileNum base.FileNum,
) (*fileMetadata, error) {
	if sm.Size == 0 {
		// Disallow 0 file sizes
		return nil, errors.New("pebble: cannot ingest shared file with size 0")
	}
	// Don't load table stats. Doing a round trip to shared storage, one SST
	// at a time is not worth it as it slows down ingestion.
	meta := &fileMetadata{
		FileNum:      fileNum,
		CreationTime: time.Now().Unix(),
		Virtual:      true,
		Size:         sm.Size,
	}
	// For simplicity, we use the same number for both the FileNum and the
	// DiskFileNum (even though this is a virtual sstable). Pass the underlying
	// FileBacking's size to the same size as the virtualized view of the sstable.
	// This ensures that we don't over-prioritize this sstable for compaction just
	// yet, as we do not have a clear sense of what parts of this sstable are
	// referenced by other nodes.
	meta.InitProviderBacking(base.DiskFileNum(fileNum), sm.Size)

	if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil {
		// Initialize meta.{HasPointKeys,Smallest,Largest}, etc.
		//
		// NB: We create new internal keys and pass them into ExtendPointKeyBounds
		// so that we can sub a zero sequence number into the bounds. We can set
		// the sequence number to anything here; it'll be reset in ingestUpdateSeqNum
		// anyway. However, we do need to use the same sequence number across all
		// bound keys at this step so that we end up with bounds that are consistent
		// across point/range keys.
		//
		// Because of the sequence number rewriting, we cannot use the Kind of
		// sm.SmallestPointKey. For example, the original SST might start with
		// a.SET.2 and a.RANGEDEL.1 (with a.SET.2 being the smallest key); after
		// rewriting the sequence numbers, these keys become a.SET.100 and
		// a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a
		// correct bound, we just use the maximum key kind (which sorts first).
		// Similarly, we use the smallest key kind for the largest key.
		smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMaxForSSTable)
		largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0)
		if sm.LargestPointKey.IsExclusiveSentinel() {
			// Preserve the exclusive-sentinel property of the largest bound.
			largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey)
		}
		if opts.Comparer.Equal(smallestPointKey.UserKey, largestPointKey.UserKey) &&
			smallestPointKey.Trailer < largestPointKey.Trailer {
			// We get kinds from the sender, however we substitute our own sequence
			// numbers. This can result in cases where an sstable [b#5,SET-b#4,DELSIZED]
			// becomes [b#0,SET-b#0,DELSIZED] when we synthesize it here, but the
			// kinds need to be reversed now because DelSized > Set.
			smallestPointKey, largestPointKey = largestPointKey, smallestPointKey
		}
		meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallestPointKey, largestPointKey)
	}
	if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil {
		// Initialize meta.{HasRangeKeys,Smallest,Largest}, etc.
		//
		// See comment above on why we use a zero sequence number and these key
		// kinds here.
		smallestRangeKey := base.MakeInternalKey(sm.SmallestRangeKey.UserKey, 0, base.InternalKeyKindRangeKeyMax)
		largestRangeKey := base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeyMin, sm.LargestRangeKey.UserKey)
		meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallestRangeKey, largestRangeKey)
	}
	// Sanity check that the synthesized bounds are internally consistent.
	if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
		return nil, err
	}
	return meta, nil
}
173 :
// ingestLoad1External loads the fileMetadata for one external sstable.
// Sequence number and target level calculation happens during prepare/apply.
//
// The returned metadata is a virtual sstable whose bounds are synthesized
// from the caller-provided [StartKey, EndKey) span; the file itself is never
// opened here.
func ingestLoad1External(
	opts *Options, e ExternalFile, fileNum base.FileNum,
) (*fileMetadata, error) {
	if e.Size == 0 {
		return nil, errors.New("pebble: cannot ingest external file with size 0")
	}
	if !e.HasRangeKey && !e.HasPointKey {
		return nil, errors.New("pebble: cannot ingest external file with no point or range keys")
	}

	// Reject inverted bounds, and empty bounds unless the end is inclusive.
	if opts.Comparer.Compare(e.StartKey, e.EndKey) > 0 {
		return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
	}
	if opts.Comparer.Compare(e.StartKey, e.EndKey) == 0 && !e.EndKeyIsInclusive {
		return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
	}
	// Bounds must be suffix-less user keys (Split must consume the whole key).
	if n := opts.Comparer.Split(e.StartKey); n != len(e.StartKey) {
		return nil, errors.Newf("pebble: external file bounds start key %q has suffix", e.StartKey)
	}
	if n := opts.Comparer.Split(e.EndKey); n != len(e.EndKey) {
		return nil, errors.Newf("pebble: external file bounds end key %q has suffix", e.EndKey)
	}

	// Don't load table stats. Doing a round trip to shared storage, one SST
	// at a time is not worth it as it slows down ingestion.
	meta := &fileMetadata{
		FileNum:      fileNum,
		CreationTime: time.Now().Unix(),
		Virtual:      true,
		Size:         e.Size,
	}

	// In the name of keeping this ingestion as fast as possible, we avoid
	// *all* existence checks and synthesize a file metadata with smallest/largest
	// keys that overlap whatever the passed-in span was.
	smallestCopy := slices.Clone(e.StartKey)
	largestCopy := slices.Clone(e.EndKey)
	if e.HasPointKey {
		// Sequence numbers are updated later by
		// ingestUpdateSeqNum, applying a sequence number that
		// is applied to all keys in the sstable.
		if e.EndKeyIsInclusive {
			meta.ExtendPointKeyBounds(
				opts.Comparer.Compare,
				base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable),
				base.MakeInternalKey(largestCopy, 0, 0))
		} else {
			meta.ExtendPointKeyBounds(
				opts.Comparer.Compare,
				base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSSTable),
				base.MakeRangeDeleteSentinelKey(largestCopy))
		}
	}
	if e.HasRangeKey {
		meta.ExtendRangeKeyBounds(
			opts.Comparer.Compare,
			base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeyMax),
			base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyMin, largestCopy),
		)
	}

	// Carry through any key prefix/suffix synthesis requested by the caller.
	meta.SyntheticPrefixAndSuffix = sstable.MakeSyntheticPrefixAndSuffix(e.SyntheticPrefix, e.SyntheticSuffix)

	return meta, nil
}
241 :
// ingestLoad1 creates the FileMetadata for one file. This file will be owned
// by this store.
//
// The sstable is opened and its point-key, range-deletion, and range-key
// iterators are consulted to compute the table's bounds. Every boundary key
// is validated (recognized kind, zero seqnum). Returns (nil, nil) if the
// table contains no point or range keys at all.
func ingestLoad1(
	ctx context.Context,
	opts *Options,
	fmv FormatMajorVersion,
	readable objstorage.Readable,
	cacheID cache.ID,
	fileNum base.FileNum,
) (*fileMetadata, error) {
	o := opts.MakeReaderOptions()
	o.SetInternalCacheOpts(sstableinternal.CacheOptions{
		Cache:   opts.Cache,
		CacheID: cacheID,
		FileNum: base.PhysicalTableDiskFileNum(fileNum),
	})
	r, err := sstable.NewReader(ctx, readable, o)
	if err != nil {
		return nil, err
	}
	defer r.Close()

	// Avoid ingesting tables with format versions this DB doesn't support.
	tf, err := r.TableFormat()
	if err != nil {
		return nil, err
	}
	if tf < fmv.MinTableFormat() || tf > fmv.MaxTableFormat() {
		return nil, errors.Newf(
			"pebble: table format %s is not within range supported at DB format major version %d, (%s,%s)",
			tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
		)
	}
	if tf.BlockColumnar() {
		// Columnar-block tables reference a key schema by name; we must know it.
		if _, ok := opts.KeySchemas[r.Properties.KeySchemaName]; !ok {
			return nil, errors.Newf(
				"pebble: table uses key schema %q unknown to the database",
				r.Properties.KeySchemaName)
		}
	}

	meta := &fileMetadata{}
	meta.FileNum = fileNum
	meta.Size = uint64(readable.Size())
	meta.CreationTime = time.Now().Unix()
	meta.InitPhysicalBacking()

	// Avoid loading into the table cache for collecting stats if we
	// don't need to. If there are no range deletions, we have all the
	// information to compute the stats here.
	//
	// This is helpful in tests for avoiding awkwardness around deletion of
	// ingested files from MemFS. MemFS implements the Windows semantics of
	// disallowing removal of an open file. Under MemFS, if we don't populate
	// meta.Stats here, the file will be loaded into the table cache for
	// calculating stats before we can remove the original link.
	maybeSetStatsFromProperties(meta.PhysicalMeta(), &r.Properties)

	// Compute the point-key bounds from the first and last point keys.
	{
		iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
		if err != nil {
			return nil, err
		}
		defer iter.Close()
		var smallest InternalKey
		if kv := iter.First(); kv != nil {
			if err := ingestValidateKey(opts, &kv.K); err != nil {
				return nil, err
			}
			smallest = kv.K.Clone()
		}
		if err := iter.Error(); err != nil {
			return nil, err
		}
		if kv := iter.Last(); kv != nil {
			if err := ingestValidateKey(opts, &kv.K); err != nil {
				return nil, err
			}
			meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, kv.K.Clone())
		}
		if err := iter.Error(); err != nil {
			return nil, err
		}
	}

	// Extend the point-key bounds to cover any range deletions.
	iter, err := r.NewRawRangeDelIter(ctx, sstable.NoFragmentTransforms)
	if err != nil {
		return nil, err
	}
	if iter != nil {
		defer iter.Close()
		var smallest InternalKey
		if s, err := iter.First(); err != nil {
			return nil, err
		} else if s != nil {
			key := s.SmallestKey()
			if err := ingestValidateKey(opts, &key); err != nil {
				return nil, err
			}
			smallest = key.Clone()
		}
		if s, err := iter.Last(); err != nil {
			return nil, err
		} else if s != nil {
			k := s.SmallestKey()
			if err := ingestValidateKey(opts, &k); err != nil {
				return nil, err
			}
			largest := s.LargestKey().Clone()
			meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest)
		}
	}

	// Update the range-key bounds for the table.
	{
		iter, err := r.NewRawRangeKeyIter(ctx, sstable.NoFragmentTransforms)
		if err != nil {
			return nil, err
		}
		if iter != nil {
			defer iter.Close()
			var smallest InternalKey
			if s, err := iter.First(); err != nil {
				return nil, err
			} else if s != nil {
				key := s.SmallestKey()
				if err := ingestValidateKey(opts, &key); err != nil {
					return nil, err
				}
				smallest = key.Clone()
			}
			if s, err := iter.Last(); err != nil {
				return nil, err
			} else if s != nil {
				k := s.SmallestKey()
				if err := ingestValidateKey(opts, &k); err != nil {
					return nil, err
				}
				// As range keys are fragmented, the end key of the last range key in
				// the table provides the upper bound for the table.
				largest := s.LargestKey().Clone()
				meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallest, largest)
			}
		}
	}

	// An entirely empty table is not an error; it is simply skipped.
	if !meta.HasPointKeys && !meta.HasRangeKeys {
		return nil, nil
	}

	// Sanity check that the various bounds on the file were set consistently.
	if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
		return nil, err
	}

	return meta, nil
}
399 :
// ingestLoadResult holds the loaded metadata for all files participating in a
// single ingestion, partitioned by where the files live.
type ingestLoadResult struct {
	// local holds metadata for sstables on the local filesystem.
	local []ingestLocalMeta
	// shared holds metadata for sstables shared by another node.
	shared []ingestSharedMeta
	// external holds metadata for sstables on external (remote) storage.
	external []ingestExternalMeta

	// externalFilesHaveLevel is true when the external files carry an explicit
	// target level; they must either all have a level set or all have it unset.
	externalFilesHaveLevel bool
}
407 :
// ingestLocalMeta pairs the metadata for a local sstable with the filesystem
// path it is being ingested from.
type ingestLocalMeta struct {
	*fileMetadata
	// path is the original filesystem path of the sstable.
	path string
}
412 :
// ingestSharedMeta pairs the synthesized metadata for a shared sstable with
// the SharedSSTMeta it was synthesized from.
type ingestSharedMeta struct {
	*fileMetadata
	// shared is the caller-provided description of the shared sstable.
	shared SharedSSTMeta
}
417 :
// ingestExternalMeta pairs the synthesized metadata for an external sstable
// with the ExternalFile it was synthesized from.
type ingestExternalMeta struct {
	*fileMetadata
	// external is the caller-provided description of the external file.
	external ExternalFile
	// usedExistingBacking is true if the external file is reusing a backing
	// that existed before this ingestion. In this case, we called
	// VirtualBackings.Protect() on that backing; we will need to call
	// Unprotect() after the ingestion.
	usedExistingBacking bool
}
427 :
428 2 : func (r *ingestLoadResult) fileCount() int {
429 2 : return len(r.local) + len(r.shared) + len(r.external)
430 2 : }
431 :
// ingestLoad loads metadata for all files in an ingestion: local sstables
// (opened and inspected), shared sstables (synthesized from SharedSSTMeta),
// and external files (synthesized from ExternalFile). The pending slice
// supplies pre-reserved file numbers, ordered as local files, then shared,
// then external.
func ingestLoad(
	ctx context.Context,
	opts *Options,
	fmv FormatMajorVersion,
	paths []string,
	shared []SharedSSTMeta,
	external []ExternalFile,
	cacheID cache.ID,
	pending []base.FileNum,
) (ingestLoadResult, error) {
	// Partition the pre-reserved file numbers among the three file categories.
	localFileNums := pending[:len(paths)]
	sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
	externalFileNums := pending[len(paths)+len(shared) : len(paths)+len(shared)+len(external)]

	var result ingestLoadResult
	result.local = make([]ingestLocalMeta, 0, len(paths))
	for i := range paths {
		f, err := opts.FS.Open(paths[i])
		if err != nil {
			return ingestLoadResult{}, err
		}

		readable, err := sstable.NewSimpleReadable(f)
		if err != nil {
			return ingestLoadResult{}, err
		}
		m, err := ingestLoad1(ctx, opts, fmv, readable, cacheID, localFileNums[i])
		if err != nil {
			return ingestLoadResult{}, err
		}
		// m is nil when the sstable is empty; empty tables are skipped.
		if m != nil {
			result.local = append(result.local, ingestLocalMeta{
				fileMetadata: m,
				path:         paths[i],
			})
		}
	}

	// Sort the shared files according to level.
	sort.Sort(sharedByLevel(shared))

	result.shared = make([]ingestSharedMeta, 0, len(shared))
	for i := range shared {
		m, err := ingestSynthesizeShared(opts, shared[i], sharedFileNums[i])
		if err != nil {
			return ingestLoadResult{}, err
		}
		if shared[i].Level < sharedLevelsStart {
			return ingestLoadResult{}, errors.New("cannot ingest shared file in level below sharedLevelsStart")
		}
		result.shared = append(result.shared, ingestSharedMeta{
			fileMetadata: m,
			shared:       shared[i],
		})
	}
	result.external = make([]ingestExternalMeta, 0, len(external))
	for i := range external {
		m, err := ingestLoad1External(opts, external[i], externalFileNums[i])
		if err != nil {
			return ingestLoadResult{}, err
		}
		result.external = append(result.external, ingestExternalMeta{
			fileMetadata: m,
			external:     external[i],
		})
		// External files must be uniform: either all carry a target level, or
		// none do.
		if external[i].Level > 0 {
			if i != 0 && !result.externalFilesHaveLevel {
				return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
			}
			result.externalFilesHaveLevel = true
		} else if result.externalFilesHaveLevel {
			return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
		}
	}
	return result, nil
}
508 :
// ingestSortAndVerify sorts the loaded files by smallest key and verifies the
// structural invariants of the ingestion: shared (and leveled external) files
// must fit within the excise span, external and shared files cannot be mixed,
// and no two files destined for the same "layer" may have overlapping ranges.
// Note that lr.external and lr.local are sorted in place as a side effect.
func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error {
	// Verify that all the shared files (i.e. files in sharedMeta)
	// fit within the exciseSpan.
	for _, f := range lr.shared {
		if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
			return errors.Newf("pebble: shared file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
		}
	}

	// External files with explicit levels must also fit within the excise span.
	if lr.externalFilesHaveLevel {
		for _, f := range lr.external {
			if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
				return base.AssertionFailedf("pebble: external file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
			}
		}
	}

	if len(lr.external) > 0 {
		if len(lr.shared) > 0 {
			// If external files are present alongside shared files,
			// return an error.
			return base.AssertionFailedf("pebble: external files cannot be ingested atomically alongside shared files")
		}

		// Sort according to the smallest key.
		slices.SortFunc(lr.external, func(a, b ingestExternalMeta) int {
			return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
		})
		for i := 1; i < len(lr.external); i++ {
			if sstableKeyCompare(cmp, lr.external[i-1].Largest, lr.external[i].Smallest) >= 0 {
				return errors.Newf("pebble: external sstables have overlapping ranges")
			}
		}
		return nil
	}
	// Zero or one local file cannot overlap with itself.
	if len(lr.local) <= 1 {
		return nil
	}

	// Sort according to the smallest key.
	slices.SortFunc(lr.local, func(a, b ingestLocalMeta) int {
		return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
	})

	for i := 1; i < len(lr.local); i++ {
		if sstableKeyCompare(cmp, lr.local[i-1].Largest, lr.local[i].Smallest) >= 0 {
			return errors.Newf("pebble: local ingestion sstables have overlapping ranges")
		}
	}
	if len(lr.shared) == 0 {
		return nil
	}
	// Check that, within each shared level, the shared/external files destined
	// for that level are mutually non-overlapping.
	filesInLevel := make([]*fileMetadata, 0, len(lr.shared))
	for l := sharedLevelsStart; l < numLevels; l++ {
		filesInLevel = filesInLevel[:0]
		for i := range lr.shared {
			if lr.shared[i].shared.Level == uint8(l) {
				filesInLevel = append(filesInLevel, lr.shared[i].fileMetadata)
			}
		}
		for i := range lr.external {
			if lr.external[i].external.Level == uint8(l) {
				filesInLevel = append(filesInLevel, lr.external[i].fileMetadata)
			}
		}
		slices.SortFunc(filesInLevel, func(a, b *fileMetadata) int {
			return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
		})
		for i := 1; i < len(filesInLevel); i++ {
			if sstableKeyCompare(cmp, filesInLevel[i-1].Largest, filesInLevel[i].Smallest) >= 0 {
				return base.AssertionFailedf("pebble: external shared sstables have overlapping ranges")
			}
		}
	}
	return nil
}
585 :
586 1 : func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) error {
587 1 : var firstErr error
588 1 : for i := range meta {
589 1 : if err := objProvider.Remove(fileTypeTable, meta[i].FileBacking.DiskFileNum); err != nil {
590 1 : firstErr = firstError(firstErr, err)
591 1 : }
592 : }
593 1 : return firstErr
594 : }
595 :
// ingestLinkLocal creates new objects which are backed by either hardlinks to or
// copies of the ingested files.
//
// On failure, any objects created for earlier files in the batch are removed
// before the error is returned.
func ingestLinkLocal(
	ctx context.Context,
	jobID JobID,
	opts *Options,
	objProvider objstorage.Provider,
	localMetas []ingestLocalMeta,
) error {
	for i := range localMetas {
		objMeta, err := objProvider.LinkOrCopyFromLocal(
			ctx, opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum,
			objstorage.CreateOptions{PreferSharedStorage: true},
		)
		if err != nil {
			// Undo the links/copies made so far; the cleanup error is only
			// logged, as the original error is the one worth surfacing.
			if err2 := ingestCleanup(objProvider, localMetas[:i]); err2 != nil {
				opts.Logger.Errorf("ingest cleanup failed: %v", err2)
			}
			return err
		}
		if opts.EventListener.TableCreated != nil {
			opts.EventListener.TableCreated(TableCreateInfo{
				JobID:   int(jobID),
				Reason:  "ingesting",
				Path:    objProvider.Path(objMeta),
				FileNum: base.PhysicalTableDiskFileNum(localMetas[i].FileNum),
			})
		}
	}
	return nil
}
627 :
// ingestAttachRemote attaches remote objects to the storage provider.
//
// For external objects, we reuse existing FileBackings from the current version
// when possible.
//
// ingestUnprotectExternalBackings() must be called after this function (even in
// error cases).
func (d *DB) ingestAttachRemote(jobID JobID, lr ingestLoadResult) error {
	remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(lr.shared)+len(lr.external))
	// Queue all shared objects for attachment.
	for i := range lr.shared {
		backing, err := lr.shared[i].shared.Backing.Get()
		if err != nil {
			return err
		}
		remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
			FileNum:  lr.shared[i].FileBacking.DiskFileNum,
			FileType: fileTypeTable,
			Backing:  backing,
		})
	}

	d.findExistingBackingsForExternalObjects(lr.external)

	// Tracks backings created in this loop, so multiple external files over
	// the same remote object share a single backing.
	newFileBackings := make(map[remote.ObjectKey]*fileBacking, len(lr.external))
	for i := range lr.external {
		meta := lr.external[i].fileMetadata
		if meta.FileBacking != nil {
			// The backing was filled in by findExistingBackingsForExternalObjects().
			continue
		}
		key := remote.MakeObjectKey(lr.external[i].external.Locator, lr.external[i].external.ObjName)
		if backing, ok := newFileBackings[key]; ok {
			// We already created the same backing in this loop.
			meta.FileBacking = backing
			continue
		}
		providerBacking, err := d.objProvider.CreateExternalObjectBacking(key.Locator, key.ObjectName)
		if err != nil {
			return err
		}
		// We have to attach the remote object (and assign it a DiskFileNum). For
		// simplicity, we use the same number for both the FileNum and the
		// DiskFileNum (even though this is a virtual sstable).
		meta.InitProviderBacking(base.DiskFileNum(meta.FileNum), lr.external[i].external.Size)

		// Set the underlying FileBacking's size to the same size as the virtualized
		// view of the sstable. This ensures that we don't over-prioritize this
		// sstable for compaction just yet, as we do not have a clear sense of
		// what parts of this sstable are referenced by other nodes.
		meta.FileBacking.Size = lr.external[i].external.Size
		newFileBackings[key] = meta.FileBacking

		remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
			FileNum:  meta.FileBacking.DiskFileNum,
			FileType: fileTypeTable,
			Backing:  providerBacking,
		})
	}

	// Validate bounds now that every external file has a backing.
	for i := range lr.external {
		if err := lr.external[i].Validate(d.opts.Comparer.Compare, d.opts.Comparer.FormatKey); err != nil {
			return err
		}
	}

	remoteObjMetas, err := d.objProvider.AttachRemoteObjects(remoteObjs)
	if err != nil {
		return err
	}

	for i := range lr.shared {
		// One corner case around file sizes we need to be mindful of, is that
		// if one of the shareObjs was initially created by us (and has boomeranged
		// back from another node), we'll need to update the FileBacking's size
		// to be the true underlying size. Otherwise, we could hit errors when we
		// open the db again after a crash/restart (see checkConsistency in open.go),
		// plus it more accurately allows us to prioritize compactions of files
		// that were originally created by us.
		if remoteObjMetas[i].IsShared() && !d.objProvider.IsSharedForeign(remoteObjMetas[i]) {
			size, err := d.objProvider.Size(remoteObjMetas[i])
			if err != nil {
				return err
			}
			lr.shared[i].FileBacking.Size = uint64(size)
		}
	}

	if d.opts.EventListener.TableCreated != nil {
		for i := range remoteObjMetas {
			d.opts.EventListener.TableCreated(TableCreateInfo{
				JobID:   int(jobID),
				Reason:  "ingesting",
				Path:    d.objProvider.Path(remoteObjMetas[i]),
				FileNum: remoteObjMetas[i].DiskFileNum,
			})
		}
	}

	return nil
}
728 :
// findExistingBackingsForExternalObjects populates the FileBacking for external
// files which are already in use by the current version.
//
// We call VirtualBackings.Protect() on each populated backing; callers must
// arrange for a matching Unprotect() (see ingestUnprotectExternalBackings).
func (d *DB) findExistingBackingsForExternalObjects(metas []ingestExternalMeta) {
	d.mu.Lock()
	defer d.mu.Unlock()

	for i := range metas {
		diskFileNums := d.objProvider.GetExternalObjects(metas[i].external.Locator, metas[i].external.ObjName)
		// We cross-check against fileBackings in the current version because it is
		// possible that the external object is referenced by an sstable which only
		// exists in a previous version. In that case, that object could be removed
		// at any time so we cannot reuse it.
		for _, n := range diskFileNums {
			if backing, ok := d.mu.versions.virtualBackings.Get(n); ok {
				// Protect this backing from being removed from the latest version. We
				// will unprotect in ingestUnprotectExternalBackings.
				d.mu.versions.virtualBackings.Protect(n)
				metas[i].usedExistingBacking = true
				metas[i].FileBacking = backing
				break
			}
		}
	}
}
755 :
// ingestUnprotectExternalBackings unprotects the file backings that were reused
// for external objects when the ingestion fails.
func (d *DB) ingestUnprotectExternalBackings(lr ingestLoadResult) {
	d.mu.Lock()
	defer d.mu.Unlock()

	for _, meta := range lr.external {
		if meta.usedExistingBacking {
			// If the backing is not used anywhere else and the ingest failed (or
			// the ingested tables were already compacted away), this call will
			// cause the next version update to remove the backing.
			d.mu.versions.virtualBackings.Unprotect(meta.FileBacking.DiskFileNum)
		}
	}
}
771 :
// setSeqNumInMetadata rewrites all of m's boundary keys to carry seqNum, and
// sets m's smallest/largest sequence numbers to seqNum. Exclusive-sentinel
// largest keys keep their sentinel trailer. The updated bounds are validated
// before returning.
func setSeqNumInMetadata(
	m *fileMetadata, seqNum base.SeqNum, cmp Compare, format base.FormatKey,
) error {
	setSeqFn := func(k base.InternalKey) base.InternalKey {
		return base.MakeInternalKey(k.UserKey, seqNum, k.Kind())
	}
	// NB: we set the fields directly here, rather than via their Extend*
	// methods, as we are updating sequence numbers.
	if m.HasPointKeys {
		m.SmallestPointKey = setSeqFn(m.SmallestPointKey)
	}
	if m.HasRangeKeys {
		m.SmallestRangeKey = setSeqFn(m.SmallestRangeKey)
	}
	m.Smallest = setSeqFn(m.Smallest)
	// Only update the seqnum for the largest key if that key is not an
	// "exclusive sentinel" (i.e. a range deletion sentinel or a range key
	// boundary), as doing so effectively drops the exclusive sentinel (by
	// lowering the seqnum from the max value), and extends the bounds of the
	// table.
	// NB: as the largest range key is always an exclusive sentinel, it is never
	// updated.
	if m.HasPointKeys && !m.LargestPointKey.IsExclusiveSentinel() {
		m.LargestPointKey = setSeqFn(m.LargestPointKey)
	}
	if !m.Largest.IsExclusiveSentinel() {
		m.Largest = setSeqFn(m.Largest)
	}
	// Setting smallestSeqNum == largestSeqNum triggers the setting of
	// Properties.GlobalSeqNum when an sstable is loaded.
	m.SmallestSeqNum = seqNum
	m.LargestSeqNum = seqNum
	m.LargestSeqNumAbsolute = seqNum
	// Ensure the new bounds are consistent.
	if err := m.Validate(cmp, format); err != nil {
		return err
	}
	return nil
}
811 :
812 : func ingestUpdateSeqNum(
813 : cmp Compare, format base.FormatKey, seqNum base.SeqNum, loadResult ingestLoadResult,
814 2 : ) error {
815 2 : // Shared sstables are required to be sorted by level ascending. We then
816 2 : // iterate the shared sstables in reverse, assigning the lower sequence
817 2 : // numbers to the shared sstables that will be ingested into the lower
818 2 : // (larger numbered) levels first. This ensures sequence number shadowing is
819 2 : // correct.
820 2 : for i := len(loadResult.shared) - 1; i >= 0; i-- {
821 2 : if i-1 >= 0 && loadResult.shared[i-1].shared.Level > loadResult.shared[i].shared.Level {
822 0 : panic(errors.AssertionFailedf("shared files %s, %s out of order", loadResult.shared[i-1], loadResult.shared[i]))
823 : }
824 2 : if err := setSeqNumInMetadata(loadResult.shared[i].fileMetadata, seqNum, cmp, format); err != nil {
825 0 : return err
826 0 : }
827 2 : seqNum++
828 : }
829 2 : for i := range loadResult.external {
830 2 : if err := setSeqNumInMetadata(loadResult.external[i].fileMetadata, seqNum, cmp, format); err != nil {
831 0 : return err
832 0 : }
833 2 : seqNum++
834 : }
835 2 : for i := range loadResult.local {
836 2 : if err := setSeqNumInMetadata(loadResult.local[i].fileMetadata, seqNum, cmp, format); err != nil {
837 0 : return err
838 0 : }
839 2 : seqNum++
840 : }
841 2 : return nil
842 : }
843 :
// ingestTargetLevel returns the target level for a file being ingested.
// If suggestSplit is true, it accounts for ingest-time splitting as part of
// its target level calculation, and if a split candidate is found, that file
// is returned as the splitFile.
//
// lsmOverlap describes, per level, how meta's bounds relate to the existing
// files (no overlap, boundary-only overlap, or data overlap; see
// overlap.WithLSM). compactions is the set of in-flight compactions; a level
// that is the output level of a compaction overlapping meta's bounds is
// treated as having boundary overlap.
func ingestTargetLevel(
	ctx context.Context,
	cmp base.Compare,
	lsmOverlap overlap.WithLSM,
	baseLevel int,
	compactions map[*compaction]struct{},
	meta *fileMetadata,
	suggestSplit bool,
) (targetLevel int, splitFile *fileMetadata, err error) {
	// Find the lowest level which does not have any files which overlap meta. We
	// search from L0 to L6 looking for whether there are any files in the level
	// which overlap meta. We want the "lowest" level (where lower means
	// increasing level number) in order to reduce write amplification.
	//
	// There are 2 kinds of overlap we need to check for: file boundary overlap
	// and data overlap. Data overlap implies file boundary overlap. Note that it
	// is always possible to ingest into L0.
	//
	// To place meta at level i where i > 0:
	// - there must not be any data overlap with levels <= i, since that will
	//   violate the sequence number invariant.
	// - no file boundary overlap with level i, since that will violate the
	//   invariant that files do not overlap in levels i > 0.
	// - if there is only a file overlap at a given level, and no data overlap,
	//   we can still slot a file at that level. We return the fileMetadata with
	//   which we have file boundary overlap (must be only one file, as sstable
	//   bounds are usually tight on user keys) and the caller is expected to split
	//   that sstable into two virtual sstables, allowing this file to go into that
	//   level. Note that if we have file boundary overlap with two files, which
	//   should only happen on rare occasions, we treat it as data overlap and
	//   don't use this optimization.
	//
	// The file boundary overlap check is simpler to conceptualize. Consider the
	// following example, in which the ingested file lies completely before or
	// after the file being considered.
	//
	//   |--|           |--|  ingested file: [a,b] or [f,g]
	//         |-----|        existing file: [c,e]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In both cases the ingested file can move to considering the next level.
	//
	// File boundary overlap does not necessarily imply data overlap. The check
	// for data overlap is a little more nuanced. Consider the following examples:
	//
	// 1. No data overlap:
	//
	//          |-|   |--|    ingested file: [cc-d] or [ee-ff]
	//  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In this case the ingested files can "fall through" this level. The checks
	// continue at the next level.
	//
	// 2. Data overlap:
	//
	//            |--|        ingested file: [d-e]
	//  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In this case the file cannot be ingested into this level as the point 'dd'
	// is in the way.
	//
	// It is worth noting that the check for data overlap is only approximate. In
	// the previous example, the ingested table [d-e] could contain only the
	// points 'd' and 'e', in which case the table would be eligible for
	// considering lower levels. However, such a fine-grained check would need to
	// be exhaustive (comparing points and ranges in both the ingested existing
	// tables) and such a check is prohibitively expensive. Thus Pebble treats any
	// existing point that falls within the ingested table bounds as being "data
	// overlap".

	// Data overlap with L0 means the file can only go into L0.
	if lsmOverlap[0].Result == overlap.Data {
		return 0, nil, nil
	}
	targetLevel = 0
	splitFile = nil
	for level := baseLevel; level < numLevels; level++ {
		var candidateSplitFile *fileMetadata
		switch lsmOverlap[level].Result {
		case overlap.Data:
			// We cannot ingest into or under this level; return the best target level
			// so far.
			return targetLevel, splitFile, nil

		case overlap.OnlyBoundary:
			if !suggestSplit || lsmOverlap[level].SplitFile == nil {
				// We can ingest under this level, but not into this level.
				continue
			}
			// We can ingest into this level if we split this file.
			candidateSplitFile = lsmOverlap[level].SplitFile

		case overlap.None:
		// We can ingest into this level.

		default:
			return 0, nil, base.AssertionFailedf("unexpected WithLevel.Result: %v", lsmOverlap[level].Result)
		}

		// Check boundary overlap with any ongoing compactions. We consider an
		// overlapping compaction that's writing files to an output level as
		// equivalent to boundary overlap with files in that output level.
		//
		// We cannot check for data overlap with the new SSTs compaction will produce
		// since compaction hasn't been done yet. However, there's no need to check
		// since all keys in them will be from levels in [c.startLevel,
		// c.outputLevel], and all those levels have already had their data overlap
		// tested negative (else we'd have returned earlier).
		//
		// An alternative approach would be to cancel these compactions and proceed
		// with an ingest-time split on this level if necessary. However, compaction
		// cancellation can result in significant wasted effort and is best avoided
		// unless necessary.
		overlaps := false
		for c := range compactions {
			if c.outputLevel == nil || level != c.outputLevel.level {
				continue
			}
			if cmp(meta.Smallest.UserKey, c.largest.UserKey) <= 0 &&
				cmp(meta.Largest.UserKey, c.smallest.UserKey) >= 0 {
				overlaps = true
				break
			}
		}
		// This level is viable: remember it (and its split candidate, if any)
		// and keep descending to find an even lower viable level.
		if !overlaps {
			targetLevel = level
			splitFile = candidateSplitFile
		}
	}
	return targetLevel, splitFile, nil
}
983 :
984 : // Ingest ingests a set of sstables into the DB. Ingestion of the files is
985 : // atomic and semantically equivalent to creating a single batch containing all
986 : // of the mutations in the sstables. Ingestion may require the memtable to be
987 : // flushed. The ingested sstable files are moved into the DB and must reside on
988 : // the same filesystem as the DB. Sstables can be created for ingestion using
989 : // sstable.Writer. On success, Ingest removes the input paths.
990 : //
991 : // Two types of sstables are accepted for ingestion(s): one is sstables present
992 : // in the instance's vfs.FS and can be referenced locally. The other is sstables
993 : // present in remote.Storage, referred to as shared or foreign sstables. These
994 : // shared sstables can be linked through objstorageprovider.Provider, and do not
995 : // need to already be present on the local vfs.FS. Foreign sstables must all fit
996 : // in an excise span, and are destined for a level specified in SharedSSTMeta.
997 : //
998 : // All sstables *must* be Sync()'d by the caller after all bytes are written
999 : // and before its file handle is closed; failure to do so could violate
1000 : // durability or lead to corrupted on-disk state. This method cannot, in a
1001 : // platform-and-FS-agnostic way, ensure that all sstables in the input are
1002 : // properly synced to disk. Opening new file handles and Sync()-ing them
1003 : // does not always guarantee durability; see the discussion here on that:
1004 : // https://github.com/cockroachdb/pebble/pull/835#issuecomment-663075379
1005 : //
1006 : // Ingestion loads each sstable into the lowest level of the LSM which it
1007 : // doesn't overlap (see ingestTargetLevel). If an sstable overlaps a memtable,
1008 : // ingestion forces the memtable to flush, and then waits for the flush to
1009 : // occur. In some cases, such as with no foreign sstables and no excise span,
1010 : // ingestion that gets blocked on a memtable can join the flushable queue and
1011 : // finish even before the memtable has been flushed.
1012 : //
1013 : // The steps for ingestion are:
1014 : //
1015 : // 1. Allocate file numbers for every sstable being ingested.
1016 : // 2. Load the metadata for all sstables being ingested.
1017 : // 3. Sort the sstables by smallest key, verifying non overlap (for local
1018 : // sstables).
1019 : // 4. Hard link (or copy) the local sstables into the DB directory.
1020 : // 5. Allocate a sequence number to use for all of the entries in the
1021 : // local sstables. This is the step where overlap with memtables is
1022 : // determined. If there is overlap, we remember the most recent memtable
1023 : // that overlaps.
1024 : // 6. Update the sequence number in the ingested local sstables. (Remote
1025 : // sstables get fixed sequence numbers that were determined at load time.)
1026 : // 7. Wait for the most recent memtable that overlaps to flush (if any).
1027 : // 8. Add the ingested sstables to the version (DB.ingestApply).
1028 : // 8.1. If an excise span was specified, figure out what sstables in the
1029 : // current version overlap with the excise span, and create new virtual
1030 : // sstables out of those sstables that exclude the excised span (DB.excise).
1031 : // 9. Publish the ingestion sequence number.
1032 : //
1033 : // Note that if the mutable memtable overlaps with ingestion, a flush of the
1034 : // memtable is forced equivalent to DB.Flush. Additionally, subsequent
1035 : // mutations that get sequence numbers larger than the ingestion sequence
1036 : // number get queued up behind the ingestion waiting for it to complete. This
1037 : // can produce a noticeable hiccup in performance. See
1038 : // https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
1039 : // this hiccup.
1040 2 : func (d *DB) Ingest(ctx context.Context, paths []string) error {
1041 2 : if err := d.closed.Load(); err != nil {
1042 1 : panic(err)
1043 : }
1044 2 : if d.opts.ReadOnly {
1045 1 : return ErrReadOnly
1046 1 : }
1047 2 : _, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, nil /* external */)
1048 2 : return err
1049 : }
1050 :
// IngestOperationStats provides some information about where in the LSM the
// bytes were ingested.
type IngestOperationStats struct {
	// Bytes is the total bytes in the ingested sstables.
	Bytes uint64
	// ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
	// into L0. This value is approximate when flushable ingests are active and
	// an ingest overlaps an entry in the flushable queue. Currently, this
	// approximation is very rough, only including tables that overlapped the
	// memtable. This estimate may be improved with #2112.
	ApproxIngestedIntoL0Bytes uint64
	// MemtableOverlappingFiles is the count of ingested sstables that
	// overlapped keys in the memtables.
	MemtableOverlappingFiles int
}
1066 :
// ExternalFile are external sstables that can be referenced through
// objprovider and ingested as remote files that will not be refcounted or
// cleaned up. For use with online restore. Note that the underlying sstable
// could contain keys outside the [Smallest,Largest) bounds; however Pebble
// is expected to only read the keys within those bounds.
type ExternalFile struct {
	// Locator is the shared.Locator that can be used with objProvider to
	// resolve a reference to this external sstable.
	Locator remote.Locator

	// ObjName is the unique name of this sstable on Locator.
	ObjName string

	// Size of the referenced proportion of the virtualized sstable. An estimate
	// is acceptable in lieu of the backing file size.
	Size uint64

	// StartKey and EndKey define the bounds of the sstable; the ingestion
	// of this file will only result in keys within [StartKey, EndKey) if
	// EndKeyIsInclusive is false or [StartKey, EndKey] if it is true.
	// These bounds are loose i.e. it's possible for keys to not span the
	// entirety of this range.
	//
	// StartKey and EndKey user keys must not have suffixes.
	//
	// Multiple ExternalFiles in one ingestion must all have non-overlapping
	// bounds.
	StartKey, EndKey []byte

	// EndKeyIsInclusive is true if EndKey should be treated as inclusive.
	EndKeyIsInclusive bool

	// HasPointKey and HasRangeKey denote whether this file contains point keys
	// or range keys. If both structs are false, an error is returned during
	// ingestion.
	HasPointKey, HasRangeKey bool

	// SyntheticPrefix will prepend this prefix to all keys in the file during
	// iteration. Note that the backing file itself is not modified.
	//
	// SyntheticPrefix must be a prefix of both StartKey and EndKey.
	SyntheticPrefix []byte

	// SyntheticSuffix will replace the suffix of every key in the file during
	// iteration. Note that the file itself is not modified, rather, every key
	// returned by an iterator will have the synthetic suffix.
	//
	// SyntheticSuffix can only be used under the following conditions:
	//  - the synthetic suffix must sort before any non-empty suffixes in the
	//    backing sst (the entire sst, not just the part restricted to the
	//    [StartKey, EndKey) bounds).
	//  - the backing sst must not contain multiple keys with the same prefix.
	SyntheticSuffix []byte

	// Level denotes the level at which this file was present at read time
	// if the external file was returned by a scan of an existing Pebble
	// instance. If Level is 0, this field is ignored.
	Level uint8
}
1125 :
1126 : // IngestWithStats does the same as Ingest, and additionally returns
1127 : // IngestOperationStats.
1128 1 : func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperationStats, error) {
1129 1 : if err := d.closed.Load(); err != nil {
1130 0 : panic(err)
1131 : }
1132 1 : if d.opts.ReadOnly {
1133 0 : return IngestOperationStats{}, ErrReadOnly
1134 0 : }
1135 1 : return d.ingest(ctx, paths, nil, KeyRange{}, nil)
1136 : }
1137 :
1138 : // IngestExternalFiles does the same as IngestWithStats, and additionally
1139 : // accepts external files (with locator info that can be resolved using
1140 : // d.opts.SharedStorage). These files must also be non-overlapping with
1141 : // each other, and must be resolvable through d.objProvider.
1142 : func (d *DB) IngestExternalFiles(
1143 : ctx context.Context, external []ExternalFile,
1144 2 : ) (IngestOperationStats, error) {
1145 2 : if err := d.closed.Load(); err != nil {
1146 0 : panic(err)
1147 : }
1148 :
1149 2 : if d.opts.ReadOnly {
1150 0 : return IngestOperationStats{}, ErrReadOnly
1151 0 : }
1152 2 : if d.opts.Experimental.RemoteStorage == nil {
1153 0 : return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
1154 0 : }
1155 2 : return d.ingest(ctx, nil, nil, KeyRange{}, external)
1156 : }
1157 :
1158 : // IngestAndExcise does the same as IngestWithStats, and additionally accepts a
1159 : // list of shared files to ingest that can be read from a remote.Storage through
1160 : // a Provider. All the shared files must live within exciseSpan, and any existing
1161 : // keys in exciseSpan are deleted by turning existing sstables into virtual
1162 : // sstables (if not virtual already) and shrinking their spans to exclude
1163 : // exciseSpan. See the comment at Ingest for a more complete picture of the
1164 : // ingestion process.
1165 : //
1166 : // Panics if this DB instance was not instantiated with a remote.Storage and
1167 : // shared sstables are present.
1168 : func (d *DB) IngestAndExcise(
1169 : ctx context.Context,
1170 : paths []string,
1171 : shared []SharedSSTMeta,
1172 : external []ExternalFile,
1173 : exciseSpan KeyRange,
1174 2 : ) (IngestOperationStats, error) {
1175 2 : if err := d.closed.Load(); err != nil {
1176 0 : panic(err)
1177 : }
1178 2 : if d.opts.ReadOnly {
1179 0 : return IngestOperationStats{}, ErrReadOnly
1180 0 : }
1181 2 : if invariants.Enabled {
1182 2 : // Excise is only supported on prefix keys.
1183 2 : if d.opts.Comparer.Split(exciseSpan.Start) != len(exciseSpan.Start) {
1184 0 : panic("IngestAndExcise called with suffixed start key")
1185 : }
1186 2 : if d.opts.Comparer.Split(exciseSpan.End) != len(exciseSpan.End) {
1187 0 : panic("IngestAndExcise called with suffixed end key")
1188 : }
1189 : }
1190 2 : if v := d.FormatMajorVersion(); v < FormatMinForSharedObjects {
1191 0 : return IngestOperationStats{}, errors.Errorf(
1192 0 : "store has format major version %d; IngestAndExcise requires at least %d",
1193 0 : v, FormatMinForSharedObjects,
1194 0 : )
1195 0 : }
1196 2 : return d.ingest(ctx, paths, shared, exciseSpan, external)
1197 : }
1198 :
// Both DB.mu and commitPipeline.mu must be held while this is called.
//
// newIngestedFlushableEntry wraps the ingested sstables in meta into a
// flushable queue entry whose WAL number is logNum and whose sequence number
// is seqNum, stamping each table's metadata with its assigned seqnum.
func (d *DB) newIngestedFlushableEntry(
	meta []*fileMetadata, seqNum base.SeqNum, logNum base.DiskFileNum, exciseSpan KeyRange,
) (*flushableEntry, error) {
	// If there's an excise being done atomically with the same ingest, we
	// assign the lowest sequence number in the set of sequence numbers for this
	// ingestion to the excise. Note that we've already allocated fileCount+1
	// sequence numbers in this case.
	//
	// This mimics the behaviour in the non-flushable ingest case (see the callsite
	// for ingestUpdateSeqNum).
	fileSeqNumStart := seqNum
	if exciseSpan.Valid() {
		fileSeqNumStart = seqNum + 1 // the first seqNum is reserved for the excise.
	}
	// Update the sequence number for all of the sstables in the
	// metadata. Writing the metadata to the manifest when the
	// version edit is applied is the mechanism that persists the
	// sequence number. The sstables themselves are left unmodified.
	// In this case, a version edit will only be written to the manifest
	// when the flushable is eventually flushed. If Pebble restarts in that
	// time, then we'll lose the ingest sequence number information. But this
	// information will also be reconstructed on node restart.
	for i, m := range meta {
		if err := setSeqNumInMetadata(m, fileSeqNumStart+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil {
			return nil, err
		}
	}

	f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, seqNum)

	// NB: The logNum/seqNum are the WAL number which we're writing this entry
	// to and the sequence number within the WAL which we'll write this entry
	// to.
	entry := d.newFlushableEntry(f, logNum, seqNum)
	// The flushable entry starts off with a single reader ref, so increment
	// the FileMetadata.Refs.
	for _, file := range f.files {
		file.FileBacking.Ref()
	}
	// unrefFiles drops the refs taken above; any backing whose refcount
	// reaches zero is returned as obsolete so the caller can dispose of it.
	entry.unrefFiles = func() []*fileBacking {
		var obsolete []*fileBacking
		for _, file := range f.files {
			if file.FileBacking.Unref() == 0 {
				obsolete = append(obsolete, file.FileMetadata.FileBacking)
			}
		}
		return obsolete
	}

	entry.flushForced = true
	// No memtable memory accounting to release: the entry wraps on-disk
	// sstables rather than an in-memory arena.
	entry.releaseMemAccounting = func() {}
	return entry, nil
}
1253 :
// Both DB.mu and commitPipeline.mu must be held while this is called. Since
// we're holding both locks, the order in which we rotate the memtable or
// recycle the WAL in this function is irrelevant as long as the correct log
// numbers are assigned to the appropriate flushable.
//
// handleIngestAsFlushable records the ingested sstables (and excise span, if
// any) in a batch for WAL durability, then enqueues them as an
// ingestedFlushable entry on the flushable queue above the current memtable.
func (d *DB) handleIngestAsFlushable(
	meta []*fileMetadata, seqNum base.SeqNum, exciseSpan KeyRange,
) error {
	// Build a batch describing the operation so it can be written to the WAL
	// (when the WAL is enabled) and recovered on restart.
	b := d.NewBatch()
	if exciseSpan.Valid() {
		b.excise(exciseSpan.Start, exciseSpan.End)
	}
	for _, m := range meta {
		b.ingestSST(m.FileNum)
	}
	b.setSeqNum(seqNum)

	// If the WAL is disabled, then the logNum used to create the flushable
	// entry doesn't matter. We just use the logNum assigned to the current
	// mutable memtable. If the WAL is enabled, then this logNum will be
	// overwritten by the logNum of the log which will contain the log entry
	// for the ingestedFlushable.
	logNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
	if !d.opts.DisableWAL {
		// We create a new WAL for the flushable instead of reusing the end of
		// the previous WAL. This simplifies the increment of the minimum
		// unflushed log number, and also simplifies WAL replay.
		var prevLogSize uint64
		logNum, prevLogSize = d.rotateWAL()
		// As the rotator of the WAL, we're responsible for updating the
		// previous flushable queue tail's log size.
		d.mu.mem.queue[len(d.mu.mem.queue)-1].logSize = prevLogSize

		// NB: DB.mu is dropped (commitPipeline.mu is still held) while the
		// batch is written to the WAL, and reacquired immediately after.
		d.mu.Unlock()
		err := d.commit.directWrite(b)
		if err != nil {
			d.opts.Logger.Fatalf("%v", err)
		}
		d.mu.Lock()
	}

	// The excise span is going to outlive this ingestion call. Copy it.
	exciseSpan = KeyRange{
		Start: slices.Clone(exciseSpan.Start),
		End:   slices.Clone(exciseSpan.End),
	}
	entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
	if err != nil {
		return err
	}
	nextSeqNum := seqNum + base.SeqNum(b.Count())

	// Set newLogNum to the logNum of the previous flushable. This value is
	// irrelevant if the WAL is disabled. If the WAL is enabled, then we set
	// the appropriate value below.
	newLogNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
	if !d.opts.DisableWAL {
		// newLogNum will be the WAL num of the next mutable memtable which
		// comes after the ingestedFlushable in the flushable queue. The mutable
		// memtable will be created below.
		//
		// The prevLogSize returned by rotateWAL is the WAL to which the
		// flushable ingest keys were appended. This intermediary WAL is only
		// used to record the flushable ingest and nothing else.
		newLogNum, entry.logSize = d.rotateWAL()
	}

	d.mu.versions.metrics.Ingest.Count++
	currMem := d.mu.mem.mutable
	// NB: Placing ingested sstables above the current memtables
	// requires rotating of the existing memtables/WAL. There is
	// some concern of churning through tiny memtables due to
	// ingested sstables being placed on top of them, but those
	// memtables would have to be flushed anyways.
	d.mu.mem.queue = append(d.mu.mem.queue, entry)
	d.rotateMemtable(newLogNum, nextSeqNum, currMem, 0 /* minSize */)
	d.updateReadStateLocked(d.opts.DebugCheck)
	// TODO(aaditya): is this necessary? we call this already in rotateMemtable above
	d.maybeScheduleFlush()
	return nil
}
1334 :
1335 : // See comment at Ingest() for details on how this works.
1336 : func (d *DB) ingest(
1337 : ctx context.Context,
1338 : paths []string,
1339 : shared []SharedSSTMeta,
1340 : exciseSpan KeyRange,
1341 : external []ExternalFile,
1342 2 : ) (IngestOperationStats, error) {
1343 2 : if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
1344 0 : panic("cannot ingest shared sstables with nil SharedStorage")
1345 : }
1346 2 : if (exciseSpan.Valid() || len(shared) > 0 || len(external) > 0) && d.FormatMajorVersion() < FormatVirtualSSTables {
1347 0 : return IngestOperationStats{}, errors.New("pebble: format major version too old for excise, shared or external sstable ingestion")
1348 0 : }
1349 2 : if len(external) > 0 && d.FormatMajorVersion() < FormatSyntheticPrefixSuffix {
1350 1 : for i := range external {
1351 1 : if len(external[i].SyntheticPrefix) > 0 {
1352 1 : return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic prefix ingestion")
1353 1 : }
1354 1 : if len(external[i].SyntheticSuffix) > 0 {
1355 1 : return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic suffix ingestion")
1356 1 : }
1357 : }
1358 : }
1359 : // Allocate file numbers for all of the files being ingested and mark them as
1360 : // pending in order to prevent them from being deleted. Note that this causes
1361 : // the file number ordering to be out of alignment with sequence number
1362 : // ordering. The sorting of L0 tables by sequence number avoids relying on
1363 : // that (busted) invariant.
1364 2 : pendingOutputs := make([]base.FileNum, len(paths)+len(shared)+len(external))
1365 2 : for i := 0; i < len(paths)+len(shared)+len(external); i++ {
1366 2 : pendingOutputs[i] = d.mu.versions.getNextFileNum()
1367 2 : }
1368 :
1369 2 : jobID := d.newJobID()
1370 2 :
1371 2 : // Load the metadata for all the files being ingested. This step detects
1372 2 : // and elides empty sstables.
1373 2 : loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
1374 2 : if err != nil {
1375 1 : return IngestOperationStats{}, err
1376 1 : }
1377 :
1378 2 : if loadResult.fileCount() == 0 {
1379 2 : // All of the sstables to be ingested were empty. Nothing to do.
1380 2 : return IngestOperationStats{}, nil
1381 2 : }
1382 :
1383 : // Verify the sstables do not overlap.
1384 2 : if err := ingestSortAndVerify(d.cmp, loadResult, exciseSpan); err != nil {
1385 2 : return IngestOperationStats{}, err
1386 2 : }
1387 :
1388 : // Hard link the sstables into the DB directory. Since the sstables aren't
1389 : // referenced by a version, they won't be used. If the hard linking fails
1390 : // (e.g. because the files reside on a different filesystem), ingestLinkLocal
1391 : // will fall back to copying, and if that fails we undo our work and return an
1392 : // error.
1393 2 : if err := ingestLinkLocal(ctx, jobID, d.opts, d.objProvider, loadResult.local); err != nil {
1394 0 : return IngestOperationStats{}, err
1395 0 : }
1396 :
1397 2 : err = d.ingestAttachRemote(jobID, loadResult)
1398 2 : defer d.ingestUnprotectExternalBackings(loadResult)
1399 2 : if err != nil {
1400 0 : return IngestOperationStats{}, err
1401 0 : }
1402 :
1403 : // Make the new tables durable. We need to do this at some point before we
1404 : // update the MANIFEST (via logAndApply), otherwise a crash can have the
1405 : // tables referenced in the MANIFEST, but not present in the provider.
1406 2 : if err := d.objProvider.Sync(); err != nil {
1407 1 : return IngestOperationStats{}, err
1408 1 : }
1409 :
1410 : // metaFlushableOverlaps is a map indicating which of the ingested sstables
1411 : // overlap some table in the flushable queue. It's used to approximate
1412 : // ingest-into-L0 stats when using flushable ingests.
1413 2 : metaFlushableOverlaps := make(map[FileNum]bool, loadResult.fileCount())
1414 2 : var mem *flushableEntry
1415 2 : var mut *memTable
1416 2 : // asFlushable indicates whether the sstable was ingested as a flushable.
1417 2 : var asFlushable bool
1418 2 : prepare := func(seqNum base.SeqNum) {
1419 2 : // Note that d.commit.mu is held by commitPipeline when calling prepare.
1420 2 :
1421 2 : // Determine the set of bounds we care about for the purpose of checking
1422 2 : // for overlap among the flushables. If there's an excise span, we need
1423 2 : // to check for overlap with its bounds as well.
1424 2 : overlapBounds := make([]bounded, 0, loadResult.fileCount()+1)
1425 2 : for _, m := range loadResult.local {
1426 2 : overlapBounds = append(overlapBounds, m.fileMetadata)
1427 2 : }
1428 2 : for _, m := range loadResult.shared {
1429 2 : overlapBounds = append(overlapBounds, m.fileMetadata)
1430 2 : }
1431 2 : for _, m := range loadResult.external {
1432 2 : overlapBounds = append(overlapBounds, m.fileMetadata)
1433 2 : }
1434 2 : if exciseSpan.Valid() {
1435 2 : overlapBounds = append(overlapBounds, &exciseSpan)
1436 2 : }
1437 :
1438 2 : d.mu.Lock()
1439 2 : defer d.mu.Unlock()
1440 2 :
1441 2 : // Check if any of the currently-open EventuallyFileOnlySnapshots overlap
1442 2 : // in key ranges with the excise span. If so, we need to check for memtable
1443 2 : // overlaps with all bounds of that EventuallyFileOnlySnapshot in addition
1444 2 : // to the ingestion's own bounds too.
1445 2 :
1446 2 : if exciseSpan.Valid() {
1447 2 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
1448 2 : if s.efos == nil {
1449 0 : continue
1450 : }
1451 2 : if base.Visible(seqNum, s.efos.seqNum, base.SeqNumMax) {
1452 0 : // We only worry about snapshots older than the excise. Any snapshots
1453 0 : // created after the excise should see the excised view of the LSM
1454 0 : // anyway.
1455 0 : //
1456 0 : // Since we delay publishing the excise seqnum as visible until after
1457 0 : // the apply step, this case will never be hit in practice until we
1458 0 : // make excises flushable ingests.
1459 0 : continue
1460 : }
1461 2 : if invariants.Enabled {
1462 2 : if s.efos.hasTransitioned() {
1463 0 : panic("unexpected transitioned EFOS in snapshots list")
1464 : }
1465 : }
1466 2 : for i := range s.efos.protectedRanges {
1467 2 : if !s.efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
1468 2 : continue
1469 : }
1470 : // Our excise conflicts with this EFOS. We need to add its protected
1471 : // ranges to our overlapBounds. Grow overlapBounds in one allocation
1472 : // if necessary.
1473 2 : prs := s.efos.protectedRanges
1474 2 : if cap(overlapBounds) < len(overlapBounds)+len(prs) {
1475 2 : oldOverlapBounds := overlapBounds
1476 2 : overlapBounds = make([]bounded, len(oldOverlapBounds), len(oldOverlapBounds)+len(prs))
1477 2 : copy(overlapBounds, oldOverlapBounds)
1478 2 : }
1479 2 : for i := range prs {
1480 2 : overlapBounds = append(overlapBounds, &prs[i])
1481 2 : }
1482 2 : break
1483 : }
1484 : }
1485 : }
1486 :
1487 : // Check to see if any files overlap with any of the memtables. The queue
1488 : // is ordered from oldest to newest with the mutable memtable being the
1489 : // last element in the slice. We want to wait for the newest table that
1490 : // overlaps.
1491 :
1492 2 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1493 2 : m := d.mu.mem.queue[i]
1494 2 : m.computePossibleOverlaps(func(b bounded) shouldContinue {
1495 2 : // If this is the first table to overlap a flushable, save
1496 2 : // the flushable. This ingest must be ingested or flushed
1497 2 : // after it.
1498 2 : if mem == nil {
1499 2 : mem = m
1500 2 : }
1501 :
1502 2 : switch v := b.(type) {
1503 2 : case *fileMetadata:
1504 2 : // NB: False positives are possible if `m` is a flushable
1505 2 : // ingest that overlaps the file `v` in bounds but doesn't
1506 2 : // contain overlapping data. This is considered acceptable
1507 2 : // because it's rare (in CockroachDB a bound overlap likely
1508 2 : // indicates a data overlap), and blocking the commit
1509 2 : // pipeline while we perform I/O to check for overlap may be
1510 2 : // more disruptive than enqueueing this ingestion on the
1511 2 : // flushable queue and switching to a new memtable.
1512 2 : metaFlushableOverlaps[v.FileNum] = true
1513 2 : case *KeyRange:
1514 : // An excise span or an EventuallyFileOnlySnapshot protected range;
1515 : // not a file.
1516 0 : default:
1517 0 : panic("unreachable")
1518 : }
1519 2 : return continueIteration
1520 : }, overlapBounds...)
1521 : }
1522 :
1523 2 : if mem == nil {
1524 2 : // No overlap with any of the queued flushables, so no need to queue
1525 2 : // after them.
1526 2 :
1527 2 : // New writes with higher sequence numbers may be concurrently
1528 2 : // committed. We must ensure they don't flush before this ingest
1529 2 : // completes. To do that, we ref the mutable memtable as a writer,
1530 2 : // preventing its flushing (and the flushing of all subsequent
1531 2 : // flushables in the queue). Once we've acquired the manifest lock
1532 2 : // to add the ingested sstables to the LSM, we can unref as we're
1533 2 : // guaranteed that the flush won't edit the LSM before this ingest.
1534 2 : mut = d.mu.mem.mutable
1535 2 : mut.writerRef()
1536 2 : return
1537 2 : }
1538 :
1539 : // The ingestion overlaps with some entry in the flushable queue. If the
1540 : // pre-conditions are met below, we can treat this ingestion as a flushable
1541 : // ingest, otherwise we wait on the memtable flush before ingestion.
1542 : //
1543 : // TODO(aaditya): We should make flushableIngest compatible with remote
1544 : // files.
1545 2 : hasRemoteFiles := len(shared) > 0 || len(external) > 0
1546 2 : canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest &&
1547 2 : (len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold) &&
1548 2 : !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles &&
1549 2 : (!exciseSpan.Valid() || d.FormatMajorVersion() >= FormatFlushableIngestExcises)
1550 2 :
1551 2 : if !canIngestFlushable {
1552 2 : // We're not able to ingest as a flushable,
1553 2 : // so we must synchronously flush.
1554 2 : //
1555 2 : // TODO(bilal): Currently, if any of the files being ingested are shared,
1556 2 : // we cannot use flushable ingests and need
1557 2 : // to wait synchronously.
1558 2 : if mem.flushable == d.mu.mem.mutable {
1559 2 : err = d.makeRoomForWrite(nil)
1560 2 : }
1561 : // New writes with higher sequence numbers may be concurrently
1562 : // committed. We must ensure they don't flush before this ingest
1563 : // completes. To do that, we ref the mutable memtable as a writer,
1564 : // preventing its flushing (and the flushing of all subsequent
1565 : // flushables in the queue). Once we've acquired the manifest lock
1566 : // to add the ingested sstables to the LSM, we can unref as we're
1567 : // guaranteed that the flush won't edit the LSM before this ingest.
1568 2 : mut = d.mu.mem.mutable
1569 2 : mut.writerRef()
1570 2 : mem.flushForced = true
1571 2 : d.maybeScheduleFlush()
1572 2 : return
1573 : }
1574 : // Since there aren't too many memtables already queued up, we can
1575 : // slide the ingested sstables on top of the existing memtables.
1576 2 : asFlushable = true
1577 2 : fileMetas := make([]*fileMetadata, len(loadResult.local))
1578 2 : for i := range fileMetas {
1579 2 : fileMetas[i] = loadResult.local[i].fileMetadata
1580 2 : }
1581 2 : err = d.handleIngestAsFlushable(fileMetas, seqNum, exciseSpan)
1582 : }
1583 :
1584 2 : var ve *versionEdit
1585 2 : apply := func(seqNum base.SeqNum) {
1586 2 : if err != nil || asFlushable {
1587 2 : // An error occurred during prepare.
1588 2 : if mut != nil {
1589 0 : if mut.writerUnref() {
1590 0 : d.mu.Lock()
1591 0 : d.maybeScheduleFlush()
1592 0 : d.mu.Unlock()
1593 0 : }
1594 : }
1595 2 : return
1596 : }
1597 :
1598 : // If there's an excise being done atomically with the same ingest, we
1599 : // assign the lowest sequence number in the set of sequence numbers for this
1600 : // ingestion to the excise. Note that we've already allocated fileCount+1
1601 : // sequence numbers in this case.
1602 2 : if exciseSpan.Valid() {
1603 2 : seqNum++ // the first seqNum is reserved for the excise.
1604 2 : }
1605 : // Update the sequence numbers for all ingested sstables'
1606 : // metadata. When the version edit is applied, the metadata is
1607 : // written to the manifest, persisting the sequence number.
1608 : // The sstables themselves are left unmodified.
1609 2 : if err = ingestUpdateSeqNum(
1610 2 : d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult,
1611 2 : ); err != nil {
1612 0 : if mut != nil {
1613 0 : if mut.writerUnref() {
1614 0 : d.mu.Lock()
1615 0 : d.maybeScheduleFlush()
1616 0 : d.mu.Unlock()
1617 0 : }
1618 : }
1619 0 : return
1620 : }
1621 :
1622 : // If we overlapped with a memtable in prepare wait for the flush to
1623 : // finish.
1624 2 : if mem != nil {
1625 2 : <-mem.flushed
1626 2 : }
1627 :
1628 : // Assign the sstables to the correct level in the LSM and apply the
1629 : // version edit.
1630 2 : ve, err = d.ingestApply(ctx, jobID, loadResult, mut, exciseSpan, seqNum)
1631 : }
1632 :
1633 : // Only one ingest can occur at a time because if not, one would block waiting
1634 : // for the other to finish applying. This blocking would happen while holding
1635 : // the commit mutex which would prevent unrelated batches from writing their
1636 : // changes to the WAL and memtable. This will cause a bigger commit hiccup
1637 : // during ingestion.
1638 2 : seqNumCount := loadResult.fileCount()
1639 2 : if exciseSpan.Valid() {
1640 2 : seqNumCount++
1641 2 : }
1642 2 : d.commit.ingestSem <- struct{}{}
1643 2 : d.commit.AllocateSeqNum(seqNumCount, prepare, apply)
1644 2 : <-d.commit.ingestSem
1645 2 :
1646 2 : if err != nil {
1647 1 : if err2 := ingestCleanup(d.objProvider, loadResult.local); err2 != nil {
1648 0 : d.opts.Logger.Errorf("ingest cleanup failed: %v", err2)
1649 0 : }
1650 2 : } else {
1651 2 : // Since we either created a hard link to the ingesting files, or copied
1652 2 : // them over, it is safe to remove the original paths.
1653 2 : for i := range loadResult.local {
1654 2 : path := loadResult.local[i].path
1655 2 : if err2 := d.opts.FS.Remove(path); err2 != nil {
1656 1 : d.opts.Logger.Errorf("ingest failed to remove original file: %s", err2)
1657 1 : }
1658 : }
1659 : }
1660 :
1661 2 : info := TableIngestInfo{
1662 2 : JobID: int(jobID),
1663 2 : Err: err,
1664 2 : flushable: asFlushable,
1665 2 : }
1666 2 : if len(loadResult.local) > 0 {
1667 2 : info.GlobalSeqNum = loadResult.local[0].SmallestSeqNum
1668 2 : } else if len(loadResult.shared) > 0 {
1669 2 : info.GlobalSeqNum = loadResult.shared[0].SmallestSeqNum
1670 2 : } else {
1671 2 : info.GlobalSeqNum = loadResult.external[0].SmallestSeqNum
1672 2 : }
1673 2 : var stats IngestOperationStats
1674 2 : if ve != nil {
1675 2 : info.Tables = make([]struct {
1676 2 : TableInfo
1677 2 : Level int
1678 2 : }, len(ve.NewFiles))
1679 2 : for i := range ve.NewFiles {
1680 2 : e := &ve.NewFiles[i]
1681 2 : info.Tables[i].Level = e.Level
1682 2 : info.Tables[i].TableInfo = e.Meta.TableInfo()
1683 2 : stats.Bytes += e.Meta.Size
1684 2 : if e.Level == 0 {
1685 2 : stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
1686 2 : }
1687 2 : if metaFlushableOverlaps[e.Meta.FileNum] {
1688 2 : stats.MemtableOverlappingFiles++
1689 2 : }
1690 : }
1691 2 : } else if asFlushable {
1692 2 : // NB: If asFlushable == true, there are no shared sstables.
1693 2 : info.Tables = make([]struct {
1694 2 : TableInfo
1695 2 : Level int
1696 2 : }, len(loadResult.local))
1697 2 : for i, f := range loadResult.local {
1698 2 : info.Tables[i].Level = -1
1699 2 : info.Tables[i].TableInfo = f.TableInfo()
1700 2 : stats.Bytes += f.Size
1701 2 : // We don't have exact stats on which files will be ingested into
1702 2 : // L0, because actual ingestion into the LSM has been deferred until
1703 2 : // flush time. Instead, we infer based on memtable overlap.
1704 2 : //
1705 2 : // TODO(jackson): If we optimistically compute data overlap (#2112)
1706 2 : // before entering the commit pipeline, we can use that overlap to
1707 2 : // improve our approximation by incorporating overlap with L0, not
1708 2 : // just memtables.
1709 2 : if metaFlushableOverlaps[f.FileNum] {
1710 2 : stats.ApproxIngestedIntoL0Bytes += f.Size
1711 2 : stats.MemtableOverlappingFiles++
1712 2 : }
1713 : }
1714 : }
1715 2 : d.opts.EventListener.TableIngested(info)
1716 2 :
1717 2 : return stats, err
1718 : }
1719 :
1720 : // excise updates ve to include a replacement of the file m with new virtual
1721 : // sstables that exclude exciseSpan, returning a slice of newly-created files if
1722 : // any. If the entirety of m is deleted by exciseSpan, no new sstables are added
1723 : // and m is deleted. Note that ve is updated in-place.
1724 : //
1725 : // This method is agnostic to whether d.mu is held or not. Some cases call it with
1726 : // the db mutex held (e.g. ingest-time excises), while in the case of compactions
1727 : // the mutex is not held.
1728 : func (d *DB) excise(
1729 : 	ctx context.Context, exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int,
1730 2 : ) ([]manifest.NewFileEntry, error) {
1731 2 : 	numCreatedFiles := 0
1732 2 : 	// Check if there's actually an overlap between m and exciseSpan.
1733 2 : 	mBounds := base.UserKeyBoundsFromInternal(m.Smallest, m.Largest)
1734 2 : 	if !exciseSpan.Overlaps(d.cmp, &mBounds) {
1735 2 : 		return nil, nil
1736 2 : 	}
1737 2 : 	ve.DeletedFiles[deletedFileEntry{
1738 2 : 		Level: level,
1739 2 : 		FileNum: m.FileNum,
1740 2 : 	}] = m
1741 2 : 	// Fast path: m sits entirely within the exciseSpan, so just delete it.
1742 2 : 	if exciseSpan.ContainsInternalKey(d.cmp, m.Smallest) && exciseSpan.ContainsInternalKey(d.cmp, m.Largest) {
1743 2 : 		return nil, nil
1744 2 : 	}
1745 :
1746 2 : 	var iters iterSet
1747 2 : 	var itersLoaded bool
1748 2 : 	defer iters.CloseAll()
1749 2 : 	loadItersIfNecessary := func() error {
1750 2 : 		if itersLoaded {
1751 2 : 			return nil
1752 2 : 		}
1753 2 : 		var err error
1754 2 : 		iters, err = d.newIters(ctx, m, &IterOptions{
1755 2 : 			CategoryAndQoS: sstable.CategoryAndQoS{
1756 2 : 				Category: "pebble-ingest",
1757 2 : 				QoSLevel: sstable.LatencySensitiveQoSLevel,
1758 2 : 			},
1759 2 : 			layer: manifest.Level(level),
1760 2 : 		}, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
1761 2 : 		itersLoaded = true
1762 2 : 		return err
1763 : 	}
1764 :
1765 2 : 	needsBacking := false
1766 2 : 	// Create a file to the left of the excise span, if necessary.
1767 2 : 	// The bounds of this file will be [m.Smallest, lastKeyBefore(exciseSpan.Start)].
1768 2 : 	//
1769 2 : 	// We create bounds that are tight on user keys, and we make the effort to find
1770 2 : 	// the last key in the original sstable that's smaller than exciseSpan.Start
1771 2 : 	// even though it requires some sstable reads. We could choose to create
1772 2 : 	// virtual sstables on loose userKey bounds, in which case we could just set
1773 2 : 	// leftFile.Largest to an exclusive sentinel at exciseSpan.Start. The biggest
1774 2 : 	// issue with that approach would be that it'd lead to lots of small virtual
1775 2 : 	// sstables in the LSM that have no guarantee on containing even a single user
1776 2 : 	// key within the file bounds. This has the potential to increase both read and
1777 2 : 	// write-amp as we will be opening up these sstables only to find no relevant
1778 2 : 	// keys in the read path, and compacting sstables on top of them instead of
1779 2 : 	// directly into the space occupied by them. We choose to incur the cost of
1780 2 : 	// calculating tight bounds at this time instead of creating more work in the
1781 2 : 	// future.
1782 2 : 	//
1783 2 : 	// TODO(bilal): Some of this work can happen without grabbing the manifest
1784 2 : 	// lock; we could grab one currentVersion, release the lock, calculate excised
1785 2 : 	// files, then grab the lock again and recalculate for just the files that
1786 2 : 	// have changed since our previous calculation. Do this optimization as part of
1787 2 : 	// https://github.com/cockroachdb/pebble/issues/2112 .
1788 2 : 	if d.cmp(m.Smallest.UserKey, exciseSpan.Start) < 0 {
1789 2 : 		leftFile := &fileMetadata{
1790 2 : 			Virtual: true,
1791 2 : 			FileBacking: m.FileBacking,
1792 2 : 			FileNum: d.mu.versions.getNextFileNum(),
1793 2 : 			// Note that these are loose bounds for smallest/largest seqnums, but they're
1794 2 : 			// sufficient for maintaining correctness.
1795 2 : 			SmallestSeqNum: m.SmallestSeqNum,
1796 2 : 			LargestSeqNum: m.LargestSeqNum,
1797 2 : 			LargestSeqNumAbsolute: m.LargestSeqNumAbsolute,
1798 2 : 			SyntheticPrefixAndSuffix: m.SyntheticPrefixAndSuffix,
1799 2 : 		}
1800 2 : 		if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.SmallestPointKey) {
1801 2 : 			// This file will probably contain point keys.
1802 2 : 			if err := loadItersIfNecessary(); err != nil {
1803 0 : 				return nil, err
1804 0 : 			}
1805 2 : 			smallestPointKey := m.SmallestPointKey
1806 2 : 			if kv := iters.Point().SeekLT(exciseSpan.Start, base.SeekLTFlagsNone); kv != nil {
1807 2 : 				leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, kv.K.Clone())
1808 2 : 			}
1809 : 			// Store the min of (exciseSpan.Start, rdel.End) in lastRangeDel. This
1810 : 			// needs to be a copy if the key is owned by the range del iter.
1811 2 : 			var lastRangeDel []byte
1812 2 : 			if rdel, err := iters.RangeDeletion().SeekLT(exciseSpan.Start); err != nil {
1813 0 : 				return nil, err
1814 2 : 			} else if rdel != nil {
1815 2 : 				lastRangeDel = append(lastRangeDel[:0], rdel.End...)
1816 2 : 				if d.cmp(lastRangeDel, exciseSpan.Start) > 0 {
1817 2 : 					lastRangeDel = exciseSpan.Start
1818 2 : 				}
1819 : 			}
1820 2 : 			if lastRangeDel != nil {
1821 2 : 				leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, lastRangeDel))
1822 2 : 			}
1823 : 		}
1824 2 : 		if m.HasRangeKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.SmallestRangeKey) {
1825 2 : 			// This file will probably contain range keys.
1826 2 : 			if err := loadItersIfNecessary(); err != nil {
1827 0 : 				return nil, err
1828 0 : 			}
1829 2 : 			smallestRangeKey := m.SmallestRangeKey
1830 2 : 			// Store the min of (exciseSpan.Start, rkey.End) in lastRangeKey. This
1831 2 : 			// needs to be a copy if the key is owned by the range key iter.
1832 2 : 			var lastRangeKey []byte
1833 2 : 			var lastRangeKeyKind InternalKeyKind
1834 2 : 			if rkey, err := iters.RangeKey().SeekLT(exciseSpan.Start); err != nil {
1835 0 : 				return nil, err
1836 2 : 			} else if rkey != nil {
1837 2 : 				lastRangeKey = append(lastRangeKey[:0], rkey.End...)
1838 2 : 				if d.cmp(lastRangeKey, exciseSpan.Start) > 0 {
1839 2 : 					lastRangeKey = exciseSpan.Start
1840 2 : 				}
1841 2 : 				lastRangeKeyKind = rkey.Keys[0].Kind()
1842 : 			}
1843 2 : 			if lastRangeKey != nil {
1844 2 : 				leftFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, base.MakeExclusiveSentinelKey(lastRangeKeyKind, lastRangeKey))
1845 2 : 			}
1846 : 		}
1847 2 : 		if leftFile.HasRangeKeys || leftFile.HasPointKeys {
1848 2 : 			var err error
1849 2 : 			leftFile.Size, err = d.tableCache.estimateSize(m, leftFile.Smallest.UserKey, leftFile.Largest.UserKey)
1850 2 : 			if err != nil {
1851 0 : 				return nil, err
1852 0 : 			}
1853 2 : 			if leftFile.Size == 0 {
1854 2 : 				// On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
1855 2 : 				// such as if the excised file only has range keys/dels and no point
1856 2 : 				// keys. This can cause panics in places where we divide by file sizes.
1857 2 : 				// Correct for it here.
1858 2 : 				leftFile.Size = 1
1859 2 : 			}
1860 2 : 			if err := leftFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
1861 0 : 				return nil, err
1862 0 : 			}
1863 2 : 			leftFile.ValidateVirtual(m)
1864 2 : 			ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: leftFile})
1865 2 : 			needsBacking = true
1866 2 : 			numCreatedFiles++
1867 : 		}
1868 : 	}
1869 : 	// Create a file to the right, if necessary.
1870 2 : 	if exciseSpan.ContainsInternalKey(d.cmp, m.Largest) {
1871 2 : 		// No key exists to the right of the excise span in this file.
1872 2 : 		if needsBacking && !m.Virtual {
1873 2 : 			// If m is virtual, then its file backing is already known to the manifest.
1874 2 : 			// We don't need to create another file backing. Note that there must be
1875 2 : 			// only one CreatedBackingTables entry per backing sstable. This is
1876 2 : 			// indicated by the VersionEdit.CreatedBackingTables invariant.
1877 2 : 			ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
1878 2 : 		}
1879 2 : 		return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
1880 : 	}
1881 : 	// Create a new file, rightFile, between [firstKeyAfter(exciseSpan.End), m.Largest].
1882 : 	//
1883 : 	// See comment before the definition of leftFile for the motivation behind
1884 : 	// calculating tight user-key bounds.
1885 2 : 	rightFile := &fileMetadata{
1886 2 : 		Virtual: true,
1887 2 : 		FileBacking: m.FileBacking,
1888 2 : 		FileNum: d.mu.versions.getNextFileNum(),
1889 2 : 		// Note that these are loose bounds for smallest/largest seqnums, but they're
1890 2 : 		// sufficient for maintaining correctness.
1891 2 : 		SmallestSeqNum: m.SmallestSeqNum,
1892 2 : 		LargestSeqNum: m.LargestSeqNum,
1893 2 : 		LargestSeqNumAbsolute: m.LargestSeqNumAbsolute,
1894 2 : 		SyntheticPrefixAndSuffix: m.SyntheticPrefixAndSuffix,
1895 2 : 	}
1896 2 : 	if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.LargestPointKey) {
1897 2 : 		// This file will probably contain point keys.
1898 2 : 		if err := loadItersIfNecessary(); err != nil {
1899 0 : 			return nil, err
1900 0 : 		}
1901 2 : 		largestPointKey := m.LargestPointKey
1902 2 : 		if kv := iters.Point().SeekGE(exciseSpan.End.Key, base.SeekGEFlagsNone); kv != nil {
1903 2 : 			if exciseSpan.End.Kind == base.Inclusive && d.equal(exciseSpan.End.Key, kv.K.UserKey) {
1904 0 : 				return nil, base.AssertionFailedf("cannot excise with an inclusive end key and data overlap at end key")
1905 0 : 			}
1906 2 : 			rightFile.ExtendPointKeyBounds(d.cmp, kv.K.Clone(), largestPointKey)
1907 : 		}
1908 : 		// Store the max of (exciseSpan.End, rdel.Start) in firstRangeDel. This
1909 : 		// needs to be a copy if the key is owned by the range del iter.
1910 2 : 		var firstRangeDel []byte
1911 2 : 		rdel, err := iters.RangeDeletion().SeekGE(exciseSpan.End.Key)
1912 2 : 		if err != nil {
1913 0 : 			return nil, err
1914 2 : 		} else if rdel != nil {
1915 2 : 			firstRangeDel = append(firstRangeDel[:0], rdel.Start...)
1916 2 : 			if d.cmp(firstRangeDel, exciseSpan.End.Key) < 0 {
1917 2 : 				// NB: This can only be done if the end bound is exclusive.
1918 2 : 				if exciseSpan.End.Kind != base.Exclusive {
1919 0 : 					return nil, base.AssertionFailedf("cannot truncate rangedel during excise with an inclusive upper bound")
1920 0 : 				}
1921 2 : 				firstRangeDel = exciseSpan.End.Key
1922 : 			}
1923 : 		}
1924 2 : 		if firstRangeDel != nil {
1925 2 : 			smallestPointKey := rdel.SmallestKey()
1926 2 : 			smallestPointKey.UserKey = firstRangeDel
1927 2 : 			rightFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, largestPointKey)
1928 2 : 		}
1929 : 	}
1930 2 : 	if m.HasRangeKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.LargestRangeKey) {
1931 2 : 		// This file will probably contain range keys.
1932 2 : 		if err := loadItersIfNecessary(); err != nil {
1933 0 : 			return nil, err
1934 0 : 		}
1935 2 : 		largestRangeKey := m.LargestRangeKey
1936 2 : 		// Store the max of (exciseSpan.End, rkey.Start) in firstRangeKey. This
1937 2 : 		// needs to be a copy if the key is owned by the range key iter.
1938 2 : 		var firstRangeKey []byte
1939 2 : 		rkey, err := iters.RangeKey().SeekGE(exciseSpan.End.Key)
1940 2 : 		if err != nil {
1941 0 : 			return nil, err
1942 2 : 		} else if rkey != nil {
1943 2 : 			firstRangeKey = append(firstRangeKey[:0], rkey.Start...)
1944 2 : 			if d.cmp(firstRangeKey, exciseSpan.End.Key) < 0 {
1945 2 : 				if exciseSpan.End.Kind != base.Exclusive {
1946 0 : 					return nil, base.AssertionFailedf("cannot truncate range key during excise with an inclusive upper bound")
1947 0 : 				}
1948 2 : 				firstRangeKey = exciseSpan.End.Key
1949 : 			}
1950 : 		}
1951 2 : 		if firstRangeKey != nil {
1952 2 : 			smallestRangeKey := rkey.SmallestKey()
1953 2 : 			smallestRangeKey.UserKey = firstRangeKey
1954 2 : 			// We call ExtendRangeKeyBounds so any internal boundType fields are
1955 2 : 			// set correctly. Note that this is mildly wasteful as we'll be comparing
1956 2 : 			// rightFile.{Smallest,Largest}RangeKey with themselves, which can be
1957 2 : 			// avoided if we exported ExtendOverallKeyBounds or so.
1958 2 : 			rightFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, largestRangeKey)
1959 2 : 		}
1960 : 	}
1961 2 : 	if rightFile.HasRangeKeys || rightFile.HasPointKeys {
1962 2 : 		var err error
1963 2 : 		rightFile.Size, err = d.tableCache.estimateSize(m, rightFile.Smallest.UserKey, rightFile.Largest.UserKey)
1964 2 : 		if err != nil {
1965 0 : 			return nil, err
1966 0 : 		}
1967 2 : 		if rightFile.Size == 0 {
1968 2 : 			// On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
1969 2 : 			// such as if the excised file only has range keys/dels and no point keys.
1970 2 : 			// This can cause panics in places where we divide by file sizes. Correct
1971 2 : 			// for it here.
1972 2 : 			rightFile.Size = 1
1973 2 : 		}
1974 2 : 		if err := rightFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
1975 0 : 			return nil, err
1976 0 : 		}
1977 2 : 		rightFile.ValidateVirtual(m)
1978 2 : 		ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: rightFile})
1979 2 : 		needsBacking = true
1980 2 : 		numCreatedFiles++
1981 : 	}
1982 :
1983 2 : 	if needsBacking && !m.Virtual {
1984 2 : 		// If m is virtual, then its file backing is already known to the manifest.
1985 2 : 		// We don't need to create another file backing. Note that there must be
1986 2 : 		// only one CreatedBackingTables entry per backing sstable. This is
1987 2 : 		// indicated by the VersionEdit.CreatedBackingTables invariant.
1988 2 : 		ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
1989 2 : 	}
1990 :
1991 2 : 	return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
1992 : }
1993 :
1994 : type ingestSplitFile struct {
1995 : 	// ingestFile is the file being ingested.
1996 : 	ingestFile *fileMetadata
1997 : 	// splitFile is the existing file that needs to be split to allow ingestFile
1998 : 	// to slot into the level `level`.
1999 : 	splitFile *fileMetadata
2000 : 	// level is the LSM level where ingestFile will go (and where splitFile already is).
2001 : 	level int
2002 : }
2003 :
2004 : // ingestSplit splits files specified in `files` and updates ve in-place to
2005 : // account for existing files getting split into two virtual sstables. The map
2006 : // `replacedFiles` contains an in-progress map of all files that have been
2007 : // replaced with new virtual sstables in this version edit so far, which is also
2008 : // updated in-place.
2009 : //
2010 : // d.mu as well as the manifest lock must be held when calling this method.
2011 : func (d *DB) ingestSplit(
2012 : 	ctx context.Context,
2013 : 	ve *versionEdit,
2014 : 	updateMetrics func(*fileMetadata, int, []newFileEntry),
2015 : 	files []ingestSplitFile,
2016 : 	replacedFiles map[base.FileNum][]newFileEntry,
2017 2 : ) error {
2018 2 : 	for _, s := range files {
2019 2 : 		ingestFileBounds := s.ingestFile.UserKeyBounds()
2020 2 : 		// replacedFiles can be thought of as a tree, where we start iterating with
2021 2 : 		// s.splitFile and run its fileNum through replacedFiles, then find which of
2022 2 : 		// the replaced files overlaps with s.ingestFile, which becomes the new
2023 2 : 		// splitFile, then we check splitFile's replacements in replacedFiles again
2024 2 : 		// for overlap with s.ingestFile, and so on until we either can't find the
2025 2 : 		// current splitFile in replacedFiles (i.e. that's the file that now needs to
2026 2 : 		// be split), or we don't find a file that overlaps with s.ingestFile, which
2027 2 : 		// means a prior ingest split already produced enough room for s.ingestFile
2028 2 : 		// to go into this level without necessitating another ingest split.
2029 2 : 		splitFile := s.splitFile
2030 2 : 		for splitFile != nil {
2031 2 : 			replaced, ok := replacedFiles[splitFile.FileNum]
2032 2 : 			if !ok {
2033 2 : 				break
2034 : 			}
2035 2 : 			updatedSplitFile := false
2036 2 : 			for i := range replaced {
2037 2 : 				if replaced[i].Meta.Overlaps(d.cmp, &ingestFileBounds) {
2038 2 : 					if updatedSplitFile {
2039 0 : 						// This should never happen because the earlier ingestTargetLevel
2040 0 : 						// function only finds split file candidates that are guaranteed to
2041 0 : 						// have no data overlap, only boundary overlap. See the comments
2042 0 : 						// in that method to see the definitions of data vs boundary
2043 0 : 						// overlap. That, plus the fact that files in `replaced` are
2044 0 : 						// guaranteed to have file bounds that are tight on user keys
2045 0 : 						// (as that's what `d.excise` produces), means that the only case
2046 0 : 						// where we overlap with two or more files in `replaced` is if we
2047 0 : 						// actually had data overlap all along, or if the ingestion files
2048 0 : 						// were overlapping, either of which is an invariant violation.
2049 0 : 						panic("updated with two files in ingestSplit")
2050 : 					}
2051 2 : 					splitFile = replaced[i].Meta
2052 2 : 					updatedSplitFile = true
2053 : 				}
2054 : 			}
2055 2 : 			if !updatedSplitFile {
2056 2 : 				// None of the replaced files overlapped with the file being ingested.
2057 2 : 				// This can happen if we've already excised a span overlapping with
2058 2 : 				// this file, or if we have consecutive ingested files that can slide
2059 2 : 				// within the same gap between keys in an existing file. For instance,
2060 2 : 				// if an existing file has keys a and g and we're ingesting b-c, d-e,
2061 2 : 				// the first loop iteration will split the existing file into one that
2062 2 : 				// ends in a and another that starts at g, and the second iteration will
2063 2 : 				// fall into this case and require no splitting.
2064 2 : 				//
2065 2 : 				// No splitting necessary.
2066 2 : 				splitFile = nil
2067 2 : 			}
2068 : 		}
2069 2 : 		if splitFile == nil {
2070 2 : 			continue
2071 : 		}
2072 : 		// NB: excise operates on [start, end). We're splitting at [start, end]
2073 : 		// (assuming !s.ingestFile.Largest.IsExclusiveSentinel()). The conflation
2074 : 		// of exclusive vs inclusive end bounds should not make a difference here
2075 : 		// as we're guaranteed to not have any data overlap between splitFile and
2076 : 		// s.ingestFile. d.excise will return an error if we pass an inclusive user
2077 : 		// key bound _and_ we end up seeing data overlap at the end key.
2078 2 : 		added, err := d.excise(ctx, base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level)
2079 2 : 		if err != nil {
2080 0 : 			return err
2081 0 : 		}
2082 2 : 		if _, ok := ve.DeletedFiles[deletedFileEntry{
2083 2 : 			Level: s.level,
2084 2 : 			FileNum: splitFile.FileNum,
2085 2 : 		}]; !ok {
2086 0 : 			panic("did not split file that was expected to be split")
2087 : 		}
2088 2 : 		replacedFiles[splitFile.FileNum] = added
2089 2 : 		for i := range added {
2090 2 : 			addedBounds := added[i].Meta.UserKeyBounds()
2091 2 : 			if s.ingestFile.Overlaps(d.cmp, &addedBounds) {
2092 0 : 				panic("ingest-time split produced a file that overlaps with ingested file")
2093 : 			}
2094 : 		}
2095 2 : 		updateMetrics(splitFile, s.level, added)
2096 : 	}
2097 : 	// Flatten the version edit by removing any entries from ve.NewFiles that
2098 : 	// are also in ve.DeletedFiles, i.e. files that were both created and deleted
2099 2 : 	newNewFiles := ve.NewFiles[:0]
2100 2 : 	for i := range ve.NewFiles {
2101 2 : 		fn := ve.NewFiles[i].Meta.FileNum
2102 2 : 		deEntry := deletedFileEntry{Level: ve.NewFiles[i].Level, FileNum: fn}
2103 2 : 		if _, ok := ve.DeletedFiles[deEntry]; ok {
2104 2 : 			delete(ve.DeletedFiles, deEntry)
2105 2 : 		} else {
2106 2 : 			newNewFiles = append(newNewFiles, ve.NewFiles[i])
2107 2 : 		}
2108 : 	}
2109 2 : 	ve.NewFiles = newNewFiles
2110 2 : 	return nil
2111 : }
2112 :
2113 : func (d *DB) ingestApply(
2114 : ctx context.Context,
2115 : jobID JobID,
2116 : lr ingestLoadResult,
2117 : mut *memTable,
2118 : exciseSpan KeyRange,
2119 : exciseSeqNum base.SeqNum,
2120 2 : ) (*versionEdit, error) {
2121 2 : d.mu.Lock()
2122 2 : defer d.mu.Unlock()
2123 2 :
2124 2 : ve := &versionEdit{
2125 2 : NewFiles: make([]newFileEntry, lr.fileCount()),
2126 2 : }
2127 2 : if exciseSpan.Valid() || (d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit()) {
2128 2 : ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{}
2129 2 : }
2130 2 : metrics := make(map[int]*LevelMetrics)
2131 2 :
2132 2 : // Lock the manifest for writing before we use the current version to
2133 2 : // determine the target level. This prevents two concurrent ingestion jobs
2134 2 : // from using the same version to determine the target level, and also
2135 2 : // provides serialization with concurrent compaction and flush jobs.
2136 2 : // logAndApply unconditionally releases the manifest lock, but any earlier
2137 2 : // returns must unlock the manifest.
2138 2 : d.mu.versions.logLock()
2139 2 :
2140 2 : if mut != nil {
2141 2 : // Unref the mutable memtable to allows its flush to proceed. Now that we've
2142 2 : // acquired the manifest lock, we can be certain that if the mutable
2143 2 : // memtable has received more recent conflicting writes, the flush won't
2144 2 : // beat us to applying to the manifest resulting in sequence number
2145 2 : // inversion. Even though we call maybeScheduleFlush right now, this flush
2146 2 : // will apply after our ingestion.
2147 2 : if mut.writerUnref() {
2148 2 : d.maybeScheduleFlush()
2149 2 : }
2150 : }
2151 :
2152 2 : current := d.mu.versions.currentVersion()
2153 2 : overlapChecker := &overlapChecker{
2154 2 : comparer: d.opts.Comparer,
2155 2 : newIters: d.newIters,
2156 2 : opts: IterOptions{
2157 2 : logger: d.opts.Logger,
2158 2 : CategoryAndQoS: sstable.CategoryAndQoS{
2159 2 : Category: "pebble-ingest",
2160 2 : QoSLevel: sstable.LatencySensitiveQoSLevel,
2161 2 : },
2162 2 : },
2163 2 : v: current,
2164 2 : }
2165 2 : shouldIngestSplit := d.opts.Experimental.IngestSplit != nil &&
2166 2 : d.opts.Experimental.IngestSplit() && d.FormatMajorVersion() >= FormatVirtualSSTables
2167 2 : baseLevel := d.mu.versions.picker.getBaseLevel()
2168 2 : // filesToSplit is a list where each element is a pair consisting of a file
2169 2 : // being ingested and a file being split to make room for an ingestion into
2170 2 : // that level. Each ingested file will appear at most once in this list. It
2171 2 : // is possible for split files to appear twice in this list.
2172 2 : filesToSplit := make([]ingestSplitFile, 0)
2173 2 : checkCompactions := false
2174 2 : for i := 0; i < lr.fileCount(); i++ {
2175 2 : // Determine the lowest level in the LSM for which the sstable doesn't
2176 2 : // overlap any existing files in the level.
2177 2 : var m *fileMetadata
2178 2 : specifiedLevel := -1
2179 2 : isShared := false
2180 2 : isExternal := false
2181 2 : if i < len(lr.local) {
2182 2 : // local file.
2183 2 : m = lr.local[i].fileMetadata
2184 2 : } else if (i - len(lr.local)) < len(lr.shared) {
2185 2 : // shared file.
2186 2 : isShared = true
2187 2 : sharedIdx := i - len(lr.local)
2188 2 : m = lr.shared[sharedIdx].fileMetadata
2189 2 : specifiedLevel = int(lr.shared[sharedIdx].shared.Level)
2190 2 : } else {
2191 2 : // external file.
2192 2 : isExternal = true
2193 2 : externalIdx := i - (len(lr.local) + len(lr.shared))
2194 2 : m = lr.external[externalIdx].fileMetadata
2195 2 : if lr.externalFilesHaveLevel {
2196 1 : specifiedLevel = int(lr.external[externalIdx].external.Level)
2197 1 : }
2198 : }
2199 :
2200 : // Add to CreatedBackingTables if this is a new backing.
2201 : //
2202 : // Shared files always have a new backing. External files have new backings
2203 : // iff the backing disk file num and the file num match (see ingestAttachRemote).
2204 2 : if isShared || (isExternal && m.FileBacking.DiskFileNum == base.DiskFileNum(m.FileNum)) {
2205 2 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
2206 2 : }
2207 :
2208 2 : f := &ve.NewFiles[i]
2209 2 : var err error
2210 2 : if specifiedLevel != -1 {
2211 2 : f.Level = specifiedLevel
2212 2 : } else {
2213 2 : var splitFile *fileMetadata
2214 2 : if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
2215 2 : // This file fits perfectly within the excise span. We can slot it at
2216 2 : // L6, or sharedLevelsStart - 1 if we have shared files.
2217 2 : if len(lr.shared) > 0 || lr.externalFilesHaveLevel {
2218 2 : f.Level = sharedLevelsStart - 1
2219 2 : if baseLevel > f.Level {
2220 2 : f.Level = 0
2221 2 : }
2222 2 : } else {
2223 2 : f.Level = 6
2224 2 : }
2225 2 : } else {
2226 2 : // We check overlap against the LSM without holding DB.mu. Note that we
2227 2 : // are still holding the log lock, so the version cannot change.
2228 2 : // TODO(radu): perform this check optimistically outside of the log lock.
2229 2 : var lsmOverlap overlap.WithLSM
2230 2 : lsmOverlap, err = func() (overlap.WithLSM, error) {
2231 2 : d.mu.Unlock()
2232 2 : defer d.mu.Lock()
2233 2 : return overlapChecker.DetermineLSMOverlap(ctx, m.UserKeyBounds())
2234 2 : }()
2235 2 : if err == nil {
2236 2 : f.Level, splitFile, err = ingestTargetLevel(
2237 2 : ctx, d.cmp, lsmOverlap, baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit,
2238 2 : )
2239 2 : }
2240 : }
2241 :
2242 2 : if splitFile != nil {
2243 2 : if invariants.Enabled {
2244 2 : if lf := current.Levels[f.Level].Find(d.cmp, splitFile); lf.Empty() {
2245 0 : panic("splitFile returned is not in level it should be")
2246 : }
2247 : }
2248 : // We take advantage of the fact that we won't drop the db mutex
2249 : // between now and the call to logAndApply. So, no files should
2250 : // get added to a new in-progress compaction at this point. We can
2251 : // avoid having to iterate on in-progress compactions to cancel them
2252 : // if none of the files being split have a compacting state.
2253 2 : if splitFile.IsCompacting() {
2254 1 : checkCompactions = true
2255 1 : }
2256 2 : filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitFile, level: f.Level})
2257 : }
2258 : }
2259 2 : if err != nil {
2260 0 : d.mu.versions.logUnlock()
2261 0 : return nil, err
2262 0 : }
2263 2 : if isShared && f.Level < sharedLevelsStart {
2264 0 : panic(fmt.Sprintf("cannot slot a shared file higher than the highest shared level: %d < %d",
2265 0 : f.Level, sharedLevelsStart))
2266 : }
2267 2 : f.Meta = m
2268 2 : levelMetrics := metrics[f.Level]
2269 2 : if levelMetrics == nil {
2270 2 : levelMetrics = &LevelMetrics{}
2271 2 : metrics[f.Level] = levelMetrics
2272 2 : }
2273 2 : levelMetrics.NumFiles++
2274 2 : levelMetrics.Size += int64(m.Size)
2275 2 : levelMetrics.BytesIngested += m.Size
2276 2 : levelMetrics.TablesIngested++
2277 : }
2278 : // replacedFiles maps files excised due to exciseSpan (or splitFiles returned
2279 : // by ingestTargetLevel), to files that were created to replace it. This map
2280 : // is used to resolve references to split files in filesToSplit, as it is
2281 : // possible for a file that we want to split to no longer exist or have a
2282 : // newer fileMetadata due to a split induced by another ingestion file, or an
2283 : // excise.
2284 2 : replacedFiles := make(map[base.FileNum][]newFileEntry)
2285 2 : updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
2286 2 : levelMetrics := metrics[level]
2287 2 : if levelMetrics == nil {
2288 2 : levelMetrics = &LevelMetrics{}
2289 2 : metrics[level] = levelMetrics
2290 2 : }
2291 2 : levelMetrics.NumFiles--
2292 2 : levelMetrics.Size -= int64(m.Size)
2293 2 : for i := range added {
2294 2 : levelMetrics.NumFiles++
2295 2 : levelMetrics.Size += int64(added[i].Meta.Size)
2296 2 : }
2297 : }
2298 2 : if exciseSpan.Valid() {
2299 2 : // Iterate through all levels and find files that intersect with exciseSpan.
2300 2 : //
2301 2 : // TODO(bilal): We could drop the DB mutex here as we don't need it for
2302 2 : // excises; we only need to hold the version lock which we already are
2303 2 : // holding. However releasing the DB mutex could mess with the
2304 2 : // ingestTargetLevel calculation that happened above, as it assumed that it
2305 2 : // had a complete view of in-progress compactions that wouldn't change
2306 2 : // until logAndApply is called. If we were to drop the mutex now, we could
2307 2 : // schedule another in-progress compaction that would go into the chosen target
2308 2 : // level and lead to file overlap within level (which would panic in
2309 2 : // logAndApply). We should drop the db mutex here, do the excise, then
2310 2 : // re-grab the DB mutex and rerun just the in-progress compaction check to
2311 2 : // see if any new compactions are conflicting with our chosen target levels
2312 2 : // for files, and if they are, we should signal those compactions to error
2313 2 : // out.
2314 2 : for level := range current.Levels {
2315 2 : overlaps := current.Overlaps(level, exciseSpan.UserKeyBounds())
2316 2 : iter := overlaps.Iter()
2317 2 :
2318 2 : for m := iter.First(); m != nil; m = iter.Next() {
2319 2 : newFiles, err := d.excise(ctx, exciseSpan.UserKeyBounds(), m, ve, level)
2320 2 : if err != nil {
2321 0 : return nil, err
2322 0 : }
2323 :
2324 2 : if _, ok := ve.DeletedFiles[deletedFileEntry{
2325 2 : Level: level,
2326 2 : FileNum: m.FileNum,
2327 2 : }]; !ok {
2328 2 : // We did not excise this file.
2329 2 : continue
2330 : }
2331 2 : replacedFiles[m.FileNum] = newFiles
2332 2 : updateLevelMetricsOnExcise(m, level, newFiles)
2333 : }
2334 : }
2335 : }
2336 2 : if len(filesToSplit) > 0 {
2337 2 : // For the same reasons as the above call to excise, we hold the db mutex
2338 2 : // while calling this method.
2339 2 : if err := d.ingestSplit(ctx, ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
2340 0 : return nil, err
2341 0 : }
2342 : }
2343 2 : if len(filesToSplit) > 0 || exciseSpan.Valid() {
2344 2 : for c := range d.mu.compact.inProgress {
2345 2 : if c.versionEditApplied {
2346 1 : continue
2347 : }
2348 : // Check if this compaction overlaps with the excise span. Note that just
2349 : // checking if the inputs individually overlap with the excise span
2350 : // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
2351 : // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
2352 : // doing a [c,d) excise at the same time as this compaction, we will have
2353 : // to error out the whole compaction as we can't guarantee it hasn't/won't
2354 : // write a file overlapping with the excise span.
2355 2 : if exciseSpan.OverlapsInternalKeyRange(d.cmp, c.smallest, c.largest) {
2356 2 : c.cancel.Store(true)
2357 2 : }
2358 : // Check if this compaction's inputs have been replaced due to an
2359 : // ingest-time split. In that case, cancel the compaction as a newly picked
2360 : // compaction would need to include any new files that slid in between
2361 : // previously-existing files. Note that we cancel any compaction that has a
2362 : // file that was ingest-split as an input, even if it started before this
2363 : // ingestion.
2364 2 : if checkCompactions {
2365 1 : for i := range c.inputs {
2366 1 : iter := c.inputs[i].files.Iter()
2367 1 : for f := iter.First(); f != nil; f = iter.Next() {
2368 1 : if _, ok := replacedFiles[f.FileNum]; ok {
2369 1 : c.cancel.Store(true)
2370 1 : break
2371 : }
2372 : }
2373 : }
2374 : }
2375 : }
2376 : }
2377 :
2378 2 : if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
2379 2 : return d.getInProgressCompactionInfoLocked(nil)
2380 2 : }); err != nil {
2381 1 : // Note: any error during logAndApply is fatal; this won't be reachable in production.
2382 1 : return nil, err
2383 1 : }
2384 :
2385 : // Check for any EventuallyFileOnlySnapshots that could be watching for
2386 : // an excise on this span. There should be none as the
2387 : // computePossibleOverlaps steps should have forced these EFOS to transition
2388 : // to file-only snapshots by now. If we see any that conflict with this
2389 : // excise, panic.
2390 2 : if exciseSpan.Valid() {
2391 2 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
2392 1 : // Skip non-EFOS snapshots, and also skip any EFOS that were created
2393 1 : // *after* the excise.
2394 1 : if s.efos == nil || base.Visible(exciseSeqNum, s.efos.seqNum, base.SeqNumMax) {
2395 0 : continue
2396 : }
2397 1 : efos := s.efos
2398 1 : // TODO(bilal): We can make this faster by taking advantage of the sorted
2399 1 : // nature of protectedRanges to do a sort.Search, or even maintaining a
2400 1 : // global list of all protected ranges instead of having to peer into every
2401 1 : // snapshot.
2402 1 : for i := range efos.protectedRanges {
2403 1 : if efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
2404 0 : panic("unexpected excise of an EventuallyFileOnlySnapshot's bounds")
2405 : }
2406 : }
2407 : }
2408 : }
2409 :
2410 2 : d.mu.versions.metrics.Ingest.Count++
2411 2 :
2412 2 : d.updateReadStateLocked(d.opts.DebugCheck)
2413 2 : // updateReadStateLocked could have generated obsolete tables, schedule a
2414 2 : // cleanup job if necessary.
2415 2 : d.deleteObsoleteFiles(jobID)
2416 2 : d.updateTableStatsLocked(ve.NewFiles)
2417 2 : // The ingestion may have pushed a level over the threshold for compaction,
2418 2 : // so check to see if one is necessary and schedule it.
2419 2 : d.maybeScheduleCompaction()
2420 2 : var toValidate []manifest.NewFileEntry
2421 2 : dedup := make(map[base.DiskFileNum]struct{})
2422 2 : for _, entry := range ve.NewFiles {
2423 2 : if _, ok := dedup[entry.Meta.FileBacking.DiskFileNum]; !ok {
2424 2 : toValidate = append(toValidate, entry)
2425 2 : dedup[entry.Meta.FileBacking.DiskFileNum] = struct{}{}
2426 2 : }
2427 : }
2428 2 : d.maybeValidateSSTablesLocked(toValidate)
2429 2 : return ve, nil
2430 : }
2431 :
2432 : // maybeValidateSSTablesLocked adds the slice of newFileEntrys to the pending
2433 : // queue of files to be validated, when the feature is enabled.
2434 : //
2435 : // Note that if two entries with the same backing file are added twice, then the
2436 : // block checksums for the backing file will be validated twice.
2437 : //
2438 : // DB.mu must be locked when calling.
2439 2 : func (d *DB) maybeValidateSSTablesLocked(newFiles []newFileEntry) {
2440 2 : // Only add to the validation queue when the feature is enabled.
2441 2 : if !d.opts.Experimental.ValidateOnIngest {
2442 2 : return
2443 2 : }
2444 :
2445 2 : d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...)
2446 2 : if d.shouldValidateSSTablesLocked() {
2447 2 : go d.validateSSTables()
2448 2 : }
2449 : }
2450 :
2451 : // shouldValidateSSTablesLocked returns true if SSTable validation should run.
2452 : // DB.mu must be locked when calling.
2453 2 : func (d *DB) shouldValidateSSTablesLocked() bool {
2454 2 : return !d.mu.tableValidation.validating &&
2455 2 : d.closed.Load() == nil &&
2456 2 : d.opts.Experimental.ValidateOnIngest &&
2457 2 : len(d.mu.tableValidation.pending) > 0
2458 2 : }
2459 :
2460 : // validateSSTables runs a round of validation on the tables in the pending
2461 : // queue.
2462 2 : func (d *DB) validateSSTables() {
2463 2 : d.mu.Lock()
2464 2 : if !d.shouldValidateSSTablesLocked() {
2465 2 : d.mu.Unlock()
2466 2 : return
2467 2 : }
2468 :
2469 2 : pending := d.mu.tableValidation.pending
2470 2 : d.mu.tableValidation.pending = nil
2471 2 : d.mu.tableValidation.validating = true
2472 2 : jobID := d.newJobIDLocked()
2473 2 : rs := d.loadReadState()
2474 2 :
2475 2 : // Drop DB.mu before performing IO.
2476 2 : d.mu.Unlock()
2477 2 :
2478 2 : // Validate all tables in the pending queue. This could lead to a situation
2479 2 : // where we are starving IO from other tasks due to having to page through
2480 2 : // all the blocks in all the sstables in the queue.
2481 2 : // TODO(travers): Add some form of pacing to avoid IO starvation.
2482 2 :
2483 2 : // If we fail to validate any files due to reasons other than uncovered
2484 2 : // corruption, accumulate them and re-queue them for another attempt.
2485 2 : var retry []manifest.NewFileEntry
2486 2 :
2487 2 : for _, f := range pending {
2488 2 : // The file may have been moved or deleted since it was ingested, in
2489 2 : // which case we skip.
2490 2 : if !rs.current.Contains(f.Level, f.Meta) {
2491 2 : // Assume the file was moved to a lower level. It is rare enough
2492 2 : // that a table is moved or deleted between the time it was ingested
2493 2 : // and the time the validation routine runs that the overall cost of
2494 2 : // this inner loop is tolerably low, when amortized over all
2495 2 : // ingested tables.
2496 2 : found := false
2497 2 : for i := f.Level + 1; i < numLevels; i++ {
2498 2 : if rs.current.Contains(i, f.Meta) {
2499 1 : found = true
2500 1 : break
2501 : }
2502 : }
2503 2 : if !found {
2504 2 : continue
2505 : }
2506 : }
2507 :
2508 2 : var err error
2509 2 : if f.Meta.Virtual {
2510 2 : err = d.tableCache.withVirtualReader(
2511 2 : f.Meta.VirtualMeta(), func(v sstable.VirtualReader) error {
2512 2 : return v.ValidateBlockChecksumsOnBacking()
2513 2 : })
2514 2 : } else {
2515 2 : err = d.tableCache.withReader(
2516 2 : f.Meta.PhysicalMeta(), func(r *sstable.Reader) error {
2517 2 : return r.ValidateBlockChecksums()
2518 2 : })
2519 : }
2520 :
2521 2 : if err != nil {
2522 1 : if IsCorruptionError(err) {
2523 1 : // TODO(travers): Hook into the corruption reporting pipeline, once
2524 1 : // available. See pebble#1192.
2525 1 : d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err)
2526 1 : } else {
2527 1 : // If there was some other, possibly transient, error that
2528 1 : // caused table validation to fail inform the EventListener and
2529 1 : // move on. We remember the table so that we can retry it in a
2530 1 : // subsequent table validation job.
2531 1 : //
2532 1 : // TODO(jackson): If the error is not transient, this will retry
2533 1 : // validation indefinitely. While not great, it's the same
2534 1 : // behavior as erroring flushes and compactions. We should
2535 1 : // address this as a part of #270.
2536 1 : d.opts.EventListener.BackgroundError(err)
2537 1 : retry = append(retry, f)
2538 1 : continue
2539 : }
2540 : }
2541 :
2542 2 : d.opts.EventListener.TableValidated(TableValidatedInfo{
2543 2 : JobID: int(jobID),
2544 2 : Meta: f.Meta,
2545 2 : })
2546 : }
2547 2 : rs.unref()
2548 2 : d.mu.Lock()
2549 2 : defer d.mu.Unlock()
2550 2 : d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, retry...)
2551 2 : d.mu.tableValidation.validating = false
2552 2 : d.mu.tableValidation.cond.Broadcast()
2553 2 : if d.shouldValidateSSTablesLocked() {
2554 2 : go d.validateSSTables()
2555 2 : }
2556 : }
|