Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 : "sort"
12 : "time"
13 :
14 : "github.com/cockroachdb/errors"
15 : "github.com/cockroachdb/pebble/internal/base"
16 : "github.com/cockroachdb/pebble/internal/invariants"
17 : "github.com/cockroachdb/pebble/internal/keyspan"
18 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
19 : "github.com/cockroachdb/pebble/internal/manifest"
20 : "github.com/cockroachdb/pebble/internal/private"
21 : "github.com/cockroachdb/pebble/objstorage"
22 : "github.com/cockroachdb/pebble/objstorage/remote"
23 : "github.com/cockroachdb/pebble/sstable"
24 : )
25 :
26 1 : func sstableKeyCompare(userCmp Compare, a, b InternalKey) int {
27 1 : c := userCmp(a.UserKey, b.UserKey)
28 1 : if c != 0 {
29 1 : return c
30 1 : }
31 1 : if a.IsExclusiveSentinel() {
32 1 : if !b.IsExclusiveSentinel() {
33 1 : return -1
34 1 : }
35 1 : } else if b.IsExclusiveSentinel() {
36 1 : return +1
37 1 : }
38 1 : return 0
39 : }
40 :
// KeyRange encodes a key range in user key space. A KeyRange's Start is
// inclusive while its End is exclusive.
//
// KeyRange is equivalent to base.UserKeyBounds with exclusive end.
type KeyRange struct {
	// Start is the inclusive lower bound; End is the exclusive upper bound.
	Start, End []byte
}
48 :
49 : // Valid returns true if the KeyRange is defined.
50 1 : func (k *KeyRange) Valid() bool {
51 1 : return k.Start != nil && k.End != nil
52 1 : }
53 :
54 : // Contains returns whether the specified key exists in the KeyRange.
55 1 : func (k *KeyRange) Contains(cmp base.Compare, key InternalKey) bool {
56 1 : v := cmp(key.UserKey, k.End)
57 1 : return (v < 0 || (v == 0 && key.IsExclusiveSentinel())) && cmp(k.Start, key.UserKey) <= 0
58 1 : }
59 :
// UserKeyBounds returns the KeyRange as UserKeyBounds (with an exclusive end
// bound). Also implements the internal `bounded` interface.
func (k KeyRange) UserKeyBounds() base.UserKeyBounds {
	return base.UserKeyBoundsEndExclusive(k.Start, k.End)
}
64 :
65 : // OverlapsInternalKeyRange checks if the specified internal key range has an
66 : // overlap with the KeyRange. Note that we aren't checking for full containment
67 : // of smallest-largest within k, rather just that there's some intersection
68 : // between the two ranges.
69 1 : func (k *KeyRange) OverlapsInternalKeyRange(cmp base.Compare, smallest, largest InternalKey) bool {
70 1 : ukb := k.UserKeyBounds()
71 1 : b := base.UserKeyBoundsFromInternal(smallest, largest)
72 1 : return ukb.Overlaps(cmp, &b)
73 1 : }
74 :
75 : // Overlaps checks if the specified file has an overlap with the KeyRange.
76 : // Note that we aren't checking for full containment of m within k, rather just
77 : // that there's some intersection between m and k's bounds.
78 0 : func (k *KeyRange) Overlaps(cmp base.Compare, m *fileMetadata) bool {
79 0 : b := k.UserKeyBounds()
80 0 : return m.Overlaps(cmp, &b)
81 0 : }
82 :
83 : // OverlapsKeyRange checks if this span overlaps with the provided KeyRange.
84 : // Note that we aren't checking for full containment of either span in the other,
85 : // just that there's a key x that is in both key ranges.
86 1 : func (k *KeyRange) OverlapsKeyRange(cmp Compare, span KeyRange) bool {
87 1 : return cmp(k.Start, span.End) < 0 && cmp(k.End, span.Start) > 0
88 1 : }
89 :
90 1 : func ingestValidateKey(opts *Options, key *InternalKey) error {
91 1 : if key.Kind() == InternalKeyKindInvalid {
92 0 : return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s",
93 0 : key.Pretty(opts.Comparer.FormatKey))
94 0 : }
95 1 : if key.SeqNum() != 0 {
96 0 : return base.CorruptionErrorf("pebble: external sstable has non-zero seqnum: %s",
97 0 : key.Pretty(opts.Comparer.FormatKey))
98 0 : }
99 1 : return nil
100 : }
101 :
// ingestSynthesizeShared constructs a fileMetadata for one shared sstable owned
// or shared by another node. The returned metadata is a virtual sstable backed
// by a provider backing of the same size; no I/O to shared storage is done.
func ingestSynthesizeShared(
	opts *Options, sm SharedSSTMeta, fileNum base.FileNum,
) (*fileMetadata, error) {
	if sm.Size == 0 {
		// Disallow 0 file sizes.
		return nil, errors.New("pebble: cannot ingest shared file with size 0")
	}
	// Don't load table stats. Doing a round trip to shared storage, one SST
	// at a time is not worth it as it slows down ingestion.
	meta := &fileMetadata{
		FileNum:      fileNum,
		CreationTime: time.Now().Unix(),
		Virtual:      true,
		Size:         sm.Size,
	}
	// For simplicity, we use the same number for both the FileNum and the
	// DiskFileNum (even though this is a virtual sstable). Set the underlying
	// FileBacking's size to the same size as the virtualized view of the sstable.
	// This ensures that we don't over-prioritize this sstable for compaction just
	// yet, as we do not have a clear sense of what parts of this sstable are
	// referenced by other nodes.
	meta.InitProviderBacking(base.DiskFileNum(fileNum), sm.Size)

	if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil {
		// Initialize meta.{HasPointKeys,Smallest,Largest}, etc.
		//
		// NB: We create new internal keys and pass them into ExtendPointKeyBounds
		// so that we can sub a zero sequence number into the bounds. We can set
		// the sequence number to anything here; it'll be reset in ingestUpdateSeqNum
		// anyway. However, we do need to use the same sequence number across all
		// bound keys at this step so that we end up with bounds that are consistent
		// across point/range keys.
		//
		// Because of the sequence number rewriting, we cannot use the Kind of
		// sm.SmallestPointKey. For example, the original SST might start with
		// a.SET.2 and a.RANGEDEL.1 (with a.SET.2 being the smallest key); after
		// rewriting the sequence numbers, these keys become a.SET.100 and
		// a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a
		// correct bound, we just use the maximum key kind (which sorts first).
		// Similarly, we use the smallest key kind for the largest key.
		smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMax)
		largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0)
		if sm.LargestPointKey.IsExclusiveSentinel() {
			// Preserve the exclusive-sentinel nature of the upper bound.
			largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey)
		}
		if opts.Comparer.Equal(smallestPointKey.UserKey, largestPointKey.UserKey) &&
			smallestPointKey.Trailer < largestPointKey.Trailer {
			// We get kinds from the sender, however we substitute our own sequence
			// numbers. This can result in cases where an sstable [b#5,SET-b#4,DELSIZED]
			// becomes [b#0,SET-b#0,DELSIZED] when we synthesize it here, but the
			// kinds need to be reversed now because DelSized > Set.
			smallestPointKey, largestPointKey = largestPointKey, smallestPointKey
		}
		meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallestPointKey, largestPointKey)
	}
	if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil {
		// Initialize meta.{HasRangeKeys,Smallest,Largest}, etc.
		//
		// See comment above on why we use a zero sequence number and these key
		// kinds here.
		smallestRangeKey := base.MakeInternalKey(sm.SmallestRangeKey.UserKey, 0, base.InternalKeyKindRangeKeyMax)
		largestRangeKey := base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeyMin, sm.LargestRangeKey.UserKey)
		meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallestRangeKey, largestRangeKey)
	}
	// Sanity-check that the bounds set above are mutually consistent.
	if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
		return nil, err
	}
	return meta, nil
}
173 :
// ingestLoad1External loads the fileMetadata for one external sstable.
// Sequence number and target level calculation happens during prepare/apply.
// The returned metadata is virtual; no existence check or read of the external
// object is performed.
func ingestLoad1External(
	opts *Options, e ExternalFile, fileNum base.FileNum,
) (*fileMetadata, error) {
	if e.Size == 0 {
		return nil, errors.New("pebble: cannot ingest external file with size 0")
	}
	if !e.HasRangeKey && !e.HasPointKey {
		return nil, errors.New("pebble: cannot ingest external file with no point or range keys")
	}

	// Reject inverted bounds, and empty bounds unless the end is inclusive.
	if opts.Comparer.Compare(e.StartKey, e.EndKey) > 0 {
		return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
	}
	if opts.Comparer.Compare(e.StartKey, e.EndKey) == 0 && !e.EndKeyIsInclusive {
		return nil, errors.Newf("pebble: external file bounds [%q, %q) are invalid", e.StartKey, e.EndKey)
	}
	// Bounds must be suffix-less per the Comparer's Split function.
	if n := opts.Comparer.Split(e.StartKey); n != len(e.StartKey) {
		return nil, errors.Newf("pebble: external file bounds start key %q has suffix", e.StartKey)
	}
	if n := opts.Comparer.Split(e.EndKey); n != len(e.EndKey) {
		return nil, errors.Newf("pebble: external file bounds end key %q has suffix", e.EndKey)
	}

	// #3287: range keys don't yet work correctly when the range key bounds are not tight.
	if e.HasRangeKey {
		return nil, errors.New("pebble: range keys not supported in external files")
	}

	// Don't load table stats. Doing a round trip to shared storage, one SST
	// at a time is not worth it as it slows down ingestion.
	meta := &fileMetadata{
		FileNum:      fileNum,
		CreationTime: time.Now().Unix(),
		Virtual:      true,
		Size:         e.Size,
	}

	// In the name of keeping this ingestion as fast as possible, we avoid
	// *all* existence checks and synthesize a file metadata with smallest/largest
	// keys that overlap whatever the passed-in span was.
	smallestCopy := slices.Clone(e.StartKey)
	largestCopy := slices.Clone(e.EndKey)
	if e.HasPointKey {
		// Sequence numbers are updated later by
		// ingestUpdateSeqNum, applying a sequence number that
		// is applied to all keys in the sstable.
		if e.EndKeyIsInclusive {
			meta.ExtendPointKeyBounds(
				opts.Comparer.Compare,
				base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax),
				base.MakeInternalKey(largestCopy, 0, 0))
		} else {
			meta.ExtendPointKeyBounds(
				opts.Comparer.Compare,
				base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax),
				base.MakeRangeDeleteSentinelKey(largestCopy))
		}
	}
	// NB: unreachable while the range-key rejection above stands; kept for when
	// external range keys are supported.
	if e.HasRangeKey {
		meta.ExtendRangeKeyBounds(
			opts.Comparer.Compare,
			base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeyMax),
			base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyMin, largestCopy),
		)
	}

	meta.SyntheticPrefix = e.SyntheticPrefix
	meta.SyntheticSuffix = e.SyntheticSuffix

	return meta, nil
}
247 :
// ingestLoad1 creates the FileMetadata for one file. This file will be owned
// by this store. The sstable is opened and its point-key, range-deletion and
// range-key bounds are read to populate the metadata. Returns (nil, nil) if
// the table contains no keys at all.
func ingestLoad1(
	opts *Options,
	fmv FormatMajorVersion,
	readable objstorage.Readable,
	cacheID uint64,
	fileNum base.FileNum,
) (*fileMetadata, error) {
	cacheOpts := private.SSTableCacheOpts(cacheID, base.PhysicalTableDiskFileNum(fileNum)).(sstable.ReaderOption)
	r, err := sstable.NewReader(readable, opts.MakeReaderOptions(), cacheOpts)
	if err != nil {
		return nil, err
	}
	defer r.Close()

	// Avoid ingesting tables with format versions this DB doesn't support.
	tf, err := r.TableFormat()
	if err != nil {
		return nil, err
	}
	if tf < fmv.MinTableFormat() || tf > fmv.MaxTableFormat() {
		return nil, errors.Newf(
			"pebble: table format %s is not within range supported at DB format major version %d, (%s,%s)",
			tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
		)
	}

	meta := &fileMetadata{}
	meta.FileNum = fileNum
	meta.Size = uint64(readable.Size())
	meta.CreationTime = time.Now().Unix()
	meta.InitPhysicalBacking()

	// Avoid loading into the table cache for collecting stats if we
	// don't need to. If there are no range deletions, we have all the
	// information to compute the stats here.
	//
	// This is helpful in tests for avoiding awkwardness around deletion of
	// ingested files from MemFS. MemFS implements the Windows semantics of
	// disallowing removal of an open file. Under MemFS, if we don't populate
	// meta.Stats here, the file will be loaded into the table cache for
	// calculating stats before we can remove the original link.
	maybeSetStatsFromProperties(meta.PhysicalMeta(), &r.Properties)

	// Derive the point-key bounds from the first and last point keys.
	{
		iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
		if err != nil {
			return nil, err
		}
		defer iter.Close()
		var smallest InternalKey
		if kv := iter.First(); kv != nil {
			if err := ingestValidateKey(opts, &kv.K); err != nil {
				return nil, err
			}
			smallest = kv.K.Clone()
		}
		if err := iter.Error(); err != nil {
			return nil, err
		}
		if kv := iter.Last(); kv != nil {
			if err := ingestValidateKey(opts, &kv.K); err != nil {
				return nil, err
			}
			meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, kv.K.Clone())
		}
		if err := iter.Error(); err != nil {
			return nil, err
		}
	}

	// Range deletions also contribute to the point-key bounds.
	iter, err := r.NewRawRangeDelIter(sstable.NoTransforms)
	if err != nil {
		return nil, err
	}
	if iter != nil {
		defer iter.Close()
		var smallest InternalKey
		if s, err := iter.First(); err != nil {
			return nil, err
		} else if s != nil {
			key := s.SmallestKey()
			if err := ingestValidateKey(opts, &key); err != nil {
				return nil, err
			}
			smallest = key.Clone()
		}
		if s, err := iter.Last(); err != nil {
			return nil, err
		} else if s != nil {
			k := s.SmallestKey()
			if err := ingestValidateKey(opts, &k); err != nil {
				return nil, err
			}
			largest := s.LargestKey().Clone()
			meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest)
		}
	}

	// Update the range-key bounds for the table.
	{
		iter, err := r.NewRawRangeKeyIter(sstable.NoTransforms)
		if err != nil {
			return nil, err
		}
		if iter != nil {
			defer iter.Close()
			var smallest InternalKey
			if s, err := iter.First(); err != nil {
				return nil, err
			} else if s != nil {
				key := s.SmallestKey()
				if err := ingestValidateKey(opts, &key); err != nil {
					return nil, err
				}
				smallest = key.Clone()
			}
			if s, err := iter.Last(); err != nil {
				return nil, err
			} else if s != nil {
				k := s.SmallestKey()
				if err := ingestValidateKey(opts, &k); err != nil {
					return nil, err
				}
				// As range keys are fragmented, the end key of the last range key in
				// the table provides the upper bound for the table.
				largest := s.LargestKey().Clone()
				meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallest, largest)
			}
		}
	}

	// An empty table (no point keys, range dels or range keys) is skipped by
	// the caller; signal that with a nil metadata and nil error.
	if !meta.HasPointKeys && !meta.HasRangeKeys {
		return nil, nil
	}

	// Sanity check that the various bounds on the file were set consistently.
	if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
		return nil, err
	}

	return meta, nil
}
392 :
// ingestLoadResult holds the metadata produced by ingestLoad for each category
// of ingested file: local sstables, shared sstables, and external files.
type ingestLoadResult struct {
	local    []ingestLocalMeta
	shared   []ingestSharedMeta
	external []ingestExternalMeta

	// externalFilesHaveLevel is true when the external files specify a target
	// level; ingestLoad enforces that either all of them do or none do.
	externalFilesHaveLevel bool
}
400 :
// ingestLocalMeta pairs the fileMetadata of a local sstable with the path it
// was loaded from.
type ingestLocalMeta struct {
	*fileMetadata
	path string
}
405 :
// ingestSharedMeta pairs the synthesized fileMetadata of a shared sstable with
// the SharedSSTMeta it was derived from.
type ingestSharedMeta struct {
	*fileMetadata
	shared SharedSSTMeta
}
410 :
// ingestExternalMeta pairs the synthesized fileMetadata of an external file
// with the ExternalFile it was derived from.
type ingestExternalMeta struct {
	*fileMetadata
	external ExternalFile
	// usedExistingBacking is true if the external file is reusing a backing
	// that existed before this ingestion. In this case, we called
	// VirtualBackings.Protect() on that backing; we will need to call
	// Unprotect() after the ingestion.
	usedExistingBacking bool
}
420 :
421 1 : func (r *ingestLoadResult) fileCount() int {
422 1 : return len(r.local) + len(r.shared) + len(r.external)
423 1 : }
424 :
// ingestLoad builds metadata for every file in an ingestion: local sstable
// paths are opened and read; shared and external files have metadata
// synthesized without remote I/O. pending supplies pre-allocated file numbers,
// laid out as locals, then shared, then external.
func ingestLoad(
	opts *Options,
	fmv FormatMajorVersion,
	paths []string,
	shared []SharedSSTMeta,
	external []ExternalFile,
	cacheID uint64,
	pending []base.FileNum,
) (ingestLoadResult, error) {
	// Slice the pre-allocated file numbers per category, in order.
	localFileNums := pending[:len(paths)]
	sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
	externalFileNums := pending[len(paths)+len(shared) : len(paths)+len(shared)+len(external)]

	var result ingestLoadResult
	result.local = make([]ingestLocalMeta, 0, len(paths))
	for i := range paths {
		f, err := opts.FS.Open(paths[i])
		if err != nil {
			return ingestLoadResult{}, err
		}

		readable, err := sstable.NewSimpleReadable(f)
		if err != nil {
			return ingestLoadResult{}, err
		}
		m, err := ingestLoad1(opts, fmv, readable, cacheID, localFileNums[i])
		if err != nil {
			return ingestLoadResult{}, err
		}
		// m is nil (with nil err) when the sstable is empty; such files are
		// silently skipped.
		if m != nil {
			result.local = append(result.local, ingestLocalMeta{
				fileMetadata: m,
				path:         paths[i],
			})
		}
	}

	// Sort the shared files according to level.
	sort.Sort(sharedByLevel(shared))

	result.shared = make([]ingestSharedMeta, 0, len(shared))
	for i := range shared {
		m, err := ingestSynthesizeShared(opts, shared[i], sharedFileNums[i])
		if err != nil {
			return ingestLoadResult{}, err
		}
		if shared[i].Level < sharedLevelsStart {
			return ingestLoadResult{}, errors.New("cannot ingest shared file in level below sharedLevelsStart")
		}
		result.shared = append(result.shared, ingestSharedMeta{
			fileMetadata: m,
			shared:       shared[i],
		})
	}
	result.external = make([]ingestExternalMeta, 0, len(external))
	for i := range external {
		m, err := ingestLoad1External(opts, external[i], externalFileNums[i])
		if err != nil {
			return ingestLoadResult{}, err
		}
		result.external = append(result.external, ingestExternalMeta{
			fileMetadata: m,
			external:     external[i],
		})
		// External files must either all carry a target level or none may; a
		// mix is a caller bug.
		if external[i].Level > 0 {
			if i != 0 && !result.externalFilesHaveLevel {
				return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
			}
			result.externalFilesHaveLevel = true
		} else if result.externalFilesHaveLevel {
			return ingestLoadResult{}, base.AssertionFailedf("pebble: external sstables must all have level set or unset")
		}
	}
	return result, nil
}
500 :
// ingestSortAndVerify sorts the loaded files by smallest key and verifies the
// invariants of the ingestion: shared/external files must lie within the
// excise span (when applicable), files within a category must not overlap one
// another, and shared files within the same level must not overlap.
func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error {
	// Verify that all the shared files (i.e. files in sharedMeta)
	// fit within the exciseSpan.
	for _, f := range lr.shared {
		if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
			return errors.Newf("pebble: shared file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
		}
	}

	if lr.externalFilesHaveLevel {
		for _, f := range lr.external {
			if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
				return base.AssertionFailedf("pebble: external file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
			}
		}
	}

	if len(lr.external) > 0 {
		if len(lr.shared) > 0 {
			// If external files are present alongside shared files,
			// return an error.
			return base.AssertionFailedf("pebble: external files cannot be ingested atomically alongside shared files")
		}

		// Sort according to the smallest key.
		slices.SortFunc(lr.external, func(a, b ingestExternalMeta) int {
			return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
		})
		for i := 1; i < len(lr.external); i++ {
			if sstableKeyCompare(cmp, lr.external[i-1].Largest, lr.external[i].Smallest) >= 0 {
				return errors.Newf("pebble: external sstables have overlapping ranges")
			}
		}
		return nil
	}
	if len(lr.local) <= 1 {
		return nil
	}

	// Sort according to the smallest key.
	slices.SortFunc(lr.local, func(a, b ingestLocalMeta) int {
		return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
	})

	for i := 1; i < len(lr.local); i++ {
		if sstableKeyCompare(cmp, lr.local[i-1].Largest, lr.local[i].Smallest) >= 0 {
			return errors.Newf("pebble: local ingestion sstables have overlapping ranges")
		}
	}
	if len(lr.shared) == 0 {
		return nil
	}
	// Check per-level non-overlap among shared files.
	// NOTE(review): lr.external is always empty at this point (the external
	// branch above returns early), so the lr.external loop below appears
	// vestigial — confirm before removing.
	filesInLevel := make([]*fileMetadata, 0, len(lr.shared))
	for l := sharedLevelsStart; l < numLevels; l++ {
		filesInLevel = filesInLevel[:0]
		for i := range lr.shared {
			if lr.shared[i].shared.Level == uint8(l) {
				filesInLevel = append(filesInLevel, lr.shared[i].fileMetadata)
			}
		}
		for i := range lr.external {
			if lr.external[i].external.Level == uint8(l) {
				filesInLevel = append(filesInLevel, lr.external[i].fileMetadata)
			}
		}
		slices.SortFunc(filesInLevel, func(a, b *fileMetadata) int {
			return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
		})
		for i := 1; i < len(filesInLevel); i++ {
			if sstableKeyCompare(cmp, filesInLevel[i-1].Largest, filesInLevel[i].Smallest) >= 0 {
				return base.AssertionFailedf("pebble: external shared sstables have overlapping ranges")
			}
		}
	}
	return nil
}
577 :
578 0 : func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) error {
579 0 : var firstErr error
580 0 : for i := range meta {
581 0 : if err := objProvider.Remove(fileTypeTable, meta[i].FileBacking.DiskFileNum); err != nil {
582 0 : firstErr = firstError(firstErr, err)
583 0 : }
584 : }
585 0 : return firstErr
586 : }
587 :
// ingestLinkLocal creates new objects which are backed by either hardlinks to or
// copies of the ingested files. On failure, any objects created so far are
// removed via ingestCleanup before the error is returned.
func ingestLinkLocal(
	jobID JobID, opts *Options, objProvider objstorage.Provider, localMetas []ingestLocalMeta,
) error {
	for i := range localMetas {
		objMeta, err := objProvider.LinkOrCopyFromLocal(
			context.TODO(), opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum,
			objstorage.CreateOptions{PreferSharedStorage: true},
		)
		if err != nil {
			// Undo the links/copies made so far; the cleanup error is logged
			// rather than returned so the original error is surfaced.
			if err2 := ingestCleanup(objProvider, localMetas[:i]); err2 != nil {
				opts.Logger.Errorf("ingest cleanup failed: %v", err2)
			}
			return err
		}
		if opts.EventListener.TableCreated != nil {
			opts.EventListener.TableCreated(TableCreateInfo{
				JobID:   int(jobID),
				Reason:  "ingesting",
				Path:    objProvider.Path(objMeta),
				FileNum: base.PhysicalTableDiskFileNum(localMetas[i].FileNum),
			})
		}
	}
	return nil
}
615 :
// ingestAttachRemote attaches remote objects to the storage provider.
//
// For external objects, we reuse existing FileBackings from the current version
// when possible.
//
// ingestUnprotectExternalBackings() must be called after this function (even in
// error cases).
func (d *DB) ingestAttachRemote(jobID JobID, lr ingestLoadResult) error {
	// Collect the backings of all shared files to attach.
	remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(lr.shared)+len(lr.external))
	for i := range lr.shared {
		backing, err := lr.shared[i].shared.Backing.Get()
		if err != nil {
			return err
		}
		remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
			FileNum:  lr.shared[i].FileBacking.DiskFileNum,
			FileType: fileTypeTable,
			Backing:  backing,
		})
	}

	// Reuse backings already referenced by the current version where possible.
	d.findExistingBackingsForExternalObjects(lr.external)

	// newFileBackings dedupes backings created in this call: multiple external
	// files that point at the same remote object share one backing.
	newFileBackings := make(map[remote.ObjectKey]*fileBacking, len(lr.external))
	for i := range lr.external {
		meta := lr.external[i].fileMetadata
		if meta.FileBacking != nil {
			// The backing was filled in by findExistingBackingsForExternalObjects().
			continue
		}
		key := remote.MakeObjectKey(lr.external[i].external.Locator, lr.external[i].external.ObjName)
		if backing, ok := newFileBackings[key]; ok {
			// We already created the same backing in this loop.
			meta.FileBacking = backing
			continue
		}
		providerBacking, err := d.objProvider.CreateExternalObjectBacking(key.Locator, key.ObjectName)
		if err != nil {
			return err
		}
		// We have to attach the remote object (and assign it a DiskFileNum). For
		// simplicity, we use the same number for both the FileNum and the
		// DiskFileNum (even though this is a virtual sstable).
		meta.InitProviderBacking(base.DiskFileNum(meta.FileNum), lr.external[i].external.Size)

		// Set the underlying FileBacking's size to the same size as the virtualized
		// view of the sstable. This ensures that we don't over-prioritize this
		// sstable for compaction just yet, as we do not have a clear sense of
		// what parts of this sstable are referenced by other nodes.
		meta.FileBacking.Size = lr.external[i].external.Size
		newFileBackings[key] = meta.FileBacking

		remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
			FileNum:  meta.FileBacking.DiskFileNum,
			FileType: fileTypeTable,
			Backing:  providerBacking,
		})
	}

	// Sanity-check the synthesized metadata now that backings are assigned.
	for i := range lr.external {
		if err := lr.external[i].Validate(d.opts.Comparer.Compare, d.opts.Comparer.FormatKey); err != nil {
			return err
		}
	}

	remoteObjMetas, err := d.objProvider.AttachRemoteObjects(remoteObjs)
	if err != nil {
		return err
	}

	for i := range lr.shared {
		// One corner case around file sizes we need to be mindful of, is that
		// if one of the shareObjs was initially created by us (and has boomeranged
		// back from another node), we'll need to update the FileBacking's size
		// to be the true underlying size. Otherwise, we could hit errors when we
		// open the db again after a crash/restart (see checkConsistency in open.go),
		// plus it more accurately allows us to prioritize compactions of files
		// that were originally created by us.
		if remoteObjMetas[i].IsShared() && !d.objProvider.IsSharedForeign(remoteObjMetas[i]) {
			size, err := d.objProvider.Size(remoteObjMetas[i])
			if err != nil {
				return err
			}
			lr.shared[i].FileBacking.Size = uint64(size)
		}
	}

	if invariants.Enabled || d.opts.Experimental.CheckExternalIngestions {
		// Verify that the ingestion is not empty. Empty tables cause correctness
		// issues in some iterators.
		for i := range lr.external {
			e := &lr.external[i]
			// The closure returns true only when neither a point key, a range
			// deletion, nor anything within the external bounds exists.
			isEmpty, err := func() (bool, error) {
				readable, err := d.objProvider.OpenForReading(
					context.Background(), fileTypeTable, e.FileBacking.DiskFileNum, objstorage.OpenOptions{},
				)
				if err != nil {
					return false, err
				}
				reader, err := sstable.NewReader(readable, d.opts.MakeReaderOptions())
				if err != nil {
					// NewReader did not take ownership; close the readable ourselves.
					readable.Close()
					return false, err
				}
				defer reader.Close()
				iter, err := reader.NewIter(e.IterTransforms(), nil, nil)
				if err != nil {
					return false, err
				}
				if iter != nil {
					defer iter.Close()
					if kv := iter.SeekGE(e.external.StartKey, base.SeekGEFlagsNone); kv != nil {
						cmp := d.opts.Comparer.Compare(kv.K.UserKey, e.external.EndKey)
						if cmp < 0 || (cmp == 0 && e.external.EndKeyIsInclusive) {
							return false, nil
						}
					}
					if err := iter.Error(); err != nil {
						return false, err
					}
				}
				rangeIter, err := reader.NewRawRangeDelIter(e.IterTransforms())
				if err != nil {
					return false, err
				}
				if rangeIter != nil {
					defer rangeIter.Close()
					span, err := rangeIter.SeekGE(e.external.StartKey)
					if err != nil {
						return false, err
					}
					if span != nil {
						cmp := d.opts.Comparer.Compare(span.Start, e.external.EndKey)
						if cmp < 0 || (cmp == 0 && e.external.EndKeyIsInclusive) {
							return false, nil
						}
					}
				}
				return true, nil
			}()
			if err != nil {
				return err
			}
			if isEmpty {
				panic(fmt.Sprintf("external ingestion is empty: %s (%q %q)", e.external.ObjName, e.external.StartKey, e.external.EndKey))
			}
		}
	}

	if d.opts.EventListener.TableCreated != nil {
		for i := range remoteObjMetas {
			d.opts.EventListener.TableCreated(TableCreateInfo{
				JobID:   int(jobID),
				Reason:  "ingesting",
				Path:    d.objProvider.Path(remoteObjMetas[i]),
				FileNum: remoteObjMetas[i].DiskFileNum,
			})
		}
	}

	return nil
}
778 :
// findExistingBackingsForExternalObjects populates the FileBacking for external
// files which are already in use by the current version.
//
// We take a Ref and LatestRef on populated backings.
func (d *DB) findExistingBackingsForExternalObjects(metas []ingestExternalMeta) {
	d.mu.Lock()
	defer d.mu.Unlock()

	for i := range metas {
		diskFileNums := d.objProvider.GetExternalObjects(metas[i].external.Locator, metas[i].external.ObjName)
		// We cross-check against fileBackings in the current version because it is
		// possible that the external object is referenced by an sstable which only
		// exists in a previous version. In that case, that object could be removed
		// at any time so we cannot reuse it.
		for _, n := range diskFileNums {
			if backing, ok := d.mu.versions.virtualBackings.Get(n); ok {
				// Protect this backing from being removed from the latest version. We
				// will unprotect in ingestUnprotectExternalBackings.
				d.mu.versions.virtualBackings.Protect(n)
				metas[i].usedExistingBacking = true
				metas[i].FileBacking = backing
				break
			}
		}
	}
}
805 :
// ingestUnprotectExternalBackings unprotects the file backings that were reused
// for external objects when the ingestion fails.
func (d *DB) ingestUnprotectExternalBackings(lr ingestLoadResult) {
	d.mu.Lock()
	defer d.mu.Unlock()

	for _, meta := range lr.external {
		if meta.usedExistingBacking {
			// If the backing is not used anywhere else and the ingest failed (or
			// the ingested tables were already compacted away), this call will
			// cause the next version update to remove the backing.
			d.mu.versions.virtualBackings.Unprotect(meta.FileBacking.DiskFileNum)
		}
	}
}
821 :
822 1 : func setSeqNumInMetadata(m *fileMetadata, seqNum uint64, cmp Compare, format base.FormatKey) error {
823 1 : setSeqFn := func(k base.InternalKey) base.InternalKey {
824 1 : return base.MakeInternalKey(k.UserKey, seqNum, k.Kind())
825 1 : }
826 : // NB: we set the fields directly here, rather than via their Extend*
827 : // methods, as we are updating sequence numbers.
828 1 : if m.HasPointKeys {
829 1 : m.SmallestPointKey = setSeqFn(m.SmallestPointKey)
830 1 : }
831 1 : if m.HasRangeKeys {
832 1 : m.SmallestRangeKey = setSeqFn(m.SmallestRangeKey)
833 1 : }
834 1 : m.Smallest = setSeqFn(m.Smallest)
835 1 : // Only update the seqnum for the largest key if that key is not an
836 1 : // "exclusive sentinel" (i.e. a range deletion sentinel or a range key
837 1 : // boundary), as doing so effectively drops the exclusive sentinel (by
838 1 : // lowering the seqnum from the max value), and extends the bounds of the
839 1 : // table.
840 1 : // NB: as the largest range key is always an exclusive sentinel, it is never
841 1 : // updated.
842 1 : if m.HasPointKeys && !m.LargestPointKey.IsExclusiveSentinel() {
843 1 : m.LargestPointKey = setSeqFn(m.LargestPointKey)
844 1 : }
845 1 : if !m.Largest.IsExclusiveSentinel() {
846 1 : m.Largest = setSeqFn(m.Largest)
847 1 : }
848 : // Setting smallestSeqNum == largestSeqNum triggers the setting of
849 : // Properties.GlobalSeqNum when an sstable is loaded.
850 1 : m.SmallestSeqNum = seqNum
851 1 : m.LargestSeqNum = seqNum
852 1 : // Ensure the new bounds are consistent.
853 1 : if err := m.Validate(cmp, format); err != nil {
854 0 : return err
855 0 : }
856 1 : return nil
857 : }
858 :
859 : func ingestUpdateSeqNum(
860 : cmp Compare, format base.FormatKey, seqNum uint64, loadResult ingestLoadResult,
861 1 : ) error {
862 1 : // Shared sstables are required to be sorted by level ascending. We then
863 1 : // iterate the shared sstables in reverse, assigning the lower sequence
864 1 : // numbers to the shared sstables that will be ingested into the lower
865 1 : // (larger numbered) levels first. This ensures sequence number shadowing is
866 1 : // correct.
867 1 : for i := len(loadResult.shared) - 1; i >= 0; i-- {
868 1 : if i-1 >= 0 && loadResult.shared[i-1].shared.Level > loadResult.shared[i].shared.Level {
869 0 : panic(errors.AssertionFailedf("shared files %s, %s out of order", loadResult.shared[i-1], loadResult.shared[i]))
870 : }
871 1 : if err := setSeqNumInMetadata(loadResult.shared[i].fileMetadata, seqNum, cmp, format); err != nil {
872 0 : return err
873 0 : }
874 1 : seqNum++
875 : }
876 1 : for i := range loadResult.external {
877 1 : if err := setSeqNumInMetadata(loadResult.external[i].fileMetadata, seqNum, cmp, format); err != nil {
878 0 : return err
879 0 : }
880 1 : seqNum++
881 : }
882 1 : for i := range loadResult.local {
883 1 : if err := setSeqNumInMetadata(loadResult.local[i].fileMetadata, seqNum, cmp, format); err != nil {
884 0 : return err
885 0 : }
886 1 : seqNum++
887 : }
888 1 : return nil
889 : }
890 :
// overlapWithIterator returns true if the given iterators produce keys or spans
// overlapping the given UserKeyBounds. May return false positives (e.g. in
// error cases).
func overlapWithIterator(
	iter internalIterator,
	rangeDelIter *keyspan.FragmentIterator,
	rkeyIter keyspan.FragmentIterator,
	bounds base.UserKeyBounds,
	cmp Compare,
) bool {
	// Check overlap with point operations.
	//
	// When using levelIter, it seeks to the SST whose boundaries
	// contain keyRange.smallest.UserKey(S).
	// It then tries to find a point in that SST that is >= S.
	// If there's no such point it means the SST ends in a tombstone in which case
	// levelIter.SeekGE generates a boundary range del sentinel.
	// The comparison of this boundary with keyRange.largest(L) below
	// is subtle but maintains correctness.
	//  1) boundary < L,
	//     since boundary is also > S (initial seek),
	//     whatever the boundary's start key may be, we're always overlapping.
	//  2) boundary > L,
	//     overlap with boundary cannot be determined since we don't know boundary's start key.
	//     We require checking for overlap with rangeDelIter.
	//  3) boundary == L and L is not sentinel,
	//     means boundary < L and hence is similar to 1).
	//  4) boundary == L and L is sentinel,
	//     we'll always overlap since for any values of i,j ranges [i, k) and [j, k) always overlap.
	kv := iter.SeekGE(bounds.Start, base.SeekGEFlagsNone)
	if kv != nil {
		// A point key at or after bounds.Start that is still below the upper
		// bound proves overlap.
		if bounds.End.IsUpperBoundForInternalKey(cmp, kv.K) {
			return true
		}
	}
	// Assume overlap if iterator errored.
	if err := iter.Error(); err != nil {
		return true
	}

	// computeOverlapWithSpans reports whether rIter contains a non-empty span
	// intersecting bounds. An error is returned to the caller, which treats it
	// as overlap (false positive by design).
	computeOverlapWithSpans := func(rIter keyspan.FragmentIterator) (bool, error) {
		// NB: The spans surfaced by the fragment iterator are non-overlapping.
		span, err := rIter.SeekGE(bounds.Start)
		if err != nil {
			return false, err
		}
		for ; span != nil; span, err = rIter.Next() {
			if !bounds.End.IsUpperBoundFor(cmp, span.Start) {
				// The span starts after our bounds.
				return false, nil
			}
			// Empty spans (no visible keys) are skipped; only a non-empty span
			// counts as overlap.
			if !span.Empty() {
				return true, nil
			}
		}
		return false, err
	}

	// rkeyIter is either a range key level iter, or a range key iterator
	// over a single file.
	if rkeyIter != nil {
		// If an error occurs, assume overlap.
		if overlap, err := computeOverlapWithSpans(rkeyIter); overlap || err != nil {
			return true
		}
	}

	// Check overlap with range deletions.
	if rangeDelIter == nil || *rangeDelIter == nil {
		return false
	}
	overlap, err := computeOverlapWithSpans(*rangeDelIter)
	// If an error occurs, assume overlap.
	return overlap || err != nil
}
966 :
// ingestTargetLevel returns the target level for a file being ingested.
// If suggestSplit is true, it accounts for ingest-time splitting as part of
// its target level calculation, and if a split candidate is found, that file
// is returned as the splitFile.
func ingestTargetLevel(
	newIters tableNewIters,
	newRangeKeyIter keyspanimpl.TableNewSpanIter,
	iterOps IterOptions,
	comparer *Comparer,
	v *version,
	baseLevel int,
	compactions map[*compaction]struct{},
	meta *fileMetadata,
	suggestSplit bool,
) (targetLevel int, splitFile *fileMetadata, err error) {
	// Find the lowest level which does not have any files which overlap meta. We
	// search from L0 to L6 looking for whether there are any files in the level
	// which overlap meta. We want the "lowest" level (where lower means
	// increasing level number) in order to reduce write amplification.
	//
	// There are 2 kinds of overlap we need to check for: file boundary overlap
	// and data overlap. Data overlap implies file boundary overlap. Note that it
	// is always possible to ingest into L0.
	//
	// To place meta at level i where i > 0:
	// - there must not be any data overlap with levels <= i, since that will
	//   violate the sequence number invariant.
	// - no file boundary overlap with level i, since that will violate the
	//   invariant that files do not overlap in levels i > 0.
	// - if there is only a file overlap at a given level, and no data overlap,
	//   we can still slot a file at that level. We return the fileMetadata with
	//   which we have file boundary overlap (must be only one file, as sstable
	//   bounds are usually tight on user keys) and the caller is expected to split
	//   that sstable into two virtual sstables, allowing this file to go into that
	//   level. Note that if we have file boundary overlap with two files, which
	//   should only happen on rare occasions, we treat it as data overlap and
	//   don't use this optimization.
	//
	// The file boundary overlap check is simpler to conceptualize. Consider the
	// following example, in which the ingested file lies completely before or
	// after the file being considered.
	//
	//   |--|           |--|  ingested file: [a,b] or [f,g]
	//         |-----|        existing file: [c,e]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In both cases the ingested file can move to considering the next level.
	//
	// File boundary overlap does not necessarily imply data overlap. The check
	// for data overlap is a little more nuanced. Consider the following examples:
	//
	//  1. No data overlap:
	//
	//          |-|   |--|    ingested file: [cc-d] or [ee-ff]
	//  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In this case the ingested files can "fall through" this level. The checks
	// continue at the next level.
	//
	//  2. Data overlap:
	//
	//            |--|        ingested file: [d-e]
	//  |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
	//  _____________________
	//   a  b  c  d  e  f  g
	//
	// In this case the file cannot be ingested into this level as the point 'dd'
	// is in the way.
	//
	// It is worth noting that the check for data overlap is only approximate. In
	// the previous example, the ingested table [d-e] could contain only the
	// points 'd' and 'e', in which case the table would be eligible for
	// considering lower levels. However, such a fine-grained check would need to
	// be exhaustive (comparing points and ranges in both the ingested existing
	// tables) and such a check is prohibitively expensive. Thus Pebble treats any
	// existing point that falls within the ingested table bounds as being "data
	// overlap".

	// This assertion implicitly checks that we have the current version of
	// the metadata.
	if v.L0Sublevels == nil {
		return 0, nil, base.AssertionFailedf("could not read L0 sublevels")
	}
	iterOps.CategoryAndQoS = sstable.CategoryAndQoS{
		Category: "pebble-ingest",
		QoSLevel: sstable.LatencySensitiveQoSLevel,
	}
	// Check for overlap over the keys of L0 by iterating over the sublevels.
	for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ {
		iter := newLevelIter(context.Background(),
			iterOps, comparer, newIters, v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), internalIterOpts{})

		var rangeDelIter keyspan.FragmentIterator
		// Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
		// sets it up for the target file.
		iter.initRangeDel(&rangeDelIter)

		levelIter := keyspanimpl.LevelIter{}
		levelIter.Init(
			keyspan.SpanIterOptions{}, comparer.Compare, newRangeKeyIter,
			v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), manifest.KeyTypeRange,
		)

		overlap := overlapWithIterator(iter, &rangeDelIter, &levelIter, meta.UserKeyBounds(), comparer.Compare)
		err := iter.Close() // Closes range del iter as well.
		err = firstError(err, levelIter.Close())
		if err != nil {
			return 0, nil, err
		}
		if overlap {
			// Data overlap with L0: the file must be ingested into L0.
			// targetLevel is still its zero value here, and no split file
			// applies to L0.
			return targetLevel, nil, nil
		}
	}

	level := baseLevel
	for ; level < numLevels; level++ {
		levelIter := newLevelIter(context.Background(),
			iterOps, comparer, newIters, v.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
		var rangeDelIter keyspan.FragmentIterator
		// Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
		// sets it up for the target file.
		levelIter.initRangeDel(&rangeDelIter)

		rkeyLevelIter := &keyspanimpl.LevelIter{}
		rkeyLevelIter.Init(
			keyspan.SpanIterOptions{}, comparer.Compare, newRangeKeyIter,
			v.Levels[level].Iter(), manifest.Level(level), manifest.KeyTypeRange,
		)

		overlap := overlapWithIterator(levelIter, &rangeDelIter, rkeyLevelIter, meta.UserKeyBounds(), comparer.Compare)
		err := levelIter.Close() // Closes range del iter as well.
		err = firstError(err, rkeyLevelIter.Close())
		if err != nil {
			return 0, nil, err
		}
		if overlap {
			// Data overlap at this level: the deepest safe placement is
			// whatever was recorded in targetLevel/splitFile on a previous
			// iteration (possibly L0 / nil).
			return targetLevel, splitFile, nil
		}

		// Check boundary overlap.
		var candidateSplitFile *fileMetadata
		boundaryOverlaps := v.Overlaps(level, meta.UserKeyBounds())
		if !boundaryOverlaps.Empty() {
			// We are already guaranteed to not have any data overlaps with files
			// in boundaryOverlaps, otherwise we'd have returned in the above if
			// statements. Use this, plus boundaryOverlaps.Len() == 1 to detect for
			// the case where we can slot this file into the current level despite
			// a boundary overlap, by splitting one existing file into two virtual
			// sstables.
			if suggestSplit && boundaryOverlaps.Len() == 1 {
				iter := boundaryOverlaps.Iter()
				candidateSplitFile = iter.First()
			} else {
				// We either don't want to suggest ingest-time splits (i.e.
				// !suggestSplit), or we boundary-overlapped with more than one file.
				continue
			}
		}

		// Check boundary overlap with any ongoing compactions. We consider an
		// overlapping compaction that's writing files to an output level as
		// equivalent to boundary overlap with files in that output level.
		//
		// We cannot check for data overlap with the new SSTs compaction will produce
		// since compaction hasn't been done yet. However, there's no need to check
		// since all keys in them will be from levels in [c.startLevel,
		// c.outputLevel], and all those levels have already had their data overlap
		// tested negative (else we'd have returned earlier).
		//
		// An alternative approach would be to cancel these compactions and proceed
		// with an ingest-time split on this level if necessary. However, compaction
		// cancellation can result in significant wasted effort and is best avoided
		// unless necessary.
		overlaps := false
		for c := range compactions {
			if c.outputLevel == nil || level != c.outputLevel.level {
				continue
			}
			if comparer.Compare(meta.Smallest.UserKey, c.largest.UserKey) <= 0 &&
				comparer.Compare(meta.Largest.UserKey, c.smallest.UserKey) >= 0 {
				overlaps = true
				break
			}
		}
		if !overlaps {
			// This level is a viable placement; remember it and keep
			// descending to look for an even lower one.
			targetLevel = level
			splitFile = candidateSplitFile
		}
	}
	return targetLevel, splitFile, nil
}
1161 :
1162 : // Ingest ingests a set of sstables into the DB. Ingestion of the files is
1163 : // atomic and semantically equivalent to creating a single batch containing all
1164 : // of the mutations in the sstables. Ingestion may require the memtable to be
1165 : // flushed. The ingested sstable files are moved into the DB and must reside on
1166 : // the same filesystem as the DB. Sstables can be created for ingestion using
1167 : // sstable.Writer. On success, Ingest removes the input paths.
1168 : //
1169 : // Two types of sstables are accepted for ingestion(s): one is sstables present
1170 : // in the instance's vfs.FS and can be referenced locally. The other is sstables
1171 : // present in remote.Storage, referred to as shared or foreign sstables. These
1172 : // shared sstables can be linked through objstorageprovider.Provider, and do not
1173 : // need to already be present on the local vfs.FS. Foreign sstables must all fit
1174 : // in an excise span, and are destined for a level specified in SharedSSTMeta.
1175 : //
1176 : // All sstables *must* be Sync()'d by the caller after all bytes are written
1177 : // and before its file handle is closed; failure to do so could violate
1178 : // durability or lead to corrupted on-disk state. This method cannot, in a
1179 : // platform-and-FS-agnostic way, ensure that all sstables in the input are
1180 : // properly synced to disk. Opening new file handles and Sync()-ing them
1181 : // does not always guarantee durability; see the discussion here on that:
1182 : // https://github.com/cockroachdb/pebble/pull/835#issuecomment-663075379
1183 : //
1184 : // Ingestion loads each sstable into the lowest level of the LSM which it
1185 : // doesn't overlap (see ingestTargetLevel). If an sstable overlaps a memtable,
1186 : // ingestion forces the memtable to flush, and then waits for the flush to
1187 : // occur. In some cases, such as with no foreign sstables and no excise span,
1188 : // ingestion that gets blocked on a memtable can join the flushable queue and
1189 : // finish even before the memtable has been flushed.
1190 : //
1191 : // The steps for ingestion are:
1192 : //
1193 : // 1. Allocate file numbers for every sstable being ingested.
1194 : // 2. Load the metadata for all sstables being ingested.
1195 : // 3. Sort the sstables by smallest key, verifying non overlap (for local
1196 : // sstables).
1197 : // 4. Hard link (or copy) the local sstables into the DB directory.
1198 : // 5. Allocate a sequence number to use for all of the entries in the
1199 : // local sstables. This is the step where overlap with memtables is
1200 : // determined. If there is overlap, we remember the most recent memtable
1201 : // that overlaps.
1202 : // 6. Update the sequence number in the ingested local sstables. (Remote
1203 : // sstables get fixed sequence numbers that were determined at load time.)
1204 : // 7. Wait for the most recent memtable that overlaps to flush (if any).
1205 : // 8. Add the ingested sstables to the version (DB.ingestApply).
1206 : // 8.1. If an excise span was specified, figure out what sstables in the
1207 : // current version overlap with the excise span, and create new virtual
1208 : // sstables out of those sstables that exclude the excised span (DB.excise).
1209 : // 9. Publish the ingestion sequence number.
1210 : //
1211 : // Note that if the mutable memtable overlaps with ingestion, a flush of the
1212 : // memtable is forced equivalent to DB.Flush. Additionally, subsequent
1213 : // mutations that get sequence numbers larger than the ingestion sequence
1214 : // number get queued up behind the ingestion waiting for it to complete. This
1215 : // can produce a noticeable hiccup in performance. See
1216 : // https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
1217 : // this hiccup.
1218 1 : func (d *DB) Ingest(paths []string) error {
1219 1 : if err := d.closed.Load(); err != nil {
1220 0 : panic(err)
1221 : }
1222 1 : if d.opts.ReadOnly {
1223 0 : return ErrReadOnly
1224 0 : }
1225 1 : _, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, false, nil /* external */)
1226 1 : return err
1227 : }
1228 :
// IngestOperationStats provides some information about where in the LSM the
// bytes were ingested by a single ingest operation.
type IngestOperationStats struct {
	// Bytes is the total bytes in the ingested sstables.
	Bytes uint64
	// ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
	// into L0. This value is approximate when flushable ingests are active and
	// an ingest overlaps an entry in the flushable queue. Currently, this
	// approximation is very rough, only including tables that overlapped the
	// memtable. This estimate may be improved with #2112.
	ApproxIngestedIntoL0Bytes uint64
	// MemtableOverlappingFiles is the count of ingested sstables
	// that overlapped keys in the memtables.
	MemtableOverlappingFiles int
}
1244 :
// ExternalFile are external sstables that can be referenced through
// objprovider and ingested as remote files that will not be refcounted or
// cleaned up. For use with online restore. Note that the underlying sstable
// could contain keys outside the [Smallest,Largest) bounds; however Pebble
// is expected to only read the keys within those bounds.
type ExternalFile struct {
	// Locator is the shared.Locator that can be used with objProvider to
	// resolve a reference to this external sstable.
	Locator remote.Locator

	// ObjName is the unique name of this sstable on Locator.
	ObjName string

	// Size of the referenced proportion of the virtualized sstable. An estimate
	// is acceptable in lieu of the backing file size.
	Size uint64

	// StartKey and EndKey define the bounds of the sstable; the ingestion
	// of this file will only result in keys within [StartKey, EndKey) if
	// EndKeyIsInclusive is false or [StartKey, EndKey] if it is true.
	// These bounds are loose i.e. it's possible for keys to not span the
	// entirety of this range.
	//
	// StartKey and EndKey user keys must not have suffixes.
	//
	// Multiple ExternalFiles in one ingestion must all have non-overlapping
	// bounds.
	StartKey, EndKey []byte

	// EndKeyIsInclusive is true if EndKey should be treated as inclusive.
	EndKeyIsInclusive bool

	// HasPointKey and HasRangeKey denote whether this file contains point keys
	// or range keys. If both structs are false, an error is returned during
	// ingestion.
	HasPointKey, HasRangeKey bool

	// SyntheticPrefix will prepend this prefix to all keys in the file during
	// iteration. Note that the backing file itself is not modified.
	//
	// SyntheticPrefix must be a prefix of both StartKey and EndKey.
	SyntheticPrefix []byte

	// SyntheticSuffix will replace the suffix of every key in the file during
	// iteration. Note that the file itself is not modified, rather, every key
	// returned by an iterator will have the synthetic suffix.
	//
	// SyntheticSuffix can only be used under the following conditions:
	//  - the synthetic suffix must sort before any non-empty suffixes in the
	//    backing sst (the entire sst, not just the part restricted to bounds).
	//  - the backing sst must not contain multiple keys with the same prefix.
	SyntheticSuffix []byte

	// Level denotes the level at which this file was present at read time
	// if the external file was returned by a scan of an existing Pebble
	// instance. If Level is 0, this field is ignored.
	Level uint8
}
1303 :
1304 : // IngestWithStats does the same as Ingest, and additionally returns
1305 : // IngestOperationStats.
1306 0 : func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) {
1307 0 : if err := d.closed.Load(); err != nil {
1308 0 : panic(err)
1309 : }
1310 0 : if d.opts.ReadOnly {
1311 0 : return IngestOperationStats{}, ErrReadOnly
1312 0 : }
1313 0 : return d.ingest(paths, ingestTargetLevel, nil, KeyRange{}, false, nil)
1314 : }
1315 :
1316 : // IngestExternalFiles does the same as IngestWithStats, and additionally
1317 : // accepts external files (with locator info that can be resolved using
1318 : // d.opts.SharedStorage). These files must also be non-overlapping with
1319 : // each other, and must be resolvable through d.objProvider.
1320 1 : func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, error) {
1321 1 : if err := d.closed.Load(); err != nil {
1322 0 : panic(err)
1323 : }
1324 :
1325 1 : if d.opts.ReadOnly {
1326 0 : return IngestOperationStats{}, ErrReadOnly
1327 0 : }
1328 1 : if d.opts.Experimental.RemoteStorage == nil {
1329 0 : return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
1330 0 : }
1331 1 : return d.ingest(nil, ingestTargetLevel, nil, KeyRange{}, false, external)
1332 : }
1333 :
1334 : // IngestAndExcise does the same as IngestWithStats, and additionally accepts a
1335 : // list of shared files to ingest that can be read from a remote.Storage through
1336 : // a Provider. All the shared files must live within exciseSpan, and any existing
1337 : // keys in exciseSpan are deleted by turning existing sstables into virtual
1338 : // sstables (if not virtual already) and shrinking their spans to exclude
1339 : // exciseSpan. See the comment at Ingest for a more complete picture of the
1340 : // ingestion process.
1341 : //
1342 : // Panics if this DB instance was not instantiated with a remote.Storage and
1343 : // shared sstables are present.
1344 : func (d *DB) IngestAndExcise(
1345 : paths []string,
1346 : shared []SharedSSTMeta,
1347 : external []ExternalFile,
1348 : exciseSpan KeyRange,
1349 : sstsContainExciseTombstone bool,
1350 1 : ) (IngestOperationStats, error) {
1351 1 : if err := d.closed.Load(); err != nil {
1352 0 : panic(err)
1353 : }
1354 1 : if d.opts.ReadOnly {
1355 0 : return IngestOperationStats{}, ErrReadOnly
1356 0 : }
1357 1 : if invariants.Enabled {
1358 1 : // Excise is only supported on prefix keys.
1359 1 : if d.opts.Comparer.Split(exciseSpan.Start) != len(exciseSpan.Start) {
1360 0 : panic("IngestAndExcise called with suffixed start key")
1361 : }
1362 1 : if d.opts.Comparer.Split(exciseSpan.End) != len(exciseSpan.End) {
1363 0 : panic("IngestAndExcise called with suffixed end key")
1364 : }
1365 : }
1366 1 : if v := d.FormatMajorVersion(); v < FormatMinForSharedObjects {
1367 0 : return IngestOperationStats{}, errors.Errorf(
1368 0 : "store has format major version %d; IngestAndExcise requires at least %d",
1369 0 : v, FormatMinForSharedObjects,
1370 0 : )
1371 0 : }
1372 1 : return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, sstsContainExciseTombstone, external)
1373 : }
1374 :
1375 : // Both DB.mu and commitPipeline.mu must be held while this is called.
1376 : func (d *DB) newIngestedFlushableEntry(
1377 : meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum, exciseSpan KeyRange,
1378 1 : ) (*flushableEntry, error) {
1379 1 : // Update the sequence number for all of the sstables in the
1380 1 : // metadata. Writing the metadata to the manifest when the
1381 1 : // version edit is applied is the mechanism that persists the
1382 1 : // sequence number. The sstables themselves are left unmodified.
1383 1 : // In this case, a version edit will only be written to the manifest
1384 1 : // when the flushable is eventually flushed. If Pebble restarts in that
1385 1 : // time, then we'll lose the ingest sequence number information. But this
1386 1 : // information will also be reconstructed on node restart.
1387 1 : for i, m := range meta {
1388 1 : if err := setSeqNumInMetadata(m, seqNum+uint64(i), d.cmp, d.opts.Comparer.FormatKey); err != nil {
1389 0 : return nil, err
1390 0 : }
1391 : }
1392 :
1393 1 : f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan)
1394 1 :
1395 1 : // NB: The logNum/seqNum are the WAL number which we're writing this entry
1396 1 : // to and the sequence number within the WAL which we'll write this entry
1397 1 : // to.
1398 1 : entry := d.newFlushableEntry(f, logNum, seqNum)
1399 1 : // The flushable entry starts off with a single reader ref, so increment
1400 1 : // the FileMetadata.Refs.
1401 1 : for _, file := range f.files {
1402 1 : file.FileBacking.Ref()
1403 1 : }
1404 1 : entry.unrefFiles = func() []*fileBacking {
1405 1 : var obsolete []*fileBacking
1406 1 : for _, file := range f.files {
1407 1 : if file.FileBacking.Unref() == 0 {
1408 1 : obsolete = append(obsolete, file.FileMetadata.FileBacking)
1409 1 : }
1410 : }
1411 1 : return obsolete
1412 : }
1413 :
1414 1 : entry.flushForced = true
1415 1 : entry.releaseMemAccounting = func() {}
1416 1 : return entry, nil
1417 : }
1418 :
1419 : // Both DB.mu and commitPipeline.mu must be held while this is called. Since
1420 : // we're holding both locks, the order in which we rotate the memtable or
1421 : // recycle the WAL in this function is irrelevant as long as the correct log
1422 : // numbers are assigned to the appropriate flushable.
1423 : func (d *DB) handleIngestAsFlushable(
1424 : meta []*fileMetadata, seqNum uint64, exciseSpan KeyRange,
1425 1 : ) error {
1426 1 : b := d.NewBatch()
1427 1 : for _, m := range meta {
1428 1 : b.ingestSST(m.FileNum)
1429 1 : }
1430 1 : b.setSeqNum(seqNum)
1431 1 :
1432 1 : // If the WAL is disabled, then the logNum used to create the flushable
1433 1 : // entry doesn't matter. We just use the logNum assigned to the current
1434 1 : // mutable memtable. If the WAL is enabled, then this logNum will be
1435 1 : // overwritten by the logNum of the log which will contain the log entry
1436 1 : // for the ingestedFlushable.
1437 1 : logNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
1438 1 : if !d.opts.DisableWAL {
1439 1 : // We create a new WAL for the flushable instead of reusing the end of
1440 1 : // the previous WAL. This simplifies the increment of the minimum
1441 1 : // unflushed log number, and also simplifies WAL replay.
1442 1 : logNum, _ = d.recycleWAL()
1443 1 : d.mu.Unlock()
1444 1 : err := d.commit.directWrite(b)
1445 1 : if err != nil {
1446 0 : d.opts.Logger.Fatalf("%v", err)
1447 0 : }
1448 1 : d.mu.Lock()
1449 : }
1450 :
1451 1 : entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
1452 1 : if err != nil {
1453 0 : return err
1454 0 : }
1455 1 : nextSeqNum := seqNum + uint64(b.Count())
1456 1 :
1457 1 : // Set newLogNum to the logNum of the previous flushable. This value is
1458 1 : // irrelevant if the WAL is disabled. If the WAL is enabled, then we set
1459 1 : // the appropriate value below.
1460 1 : newLogNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
1461 1 : if !d.opts.DisableWAL {
1462 1 : // This is WAL num of the next mutable memtable which comes after the
1463 1 : // ingestedFlushable in the flushable queue. The mutable memtable
1464 1 : // will be created below.
1465 1 : newLogNum, _ = d.recycleWAL()
1466 1 : if err != nil {
1467 0 : return err
1468 0 : }
1469 : }
1470 :
1471 1 : d.mu.versions.metrics.Ingest.Count++
1472 1 : currMem := d.mu.mem.mutable
1473 1 : // NB: Placing ingested sstables above the current memtables
1474 1 : // requires rotating of the existing memtables/WAL. There is
1475 1 : // some concern of churning through tiny memtables due to
1476 1 : // ingested sstables being placed on top of them, but those
1477 1 : // memtables would have to be flushed anyways.
1478 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1479 1 : d.rotateMemtable(newLogNum, nextSeqNum, currMem)
1480 1 : d.updateReadStateLocked(d.opts.DebugCheck)
1481 1 : // TODO(aaditya): is this necessary? we call this already in rotateMemtable above
1482 1 : d.maybeScheduleFlush()
1483 1 : return nil
1484 : }
1485 :
1486 : // See comment at Ingest() for details on how this works.
     : //
     : // ingest allocates file numbers for the given local paths and the
     : // shared/external files, loads and verifies their metadata, links the
     : // local files into the DB directory, syncs the object provider, and then
     : // runs the commit pipeline's prepare/apply steps (defined as closures
     : // below) to assign sequence numbers and apply the tables to the LSM. An
     : // invalid (zero) exciseSpan means no excise is performed; a valid one
     : // consumes one extra sequence number for the excise itself.
