Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "slices"
10 : "sort"
11 : "time"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/keyspan"
17 : "github.com/cockroachdb/pebble/internal/manifest"
18 : "github.com/cockroachdb/pebble/internal/private"
19 : "github.com/cockroachdb/pebble/objstorage"
20 : "github.com/cockroachdb/pebble/objstorage/remote"
21 : "github.com/cockroachdb/pebble/sstable"
22 : )
23 :
24 1 : func sstableKeyCompare(userCmp Compare, a, b InternalKey) int {
25 1 : c := userCmp(a.UserKey, b.UserKey)
26 1 : if c != 0 {
27 1 : return c
28 1 : }
29 1 : if a.IsExclusiveSentinel() {
30 1 : if !b.IsExclusiveSentinel() {
31 1 : return -1
32 1 : }
33 1 : } else if b.IsExclusiveSentinel() {
34 1 : return +1
35 1 : }
36 1 : return 0
37 : }
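// A minimal sketch of the sentinel ordering above (assuming the default
// bytewise comparer; not from the original source): an exclusive range-del
// sentinel at a user key sorts before a point key at the same user key, so a
// table whose largest key is the sentinel does not actually contain that key:
//
//	largest := base.MakeRangeDeleteSentinelKey([]byte("b"))
//	point := base.MakeInternalKey([]byte("b"), 1, base.InternalKeyKindSet)
//	// sstableKeyCompare(base.DefaultComparer.Compare, largest, point) == -1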
38 :
39 : // KeyRange encodes a key range in user key space. A KeyRange's Start is
40 : // inclusive while its End is exclusive.
41 : type KeyRange struct {
42 : Start, End []byte
43 : }
44 :
45 : // Valid returns true if the KeyRange is defined.
46 1 : func (k *KeyRange) Valid() bool {
47 1 : return k.Start != nil && k.End != nil
48 1 : }
49 :
50 : // Contains returns whether the specified key exists in the KeyRange.
51 1 : func (k *KeyRange) Contains(cmp base.Compare, key InternalKey) bool {
52 1 : v := cmp(key.UserKey, k.End)
53 1 : return (v < 0 || (v == 0 && key.IsExclusiveSentinel())) && cmp(k.Start, key.UserKey) <= 0
54 1 : }
55 :
56 : // OverlapsInternalKeyRange checks if the specified internal key range has an
57 : // overlap with the KeyRange. Note that we aren't checking for full containment
58 : // of smallest-largest within k, rather just that there's some intersection
59 : // between the two ranges.
60 1 : func (k *KeyRange) OverlapsInternalKeyRange(cmp base.Compare, smallest, largest InternalKey) bool {
61 1 : v := cmp(k.Start, largest.UserKey)
62 1 : return v <= 0 && !(largest.IsExclusiveSentinel() && v == 0) &&
63 1 : cmp(k.End, smallest.UserKey) > 0
64 1 : }
65 :
66 : // Overlaps checks if the specified file has an overlap with the KeyRange.
67 : // Note that we aren't checking for full containment of m within k, rather just
68 : // that there's some intersection between m and k's bounds.
69 1 : func (k *KeyRange) Overlaps(cmp base.Compare, m *fileMetadata) bool {
70 1 : return k.OverlapsInternalKeyRange(cmp, m.Smallest, m.Largest)
71 1 : }
72 :
73 : // OverlapsKeyRange checks if this span overlaps with the provided KeyRange.
74 : // Note that we aren't checking for full containment of either span in the other,
75 : // just that there's a key x that is in both key ranges.
76 1 : func (k *KeyRange) OverlapsKeyRange(cmp Compare, span KeyRange) bool {
77 1 : return cmp(k.Start, span.End) < 0 && cmp(k.End, span.Start) > 0
78 1 : }
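// A minimal usage sketch of the KeyRange semantics above (assuming the
// default comparer; not from the original source): a KeyRange ["b","d")
// contains keys at "b" and "c", and contains an exclusive sentinel at "d"
// (the sentinel excludes "d" itself), but not a point key at "d":
//
//	kr := KeyRange{Start: []byte("b"), End: []byte("d")}
//	cmp := base.DefaultComparer.Compare
//	kr.Contains(cmp, base.MakeInternalKey([]byte("c"), 1, base.InternalKeyKindSet)) // true
//	kr.Contains(cmp, base.MakeRangeDeleteSentinelKey([]byte("d")))                  // true
//	kr.Contains(cmp, base.MakeInternalKey([]byte("d"), 1, base.InternalKeyKindSet)) // false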
79 :
80 1 : func ingestValidateKey(opts *Options, key *InternalKey) error {
81 1 : if key.Kind() == InternalKeyKindInvalid {
82 1 : return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s",
83 1 : key.Pretty(opts.Comparer.FormatKey))
84 1 : }
85 1 : if key.SeqNum() != 0 {
86 1 : return base.CorruptionErrorf("pebble: external sstable has non-zero seqnum: %s",
87 1 : key.Pretty(opts.Comparer.FormatKey))
88 1 : }
89 1 : return nil
90 : }
91 :
92 : // ingestSynthesizeShared constructs a fileMetadata for one shared sstable owned
93 : // or shared by another node.
94 : func ingestSynthesizeShared(
95 : opts *Options, sm SharedSSTMeta, fileNum base.DiskFileNum,
96 1 : ) (*fileMetadata, error) {
97 1 : if sm.Size == 0 {
98 0 : // Disallow 0 file sizes
99 0 : return nil, errors.New("pebble: cannot ingest shared file with size 0")
100 0 : }
101 : // Don't load table stats. Doing a round trip to shared storage, one SST
102 : // at a time, is not worth it as it slows down ingestion.
103 1 : meta := &fileMetadata{
104 1 : FileNum: fileNum.FileNum(),
105 1 : CreationTime: time.Now().Unix(),
106 1 : Virtual: true,
107 1 : Size: sm.Size,
108 1 : }
109 1 : meta.InitProviderBacking(fileNum)
110 1 : // Set the underlying FileBacking's size to the same size as the virtualized
111 1 : // view of the sstable. This ensures that we don't over-prioritize this
112 1 : // sstable for compaction just yet, as we do not have a clear sense of what
113 1 : // parts of this sstable are referenced by other nodes.
114 1 : meta.FileBacking.Size = sm.Size
115 1 : if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil {
116 1 : // Initialize meta.{HasRangeKeys,Smallest,Largest}, etc.
117 1 : //
118 1 : // NB: We create new internal keys and pass them into ExternalRangeKeyBounds
119 1 : // so that we can sub a zero sequence number into the bounds. We can set
120 1 : // the sequence number to anything here; it'll be reset in ingestUpdateSeqNum
121 1 : // anyway. However, we do need to use the same sequence number across all
122 1 : // bound keys at this step so that we end up with bounds that are consistent
123 1 : // across point/range keys.
124 1 : // Note that the kind of the smallest key might change because of the seqnum
125 1 : // rewriting. For example, the sstable could start with a.SET.2 and
126 1 : // a.RANGEDEL.1 (with the smallest key being a.SET.2), but after rewriting both seqnums to 100 we have a.RANGEDEL.100 and a.SET.100, making a.RANGEDEL.100 the smallest key; see the illustration after this function.
127 1 : smallestRangeKey := base.MakeInternalKey(sm.SmallestRangeKey.UserKey, 0, sm.SmallestRangeKey.Kind())
128 1 : largestRangeKey := base.MakeExclusiveSentinelKey(sm.LargestRangeKey.Kind(), sm.LargestRangeKey.UserKey)
129 1 : meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallestRangeKey, largestRangeKey)
130 1 : }
131 1 : if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil {
132 1 : // Initialize meta.{HasPointKeys,Smallest,Largest}, etc.
133 1 : //
134 1 : // See point above in the ExtendRangeKeyBounds call on why we use a zero
135 1 : // sequence number here.
136 1 : smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, sm.SmallestPointKey.Kind())
137 1 : largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, sm.LargestPointKey.Kind())
138 1 : if sm.LargestPointKey.IsExclusiveSentinel() {
139 1 : largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey)
140 1 : }
141 1 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallestPointKey, largestPointKey)
142 : }
143 1 : if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
144 0 : return nil, err
145 0 : }
146 1 : return meta, nil
147 : }
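// A minimal illustration of the seqnum-rewriting note above (assuming the
// default comparer; not from the original source): once both keys share
// seqnum 100, the RANGEDEL sorts before the SET at the same user key, so the
// kind of the smallest key changes:
//
//	set := base.MakeInternalKey([]byte("a"), 100, base.InternalKeyKindSet)
//	rd := base.MakeInternalKey([]byte("a"), 100, base.InternalKeyKindRangeDelete)
//	// base.InternalCompare(base.DefaultComparer.Compare, rd, set) < 0,
//	// i.e. a.RANGEDEL.100 now sorts before a.SET.100.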
148 :
149 : // ingestLoad1External loads the fileMetadata for one external sstable.
150 : // Sequence number and target level calculation happens during prepare/apply.
151 : func ingestLoad1External(
152 : opts *Options,
153 : e ExternalFile,
154 : fileNum base.DiskFileNum,
155 : objprovider objstorage.Provider,
156 : jobID int,
157 1 : ) (*fileMetadata, error) {
158 1 : if e.Size == 0 {
159 0 : // Disallow 0 file sizes
160 0 : return nil, errors.New("pebble: cannot ingest external file with size 0")
161 0 : }
162 1 : if !e.HasRangeKey && !e.HasPointKey {
163 0 : return nil, errors.New("pebble: cannot ingest external file with no point or range keys")
164 0 : }
165 : // Don't load table stats. Doing a round trip to shared storage, one SST
166 : // at a time, is not worth it as it slows down ingestion.
167 1 : meta := &fileMetadata{}
168 1 : meta.FileNum = fileNum.FileNum()
169 1 : meta.CreationTime = time.Now().Unix()
170 1 : meta.Virtual = true
171 1 : meta.Size = e.Size
172 1 : meta.InitProviderBacking(fileNum)
173 1 :
174 1 : // Try to resolve a reference to the external file.
175 1 : backing, err := objprovider.CreateExternalObjectBacking(e.Locator, e.ObjName)
176 1 : if err != nil {
177 0 : return nil, err
178 0 : }
179 1 : metas, err := objprovider.AttachRemoteObjects([]objstorage.RemoteObjectToAttach{{
180 1 : FileNum: fileNum,
181 1 : FileType: fileTypeTable,
182 1 : Backing: backing,
183 1 : }})
184 1 : if err != nil {
185 0 : return nil, err
186 0 : }
187 1 : if opts.EventListener.TableCreated != nil {
188 1 : opts.EventListener.TableCreated(TableCreateInfo{
189 1 : JobID: jobID,
190 1 : Reason: "ingesting",
191 1 : Path: objprovider.Path(metas[0]),
192 1 : FileNum: fileNum.FileNum(),
193 1 : })
194 1 : }
195 : // In the name of keeping this ingestion as fast as possible, we avoid
196 : // *all* existence checks and synthesize a file metadata with smallest/largest
197 : // keys that overlap whatever the passed-in span was.
198 1 : smallestCopy := make([]byte, len(e.SmallestUserKey))
199 1 : copy(smallestCopy, e.SmallestUserKey)
200 1 : largestCopy := make([]byte, len(e.LargestUserKey))
201 1 : copy(largestCopy, e.LargestUserKey)
202 1 : if e.HasPointKey {
203 1 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax),
204 1 : base.MakeRangeDeleteSentinelKey(largestCopy))
205 1 : }
206 1 : if e.HasRangeKey {
207 0 : meta.ExtendRangeKeyBounds(opts.Comparer.Compare, base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeySet),
208 0 : base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyDelete, largestCopy))
209 0 : }
210 :
211 : // Set the underlying FileBacking's size to the same size as the virtualized
212 : // view of the sstable. This ensures that we don't over-prioritize this
213 : // sstable for compaction just yet, as we do not have a clear sense of
214 : // what parts of this sstable are referenced by other nodes.
215 1 : meta.FileBacking.Size = e.Size
216 1 :
217 1 : if len(e.SyntheticPrefix) != 0 {
218 1 : meta.PrefixReplacement = &manifest.PrefixReplacement{
219 1 : ContentPrefix: e.ContentPrefix,
220 1 : SyntheticPrefix: e.SyntheticPrefix,
221 1 : }
222 1 : }
223 :
224 1 : if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
225 0 : return nil, err
226 0 : }
227 1 : return meta, nil
228 : }
229 :
230 : // ingestLoad1 creates the FileMetadata for one file. This file will be owned
231 : // by this store.
232 : func ingestLoad1(
233 : opts *Options,
234 : fmv FormatMajorVersion,
235 : readable objstorage.Readable,
236 : cacheID uint64,
237 : fileNum base.DiskFileNum,
238 1 : ) (*fileMetadata, error) {
239 1 : cacheOpts := private.SSTableCacheOpts(cacheID, fileNum).(sstable.ReaderOption)
240 1 : r, err := sstable.NewReader(readable, opts.MakeReaderOptions(), cacheOpts)
241 1 : if err != nil {
242 1 : return nil, err
243 1 : }
244 1 : defer r.Close()
245 1 :
246 1 : // Avoid ingesting tables with format versions this DB doesn't support.
247 1 : tf, err := r.TableFormat()
248 1 : if err != nil {
249 0 : return nil, err
250 0 : }
251 1 : if tf < fmv.MinTableFormat() || tf > fmv.MaxTableFormat() {
252 1 : return nil, errors.Newf(
253 1 : "pebble: table format %s is not within range supported at DB format major version %d, (%s,%s)",
254 1 : tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
255 1 : )
256 1 : }
257 :
258 1 : meta := &fileMetadata{}
259 1 : meta.FileNum = fileNum.FileNum()
260 1 : meta.Size = uint64(readable.Size())
261 1 : meta.CreationTime = time.Now().Unix()
262 1 : meta.InitPhysicalBacking()
263 1 :
264 1 : // Avoid loading into the table cache for collecting stats if we
265 1 : // don't need to. If there are no range deletions, we have all the
266 1 : // information to compute the stats here.
267 1 : //
268 1 : // This is helpful in tests for avoiding awkwardness around deletion of
269 1 : // ingested files from MemFS. MemFS implements the Windows semantics of
270 1 : // disallowing removal of an open file. Under MemFS, if we don't populate
271 1 : // meta.Stats here, the file will be loaded into the table cache for
272 1 : // calculating stats before we can remove the original link.
273 1 : maybeSetStatsFromProperties(meta.PhysicalMeta(), &r.Properties)
274 1 :
275 1 : {
276 1 : iter, err := r.NewIter(nil /* lower */, nil /* upper */)
277 1 : if err != nil {
278 1 : return nil, err
279 1 : }
280 1 : defer iter.Close()
281 1 : var smallest InternalKey
282 1 : if key, _ := iter.First(); key != nil {
283 1 : if err := ingestValidateKey(opts, key); err != nil {
284 1 : return nil, err
285 1 : }
286 1 : smallest = (*key).Clone()
287 : }
288 1 : if err := iter.Error(); err != nil {
289 1 : return nil, err
290 1 : }
291 1 : if key, _ := iter.Last(); key != nil {
292 1 : if err := ingestValidateKey(opts, key); err != nil {
293 0 : return nil, err
294 0 : }
295 1 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, key.Clone())
296 : }
297 1 : if err := iter.Error(); err != nil {
298 1 : return nil, err
299 1 : }
300 : }
301 :
302 1 : iter, err := r.NewRawRangeDelIter()
303 1 : if err != nil {
304 0 : return nil, err
305 0 : }
306 1 : if iter != nil {
307 1 : defer iter.Close()
308 1 : var smallest InternalKey
309 1 : if s := iter.First(); s != nil {
310 1 : key := s.SmallestKey()
311 1 : if err := ingestValidateKey(opts, &key); err != nil {
312 0 : return nil, err
313 0 : }
314 1 : smallest = key.Clone()
315 : }
316 1 : if err := iter.Error(); err != nil {
317 0 : return nil, err
318 0 : }
319 1 : if s := iter.Last(); s != nil {
320 1 : k := s.SmallestKey()
321 1 : if err := ingestValidateKey(opts, &k); err != nil {
322 0 : return nil, err
323 0 : }
324 1 : largest := s.LargestKey().Clone()
325 1 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest)
326 : }
327 : }
328 :
329 : // Update the range-key bounds for the table.
330 1 : {
331 1 : iter, err := r.NewRawRangeKeyIter()
332 1 : if err != nil {
333 0 : return nil, err
334 0 : }
335 1 : if iter != nil {
336 1 : defer iter.Close()
337 1 : var smallest InternalKey
338 1 : if s := iter.First(); s != nil {
339 1 : key := s.SmallestKey()
340 1 : if err := ingestValidateKey(opts, &key); err != nil {
341 0 : return nil, err
342 0 : }
343 1 : smallest = key.Clone()
344 : }
345 1 : if err := iter.Error(); err != nil {
346 0 : return nil, err
347 0 : }
348 1 : if s := iter.Last(); s != nil {
349 1 : k := s.SmallestKey()
350 1 : if err := ingestValidateKey(opts, &k); err != nil {
351 0 : return nil, err
352 0 : }
353 : // As range keys are fragmented, the end key of the last range key in
354 : // the table provides the upper bound for the table.
355 1 : largest := s.LargestKey().Clone()
356 1 : meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallest, largest)
357 : }
358 1 : if err := iter.Error(); err != nil {
359 0 : return nil, err
360 0 : }
361 : }
362 : }
363 :
364 1 : if !meta.HasPointKeys && !meta.HasRangeKeys {
365 1 : return nil, nil
366 1 : }
367 :
368 : // Sanity check that the various bounds on the file were set consistently.
369 1 : if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
370 0 : return nil, err
371 0 : }
372 :
373 1 : return meta, nil
374 : }
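// A minimal sketch of the bound computation above (hedged; the span shape is
// an assumption for illustration): because the spans surfaced by the fragment
// iterators are fragmented and sorted, the first span's SmallestKey and the
// last span's LargestKey bound the whole table. For example, spans [a,c) and
// [c,f) yield bounds from a to f, with the upper bound an exclusive sentinel:
//
//	s := keyspan.Span{Start: []byte("c"), End: []byte("f"),
//		Keys: []keyspan.Key{{Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete)}}}
//	// s.LargestKey() is an exclusive sentinel at user key "f".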
375 :
376 : type ingestLoadResult struct {
377 : localMeta, sharedMeta []*fileMetadata
378 : externalMeta []*fileMetadata
379 : localPaths []string
380 : sharedLevels []uint8
381 : fileCount int
382 : }
383 :
384 : func ingestLoad(
385 : opts *Options,
386 : fmv FormatMajorVersion,
387 : paths []string,
388 : shared []SharedSSTMeta,
389 : external []ExternalFile,
390 : cacheID uint64,
391 : pending []base.DiskFileNum,
392 : objProvider objstorage.Provider,
393 : jobID int,
394 1 : ) (ingestLoadResult, error) {
395 1 : meta := make([]*fileMetadata, 0, len(paths))
396 1 : newPaths := make([]string, 0, len(paths))
397 1 : for i := range paths {
398 1 : f, err := opts.FS.Open(paths[i])
399 1 : if err != nil {
400 1 : return ingestLoadResult{}, err
401 1 : }
402 :
403 1 : readable, err := sstable.NewSimpleReadable(f)
404 1 : if err != nil {
405 1 : return ingestLoadResult{}, err
406 1 : }
407 1 : m, err := ingestLoad1(opts, fmv, readable, cacheID, pending[i])
408 1 : if err != nil {
409 1 : return ingestLoadResult{}, err
410 1 : }
411 1 : if m != nil {
412 1 : meta = append(meta, m)
413 1 : newPaths = append(newPaths, paths[i])
414 1 : }
415 : }
416 1 : if len(shared) == 0 && len(external) == 0 {
417 1 : return ingestLoadResult{localMeta: meta, localPaths: newPaths, fileCount: len(meta)}, nil
418 1 : }
419 :
420 : // Sort the shared files according to level.
421 1 : sort.Sort(sharedByLevel(shared))
422 1 :
423 1 : sharedMeta := make([]*fileMetadata, 0, len(shared))
424 1 : levels := make([]uint8, 0, len(shared))
425 1 : for i := range shared {
426 1 : m, err := ingestSynthesizeShared(opts, shared[i], pending[len(paths)+i])
427 1 : if err != nil {
428 0 : return ingestLoadResult{}, err
429 0 : }
430 1 : if shared[i].Level < sharedLevelsStart {
431 0 : return ingestLoadResult{}, errors.New("cannot ingest shared file in level below sharedLevelsStart")
432 0 : }
433 1 : sharedMeta = append(sharedMeta, m)
434 1 : levels = append(levels, shared[i].Level)
435 : }
436 1 : externalMeta := make([]*fileMetadata, 0, len(external))
437 1 : for i := range external {
438 1 : m, err := ingestLoad1External(opts, external[i], pending[len(paths)+len(shared)+i], objProvider, jobID)
439 1 : if err != nil {
440 0 : return ingestLoadResult{}, err
441 0 : }
442 1 : externalMeta = append(externalMeta, m)
443 : }
444 1 : result := ingestLoadResult{
445 1 : localMeta: meta,
446 1 : sharedMeta: sharedMeta,
447 1 : externalMeta: externalMeta,
448 1 : localPaths: newPaths,
449 1 : sharedLevels: levels,
450 1 : fileCount: len(meta) + len(sharedMeta) + len(externalMeta),
451 1 : }
452 1 : return result, nil
453 : }
454 :
455 : // Struct for sorting metadatas by smallest user keys, while ensuring the
456 : // matching path also gets swapped to the same index. For use in
457 : // ingestSortAndVerify.
458 : type metaAndPaths struct {
459 : meta []*fileMetadata
460 : paths []string
461 : cmp Compare
462 : }
463 :
464 1 : func (m metaAndPaths) Len() int {
465 1 : return len(m.meta)
466 1 : }
467 :
468 1 : func (m metaAndPaths) Less(i, j int) bool {
469 1 : return m.cmp(m.meta[i].Smallest.UserKey, m.meta[j].Smallest.UserKey) < 0
470 1 : }
471 :
472 1 : func (m metaAndPaths) Swap(i, j int) {
473 1 : m.meta[i], m.meta[j] = m.meta[j], m.meta[i]
474 1 : if m.paths != nil {
475 1 : m.paths[i], m.paths[j] = m.paths[j], m.paths[i]
476 1 : }
477 : }
478 :
479 1 : func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error {
480 1 : // Verify that all the shared files (i.e. files in sharedMeta)
481 1 : // fit within the exciseSpan.
482 1 : for i := range lr.sharedMeta {
483 1 : f := lr.sharedMeta[i]
484 1 : if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
485 0 : return errors.AssertionFailedf("pebble: shared file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
486 0 : }
487 : }
488 1 : if len(lr.externalMeta) > 0 {
489 1 : if len(lr.localMeta) > 0 || len(lr.sharedMeta) > 0 {
490 0 : // Currently we only support external ingests on their own. If external
491 0 : // files are present alongside local/shared files, return an error.
492 0 : return errors.AssertionFailedf("pebble: external files cannot be ingested atomically alongside other types of files")
493 0 : }
494 1 : sort.Sort(&metaAndPaths{
495 1 : meta: lr.externalMeta,
496 1 : cmp: cmp,
497 1 : })
498 1 : for i := 1; i < len(lr.externalMeta); i++ {
499 1 : if sstableKeyCompare(cmp, lr.externalMeta[i-1].Largest, lr.externalMeta[i].Smallest) >= 0 {
500 1 : return errors.AssertionFailedf("pebble: external sstables have overlapping ranges")
501 1 : }
502 : }
503 1 : return nil
504 : }
505 1 : if len(lr.localMeta) <= 1 || len(lr.localPaths) <= 1 {
506 1 : return nil
507 1 : }
508 :
509 1 : sort.Sort(&metaAndPaths{
510 1 : meta: lr.localMeta,
511 1 : paths: lr.localPaths,
512 1 : cmp: cmp,
513 1 : })
514 1 :
515 1 : for i := 1; i < len(lr.localPaths); i++ {
516 1 : if sstableKeyCompare(cmp, lr.localMeta[i-1].Largest, lr.localMeta[i].Smallest) >= 0 {
517 1 : return errors.AssertionFailedf("pebble: local ingestion sstables have overlapping ranges")
518 1 : }
519 : }
520 1 : if len(lr.sharedMeta) == 0 {
521 1 : return nil
522 1 : }
523 0 : filesInLevel := make([]*fileMetadata, 0, len(lr.sharedMeta))
524 0 : for l := sharedLevelsStart; l < numLevels; l++ {
525 0 : filesInLevel = filesInLevel[:0]
526 0 : for i := range lr.sharedMeta {
527 0 : if lr.sharedLevels[i] == uint8(l) {
528 0 : filesInLevel = append(filesInLevel, lr.sharedMeta[i])
529 0 : }
530 : }
531 0 : slices.SortFunc(filesInLevel, func(a, b *fileMetadata) int {
532 0 : return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
533 0 : })
534 0 : for i := 1; i < len(filesInLevel); i++ {
535 0 : if sstableKeyCompare(cmp, filesInLevel[i-1].Largest, filesInLevel[i].Smallest) >= 0 {
536 0 : return errors.AssertionFailedf("pebble: external shared sstables have overlapping ranges")
537 0 : }
538 : }
539 : }
540 0 : return nil
541 : }
542 :
543 1 : func ingestCleanup(objProvider objstorage.Provider, meta []*fileMetadata) error {
544 1 : var firstErr error
545 1 : for i := range meta {
546 1 : if err := objProvider.Remove(fileTypeTable, meta[i].FileBacking.DiskFileNum); err != nil {
547 1 : firstErr = firstError(firstErr, err)
548 1 : }
549 : }
550 1 : return firstErr
551 : }
552 :
553 : // ingestLink creates new objects which are backed by either hardlinks to or
554 : // copies of the ingested files. It also attaches shared objects to the provider.
555 : func ingestLink(
556 : jobID int,
557 : opts *Options,
558 : objProvider objstorage.Provider,
559 : lr ingestLoadResult,
560 : shared []SharedSSTMeta,
561 1 : ) error {
562 1 : for i := range lr.localPaths {
563 1 : objMeta, err := objProvider.LinkOrCopyFromLocal(
564 1 : context.TODO(), opts.FS, lr.localPaths[i], fileTypeTable, lr.localMeta[i].FileBacking.DiskFileNum,
565 1 : objstorage.CreateOptions{PreferSharedStorage: true},
566 1 : )
567 1 : if err != nil {
568 1 : if err2 := ingestCleanup(objProvider, lr.localMeta[:i]); err2 != nil {
569 0 : opts.Logger.Errorf("ingest cleanup failed: %v", err2)
570 0 : }
571 1 : return err
572 : }
573 1 : if opts.EventListener.TableCreated != nil {
574 1 : opts.EventListener.TableCreated(TableCreateInfo{
575 1 : JobID: jobID,
576 1 : Reason: "ingesting",
577 1 : Path: objProvider.Path(objMeta),
578 1 : FileNum: lr.localMeta[i].FileNum,
579 1 : })
580 1 : }
581 : }
582 1 : sharedObjs := make([]objstorage.RemoteObjectToAttach, 0, len(shared))
583 1 : for i := range shared {
584 1 : backing, err := shared[i].Backing.Get()
585 1 : if err != nil {
586 0 : return err
587 0 : }
588 1 : sharedObjs = append(sharedObjs, objstorage.RemoteObjectToAttach{
589 1 : FileNum: lr.sharedMeta[i].FileBacking.DiskFileNum,
590 1 : FileType: fileTypeTable,
591 1 : Backing: backing,
592 1 : })
593 : }
594 1 : sharedObjMetas, err := objProvider.AttachRemoteObjects(sharedObjs)
595 1 : if err != nil {
596 0 : return err
597 0 : }
598 1 : for i := range sharedObjMetas {
599 1 : // One corner case around file sizes we need to be mindful of is that
600 1 : // if one of the sharedObjs was initially created by us (and has boomeranged
601 1 : // back from another node), we'll need to update the FileBacking's size
602 1 : // to be the true underlying size. Otherwise, we could hit errors when we
603 1 : // open the db again after a crash/restart (see checkConsistency in open.go).
604 1 : // The correct size also allows us to more accurately prioritize compactions
605 1 : // of files that were originally created by us.
606 1 : if sharedObjMetas[i].IsShared() && !objProvider.IsSharedForeign(sharedObjMetas[i]) {
607 1 : size, err := objProvider.Size(sharedObjMetas[i])
608 1 : if err != nil {
609 0 : return err
610 0 : }
611 1 : lr.sharedMeta[i].FileBacking.Size = uint64(size)
612 : }
613 1 : if opts.EventListener.TableCreated != nil {
614 1 : opts.EventListener.TableCreated(TableCreateInfo{
615 1 : JobID: jobID,
616 1 : Reason: "ingesting",
617 1 : Path: objProvider.Path(sharedObjMetas[i]),
618 1 : FileNum: lr.sharedMeta[i].FileNum,
619 1 : })
620 1 : }
621 : }
622 : // We do not need to do anything about lr.externalMeta. Those were already
623 : // linked in ingestLoad.
624 :
625 1 : return nil
626 : }
627 :
628 1 : func ingestMemtableOverlaps(cmp Compare, mem flushable, keyRanges []internalKeyRange) bool {
629 1 : iter := mem.newIter(nil)
630 1 : rangeDelIter := mem.newRangeDelIter(nil)
631 1 : rkeyIter := mem.newRangeKeyIter(nil)
632 1 :
633 1 : closeIters := func() error {
634 1 : err := iter.Close()
635 1 : if rangeDelIter != nil {
636 1 : err = firstError(err, rangeDelIter.Close())
637 1 : }
638 1 : if rkeyIter != nil {
639 1 : err = firstError(err, rkeyIter.Close())
640 1 : }
641 1 : return err
642 : }
643 :
644 1 : for _, kr := range keyRanges {
645 1 : if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, cmp) {
646 1 : closeIters()
647 1 : return true
648 1 : }
649 : }
650 :
651 : // Assume overlap if any iterator errored out.
652 1 : return closeIters() != nil
653 : }
654 :
655 : func ingestUpdateSeqNum(
656 : cmp Compare, format base.FormatKey, seqNum uint64, loadResult ingestLoadResult,
657 1 : ) error {
658 1 : setSeqFn := func(k base.InternalKey) base.InternalKey {
659 1 : return base.MakeInternalKey(k.UserKey, seqNum, k.Kind())
660 1 : }
661 1 : updateMetadata := func(m *fileMetadata) error {
662 1 : // NB: we set the fields directly here, rather than via their Extend*
663 1 : // methods, as we are updating sequence numbers.
664 1 : if m.HasPointKeys {
665 1 : m.SmallestPointKey = setSeqFn(m.SmallestPointKey)
666 1 : }
667 1 : if m.HasRangeKeys {
668 1 : m.SmallestRangeKey = setSeqFn(m.SmallestRangeKey)
669 1 : }
670 1 : m.Smallest = setSeqFn(m.Smallest)
671 1 : // Only update the seqnum for the largest key if that key is not an
672 1 : // "exclusive sentinel" (i.e. a range deletion sentinel or a range key
673 1 : // boundary), as doing so effectively drops the exclusive sentinel (by
674 1 : // lowering the seqnum from the max value), and extends the bounds of the
675 1 : // table.
676 1 : // NB: as the largest range key is always an exclusive sentinel, it is never
677 1 : // updated.
678 1 : if m.HasPointKeys && !m.LargestPointKey.IsExclusiveSentinel() {
679 1 : m.LargestPointKey = setSeqFn(m.LargestPointKey)
680 1 : }
681 1 : if !m.Largest.IsExclusiveSentinel() {
682 1 : m.Largest = setSeqFn(m.Largest)
683 1 : }
684 : // Setting smallestSeqNum == largestSeqNum triggers the setting of
685 : // Properties.GlobalSeqNum when an sstable is loaded.
686 1 : m.SmallestSeqNum = seqNum
687 1 : m.LargestSeqNum = seqNum
688 1 : // Ensure the new bounds are consistent.
689 1 : if err := m.Validate(cmp, format); err != nil {
690 0 : return err
691 0 : }
692 1 : seqNum++
693 1 : return nil
694 : }
695 :
696 : // Shared sstables are required to be sorted by level ascending. We then
697 : // iterate the shared sstables in reverse, assigning the lower sequence
698 : // numbers to the shared sstables that will be ingested into the lower
699 : // (larger numbered) levels first. This ensures sequence number shadowing is
700 : // correct.
701 1 : for i := len(loadResult.sharedMeta) - 1; i >= 0; i-- {
702 1 : if i-1 >= 0 && loadResult.sharedLevels[i-1] > loadResult.sharedLevels[i] {
703 0 : panic(errors.AssertionFailedf("shared files %s, %s out of order", loadResult.sharedMeta[i-1], loadResult.sharedMeta[i]))
704 : }
705 1 : if err := updateMetadata(loadResult.sharedMeta[i]); err != nil {
706 0 : return err
707 0 : }
708 : }
709 1 : for i := range loadResult.localMeta {
710 1 : if err := updateMetadata(loadResult.localMeta[i]); err != nil {
711 0 : return err
712 0 : }
713 : }
714 1 : for i := range loadResult.externalMeta {
715 1 : if err := updateMetadata(loadResult.externalMeta[i]); err != nil {
716 0 : return err
717 0 : }
718 : }
719 1 : return nil
720 : }
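// A minimal sketch of the bound rewrite above (hedged; not from the original
// source): a non-sentinel bound gets the ingest seqnum, while an exclusive
// sentinel keeps its max seqnum so the bound stays exclusive:
//
//	k := base.MakeInternalKey([]byte("z"), 0, base.InternalKeyKindSet)
//	k = base.MakeInternalKey(k.UserKey, 42, k.Kind()) // what setSeqFn does
//	s := base.MakeRangeDeleteSentinelKey([]byte("z"))
//	// s.IsExclusiveSentinel() == true, so s would be left untouched.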
721 :
722 : // Denotes an internal key range. Smallest and largest are both inclusive.
723 : type internalKeyRange struct {
724 : smallest, largest InternalKey
725 : }
726 :
727 : func overlapWithIterator(
728 : iter internalIterator,
729 : rangeDelIter *keyspan.FragmentIterator,
730 : rkeyIter keyspan.FragmentIterator,
731 : keyRange internalKeyRange,
732 : cmp Compare,
733 1 : ) bool {
734 1 : // Check overlap with point operations.
735 1 : //
736 1 : // When using levelIter, it seeks to the SST whose boundaries
737 1 : // contain keyRange.smallest.UserKey(S).
738 1 : // It then tries to find a point in that SST that is >= S.
739 1 : // If there's no such point it means the SST ends in a tombstone in which case
740 1 : // levelIter.SeekGE generates a boundary range del sentinel.
741 1 : // The comparison of this boundary with keyRange.largest(L) below
742 1 : // is subtle but maintains correctness.
743 1 : // 1) boundary < L,
744 1 : // since boundary is also > S (initial seek),
745 1 : // whatever the boundary's start key may be, we're always overlapping.
746 1 : // 2) boundary > L,
747 1 : // overlap with boundary cannot be determined since we don't know boundary's start key.
748 1 : // We require checking for overlap with rangeDelIter.
749 1 : // 3) boundary == L and L is not sentinel,
750 1 : // means boundary < L and hence is similar to 1).
751 1 : // 4) boundary == L and L is sentinel,
752 1 : // we'll always overlap since for any values of i,j ranges [i, k) and [j, k) always overlap.
753 1 : key, _ := iter.SeekGE(keyRange.smallest.UserKey, base.SeekGEFlagsNone)
754 1 : if key != nil {
755 1 : c := sstableKeyCompare(cmp, *key, keyRange.largest)
756 1 : if c <= 0 {
757 1 : return true
758 1 : }
759 : }
760 : // Assume overlap if iterator errored.
761 1 : if err := iter.Error(); err != nil {
762 1 : return true
763 1 : }
764 :
765 1 : computeOverlapWithSpans := func(rIter keyspan.FragmentIterator) bool {
766 1 : // NB: The spans surfaced by the fragment iterator are non-overlapping.
767 1 : span := rIter.SeekLT(keyRange.smallest.UserKey)
768 1 : if span == nil {
769 1 : span = rIter.Next()
770 1 : }
771 1 : for ; span != nil; span = rIter.Next() {
772 1 : if span.Empty() {
773 1 : continue
774 : }
775 1 : key := span.SmallestKey()
776 1 : c := sstableKeyCompare(cmp, key, keyRange.largest)
777 1 : if c > 0 {
778 1 : // The start of the span is after the largest key in the
779 1 : // ingested table.
780 1 : return false
781 1 : }
782 1 : if cmp(span.End, keyRange.smallest.UserKey) > 0 {
783 1 : // The end of the span is greater than the smallest in the
784 1 : // table. Note that the span end key is exclusive, thus ">0"
785 1 : // instead of ">=0".
786 1 : return true
787 1 : }
788 : }
789 : // Assume overlap if iterator errored.
790 1 : if err := rIter.Error(); err != nil {
791 0 : return true
792 0 : }
793 1 : return false
794 : }
795 :
796 : // rkeyIter is either a range key level iter, or a range key iterator
797 : // over a single file.
798 1 : if rkeyIter != nil {
799 1 : if computeOverlapWithSpans(rkeyIter) {
800 1 : return true
801 1 : }
802 : }
803 :
804 : // Check overlap with range deletions.
805 1 : if rangeDelIter == nil || *rangeDelIter == nil {
806 1 : return false
807 1 : }
808 1 : return computeOverlapWithSpans(*rangeDelIter)
809 : }
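// A worked example of case 4) above (hedged; not from the original source):
// if the seek lands on a boundary sentinel at user key k and keyRange.largest
// is also an exclusive sentinel at k, sstableKeyCompare returns 0 and the
// c <= 0 check treats it as overlap. This is correct, since any two ranges
// [i, k) and [j, k) share keys:
//
//	boundary := base.MakeRangeDeleteSentinelKey([]byte("k"))
//	largest := base.MakeRangeDeleteSentinelKey([]byte("k"))
//	// sstableKeyCompare(cmp, boundary, largest) == 0 => overlap.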
810 :
811 : // ingestTargetLevel returns the target level for a file being ingested.
812 : // If suggestSplit is true, it accounts for ingest-time splitting as part of
813 : // its target level calculation, and if a split candidate is found, that file
814 : // is returned as the splitFile.
815 : func ingestTargetLevel(
816 : newIters tableNewIters,
817 : newRangeKeyIter keyspan.TableNewSpanIter,
818 : iterOps IterOptions,
819 : comparer *Comparer,
820 : v *version,
821 : baseLevel int,
822 : compactions map[*compaction]struct{},
823 : meta *fileMetadata,
824 : suggestSplit bool,
825 1 : ) (targetLevel int, splitFile *fileMetadata, err error) {
826 1 : // Find the lowest level which does not have any files which overlap meta. We
827 1 : // search from L0 to L6 looking for whether there are any files in the level
828 1 : // which overlap meta. We want the "lowest" level (where lower means
829 1 : // increasing level number) in order to reduce write amplification.
830 1 : //
831 1 : // There are 2 kinds of overlap we need to check for: file boundary overlap
832 1 : // and data overlap. Data overlap implies file boundary overlap. Note that it
833 1 : // is always possible to ingest into L0.
834 1 : //
835 1 : // To place meta at level i where i > 0:
836 1 : // - there must not be any data overlap with levels <= i, since that will
837 1 : // violate the sequence number invariant.
838 1 : // - no file boundary overlap with level i, since that will violate the
839 1 : // invariant that files do not overlap in levels i > 0.
840 1 : // - if there is only a file overlap at a given level, and no data overlap,
841 1 : // we can still slot a file at that level. We return the fileMetadata with
842 1 : // which we have file boundary overlap (must be only one file, as sstable
843 1 : // bounds are usually tight on user keys) and the caller is expected to split
844 1 : // that sstable into two virtual sstables, allowing this file to go into that
845 1 : // level. Note that if we have file boundary overlap with two files, which
846 1 : // should only happen on rare occasions, we treat it as data overlap and
847 1 : // don't use this optimization.
848 1 : //
849 1 : // The file boundary overlap check is simpler to conceptualize. Consider the
850 1 : // following example, in which the ingested file lies completely before or
851 1 : // after the file being considered.
852 1 : //
853 1 : // |--| |--| ingested file: [a,b] or [f,g]
854 1 : // |-----| existing file: [c,e]
855 1 : // _____________________
856 1 : // a b c d e f g
857 1 : //
858 1 : // In both cases the ingested file can move to considering the next level.
859 1 : //
860 1 : // File boundary overlap does not necessarily imply data overlap. The check
861 1 : // for data overlap is a little more nuanced. Consider the following examples:
862 1 : //
863 1 : // 1. No data overlap:
864 1 : //
865 1 : // |-| |--| ingested file: [cc-d] or [ee-ff]
866 1 : // |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
867 1 : // _____________________
868 1 : // a b c d e f g
869 1 : //
870 1 : // In this case the ingested files can "fall through" this level. The checks
871 1 : // continue at the next level.
872 1 : //
873 1 : // 2. Data overlap:
874 1 : //
875 1 : // |--| ingested file: [d-e]
876 1 : // |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
877 1 : // _____________________
878 1 : // a b c d e f g
879 1 : //
880 1 : // In this case the file cannot be ingested into this level as the point 'dd'
881 1 : // is in the way.
882 1 : //
883 1 : // It is worth noting that the check for data overlap is only approximate. In
884 1 : // the previous example, the ingested table [d-e] could contain only the
885 1 : // points 'd' and 'e', in which case the table would be eligible for
886 1 : // considering lower levels. However, such a fine-grained check would need to
887 1 : // be exhaustive (comparing points and ranges in both the ingested and existing
888 1 : // tables) and such a check is prohibitively expensive. Thus Pebble treats any
889 1 : // existing point that falls within the ingested table bounds as being "data
890 1 : // overlap".
891 1 :
892 1 : // This assertion implicitly checks that we have the current version of
893 1 : // the metadata.
894 1 : if v.L0Sublevels == nil {
895 0 : return 0, nil, errors.AssertionFailedf("could not read L0 sublevels")
896 0 : }
897 1 : iterOps.CategoryAndQoS = sstable.CategoryAndQoS{
898 1 : Category: "pebble-ingest",
899 1 : QoSLevel: sstable.LatencySensitiveQoSLevel,
900 1 : }
901 1 : // Check for overlap over the keys of L0 by iterating over the sublevels.
902 1 : for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ {
903 1 : iter := newLevelIter(context.Background(),
904 1 : iterOps, comparer, newIters, v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), internalIterOpts{})
905 1 :
906 1 : var rangeDelIter keyspan.FragmentIterator
907 1 : // Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
908 1 : // sets it up for the target file.
909 1 : iter.initRangeDel(&rangeDelIter)
910 1 :
911 1 : levelIter := keyspan.LevelIter{}
912 1 : levelIter.Init(
913 1 : keyspan.SpanIterOptions{}, comparer.Compare, newRangeKeyIter,
914 1 : v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), manifest.KeyTypeRange,
915 1 : )
916 1 :
917 1 : kr := internalKeyRange{
918 1 : smallest: meta.Smallest,
919 1 : largest: meta.Largest,
920 1 : }
921 1 : overlap := overlapWithIterator(iter, &rangeDelIter, &levelIter, kr, comparer.Compare)
922 1 : err := iter.Close() // Closes range del iter as well.
923 1 : err = firstError(err, levelIter.Close())
924 1 : if err != nil {
925 1 : return 0, nil, err
926 1 : }
927 1 : if overlap {
928 1 : return targetLevel, nil, nil
929 1 : }
930 : }
931 :
932 1 : level := baseLevel
933 1 : for ; level < numLevels; level++ {
934 1 : levelIter := newLevelIter(context.Background(),
935 1 : iterOps, comparer, newIters, v.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
936 1 : var rangeDelIter keyspan.FragmentIterator
937 1 : // Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
938 1 : // sets it up for the target file.
939 1 : levelIter.initRangeDel(&rangeDelIter)
940 1 :
941 1 : rkeyLevelIter := &keyspan.LevelIter{}
942 1 : rkeyLevelIter.Init(
943 1 : keyspan.SpanIterOptions{}, comparer.Compare, newRangeKeyIter,
944 1 : v.Levels[level].Iter(), manifest.Level(level), manifest.KeyTypeRange,
945 1 : )
946 1 :
947 1 : kr := internalKeyRange{
948 1 : smallest: meta.Smallest,
949 1 : largest: meta.Largest,
950 1 : }
951 1 : overlap := overlapWithIterator(levelIter, &rangeDelIter, rkeyLevelIter, kr, comparer.Compare)
952 1 : err := levelIter.Close() // Closes range del iter as well.
953 1 : err = firstError(err, rkeyLevelIter.Close())
954 1 : if err != nil {
955 0 : return 0, nil, err
956 0 : }
957 1 : if overlap {
958 1 : return targetLevel, splitFile, nil
959 1 : }
960 :
961 : // Check boundary overlap.
962 1 : var candidateSplitFile *fileMetadata
963 1 : boundaryOverlaps := v.Overlaps(level, comparer.Compare, meta.Smallest.UserKey,
964 1 : meta.Largest.UserKey, meta.Largest.IsExclusiveSentinel())
965 1 : if !boundaryOverlaps.Empty() {
966 1 : // We are already guaranteed to not have any data overlaps with files
967 1 : // in boundaryOverlaps, otherwise we'd have returned in the above if
968 1 : // statements. Use this, plus boundaryOverlaps.Len() == 1 to detect for
969 1 : // the case where we can slot this file into the current level despite
970 1 : // a boundary overlap, by splitting one existing file into two virtual
971 1 : // sstables.
972 1 : if suggestSplit && boundaryOverlaps.Len() == 1 {
973 1 : iter := boundaryOverlaps.Iter()
974 1 : candidateSplitFile = iter.First()
975 1 : } else {
976 1 : // We either don't want to suggest ingest-time splits (i.e.
977 1 : // !suggestSplit), or we boundary-overlapped with more than one file.
978 1 : continue
979 : }
980 : }
981 :
982 : // Check boundary overlap with any ongoing compactions. We consider an
983 : // overlapping compaction that's writing files to an output level as
984 : // equivalent to boundary overlap with files in that output level.
985 : //
986 : // We cannot check for data overlap with the new SSTs compaction will produce
987 : // since compaction hasn't been done yet. However, there's no need to check
988 : // since all keys in them will be from levels in [c.startLevel,
989 : // c.outputLevel], and all those levels have already had their data overlap
990 : // tested negative (else we'd have returned earlier).
991 : //
992 : // An alternative approach would be to cancel these compactions and proceed
993 : // with an ingest-time split on this level if necessary. However, compaction
994 : // cancellation can result in significant wasted effort and is best avoided
995 : // unless necessary.
996 1 : overlaps := false
997 1 : for c := range compactions {
998 1 : if c.outputLevel == nil || level != c.outputLevel.level {
999 1 : continue
1000 : }
1001 1 : if comparer.Compare(meta.Smallest.UserKey, c.largest.UserKey) <= 0 &&
1002 1 : comparer.Compare(meta.Largest.UserKey, c.smallest.UserKey) >= 0 {
1003 1 : overlaps = true
1004 1 : break
1005 : }
1006 : }
1007 1 : if !overlaps {
1008 1 : targetLevel = level
1009 1 : splitFile = candidateSplitFile
1010 1 : }
1011 : }
1012 1 : return targetLevel, splitFile, nil
1013 : }
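// A distilled sketch of the level-selection loop above (hedged; dataOverlap
// and boundaryOverlap are hypothetical stand-ins for the iterator-based
// checks, and the real function also handles L0 sublevels, range keys,
// ingest-time splits, and in-flight compactions):
//
//	target := 0
//	for level := baseLevel; level < numLevels; level++ {
//		if dataOverlap(level) { // going deeper would violate the seqnum invariant
//			return target
//		}
//		if boundaryOverlap(level) { // files in L>0 must not overlap
//			continue // (unless an ingest-time split is possible)
//		}
//		target = level
//	}
//	return target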
1014 :
1015 : // Ingest ingests a set of sstables into the DB. Ingestion of the files is
1016 : // atomic and semantically equivalent to creating a single batch containing all
1017 : // of the mutations in the sstables. Ingestion may require the memtable to be
1018 : // flushed. The ingested sstable files are moved into the DB and must reside on
1019 : // the same filesystem as the DB. Sstables can be created for ingestion using
1020 : // sstable.Writer. On success, Ingest removes the input paths.
1021 : //
1022 : // Two types of sstables are accepted for ingestion: sstables present in the
1023 : // instance's vfs.FS that can be referenced locally, and sstables
1024 : // present in remote.Storage, referred to as shared or foreign sstables. These
1025 : // shared sstables can be linked through objstorageprovider.Provider, and do not
1026 : // need to already be present on the local vfs.FS. Foreign sstables must all fit
1027 : // in an excise span, and are destined for a level specified in SharedSSTMeta.
1028 : //
1029 : // All sstables *must* be Sync()'d by the caller after all bytes are written
1030 : // and before its file handle is closed; failure to do so could violate
1031 : // durability or lead to corrupted on-disk state. This method cannot, in a
1032 : // platform-and-FS-agnostic way, ensure that all sstables in the input are
1033 : // properly synced to disk. Opening new file handles and Sync()-ing them
1034 : // does not always guarantee durability; see the discussion here on that:
1035 : // https://github.com/cockroachdb/pebble/pull/835#issuecomment-663075379
1036 : //
1037 : // Ingestion loads each sstable into the lowest level of the LSM which it
1038 : // doesn't overlap (see ingestTargetLevel). If an sstable overlaps a memtable,
1039 : // ingestion forces the memtable to flush, and then waits for the flush to
1040 : // occur. In some cases, such as with no foreign sstables and no excise span,
1041 : // ingestion that gets blocked on a memtable can join the flushable queue and
1042 : // finish even before the memtable has been flushed.
1043 : //
1044 : // The steps for ingestion are:
1045 : //
1046 : // 1. Allocate file numbers for every sstable being ingested.
1047 : // 2. Load the metadata for all sstables being ingested.
1048 : // 3. Sort the sstables by smallest key, verifying non overlap (for local
1049 : // sstables).
1050 : // 4. Hard link (or copy) the local sstables into the DB directory.
1051 : // 5. Allocate a sequence number to use for all of the entries in the
1052 : // local sstables. This is the step where overlap with memtables is
1053 : // determined. If there is overlap, we remember the most recent memtable
1054 : // that overlaps.
1055 : // 6. Update the sequence number in the ingested local sstables. (Remote
1056 : // sstables get fixed sequence numbers that were determined at load time.)
1057 : // 7. Wait for the most recent memtable that overlaps to flush (if any).
1058 : // 8. Add the ingested sstables to the version (DB.ingestApply).
1059 : // 8.1. If an excise span was specified, figure out what sstables in the
1060 : // current version overlap with the excise span, and create new virtual
1061 : // sstables out of those sstables that exclude the excised span (DB.excise).
1062 : // 9. Publish the ingestion sequence number.
1063 : //
1064 : // Note that if the mutable memtable overlaps with ingestion, a flush of the
1065 : // memtable is forced equivalent to DB.Flush. Additionally, subsequent
1066 : // mutations that get sequence numbers larger than the ingestion sequence
1067 : // number get queued up behind the ingestion waiting for it to complete. This
1068 : // can produce a noticeable hiccup in performance. See
1069 : // https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
1070 : // this hiccup.
1071 1 : func (d *DB) Ingest(paths []string) error {
1072 1 : if err := d.closed.Load(); err != nil {
1073 1 : panic(err)
1074 : }
1075 1 : if d.opts.ReadOnly {
1076 1 : return ErrReadOnly
1077 1 : }
1078 1 : _, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */)
1079 1 : return err
1080 : }
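// A minimal usage sketch (hedged; the writer-construction helpers shown are
// one possible choice and may differ by Pebble version): build a small
// sstable with sstable.Writer, in sorted key order, then ingest it. On
// success, Ingest removes the input path.
//
//	f, _ := vfs.Default.Create("/tmp/ingest.sst")
//	w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
//	_ = w.Set([]byte("a"), []byte("1"))
//	_ = w.Set([]byte("b"), []byte("2"))
//	_ = w.Close() // flushes and syncs the sstable
//	err := db.Ingest([]string{"/tmp/ingest.sst"})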
1081 :
1082 : // IngestOperationStats provides some information about where in the LSM the
1083 : // bytes were ingested.
1084 : type IngestOperationStats struct {
1085 : // Bytes is the total bytes in the ingested sstables.
1086 : Bytes uint64
1087 : // ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
1088 : // into L0. This value is approximate when flushable ingests are active and
1089 : // an ingest overlaps an entry in the flushable queue. Currently, this
1090 : // approximation is very rough, only including tables that overlapped the
1091 : // memtable. This estimate may be improved with #2112.
1092 : ApproxIngestedIntoL0Bytes uint64
1093 : // MemtableOverlappingFiles is the count of ingested sstables
1094 : // that overlapped keys in the memtables.
1095 : MemtableOverlappingFiles int
1096 : }
1097 :
1098 : // ExternalFile describes an external sstable that can be referenced through
1099 : // objprovider and ingested as a remote file that will not be refcounted or
1100 : // cleaned up. For use with online restore. Note that the underlying sstable
1101 : // could contain keys outside the [Smallest,Largest) bounds; however Pebble
1102 : // is expected to only read the keys within those bounds.
1103 : type ExternalFile struct {
1104 : // Locator is the shared.Locator that can be used with objProvider to
1105 : // resolve a reference to this external sstable.
1106 : Locator remote.Locator
1107 : // ObjName is the unique name of this sstable on Locator.
1108 : ObjName string
1109 : // Size of the referenced proportion of the virtualized sstable. An estimate
1110 : // is acceptable in lieu of the backing file size.
1111 : Size uint64
1112 : // SmallestUserKey and LargestUserKey are the [smallest,largest) user key
1113 : // bounds of the sstable. Both these bounds are loose, i.e., it's possible for
1114 : // the sstable to not span the entirety of this range. However, multiple
1115 : // ExternalFiles in one ingestion must all have non-overlapping
1116 : // [smallest, largest) spans. Note that this Largest bound is exclusive.
1117 : SmallestUserKey, LargestUserKey []byte
1118 : // HasPointKey and HasRangeKey denote whether this file contains point keys
1119 : // or range keys. If both structs are false, an error is returned during
1120 : // ingestion.
1121 : HasPointKey, HasRangeKey bool
1122 : // ContentPrefix and SyntheticPrefix denote a prefix replacement rule causing
1123 : // a file, in which all keys have prefix ContentPrefix, to appear whenever it
1124 : // is accessed as if those keys all instead have prefix SyntheticPrefix.
1125 : // SyntheticPrefix must be a prefix of both SmallestUserKey and LargestUserKey.
1126 : ContentPrefix, SyntheticPrefix []byte
1127 : }
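// A minimal construction sketch (hedged; the locator and object name below
// are hypothetical): reference an sstable living in remote storage and
// ingest it without copying it locally.
//
//	ef := ExternalFile{
//		Locator:         remote.Locator("my-locator"),
//		ObjName:         "backup/000123.sst",
//		Size:            1 << 20, // an estimate is acceptable
//		SmallestUserKey: []byte("a"),
//		LargestUserKey:  []byte("z"), // exclusive
//		HasPointKey:     true,
//	}
//	stats, err := db.IngestExternalFiles([]ExternalFile{ef})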
1128 :
1129 : // IngestWithStats does the same as Ingest, and additionally returns
1130 : // IngestOperationStats.
1131 1 : func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) {
1132 1 : if err := d.closed.Load(); err != nil {
1133 0 : panic(err)
1134 : }
1135 1 : if d.opts.ReadOnly {
1136 0 : return IngestOperationStats{}, ErrReadOnly
1137 0 : }
1138 1 : return d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */)
1139 : }
1140 :
1141 : // IngestExternalFiles does the same as IngestWithStats, and additionally
1142 : // accepts external files (with locator info that can be resolved using
1143 : // d.opts.SharedStorage). These files must also be non-overlapping with
1144 : // each other, and must be resolvable through d.objProvider.
1145 1 : func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, error) {
1146 1 : if err := d.closed.Load(); err != nil {
1147 0 : panic(err)
1148 : }
1149 :
1150 1 : if d.opts.ReadOnly {
1151 0 : return IngestOperationStats{}, ErrReadOnly
1152 0 : }
1153 1 : if d.opts.Experimental.RemoteStorage == nil {
1154 0 : return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
1155 0 : }
1156 1 : return d.ingest(nil, ingestTargetLevel, nil /* shared */, KeyRange{}, external)
1157 : }
1158 :
1159 : // IngestAndExcise does the same as IngestWithStats, and additionally accepts a
1160 : // list of shared files to ingest that can be read from a remote.Storage through
1161 : // a Provider. All the shared files must live within exciseSpan, and any existing
1162 : // keys in exciseSpan are deleted by turning existing sstables into virtual
1163 : // sstables (if not virtual already) and shrinking their spans to exclude
1164 : // exciseSpan. See the comment at Ingest for a more complete picture of the
1165 : // ingestion process.
1166 : //
1167 : // Panics if this DB instance was not instantiated with a remote.Storage and
1168 : // shared sstables are present.
1169 : func (d *DB) IngestAndExcise(
1170 : paths []string, shared []SharedSSTMeta, exciseSpan KeyRange,
1171 1 : ) (IngestOperationStats, error) {
1172 1 : if err := d.closed.Load(); err != nil {
1173 0 : panic(err)
1174 : }
1175 1 : if d.opts.ReadOnly {
1176 0 : return IngestOperationStats{}, ErrReadOnly
1177 0 : }
1178 1 : if invariants.Enabled && d.opts.Comparer.Split != nil {
1179 1 : // Excise is only supported on prefix keys.
1180 1 : if d.opts.Comparer.Split(exciseSpan.Start) != len(exciseSpan.Start) {
1181 0 : panic("IngestAndExcise called with suffixed start key")
1182 : }
1183 1 : if d.opts.Comparer.Split(exciseSpan.End) != len(exciseSpan.End) {
1184 0 : panic("IngestAndExcise called with suffixed end key")
1185 : }
1186 : }
1187 1 : if v := d.FormatMajorVersion(); v < FormatMinForSharedObjects {
1188 0 : return IngestOperationStats{}, errors.Errorf(
1189 0 : "store has format major version %d; IngestAndExise requires at least %d",
1190 0 : v, FormatMinForSharedObjects,
1191 0 : )
1192 0 : }
1193 1 : return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, nil /* external */)
1194 : }
1195 :
1196 : // Both DB.mu and commitPipeline.mu must be held while this is called.
1197 : func (d *DB) newIngestedFlushableEntry(
1198 : meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum,
1199 1 : ) (*flushableEntry, error) {
1200 1 : // Update the sequence number for all of the sstables in the
1201 1 : // metadata. Writing the metadata to the manifest when the
1202 1 : // version edit is applied is the mechanism that persists the
1203 1 : // sequence number. The sstables themselves are left unmodified.
1204 1 : // In this case, a version edit will only be written to the manifest
1205 1 : // when the flushable is eventually flushed. If Pebble restarts in that
1206 1 : // time, then we'll lose the ingest sequence number information. But this
1207 1 : // information will also be reconstructed on node restart.
1208 1 : if err := ingestUpdateSeqNum(
1209 1 : d.cmp, d.opts.Comparer.FormatKey, seqNum, ingestLoadResult{localMeta: meta},
1210 1 : ); err != nil {
1211 0 : return nil, err
1212 0 : }
1213 :
1214 1 : f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter)
1215 1 :
1216 1 : // NB: The logNum/seqNum are the WAL number which we're writing this entry
1217 1 : // to and the sequence number within the WAL which we'll write this entry
1218 1 : // to.
1219 1 : entry := d.newFlushableEntry(f, logNum, seqNum)
1220 1 : // The flushable entry starts off with a single reader ref, so increment
1221 1 : // the FileMetadata.Refs.
1222 1 : for _, file := range f.files {
1223 1 : file.Ref()
1224 1 : }
1225 1 : entry.unrefFiles = func() []*fileBacking {
1226 1 : var obsolete []*fileBacking
1227 1 : for _, file := range f.files {
1228 1 : if file.Unref() == 0 {
1229 1 : obsolete = append(obsolete, file.FileMetadata.FileBacking)
1230 1 : }
1231 : }
1232 1 : return obsolete
1233 : }
1234 :
1235 1 : entry.flushForced = true
1236 1 : entry.releaseMemAccounting = func() {}
1237 1 : return entry, nil
1238 : }
1239 :
1240 : // Both DB.mu and commitPipeline.mu must be held while this is called. Since
1241 : // we're holding both locks, the order in which we rotate the memtable or
1242 : // recycle the WAL in this function is irrelevant as long as the correct log
1243 : // numbers are assigned to the appropriate flushable.
1244 1 : func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error {
1245 1 : b := d.NewBatch()
1246 1 : for _, m := range meta {
1247 1 : b.ingestSST(m.FileNum)
1248 1 : }
1249 1 : b.setSeqNum(seqNum)
1250 1 :
1251 1 : // If the WAL is disabled, then the logNum used to create the flushable
1252 1 : // entry doesn't matter. We just use the logNum assigned to the current
1253 1 : // mutable memtable. If the WAL is enabled, then this logNum will be
1254 1 : // overwritten by the logNum of the log which will contain the log entry
1255 1 : // for the ingestedFlushable.
1256 1 : logNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
1257 1 : if !d.opts.DisableWAL {
1258 1 : // We create a new WAL for the flushable instead of reusing the end of
1259 1 : // the previous WAL. This simplifies the increment of the minimum
1260 1 : // unflushed log number, and also simplifies WAL replay.
1261 1 : logNum, _ = d.recycleWAL()
1262 1 : d.mu.Unlock()
1263 1 : err := d.commit.directWrite(b)
1264 1 : if err != nil {
1265 0 : d.opts.Logger.Fatalf("%v", err)
1266 0 : }
1267 1 : d.mu.Lock()
1268 : }
1269 :
1270 1 : entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum)
1271 1 : if err != nil {
1272 0 : return err
1273 0 : }
1274 1 : nextSeqNum := seqNum + uint64(b.Count())
1275 1 :
1276 1 : // Set newLogNum to the logNum of the previous flushable. This value is
1277 1 : // irrelevant if the WAL is disabled. If the WAL is enabled, then we set
1278 1 : // the appropriate value below.
1279 1 : newLogNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
1280 1 : if !d.opts.DisableWAL {
1281 1 : // This is WAL num of the next mutable memtable which comes after the
1282 1 : // ingestedFlushable in the flushable queue. The mutable memtable
1283 1 : // will be created below.
1284 1 : newLogNum, _ = d.recycleWAL()
1285 1 : if err != nil {
1286 0 : return err
1287 0 : }
1288 : }
1289 :
1290 1 : currMem := d.mu.mem.mutable
1291 1 : // NB: Placing ingested sstables above the current memtables
1292 1 : // requires rotating the existing memtables/WAL. There is
1293 1 : // some concern of churning through tiny memtables due to
1294 1 : // ingested sstables being placed on top of them, but those
1295 1 : // memtables would have to be flushed anyways.
1296 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1297 1 : d.rotateMemtable(newLogNum, nextSeqNum, currMem)
1298 1 : d.updateReadStateLocked(d.opts.DebugCheck)
1299 1 : d.maybeScheduleFlush()
1300 1 : return nil
1301 : }
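// A sketch of the resulting flushable queue (hedged; not from the original
// source): after this function returns, the queue looks like
//
//	[..., previous mutable memtable, ingestedFlushable, new mutable memtable]
//
// so the ingested sstables shadow everything that was in the memtable at the
// time, without waiting for that memtable to flush first.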
1302 :
1303 : // See comment at Ingest() for details on how this works.
1304 : func (d *DB) ingest(
1305 : paths []string,
1306 : targetLevelFunc ingestTargetLevelFunc,
1307 : shared []SharedSSTMeta,
1308 : exciseSpan KeyRange,
1309 : external []ExternalFile,
1310 1 : ) (IngestOperationStats, error) {
1311 1 : if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
1312 0 : panic("cannot ingest shared sstables with nil SharedStorage")
1313 : }
1314 1 : if (exciseSpan.Valid() || len(shared) > 0 || len(external) > 0) && d.FormatMajorVersion() < FormatVirtualSSTables {
1315 0 : return IngestOperationStats{}, errors.New("pebble: format major version too old for excise, shared or external sstable ingestion")
1316 0 : }
1317 1 : if len(external) > 0 && d.FormatMajorVersion() < FormatSyntheticPrefixes {
1318 0 : for i := range external {
1319 0 : if len(external[i].SyntheticPrefix) > 0 {
1320 0 : return IngestOperationStats{}, errors.New("pebble: format major version too old for synthetic prefix ingestion")
1321 0 : }
1322 : }
1323 : }
1324 : // Allocate file numbers for all of the files being ingested and mark them as
1325 : // pending in order to prevent them from being deleted. Note that this causes
1326 : // the file number ordering to be out of alignment with sequence number
1327 : // ordering. The sorting of L0 tables by sequence number avoids relying on
1328 : // that (busted) invariant.
1329 1 : d.mu.Lock()
1330 1 : pendingOutputs := make([]base.DiskFileNum, len(paths)+len(shared)+len(external))
1331 1 : for i := 0; i < len(paths)+len(shared)+len(external); i++ {
1332 1 : pendingOutputs[i] = d.mu.versions.getNextDiskFileNum()
1333 1 : }
1334 :
1335 1 : jobID := d.mu.nextJobID
1336 1 : d.mu.nextJobID++
1337 1 : d.mu.Unlock()
1338 1 :
1339 1 : // Load the metadata for all the files being ingested. This step detects
1340 1 : // and elides empty sstables.
1341 1 : loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs, d.objProvider, jobID)
1342 1 : if err != nil {
1343 1 : return IngestOperationStats{}, err
1344 1 : }
1345 :
1346 1 : if loadResult.fileCount == 0 {
1347 1 : // All of the sstables to be ingested were empty. Nothing to do.
1348 1 : return IngestOperationStats{}, nil
1349 1 : }
1350 :
1351 : // Verify the sstables do not overlap.
1352 1 : if err := ingestSortAndVerify(d.cmp, loadResult, exciseSpan); err != nil {
1353 1 : return IngestOperationStats{}, err
1354 1 : }
1355 :
1356 : // Hard link the sstables into the DB directory. Since the sstables aren't
1357 : // referenced by a version, they won't be used. If the hard linking fails
1358 : // (e.g. because the files reside on a different filesystem), ingestLink will
1359 : // fall back to copying, and if that fails we undo our work and return an
1360 : // error.
1361 1 : if err := ingestLink(jobID, d.opts, d.objProvider, loadResult, shared); err != nil {
1362 0 : return IngestOperationStats{}, err
1363 0 : }
1364 :
1365 : // Make the new tables durable. We need to do this at some point before we
1366 : // update the MANIFEST (via logAndApply), otherwise a crash can have the
1367 : // tables referenced in the MANIFEST, but not present in the provider.
1368 1 : if err := d.objProvider.Sync(); err != nil {
1369 1 : return IngestOperationStats{}, err
1370 1 : }
1371 :
1372 : // metaFlushableOverlaps is a slice parallel to meta indicating which of the
1373 : // ingested sstables overlap some table in the flushable queue. It's used to
1374 : // approximate ingest-into-L0 stats when using flushable ingests.
1375 1 : metaFlushableOverlaps := make([]bool, loadResult.fileCount)
1376 1 : var mem *flushableEntry
1377 1 : var mut *memTable
1378 1 : // asFlushable indicates whether the sstable was ingested as a flushable.
1379 1 : var asFlushable bool
1380 1 : iterOps := IterOptions{
1381 1 : CategoryAndQoS: sstable.CategoryAndQoS{
1382 1 : Category: "pebble-ingest",
1383 1 : QoSLevel: sstable.LatencySensitiveQoSLevel,
1384 1 : },
1385 1 : }
1386 1 : prepare := func(seqNum uint64) {
1387 1 : // Note that d.commit.mu is held by commitPipeline when calling prepare.
1388 1 :
1389 1 : d.mu.Lock()
1390 1 : defer d.mu.Unlock()
1391 1 :
1392 1 : // Check to see if any files overlap with any of the memtables. The queue
1393 1 : // is ordered from oldest to newest with the mutable memtable being the
1394 1 : // last element in the slice. We want to wait for the newest table that
1395 1 : // overlaps.
1396 1 :
1397 1 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1398 1 : m := d.mu.mem.queue[i]
1399 1 : iter := m.newIter(&iterOps)
1400 1 : rangeDelIter := m.newRangeDelIter(&iterOps)
1401 1 : rkeyIter := m.newRangeKeyIter(&iterOps)
1402 1 :
1403 1 : checkForOverlap := func(i int, meta *fileMetadata) {
1404 1 : if metaFlushableOverlaps[i] {
1405 1 : // This table already overlapped a more recent flushable.
1406 1 : return
1407 1 : }
1408 1 : kr := internalKeyRange{
1409 1 : smallest: meta.Smallest,
1410 1 : largest: meta.Largest,
1411 1 : }
1412 1 : if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, d.cmp) {
1413 1 : // If this is the first table to overlap a flushable, save
1414 1 : // the flushable. This ingest must be ingested or flushed
1415 1 : // after it.
1416 1 : if mem == nil {
1417 1 : mem = m
1418 1 : }
1419 1 : metaFlushableOverlaps[i] = true
1420 : }
1421 : }
1422 1 : for i := range loadResult.localMeta {
1423 1 : checkForOverlap(i, loadResult.localMeta[i])
1424 1 : }
1425 1 : for i := range loadResult.sharedMeta {
1426 1 : checkForOverlap(len(loadResult.localMeta)+i, loadResult.sharedMeta[i])
1427 1 : }
1428 1 : for i := range loadResult.externalMeta {
1429 1 : checkForOverlap(len(loadResult.localMeta)+len(loadResult.sharedMeta)+i, loadResult.externalMeta[i])
1430 1 : }
1431 1 : if exciseSpan.Valid() {
1432 1 : kr := internalKeyRange{
1433 1 : smallest: base.MakeInternalKey(exciseSpan.Start, InternalKeySeqNumMax, InternalKeyKindMax),
1434 1 : largest: base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, exciseSpan.End),
1435 1 : }
1436 1 : if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, d.cmp) {
1437 1 : if mem == nil {
1438 1 : mem = m
1439 1 : }
1440 : }
1441 : }
1442 1 : err := iter.Close()
1443 1 : if rangeDelIter != nil {
1444 1 : err = firstError(err, rangeDelIter.Close())
1445 1 : }
1446 1 : if rkeyIter != nil {
1447 1 : err = firstError(err, rkeyIter.Close())
1448 1 : }
1449 1 : if err != nil {
1450 0 : d.opts.Logger.Errorf("ingest error reading flushable for log %s: %s", m.logNum, err)
1451 0 : }
1452 : }
1453 :
1454 1 : if mem == nil {
1455 1 : // No overlap with any of the queued flushables, so no need to queue
1456 1 : // after them.
1457 1 :
1458 1 : // New writes with higher sequence numbers may be concurrently
1459 1 : // committed. We must ensure they don't flush before this ingest
1460 1 : // completes. To do that, we ref the mutable memtable as a writer,
1461 1 : // preventing its flushing (and the flushing of all subsequent
1462 1 : // flushables in the queue). Once we've acquired the manifest lock
1463 1 : // to add the ingested sstables to the LSM, we can unref as we're
1464 1 : // guaranteed that the flush won't edit the LSM before this ingest.
1465 1 : mut = d.mu.mem.mutable
1466 1 : mut.writerRef()
1467 1 : return
1468 1 : }
1469 : // The ingestion overlaps with some entry in the flushable queue.
1470 1 : if d.FormatMajorVersion() < FormatFlushableIngest ||
1471 1 : d.opts.Experimental.DisableIngestAsFlushable() ||
1472 1 : len(shared) > 0 || exciseSpan.Valid() || len(external) > 0 ||
1473 1 : (len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) {
1474 1 : // We're not able to ingest as a flushable,
1475 1 : // so we must synchronously flush.
1476 1 : //
1477 1 : // TODO(bilal): Currently, if any of the files being ingested are shared or
1478 1 : // there's an excise span present, we cannot use flushable ingests and need
1479 1 : // to wait synchronously. Either remove this caveat by fleshing out
1480 1 : // flushable ingest logic to also account for these cases, or remove this
1481 1 : // comment. Tracking issue: https://github.com/cockroachdb/pebble/issues/2676
1482 1 : if mem.flushable == d.mu.mem.mutable {
1483 1 : err = d.makeRoomForWrite(nil)
1484 1 : }
1485 : // New writes with higher sequence numbers may be concurrently
1486 : // committed. We must ensure they don't flush before this ingest
1487 : // completes. To do that, we ref the mutable memtable as a writer,
1488 : // preventing its flushing (and the flushing of all subsequent
1489 : // flushables in the queue). Once we've acquired the manifest lock
1490 : // to add the ingested sstables to the LSM, we can unref as we're
1491 : // guaranteed that the flush won't edit the LSM before this ingest.
1492 1 : mut = d.mu.mem.mutable
1493 1 : mut.writerRef()
1494 1 : mem.flushForced = true
1495 1 : d.maybeScheduleFlush()
1496 1 : return
1497 : }
1498 : // Since there aren't too many memtables already queued up, we can
1499 : // slide the ingested sstables on top of the existing memtables.
1500 1 : asFlushable = true
1501 1 : err = d.handleIngestAsFlushable(loadResult.localMeta, seqNum)
1502 : }
1503 :
1504 1 : var ve *versionEdit
1505 1 : apply := func(seqNum uint64) {
1506 1 : if err != nil || asFlushable {
1507 1 : // An error occurred during prepare, or we ingested as a flushable.
1508 1 : if mut != nil {
1509 0 : if mut.writerUnref() {
1510 0 : d.mu.Lock()
1511 0 : d.maybeScheduleFlush()
1512 0 : d.mu.Unlock()
1513 0 : }
1514 : }
1515 1 : return
1516 : }
1517 :
1518 : // Update the sequence numbers for all ingested sstables'
1519 : // metadata. When the version edit is applied, the metadata is
1520 : // written to the manifest, persisting the sequence number.
1521 : // The sstables themselves are left unmodified.
1522 1 : if err = ingestUpdateSeqNum(
1523 1 : d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult,
1524 1 : ); err != nil {
1525 0 : if mut != nil {
1526 0 : if mut.writerUnref() {
1527 0 : d.mu.Lock()
1528 0 : d.maybeScheduleFlush()
1529 0 : d.mu.Unlock()
1530 0 : }
1531 : }
1532 0 : return
1533 : }
1534 :
1535 : // If we overlapped with a memtable in prepare wait for the flush to
1536 : // finish.
1537 1 : if mem != nil {
1538 1 : <-mem.flushed
1539 1 : }
1540 :
1541 : // Assign the sstables to the correct level in the LSM and apply the
1542 : // version edit.
1543 1 : ve, err = d.ingestApply(jobID, loadResult, targetLevelFunc, mut, exciseSpan)
1544 : }
1545 :
1546 : // Only one ingest can occur at a time; otherwise, one ingest would block
1547 : // waiting for the other to finish applying. That blocking would happen while
1548 : // holding the commit mutex, which would prevent unrelated batches from
1549 : // writing their changes to the WAL and memtable, causing a bigger commit
1550 : // hiccup during ingestion.
1551 1 : d.commit.ingestSem <- struct{}{}
1552 1 : d.commit.AllocateSeqNum(loadResult.fileCount, prepare, apply)
1553 1 : <-d.commit.ingestSem
1554 1 :
1555 1 : if err != nil {
1556 1 : if err2 := ingestCleanup(d.objProvider, loadResult.localMeta); err2 != nil {
1557 0 : d.opts.Logger.Errorf("ingest cleanup failed: %v", err2)
1558 0 : }
1559 1 : } else {
1560 1 : // Since we either created a hard link to the ingesting files, or copied
1561 1 : // them over, it is safe to remove the originals paths.
1562 1 : for _, path := range loadResult.localPaths {
1563 1 : if err2 := d.opts.FS.Remove(path); err2 != nil {
1564 1 : d.opts.Logger.Errorf("ingest failed to remove original file: %s", err2)
1565 1 : }
1566 : }
1567 : }
1568 :
1569 1 : info := TableIngestInfo{
1570 1 : JobID: jobID,
1571 1 : Err: err,
1572 1 : flushable: asFlushable,
1573 1 : }
1574 1 : if len(loadResult.localMeta) > 0 {
1575 1 : info.GlobalSeqNum = loadResult.localMeta[0].SmallestSeqNum
1576 1 : } else if len(loadResult.sharedMeta) > 0 {
1577 1 : info.GlobalSeqNum = loadResult.sharedMeta[0].SmallestSeqNum
1578 1 : } else {
1579 1 : info.GlobalSeqNum = loadResult.externalMeta[0].SmallestSeqNum
1580 1 : }
1581 1 : var stats IngestOperationStats
1582 1 : if ve != nil {
1583 1 : info.Tables = make([]struct {
1584 1 : TableInfo
1585 1 : Level int
1586 1 : }, len(ve.NewFiles))
1587 1 : for i := range ve.NewFiles {
1588 1 : e := &ve.NewFiles[i]
1589 1 : info.Tables[i].Level = e.Level
1590 1 : info.Tables[i].TableInfo = e.Meta.TableInfo()
1591 1 : stats.Bytes += e.Meta.Size
1592 1 : if e.Level == 0 {
1593 1 : stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
1594 1 : }
1595 1 : if i < len(metaFlushableOverlaps) && metaFlushableOverlaps[i] {
1596 1 : stats.MemtableOverlappingFiles++
1597 1 : }
1598 : }
1599 1 : } else if asFlushable {
1600 1 : // NB: If asFlushable == true, there are no shared sstables.
1601 1 : info.Tables = make([]struct {
1602 1 : TableInfo
1603 1 : Level int
1604 1 : }, len(loadResult.localMeta))
1605 1 : for i, f := range loadResult.localMeta {
1606 1 : info.Tables[i].Level = -1
1607 1 : info.Tables[i].TableInfo = f.TableInfo()
1608 1 : stats.Bytes += f.Size
1609 1 : // We don't have exact stats on which files will be ingested into
1610 1 : // L0, because actual ingestion into the LSM has been deferred until
1611 1 : // flush time. Instead, we infer based on memtable overlap.
1612 1 : //
1613 1 : // TODO(jackson): If we optimistically compute data overlap (#2112)
1614 1 : // before entering the commit pipeline, we can use that overlap to
1615 1 : // improve our approximation by incorporating overlap with L0, not
1616 1 : // just memtables.
1617 1 : if metaFlushableOverlaps[i] {
1618 1 : stats.ApproxIngestedIntoL0Bytes += f.Size
1619 1 : stats.MemtableOverlappingFiles++
1620 1 : }
1621 : }
1622 : }
1623 1 : d.opts.EventListener.TableIngested(info)
1624 1 :
1625 1 : return stats, err
1626 : }
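
// For reference, this path is normally driven through the exported ingestion
// API. A minimal caller sketch ("demo" and "external-001.sst" are
// hypothetical names, and error handling is elided to log.Fatal):
//
//	db, err := pebble.Open("demo", &pebble.Options{})
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer db.Close()
//	// Ingest links (or copies) the sstables into the store, assigns their
//	// keys a global sequence number, and slots them into the lowest level
//	// without overlapping data, as implemented above.
//	if err := db.Ingest([]string{"external-001.sst"}); err != nil {
//		log.Fatal(err)
//	}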
1627 :
1628 : // excise updates ve to include a replacement of the file m with new virtual
1629 : // sstables that exclude exciseSpan, returning a slice of newly-created files if
1630 : // any. If the entirety of m is deleted by exciseSpan, no new sstables are added
1631 : // and m is deleted. Note that ve is updated in-place.
1632 : //
1633 : // The manifest lock must be held when calling this method.
1634 : func (d *DB) excise(
1635 : exciseSpan KeyRange, m *fileMetadata, ve *versionEdit, level int,
1636 1 : ) ([]manifest.NewFileEntry, error) {
1637 1 : numCreatedFiles := 0
1638 1 : // Check if there's actually an overlap between m and exciseSpan.
1639 1 : if !exciseSpan.Overlaps(d.cmp, m) {
1640 1 : return nil, nil
1641 1 : }
1642 1 : ve.DeletedFiles[deletedFileEntry{
1643 1 : Level: level,
1644 1 : FileNum: m.FileNum,
1645 1 : }] = m
1646 1 : // Fast path: m sits entirely within the exciseSpan, so just delete it.
1647 1 : if exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
1648 1 : return nil, nil
1649 1 : }
1650 1 : var iter internalIterator
1651 1 : var rangeDelIter keyspan.FragmentIterator
1652 1 : var rangeKeyIter keyspan.FragmentIterator
1653 1 : needsBacking := false
1654 1 : // Create a file to the left of the excise span, if necessary.
1655 1 : // The bounds of this file will be [m.Smallest, lastKeyBefore(exciseSpan.Start)].
1656 1 : //
1657 1 : // We create bounds that are tight on user keys, and we make the effort to find
1658 1 : // the last key in the original sstable that's smaller than exciseSpan.Start
1659 1 : // even though it requires some sstable reads. We could choose to create
1660 1 : // virtual sstables on loose userKey bounds, in which case we could just set
1661 1 : // leftFile.Largest to an exclusive sentinel at exciseSpan.Start. The biggest
1662 1 : // issue with that approach would be that it'd lead to lots of small virtual
1663 1 : // sstables in the LSM that have no guarantee on containing even a single user
1664 1 : // key within the file bounds. This has the potential to increase both read and
1665 1 : // write-amp as we will be opening up these sstables only to find no relevant
1666 1 : // keys in the read path, and compacting sstables on top of them instead of
1667 1 : // directly into the space occupied by them. We choose to incur the cost of
1668 1 : // calculating tight bounds at this time instead of creating more work in the
1669 1 : // future.
1670 1 : //
1671 1 : // TODO(bilal): Some of this work can happen without grabbing the manifest
1672 1 : // lock; we could grab one currentVersion, release the lock, calculate excised
1673 1 : // files, then grab the lock again and recalculate for just the files that
1674 1 : // have changed since our previous calculation. Do this optimization as part of
1675 1 : // https://github.com/cockroachdb/pebble/issues/2112.
1676 1 : if d.cmp(m.Smallest.UserKey, exciseSpan.Start) < 0 {
1677 1 : leftFile := &fileMetadata{
1678 1 : Virtual: true,
1679 1 : FileBacking: m.FileBacking,
1680 1 : FileNum: d.mu.versions.getNextFileNum(),
1681 1 : // Note that these are loose bounds for smallest/largest seqnums, but they're
1682 1 : // sufficient for maintaining correctness.
1683 1 : SmallestSeqNum: m.SmallestSeqNum,
1684 1 : LargestSeqNum: m.LargestSeqNum,
1685 1 : }
1686 1 : if m.HasPointKeys && !exciseSpan.Contains(d.cmp, m.SmallestPointKey) {
1687 1 : // This file will contain point keys
1688 1 : smallestPointKey := m.SmallestPointKey
1689 1 : var err error
1690 1 : iter, rangeDelIter, err = d.newIters(context.TODO(), m, &IterOptions{
1691 1 : CategoryAndQoS: sstable.CategoryAndQoS{
1692 1 : Category: "pebble-ingest",
1693 1 : QoSLevel: sstable.LatencySensitiveQoSLevel,
1694 1 : },
1695 1 : level: manifest.Level(level),
1696 1 : }, internalIterOpts{})
1697 1 : if err != nil {
1698 0 : return nil, err
1699 0 : }
1700 1 : var key *InternalKey
1701 1 : if iter != nil {
1702 1 : defer iter.Close()
1703 1 : key, _ = iter.SeekLT(exciseSpan.Start, base.SeekLTFlagsNone)
1704 1 : } else {
1705 0 : iter = emptyIter
1706 0 : }
1707 1 : if key != nil {
1708 1 : leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, key.Clone())
1709 1 : }
1710 : // Store the min of (exciseSpan.Start, rdel.End) in lastRangeDel. This
1711 : // needs to be a copy if the key is owned by the range del iter.
1712 1 : var lastRangeDel []byte
1713 1 : if rangeDelIter != nil {
1714 1 : defer rangeDelIter.Close()
1715 1 : rdel := rangeDelIter.SeekLT(exciseSpan.Start)
1716 1 : if rdel != nil {
1717 1 : lastRangeDel = append(lastRangeDel[:0], rdel.End...)
1718 1 : if d.cmp(lastRangeDel, exciseSpan.Start) > 0 {
1719 1 : lastRangeDel = exciseSpan.Start
1720 1 : }
1721 : }
1722 1 : } else {
1723 1 : rangeDelIter = emptyKeyspanIter
1724 1 : }
1725 1 : if lastRangeDel != nil {
1726 1 : leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, lastRangeDel))
1727 1 : }
1728 : }
1729 1 : if m.HasRangeKeys && !exciseSpan.Contains(d.cmp, m.SmallestRangeKey) {
1730 1 : // This file will contain range keys
1731 1 : var err error
1732 1 : smallestRangeKey := m.SmallestRangeKey
1733 1 : rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
1734 1 : if err != nil {
1735 0 : return nil, err
1736 0 : }
1737 : // Store the min of (exciseSpan.Start, rkey.End) in lastRangeKey. This
1738 : // needs to be a copy if the key is owned by the range key iter.
1739 1 : var lastRangeKey []byte
1740 1 : var lastRangeKeyKind InternalKeyKind
1741 1 : defer rangeKeyIter.Close()
1742 1 : rkey := rangeKeyIter.SeekLT(exciseSpan.Start)
1743 1 : if rkey != nil {
1744 1 : lastRangeKey = append(lastRangeKey[:0], rkey.End...)
1745 1 : if d.cmp(lastRangeKey, exciseSpan.Start) > 0 {
1746 0 : lastRangeKey = exciseSpan.Start
1747 0 : }
1748 1 : lastRangeKeyKind = rkey.Keys[0].Kind()
1749 : }
1750 1 : if lastRangeKey != nil {
1751 1 : leftFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, base.MakeExclusiveSentinelKey(lastRangeKeyKind, lastRangeKey))
1752 1 : }
1753 : }
1754 1 : if leftFile.HasRangeKeys || leftFile.HasPointKeys {
1755 1 : var err error
1756 1 : leftFile.Size, err = d.tableCache.estimateSize(m, leftFile.Smallest.UserKey, leftFile.Largest.UserKey)
1757 1 : if err != nil {
1758 0 : return nil, err
1759 0 : }
1760 1 : if leftFile.Size == 0 {
1761 1 : // On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
1762 1 : // such as if the excised file only has range keys/dels and no point
1763 1 : // keys. This can cause panics in places where we divide by file sizes.
1764 1 : // Correct for it here.
1765 1 : leftFile.Size = 1
1766 1 : }
1767 1 : if err := leftFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
1768 0 : return nil, err
1769 0 : }
1770 1 : leftFile.ValidateVirtual(m)
1771 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: leftFile})
1772 1 : needsBacking = true
1773 1 : numCreatedFiles++
1774 : }
1775 : }
1776 : // Create a file to the right, if necessary.
1777 1 : if exciseSpan.Contains(d.cmp, m.Largest) {
1778 1 : // No key exists to the right of the excise span in this file.
1779 1 : if needsBacking && !m.Virtual {
1780 1 : // If m is virtual, then its file backing is already known to the manifest.
1781 1 : // We don't need to create another file backing. Note that there must be
1782 1 : // only one CreatedBackingTables entry per backing sstable. This is
1783 1 : // indicated by the VersionEdit.CreatedBackingTables invariant.
1784 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
1785 1 : }
1786 1 : return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
1787 : }
1788 : // Create a new file, rightFile, between [firstKeyAfter(exciseSpan.End), m.Largest].
1789 : //
1790 : // See comment before the definition of leftFile for the motivation behind
1791 : // calculating tight user-key bounds.
1792 1 : rightFile := &fileMetadata{
1793 1 : Virtual: true,
1794 1 : FileBacking: m.FileBacking,
1795 1 : FileNum: d.mu.versions.getNextFileNum(),
1796 1 : // Note that these are loose bounds for smallest/largest seqnums, but they're
1797 1 : // sufficient for maintaining correctness.
1798 1 : SmallestSeqNum: m.SmallestSeqNum,
1799 1 : LargestSeqNum: m.LargestSeqNum,
1800 1 : }
1801 1 : if m.HasPointKeys && !exciseSpan.Contains(d.cmp, m.LargestPointKey) {
1802 1 : // This file will contain point keys
1803 1 : largestPointKey := m.LargestPointKey
1804 1 : var err error
1805 1 : if iter == nil && rangeDelIter == nil {
1806 1 : iter, rangeDelIter, err = d.newIters(context.TODO(), m, &IterOptions{
1807 1 : CategoryAndQoS: sstable.CategoryAndQoS{
1808 1 : Category: "pebble-ingest",
1809 1 : QoSLevel: sstable.LatencySensitiveQoSLevel,
1810 1 : },
1811 1 : level: manifest.Level(level),
1812 1 : }, internalIterOpts{})
1813 1 : if err != nil {
1814 0 : return nil, err
1815 0 : }
1816 1 : if iter != nil {
1817 1 : defer iter.Close()
1818 1 : } else {
1819 0 : iter = emptyIter
1820 0 : }
1821 1 : if rangeDelIter != nil {
1822 1 : defer rangeDelIter.Close()
1823 1 : } else {
1824 1 : rangeDelIter = emptyKeyspanIter
1825 1 : }
1826 : }
1827 1 : key, _ := iter.SeekGE(exciseSpan.End, base.SeekGEFlagsNone)
1828 1 : if key != nil {
1829 1 : rightFile.ExtendPointKeyBounds(d.cmp, key.Clone(), largestPointKey)
1830 1 : }
1831 : // Store the max of (exciseSpan.End, rdel.Start) in firstRangeDel. This
1832 : // needs to be a copy if the key is owned by the range del iter.
1833 1 : var firstRangeDel []byte
1834 1 : rdel := rangeDelIter.SeekGE(exciseSpan.End)
1835 1 : if rdel != nil {
1836 1 : firstRangeDel = append(firstRangeDel[:0], rdel.Start...)
1837 1 : if d.cmp(firstRangeDel, exciseSpan.End) < 0 {
1838 1 : firstRangeDel = exciseSpan.End
1839 1 : }
1840 : }
1841 1 : if firstRangeDel != nil {
1842 1 : smallestPointKey := rdel.SmallestKey()
1843 1 : smallestPointKey.UserKey = firstRangeDel
1844 1 : rightFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, largestPointKey)
1845 1 : }
1846 : }
1847 1 : if m.HasRangeKeys && !exciseSpan.Contains(d.cmp, m.LargestRangeKey) {
1848 1 : // This file will contain range keys.
1849 1 : largestRangeKey := m.LargestRangeKey
1850 1 : if rangeKeyIter == nil {
1851 1 : var err error
1852 1 : rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
1853 1 : if err != nil {
1854 0 : return nil, err
1855 0 : }
1856 1 : defer rangeKeyIter.Close()
1857 : }
1858 : // Store the max of (exciseSpan.End, rkey.Start) in firstRangeKey. This
1859 : // needs to be a copy if the key is owned by the range key iter.
1860 1 : var firstRangeKey []byte
1861 1 : rkey := rangeKeyIter.SeekGE(exciseSpan.End)
1862 1 : if rkey != nil {
1863 1 : firstRangeKey = append(firstRangeKey[:0], rkey.Start...)
1864 1 : if d.cmp(firstRangeKey, exciseSpan.End) < 0 {
1865 1 : firstRangeKey = exciseSpan.End
1866 1 : }
1867 : }
1868 1 : if firstRangeKey != nil {
1869 1 : smallestRangeKey := rkey.SmallestKey()
1870 1 : smallestRangeKey.UserKey = firstRangeKey
1871 1 : // We call ExtendRangeKeyBounds so any internal boundType fields are
1872 1 : // set correctly. Note that this is mildly wasteful as we'll be comparing
1873 1 : // rightFile.{Smallest,Largest}RangeKey with themselves, which can be
1874 1 : // avoided if we exported ExtendOverallKeyBounds or so.
1875 1 : rightFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, largestRangeKey)
1876 1 : }
1877 : }
1878 1 : if rightFile.HasRangeKeys || rightFile.HasPointKeys {
1879 1 : var err error
1880 1 : rightFile.Size, err = d.tableCache.estimateSize(m, rightFile.Smallest.UserKey, rightFile.Largest.UserKey)
1881 1 : if err != nil {
1882 0 : return nil, err
1883 0 : }
1884 1 : if rightFile.Size == 0 {
1885 1 : // On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
1886 1 : // such as if the excised file only has range keys/dels and no point keys.
1887 1 : // This can cause panics in places where we divide by file sizes. Correct
1888 1 : // for it here.
1889 1 : rightFile.Size = 1
1890 1 : }
1891 1 : rightFile.ValidateVirtual(m)
1892 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: rightFile})
1893 1 : needsBacking = true
1894 1 : numCreatedFiles++
1895 : }
1896 :
1897 1 : if needsBacking && !m.Virtual {
1898 1 : // If m is virtual, then its file backing is already known to the manifest.
1899 1 : // We don't need to create another file backing. Note that there must be
1900 1 : // only one CreatedBackingTables entry per backing sstable. This is
1901 1 : // indicated by the VersionEdit.CreatedBackingTables invariant.
1902 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
1903 1 : }
1904 :
1905 1 : if err := rightFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
1906 0 : return nil, err
1907 0 : }
1908 1 : return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
1909 : }
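
// The tight-bounds computation above can be distilled into a self-contained
// sketch over a sorted slice of user keys (illustrative only; the real code
// must also consult range deletions and range keys through iterators, as
// seen above):
//
//	// exciseBoundsSketch returns the last key strictly below start (the
//	// tight largest bound of the left virtual file) and the first key at
//	// or above end (the tight smallest bound of the right virtual file).
//	func exciseBoundsSketch(sorted []string, start, end string) (lastLeft, firstRight string, okL, okR bool) {
//		i := sort.SearchStrings(sorted, start) // first index with sorted[i] >= start
//		if i > 0 {
//			lastLeft, okL = sorted[i-1], true
//		}
//		j := sort.SearchStrings(sorted, end) // first index with sorted[j] >= end
//		if j < len(sorted) {
//			firstRight, okR = sorted[j], true
//		}
//		return lastLeft, firstRight, okL, okR
//	}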
1910 :
1911 : type ingestTargetLevelFunc func(
1912 : newIters tableNewIters,
1913 : newRangeKeyIter keyspan.TableNewSpanIter,
1914 : iterOps IterOptions,
1915 : comparer *Comparer,
1916 : v *version,
1917 : baseLevel int,
1918 : compactions map[*compaction]struct{},
1919 : meta *fileMetadata,
1920 : suggestSplit bool,
1921 : ) (int, *fileMetadata, error)
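
// A simplified model of what an ingestTargetLevelFunc computes (illustrative
// only: the real implementation also distinguishes data overlap from
// boundary overlap via iterators, consults in-progress compactions, and may
// suggest a file to split):
//
//	type levelSpan struct{ start, end string } // inclusive user-key bounds
//
//	// pickTargetLevel walks down from L1 and returns the lowest level it
//	// can reach without crossing a level whose spans overlap [start, end];
//	// ingested data is newest, so it must stay above any overlapping data.
//	func pickTargetLevel(levels [][]levelSpan, start, end string) int {
//		target := 0
//		for lvl := 1; lvl < len(levels); lvl++ {
//			for _, s := range levels[lvl] {
//				if start <= s.end && s.start <= end {
//					return target
//				}
//			}
//			target = lvl
//		}
//		return target
//	}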
1922 :
1923 : type ingestSplitFile struct {
1924 : // ingestFile is the file being ingested.
1925 : ingestFile *fileMetadata
1926 : // splitFile is the file that needs to be split to allow ingestFile to slot
1927 : // into `level` level.
1928 : splitFile *fileMetadata
1929 : // The level where ingestFile will go (and where splitFile already is).
1930 : level int
1931 : }
1932 :
1933 : // ingestSplit splits files specified in `files` and updates ve in-place to
1934 : // account for existing files getting split into two virtual sstables. The map
1935 : // `replacedFiles` contains an in-progress map of all files that have been
1936 : // replaced with new virtual sstables in this version edit so far, which is also
1937 : // updated in-place.
1938 : //
1939 : // d.mu as well as the manifest lock must be held when calling this method.
1940 : func (d *DB) ingestSplit(
1941 : ve *versionEdit,
1942 : updateMetrics func(*fileMetadata, int, []newFileEntry),
1943 : files []ingestSplitFile,
1944 : replacedFiles map[base.FileNum][]newFileEntry,
1945 1 : ) error {
1946 1 : for _, s := range files {
1947 1 : // replacedFiles can be thought of as a tree, where we start iterating with
1948 1 : // s.splitFile and run its fileNum through replacedFiles, then find which of
1949 1 : // the replaced files overlaps with s.ingestFile, which becomes the new
1950 1 : // splitFile, then we check splitFile's replacements in replacedFiles again
1951 1 : // for overlap with s.ingestFile, and so on until we either can't find the
1952 1 : // current splitFile in replacedFiles (i.e. that's the file that now needs to
1953 1 : // be split), or we don't find a file that overlaps with s.ingestFile, which
1954 1 : // means a prior ingest split already produced enough room for s.ingestFile
1955 1 : // to go into this level without necessitating another ingest split.
1956 1 : splitFile := s.splitFile
1957 1 : for splitFile != nil {
1958 1 : replaced, ok := replacedFiles[splitFile.FileNum]
1959 1 : if !ok {
1960 1 : break
1961 : }
1962 1 : updatedSplitFile := false
1963 1 : for i := range replaced {
1964 1 : if replaced[i].Meta.Overlaps(d.cmp, s.ingestFile.Smallest.UserKey, s.ingestFile.Largest.UserKey, s.ingestFile.Largest.IsExclusiveSentinel()) {
1965 1 : if updatedSplitFile {
1966 0 : // This should never happen because the earlier ingestTargetLevel
1967 0 : // function only finds split file candidates that are guaranteed to
1968 0 : // have no data overlap, only boundary overlap. See the comments
1969 0 : // in that method to see the definitions of data vs boundary
1970 0 : // overlap. That, plus the fact that files in `replaced` are
1971 0 : // guaranteed to have file bounds that are tight on user keys
1972 0 : // (as that's what `d.excise` produces), means that the only case
1973 0 : // where we overlap with two or more files in `replaced` is if we
1974 0 : // actually had data overlap all along, or if the ingestion files
1975 0 : // were overlapping, either of which is an invariant violation.
1976 0 : panic("updated with two files in ingestSplit")
1977 : }
1978 1 : splitFile = replaced[i].Meta
1979 1 : updatedSplitFile = true
1980 : }
1981 : }
1982 1 : if !updatedSplitFile {
1983 1 : // None of the replaced files overlapped with the file being ingested.
1984 1 : // This can happen if we've already excised a span overlapping with
1985 1 : // this file, or if we have consecutive ingested files that can slide
1986 1 : // within the same gap between keys in an existing file. For instance,
1987 1 : // if an existing file has keys a and g and we're ingesting b-c, d-e,
1988 1 : // the first loop iteration will split the existing file into one that
1989 1 : // ends in a and another that starts at g, and the second iteration will
1990 1 : // fall into this case and require no splitting.
1991 1 : //
1992 1 : // No splitting necessary.
1993 1 : splitFile = nil
1994 1 : }
1995 : }
1996 1 : if splitFile == nil {
1997 1 : continue
1998 : }
1999 : // NB: excise operates on [start, end). We're splitting at [start, end]
2000 : // (assuming !s.ingestFile.Largest.IsExclusiveSentinel()). The conflation
2001 : // of exclusive vs inclusive end bounds should not make a difference here
2002 : // as we're guaranteed to not have any data overlap between splitFile and
2003 : // s.ingestFile, so we panic below if we see a newly added file with an
2004 : // endKey equal to s.ingestFile.Largest when !s.ingestFile.Largest.IsExclusiveSentinel().
2005 1 : added, err := d.excise(KeyRange{Start: s.ingestFile.Smallest.UserKey, End: s.ingestFile.Largest.UserKey}, splitFile, ve, s.level)
2006 1 : if err != nil {
2007 0 : return err
2008 0 : }
2009 1 : if _, ok := ve.DeletedFiles[deletedFileEntry{
2010 1 : Level: s.level,
2011 1 : FileNum: splitFile.FileNum,
2012 1 : }]; !ok {
2013 0 : panic("did not split file that was expected to be split")
2014 : }
2015 1 : replacedFiles[splitFile.FileNum] = added
2016 1 : for i := range added {
2017 1 : if s.ingestFile.Overlaps(d.cmp, added[i].Meta.Smallest.UserKey, added[i].Meta.Largest.UserKey, added[i].Meta.Largest.IsExclusiveSentinel()) {
2018 0 : panic("ingest-time split produced a file that overlaps with ingested file")
2019 : }
2020 : }
2021 1 : updateMetrics(splitFile, s.level, added)
2022 : }
2023 : // Flatten the version edit by removing any entries from ve.NewFiles that
2024 : // are also in ve.DeletedFiles.
2025 1 : newNewFiles := ve.NewFiles[:0]
2026 1 : for i := range ve.NewFiles {
2027 1 : fn := ve.NewFiles[i].Meta.FileNum
2028 1 : deEntry := deletedFileEntry{Level: ve.NewFiles[i].Level, FileNum: fn}
2029 1 : if _, ok := ve.DeletedFiles[deEntry]; ok {
2030 1 : delete(ve.DeletedFiles, deEntry)
2031 1 : } else {
2032 1 : newNewFiles = append(newNewFiles, ve.NewFiles[i])
2033 1 : }
2034 : }
2035 1 : ve.NewFiles = newNewFiles
2036 1 : return nil
2037 : }
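
// The splitFile-resolution loop above chases a chain of replacements. A
// stripped-down, self-contained model of that chase (hypothetical types;
// file numbers and user-key bounds only):
//
//	type replacementSketch struct {
//		fileNum    int
//		start, end string // inclusive user-key bounds
//	}
//
//	// resolveSplitFile follows replaced[fn] until it finds a file with no
//	// recorded replacements (that file still needs splitting) or none of
//	// whose replacements overlap [start, end] (no split needed; returns 0).
//	func resolveSplitFile(replaced map[int][]replacementSketch, fn int, start, end string) int {
//		for fn != 0 {
//			repl, ok := replaced[fn]
//			if !ok {
//				return fn
//			}
//			next := 0
//			for _, r := range repl {
//				if start <= r.end && r.start <= end {
//					next = r.fileNum
//					break
//				}
//			}
//			fn = next
//		}
//		return 0
//	}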
2038 :
2039 : func (d *DB) ingestApply(
2040 : jobID int,
2041 : lr ingestLoadResult,
2042 : findTargetLevel ingestTargetLevelFunc,
2043 : mut *memTable,
2044 : exciseSpan KeyRange,
2045 1 : ) (*versionEdit, error) {
2046 1 : d.mu.Lock()
2047 1 : defer d.mu.Unlock()
2048 1 :
2049 1 : ve := &versionEdit{
2050 1 : NewFiles: make([]newFileEntry, lr.fileCount),
2051 1 : }
2052 1 : if exciseSpan.Valid() || (d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit()) {
2053 1 : ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{}
2054 1 : }
2055 1 : metrics := make(map[int]*LevelMetrics)
2056 1 :
2057 1 : // Lock the manifest for writing before we use the current version to
2058 1 : // determine the target level. This prevents two concurrent ingestion jobs
2059 1 : // from using the same version to determine the target level, and also
2060 1 : // provides serialization with concurrent compaction and flush jobs.
2061 1 : // logAndApply unconditionally releases the manifest lock, but any earlier
2062 1 : // returns must unlock the manifest.
2063 1 : d.mu.versions.logLock()
2064 1 :
2065 1 : if mut != nil {
2066 1 : // Unref the mutable memtable to allow its flush to proceed. Now that we've
2067 1 : // acquired the manifest lock, we can be certain that if the mutable
2068 1 : // memtable has received more recent conflicting writes, the flush won't
2069 1 : // beat us to applying to the manifest, resulting in sequence number
2070 1 : // inversion. Even though we call maybeScheduleFlush right now, this flush
2071 1 : // will apply after our ingestion.
2072 1 : if mut.writerUnref() {
2073 1 : d.maybeScheduleFlush()
2074 1 : }
2075 : }
2076 :
2077 1 : shouldIngestSplit := d.opts.Experimental.IngestSplit != nil &&
2078 1 : d.opts.Experimental.IngestSplit() && d.FormatMajorVersion() >= FormatVirtualSSTables
2079 1 : current := d.mu.versions.currentVersion()
2080 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
2081 1 : iterOps := IterOptions{logger: d.opts.Logger}
2082 1 : // filesToSplit is a list where each element is a pair consisting of a file
2083 1 : // being ingested and a file being split to make room for an ingestion into
2084 1 : // that level. Each ingested file will appear at most once in this list. It
2085 1 : // is possible for split files to appear twice in this list.
2086 1 : filesToSplit := make([]ingestSplitFile, 0)
2087 1 : checkCompactions := false
2088 1 : for i := 0; i < lr.fileCount; i++ {
2089 1 : // Determine the lowest level in the LSM for which the sstable doesn't
2090 1 : // overlap any existing files in the level.
2091 1 : var m *fileMetadata
2092 1 : sharedIdx := -1
2093 1 : sharedLevel := -1
2094 1 : externalFile := false
2095 1 : if i < len(lr.localMeta) {
2096 1 : // local file.
2097 1 : m = lr.localMeta[i]
2098 1 : } else if (i - len(lr.localMeta)) < len(lr.sharedMeta) {
2099 1 : // shared file.
2100 1 : sharedIdx = i - len(lr.localMeta)
2101 1 : m = lr.sharedMeta[sharedIdx]
2102 1 : sharedLevel = int(lr.sharedLevels[sharedIdx])
2103 1 : } else {
2104 1 : // external file.
2105 1 : externalFile = true
2106 1 : m = lr.externalMeta[i-(len(lr.localMeta)+len(lr.sharedMeta))]
2107 1 : }
2108 1 : f := &ve.NewFiles[i]
2109 1 : var err error
2110 1 : if sharedIdx >= 0 {
2111 1 : f.Level = sharedLevel
2112 1 : if f.Level < sharedLevelsStart {
2113 0 : panic("cannot slot a shared file higher than the highest shared level")
2114 : }
2115 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
2116 1 : } else {
2117 1 : if externalFile {
2118 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
2119 1 : }
2120 1 : var splitFile *fileMetadata
2121 1 : if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
2122 1 : // This file fits perfectly within the excise span. We can slot it at
2123 1 : // L6, or sharedLevelsStart - 1 if we have shared files.
2124 1 : if len(lr.sharedMeta) > 0 {
2125 1 : f.Level = sharedLevelsStart - 1
2126 1 : if baseLevel > f.Level {
2127 1 : f.Level = 0
2128 1 : }
2129 1 : } else {
2130 1 : f.Level = 6
2131 1 : }
2132 1 : } else {
2133 1 : // TODO(bilal): findTargetLevel does disk IO (reading files for data
2134 1 : // overlap) even though we're holding onto d.mu. Consider unlocking
2135 1 : // d.mu while we do this. We already hold versions.logLock so we should
2136 1 : // not see any version applications while we're at this. The one
2137 1 : // complication here would be pulling out the mu.compact.inProgress
2138 1 : // check from findTargetLevel, as that requires d.mu to be held.
2139 1 : f.Level, splitFile, err = findTargetLevel(
2140 1 : d.newIters, d.tableNewRangeKeyIter, iterOps, d.opts.Comparer, current, baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit)
2141 1 : }
2142 :
2143 1 : if splitFile != nil {
2144 1 : if invariants.Enabled {
2145 1 : if lf := current.Levels[f.Level].Find(d.cmp, splitFile); lf == nil {
2146 0 : panic("splitFile returned is not in level it should be")
2147 : }
2148 : }
2149 : // We take advantage of the fact that we won't drop the db mutex
2150 : // between now and the call to logAndApply. So, no files should
2151 : // get added to a new in-progress compaction at this point. We can
2152 : // avoid having to iterate on in-progress compactions to cancel them
2153 : // if none of the files being split have a compacting state.
2154 1 : if splitFile.IsCompacting() {
2155 0 : checkCompactions = true
2156 0 : }
2157 1 : filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitFile, level: f.Level})
2158 : }
2159 : }
2160 1 : if err != nil {
2161 1 : d.mu.versions.logUnlock()
2162 1 : return nil, err
2163 1 : }
2164 1 : f.Meta = m
2165 1 : levelMetrics := metrics[f.Level]
2166 1 : if levelMetrics == nil {
2167 1 : levelMetrics = &LevelMetrics{}
2168 1 : metrics[f.Level] = levelMetrics
2169 1 : }
2170 1 : levelMetrics.NumFiles++
2171 1 : levelMetrics.Size += int64(m.Size)
2172 1 : levelMetrics.BytesIngested += m.Size
2173 1 : levelMetrics.TablesIngested++
2174 : }
2175 : // replacedFiles maps files excised due to exciseSpan (or splitFiles returned
2176 : // by ingestTargetLevel) to the files that were created to replace them. This map
2177 : // is used to resolve references to split files in filesToSplit, as it is
2178 : // possible for a file that we want to split to no longer exist or have a
2179 : // newer fileMetadata due to a split induced by another ingestion file, or an
2180 : // excise.
2181 1 : replacedFiles := make(map[base.FileNum][]newFileEntry)
2182 1 : updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
2183 1 : levelMetrics := metrics[level]
2184 1 : if levelMetrics == nil {
2185 1 : levelMetrics = &LevelMetrics{}
2186 1 : metrics[level] = levelMetrics
2187 1 : }
2188 1 : levelMetrics.NumFiles--
2189 1 : levelMetrics.Size -= int64(m.Size)
2190 1 : for i := range added {
2191 1 : levelMetrics.NumFiles++
2192 1 : levelMetrics.Size += int64(added[i].Meta.Size)
2193 1 : }
2194 : }
2195 1 : if exciseSpan.Valid() {
2196 1 : // Iterate through all levels and find files that intersect with exciseSpan.
2197 1 : //
2198 1 : // TODO(bilal): We could drop the DB mutex here as we don't need it for
2199 1 : // excises; we only need to hold the version lock which we already are
2200 1 : // holding. However releasing the DB mutex could mess with the
2201 1 : // ingestTargetLevel calculation that happened above, as it assumed that it
2202 1 : // had a complete view of in-progress compactions that wouldn't change
2203 1 : // until logAndApply is called. If we were to drop the mutex now, we could
2204 1 : // schedule another in-progress compaction that would go into the chosen target
2205 1 : // level and lead to file overlap within level (which would panic in
2206 1 : // logAndApply). We should drop the db mutex here, do the excise, then
2207 1 : // re-grab the DB mutex and rerun just the in-progress compaction check to
2208 1 : // see if any new compactions are conflicting with our chosen target levels
2209 1 : // for files, and if they are, we should signal those compactions to error
2210 1 : // out.
2211 1 : for level := range current.Levels {
2212 1 : overlaps := current.Overlaps(level, d.cmp, exciseSpan.Start, exciseSpan.End, true /* exclusiveEnd */)
2213 1 : iter := overlaps.Iter()
2214 1 :
2215 1 : for m := iter.First(); m != nil; m = iter.Next() {
2216 1 : newFiles, err := d.excise(exciseSpan, m, ve, level)
2217 1 : if err != nil {
2218 0 : return nil, err
2219 0 : }
2220 :
2221 1 : if _, ok := ve.DeletedFiles[deletedFileEntry{
2222 1 : Level: level,
2223 1 : FileNum: m.FileNum,
2224 1 : }]; !ok {
2225 1 : // We did not excise this file.
2226 1 : continue
2227 : }
2228 1 : replacedFiles[m.FileNum] = newFiles
2229 1 : updateLevelMetricsOnExcise(m, level, newFiles)
2230 : }
2231 : }
2232 : }
2233 1 : if len(filesToSplit) > 0 {
2234 1 : // For the same reasons as the above call to excise, we hold the db mutex
2235 1 : // while calling this method.
2236 1 : if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
2237 0 : return nil, err
2238 0 : }
2239 : }
2240 1 : if len(filesToSplit) > 0 || exciseSpan.Valid() {
2241 1 : for c := range d.mu.compact.inProgress {
2242 1 : if c.versionEditApplied {
2243 0 : continue
2244 : }
2245 : // Check if this compaction overlaps with the excise span. Note that just
2246 : // checking if the inputs individually overlap with the excise span
2247 : // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
2248 : // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
2249 : // doing a [c,d) excise at the same time as this compaction, we will have
2250 : // to error out the whole compaction as we can't guarantee it hasn't/won't
2251 : // write a file overlapping with the excise span.
2252 1 : if exciseSpan.OverlapsInternalKeyRange(d.cmp, c.smallest, c.largest) {
2253 1 : c.cancel.Store(true)
2254 1 : }
2255 : // Check if this compaction's inputs have been replaced due to an
2256 : // ingest-time split. In that case, cancel the compaction as a newly picked
2257 : // compaction would need to include any new files that slid in between
2258 : // previously-existing files. Note that we cancel any compaction that has a
2259 : // file that was ingest-split as an input, even if it started before this
2260 : // ingestion.
2261 1 : if checkCompactions {
2262 0 : for i := range c.inputs {
2263 0 : iter := c.inputs[i].files.Iter()
2264 0 : for f := iter.First(); f != nil; f = iter.Next() {
2265 0 : if _, ok := replacedFiles[f.FileNum]; ok {
2266 0 : c.cancel.Store(true)
2267 0 : break
2268 : }
2269 : }
2270 : }
2271 : }
2272 : }
2273 : // Check for any EventuallyFileOnlySnapshots that could be watching for
2274 : // an excise on this span.
2275 1 : if exciseSpan.Valid() {
2276 1 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
2277 1 : if s.efos == nil {
2278 0 : continue
2279 : }
2280 1 : efos := s.efos
2281 1 : // TODO(bilal): We can make this faster by taking advantage of the sorted
2282 1 : // nature of protectedRanges to do a sort.Search, or even maintaining a
2283 1 : // global list of all protected ranges instead of having to peer into every
2284 1 : // snapshot.
2285 1 : for i := range efos.protectedRanges {
2286 1 : if efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
2287 1 : efos.excised.Store(true)
2288 1 : break
2289 : }
2290 : }
2291 : }
2292 : }
2293 : }
2294 1 : if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
2295 1 : return d.getInProgressCompactionInfoLocked(nil)
2296 1 : }); err != nil {
2297 1 : return nil, err
2298 1 : }
2299 :
2300 1 : d.mu.versions.metrics.Ingest.Count++
2301 1 :
2302 1 : d.updateReadStateLocked(d.opts.DebugCheck)
2303 1 : // updateReadStateLocked could have generated obsolete tables; schedule a
2304 1 : // cleanup job if necessary.
2305 1 : d.deleteObsoleteFiles(jobID)
2306 1 : d.updateTableStatsLocked(ve.NewFiles)
2307 1 : // The ingestion may have pushed a level over the threshold for compaction,
2308 1 : // so check to see if one is necessary and schedule it.
2309 1 : d.maybeScheduleCompaction()
2310 1 : var toValidate []manifest.NewFileEntry
2311 1 : dedup := make(map[base.DiskFileNum]struct{})
2312 1 : for _, entry := range ve.NewFiles {
2313 1 : if _, ok := dedup[entry.Meta.FileBacking.DiskFileNum]; !ok {
2314 1 : toValidate = append(toValidate, entry)
2315 1 : dedup[entry.Meta.FileBacking.DiskFileNum] = struct{}{}
2316 1 : }
2317 : }
2318 1 : d.maybeValidateSSTablesLocked(toValidate)
2319 1 : return ve, nil
2320 : }
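
// The behaviors exercised by ingestApply are gated on options and the format
// major version checked earlier in this file. A minimal configuration sketch
// (treat the chosen values as assumptions, not recommendations):
//
//	opts := &pebble.Options{
//		// Virtual sstables are required for excises, shared/external
//		// files, and ingest-time splitting.
//		FormatMajorVersion: pebble.FormatVirtualSSTables,
//	}
//	opts.Experimental.IngestSplit = func() bool { return true } // enable ingest-time splits
//	opts.Experimental.ValidateOnIngest = true                   // queue block-checksum validation
//	db, err := pebble.Open("demo", opts)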
2321 :
2322 : // maybeValidateSSTablesLocked adds the slice of newFileEntrys to the pending
2323 : // queue of files to be validated, when the feature is enabled.
2324 : //
2325 : // Note that if two entries with the same backing file are added twice, then the
2326 : // block checksums for the backing file will be validated twice.
2327 : //
2328 : // DB.mu must be locked when calling.
2329 1 : func (d *DB) maybeValidateSSTablesLocked(newFiles []newFileEntry) {
2330 1 : // Only add to the validation queue when the feature is enabled.
2331 1 : if !d.opts.Experimental.ValidateOnIngest {
2332 1 : return
2333 1 : }
2334 :
2335 1 : d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...)
2336 1 : if d.shouldValidateSSTablesLocked() {
2337 1 : go d.validateSSTables()
2338 1 : }
2339 : }
2340 :
2341 : // shouldValidateSSTablesLocked returns true if SSTable validation should run.
2342 : // DB.mu must be locked when calling.
2343 1 : func (d *DB) shouldValidateSSTablesLocked() bool {
2344 1 : return !d.mu.tableValidation.validating &&
2345 1 : d.closed.Load() == nil &&
2346 1 : d.opts.Experimental.ValidateOnIngest &&
2347 1 : len(d.mu.tableValidation.pending) > 0
2348 1 : }
2349 :
2350 : // validateSSTables runs a round of validation on the tables in the pending
2351 : // queue.
2352 1 : func (d *DB) validateSSTables() {
2353 1 : d.mu.Lock()
2354 1 : if !d.shouldValidateSSTablesLocked() {
2355 1 : d.mu.Unlock()
2356 1 : return
2357 1 : }
2358 :
2359 1 : pending := d.mu.tableValidation.pending
2360 1 : d.mu.tableValidation.pending = nil
2361 1 : d.mu.tableValidation.validating = true
2362 1 : jobID := d.mu.nextJobID
2363 1 : d.mu.nextJobID++
2364 1 : rs := d.loadReadState()
2365 1 :
2366 1 : // Drop DB.mu before performing IO.
2367 1 : d.mu.Unlock()
2368 1 :
2369 1 : // Validate all tables in the pending queue. This could lead to a situation
2370 1 : // where we are starving IO from other tasks due to having to page through
2371 1 : // all the blocks in all the sstables in the queue.
2372 1 : // TODO(travers): Add some form of pacing to avoid IO starvation.
2373 1 :
2374 1 : // If we fail to validate any files due to reasons other than uncovered
2375 1 : // corruption, accumulate them and re-queue them for another attempt.
2376 1 : var retry []manifest.NewFileEntry
2377 1 :
2378 1 : for _, f := range pending {
2379 1 : // The file may have been moved or deleted since it was ingested, in
2380 1 : // which case we skip.
2381 1 : if !rs.current.Contains(f.Level, d.cmp, f.Meta) {
2382 0 : // Assume the file was moved to a lower level. It is rare enough
2383 0 : // that a table is moved or deleted between the time it was ingested
2384 0 : // and the time the validation routine runs that the overall cost of
2385 0 : // this inner loop is tolerably low, when amortized over all
2386 0 : // ingested tables.
2387 0 : found := false
2388 0 : for i := f.Level + 1; i < numLevels; i++ {
2389 0 : if rs.current.Contains(i, d.cmp, f.Meta) {
2390 0 : found = true
2391 0 : break
2392 : }
2393 : }
2394 0 : if !found {
2395 0 : continue
2396 : }
2397 : }
2398 :
2399 1 : var err error
2400 1 : if f.Meta.Virtual {
2401 0 : err = d.tableCache.withVirtualReader(
2402 0 : f.Meta.VirtualMeta(), func(v sstable.VirtualReader) error {
2403 0 : return v.ValidateBlockChecksumsOnBacking()
2404 0 : })
2405 1 : } else {
2406 1 : err = d.tableCache.withReader(
2407 1 : f.Meta.PhysicalMeta(), func(r *sstable.Reader) error {
2408 1 : return r.ValidateBlockChecksums()
2409 1 : })
2410 : }
2411 :
2412 1 : if err != nil {
2413 1 : if IsCorruptionError(err) {
2414 1 : // TODO(travers): Hook into the corruption reporting pipeline, once
2415 1 : // available. See pebble#1192.
2416 1 : d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err)
2417 1 : } else {
2418 1 : // If there was some other, possibly transient, error that
2419 1 : // caused table validation to fail inform the EventListener and
2420 1 : // move on. We remember the table so that we can retry it in a
2421 1 : // subsequent table validation job.
2422 1 : //
2423 1 : // TODO(jackson): If the error is not transient, this will retry
2424 1 : // validation indefinitely. While not great, it's the same
2425 1 : // behavior as erroring flushes and compactions. We should
2426 1 : // address this as a part of #270.
2427 1 : d.opts.EventListener.BackgroundError(err)
2428 1 : retry = append(retry, f)
2429 1 : continue
2430 : }
2431 : }
2432 :
2433 1 : d.opts.EventListener.TableValidated(TableValidatedInfo{
2434 1 : JobID: jobID,
2435 1 : Meta: f.Meta,
2436 1 : })
2437 : }
2438 1 : rs.unref()
2439 1 : d.mu.Lock()
2440 1 : defer d.mu.Unlock()
2441 1 : d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, retry...)
2442 1 : d.mu.tableValidation.validating = false
2443 1 : d.mu.tableValidation.cond.Broadcast()
2444 1 : if d.shouldValidateSSTablesLocked() {
2445 1 : go d.validateSSTables()
2446 1 : }
2447 : }
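
// The pending/validating handshake used above is a small single-background-
// worker pattern: enqueue under the mutex, start at most one goroutine, and
// have the worker re-check for newly queued work before exiting. A
// stripped-down, self-contained model (hypothetical validator type):
//
//	type validatorSketch struct {
//		mu         sync.Mutex
//		pending    []string
//		validating bool
//	}
//
//	func (v *validatorSketch) enqueue(item string) {
//		v.mu.Lock()
//		defer v.mu.Unlock()
//		v.pending = append(v.pending, item)
//		if !v.validating {
//			v.validating = true
//			go v.run()
//		}
//	}
//
//	func (v *validatorSketch) run() {
//		for {
//			v.mu.Lock()
//			batch := v.pending
//			v.pending = nil
//			if len(batch) == 0 {
//				v.validating = false
//				v.mu.Unlock()
//				return
//			}
//			v.mu.Unlock()
//			for range batch {
//				// Validate the item; on a transient error, re-enqueue it
//				// for a later attempt, mirroring the retry slice above.
//			}
//		}
//	}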
|