1487 : func (d *DB) ingest(
1488 : 	paths []string,
1489 : 	targetLevelFunc ingestTargetLevelFunc,
1490 : 	shared []SharedSSTMeta,
1491 : 	exciseSpan KeyRange,
1492 : 	sstsContainExciseTombstone bool,
1493 : 	external []ExternalFile,
1494 1 : ) (IngestOperationStats, error) {
1495 1 : 	if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
1496 0 : 		panic("cannot ingest shared sstables with nil SharedStorage")
1497 : 	}
1498 1 : 	if (exciseSpan.Valid() || len(shared) > 0 || len(external) > 0) && d.FormatMajorVersion() < FormatVirtualSSTables {
1499 0 : 		return IngestOperationStats{}, errors.New("pebble: format major version too old for excise, shared or external sstable ingestion")
1500 0 : 	}
1501 1 : 	if len(external) > 0 && d.FormatMajorVersion() < FormatSyntheticPrefixSuffix {
1502 0 : 		for i := range external {
1503 0 : 			if len(external[i].SyntheticPrefix) > 0 {
1504 0 : 				return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic prefix ingestion")
1505 0 : 			}
1506 0 : 			if len(external[i].SyntheticSuffix) > 0 {
1507 0 : 				return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic suffix ingestion")
1508 0 : 			}
1509 : 		}
1510 : 	}
1511 : 	// Allocate file numbers for all of the files being ingested and mark them as
1512 : 	// pending in order to prevent them from being deleted. Note that this causes
1513 : 	// the file number ordering to be out of alignment with sequence number
1514 : 	// ordering. The sorting of L0 tables by sequence number avoids relying on
1515 : 	// that (busted) invariant.
1516 1 : 	d.mu.Lock()
1517 1 : 	pendingOutputs := make([]base.FileNum, len(paths)+len(shared)+len(external))
1518 1 : 	for i := 0; i < len(paths)+len(shared)+len(external); i++ {
1519 1 : 		pendingOutputs[i] = d.mu.versions.getNextFileNum()
1520 1 : 	}
1521 :
1522 1 : 	jobID := d.newJobIDLocked()
1523 1 : 	d.mu.Unlock()
1524 1 :
1525 1 : 	// Load the metadata for all the files being ingested. This step detects
1526 1 : 	// and elides empty sstables.
1527 1 : 	loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
1528 1 : 	if err != nil {
1529 0 : 		return IngestOperationStats{}, err
1530 0 : 	}
1531 :
1532 1 : 	if loadResult.fileCount() == 0 {
1533 1 : 		// All of the sstables to be ingested were empty. Nothing to do.
1534 1 : 		return IngestOperationStats{}, nil
1535 1 : 	}
1536 :
1537 : 	// Verify the sstables do not overlap.
1538 1 : 	if err := ingestSortAndVerify(d.cmp, loadResult, exciseSpan); err != nil {
1539 1 : 		return IngestOperationStats{}, err
1540 1 : 	}
1541 :
1542 : 	// Hard link the sstables into the DB directory. Since the sstables aren't
1543 : 	// referenced by a version, they won't be used. If the hard linking fails
1544 : 	// (e.g. because the files reside on a different filesystem), ingestLinkLocal
1545 : 	// will fall back to copying, and if that fails we undo our work and return an
1546 : 	// error.
1547 1 : 	if err := ingestLinkLocal(jobID, d.opts, d.objProvider, loadResult.local); err != nil {
1548 0 : 		return IngestOperationStats{}, err
1549 0 : 	}
1550 :
1551 1 : 	err = d.ingestAttachRemote(jobID, loadResult)
     : 	// NB: the defer is intentionally registered before the error check so
     : 	// that external backings are unprotected on every return path below,
     : 	// including the error returns.
1552 1 : 	defer d.ingestUnprotectExternalBackings(loadResult)
1553 1 : 	if err != nil {
1554 0 : 		return IngestOperationStats{}, err
1555 0 : 	}
1556 :
1557 : 	// Make the new tables durable. We need to do this at some point before we
1558 : 	// update the MANIFEST (via logAndApply), otherwise a crash can have the
1559 : 	// tables referenced in the MANIFEST, but not present in the provider.
1560 1 : 	if err := d.objProvider.Sync(); err != nil {
1561 0 : 		return IngestOperationStats{}, err
1562 0 : 	}
1563 :
1564 : 	// metaFlushableOverlaps is a map indicating which of the ingested sstables
1565 : 	// overlap some table in the flushable queue. It's used to approximate
1566 : 	// ingest-into-L0 stats when using flushable ingests.
1567 1 : 	metaFlushableOverlaps := make(map[FileNum]bool, loadResult.fileCount())
1568 1 : 	var mem *flushableEntry
1569 1 : 	var mut *memTable
1570 1 : 	// asFlushable indicates whether the sstable was ingested as a flushable.
1571 1 : 	var asFlushable bool
     : 	// prepare runs under the commit pipeline's mutex. It determines whether
     : 	// the ingest can proceed without waiting (no memtable overlap), must
     : 	// wait for a memtable flush, or can be queued as a flushable ingest on
     : 	// top of the memtable queue.
1572 1 : 	prepare := func(seqNum uint64) {
1573 1 : 		// Note that d.commit.mu is held by commitPipeline when calling prepare.
1574 1 :
1575 1 : 		// Determine the set of bounds we care about for the purpose of checking
1576 1 : 		// for overlap among the flushables. If there's an excise span, we need
1577 1 : 		// to check for overlap with its bounds as well.
1578 1 : 		overlapBounds := make([]bounded, 0, loadResult.fileCount()+1)
1579 1 : 		for _, m := range loadResult.local {
1580 1 : 			overlapBounds = append(overlapBounds, m.fileMetadata)
1581 1 : 		}
1582 1 : 		for _, m := range loadResult.shared {
1583 1 : 			overlapBounds = append(overlapBounds, m.fileMetadata)
1584 1 : 		}
1585 1 : 		for _, m := range loadResult.external {
1586 1 : 			overlapBounds = append(overlapBounds, m.fileMetadata)
1587 1 : 		}
1588 1 : 		if exciseSpan.Valid() {
1589 1 : 			overlapBounds = append(overlapBounds, &exciseSpan)
1590 1 : 		}
1591 :
1592 1 : 		d.mu.Lock()
1593 1 : 		defer d.mu.Unlock()
1594 1 :
1595 1 : 		// Check if any of the currently-open EventuallyFileOnlySnapshots overlap
1596 1 : 		// in key ranges with the excise span. If so, we need to check for memtable
1597 1 : 		// overlaps with all bounds of that EventuallyFileOnlySnapshot in addition
1598 1 : 		// to the ingestion's own bounds too.
1599 1 :
1600 1 : 		if exciseSpan.Valid() {
1601 1 : 			for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
1602 1 : 				if s.efos == nil {
1603 0 : 					continue
1604 : 				}
1605 1 : 				if base.Visible(seqNum, s.efos.seqNum, base.InternalKeySeqNumMax) {
1606 0 : 					// We only worry about snapshots older than the excise. Any snapshots
1607 0 : 					// created after the excise should see the excised view of the LSM
1608 0 : 					// anyway.
1609 0 : 					//
1610 0 : 					// Since we delay publishing the excise seqnum as visible until after
1611 0 : 					// the apply step, this case will never be hit in practice until we
1612 0 : 					// make excises flushable ingests.
1613 0 : 					continue
1614 : 				}
1615 1 : 				if invariants.Enabled {
1616 1 : 					if s.efos.hasTransitioned() {
1617 0 : 						panic("unexpected transitioned EFOS in snapshots list")
1618 : 					}
1619 : 				}
1620 1 : 				for i := range s.efos.protectedRanges {
1621 1 : 					if !s.efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
1622 1 : 						continue
1623 : 					}
1624 : 					// Our excise conflicts with this EFOS. We need to add its protected
1625 : 					// ranges to our overlapBounds. Grow overlapBounds in one allocation
1626 : 					// if necesary.
1627 1 : 					prs := s.efos.protectedRanges
1628 1 : 					if cap(overlapBounds) < len(overlapBounds)+len(prs) {
1629 1 : 						oldOverlapBounds := overlapBounds
1630 1 : 						overlapBounds = make([]bounded, len(oldOverlapBounds), len(oldOverlapBounds)+len(prs))
1631 1 : 						copy(overlapBounds, oldOverlapBounds)
1632 1 : 					}
1633 1 : 					for i := range prs {
1634 1 : 						overlapBounds = append(overlapBounds, &prs[i])
1635 1 : 					}
1636 1 : 					break
1637 : 				}
1638 : 			}
1639 : 		}
1640 :
1641 : 		// Check to see if any files overlap with any of the memtables. The queue
1642 : 		// is ordered from oldest to newest with the mutable memtable being the
1643 : 		// last element in the slice. We want to wait for the newest table that
1644 : 		// overlaps.
1645 :
1646 1 : 		for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1647 1 : 			m := d.mu.mem.queue[i]
1648 1 : 			m.computePossibleOverlaps(func(b bounded) shouldContinue {
1649 1 : 				// If this is the first table to overlap a flushable, save
1650 1 : 				// the flushable. This ingest must be ingested or flushed
1651 1 : 				// after it.
1652 1 : 				if mem == nil {
1653 1 : 					mem = m
1654 1 : 				}
1655 :
1656 1 : 				switch v := b.(type) {
1657 1 : 				case *fileMetadata:
1658 1 : 					// NB: False positives are possible if `m` is a flushable
1659 1 : 					// ingest that overlaps the file `v` in bounds but doesn't
1660 1 : 					// contain overlapping data. This is considered acceptable
1661 1 : 					// because it's rare (in CockroachDB a bound overlap likely
1662 1 : 					// indicates a data overlap), and blocking the commit
1663 1 : 					// pipeline while we perform I/O to check for overlap may be
1664 1 : 					// more disruptive than enqueueing this ingestion on the
1665 1 : 					// flushable queue and switching to a new memtable.
1666 1 : 					metaFlushableOverlaps[v.FileNum] = true
1667 1 : 				case *KeyRange:
1668 : 					// An excise span or an EventuallyFileOnlySnapshot protected range;
1669 : 					// not a file.
1670 0 : 				default:
1671 0 : 					panic("unreachable")
1672 : 				}
1673 1 : 				return continueIteration
1674 : 			}, overlapBounds...)
1675 : 		}
1676 :
1677 1 : 		if mem == nil {
1678 1 : 			// No overlap with any of the queued flushables, so no need to queue
1679 1 : 			// after them.
1680 1 :
1681 1 : 			// New writes with higher sequence numbers may be concurrently
1682 1 : 			// committed. We must ensure they don't flush before this ingest
1683 1 : 			// completes. To do that, we ref the mutable memtable as a writer,
1684 1 : 			// preventing its flushing (and the flushing of all subsequent
1685 1 : 			// flushables in the queue). Once we've acquired the manifest lock
1686 1 : 			// to add the ingested sstables to the LSM, we can unref as we're
1687 1 : 			// guaranteed that the flush won't edit the LSM before this ingest.
1688 1 : 			mut = d.mu.mem.mutable
1689 1 : 			mut.writerRef()
1690 1 : 			return
1691 1 : 		}
1692 :
1693 : 		// The ingestion overlaps with some entry in the flushable queue. If the
1694 : 		// pre-conditions are met below, we can treat this ingestion as a flushable
1695 : 		// ingest, otherwise we wait on the memtable flush before ingestion.
1696 : 		//
1697 : 		// TODO(aaditya): We should make flushableIngest compatible with remote
1698 : 		// files.
1699 1 : 		hasRemoteFiles := len(shared) > 0 || len(external) > 0
1700 1 : 		canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest &&
1701 1 : 			(len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold) &&
1702 1 : 			!d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles
1703 1 :
1704 1 : 		if !canIngestFlushable || (exciseSpan.Valid() && !sstsContainExciseTombstone) {
1705 1 : 			// We're not able to ingest as a flushable,
1706 1 : 			// so we must synchronously flush.
1707 1 : 			//
1708 1 : 			// TODO(bilal): Currently, if any of the files being ingested are shared or
1709 1 : 			// there's an excise span present, we cannot use flushable ingests and need
1710 1 : 			// to wait synchronously. Either remove this caveat by fleshing out
1711 1 : 			// flushable ingest logic to also account for these cases, or remove this
1712 1 : 			// comment. Tracking issue: https://github.com/cockroachdb/pebble/issues/2676
1713 1 : 			if mem.flushable == d.mu.mem.mutable {
1714 1 : 				err = d.makeRoomForWrite(nil)
1715 1 : 			}
1716 : 			// New writes with higher sequence numbers may be concurrently
1717 : 			// committed. We must ensure they don't flush before this ingest
1718 : 			// completes. To do that, we ref the mutable memtable as a writer,
1719 : 			// preventing its flushing (and the flushing of all subsequent
1720 : 			// flushables in the queue). Once we've acquired the manifest lock
1721 : 			// to add the ingested sstables to the LSM, we can unref as we're
1722 : 			// guaranteed that the flush won't edit the LSM before this ingest.
1723 1 : 			mut = d.mu.mem.mutable
1724 1 : 			mut.writerRef()
1725 1 : 			mem.flushForced = true
1726 1 : 			d.maybeScheduleFlush()
1727 1 : 			return
1728 : 		}
1729 : 		// Since there aren't too many memtables already queued up, we can
1730 : 		// slide the ingested sstables on top of the existing memtables.
1731 1 : 		asFlushable = true
1732 1 : 		fileMetas := make([]*fileMetadata, len(loadResult.local))
1733 1 : 		for i := range fileMetas {
1734 1 : 			fileMetas[i] = loadResult.local[i].fileMetadata
1735 1 : 		}
1736 1 : 		err = d.handleIngestAsFlushable(fileMetas, seqNum, exciseSpan)
1737 : 	}
1738 :
1739 1 : 	var ve *versionEdit
     : 	// apply assigns the allocated sequence numbers to the ingested tables
     : 	// and, unless the ingest was queued as a flushable or prepare failed,
     : 	// waits for any overlapping flush and applies the version edit to the
     : 	// LSM via ingestApply.
1740 1 : 	apply := func(seqNum uint64) {
1741 1 : 		if err != nil || asFlushable {
1742 1 : 			// An error occurred during prepare.
1743 1 : 			if mut != nil {
1744 0 : 				if mut.writerUnref() {
1745 0 : 					d.mu.Lock()
1746 0 : 					d.maybeScheduleFlush()
1747 0 : 					d.mu.Unlock()
1748 0 : 				}
1749 : 			}
1750 1 : 			return
1751 : 		}
1752 :
1753 : 		// If there's an excise being done atomically with the same ingest, we
1754 : 		// assign the lowest sequence number in the set of sequence numbers for this
1755 : 		// ingestion to the excise. Note that we've already allocated fileCount+1
1756 : 		// sequence numbers in this case.
1757 1 : 		if exciseSpan.Valid() {
1758 1 : 			seqNum++ // the first seqNum is reserved for the excise.
1759 1 : 		}
1760 : 		// Update the sequence numbers for all ingested sstables'
1761 : 		// metadata. When the version edit is applied, the metadata is
1762 : 		// written to the manifest, persisting the sequence number.
1763 : 		// The sstables themselves are left unmodified.
1764 1 : 		if err = ingestUpdateSeqNum(
1765 1 : 			d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult,
1766 1 : 		); err != nil {
1767 0 : 			if mut != nil {
1768 0 : 				if mut.writerUnref() {
1769 0 : 					d.mu.Lock()
1770 0 : 					d.maybeScheduleFlush()
1771 0 : 					d.mu.Unlock()
1772 0 : 				}
1773 : 			}
1774 0 : 			return
1775 : 		}
1776 :
1777 : 		// If we overlapped with a memtable in prepare wait for the flush to
1778 : 		// finish.
1779 1 : 		if mem != nil {
1780 1 : 			<-mem.flushed
1781 1 : 		}
1782 :
1783 : 		// Assign the sstables to the correct level in the LSM and apply the
1784 : 		// version edit.
1785 1 : 		ve, err = d.ingestApply(jobID, loadResult, targetLevelFunc, mut, exciseSpan, seqNum)
1786 : 	}
1787 :
1788 : 	// Only one ingest can occur at a time because if not, one would block waiting
1789 : 	// for the other to finish applying. This blocking would happen while holding
1790 : 	// the commit mutex which would prevent unrelated batches from writing their
1791 : 	// changes to the WAL and memtable. This will cause a bigger commit hiccup
1792 : 	// during ingestion.
1793 1 : 	seqNumCount := loadResult.fileCount()
1794 1 : 	if exciseSpan.Valid() {
1795 1 : 		seqNumCount++
1796 1 : 	}
1797 1 : 	d.commit.ingestSem <- struct{}{}
1798 1 : 	d.commit.AllocateSeqNum(seqNumCount, prepare, apply)
1799 1 : 	<-d.commit.ingestSem
1800 1 :
1801 1 : 	if err != nil {
1802 0 : 		if err2 := ingestCleanup(d.objProvider, loadResult.local); err2 != nil {
1803 0 : 			d.opts.Logger.Errorf("ingest cleanup failed: %v", err2)
1804 0 : 		}
1805 1 : 	} else {
1806 1 : 		// Since we either created a hard link to the ingesting files, or copied
1807 1 : 		// them over, it is safe to remove the originals paths.
1808 1 : 		for i := range loadResult.local {
1809 1 : 			path := loadResult.local[i].path
1810 1 : 			if err2 := d.opts.FS.Remove(path); err2 != nil {
1811 0 : 				d.opts.Logger.Errorf("ingest failed to remove original file: %s", err2)
1812 0 : 			}
1813 : 		}
1814 : 	}
1815 :
1816 1 : 	info := TableIngestInfo{
1817 1 : 		JobID:     int(jobID),
1818 1 : 		Err:       err,
1819 1 : 		flushable: asFlushable,
1820 1 : 	}
1821 1 : 	if len(loadResult.local) > 0 {
1822 1 : 		info.GlobalSeqNum = loadResult.local[0].SmallestSeqNum
1823 1 : 	} else if len(loadResult.shared) > 0 {
1824 1 : 		info.GlobalSeqNum = loadResult.shared[0].SmallestSeqNum
1825 1 : 	} else {
1826 1 : 		info.GlobalSeqNum = loadResult.external[0].SmallestSeqNum
1827 1 : 	}
1828 1 : 	var stats IngestOperationStats
1829 1 : 	if ve != nil {
1830 1 : 		info.Tables = make([]struct {
1831 1 : 			TableInfo
1832 1 : 			Level int
1833 1 : 		}, len(ve.NewFiles))
1834 1 : 		for i := range ve.NewFiles {
1835 1 : 			e := &ve.NewFiles[i]
1836 1 : 			info.Tables[i].Level = e.Level
1837 1 : 			info.Tables[i].TableInfo = e.Meta.TableInfo()
1838 1 : 			stats.Bytes += e.Meta.Size
1839 1 : 			if e.Level == 0 {
1840 1 : 				stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
1841 1 : 			}
1842 1 : 			if metaFlushableOverlaps[e.Meta.FileNum] {
1843 1 : 				stats.MemtableOverlappingFiles++
1844 1 : 			}
1845 : 		}
1846 1 : 	} else if asFlushable {
1847 1 : 		// NB: If asFlushable == true, there are no shared sstables.
1848 1 : 		info.Tables = make([]struct {
1849 1 : 			TableInfo
1850 1 : 			Level int
1851 1 : 		}, len(loadResult.local))
1852 1 : 		for i, f := range loadResult.local {
1853 1 : 			info.Tables[i].Level = -1
1854 1 : 			info.Tables[i].TableInfo = f.TableInfo()
1855 1 : 			stats.Bytes += f.Size
1856 1 : 			// We don't have exact stats on which files will be ingested into
1857 1 : 			// L0, because actual ingestion into the LSM has been deferred until
1858 1 : 			// flush time. Instead, we infer based on memtable overlap.
1859 1 : 			//
1860 1 : 			// TODO(jackson): If we optimistically compute data overlap (#2112)
1861 1 : 			// before entering the commit pipeline, we can use that overlap to
1862 1 : 			// improve our approximation by incorporating overlap with L0, not
1863 1 : 			// just memtables.
1864 1 : 			if metaFlushableOverlaps[f.FileNum] {
1865 1 : 				stats.ApproxIngestedIntoL0Bytes += f.Size
1866 1 : 				stats.MemtableOverlappingFiles++
1867 1 : 			}
1868 : 		}
1869 : 	}
1870 1 : 	d.opts.EventListener.TableIngested(info)
1871 1 :
1872 1 : 	return stats, err
1873 : }
1874 :
1875 : // excise updates ve to include a replacement of the file m with new virtual
1876 : // sstables that exclude exciseSpan, returning a slice of newly-created files if
1877 : // any. If the entirety of m is deleted by exciseSpan, no new sstables are added
1878 : // and m is deleted. Note that ve is updated in-place.
1879 : //
1880 : // The manifest lock must be held when calling this method.
1881 : func (d *DB) excise(
1882 : 	exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int,
1883 1 : ) ([]manifest.NewFileEntry, error) {
1884 1 : 	numCreatedFiles := 0
1885 1 : 	// Check if there's actually an overlap between m and exciseSpan.
1886 1 : 	mBounds := base.UserKeyBoundsFromInternal(m.Smallest, m.Largest)
1887 1 : 	if !exciseSpan.Overlaps(d.cmp, &mBounds) {
1888 1 : 		return nil, nil
1889 1 : 	}
1890 1 : 	ve.DeletedFiles[deletedFileEntry{
1891 1 : 		Level:   level,
1892 1 : 		FileNum: m.FileNum,
1893 1 : 	}] = m
1894 1 : 	// Fast path: m sits entirely within the exciseSpan, so just delete it.
1895 1 : 	if exciseSpan.ContainsInternalKey(d.cmp, m.Smallest) && exciseSpan.ContainsInternalKey(d.cmp, m.Largest) {
1896 1 : 		return nil, nil
1897 1 : 	}
     : 	// iter, rangeDelIter and rangeKeyIter are opened lazily over m, shared
     : 	// between the left-file and right-file computations below, and closed
     : 	// via defers before returning. needsBacking is set once any new virtual
     : 	// file backed by m's FileBacking is appended to ve.NewFiles; for a
     : 	// non-virtual m that requires recording the backing in
     : 	// ve.CreatedBackingTables exactly once (see the invariant noted below).
1898 1 : 	var iter internalIterator
1899 1 : 	var rangeDelIter keyspan.FragmentIterator
1900 1 : 	var rangeKeyIter keyspan.FragmentIterator
1901 1 : 	needsBacking := false
1902 1 : 	// Create a file to the left of the excise span, if necessary.
1903 1 : 	// The bounds of this file will be [m.Smallest, lastKeyBefore(exciseSpan.Start)].
1904 1 : 	//
1905 1 : 	// We create bounds that are tight on user keys, and we make the effort to find
1906 1 : 	// the last key in the original sstable that's smaller than exciseSpan.Start
1907 1 : 	// even though it requires some sstable reads. We could choose to create
1908 1 : 	// virtual sstables on loose userKey bounds, in which case we could just set
1909 1 : 	// leftFile.Largest to an exclusive sentinel at exciseSpan.Start. The biggest
1910 1 : 	// issue with that approach would be that it'd lead to lots of small virtual
1911 1 : 	// sstables in the LSM that have no guarantee on containing even a single user
1912 1 : 	// key within the file bounds. This has the potential to increase both read and
1913 1 : 	// write-amp as we will be opening up these sstables only to find no relevant
1914 1 : 	// keys in the read path, and compacting sstables on top of them instead of
1915 1 : 	// directly into the space occupied by them. We choose to incur the cost of
1916 1 : 	// calculating tight bounds at this time instead of creating more work in the
1917 1 : 	// future.
1918 1 : 	//
1919 1 : 	// TODO(bilal): Some of this work can happen without grabbing the manifest
1920 1 : 	// lock; we could grab one currentVersion, release the lock, calculate excised
1921 1 : 	// files, then grab the lock again and recalculate for just the files that
1922 1 : 	// have changed since our previous calculation. Do this optimiaztino as part of
1923 1 : 	// https://github.com/cockroachdb/pebble/issues/2112 .
1924 1 : 	if d.cmp(m.Smallest.UserKey, exciseSpan.Start) < 0 {
1925 1 : 		leftFile := &fileMetadata{
1926 1 : 			Virtual:     true,
1927 1 : 			FileBacking: m.FileBacking,
1928 1 : 			FileNum:     d.mu.versions.getNextFileNum(),
1929 1 : 			// Note that these are loose bounds for smallest/largest seqnums, but they're
1930 1 : 			// sufficient for maintaining correctness.
1931 1 : 			SmallestSeqNum:  m.SmallestSeqNum,
1932 1 : 			LargestSeqNum:   m.LargestSeqNum,
1933 1 : 			SyntheticPrefix: m.SyntheticPrefix,
1934 1 : 			SyntheticSuffix: m.SyntheticSuffix,
1935 1 : 		}
1936 1 : 		if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.SmallestPointKey) {
1937 1 : 			// This file will probably contain point keys.
1938 1 : 			smallestPointKey := m.SmallestPointKey
1939 1 : 			var err error
1940 1 : 			iter, rangeDelIter, err = d.newIters.TODO(context.TODO(), m, &IterOptions{
1941 1 : 				CategoryAndQoS: sstable.CategoryAndQoS{
1942 1 : 					Category: "pebble-ingest",
1943 1 : 					QoSLevel: sstable.LatencySensitiveQoSLevel,
1944 1 : 				},
1945 1 : 				level: manifest.Level(level),
1946 1 : 			}, internalIterOpts{})
1947 1 : 			if err != nil {
1948 0 : 				return nil, err
1949 0 : 			}
1950 1 : 			var kv *base.InternalKV
1951 1 : 			if iter != nil {
1952 1 : 				defer iter.Close()
1953 1 : 				kv = iter.SeekLT(exciseSpan.Start, base.SeekLTFlagsNone)
1954 1 : 			} else {
1955 0 : 				iter = emptyIter
1956 0 : 			}
1957 1 : 			if kv != nil {
1958 1 : 				leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, kv.K.Clone())
1959 1 : 			}
1960 : 			// Store the min of (exciseSpan.Start, rdel.End) in lastRangeDel. This
1961 : 			// needs to be a copy if the key is owned by the range del iter.
1962 1 : 			var lastRangeDel []byte
1963 1 : 			if rangeDelIter != nil {
1964 1 : 				defer rangeDelIter.Close()
1965 1 : 				if rdel, err := rangeDelIter.SeekLT(exciseSpan.Start); err != nil {
1966 0 : 					return nil, err
1967 1 : 				} else if rdel != nil {
1968 1 : 					lastRangeDel = append(lastRangeDel[:0], rdel.End...)
1969 1 : 					if d.cmp(lastRangeDel, exciseSpan.Start) > 0 {
1970 1 : 						lastRangeDel = exciseSpan.Start
1971 1 : 					}
1972 : 				}
1973 1 : 			} else {
1974 1 : 				rangeDelIter = emptyKeyspanIter
1975 1 : 			}
1976 1 : 			if lastRangeDel != nil {
1977 1 : 				leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, lastRangeDel))
1978 1 : 			}
1979 : 		}
1980 1 : 		if m.HasRangeKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.SmallestRangeKey) {
1981 1 : 			// This file will probably contain range keys.
1982 1 : 			var err error
1983 1 : 			smallestRangeKey := m.SmallestRangeKey
1984 1 : 			rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
1985 1 : 			if err != nil {
1986 0 : 				return nil, err
1987 0 : 			}
1988 : 			// Store the min of (exciseSpan.Start, rkey.End) in lastRangeKey. This
1989 : 			// needs to be a copy if the key is owned by the range key iter.
1990 1 : 			var lastRangeKey []byte
1991 1 : 			var lastRangeKeyKind InternalKeyKind
1992 1 : 			defer rangeKeyIter.Close()
1993 1 : 			if rkey, err := rangeKeyIter.SeekLT(exciseSpan.Start); err != nil {
1994 0 : 				return nil, err
1995 1 : 			} else if rkey != nil {
1996 1 : 				lastRangeKey = append(lastRangeKey[:0], rkey.End...)
1997 1 : 				if d.cmp(lastRangeKey, exciseSpan.Start) > 0 {
1998 1 : 					lastRangeKey = exciseSpan.Start
1999 1 : 				}
2000 1 : 				lastRangeKeyKind = rkey.Keys[0].Kind()
2001 : 			}
2002 1 : 			if lastRangeKey != nil {
2003 1 : 				leftFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, base.MakeExclusiveSentinelKey(lastRangeKeyKind, lastRangeKey))
2004 1 : 			}
2005 : 		}
2006 1 : 		if leftFile.HasRangeKeys || leftFile.HasPointKeys {
2007 1 : 			var err error
2008 1 : 			leftFile.Size, err = d.tableCache.estimateSize(m, leftFile.Smallest.UserKey, leftFile.Largest.UserKey)
2009 1 : 			if err != nil {
2010 0 : 				return nil, err
2011 0 : 			}
2012 1 : 			if leftFile.Size == 0 {
2013 1 : 				// On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
2014 1 : 				// such as if the excised file only has range keys/dels and no point
2015 1 : 				// keys. This can cause panics in places where we divide by file sizes.
2016 1 : 				// Correct for it here.
2017 1 : 				leftFile.Size = 1
2018 1 : 			}
2019 1 : 			if err := leftFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
2020 0 : 				return nil, err
2021 0 : 			}
2022 1 : 			leftFile.ValidateVirtual(m)
2023 1 : 			ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: leftFile})
2024 1 : 			needsBacking = true
2025 1 : 			numCreatedFiles++
2026 : 		}
2027 : 	}
2028 : 	// Create a file to the right, if necessary.
2029 1 : 	if exciseSpan.ContainsInternalKey(d.cmp, m.Largest) {
2030 1 : 		// No key exists to the right of the excise span in this file.
2031 1 : 		if needsBacking && !m.Virtual {
2032 1 : 			// If m is virtual, then its file backing is already known to the manifest.
2033 1 : 			// We don't need to create another file backing. Note that there must be
2034 1 : 			// only one CreatedBackingTables entry per backing sstable. This is
2035 1 : 			// indicated by the VersionEdit.CreatedBackingTables invariant.
2036 1 : 			ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
2037 1 : 		}
2038 1 : 		return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
2039 : 	}
2040 : 	// Create a new file, rightFile, between [firstKeyAfter(exciseSpan.End), m.Largest].
2041 : 	//
2042 : 	// See comment before the definition of leftFile for the motivation behind
2043 : 	// calculating tight user-key bounds.
2044 1 : 	rightFile := &fileMetadata{
2045 1 : 		Virtual:     true,
2046 1 : 		FileBacking: m.FileBacking,
2047 1 : 		FileNum:     d.mu.versions.getNextFileNum(),
2048 1 : 		// Note that these are loose bounds for smallest/largest seqnums, but they're
2049 1 : 		// sufficient for maintaining correctness.
2050 1 : 		SmallestSeqNum:  m.SmallestSeqNum,
2051 1 : 		LargestSeqNum:   m.LargestSeqNum,
2052 1 : 		SyntheticPrefix: m.SyntheticPrefix,
2053 1 : 		SyntheticSuffix: m.SyntheticSuffix,
2054 1 : 	}
2055 1 : 	if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.LargestPointKey) {
2056 1 : 		// This file will probably contain point keys
2057 1 : 		largestPointKey := m.LargestPointKey
2058 1 : 		var err error
2059 1 : 		if iter == nil && rangeDelIter == nil {
2060 1 : 			iter, rangeDelIter, err = d.newIters.TODO(context.TODO(), m, &IterOptions{
2061 1 : 				CategoryAndQoS: sstable.CategoryAndQoS{
2062 1 : 					Category: "pebble-ingest",
2063 1 : 					QoSLevel: sstable.LatencySensitiveQoSLevel,
2064 1 : 				},
2065 1 : 				level: manifest.Level(level),
2066 1 : 			}, internalIterOpts{})
2067 1 : 			if err != nil {
2068 0 : 				return nil, err
2069 0 : 			}
2070 1 : 			if iter != nil {
2071 1 : 				defer iter.Close()
2072 1 : 			} else {
2073 0 : 				iter = emptyIter
2074 0 : 			}
2075 1 : 			if rangeDelIter != nil {
2076 1 : 				defer rangeDelIter.Close()
2077 1 : 			} else {
2078 1 : 				rangeDelIter = emptyKeyspanIter
2079 1 : 			}
2080 : 		}
2081 1 : 		kv := iter.SeekGE(exciseSpan.End.Key, base.SeekGEFlagsNone)
2082 1 : 		if kv != nil {
2083 1 : 			if exciseSpan.End.Kind == base.Inclusive && d.equal(exciseSpan.End.Key, kv.K.UserKey) {
2084 0 : 				return nil, base.AssertionFailedf("cannot excise with an inclusive end key and data overlap at end key")
2085 0 : 			}
2086 1 : 			rightFile.ExtendPointKeyBounds(d.cmp, kv.K.Clone(), largestPointKey)
2087 : 		}
2088 : 		// Store the max of (exciseSpan.End, rdel.Start) in firstRangeDel. This
2089 : 		// needs to be a copy if the key is owned by the range del iter.
2090 1 : 		var firstRangeDel []byte
2091 1 : 		rdel, err := rangeDelIter.SeekGE(exciseSpan.End.Key)
2092 1 : 		if err != nil {
2093 0 : 			return nil, err
2094 1 : 		} else if rdel != nil {
2095 1 : 			firstRangeDel = append(firstRangeDel[:0], rdel.Start...)
2096 1 : 			if d.cmp(firstRangeDel, exciseSpan.End.Key) < 0 {
2097 1 : 				// NB: This can only be done if the end bound is exclusive.
2098 1 : 				if exciseSpan.End.Kind != base.Exclusive {
2099 0 : 					return nil, base.AssertionFailedf("cannot truncate rangedel during excise with an inclusive upper bound")
2100 0 : 				}
2101 1 : 				firstRangeDel = exciseSpan.End.Key
2102 : 			}
2103 : 		}
2104 1 : 		if firstRangeDel != nil {
2105 1 : 			smallestPointKey := rdel.SmallestKey()
2106 1 : 			smallestPointKey.UserKey = firstRangeDel
2107 1 : 			rightFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, largestPointKey)
2108 1 : 		}
2109 : 	}
2110 1 : 	if m.HasRangeKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.LargestRangeKey) {
2111 1 : 		// This file will probably contain range keys.
2112 1 : 		largestRangeKey := m.LargestRangeKey
2113 1 : 		if rangeKeyIter == nil {
2114 1 : 			var err error
2115 1 : 			rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
2116 1 : 			if err != nil {
2117 0 : 				return nil, err
2118 0 : 			}
2119 1 : 			defer rangeKeyIter.Close()
2120 : 		}
2121 : 		// Store the max of (exciseSpan.End, rkey.Start) in firstRangeKey. This
2122 : 		// needs to be a copy if the key is owned by the range key iter.
2123 1 : 		var firstRangeKey []byte
2124 1 : 		rkey, err := rangeKeyIter.SeekGE(exciseSpan.End.Key)
2125 1 : 		if err != nil {
2126 0 : 			return nil, err
2127 1 : 		} else if rkey != nil {
2128 1 : 			firstRangeKey = append(firstRangeKey[:0], rkey.Start...)
2129 1 : 			if d.cmp(firstRangeKey, exciseSpan.End.Key) < 0 {
2130 1 : 				if exciseSpan.End.Kind != base.Exclusive {
2131 0 : 					return nil, base.AssertionFailedf("cannot truncate range key during excise with an inclusive upper bound")
2132 0 : 				}
2133 1 : 				firstRangeKey = exciseSpan.End.Key
2134 : 			}
2135 : 		}
2136 1 : 		if firstRangeKey != nil {
2137 1 : 			smallestRangeKey := rkey.SmallestKey()
2138 1 : 			smallestRangeKey.UserKey = firstRangeKey
2139 1 : 			// We call ExtendRangeKeyBounds so any internal boundType fields are
2140 1 : 			// set correctly. Note that this is mildly wasteful as we'll be comparing
2141 1 : 			// rightFile.{Smallest,Largest}RangeKey with themselves, which can be
2142 1 : 			// avoided if we exported ExtendOverallKeyBounds or so.
2143 1 : 			rightFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, largestRangeKey)
2144 1 : 		}
2145 : 	}
2146 1 : 	if rightFile.HasRangeKeys || rightFile.HasPointKeys {
2147 1 : 		var err error
2148 1 : 		rightFile.Size, err = d.tableCache.estimateSize(m, rightFile.Smallest.UserKey, rightFile.Largest.UserKey)
2149 1 : 		if err != nil {
2150 0 : 			return nil, err
2151 0 : 		}
2152 1 : 		if rightFile.Size == 0 {
2153 1 : 			// On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
2154 1 : 			// such as if the excised file only has range keys/dels and no point keys.
2155 1 : 			// This can cause panics in places where we divide by file sizes. Correct
2156 1 : 			// for it here.
2157 1 : 			rightFile.Size = 1
2158 1 : 		}
2159 1 : 		if err := rightFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
2160 0 : 			return nil, err
2161 0 : 		}
2162 1 : 		rightFile.ValidateVirtual(m)
2163 1 : 		ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: rightFile})
2164 1 : 		needsBacking = true
2165 1 : 		numCreatedFiles++
2166 : 	}
2167 :
2168 1 : 	if needsBacking && !m.Virtual {
2169 1 : 		// If m is virtual, then its file backing is already known to the manifest.
2170 1 : 		// We don't need to create another file backing. Note that there must be
2171 1 : 		// only one CreatedBackingTables entry per backing sstable. This is
2172 1 : 		// indicated by the VersionEdit.CreatedBackingTables invariant.
2173 1 : 		ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
2174 1 : 	}
2175 :
2176 1 : 	return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
2177 : }
2178 :
     : // ingestTargetLevelFunc is the signature of the function that picks the
     : // LSM level into which the ingested file meta should be placed. It may
     : // also return an existing file at that level that needs to be split to
     : // allow meta to slot in (see ingestSplit); per the comments in
     : // ingestSplit, such a candidate has only boundary overlap with meta,
     : // not data overlap. Presumably the split candidate is only returned
     : // when suggestSplit is true — confirm against the implementation,
     : // which is outside this view.
2179 : type ingestTargetLevelFunc func(
2180 : 	newIters tableNewIters,
2181 : 	newRangeKeyIter keyspanimpl.TableNewSpanIter,
2182 : 	iterOps IterOptions,
2183 : 	comparer *Comparer,
2184 : 	v *version,
2185 : 	baseLevel int,
2186 : 	compactions map[*compaction]struct{},
2187 : 	meta *fileMetadata,
2188 : 	suggestSplit bool,
2189 : ) (int, *fileMetadata, error)
2190 :
     : // ingestSplitFile describes a single ingest-time split: ingestFile is
     : // to be placed at level, and splitFile (an existing file at that level)
     : // must be split into virtual sstables to make room for it.
2191 : type ingestSplitFile struct {
2192 : 	// ingestFile is the file being ingested.
2193 : 	ingestFile *fileMetadata
2194 : 	// splitFile is the file that needs to be split to allow ingestFile to slot
2195 : 	// into `level` level.
2196 : 	splitFile *fileMetadata
2197 : 	// The level where ingestFile will go (and where splitFile already is).
2198 : 	level int
2199 : }
2200 :
// ingestSplit splits files specified in `files` and updates ve in-place to
// account for existing files getting split into two virtual sstables. The map
// `replacedFiles` contains an in-progress map of all files that have been
// replaced with new virtual sstables in this version edit so far, which is also
// updated in-place.
//
// updateMetrics is invoked once per file actually split, with the split file,
// its level, and the replacement entries produced, so the caller can adjust
// per-level metrics.
//
// d.mu as well as the manifest lock must be held when calling this method.
func (d *DB) ingestSplit(
	ve *versionEdit,
	updateMetrics func(*fileMetadata, int, []newFileEntry),
	files []ingestSplitFile,
	replacedFiles map[base.FileNum][]newFileEntry,
) error {
	for _, s := range files {
		ingestFileBounds := s.ingestFile.UserKeyBounds()
		// replacedFiles can be thought of as a tree, where we start iterating with
		// s.splitFile and run its fileNum through replacedFiles, then find which of
		// the replaced files overlaps with s.ingestFile, which becomes the new
		// splitFile, then we check splitFile's replacements in replacedFiles again
		// for overlap with s.ingestFile, and so on until we either can't find the
		// current splitFile in replacedFiles (i.e. that's the file that now needs to
		// be split), or we don't find a file that overlaps with s.ingestFile, which
		// means a prior ingest split already produced enough room for s.ingestFile
		// to go into this level without necessitating another ingest split.
		splitFile := s.splitFile
		for splitFile != nil {
			replaced, ok := replacedFiles[splitFile.FileNum]
			if !ok {
				break
			}
			updatedSplitFile := false
			for i := range replaced {
				if replaced[i].Meta.Overlaps(d.cmp, &ingestFileBounds) {
					if updatedSplitFile {
						// This should never happen because the earlier ingestTargetLevel
						// function only finds split file candidates that are guaranteed to
						// have no data overlap, only boundary overlap. See the comments
						// in that method to see the definitions of data vs boundary
						// overlap. That, plus the fact that files in `replaced` are
						// guaranteed to have file bounds that are tight on user keys
						// (as that's what `d.excise` produces), means that the only case
						// where we overlap with two or more files in `replaced` is if we
						// actually had data overlap all along, or if the ingestion files
						// were overlapping, either of which is an invariant violation.
						panic("updated with two files in ingestSplit")
					}
					splitFile = replaced[i].Meta
					updatedSplitFile = true
				}
			}
			if !updatedSplitFile {
				// None of the replaced files overlapped with the file being ingested.
				// This can happen if we've already excised a span overlapping with
				// this file, or if we have consecutive ingested files that can slide
				// within the same gap between keys in an existing file. For instance,
				// if an existing file has keys a and g and we're ingesting b-c, d-e,
				// the first loop iteration will split the existing file into one that
				// ends in a and another that starts at g, and the second iteration will
				// fall into this case and require no splitting.
				//
				// No splitting necessary.
				splitFile = nil
			}
		}
		if splitFile == nil {
			// A prior excise or split already made room for this ingested file.
			continue
		}
		// NB: excise operates on [start, end). We're splitting at [start, end]
		// (assuming !s.ingestFile.Largest.IsExclusiveSentinel()). The conflation
		// of exclusive vs inclusive end bounds should not make a difference here
		// as we're guaranteed to not have any data overlap between splitFile and
		// s.ingestFile. d.excise will return an error if we pass an inclusive user
		// key bound _and_ we end up seeing data overlap at the end key.
		added, err := d.excise(base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level)
		if err != nil {
			return err
		}
		// The excise must have deleted splitFile; anything else indicates the
		// split candidate was stale or the excise silently did nothing.
		if _, ok := ve.DeletedFiles[deletedFileEntry{
			Level:   s.level,
			FileNum: splitFile.FileNum,
		}]; !ok {
			panic("did not split file that was expected to be split")
		}
		replacedFiles[splitFile.FileNum] = added
		for i := range added {
			addedBounds := added[i].Meta.UserKeyBounds()
			if s.ingestFile.Overlaps(d.cmp, &addedBounds) {
				panic("ingest-time split produced a file that overlaps with ingested file")
			}
		}
		updateMetrics(splitFile, s.level, added)
	}
	// Flatten the version edit by removing any entries from ve.NewFiles that
	// are also in ve.DeletedFiles.
	newNewFiles := ve.NewFiles[:0]
	for i := range ve.NewFiles {
		fn := ve.NewFiles[i].Meta.FileNum
		deEntry := deletedFileEntry{Level: ve.NewFiles[i].Level, FileNum: fn}
		if _, ok := ve.DeletedFiles[deEntry]; ok {
			delete(ve.DeletedFiles, deEntry)
		} else {
			newNewFiles = append(newNewFiles, ve.NewFiles[i])
		}
	}
	ve.NewFiles = newNewFiles
	return nil
}
2308 :
// ingestApply updates the current version to reflect the ingestion of the
// files described by lr. For each ingested file it either uses the level
// already specified (shared/external files with a known level) or asks
// findTargetLevel for the lowest non-overlapping level; it then excises
// exciseSpan from any overlapping existing files, performs any required
// ingest-time splits of existing files, cancels in-progress compactions
// invalidated by the excise/splits, and logs and applies the resulting
// version edit. On success it returns the applied version edit.
//
// If mut is non-nil, its writer ref is released once the manifest lock is
// held, allowing its flush to proceed; that flush will apply after this
// ingestion. exciseSeqNum is the sequence number assigned to the excise and
// is used to check for conflicting EventuallyFileOnlySnapshots.
func (d *DB) ingestApply(
	jobID JobID,
	lr ingestLoadResult,
	findTargetLevel ingestTargetLevelFunc,
	mut *memTable,
	exciseSpan KeyRange,
	exciseSeqNum uint64,
) (*versionEdit, error) {
	d.mu.Lock()
	defer d.mu.Unlock()

	ve := &versionEdit{
		NewFiles: make([]newFileEntry, lr.fileCount()),
	}
	if exciseSpan.Valid() || (d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit()) {
		ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{}
	}
	metrics := make(map[int]*LevelMetrics)

	// Lock the manifest for writing before we use the current version to
	// determine the target level. This prevents two concurrent ingestion jobs
	// from using the same version to determine the target level, and also
	// provides serialization with concurrent compaction and flush jobs.
	// logAndApply unconditionally releases the manifest lock, but any earlier
	// returns must unlock the manifest.
	d.mu.versions.logLock()

	if mut != nil {
		// Unref the mutable memtable to allows its flush to proceed. Now that we've
		// acquired the manifest lock, we can be certain that if the mutable
		// memtable has received more recent conflicting writes, the flush won't
		// beat us to applying to the manifest resulting in sequence number
		// inversion. Even though we call maybeScheduleFlush right now, this flush
		// will apply after our ingestion.
		if mut.writerUnref() {
			d.maybeScheduleFlush()
		}
	}

	shouldIngestSplit := d.opts.Experimental.IngestSplit != nil &&
		d.opts.Experimental.IngestSplit() && d.FormatMajorVersion() >= FormatVirtualSSTables
	current := d.mu.versions.currentVersion()
	baseLevel := d.mu.versions.picker.getBaseLevel()
	iterOps := IterOptions{logger: d.opts.Logger}
	// filesToSplit is a list where each element is a pair consisting of a file
	// being ingested and a file being split to make room for an ingestion into
	// that level. Each ingested file will appear at most once in this list. It
	// is possible for split files to appear twice in this list.
	filesToSplit := make([]ingestSplitFile, 0)
	checkCompactions := false
	for i := 0; i < lr.fileCount(); i++ {
		// Determine the lowest level in the LSM for which the sstable doesn't
		// overlap any existing files in the level.
		var m *fileMetadata
		specifiedLevel := -1
		isShared := false
		isExternal := false
		// lr's files are ordered local, then shared, then external; recover
		// the per-category index from the flat index i.
		if i < len(lr.local) {
			// local file.
			m = lr.local[i].fileMetadata
		} else if (i - len(lr.local)) < len(lr.shared) {
			// shared file.
			isShared = true
			sharedIdx := i - len(lr.local)
			m = lr.shared[sharedIdx].fileMetadata
			specifiedLevel = int(lr.shared[sharedIdx].shared.Level)
		} else {
			// external file.
			isExternal = true
			externalIdx := i - (len(lr.local) + len(lr.shared))
			m = lr.external[externalIdx].fileMetadata
			if lr.externalFilesHaveLevel {
				specifiedLevel = int(lr.external[externalIdx].external.Level)
			}
		}

		// Add to CreatedBackingTables if this is a new backing.
		//
		// Shared files always have a new backing. External files have new backings
		// iff the backing disk file num and the file num match (see ingestAttachRemote).
		if isShared || (isExternal && m.FileBacking.DiskFileNum == base.DiskFileNum(m.FileNum)) {
			ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
		}

		f := &ve.NewFiles[i]
		var err error
		if specifiedLevel != -1 {
			f.Level = specifiedLevel
		} else {
			var splitFile *fileMetadata
			if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
				// This file fits perfectly within the excise span. We can slot it at
				// L6, or sharedLevelsStart - 1 if we have shared files.
				if len(lr.shared) > 0 || lr.externalFilesHaveLevel {
					f.Level = sharedLevelsStart - 1
					if baseLevel > f.Level {
						f.Level = 0
					}
				} else {
					f.Level = 6
				}
			} else {
				// TODO(bilal): findTargetLevel does disk IO (reading files for data
				// overlap) even though we're holding onto d.mu. Consider unlocking
				// d.mu while we do this. We already hold versions.logLock so we should
				// not see any version applications while we're at this. The one
				// complication here would be pulling out the mu.compact.inProgress
				// check from findTargetLevel, as that requires d.mu to be held.
				f.Level, splitFile, err = findTargetLevel(
					d.newIters, d.tableNewRangeKeyIter, iterOps, d.opts.Comparer, current, baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit)
			}

			if splitFile != nil {
				if invariants.Enabled {
					if lf := current.Levels[f.Level].Find(d.cmp, splitFile); lf == nil {
						panic("splitFile returned is not in level it should be")
					}
				}
				// We take advantage of the fact that we won't drop the db mutex
				// between now and the call to logAndApply. So, no files should
				// get added to a new in-progress compaction at this point. We can
				// avoid having to iterate on in-progress compactions to cancel them
				// if none of the files being split have a compacting state.
				if splitFile.IsCompacting() {
					checkCompactions = true
				}
				filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitFile, level: f.Level})
			}
		}
		if err != nil {
			d.mu.versions.logUnlock()
			return nil, err
		}
		if isShared && f.Level < sharedLevelsStart {
			panic(fmt.Sprintf("cannot slot a shared file higher than the highest shared level: %d < %d",
				f.Level, sharedLevelsStart))
		}
		f.Meta = m
		levelMetrics := metrics[f.Level]
		if levelMetrics == nil {
			levelMetrics = &LevelMetrics{}
			metrics[f.Level] = levelMetrics
		}
		levelMetrics.NumFiles++
		levelMetrics.Size += int64(m.Size)
		levelMetrics.BytesIngested += m.Size
		levelMetrics.TablesIngested++
	}
	// replacedFiles maps files excised due to exciseSpan (or splitFiles returned
	// by ingestTargetLevel), to files that were created to replace it. This map
	// is used to resolve references to split files in filesToSplit, as it is
	// possible for a file that we want to split to no longer exist or have a
	// newer fileMetadata due to a split induced by another ingestion file, or an
	// excise.
	replacedFiles := make(map[base.FileNum][]newFileEntry)
	updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
		levelMetrics := metrics[level]
		if levelMetrics == nil {
			levelMetrics = &LevelMetrics{}
			metrics[level] = levelMetrics
		}
		levelMetrics.NumFiles--
		levelMetrics.Size -= int64(m.Size)
		for i := range added {
			levelMetrics.NumFiles++
			levelMetrics.Size += int64(added[i].Meta.Size)
		}
	}
	if exciseSpan.Valid() {
		// Iterate through all levels and find files that intersect with exciseSpan.
		//
		// TODO(bilal): We could drop the DB mutex here as we don't need it for
		// excises; we only need to hold the version lock which we already are
		// holding. However releasing the DB mutex could mess with the
		// ingestTargetLevel calculation that happened above, as it assumed that it
		// had a complete view of in-progress compactions that wouldn't change
		// until logAndApply is called. If we were to drop the mutex now, we could
		// schedule another in-progress compaction that would go into the chosen target
		// level and lead to file overlap within level (which would panic in
		// logAndApply). We should drop the db mutex here, do the excise, then
		// re-grab the DB mutex and rerun just the in-progress compaction check to
		// see if any new compactions are conflicting with our chosen target levels
		// for files, and if they are, we should signal those compactions to error
		// out.
		for level := range current.Levels {
			overlaps := current.Overlaps(level, exciseSpan.UserKeyBounds())
			iter := overlaps.Iter()

			for m := iter.First(); m != nil; m = iter.Next() {
				newFiles, err := d.excise(exciseSpan.UserKeyBounds(), m, ve, level)
				if err != nil {
					return nil, err
				}

				if _, ok := ve.DeletedFiles[deletedFileEntry{
					Level:   level,
					FileNum: m.FileNum,
				}]; !ok {
					// We did not excise this file.
					continue
				}
				replacedFiles[m.FileNum] = newFiles
				updateLevelMetricsOnExcise(m, level, newFiles)
			}
		}
	}
	if len(filesToSplit) > 0 {
		// For the same reasons as the above call to excise, we hold the db mutex
		// while calling this method.
		if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
			return nil, err
		}
	}
	if len(filesToSplit) > 0 || exciseSpan.Valid() {
		for c := range d.mu.compact.inProgress {
			if c.versionEditApplied {
				continue
			}
			// Check if this compaction overlaps with the excise span. Note that just
			// checking if the inputs individually overlap with the excise span
			// isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
			// as inputs and write it all out as [a,b,e,f] in one sstable. If we're
			// doing a [c,d) excise at the same time as this compaction, we will have
			// to error out the whole compaction as we can't guarantee it hasn't/won't
			// write a file overlapping with the excise span.
			if exciseSpan.OverlapsInternalKeyRange(d.cmp, c.smallest, c.largest) {
				c.cancel.Store(true)
			}
			// Check if this compaction's inputs have been replaced due to an
			// ingest-time split. In that case, cancel the compaction as a newly picked
			// compaction would need to include any new files that slid in between
			// previously-existing files. Note that we cancel any compaction that has a
			// file that was ingest-split as an input, even if it started before this
			// ingestion.
			if checkCompactions {
				for i := range c.inputs {
					iter := c.inputs[i].files.Iter()
					for f := iter.First(); f != nil; f = iter.Next() {
						if _, ok := replacedFiles[f.FileNum]; ok {
							c.cancel.Store(true)
							break
						}
					}
				}
			}
		}
	}

	if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
		return d.getInProgressCompactionInfoLocked(nil)
	}); err != nil {
		// NB: logAndApply has already released the manifest lock (see the
		// comment above the logLock call), so a plain return is correct here.
		return nil, err
	}

	// Check for any EventuallyFileOnlySnapshots that could be watching for
	// an excise on this span. There should be none as the
	// computePossibleOverlaps steps should have forced these EFOS to transition
	// to file-only snapshots by now. If we see any that conflict with this
	// excise, panic.
	if exciseSpan.Valid() {
		for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
			// Skip non-EFOS snapshots, and also skip any EFOS that were created
			// *after* the excise.
			if s.efos == nil || base.Visible(exciseSeqNum, s.efos.seqNum, base.InternalKeySeqNumMax) {
				continue
			}
			efos := s.efos
			// TODO(bilal): We can make this faster by taking advantage of the sorted
			// nature of protectedRanges to do a sort.Search, or even maintaining a
			// global list of all protected ranges instead of having to peer into every
			// snapshot.
			for i := range efos.protectedRanges {
				if efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
					panic("unexpected excise of an EventuallyFileOnlySnapshot's bounds")
				}
			}
		}
	}

	d.mu.versions.metrics.Ingest.Count++

	d.updateReadStateLocked(d.opts.DebugCheck)
	// updateReadStateLocked could have generated obsolete tables, schedule a
	// cleanup job if necessary.
	d.deleteObsoleteFiles(jobID)
	d.updateTableStatsLocked(ve.NewFiles)
	// The ingestion may have pushed a level over the threshold for compaction,
	// so check to see if one is necessary and schedule it.
	d.maybeScheduleCompaction()
	// Queue the new files for checksum validation, deduplicating by backing
	// file so each physical sstable is validated at most once per ingestion.
	var toValidate []manifest.NewFileEntry
	dedup := make(map[base.DiskFileNum]struct{})
	for _, entry := range ve.NewFiles {
		if _, ok := dedup[entry.Meta.FileBacking.DiskFileNum]; !ok {
			toValidate = append(toValidate, entry)
			dedup[entry.Meta.FileBacking.DiskFileNum] = struct{}{}
		}
	}
	d.maybeValidateSSTablesLocked(toValidate)
	return ve, nil
}
2610 :
2611 : // maybeValidateSSTablesLocked adds the slice of newFileEntrys to the pending
2612 : // queue of files to be validated, when the feature is enabled.
2613 : //
2614 : // Note that if two entries with the same backing file are added twice, then the
2615 : // block checksums for the backing file will be validated twice.
2616 : //
2617 : // DB.mu must be locked when calling.
2618 1 : func (d *DB) maybeValidateSSTablesLocked(newFiles []newFileEntry) {
2619 1 : // Only add to the validation queue when the feature is enabled.
2620 1 : if !d.opts.Experimental.ValidateOnIngest {
2621 1 : return
2622 1 : }
2623 :
2624 1 : d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...)
2625 1 : if d.shouldValidateSSTablesLocked() {
2626 1 : go d.validateSSTables()
2627 1 : }
2628 : }
2629 :
2630 : // shouldValidateSSTablesLocked returns true if SSTable validation should run.
2631 : // DB.mu must be locked when calling.
2632 1 : func (d *DB) shouldValidateSSTablesLocked() bool {
2633 1 : return !d.mu.tableValidation.validating &&
2634 1 : d.closed.Load() == nil &&
2635 1 : d.opts.Experimental.ValidateOnIngest &&
2636 1 : len(d.mu.tableValidation.pending) > 0
2637 1 : }
2638 :
// validateSSTables runs a round of validation on the tables in the pending
// queue, checking block checksums for each one. DB.mu is dropped while the IO
// is performed. Tables that fail for reasons other than corruption are
// re-queued for a later attempt; corruption is fatal. If more tables are
// pending when the round completes, another validation goroutine is spawned.
func (d *DB) validateSSTables() {
	d.mu.Lock()
	if !d.shouldValidateSSTablesLocked() {
		d.mu.Unlock()
		return
	}

	// Claim the pending queue and mark a round as in progress so concurrent
	// callers of shouldValidateSSTablesLocked back off.
	pending := d.mu.tableValidation.pending
	d.mu.tableValidation.pending = nil
	d.mu.tableValidation.validating = true
	jobID := d.newJobIDLocked()
	rs := d.loadReadState()

	// Drop DB.mu before performing IO.
	d.mu.Unlock()

	// Validate all tables in the pending queue. This could lead to a situation
	// where we are starving IO from other tasks due to having to page through
	// all the blocks in all the sstables in the queue.
	// TODO(travers): Add some form of pacing to avoid IO starvation.

	// If we fail to validate any files due to reasons other than uncovered
	// corruption, accumulate them and re-queue them for another attempt.
	var retry []manifest.NewFileEntry

	for _, f := range pending {
		// The file may have been moved or deleted since it was ingested, in
		// which case we skip.
		if !rs.current.Contains(f.Level, f.Meta) {
			// Assume the file was moved to a lower level. It is rare enough
			// that a table is moved or deleted between the time it was ingested
			// and the time the validation routine runs that the overall cost of
			// this inner loop is tolerably low, when amortized over all
			// ingested tables.
			found := false
			for i := f.Level + 1; i < numLevels; i++ {
				if rs.current.Contains(i, f.Meta) {
					found = true
					break
				}
			}
			if !found {
				continue
			}
		}

		// Virtual tables validate through their backing file; physical tables
		// validate directly.
		var err error
		if f.Meta.Virtual {
			err = d.tableCache.withVirtualReader(
				f.Meta.VirtualMeta(), func(v sstable.VirtualReader) error {
					return v.ValidateBlockChecksumsOnBacking()
				})
		} else {
			err = d.tableCache.withReader(
				f.Meta.PhysicalMeta(), func(r *sstable.Reader) error {
					return r.ValidateBlockChecksums()
				})
		}

		if err != nil {
			if IsCorruptionError(err) {
				// TODO(travers): Hook into the corruption reporting pipeline, once
				// available. See pebble#1192.
				d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err)
			} else {
				// If there was some other, possibly transient, error that
				// caused table validation to fail inform the EventListener and
				// move on. We remember the table so that we can retry it in a
				// subsequent table validation job.
				//
				// TODO(jackson): If the error is not transient, this will retry
				// validation indefinitely. While not great, it's the same
				// behavior as erroring flushes and compactions. We should
				// address this as a part of #270.
				d.opts.EventListener.BackgroundError(err)
				retry = append(retry, f)
				continue
			}
		}

		d.opts.EventListener.TableValidated(TableValidatedInfo{
			JobID: int(jobID),
			Meta:  f.Meta,
		})
	}
	rs.unref()
	// Re-queue any retries, clear the in-progress flag, wake any waiters on
	// the condition variable, and spawn another round if more work arrived
	// while we were validating.
	d.mu.Lock()
	defer d.mu.Unlock()
	d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, retry...)
	d.mu.tableValidation.validating = false
	d.mu.tableValidation.cond.Broadcast()
	if d.shouldValidateSSTablesLocked() {
		go d.validateSSTables()
	}
}
|