Line data Source code
1 : // Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "slices"
10 : "sort"
11 : "time"
12 :
13 : "github.com/cockroachdb/errors"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/invariants"
16 : "github.com/cockroachdb/pebble/internal/keyspan"
17 : "github.com/cockroachdb/pebble/internal/manifest"
18 : "github.com/cockroachdb/pebble/internal/private"
19 : "github.com/cockroachdb/pebble/objstorage"
20 : "github.com/cockroachdb/pebble/objstorage/remote"
21 : "github.com/cockroachdb/pebble/sstable"
22 : )
23 :
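// sstableKeyCompare orders two sstable boundary keys: user keys are compared
// with userCmp, and on a tie an exclusive sentinel (e.g. a range deletion or
// range key boundary) sorts before a non-sentinel key with the same user key,
// since the sentinel excludes that user key from the table's bounds.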
24 1 : func sstableKeyCompare(userCmp Compare, a, b InternalKey) int {
25 1 : c := userCmp(a.UserKey, b.UserKey)
26 1 : if c != 0 {
27 1 : return c
28 1 : }
29 1 : if a.IsExclusiveSentinel() {
30 1 : if !b.IsExclusiveSentinel() {
31 1 : return -1
32 1 : }
33 1 : } else if b.IsExclusiveSentinel() {
34 1 : return +1
35 1 : }
36 1 : return 0
37 : }
38 :
39 : // KeyRange encodes a key range in user key space. A KeyRange's Start is
40 : // inclusive while its End is exclusive.
41 : type KeyRange struct {
42 : Start, End []byte
43 : }
44 :
45 : // Valid returns true if the KeyRange is defined.
46 1 : func (k *KeyRange) Valid() bool {
47 1 : return k.Start != nil && k.End != nil
48 1 : }
49 :
50 : // Contains returns whether the specified key exists in the KeyRange.
51 1 : func (k *KeyRange) Contains(cmp base.Compare, key InternalKey) bool {
52 1 : v := cmp(key.UserKey, k.End)
53 1 : return (v < 0 || (v == 0 && key.IsExclusiveSentinel())) && cmp(k.Start, key.UserKey) <= 0
54 1 : }
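// For example (illustrative), with KeyRange{Start: []byte("b"), End: []byte("d")}:
// Contains reports true for point keys "b" and "c", false for a point key "d",
// and true for an exclusive sentinel key at "d" (such as the end boundary of a
// range deletion), because the sentinel does not include "d" itself.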
55 :
56 : // OverlapsInternalKeyRange checks if the specified internal key range has an
57 : // overlap with the KeyRange. Note that we aren't checking for full containment
58 : // of smallest-largest within k, rather just that there's some intersection
59 : // between the two ranges.
60 1 : func (k *KeyRange) OverlapsInternalKeyRange(cmp base.Compare, smallest, largest InternalKey) bool {
61 1 : v := cmp(k.Start, largest.UserKey)
62 1 : return v <= 0 && !(largest.IsExclusiveSentinel() && v == 0) &&
63 1 : cmp(k.End, smallest.UserKey) > 0
64 1 : }
65 :
66 : // Overlaps checks if the specified file has an overlap with the KeyRange.
67 : // Note that we aren't checking for full containment of m within k, rather just
68 : // that there's some intersection between m and k's bounds.
69 1 : func (k *KeyRange) Overlaps(cmp base.Compare, m *fileMetadata) bool {
70 1 : return k.OverlapsInternalKeyRange(cmp, m.Smallest, m.Largest)
71 1 : }
72 :
73 : // OverlapsKeyRange checks if this span overlaps with the provided KeyRange.
74 : // Note that we aren't checking for full containment of either span in the other,
75 : // just that there's a key x that is in both key ranges.
76 0 : func (k *KeyRange) OverlapsKeyRange(cmp Compare, span KeyRange) bool {
77 0 : return cmp(k.Start, span.End) < 0 && cmp(k.End, span.Start) > 0
78 0 : }
79 :
80 1 : func ingestValidateKey(opts *Options, key *InternalKey) error {
81 1 : if key.Kind() == InternalKeyKindInvalid {
82 0 : return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s",
83 0 : key.Pretty(opts.Comparer.FormatKey))
84 0 : }
85 1 : if key.SeqNum() != 0 {
86 0 : return base.CorruptionErrorf("pebble: external sstable has non-zero seqnum: %s",
87 0 : key.Pretty(opts.Comparer.FormatKey))
88 0 : }
89 1 : return nil
90 : }
91 :
92 : // ingestSynthesizeShared constructs a fileMetadata for one shared sstable owned
93 : // or shared by another node.
94 : func ingestSynthesizeShared(
95 : opts *Options, sm SharedSSTMeta, fileNum base.DiskFileNum,
96 0 : ) (*fileMetadata, error) {
97 0 : if sm.Size == 0 {
98 0 : // Disallow 0 file sizes
99 0 : return nil, errors.New("pebble: cannot ingest shared file with size 0")
100 0 : }
101 : // Don't load table stats. Doing a round trip to shared storage, one SST
102 : // at a time, is not worth it, as it slows down ingestion.
103 0 : meta := &fileMetadata{
104 0 : FileNum: fileNum.FileNum(),
105 0 : CreationTime: time.Now().Unix(),
106 0 : Virtual: true,
107 0 : Size: sm.Size,
108 0 : }
109 0 : meta.InitProviderBacking(fileNum)
110 0 : // Set the underlying FileBacking's size to the same size as the virtualized
111 0 : // view of the sstable. This ensures that we don't over-prioritize this
112 0 : // sstable for compaction just yet, as we do not have a clear sense of what
113 0 : // parts of this sstable are referenced by other nodes.
114 0 : meta.FileBacking.Size = sm.Size
115 0 : if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil {
116 0 : // Initialize meta.{HasRangeKeys,Smallest,Largest}, etc.
117 0 : //
118 0 : // NB: We create new internal keys and pass them into ExtendRangeKeyBounds
119 0 : // so that we can substitute a zero sequence number into the bounds. We can set
120 0 : // the sequence number to anything here; it'll be reset in ingestUpdateSeqNum
121 0 : // anyway. However we do need to use the same sequence number across all
122 0 : // bound keys at this step so that we end up with bounds that are consistent
123 0 : // across point/range keys.
124 0 : smallestRangeKey := base.MakeInternalKey(sm.SmallestRangeKey.UserKey, 0, sm.SmallestRangeKey.Kind())
125 0 : largestRangeKey := base.MakeExclusiveSentinelKey(sm.LargestRangeKey.Kind(), sm.LargestRangeKey.UserKey)
126 0 : meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallestRangeKey, largestRangeKey)
127 0 : }
128 0 : if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil {
129 0 : // Initialize meta.{HasPointKeys,Smallest,Largest}, etc.
130 0 : //
131 0 : // See point above in the ExtendRangeKeyBounds call on why we use a zero
132 0 : // sequence number here.
133 0 : smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, sm.SmallestPointKey.Kind())
134 0 : largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, sm.LargestPointKey.Kind())
135 0 : if sm.LargestPointKey.IsExclusiveSentinel() {
136 0 : largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey)
137 0 : }
138 0 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallestPointKey, largestPointKey)
139 : }
140 0 : if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
141 0 : return nil, err
142 0 : }
143 0 : return meta, nil
144 : }
145 :
146 : // ingestLoad1External loads the fileMetadata for one external sstable.
147 : // Sequence number and target level calculation happens during prepare/apply.
148 : func ingestLoad1External(
149 : opts *Options,
150 : e ExternalFile,
151 : fileNum base.DiskFileNum,
152 : objprovider objstorage.Provider,
153 : jobID int,
154 0 : ) (*fileMetadata, error) {
155 0 : if e.Size == 0 {
156 0 : // Disallow 0 file sizes
157 0 : return nil, errors.New("pebble: cannot ingest external file with size 0")
158 0 : }
159 0 : if !e.HasRangeKey && !e.HasPointKey {
160 0 : return nil, errors.New("pebble: cannot ingest external file with no point or range keys")
161 0 : }
162 : // Don't load table stats. Doing a round trip to shared storage, one SST
163 : // at a time, is not worth it, as it slows down ingestion.
164 0 : meta := &fileMetadata{}
165 0 : meta.FileNum = fileNum.FileNum()
166 0 : meta.CreationTime = time.Now().Unix()
167 0 : meta.Virtual = true
168 0 : meta.Size = e.Size
169 0 : meta.InitProviderBacking(fileNum)
170 0 :
171 0 : // Try to resolve a reference to the external file.
172 0 : backing, err := objprovider.CreateExternalObjectBacking(e.Locator, e.ObjName)
173 0 : if err != nil {
174 0 : return nil, err
175 0 : }
176 0 : metas, err := objprovider.AttachRemoteObjects([]objstorage.RemoteObjectToAttach{{
177 0 : FileNum: fileNum,
178 0 : FileType: fileTypeTable,
179 0 : Backing: backing,
180 0 : }})
181 0 : if err != nil {
182 0 : return nil, err
183 0 : }
184 0 : if opts.EventListener.TableCreated != nil {
185 0 : opts.EventListener.TableCreated(TableCreateInfo{
186 0 : JobID: jobID,
187 0 : Reason: "ingesting",
188 0 : Path: objprovider.Path(metas[0]),
189 0 : FileNum: fileNum.FileNum(),
190 0 : })
191 0 : }
192 : // In the name of keeping this ingestion as fast as possible, we avoid
193 : // *all* existence checks and synthesize a file metadata with smallest/largest
194 : // keys that overlap whatever the passed-in span was.
195 0 : smallestCopy := make([]byte, len(e.SmallestUserKey))
196 0 : copy(smallestCopy, e.SmallestUserKey)
197 0 : largestCopy := make([]byte, len(e.LargestUserKey))
198 0 : copy(largestCopy, e.LargestUserKey)
199 0 : if e.HasPointKey {
200 0 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax),
201 0 : base.MakeRangeDeleteSentinelKey(largestCopy))
202 0 : }
203 0 : if e.HasRangeKey {
204 0 : meta.ExtendRangeKeyBounds(opts.Comparer.Compare, base.MakeInternalKey(smallestCopy, 0, InternalKeyKindRangeKeySet),
205 0 : base.MakeExclusiveSentinelKey(InternalKeyKindRangeKeyDelete, largestCopy))
206 0 : }
207 :
208 : // Set the underlying FileBacking's size to the same size as the virtualized
209 : // view of the sstable. This ensures that we don't over-prioritize this
210 : // sstable for compaction just yet, as we do not have a clear sense of
211 : // what parts of this sstable are referenced by other nodes.
212 0 : meta.FileBacking.Size = e.Size
213 0 :
214 0 : if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
215 0 : return nil, err
216 0 : }
217 0 : return meta, nil
218 : }
219 :
220 : // ingestLoad1 creates the FileMetadata for one file. This file will be owned
221 : // by this store.
222 : func ingestLoad1(
223 : opts *Options,
224 : fmv FormatMajorVersion,
225 : readable objstorage.Readable,
226 : cacheID uint64,
227 : fileNum base.DiskFileNum,
228 1 : ) (*fileMetadata, error) {
229 1 : cacheOpts := private.SSTableCacheOpts(cacheID, fileNum).(sstable.ReaderOption)
230 1 : r, err := sstable.NewReader(readable, opts.MakeReaderOptions(), cacheOpts)
231 1 : if err != nil {
232 0 : return nil, err
233 0 : }
234 1 : defer r.Close()
235 1 :
236 1 : // Avoid ingesting tables with format versions this DB doesn't support.
237 1 : tf, err := r.TableFormat()
238 1 : if err != nil {
239 0 : return nil, err
240 0 : }
241 1 : if tf < fmv.MinTableFormat() || tf > fmv.MaxTableFormat() {
242 0 : return nil, errors.Newf(
243 0 : "pebble: table format %s is not within range supported at DB format major version %d, (%s,%s)",
244 0 : tf, fmv, fmv.MinTableFormat(), fmv.MaxTableFormat(),
245 0 : )
246 0 : }
247 :
248 1 : meta := &fileMetadata{}
249 1 : meta.FileNum = fileNum.FileNum()
250 1 : meta.Size = uint64(readable.Size())
251 1 : meta.CreationTime = time.Now().Unix()
252 1 : meta.InitPhysicalBacking()
253 1 :
254 1 : // Avoid loading into the table cache for collecting stats if we
255 1 : // don't need to. If there are no range deletions, we have all the
256 1 : // information to compute the stats here.
257 1 : //
258 1 : // This is helpful in tests for avoiding awkwardness around deletion of
259 1 : // ingested files from MemFS. MemFS implements the Windows semantics of
260 1 : // disallowing removal of an open file. Under MemFS, if we don't populate
261 1 : // meta.Stats here, the file will be loaded into the table cache for
262 1 : // calculating stats before we can remove the original link.
263 1 : maybeSetStatsFromProperties(meta.PhysicalMeta(), &r.Properties)
264 1 :
265 1 : {
266 1 : iter, err := r.NewIter(nil /* lower */, nil /* upper */)
267 1 : if err != nil {
268 0 : return nil, err
269 0 : }
270 1 : defer iter.Close()
271 1 : var smallest InternalKey
272 1 : if key, _ := iter.First(); key != nil {
273 1 : if err := ingestValidateKey(opts, key); err != nil {
274 0 : return nil, err
275 0 : }
276 1 : smallest = (*key).Clone()
277 : }
278 1 : if err := iter.Error(); err != nil {
279 0 : return nil, err
280 0 : }
281 1 : if key, _ := iter.Last(); key != nil {
282 1 : if err := ingestValidateKey(opts, key); err != nil {
283 0 : return nil, err
284 0 : }
285 1 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, key.Clone())
286 : }
287 1 : if err := iter.Error(); err != nil {
288 0 : return nil, err
289 0 : }
290 : }
291 :
292 1 : iter, err := r.NewRawRangeDelIter()
293 1 : if err != nil {
294 0 : return nil, err
295 0 : }
296 1 : if iter != nil {
297 1 : defer iter.Close()
298 1 : var smallest InternalKey
299 1 : if s := iter.First(); s != nil {
300 1 : key := s.SmallestKey()
301 1 : if err := ingestValidateKey(opts, &key); err != nil {
302 0 : return nil, err
303 0 : }
304 1 : smallest = key.Clone()
305 : }
306 1 : if err := iter.Error(); err != nil {
307 0 : return nil, err
308 0 : }
309 1 : if s := iter.Last(); s != nil {
310 1 : k := s.SmallestKey()
311 1 : if err := ingestValidateKey(opts, &k); err != nil {
312 0 : return nil, err
313 0 : }
314 1 : largest := s.LargestKey().Clone()
315 1 : meta.ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest)
316 : }
317 : }
318 :
319 : // Update the range-key bounds for the table.
320 1 : {
321 1 : iter, err := r.NewRawRangeKeyIter()
322 1 : if err != nil {
323 0 : return nil, err
324 0 : }
325 1 : if iter != nil {
326 1 : defer iter.Close()
327 1 : var smallest InternalKey
328 1 : if s := iter.First(); s != nil {
329 1 : key := s.SmallestKey()
330 1 : if err := ingestValidateKey(opts, &key); err != nil {
331 0 : return nil, err
332 0 : }
333 1 : smallest = key.Clone()
334 : }
335 1 : if err := iter.Error(); err != nil {
336 0 : return nil, err
337 0 : }
338 1 : if s := iter.Last(); s != nil {
339 1 : k := s.SmallestKey()
340 1 : if err := ingestValidateKey(opts, &k); err != nil {
341 0 : return nil, err
342 0 : }
343 : // As range keys are fragmented, the end key of the last range key in
344 : // the table provides the upper bound for the table.
345 1 : largest := s.LargestKey().Clone()
346 1 : meta.ExtendRangeKeyBounds(opts.Comparer.Compare, smallest, largest)
347 : }
348 1 : if err := iter.Error(); err != nil {
349 0 : return nil, err
350 0 : }
351 : }
352 : }
353 :
354 1 : if !meta.HasPointKeys && !meta.HasRangeKeys {
355 1 : return nil, nil
356 1 : }
357 :
358 : // Sanity check that the various bounds on the file were set consistently.
359 1 : if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
360 0 : return nil, err
361 0 : }
362 :
363 1 : return meta, nil
364 : }
365 :
366 : type ingestLoadResult struct {
367 : localMeta, sharedMeta []*fileMetadata
368 : externalMeta []*fileMetadata
369 : localPaths []string
370 : sharedLevels []uint8
371 : fileCount int
372 : }
373 :
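// ingestLoad loads the metadata for all sstables being ingested: local paths
// are opened and read via ingestLoad1, shared sstables are synthesized via
// ingestSynthesizeShared, and external files are handled by ingestLoad1External.
// Empty local sstables are elided from the result.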
374 : func ingestLoad(
375 : opts *Options,
376 : fmv FormatMajorVersion,
377 : paths []string,
378 : shared []SharedSSTMeta,
379 : external []ExternalFile,
380 : cacheID uint64,
381 : pending []base.DiskFileNum,
382 : objProvider objstorage.Provider,
383 : jobID int,
384 1 : ) (ingestLoadResult, error) {
385 1 : meta := make([]*fileMetadata, 0, len(paths))
386 1 : newPaths := make([]string, 0, len(paths))
387 1 : for i := range paths {
388 1 : f, err := opts.FS.Open(paths[i])
389 1 : if err != nil {
390 0 : return ingestLoadResult{}, err
391 0 : }
392 :
393 1 : readable, err := sstable.NewSimpleReadable(f)
394 1 : if err != nil {
395 0 : return ingestLoadResult{}, err
396 0 : }
397 1 : m, err := ingestLoad1(opts, fmv, readable, cacheID, pending[i])
398 1 : if err != nil {
399 0 : return ingestLoadResult{}, err
400 0 : }
401 1 : if m != nil {
402 1 : meta = append(meta, m)
403 1 : newPaths = append(newPaths, paths[i])
404 1 : }
405 : }
406 1 : if len(shared) == 0 && len(external) == 0 {
407 1 : return ingestLoadResult{localMeta: meta, localPaths: newPaths, fileCount: len(meta)}, nil
408 1 : }
409 :
410 : // Sort the shared files according to level.
411 0 : sort.Sort(sharedByLevel(shared))
412 0 :
413 0 : sharedMeta := make([]*fileMetadata, 0, len(shared))
414 0 : levels := make([]uint8, 0, len(shared))
415 0 : for i := range shared {
416 0 : m, err := ingestSynthesizeShared(opts, shared[i], pending[len(paths)+i])
417 0 : if err != nil {
418 0 : return ingestLoadResult{}, err
419 0 : }
420 0 : if shared[i].Level < sharedLevelsStart {
421 0 : return ingestLoadResult{}, errors.New("cannot ingest shared file in level below sharedLevelsStart")
422 0 : }
423 0 : sharedMeta = append(sharedMeta, m)
424 0 : levels = append(levels, shared[i].Level)
425 : }
426 0 : externalMeta := make([]*fileMetadata, 0, len(external))
427 0 : for i := range external {
428 0 : m, err := ingestLoad1External(opts, external[i], pending[len(paths)+len(shared)+i], objProvider, jobID)
429 0 : if err != nil {
430 0 : return ingestLoadResult{}, err
431 0 : }
432 0 : externalMeta = append(externalMeta, m)
433 : }
434 0 : result := ingestLoadResult{
435 0 : localMeta: meta,
436 0 : sharedMeta: sharedMeta,
437 0 : externalMeta: externalMeta,
438 0 : localPaths: newPaths,
439 0 : sharedLevels: levels,
440 0 : fileCount: len(meta) + len(sharedMeta) + len(externalMeta),
441 0 : }
442 0 : return result, nil
443 : }
444 :
445 : // metaAndPaths is a sort.Interface for sorting file metadata by smallest user
446 : // key, while ensuring the matching path also gets swapped to the same index.
447 : // For use in ingestSortAndVerify.
448 : type metaAndPaths struct {
449 : meta []*fileMetadata
450 : paths []string
451 : cmp Compare
452 : }
453 :
454 1 : func (m metaAndPaths) Len() int {
455 1 : return len(m.meta)
456 1 : }
457 :
458 1 : func (m metaAndPaths) Less(i, j int) bool {
459 1 : return m.cmp(m.meta[i].Smallest.UserKey, m.meta[j].Smallest.UserKey) < 0
460 1 : }
461 :
462 1 : func (m metaAndPaths) Swap(i, j int) {
463 1 : m.meta[i], m.meta[j] = m.meta[j], m.meta[i]
464 1 : if m.paths != nil {
465 1 : m.paths[i], m.paths[j] = m.paths[j], m.paths[i]
466 1 : }
467 : }
468 :
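// ingestSortAndVerify sorts the loaded sstables by smallest user key and
// verifies the ingestion invariants: shared files must fall within the excise
// span, external files cannot be mixed with local or shared files, and
// sstables of the same kind (local, external, or shared within a level) must
// not overlap one another.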
469 1 : func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error {
470 1 : // Verify that all the shared files (i.e. files in sharedMeta)
471 1 : // fit within the exciseSpan.
472 1 : for i := range lr.sharedMeta {
473 0 : f := lr.sharedMeta[i]
474 0 : if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) {
475 0 : return errors.AssertionFailedf("pebble: shared file outside of excise span, span [%s-%s), file = %s", exciseSpan.Start, exciseSpan.End, f.String())
476 0 : }
477 : }
478 1 : if len(lr.externalMeta) > 0 {
479 0 : if len(lr.localMeta) > 0 || len(lr.sharedMeta) > 0 {
480 0 : // Currently we only support external ingests on their own. If external
481 0 : // files are present alongside local/shared files, return an error.
482 0 : return errors.AssertionFailedf("pebble: external files cannot be ingested atomically alongside other types of files")
483 0 : }
484 0 : sort.Sort(&metaAndPaths{
485 0 : meta: lr.externalMeta,
486 0 : cmp: cmp,
487 0 : })
488 0 : for i := 1; i < len(lr.externalMeta); i++ {
489 0 : if sstableKeyCompare(cmp, lr.externalMeta[i-1].Largest, lr.externalMeta[i].Smallest) >= 0 {
490 0 : return errors.AssertionFailedf("pebble: external sstables have overlapping ranges")
491 0 : }
492 : }
493 0 : return nil
494 : }
495 1 : if len(lr.localMeta) <= 1 || len(lr.localPaths) <= 1 {
496 1 : return nil
497 1 : }
498 :
499 1 : sort.Sort(&metaAndPaths{
500 1 : meta: lr.localMeta,
501 1 : paths: lr.localPaths,
502 1 : cmp: cmp,
503 1 : })
504 1 :
505 1 : for i := 1; i < len(lr.localPaths); i++ {
506 1 : if sstableKeyCompare(cmp, lr.localMeta[i-1].Largest, lr.localMeta[i].Smallest) >= 0 {
507 1 : return errors.AssertionFailedf("pebble: local ingestion sstables have overlapping ranges")
508 1 : }
509 : }
510 1 : if len(lr.sharedMeta) == 0 {
511 1 : return nil
512 1 : }
513 0 : filesInLevel := make([]*fileMetadata, 0, len(lr.sharedMeta))
514 0 : for l := sharedLevelsStart; l < numLevels; l++ {
515 0 : filesInLevel = filesInLevel[:0]
516 0 : for i := range lr.sharedMeta {
517 0 : if lr.sharedLevels[i] == uint8(l) {
518 0 : filesInLevel = append(filesInLevel, lr.sharedMeta[i])
519 0 : }
520 : }
521 0 : slices.SortFunc(filesInLevel, func(a, b *fileMetadata) int {
522 0 : return cmp(a.Smallest.UserKey, b.Smallest.UserKey)
523 0 : })
524 0 : for i := 1; i < len(filesInLevel); i++ {
525 0 : if sstableKeyCompare(cmp, filesInLevel[i-1].Largest, filesInLevel[i].Smallest) >= 0 {
526 0 : return errors.AssertionFailedf("pebble: external shared sstables have overlapping ranges")
527 0 : }
528 : }
529 : }
530 0 : return nil
531 : }
532 :
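// ingestCleanup removes the table objects created for the given metadata
// during a failed ingestion, returning the first error encountered.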
533 0 : func ingestCleanup(objProvider objstorage.Provider, meta []*fileMetadata) error {
534 0 : var firstErr error
535 0 : for i := range meta {
536 0 : if err := objProvider.Remove(fileTypeTable, meta[i].FileBacking.DiskFileNum); err != nil {
537 0 : firstErr = firstError(firstErr, err)
538 0 : }
539 : }
540 0 : return firstErr
541 : }
542 :
543 : // ingestLink creates new objects which are backed by either hardlinks to or
544 : // copies of the ingested files. It also attaches shared objects to the provider.
545 : func ingestLink(
546 : jobID int,
547 : opts *Options,
548 : objProvider objstorage.Provider,
549 : lr ingestLoadResult,
550 : shared []SharedSSTMeta,
551 1 : ) error {
552 1 : for i := range lr.localPaths {
553 1 : objMeta, err := objProvider.LinkOrCopyFromLocal(
554 1 : context.TODO(), opts.FS, lr.localPaths[i], fileTypeTable, lr.localMeta[i].FileBacking.DiskFileNum,
555 1 : objstorage.CreateOptions{PreferSharedStorage: true},
556 1 : )
557 1 : if err != nil {
558 0 : if err2 := ingestCleanup(objProvider, lr.localMeta[:i]); err2 != nil {
559 0 : opts.Logger.Errorf("ingest cleanup failed: %v", err2)
560 0 : }
561 0 : return err
562 : }
563 1 : if opts.EventListener.TableCreated != nil {
564 1 : opts.EventListener.TableCreated(TableCreateInfo{
565 1 : JobID: jobID,
566 1 : Reason: "ingesting",
567 1 : Path: objProvider.Path(objMeta),
568 1 : FileNum: lr.localMeta[i].FileNum,
569 1 : })
570 1 : }
571 : }
572 1 : sharedObjs := make([]objstorage.RemoteObjectToAttach, 0, len(shared))
573 1 : for i := range shared {
574 0 : backing, err := shared[i].Backing.Get()
575 0 : if err != nil {
576 0 : return err
577 0 : }
578 0 : sharedObjs = append(sharedObjs, objstorage.RemoteObjectToAttach{
579 0 : FileNum: lr.sharedMeta[i].FileBacking.DiskFileNum,
580 0 : FileType: fileTypeTable,
581 0 : Backing: backing,
582 0 : })
583 : }
584 1 : sharedObjMetas, err := objProvider.AttachRemoteObjects(sharedObjs)
585 1 : if err != nil {
586 0 : return err
587 0 : }
588 1 : for i := range sharedObjMetas {
589 0 : // One corner case around file sizes to be mindful of is that if one of
590 0 : // the sharedObjs was initially created by us (and has boomeranged back
591 0 : // from another node), we'll need to update the FileBacking's size to be
592 0 : // the true underlying size. Otherwise, we could hit errors when we open
593 0 : // the db again after a crash/restart (see checkConsistency in open.go).
594 0 : // It also allows us to more accurately prioritize compactions of files
595 0 : // that were originally created by us.
596 0 : if sharedObjMetas[i].IsShared() && !objProvider.IsSharedForeign(sharedObjMetas[i]) {
597 0 : size, err := objProvider.Size(sharedObjMetas[i])
598 0 : if err != nil {
599 0 : return err
600 0 : }
601 0 : lr.sharedMeta[i].FileBacking.Size = uint64(size)
602 : }
603 0 : if opts.EventListener.TableCreated != nil {
604 0 : opts.EventListener.TableCreated(TableCreateInfo{
605 0 : JobID: jobID,
606 0 : Reason: "ingesting",
607 0 : Path: objProvider.Path(sharedObjMetas[i]),
608 0 : FileNum: lr.sharedMeta[i].FileNum,
609 0 : })
610 0 : }
611 : }
612 : // We do not need to do anything about lr.externalMetas. Those were already
613 : // linked in ingestLoad.
614 :
615 1 : return nil
616 : }
617 :
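// ingestMemtableOverlaps returns whether any of the given key ranges overlap
// keys (point keys, range deletions, or range keys) in the provided flushable.
// Overlap is conservatively assumed if any iterator fails to close cleanly.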
618 1 : func ingestMemtableOverlaps(cmp Compare, mem flushable, keyRanges []internalKeyRange) bool {
619 1 : iter := mem.newIter(nil)
620 1 : rangeDelIter := mem.newRangeDelIter(nil)
621 1 : rkeyIter := mem.newRangeKeyIter(nil)
622 1 :
623 1 : closeIters := func() error {
624 1 : err := iter.Close()
625 1 : if rangeDelIter != nil {
626 1 : err = firstError(err, rangeDelIter.Close())
627 1 : }
628 1 : if rkeyIter != nil {
629 1 : err = firstError(err, rkeyIter.Close())
630 1 : }
631 1 : return err
632 : }
633 :
634 1 : for _, kr := range keyRanges {
635 1 : if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, cmp) {
636 1 : closeIters()
637 1 : return true
638 1 : }
639 : }
640 :
641 : // Assume overlap if any iterator errored out.
642 1 : return closeIters() != nil
643 : }
644 :
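// ingestUpdateSeqNum rewrites the boundary keys of the loaded sstables to carry
// the assigned ingestion sequence numbers. Sequence numbers are handed out in
// increasing order: shared sstables first (iterated in reverse so that lower
// LSM levels receive lower sequence numbers), then local, then external files.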
645 : func ingestUpdateSeqNum(
646 : cmp Compare, format base.FormatKey, seqNum uint64, loadResult ingestLoadResult,
647 1 : ) error {
648 1 : setSeqFn := func(k base.InternalKey) base.InternalKey {
649 1 : return base.MakeInternalKey(k.UserKey, seqNum, k.Kind())
650 1 : }
651 1 : updateMetadata := func(m *fileMetadata) error {
652 1 : // NB: we set the fields directly here, rather than via their Extend*
653 1 : // methods, as we are updating sequence numbers.
654 1 : if m.HasPointKeys {
655 1 : m.SmallestPointKey = setSeqFn(m.SmallestPointKey)
656 1 : }
657 1 : if m.HasRangeKeys {
658 1 : m.SmallestRangeKey = setSeqFn(m.SmallestRangeKey)
659 1 : }
660 1 : m.Smallest = setSeqFn(m.Smallest)
661 1 : // Only update the seqnum for the largest key if that key is not an
662 1 : // "exclusive sentinel" (i.e. a range deletion sentinel or a range key
663 1 : // boundary), as doing so effectively drops the exclusive sentinel (by
664 1 : // lowering the seqnum from the max value), and extends the bounds of the
665 1 : // table.
666 1 : // NB: as the largest range key is always an exclusive sentinel, it is never
667 1 : // updated.
668 1 : if m.HasPointKeys && !m.LargestPointKey.IsExclusiveSentinel() {
669 1 : m.LargestPointKey = setSeqFn(m.LargestPointKey)
670 1 : }
671 1 : if !m.Largest.IsExclusiveSentinel() {
672 1 : m.Largest = setSeqFn(m.Largest)
673 1 : }
674 : // Setting smallestSeqNum == largestSeqNum triggers the setting of
675 : // Properties.GlobalSeqNum when an sstable is loaded.
676 1 : m.SmallestSeqNum = seqNum
677 1 : m.LargestSeqNum = seqNum
678 1 : // Ensure the new bounds are consistent.
679 1 : if err := m.Validate(cmp, format); err != nil {
680 0 : return err
681 0 : }
682 1 : seqNum++
683 1 : return nil
684 : }
685 :
686 : // Shared sstables are required to be sorted by level ascending. We then
687 : // iterate the shared sstables in reverse, assigning the lower sequence
688 : // numbers to the shared sstables that will be ingested into the lower
689 : // (larger numbered) levels first. This ensures sequence number shadowing is
690 : // correct.
691 1 : for i := len(loadResult.sharedMeta) - 1; i >= 0; i-- {
692 0 : if i-1 >= 0 && loadResult.sharedLevels[i-1] > loadResult.sharedLevels[i] {
693 0 : panic(errors.AssertionFailedf("shared files %s, %s out of order", loadResult.sharedMeta[i-1], loadResult.sharedMeta[i]))
694 : }
695 0 : if err := updateMetadata(loadResult.sharedMeta[i]); err != nil {
696 0 : return err
697 0 : }
698 : }
699 1 : for i := range loadResult.localMeta {
700 1 : if err := updateMetadata(loadResult.localMeta[i]); err != nil {
701 0 : return err
702 0 : }
703 : }
704 1 : for i := range loadResult.externalMeta {
705 0 : if err := updateMetadata(loadResult.externalMeta[i]); err != nil {
706 0 : return err
707 0 : }
708 : }
709 1 : return nil
710 : }
711 :
712 : // Denotes an internal key range. Smallest and largest are both inclusive.
713 : type internalKeyRange struct {
714 : smallest, largest InternalKey
715 : }
716 :
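// overlapWithIterator reports whether any key in keyRange (both bounds
// inclusive) is covered by a point key, range deletion, or range key surfaced
// by the given iterators. It errs on the side of caution: if an iterator
// returns an error, overlap is assumed.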
717 : func overlapWithIterator(
718 : iter internalIterator,
719 : rangeDelIter *keyspan.FragmentIterator,
720 : rkeyIter keyspan.FragmentIterator,
721 : keyRange internalKeyRange,
722 : cmp Compare,
723 1 : ) bool {
724 1 : // Check overlap with point operations.
725 1 : //
726 1 : // When using levelIter, it seeks to the SST whose boundaries
727 1 : // contain keyRange.smallest.UserKey(S).
728 1 : // It then tries to find a point in that SST that is >= S.
729 1 : // If there's no such point it means the SST ends in a tombstone in which case
730 1 : // levelIter.SeekGE generates a boundary range del sentinel.
731 1 : // The comparison of this boundary with keyRange.largest(L) below
732 1 : // is subtle but maintains correctness.
733 1 : // 1) boundary < L,
734 1 : // since boundary is also > S (initial seek),
735 1 : // whatever the boundary's start key may be, we're always overlapping.
736 1 : // 2) boundary > L,
737 1 : // overlap with boundary cannot be determined since we don't know boundary's start key.
738 1 : // We require checking for overlap with rangeDelIter.
739 1 : // 3) boundary == L and L is not sentinel,
740 1 : // means boundary < L and hence is similar to 1).
741 1 : // 4) boundary == L and L is sentinel,
742 1 : // we'll always overlap since for any values of i,j ranges [i, k) and [j, k) always overlap.
743 1 : key, _ := iter.SeekGE(keyRange.smallest.UserKey, base.SeekGEFlagsNone)
744 1 : if key != nil {
745 1 : c := sstableKeyCompare(cmp, *key, keyRange.largest)
746 1 : if c <= 0 {
747 1 : return true
748 1 : }
749 : }
750 : // Assume overlap if iterator errored.
751 1 : if err := iter.Error(); err != nil {
752 0 : return true
753 0 : }
754 :
755 1 : computeOverlapWithSpans := func(rIter keyspan.FragmentIterator) bool {
756 1 : // NB: The spans surfaced by the fragment iterator are non-overlapping.
757 1 : span := rIter.SeekLT(keyRange.smallest.UserKey)
758 1 : if span == nil {
759 1 : span = rIter.Next()
760 1 : }
761 1 : for ; span != nil; span = rIter.Next() {
762 1 : if span.Empty() {
763 1 : continue
764 : }
765 1 : key := span.SmallestKey()
766 1 : c := sstableKeyCompare(cmp, key, keyRange.largest)
767 1 : if c > 0 {
768 1 : // The start of the span is after the largest key in the
769 1 : // ingested table.
770 1 : return false
771 1 : }
772 1 : if cmp(span.End, keyRange.smallest.UserKey) > 0 {
773 1 : // The end of the span is greater than the smallest in the
774 1 : // table. Note that the span end key is exclusive, thus ">0"
775 1 : // instead of ">=0".
776 1 : return true
777 1 : }
778 : }
779 : // Assume overlap if iterator errored.
780 1 : if err := rIter.Error(); err != nil {
781 0 : return true
782 0 : }
783 1 : return false
784 : }
785 :
786 : // rkeyIter is either a range key level iter, or a range key iterator
787 : // over a single file.
788 1 : if rkeyIter != nil {
789 1 : if computeOverlapWithSpans(rkeyIter) {
790 1 : return true
791 1 : }
792 : }
793 :
794 : // Check overlap with range deletions.
795 1 : if rangeDelIter == nil || *rangeDelIter == nil {
796 1 : return false
797 1 : }
798 1 : return computeOverlapWithSpans(*rangeDelIter)
799 : }
800 :
801 : // ingestTargetLevel returns the target level for a file being ingested.
802 : // If suggestSplit is true, it accounts for ingest-time splitting as part of
803 : // its target level calculation, and if a split candidate is found, that file
804 : // is returned as the splitFile.
805 : func ingestTargetLevel(
806 : newIters tableNewIters,
807 : newRangeKeyIter keyspan.TableNewSpanIter,
808 : iterOps IterOptions,
809 : comparer *Comparer,
810 : v *version,
811 : baseLevel int,
812 : compactions map[*compaction]struct{},
813 : meta *fileMetadata,
814 : suggestSplit bool,
815 1 : ) (targetLevel int, splitFile *fileMetadata, err error) {
816 1 : // Find the lowest level which does not have any files which overlap meta. We
817 1 : // search from L0 to L6 looking for whether there are any files in the level
818 1 : // which overlap meta. We want the "lowest" level (where lower means
819 1 : // increasing level number) in order to reduce write amplification.
820 1 : //
821 1 : // There are 2 kinds of overlap we need to check for: file boundary overlap
822 1 : // and data overlap. Data overlap implies file boundary overlap. Note that it
823 1 : // is always possible to ingest into L0.
824 1 : //
825 1 : // To place meta at level i where i > 0:
826 1 : // - there must not be any data overlap with levels <= i, since that will
827 1 : // violate the sequence number invariant.
828 1 : // - no file boundary overlap with level i, since that will violate the
829 1 : // invariant that files do not overlap in levels i > 0.
830 1 : // - if there is only a file overlap at a given level, and no data overlap,
831 1 : // we can still slot a file at that level. We return the fileMetadata with
832 1 : // which we have file boundary overlap (must be only one file, as sstable
833 1 : // bounds are usually tight on user keys) and the caller is expected to split
834 1 : // that sstable into two virtual sstables, allowing this file to go into that
835 1 : // level. Note that if we have file boundary overlap with two files, which
836 1 : // should only happen on rare occasions, we treat it as data overlap and
837 1 : // don't use this optimization.
838 1 : //
839 1 : // The file boundary overlap check is simpler to conceptualize. Consider the
840 1 : // following example, in which the ingested file lies completely before or
841 1 : // after the file being considered.
842 1 : //
843 1 : // |--| |--| ingested file: [a,b] or [f,g]
844 1 : // |-----| existing file: [c,e]
845 1 : // _____________________
846 1 : // a b c d e f g
847 1 : //
848 1 : // In both cases the ingested file can move to considering the next level.
849 1 : //
850 1 : // File boundary overlap does not necessarily imply data overlap. The check
851 1 : // for data overlap is a little more nuanced. Consider the following examples:
852 1 : //
853 1 : // 1. No data overlap:
854 1 : //
855 1 : // |-| |--| ingested file: [cc-d] or [ee-ff]
856 1 : // |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
857 1 : // _____________________
858 1 : // a b c d e f g
859 1 : //
860 1 : // In this case the ingested files can "fall through" this level. The checks
861 1 : // continue at the next level.
862 1 : //
863 1 : // 2. Data overlap:
864 1 : //
865 1 : // |--| ingested file: [d-e]
866 1 : // |*--*--*----*------*| existing file: [a-g], points: [a, b, c, dd, g]
867 1 : // _____________________
868 1 : // a b c d e f g
869 1 : //
870 1 : // In this case the file cannot be ingested into this level as the point 'dd'
871 1 : // is in the way.
872 1 : //
873 1 : // It is worth noting that the check for data overlap is only approximate. In
874 1 : // the previous example, the ingested table [d-e] could contain only the
875 1 : // points 'd' and 'e', in which case the table would be eligible for
876 1 : // considering lower levels. However, such a fine-grained check would need to
877 1 : // be exhaustive (comparing points and ranges in both the ingested existing
878 1 : // tables) and such a check is prohibitively expensive. Thus Pebble treats any
879 1 : // existing point that falls within the ingested table bounds as being "data
880 1 : // overlap".
881 1 :
882 1 : // This assertion implicitly checks that we have the current version of
883 1 : // the metadata.
884 1 : if v.L0Sublevels == nil {
885 0 : return 0, nil, errors.AssertionFailedf("could not read L0 sublevels")
886 0 : }
887 : // Check for overlap over the keys of L0 by iterating over the sublevels.
888 1 : for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ {
889 1 : iter := newLevelIter(iterOps, comparer, newIters,
890 1 : v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), internalIterOpts{})
891 1 :
892 1 : var rangeDelIter keyspan.FragmentIterator
893 1 : // Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
894 1 : // sets it up for the target file.
895 1 : iter.initRangeDel(&rangeDelIter)
896 1 :
897 1 : levelIter := keyspan.LevelIter{}
898 1 : levelIter.Init(
899 1 : keyspan.SpanIterOptions{}, comparer.Compare, newRangeKeyIter,
900 1 : v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), manifest.KeyTypeRange,
901 1 : )
902 1 :
903 1 : kr := internalKeyRange{
904 1 : smallest: meta.Smallest,
905 1 : largest: meta.Largest,
906 1 : }
907 1 : overlap := overlapWithIterator(iter, &rangeDelIter, &levelIter, kr, comparer.Compare)
908 1 : err := iter.Close() // Closes range del iter as well.
909 1 : err = firstError(err, levelIter.Close())
910 1 : if err != nil {
911 0 : return 0, nil, err
912 0 : }
913 1 : if overlap {
914 1 : return targetLevel, nil, nil
915 1 : }
916 : }
917 :
918 1 : level := baseLevel
919 1 : for ; level < numLevels; level++ {
920 1 : levelIter := newLevelIter(iterOps, comparer, newIters,
921 1 : v.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
922 1 : var rangeDelIter keyspan.FragmentIterator
923 1 : // Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
924 1 : // sets it up for the target file.
925 1 : levelIter.initRangeDel(&rangeDelIter)
926 1 :
927 1 : rkeyLevelIter := &keyspan.LevelIter{}
928 1 : rkeyLevelIter.Init(
929 1 : keyspan.SpanIterOptions{}, comparer.Compare, newRangeKeyIter,
930 1 : v.Levels[level].Iter(), manifest.Level(level), manifest.KeyTypeRange,
931 1 : )
932 1 :
933 1 : kr := internalKeyRange{
934 1 : smallest: meta.Smallest,
935 1 : largest: meta.Largest,
936 1 : }
937 1 : overlap := overlapWithIterator(levelIter, &rangeDelIter, rkeyLevelIter, kr, comparer.Compare)
938 1 : err := levelIter.Close() // Closes range del iter as well.
939 1 : err = firstError(err, rkeyLevelIter.Close())
940 1 : if err != nil {
941 0 : return 0, nil, err
942 0 : }
943 1 : if overlap {
944 1 : return targetLevel, splitFile, nil
945 1 : }
946 :
947 : // Check boundary overlap.
948 1 : var candidateSplitFile *fileMetadata
949 1 : boundaryOverlaps := v.Overlaps(level, comparer.Compare, meta.Smallest.UserKey,
950 1 : meta.Largest.UserKey, meta.Largest.IsExclusiveSentinel())
951 1 : if !boundaryOverlaps.Empty() {
952 1 : // We are already guaranteed to not have any data overlaps with files
953 1 : // in boundaryOverlaps, otherwise we'd have returned in the above if
954 1 : // statements. Use this, plus boundaryOverlaps.Len() == 1 to detect for
955 1 : // the case where we can slot this file into the current level despite
956 1 : // a boundary overlap, by splitting one existing file into two virtual
957 1 : // sstables.
958 1 : if suggestSplit && boundaryOverlaps.Len() == 1 {
959 1 : iter := boundaryOverlaps.Iter()
960 1 : candidateSplitFile = iter.First()
961 1 : } else {
962 1 : // We either don't want to suggest ingest-time splits (i.e.
963 1 : // !suggestSplit), or we boundary-overlapped with more than one file.
964 1 : continue
965 : }
966 : }
967 :
968 : // Check boundary overlap with any ongoing compactions. We consider an
969 : // overlapping compaction that's writing files to an output level as
970 : // equivalent to boundary overlap with files in that output level.
971 : //
972 : // We cannot check for data overlap with the new SSTs compaction will produce
973 : // since compaction hasn't been done yet. However, there's no need to check
974 : // since all keys in them will be from levels in [c.startLevel,
975 : // c.outputLevel], and all those levels have already had their data overlap
976 : // tested negative (else we'd have returned earlier).
977 : //
978 : // An alternative approach would be to cancel these compactions and proceed
979 : // with an ingest-time split on this level if necessary. However, compaction
980 : // cancellation can result in significant wasted effort and is best avoided
981 : // unless necessary.
982 1 : overlaps := false
983 1 : for c := range compactions {
984 1 : if c.outputLevel == nil || level != c.outputLevel.level {
985 1 : continue
986 : }
987 1 : if comparer.Compare(meta.Smallest.UserKey, c.largest.UserKey) <= 0 &&
988 1 : comparer.Compare(meta.Largest.UserKey, c.smallest.UserKey) >= 0 {
989 1 : overlaps = true
990 1 : break
991 : }
992 : }
993 1 : if !overlaps {
994 1 : targetLevel = level
995 1 : splitFile = candidateSplitFile
996 1 : }
997 : }
998 1 : return targetLevel, splitFile, nil
999 : }
1000 :
1001 : // Ingest ingests a set of sstables into the DB. Ingestion of the files is
1002 : // atomic and semantically equivalent to creating a single batch containing all
1003 : // of the mutations in the sstables. Ingestion may require the memtable to be
1004 : // flushed. The ingested sstable files are moved into the DB and must reside on
1005 : // the same filesystem as the DB. Sstables can be created for ingestion using
1006 : // sstable.Writer. On success, Ingest removes the input paths.
1007 : //
1008 : // Two types of sstables are accepted for ingestion: sstables present in the
1009 : // instance's vfs.FS, which can be referenced locally, and sstables
1010 : // present in remote.Storage, referred to as shared or foreign sstables. These
1011 : // shared sstables can be linked through objstorageprovider.Provider, and do not
1012 : // need to already be present on the local vfs.FS. Foreign sstables must all fit
1013 : // in an excise span, and are destined for a level specified in SharedSSTMeta.
1014 : //
1015 : // All sstables *must* be Sync()'d by the caller after all bytes are written
1016 : // and before its file handle is closed; failure to do so could violate
1017 : // durability or lead to corrupted on-disk state. This method cannot, in a
1018 : // platform-and-FS-agnostic way, ensure that all sstables in the input are
1019 : // properly synced to disk. Opening new file handles and Sync()-ing them
1020 : // does not always guarantee durability; see the discussion here on that:
1021 : // https://github.com/cockroachdb/pebble/pull/835#issuecomment-663075379
1022 : //
1023 : // Ingestion loads each sstable into the lowest level of the LSM which it
1024 : // doesn't overlap (see ingestTargetLevel). If an sstable overlaps a memtable,
1025 : // ingestion forces the memtable to flush, and then waits for the flush to
1026 : // occur. In some cases, such as with no foreign sstables and no excise span,
1027 : // ingestion that gets blocked on a memtable can join the flushable queue and
1028 : // finish even before the memtable has been flushed.
1029 : //
1030 : // The steps for ingestion are:
1031 : //
1032 : // 1. Allocate file numbers for every sstable being ingested.
1033 : // 2. Load the metadata for all sstables being ingested.
1034 : // 3. Sort the sstables by smallest key, verifying non-overlap (for local
1035 : // sstables).
1036 : // 4. Hard link (or copy) the local sstables into the DB directory.
1037 : // 5. Allocate a sequence number to use for all of the entries in the
1038 : // local sstables. This is the step where overlap with memtables is
1039 : // determined. If there is overlap, we remember the most recent memtable
1040 : // that overlaps.
1041 : // 6. Update the sequence number in the ingested local sstables. (Remote
1042 : // sstables get fixed sequence numbers that were determined at load time.)
1043 : // 7. Wait for the most recent memtable that overlaps to flush (if any).
1044 : // 8. Add the ingested sstables to the version (DB.ingestApply).
1045 : // 8.1. If an excise span was specified, figure out what sstables in the
1046 : // current version overlap with the excise span, and create new virtual
1047 : // sstables out of those sstables that exclude the excised span (DB.excise).
1048 : // 9. Publish the ingestion sequence number.
1049 : //
1050 : // Note that if the mutable memtable overlaps with ingestion, a flush of the
1051 : // memtable is forced equivalent to DB.Flush. Additionally, subsequent
1052 : // mutations that get sequence numbers larger than the ingestion sequence
1053 : // number get queued up behind the ingestion waiting for it to complete. This
1054 : // can produce a noticeable hiccup in performance. See
1055 : // https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
1056 : // this hiccup.
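//
// A minimal usage sketch (the paths below are hypothetical; the sstables must
// already have been written with sstable.Writer and synced):
//
//	err := db.Ingest([]string{"/tmp/000001.sst", "/tmp/000002.sst"})
//	if err != nil {
//		// Handle the error; the sstables were not ingested.
//	}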
1057 1 : func (d *DB) Ingest(paths []string) error {
1058 1 : if err := d.closed.Load(); err != nil {
1059 0 : panic(err)
1060 : }
1061 1 : if d.opts.ReadOnly {
1062 0 : return ErrReadOnly
1063 0 : }
1064 1 : _, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */)
1065 1 : return err
1066 : }
1067 :
1068 : // IngestOperationStats provides some information about where in the LSM the
1069 : // bytes were ingested.
1070 : type IngestOperationStats struct {
1071 : // Bytes is the total bytes in the ingested sstables.
1072 : Bytes uint64
1073 : // ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
1074 : // into L0. This value is approximate when flushable ingests are active and
1075 : // an ingest overlaps an entry in the flushable queue. Currently, this
1076 : // approximation is very rough, only including tables that overlapped the
1077 : // memtable. This estimate may be improved with #2112.
1078 : ApproxIngestedIntoL0Bytes uint64
1079 : // MemtableOverlappingFiles is the count of ingested sstables
1080 : // that overlapped keys in the memtables.
1081 : MemtableOverlappingFiles int
1082 : }
1083 :
1084 : // ExternalFile describes an external sstable that can be referenced through
1085 : // objprovider and ingested as a remote file that will not be refcounted or
1086 : // cleaned up. For use with online restore. Note that the underlying sstable
1087 : // could contain keys outside the [Smallest,Largest) bounds; however Pebble
1088 : // is expected to only read the keys within those bounds.
1089 : type ExternalFile struct {
1090 : // Locator is the shared.Locator that can be used with objProvider to
1091 : // resolve a reference to this external sstable.
1092 : Locator remote.Locator
1093 : // ObjName is the unique name of this sstable on Locator.
1094 : ObjName string
1095 : // Size of the referenced proportion of the virtualized sstable. An estimate
1096 : // is acceptable in lieu of the backing file size.
1097 : Size uint64
1098 : // SmallestUserKey and LargestUserKey are the [smallest,largest) user key
1099 : // bounds of the sstable. Both these bounds are loose i.e. it's possible for
1100 : // the sstable to not span the entirety of this range. However, multiple
1101 : // ExternalFiles in one ingestion must all have non-overlapping
1102 : // [smallest, largest) spans. Note that this Largest bound is exclusive.
1103 : SmallestUserKey, LargestUserKey []byte
1104 : // HasPointKey and HasRangeKey denote whether this file contains point keys
1105 : // or range keys. If both structs are false, an error is returned during
1106 : // ingestion.
1107 : HasPointKey, HasRangeKey bool
1108 : }
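// An illustrative construction of an ExternalFile for use with
// IngestExternalFiles (the locator, object name, size, and bounds below are
// hypothetical):
//
//	ef := ExternalFile{
//		Locator:         remote.Locator("restore-bucket"),
//		ObjName:         "000123.sst",
//		Size:            10 << 20,
//		SmallestUserKey: []byte("a"),
//		LargestUserKey:  []byte("m"),
//		HasPointKey:     true,
//	}
//	_, err := db.IngestExternalFiles([]ExternalFile{ef})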
1109 :
1110 : // IngestWithStats does the same as Ingest, and additionally returns
1111 : // IngestOperationStats.
1112 0 : func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) {
1113 0 : if err := d.closed.Load(); err != nil {
1114 0 : panic(err)
1115 : }
1116 0 : if d.opts.ReadOnly {
1117 0 : return IngestOperationStats{}, ErrReadOnly
1118 0 : }
1119 0 : return d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}, nil /* external */)
1120 : }
1121 :
1122 : // IngestExternalFiles does the same as IngestWithStats, and additionally
1123 : // accepts external files (with locator info that can be resolved using
1124 : // d.opts.SharedStorage). These files must also be non-overlapping with
1125 : // each other, and must be resolvable through d.objProvider.
1126 0 : func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, error) {
1127 0 : if err := d.closed.Load(); err != nil {
1128 0 : panic(err)
1129 : }
1130 :
1131 0 : if d.opts.ReadOnly {
1132 0 : return IngestOperationStats{}, ErrReadOnly
1133 0 : }
1134 0 : if d.opts.Experimental.RemoteStorage == nil {
1135 0 : return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
1136 0 : }
1137 0 : return d.ingest(nil, ingestTargetLevel, nil /* shared */, KeyRange{}, external)
1138 : }
1139 :
1140 : // IngestAndExcise does the same as IngestWithStats, and additionally accepts a
1141 : // list of shared files to ingest that can be read from a remote.Storage through
1142 : // a Provider. All the shared files must live within exciseSpan, and any existing
1143 : // keys in exciseSpan are deleted by turning existing sstables into virtual
1144 : // sstables (if not virtual already) and shrinking their spans to exclude
1145 : // exciseSpan. See the comment at Ingest for a more complete picture of the
1146 : // ingestion process.
1147 : //
1148 : // Panics if this DB instance was not instantiated with a remote.Storage and
1149 : // shared sstables are present.
1150 : func (d *DB) IngestAndExcise(
1151 : paths []string, shared []SharedSSTMeta, exciseSpan KeyRange,
1152 0 : ) (IngestOperationStats, error) {
1153 0 : if err := d.closed.Load(); err != nil {
1154 0 : panic(err)
1155 : }
1156 0 : if d.opts.ReadOnly {
1157 0 : return IngestOperationStats{}, ErrReadOnly
1158 0 : }
1159 0 : return d.ingest(paths, ingestTargetLevel, shared, exciseSpan, nil /* external */)
1160 : }
1161 :
1162 : // Both DB.mu and commitPipeline.mu must be held while this is called.
1163 : func (d *DB) newIngestedFlushableEntry(
1164 : meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum,
1165 1 : ) (*flushableEntry, error) {
1166 1 : // Update the sequence number for all of the sstables in the
1167 1 : // metadata. Writing the metadata to the manifest when the
1168 1 : // version edit is applied is the mechanism that persists the
1169 1 : // sequence number. The sstables themselves are left unmodified.
1170 1 : // In this case, a version edit will only be written to the manifest
1171 1 : // when the flushable is eventually flushed. If Pebble restarts in that
1172 1 : // time, then we'll lose the ingest sequence number information. But this
1173 1 : // information will also be reconstructed on node restart.
1174 1 : if err := ingestUpdateSeqNum(
1175 1 : d.cmp, d.opts.Comparer.FormatKey, seqNum, ingestLoadResult{localMeta: meta},
1176 1 : ); err != nil {
1177 0 : return nil, err
1178 0 : }
1179 :
1180 1 : f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter)
1181 1 :
1182 1 : // NB: The logNum/seqNum are the WAL number which we're writing this entry
1183 1 : // to and the sequence number within the WAL which we'll write this entry
1184 1 : // to.
1185 1 : entry := d.newFlushableEntry(f, logNum, seqNum)
1186 1 : // The flushable entry starts off with a single reader ref, so increment
1187 1 : // the FileMetadata.Refs.
1188 1 : for _, file := range f.files {
1189 1 : file.Ref()
1190 1 : }
1191 1 : entry.unrefFiles = func() []*fileBacking {
1192 1 : var obsolete []*fileBacking
1193 1 : for _, file := range f.files {
1194 1 : if file.Unref() == 0 {
1195 1 : obsolete = append(obsolete, file.FileMetadata.FileBacking)
1196 1 : }
1197 : }
1198 1 : return obsolete
1199 : }
1200 :
1201 1 : entry.flushForced = true
1202 1 : entry.releaseMemAccounting = func() {}
1203 1 : return entry, nil
1204 : }
1205 :
1206 : // Both DB.mu and commitPipeline.mu must be held while this is called. Since
1207 : // we're holding both locks, the order in which we rotate the memtable or
1208 : // recycle the WAL in this function is irrelevant as long as the correct log
1209 : // numbers are assigned to the appropriate flushable.
1210 1 : func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error {
1211 1 : b := d.NewBatch()
1212 1 : for _, m := range meta {
1213 1 : b.ingestSST(m.FileNum)
1214 1 : }
1215 1 : b.setSeqNum(seqNum)
1216 1 :
1217 1 : // If the WAL is disabled, then the logNum used to create the flushable
1218 1 : // entry doesn't matter. We just use the logNum assigned to the current
1219 1 : // mutable memtable. If the WAL is enabled, then this logNum will be
1220 1 : // overwritten by the logNum of the log which will contain the log entry
1221 1 : // for the ingestedFlushable.
1222 1 : logNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
1223 1 : if !d.opts.DisableWAL {
1224 1 : // We create a new WAL for the flushable instead of reusing the end of
1225 1 : // the previous WAL. This simplifies the increment of the minimum
1226 1 : // unflushed log number, and also simplifies WAL replay.
1227 1 : logNum, _ = d.recycleWAL()
1228 1 : d.mu.Unlock()
1229 1 : err := d.commit.directWrite(b)
1230 1 : if err != nil {
1231 0 : d.opts.Logger.Fatalf("%v", err)
1232 0 : }
1233 1 : d.mu.Lock()
1234 : }
1235 :
1236 1 : entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum)
1237 1 : if err != nil {
1238 0 : return err
1239 0 : }
1240 1 : nextSeqNum := seqNum + uint64(b.Count())
1241 1 :
1242 1 : // Set newLogNum to the logNum of the previous flushable. This value is
1243 1 : // irrelevant if the WAL is disabled. If the WAL is enabled, then we set
1244 1 : // the appropriate value below.
1245 1 : newLogNum := d.mu.mem.queue[len(d.mu.mem.queue)-1].logNum
1246 1 : if !d.opts.DisableWAL {
1247 1 : // This is WAL num of the next mutable memtable which comes after the
1248 1 : // ingestedFlushable in the flushable queue. The mutable memtable
1249 1 : // will be created below.
1250 1 : newLogNum, _ = d.recycleWAL()
1251 1 : if err != nil {
1252 0 : return err
1253 0 : }
1254 : }
1255 :
1256 1 : currMem := d.mu.mem.mutable
1257 1 : // NB: Placing ingested sstables above the current memtables
1258 1 : // requires rotating the existing memtables/WAL. There is
1259 1 : // some concern of churning through tiny memtables due to
1260 1 : // ingested sstables being placed on top of them, but those
1261 1 : // memtables would have to be flushed anyways.
1262 1 : d.mu.mem.queue = append(d.mu.mem.queue, entry)
1263 1 : d.rotateMemtable(newLogNum, nextSeqNum, currMem)
1264 1 : d.updateReadStateLocked(d.opts.DebugCheck)
1265 1 : d.maybeScheduleFlush()
1266 1 : return nil
1267 : }
1268 :
1269 : // See comment at Ingest() for details on how this works.
1270 : func (d *DB) ingest(
1271 : paths []string,
1272 : targetLevelFunc ingestTargetLevelFunc,
1273 : shared []SharedSSTMeta,
1274 : exciseSpan KeyRange,
1275 : external []ExternalFile,
1276 1 : ) (IngestOperationStats, error) {
1277 1 : if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil {
1278 0 : panic("cannot ingest shared sstables with nil SharedStorage")
1279 : }
1280 1 : if (exciseSpan.Valid() || len(shared) > 0 || len(external) > 0) && d.FormatMajorVersion() < FormatVirtualSSTables {
1281 0 : return IngestOperationStats{}, errors.New("pebble: format major version too old for excise, shared or external sstable ingestion")
1282 0 : }
1283 : // Allocate file numbers for all of the files being ingested and mark them as
1284 : // pending in order to prevent them from being deleted. Note that this causes
1285 : // the file number ordering to be out of alignment with sequence number
1286 : // ordering. The sorting of L0 tables by sequence number avoids relying on
1287 : // that (busted) invariant.
1288 1 : d.mu.Lock()
1289 1 : pendingOutputs := make([]base.DiskFileNum, len(paths)+len(shared)+len(external))
1290 1 : for i := 0; i < len(paths)+len(shared)+len(external); i++ {
1291 1 : pendingOutputs[i] = d.mu.versions.getNextDiskFileNum()
1292 1 : }
1293 :
1294 1 : jobID := d.mu.nextJobID
1295 1 : d.mu.nextJobID++
1296 1 : d.mu.Unlock()
1297 1 :
1298 1 : // Load the metadata for all the files being ingested. This step detects
1299 1 : // and elides empty sstables.
1300 1 : loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs, d.objProvider, jobID)
1301 1 : if err != nil {
1302 0 : return IngestOperationStats{}, err
1303 0 : }
1304 :
1305 1 : if loadResult.fileCount == 0 {
1306 1 : // All of the sstables to be ingested were empty. Nothing to do.
1307 1 : return IngestOperationStats{}, nil
1308 1 : }
1309 :
1310 : // Verify the sstables do not overlap.
1311 1 : if err := ingestSortAndVerify(d.cmp, loadResult, exciseSpan); err != nil {
1312 1 : return IngestOperationStats{}, err
1313 1 : }
1314 :
1315 : // Hard link the sstables into the DB directory. Since the sstables aren't
1316 : // referenced by a version, they won't be used. If the hard linking fails
1317 : // (e.g. because the files reside on a different filesystem), ingestLink will
1318 : // fall back to copying, and if that fails we undo our work and return an
1319 : // error.
1320 1 : if err := ingestLink(jobID, d.opts, d.objProvider, loadResult, shared); err != nil {
1321 0 : return IngestOperationStats{}, err
1322 0 : }
1323 :
1324 : // Make the new tables durable. We need to do this at some point before we
1325 : // update the MANIFEST (via logAndApply), otherwise a crash can have the
1326 : // tables referenced in the MANIFEST, but not present in the provider.
1327 1 : if err := d.objProvider.Sync(); err != nil {
1328 0 : return IngestOperationStats{}, err
1329 0 : }
1330 :
1331 : // metaFlushableOverlaps is a slice parallel to meta indicating which of the
1332 : // ingested sstables overlap some table in the flushable queue. It's used to
1333 : // approximate ingest-into-L0 stats when using flushable ingests.
1334 1 : metaFlushableOverlaps := make([]bool, loadResult.fileCount)
1335 1 : var mem *flushableEntry
1336 1 : var mut *memTable
1337 1 : // asFlushable indicates whether the sstable was ingested as a flushable.
1338 1 : var asFlushable bool
1339 1 : prepare := func(seqNum uint64) {
1340 1 : // Note that d.commit.mu is held by commitPipeline when calling prepare.
1341 1 :
1342 1 : d.mu.Lock()
1343 1 : defer d.mu.Unlock()
1344 1 :
1345 1 : // Check to see if any files overlap with any of the memtables. The queue
1346 1 : // is ordered from oldest to newest with the mutable memtable being the
1347 1 : // last element in the slice. We want to wait for the newest table that
1348 1 : // overlaps.
1349 1 :
1350 1 : for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
1351 1 : m := d.mu.mem.queue[i]
1352 1 : iter := m.newIter(nil)
1353 1 : rangeDelIter := m.newRangeDelIter(nil)
1354 1 : rkeyIter := m.newRangeKeyIter(nil)
1355 1 :
1356 1 : checkForOverlap := func(i int, meta *fileMetadata) {
1357 1 : if metaFlushableOverlaps[i] {
1358 1 : // This table already overlapped a more recent flushable.
1359 1 : return
1360 1 : }
1361 1 : kr := internalKeyRange{
1362 1 : smallest: meta.Smallest,
1363 1 : largest: meta.Largest,
1364 1 : }
1365 1 : if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, d.cmp) {
1366 1 : // If this is the first table to overlap a flushable, save
1367 1 : // the flushable. This ingest must be ingested or flushed
1368 1 : // after it.
1369 1 : if mem == nil {
1370 1 : mem = m
1371 1 : }
1372 1 : metaFlushableOverlaps[i] = true
1373 : }
1374 : }
1375 1 : for i := range loadResult.localMeta {
1376 1 : checkForOverlap(i, loadResult.localMeta[i])
1377 1 : }
1378 1 : for i := range loadResult.sharedMeta {
1379 0 : checkForOverlap(len(loadResult.localMeta)+i, loadResult.sharedMeta[i])
1380 0 : }
1381 1 : for i := range loadResult.externalMeta {
1382 0 : checkForOverlap(len(loadResult.localMeta)+len(loadResult.sharedMeta)+i, loadResult.externalMeta[i])
1383 0 : }
1384 1 : if exciseSpan.Valid() {
1385 0 : kr := internalKeyRange{
1386 0 : smallest: base.MakeInternalKey(exciseSpan.Start, InternalKeySeqNumMax, InternalKeyKindMax),
1387 0 : largest: base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, exciseSpan.End),
1388 0 : }
1389 0 : if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, d.cmp) {
1390 0 : if mem == nil {
1391 0 : mem = m
1392 0 : }
1393 : }
1394 : }
1395 1 : err := iter.Close()
1396 1 : if rangeDelIter != nil {
1397 1 : err = firstError(err, rangeDelIter.Close())
1398 1 : }
1399 1 : if rkeyIter != nil {
1400 1 : err = firstError(err, rkeyIter.Close())
1401 1 : }
1402 1 : if err != nil {
1403 0 : d.opts.Logger.Errorf("ingest error reading flushable for log %s: %s", m.logNum, err)
1404 0 : }
1405 : }
1406 :
1407 1 : if mem == nil {
1408 1 : // No overlap with any of the queued flushables, so no need to queue
1409 1 : // after them.
1410 1 :
1411 1 : // New writes with higher sequence numbers may be concurrently
1412 1 : // committed. We must ensure they don't flush before this ingest
1413 1 : // completes. To do that, we ref the mutable memtable as a writer,
1414 1 : // preventing its flushing (and the flushing of all subsequent
1415 1 : // flushables in the queue). Once we've acquired the manifest lock
1416 1 : // to add the ingested sstables to the LSM, we can unref as we're
1417 1 : // guaranteed that the flush won't edit the LSM before this ingest.
1418 1 : mut = d.mu.mem.mutable
1419 1 : mut.writerRef()
1420 1 : return
1421 1 : }
1422 : // The ingestion overlaps with some entry in the flushable queue.
1423 1 : if d.FormatMajorVersion() < FormatFlushableIngest ||
1424 1 : d.opts.Experimental.DisableIngestAsFlushable() ||
1425 1 : len(shared) > 0 || exciseSpan.Valid() || len(external) > 0 ||
1426 1 : (len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) {
1427 1 : // We're not able to ingest as a flushable,
1428 1 : // so we must synchronously flush.
1429 1 : //
1430 1 : // TODO(bilal): Currently, if any of the files being ingested are shared or
1431 1 : // there's an excise span present, we cannot use flushable ingests and need
1432 1 : // to wait synchronously. Either remove this caveat by fleshing out
1433 1 : // flushable ingest logic to also account for these cases, or remove this
1434 1 : // comment. Tracking issue: https://github.com/cockroachdb/pebble/issues/2676
1435 1 : if mem.flushable == d.mu.mem.mutable {
1436 1 : err = d.makeRoomForWrite(nil)
1437 1 : }
1438 : // New writes with higher sequence numbers may be concurrently
1439 : // committed. We must ensure they don't flush before this ingest
1440 : // completes. To do that, we ref the mutable memtable as a writer,
1441 : // preventing its flushing (and the flushing of all subsequent
1442 : // flushables in the queue). Once we've acquired the manifest lock
1443 : // to add the ingested sstables to the LSM, we can unref as we're
1444 : // guaranteed that the flush won't edit the LSM before this ingest.
1445 1 : mut = d.mu.mem.mutable
1446 1 : mut.writerRef()
1447 1 : mem.flushForced = true
1448 1 : d.maybeScheduleFlush()
1449 1 : return
1450 : }
1451 : // Since there aren't too many memtables already queued up, we can
1452 : // slide the ingested sstables on top of the existing memtables.
1453 1 : asFlushable = true
1454 1 : err = d.handleIngestAsFlushable(loadResult.localMeta, seqNum)
1455 : }
1456 :
1457 1 : var ve *versionEdit
1458 1 : apply := func(seqNum uint64) {
1459 1 : if err != nil || asFlushable {
1460 1 : // Nothing to apply: prepare failed or the ingest was handled as a flushable.
1461 1 : if mut != nil {
1462 0 : if mut.writerUnref() {
1463 0 : d.mu.Lock()
1464 0 : d.maybeScheduleFlush()
1465 0 : d.mu.Unlock()
1466 0 : }
1467 : }
1468 1 : return
1469 : }
1470 :
1471 : // Update the sequence numbers for all ingested sstables'
1472 : // metadata. When the version edit is applied, the metadata is
1473 : // written to the manifest, persisting the sequence number.
1474 : // The sstables themselves are left unmodified.
1475 1 : if err = ingestUpdateSeqNum(
1476 1 : d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult,
1477 1 : ); err != nil {
1478 0 : if mut != nil {
1479 0 : if mut.writerUnref() {
1480 0 : d.mu.Lock()
1481 0 : d.maybeScheduleFlush()
1482 0 : d.mu.Unlock()
1483 0 : }
1484 : }
1485 0 : return
1486 : }
1487 :
1488 : // If we overlapped with a memtable in prepare wait for the flush to
1489 : // finish.
1490 1 : if mem != nil {
1491 1 : <-mem.flushed
1492 1 : }
1493 :
1494 : // Assign the sstables to the correct level in the LSM and apply the
1495 : // version edit.
1496 1 : ve, err = d.ingestApply(jobID, loadResult, targetLevelFunc, mut, exciseSpan)
1497 : }
1498 :
1499 : // Only one ingest can occur at a time; otherwise, one ingest would block
1500 : // waiting for the other to finish applying. That wait would happen while
1501 : // holding the commit mutex, which would prevent unrelated batches from
1502 : // writing their changes to the WAL and memtable, causing a larger commit
1503 : // hiccup during ingestion.
1504 1 : d.commit.ingestSem <- struct{}{}
1505 1 : d.commit.AllocateSeqNum(loadResult.fileCount, prepare, apply)
1506 1 : <-d.commit.ingestSem
1507 1 :
1508 1 : if err != nil {
1509 0 : if err2 := ingestCleanup(d.objProvider, loadResult.localMeta); err2 != nil {
1510 0 : d.opts.Logger.Errorf("ingest cleanup failed: %v", err2)
1511 0 : }
1512 1 : } else {
1513 1 : // Since we either created hard links to the ingested files or copied
1514 1 : // them over, it is safe to remove the original paths.
1515 1 : for _, path := range loadResult.localPaths {
1516 1 : if err2 := d.opts.FS.Remove(path); err2 != nil {
1517 0 : d.opts.Logger.Errorf("ingest failed to remove original file: %s", err2)
1518 0 : }
1519 : }
1520 : }
1521 :
1522 1 : if invariants.Enabled {
1523 1 : for _, sharedMeta := range loadResult.sharedMeta {
1524 0 : d.checkVirtualBounds(sharedMeta)
1525 0 : }
1526 : }
1527 :
1528 1 : info := TableIngestInfo{
1529 1 : JobID: jobID,
1530 1 : Err: err,
1531 1 : flushable: asFlushable,
1532 1 : }
1533 1 : if len(loadResult.localMeta) > 0 {
1534 1 : info.GlobalSeqNum = loadResult.localMeta[0].SmallestSeqNum
1535 1 : } else if len(loadResult.sharedMeta) > 0 {
1536 0 : info.GlobalSeqNum = loadResult.sharedMeta[0].SmallestSeqNum
1537 0 : } else {
1538 0 : info.GlobalSeqNum = loadResult.externalMeta[0].SmallestSeqNum
1539 0 : }
1540 1 : var stats IngestOperationStats
1541 1 : if ve != nil {
1542 1 : info.Tables = make([]struct {
1543 1 : TableInfo
1544 1 : Level int
1545 1 : }, len(ve.NewFiles))
1546 1 : for i := range ve.NewFiles {
1547 1 : e := &ve.NewFiles[i]
1548 1 : info.Tables[i].Level = e.Level
1549 1 : info.Tables[i].TableInfo = e.Meta.TableInfo()
1550 1 : stats.Bytes += e.Meta.Size
1551 1 : if e.Level == 0 {
1552 1 : stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
1553 1 : }
1554 1 : if i < len(metaFlushableOverlaps) && metaFlushableOverlaps[i] {
1555 1 : stats.MemtableOverlappingFiles++
1556 1 : }
1557 : }
1558 1 : } else if asFlushable {
1559 1 : // NB: If asFlushable == true, there are no shared sstables.
1560 1 : info.Tables = make([]struct {
1561 1 : TableInfo
1562 1 : Level int
1563 1 : }, len(loadResult.localMeta))
1564 1 : for i, f := range loadResult.localMeta {
1565 1 : info.Tables[i].Level = -1
1566 1 : info.Tables[i].TableInfo = f.TableInfo()
1567 1 : stats.Bytes += f.Size
1568 1 : // We don't have exact stats on which files will be ingested into
1569 1 : // L0, because actual ingestion into the LSM has been deferred until
1570 1 : // flush time. Instead, we infer based on memtable overlap.
1571 1 : //
1572 1 : // TODO(jackson): If we optimistically compute data overlap (#2112)
1573 1 : // before entering the commit pipeline, we can use that overlap to
1574 1 : // improve our approximation by incorporating overlap with L0, not
1575 1 : // just memtables.
1576 1 : if metaFlushableOverlaps[i] {
1577 1 : stats.ApproxIngestedIntoL0Bytes += f.Size
1578 1 : stats.MemtableOverlappingFiles++
1579 1 : }
1580 : }
1581 : }
1582 1 : d.opts.EventListener.TableIngested(info)
1583 1 :
1584 1 : return stats, err
1585 : }
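// A minimal, standalone usage sketch (not part of this file) showing how the
// ingest path above is typically reached through the public API. The database
// directory and sstable path are hypothetical.
//
//	package main
//
//	import (
//		"log"
//
//		"github.com/cockroachdb/pebble"
//	)
//
//	func main() {
//		db, err := pebble.Open("demo-db", &pebble.Options{})
//		if err != nil {
//			log.Fatal(err)
//		}
//		defer db.Close()
//		// Ingest assigns the externally-built sstable a sequence number and
//		// links it into the lowest level without data overlap, waiting on or
//		// queueing behind overlapping memtables as described in ingest() above.
//		if err := db.Ingest([]string{"external-000001.sst"}); err != nil {
//			log.Fatal(err)
//		}
//	}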
1586 :
1587 : // excise updates ve to include a replacement of the file m with new virtual
1588 : // sstables that exclude exciseSpan, returning a slice of newly-created files if
1589 : // any. If the entirety of m is deleted by exciseSpan, no new sstables are added
1590 : // and m is deleted. Note that ve is updated in-place.
1591 : //
1592 : // The manifest lock must be held when calling this method.
1593 : func (d *DB) excise(
1594 : exciseSpan KeyRange, m *fileMetadata, ve *versionEdit, level int,
1595 1 : ) ([]manifest.NewFileEntry, error) {
1596 1 : numCreatedFiles := 0
1597 1 : // Check if there's actually an overlap between m and exciseSpan.
1598 1 : if !exciseSpan.Overlaps(d.cmp, m) {
1599 0 : return nil, nil
1600 0 : }
1601 1 : ve.DeletedFiles[deletedFileEntry{
1602 1 : Level: level,
1603 1 : FileNum: m.FileNum,
1604 1 : }] = m
1605 1 : // Fast path: m sits entirely within the exciseSpan, so just delete it.
1606 1 : if exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
1607 0 : return nil, nil
1608 0 : }
1609 1 : var iter internalIterator
1610 1 : var rangeDelIter keyspan.FragmentIterator
1611 1 : var rangeKeyIter keyspan.FragmentIterator
1612 1 : needsBacking := false
1613 1 : // Create a file to the left of the excise span, if necessary.
1614 1 : // The bounds of this file will be [m.Smallest, lastKeyBefore(exciseSpan.Start)].
1615 1 : //
1616 1 : // We create bounds that are tight on user keys, and we make the effort to find
1617 1 : // the last key in the original sstable that's smaller than exciseSpan.Start
1618 1 : // even though it requires some sstable reads. We could choose to create
1619 1 : // virtual sstables on loose userKey bounds, in which case we could just set
1620 1 : // leftFile.Largest to an exclusive sentinel at exciseSpan.Start. The biggest
1621 1 : // issue with that approach would be that it'd lead to lots of small virtual
1622 1 : // sstables in the LSM that have no guarantee on containing even a single user
1623 1 : // key within the file bounds. This has the potential to increase both read and
1624 1 : // write-amp as we will be opening up these sstables only to find no relevant
1625 1 : // keys in the read path, and compacting sstables on top of them instead of
1626 1 : // directly into the space occupied by them. We choose to incur the cost of
1627 1 : // calculating tight bounds at this time instead of creating more work in the
1628 1 : // future.
1629 1 : //
1630 1 : // TODO(bilal): Some of this work can happen without grabbing the manifest
1631 1 : // lock; we could grab one currentVersion, release the lock, calculate excised
1632 1 : // files, then grab the lock again and recalculate for just the files that
1633 1 : // have changed since our previous calculation. Do this optimization as part of
1634 1 : // https://github.com/cockroachdb/pebble/issues/2112 .
1635 1 : if d.cmp(m.Smallest.UserKey, exciseSpan.Start) < 0 {
1636 1 : leftFile := &fileMetadata{
1637 1 : Virtual: true,
1638 1 : FileBacking: m.FileBacking,
1639 1 : FileNum: d.mu.versions.getNextFileNum(),
1640 1 : // Note that these are loose bounds for smallest/largest seqnums, but they're
1641 1 : // sufficient for maintaining correctness.
1642 1 : SmallestSeqNum: m.SmallestSeqNum,
1643 1 : LargestSeqNum: m.LargestSeqNum,
1644 1 : }
1645 1 : if m.HasPointKeys && !exciseSpan.Contains(d.cmp, m.SmallestPointKey) {
1646 1 : // This file will contain point keys
1647 1 : smallestPointKey := m.SmallestPointKey
1648 1 : var err error
1649 1 : iter, rangeDelIter, err = d.newIters(context.TODO(), m, &IterOptions{level: manifest.Level(level)}, internalIterOpts{})
1650 1 : if err != nil {
1651 0 : return nil, err
1652 0 : }
1653 1 : var key *InternalKey
1654 1 : if iter != nil {
1655 1 : defer iter.Close()
1656 1 : key, _ = iter.SeekLT(exciseSpan.Start, base.SeekLTFlagsNone)
1657 1 : } else {
1658 0 : iter = emptyIter
1659 0 : }
1660 1 : if key != nil {
1661 1 : leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, key.Clone())
1662 1 : }
1663 : // Store the min of (exciseSpan.Start, rdel.End) in lastRangeDel. This
1664 : // needs to be a copy if the key is owned by the range del iter.
1665 1 : var lastRangeDel []byte
1666 1 : if rangeDelIter != nil {
1667 1 : defer rangeDelIter.Close()
1668 1 : rdel := rangeDelIter.SeekLT(exciseSpan.Start)
1669 1 : if rdel != nil {
1670 1 : lastRangeDel = append(lastRangeDel[:0], rdel.End...)
1671 1 : if d.cmp(lastRangeDel, exciseSpan.Start) > 0 {
1672 0 : lastRangeDel = exciseSpan.Start
1673 0 : }
1674 : }
1675 1 : } else {
1676 1 : rangeDelIter = emptyKeyspanIter
1677 1 : }
1678 1 : if lastRangeDel != nil {
1679 1 : leftFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, lastRangeDel))
1680 1 : }
1681 : }
1682 1 : if m.HasRangeKeys && !exciseSpan.Contains(d.cmp, m.SmallestRangeKey) {
1683 1 : // This file will contain range keys
1684 1 : var err error
1685 1 : smallestRangeKey := m.SmallestRangeKey
1686 1 : rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
1687 1 : if err != nil {
1688 0 : return nil, err
1689 0 : }
1690 : // Store the min of (exciseSpan.Start, rkey.End) in lastRangeKey. This
1691 : // needs to be a copy if the key is owned by the range key iter.
1692 1 : var lastRangeKey []byte
1693 1 : var lastRangeKeyKind InternalKeyKind
1694 1 : defer rangeKeyIter.Close()
1695 1 : rkey := rangeKeyIter.SeekLT(exciseSpan.Start)
1696 1 : if rkey != nil {
1697 1 : lastRangeKey = append(lastRangeKey[:0], rkey.End...)
1698 1 : if d.cmp(lastRangeKey, exciseSpan.Start) > 0 {
1699 0 : lastRangeKey = exciseSpan.Start
1700 0 : }
1701 1 : lastRangeKeyKind = rkey.Keys[0].Kind()
1702 : }
1703 1 : if lastRangeKey != nil {
1704 1 : leftFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, base.MakeExclusiveSentinelKey(lastRangeKeyKind, lastRangeKey))
1705 1 : }
1706 : }
1707 1 : if leftFile.HasRangeKeys || leftFile.HasPointKeys {
1708 1 : var err error
1709 1 : leftFile.Size, err = d.tableCache.estimateSize(m, leftFile.Smallest.UserKey, leftFile.Largest.UserKey)
1710 1 : if err != nil {
1711 0 : return nil, err
1712 0 : }
1713 1 : if leftFile.Size == 0 {
1714 0 : // On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
1715 0 : // such as if the excised file only has range keys/dels and no point
1716 0 : // keys. This can cause panics in places where we divide by file sizes.
1717 0 : // Correct for it here.
1718 0 : leftFile.Size = 1
1719 0 : }
1720 1 : if err := leftFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
1721 0 : return nil, err
1722 0 : }
1723 1 : leftFile.ValidateVirtual(m)
1724 1 : d.checkVirtualBounds(leftFile)
1725 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: leftFile})
1726 1 : needsBacking = true
1727 1 : numCreatedFiles++
1728 : }
1729 : }
1730 : // Create a file to the right, if necessary.
1731 1 : if exciseSpan.Contains(d.cmp, m.Largest) {
1732 0 : // No key exists to the right of the excise span in this file.
1733 0 : if needsBacking && !m.Virtual {
1734 0 : // If m is virtual, then its file backing is already known to the manifest.
1735 0 : // We don't need to create another file backing. Note that there must be
1736 0 : // only one CreatedBackingTables entry per backing sstable. This is
1737 0 : // indicated by the VersionEdit.CreatedBackingTables invariant.
1738 0 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
1739 0 : }
1740 0 : return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
1741 : }
1742 : // Create a new file, rightFile, between [firstKeyAfter(exciseSpan.End), m.Largest].
1743 : //
1744 : // See comment before the definition of leftFile for the motivation behind
1745 : // calculating tight user-key bounds.
1746 1 : rightFile := &fileMetadata{
1747 1 : Virtual: true,
1748 1 : FileBacking: m.FileBacking,
1749 1 : FileNum: d.mu.versions.getNextFileNum(),
1750 1 : // Note that these are loose bounds for smallest/largest seqnums, but they're
1751 1 : // sufficient for maintaining correctness.
1752 1 : SmallestSeqNum: m.SmallestSeqNum,
1753 1 : LargestSeqNum: m.LargestSeqNum,
1754 1 : }
1755 1 : if m.HasPointKeys && !exciseSpan.Contains(d.cmp, m.LargestPointKey) {
1756 1 : // This file will contain point keys
1757 1 : largestPointKey := m.LargestPointKey
1758 1 : var err error
1759 1 : if iter == nil && rangeDelIter == nil {
1760 0 : iter, rangeDelIter, err = d.newIters(context.TODO(), m, &IterOptions{level: manifest.Level(level)}, internalIterOpts{})
1761 0 : if err != nil {
1762 0 : return nil, err
1763 0 : }
1764 0 : if iter != nil {
1765 0 : defer iter.Close()
1766 0 : } else {
1767 0 : iter = emptyIter
1768 0 : }
1769 0 : if rangeDelIter != nil {
1770 0 : defer rangeDelIter.Close()
1771 0 : } else {
1772 0 : rangeDelIter = emptyKeyspanIter
1773 0 : }
1774 : }
1775 1 : key, _ := iter.SeekGE(exciseSpan.End, base.SeekGEFlagsNone)
1776 1 : if key != nil {
1777 1 : rightFile.ExtendPointKeyBounds(d.cmp, key.Clone(), largestPointKey)
1778 1 : }
1779 : // Store the max of (exciseSpan.End, rdel.Start) in firstRangeDel. This
1780 : // needs to be a copy if the key is owned by the range del iter.
1781 1 : var firstRangeDel []byte
1782 1 : rdel := rangeDelIter.SeekGE(exciseSpan.End)
1783 1 : if rdel != nil {
1784 1 : firstRangeDel = append(firstRangeDel[:0], rdel.Start...)
1785 1 : if d.cmp(firstRangeDel, exciseSpan.End) < 0 {
1786 0 : firstRangeDel = exciseSpan.End
1787 0 : }
1788 : }
1789 1 : if firstRangeDel != nil {
1790 1 : smallestPointKey := rdel.SmallestKey()
1791 1 : smallestPointKey.UserKey = firstRangeDel
1792 1 : rightFile.ExtendPointKeyBounds(d.cmp, smallestPointKey, largestPointKey)
1793 1 : }
1794 : }
1795 1 : if m.HasRangeKeys && !exciseSpan.Contains(d.cmp, m.LargestRangeKey) {
1796 1 : // This file will contain range keys.
1797 1 : largestRangeKey := m.LargestRangeKey
1798 1 : if rangeKeyIter == nil {
1799 0 : var err error
1800 0 : rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
1801 0 : if err != nil {
1802 0 : return nil, err
1803 0 : }
1804 0 : defer rangeKeyIter.Close()
1805 : }
1806 : // Store the max of (exciseSpan.End, rkey.Start) in firstRangeKey. This
1807 : // needs to be a copy if the key is owned by the range key iter.
1808 1 : var firstRangeKey []byte
1809 1 : rkey := rangeKeyIter.SeekGE(exciseSpan.End)
1810 1 : if rkey != nil {
1811 1 : firstRangeKey = append(firstRangeKey[:0], rkey.Start...)
1812 1 : if d.cmp(firstRangeKey, exciseSpan.End) < 0 {
1813 0 : firstRangeKey = exciseSpan.End
1814 0 : }
1815 : }
1816 1 : if firstRangeKey != nil {
1817 1 : smallestRangeKey := rkey.SmallestKey()
1818 1 : smallestRangeKey.UserKey = firstRangeKey
1819 1 : // We call ExtendRangeKeyBounds so any internal boundType fields are
1820 1 : // set correctly. Note that this is mildly wasteful as we'll be comparing
1821 1 : // rightFile.{Smallest,Largest}RangeKey with themselves, which can be
1822 1 : // avoided if we exported ExtendOverallKeyBounds or so.
1823 1 : rightFile.ExtendRangeKeyBounds(d.cmp, smallestRangeKey, largestRangeKey)
1824 1 : }
1825 : }
1826 1 : if rightFile.HasRangeKeys || rightFile.HasPointKeys {
1827 1 : var err error
1828 1 : rightFile.Size, err = d.tableCache.estimateSize(m, rightFile.Smallest.UserKey, rightFile.Largest.UserKey)
1829 1 : if err != nil {
1830 0 : return nil, err
1831 0 : }
1832 1 : if rightFile.Size == 0 {
1833 1 : // On occasion, estimateSize gives us a low estimate, i.e. a 0 file size,
1834 1 : // such as if the excised file only has range keys/dels and no point keys.
1835 1 : // This can cause panics in places where we divide by file sizes. Correct
1836 1 : // for it here.
1837 1 : rightFile.Size = 1
1838 1 : }
1839 1 : rightFile.ValidateVirtual(m)
1840 1 : d.checkVirtualBounds(rightFile)
1841 1 : ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: rightFile})
1842 1 : needsBacking = true
1843 1 : numCreatedFiles++
1844 : }
1845 :
1846 1 : if needsBacking && !m.Virtual {
1847 1 : // If m is virtual, then its file backing is already known to the manifest.
1848 1 : // We don't need to create another file backing. Note that there must be
1849 1 : // only one CreatedBackingTables entry per backing sstable. This is
1850 1 : // indicated by the VersionEdit.CreatedBackingTables invariant.
1851 1 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
1852 1 : }
1853 :
1854 1 : if err := rightFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
1855 0 : return nil, err
1856 0 : }
1857 1 : return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil
1858 : }
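// A simplified, standalone sketch (hypothetical helper, not Pebble code) of
// the shape of the computation above: given a file's user-key bounds and an
// excise span [start, end), at most two remnants survive, one on each side of
// the span. The real excise additionally tightens these loose bounds by
// seeking the sstable for the last key before start and the first key at or
// after end, and reuses the original file's backing for both virtual remnants.
//
//	type userKeySpan struct{ lo, hi []byte } // lo inclusive, hi exclusive
//
//	// remnants computes the loose user-key bounds of the pieces of file that
//	// survive excising the span ex.
//	func remnants(cmp func(a, b []byte) int, file, ex userKeySpan) (left, right *userKeySpan) {
//		if cmp(file.lo, ex.lo) < 0 {
//			left = &userKeySpan{lo: file.lo, hi: ex.lo}
//		}
//		if cmp(file.hi, ex.hi) > 0 {
//			right = &userKeySpan{lo: ex.hi, hi: file.hi}
//		}
//		return left, right
//	}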
1859 :
1860 : type ingestTargetLevelFunc func(
1861 : newIters tableNewIters,
1862 : newRangeKeyIter keyspan.TableNewSpanIter,
1863 : iterOps IterOptions,
1864 : comparer *Comparer,
1865 : v *version,
1866 : baseLevel int,
1867 : compactions map[*compaction]struct{},
1868 : meta *fileMetadata,
1869 : suggestSplit bool,
1870 : ) (int, *fileMetadata, error)
1871 :
1872 : type ingestSplitFile struct {
1873 : // ingestFile is the file being ingested.
1874 : ingestFile *fileMetadata
1875 : // splitFile is the file that needs to be split to allow ingestFile to slot
1876 : // into `level` level.
1877 : splitFile *fileMetadata
1878 : // The level where ingestFile will go (and where splitFile already is).
1879 : level int
1880 : }
1881 :
1882 : // ingestSplit splits files specified in `files` and updates ve in-place to
1883 : // account for existing files getting split into two virtual sstables. The map
1884 : // `replacedFiles` contains an in-progress map of all files that have been
1885 : // replaced with new virtual sstables in this version edit so far, which is also
1886 : // updated in-place.
1887 : //
1888 : // d.mu as well as the manifest lock must be held when calling this method.
1889 : func (d *DB) ingestSplit(
1890 : ve *versionEdit,
1891 : updateMetrics func(*fileMetadata, int, []newFileEntry),
1892 : files []ingestSplitFile,
1893 : replacedFiles map[base.FileNum][]newFileEntry,
1894 1 : ) error {
1895 1 : for _, s := range files {
1896 1 : // replacedFiles can be thought of as a tree, where we start iterating with
1897 1 : // s.splitFile and run its fileNum through replacedFiles, then find which of
1898 1 : // the replaced files overlaps with s.ingestFile, which becomes the new
1899 1 : // splitFile, then we check splitFile's replacements in replacedFiles again
1900 1 : // for overlap with s.ingestFile, and so on until we either can't find the
1901 1 : // current splitFile in replacedFiles (i.e. that's the file that now needs to
1902 1 : // be split), or we don't find a file that overlaps with s.ingestFile, which
1903 1 : // means a prior ingest split already produced enough room for s.ingestFile
1904 1 : // to go into this level without necessitating another ingest split.
1905 1 : splitFile := s.splitFile
1906 1 : for splitFile != nil {
1907 1 : replaced, ok := replacedFiles[splitFile.FileNum]
1908 1 : if !ok {
1909 1 : break
1910 : }
1911 1 : updatedSplitFile := false
1912 1 : for i := range replaced {
1913 1 : if replaced[i].Meta.Overlaps(d.cmp, s.ingestFile.Smallest.UserKey, s.ingestFile.Largest.UserKey, s.ingestFile.Largest.IsExclusiveSentinel()) {
1914 1 : if updatedSplitFile {
1915 0 : // This should never happen because the earlier ingestTargetLevel
1916 0 : // function only finds split file candidates that are guaranteed to
1917 0 : // have no data overlap, only boundary overlap. See the comments
1918 0 : // in that method to see the definitions of data vs boundary
1919 0 : // overlap. That, plus the fact that files in `replaced` are
1920 0 : // guaranteed to have file bounds that are tight on user keys
1921 0 : // (as that's what `d.excise` produces), means that the only case
1922 0 : // where we overlap with two or more files in `replaced` is if we
1923 0 : // actually had data overlap all along, or if the ingestion files
1924 0 : // were overlapping, either of which is an invariant violation.
1925 0 : panic("updated with two files in ingestSplit")
1926 : }
1927 1 : splitFile = replaced[i].Meta
1928 1 : updatedSplitFile = true
1929 : }
1930 : }
1931 1 : if !updatedSplitFile {
1932 0 : // None of the replaced files overlapped with the file being ingested.
1933 0 : // This can happen if we've already excised a span overlapping with
1934 0 : // this file, or if we have consecutive ingested files that can slide
1935 0 : // within the same gap between keys in an existing file. For instance,
1936 0 : // if an existing file has keys a and g and we're ingesting b-c, d-e,
1937 0 : // the first loop iteration will split the existing file into one that
1938 0 : // ends in a and another that starts at g, and the second iteration will
1939 0 : // fall into this case and require no splitting.
1940 0 : //
1941 0 : // No splitting necessary.
1942 0 : splitFile = nil
1943 0 : }
1944 : }
1945 1 : if splitFile == nil {
1946 0 : continue
1947 : }
1948 : // NB: excise operates on [start, end). We're splitting at [start, end]
1949 : // (assuming !s.ingestFile.Largest.IsExclusiveSentinel()). The conflation
1950 : // of exclusive vs inclusive end bounds should not make a difference here
1951 : // as we're guaranteed to not have any data overlap between splitFile and
1952 : // s.ingestFile, so panic if we do see a newly added file with an endKey
1953 : // equalling s.ingestFile.Largest, and !s.ingestFile.Largest.IsExclusiveSentinel().
1954 1 : added, err := d.excise(KeyRange{Start: s.ingestFile.Smallest.UserKey, End: s.ingestFile.Largest.UserKey}, splitFile, ve, s.level)
1955 1 : if err != nil {
1956 0 : return err
1957 0 : }
1958 1 : if _, ok := ve.DeletedFiles[deletedFileEntry{
1959 1 : Level: s.level,
1960 1 : FileNum: splitFile.FileNum,
1961 1 : }]; !ok {
1962 0 : panic("did not split file that was expected to be split")
1963 : }
1964 1 : replacedFiles[splitFile.FileNum] = added
1965 1 : for i := range added {
1966 1 : if s.ingestFile.Overlaps(d.cmp, added[i].Meta.Smallest.UserKey, added[i].Meta.Largest.UserKey, added[i].Meta.Largest.IsExclusiveSentinel()) {
1967 0 : panic("ingest-time split produced a file that overlaps with ingested file")
1968 : }
1969 : }
1970 1 : updateMetrics(splitFile, s.level, added)
1971 : }
1972 : // Flatten the version edit by removing any entries from ve.NewFiles that
1973 : // are also in ve.DeletedFiles.
1974 1 : newNewFiles := ve.NewFiles[:0]
1975 1 : for i := range ve.NewFiles {
1976 1 : fn := ve.NewFiles[i].Meta.FileNum
1977 1 : deEntry := deletedFileEntry{Level: ve.NewFiles[i].Level, FileNum: fn}
1978 1 : if _, ok := ve.DeletedFiles[deEntry]; ok {
1979 1 : delete(ve.DeletedFiles, deEntry)
1980 1 : } else {
1981 1 : newNewFiles = append(newNewFiles, ve.NewFiles[i])
1982 1 : }
1983 : }
1984 1 : ve.NewFiles = newNewFiles
1985 1 : return nil
1986 : }
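// A standalone sketch (hypothetical signature, not Pebble code) of the
// replacedFiles walk at the top of ingestSplit: starting from the originally
// chosen split file, descend into its replacements, following whichever
// replacement still overlaps the ingested file, until reaching a file with no
// replacements (that is the file to split) or finding that no replacement
// overlaps (a prior split already made room, so no split is needed).
//
//	func resolveSplitFile(
//		replaced map[uint64][]uint64, // fileNum -> replacement fileNums
//		overlapsIngest func(fileNum uint64) bool,
//		splitFile uint64,
//	) (fileToSplit uint64, needsSplit bool) {
//		cur := splitFile
//		for {
//			children, ok := replaced[cur]
//			if !ok {
//				return cur, true // not replaced yet; split this file
//			}
//			found := false
//			for _, c := range children {
//				if overlapsIngest(c) {
//					cur, found = c, true
//				}
//			}
//			if !found {
//				return 0, false // no overlap remains; nothing to split
//			}
//		}
//	}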
1987 :
1988 : func (d *DB) ingestApply(
1989 : jobID int,
1990 : lr ingestLoadResult,
1991 : findTargetLevel ingestTargetLevelFunc,
1992 : mut *memTable,
1993 : exciseSpan KeyRange,
1994 1 : ) (*versionEdit, error) {
1995 1 : d.mu.Lock()
1996 1 : defer d.mu.Unlock()
1997 1 :
1998 1 : ve := &versionEdit{
1999 1 : NewFiles: make([]newFileEntry, lr.fileCount),
2000 1 : }
2001 1 : if exciseSpan.Valid() || (d.opts.Experimental.IngestSplit != nil && d.opts.Experimental.IngestSplit()) {
2002 1 : ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{}
2003 1 : }
2004 1 : metrics := make(map[int]*LevelMetrics)
2005 1 :
2006 1 : // Lock the manifest for writing before we use the current version to
2007 1 : // determine the target level. This prevents two concurrent ingestion jobs
2008 1 : // from using the same version to determine the target level, and also
2009 1 : // provides serialization with concurrent compaction and flush jobs.
2010 1 : // logAndApply unconditionally releases the manifest lock, but any earlier
2011 1 : // returns must unlock the manifest.
2012 1 : d.mu.versions.logLock()
2013 1 :
2014 1 : if mut != nil {
2015 1 : // Unref the mutable memtable to allow its flush to proceed. Now that we've
2016 1 : // acquired the manifest lock, we can be certain that if the mutable
2017 1 : // memtable has received more recent conflicting writes, the flush won't
2018 1 : // beat us to applying to the manifest resulting in sequence number
2019 1 : // inversion. Even though we call maybeScheduleFlush right now, this flush
2020 1 : // will apply after our ingestion.
2021 1 : if mut.writerUnref() {
2022 1 : d.maybeScheduleFlush()
2023 1 : }
2024 : }
2025 :
2026 1 : shouldIngestSplit := d.opts.Experimental.IngestSplit != nil &&
2027 1 : d.opts.Experimental.IngestSplit() && d.FormatMajorVersion() >= FormatVirtualSSTables
2028 1 : current := d.mu.versions.currentVersion()
2029 1 : baseLevel := d.mu.versions.picker.getBaseLevel()
2030 1 : iterOps := IterOptions{logger: d.opts.Logger}
2031 1 : // filesToSplit is a list where each element is a pair consisting of a file
2032 1 : // being ingested and a file being split to make room for an ingestion into
2033 1 : // that level. Each ingested file will appear at most once in this list. It
2034 1 : // is possible for split files to appear twice in this list.
2035 1 : filesToSplit := make([]ingestSplitFile, 0)
2036 1 : checkCompactions := false
2037 1 : for i := 0; i < lr.fileCount; i++ {
2038 1 : // Determine the lowest level in the LSM for which the sstable doesn't
2039 1 : // overlap any existing files in the level.
2040 1 : var m *fileMetadata
2041 1 : sharedIdx := -1
2042 1 : sharedLevel := -1
2043 1 : externalFile := false
2044 1 : if i < len(lr.localMeta) {
2045 1 : // local file.
2046 1 : m = lr.localMeta[i]
2047 1 : } else if (i - len(lr.localMeta)) < len(lr.sharedMeta) {
2048 0 : // shared file.
2049 0 : sharedIdx = i - len(lr.localMeta)
2050 0 : m = lr.sharedMeta[sharedIdx]
2051 0 : sharedLevel = int(lr.sharedLevels[sharedIdx])
2052 0 : } else {
2053 0 : // external file.
2054 0 : externalFile = true
2055 0 : m = lr.externalMeta[i-(len(lr.localMeta)+len(lr.sharedMeta))]
2056 0 : }
2057 1 : f := &ve.NewFiles[i]
2058 1 : var err error
2059 1 : if sharedIdx >= 0 {
2060 0 : f.Level = sharedLevel
2061 0 : if f.Level < sharedLevelsStart {
2062 0 : panic("cannot slot a shared file higher than the highest shared level")
2063 : }
2064 0 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
2065 1 : } else {
2066 1 : if externalFile {
2067 0 : ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
2068 0 : }
2069 1 : var splitFile *fileMetadata
2070 1 : if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
2071 0 : // This file fits perfectly within the excise span. We can slot it at
2072 0 : // L6, or sharedLevelsStart - 1 if we have shared files.
2073 0 : if len(lr.sharedMeta) > 0 {
2074 0 : f.Level = sharedLevelsStart - 1
2075 0 : if baseLevel > f.Level {
2076 0 : f.Level = 0
2077 0 : }
2078 0 : } else {
2079 0 : f.Level = 6
2080 0 : }
2081 1 : } else {
2082 1 : // TODO(bilal): findTargetLevel does disk IO (reading files for data
2083 1 : // overlap) even though we're holding onto d.mu. Consider unlocking
2084 1 : // d.mu while we do this. We already hold versions.logLock so we should
2085 1 : // not see any version applications while we're at this. The one
2086 1 : // complication here would be pulling out the mu.compact.inProgress
2087 1 : // check from findTargetLevel, as that requires d.mu to be held.
2088 1 : f.Level, splitFile, err = findTargetLevel(
2089 1 : d.newIters, d.tableNewRangeKeyIter, iterOps, d.opts.Comparer, current, baseLevel, d.mu.compact.inProgress, m, shouldIngestSplit)
2090 1 : }
2091 :
2092 1 : if splitFile != nil {
2093 1 : if invariants.Enabled {
2094 1 : if lf := current.Levels[f.Level].Find(d.cmp, splitFile); lf == nil {
2095 0 : panic("splitFile returned is not in level it should be")
2096 : }
2097 : }
2098 : // We take advantage of the fact that we won't drop the db mutex
2099 : // between now and the call to logAndApply. So, no files should
2100 : // get added to a new in-progress compaction at this point. We can
2101 : // avoid having to iterate on in-progress compactions to cancel them
2102 : // if none of the files being split have a compacting state.
2103 1 : if splitFile.IsCompacting() {
2104 1 : checkCompactions = true
2105 1 : }
2106 1 : filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitFile, level: f.Level})
2107 : }
2108 : }
2109 1 : if err != nil {
2110 0 : d.mu.versions.logUnlock()
2111 0 : return nil, err
2112 0 : }
2113 1 : f.Meta = m
2114 1 : levelMetrics := metrics[f.Level]
2115 1 : if levelMetrics == nil {
2116 1 : levelMetrics = &LevelMetrics{}
2117 1 : metrics[f.Level] = levelMetrics
2118 1 : }
2119 1 : levelMetrics.NumFiles++
2120 1 : levelMetrics.Size += int64(m.Size)
2121 1 : levelMetrics.BytesIngested += m.Size
2122 1 : levelMetrics.TablesIngested++
2123 : }
2124 : // replacedFiles maps files excised due to exciseSpan (or splitFiles returned
2125 : // by ingestTargetLevel), to files that were created to replace it. This map
2126 : // is used to resolve references to split files in filesToSplit, as it is
2127 : // possible for a file that we want to split to no longer exist or have a
2128 : // newer fileMetadata due to a split induced by another ingestion file, or an
2129 : // excise.
2130 1 : replacedFiles := make(map[base.FileNum][]newFileEntry)
2131 1 : updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
2132 1 : levelMetrics := metrics[level]
2133 1 : if levelMetrics == nil {
2134 0 : levelMetrics = &LevelMetrics{}
2135 0 : metrics[level] = levelMetrics
2136 0 : }
2137 1 : levelMetrics.NumFiles--
2138 1 : levelMetrics.Size -= int64(m.Size)
2139 1 : for i := range added {
2140 1 : levelMetrics.NumFiles++
2141 1 : levelMetrics.Size += int64(added[i].Meta.Size)
2142 1 : }
2143 : }
2144 1 : if exciseSpan.Valid() {
2145 0 : // Iterate through all levels and find files that intersect with exciseSpan.
2146 0 : //
2147 0 : // TODO(bilal): We could drop the DB mutex here as we don't need it for
2148 0 : // excises; we only need to hold the version lock which we already are
2149 0 : // holding. However releasing the DB mutex could mess with the
2150 0 : // ingestTargetLevel calculation that happened above, as it assumed that it
2151 0 : // had a complete view of in-progress compactions that wouldn't change
2152 0 : // until logAndApply is called. If we were to drop the mutex now, we could
2153 0 : // schedule another in-progress compaction that would go into the chosen target
2154 0 : // level and lead to file overlap within level (which would panic in
2155 0 : // logAndApply). We should drop the db mutex here, do the excise, then
2156 0 : // re-grab the DB mutex and rerun just the in-progress compaction check to
2157 0 : // see if any new compactions are conflicting with our chosen target levels
2158 0 : // for files, and if they are, we should signal those compactions to error
2159 0 : // out.
2160 0 : for level := range current.Levels {
2161 0 : overlaps := current.Overlaps(level, d.cmp, exciseSpan.Start, exciseSpan.End, true /* exclusiveEnd */)
2162 0 : iter := overlaps.Iter()
2163 0 :
2164 0 : for m := iter.First(); m != nil; m = iter.Next() {
2165 0 : newFiles, err := d.excise(exciseSpan, m, ve, level)
2166 0 : if err != nil {
2167 0 : return nil, err
2168 0 : }
2169 :
2170 0 : if _, ok := ve.DeletedFiles[deletedFileEntry{
2171 0 : Level: level,
2172 0 : FileNum: m.FileNum,
2173 0 : }]; !ok {
2174 0 : // We did not excise this file.
2175 0 : continue
2176 : }
2177 0 : replacedFiles[m.FileNum] = newFiles
2178 0 : updateLevelMetricsOnExcise(m, level, newFiles)
2179 : }
2180 : }
2181 : }
2182 1 : if len(filesToSplit) > 0 {
2183 1 : // For the same reasons as the above call to excise, we hold the db mutex
2184 1 : // while calling this method.
2185 1 : if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
2186 0 : return nil, err
2187 0 : }
2188 : }
2189 1 : if len(filesToSplit) > 0 || exciseSpan.Valid() {
2190 1 : for c := range d.mu.compact.inProgress {
2191 1 : if c.versionEditApplied {
2192 0 : continue
2193 : }
2194 : // Check if this compaction overlaps with the excise span. Note that just
2195 : // checking if the inputs individually overlap with the excise span
2196 : // isn't sufficient; for instance, a compaction could have [a,b] and [e,f]
2197 : // as inputs and write it all out as [a,b,e,f] in one sstable. If we're
2198 : // doing a [c,d) excise at the same time as this compaction, we will have
2199 : // to error out the whole compaction as we can't guarantee it hasn't/won't
2200 : // write a file overlapping with the excise span.
2201 1 : if exciseSpan.OverlapsInternalKeyRange(d.cmp, c.smallest, c.largest) {
2202 0 : c.cancel.Store(true)
2203 0 : }
2204 : // Check if this compaction's inputs have been replaced due to an
2205 : // ingest-time split. In that case, cancel the compaction as a newly picked
2206 : // compaction would need to include any new files that slid in between
2207 : // previously-existing files. Note that we cancel any compaction that has a
2208 : // file that was ingest-split as an input, even if it started before this
2209 : // ingestion.
2210 1 : if checkCompactions {
2211 1 : for i := range c.inputs {
2212 1 : iter := c.inputs[i].files.Iter()
2213 1 : for f := iter.First(); f != nil; f = iter.Next() {
2214 1 : if _, ok := replacedFiles[f.FileNum]; ok {
2215 1 : c.cancel.Store(true)
2216 1 : break
2217 : }
2218 : }
2219 : }
2220 : }
2221 : }
2222 : // Check for any EventuallyFileOnlySnapshots that could be watching for
2223 : // an excise on this span.
2224 1 : if exciseSpan.Valid() {
2225 0 : for s := d.mu.snapshots.root.next; s != &d.mu.snapshots.root; s = s.next {
2226 0 : if s.efos == nil {
2227 0 : continue
2228 : }
2229 0 : efos := s.efos
2230 0 : // TODO(bilal): We can make this faster by taking advantage of the sorted
2231 0 : // nature of protectedRanges to do a sort.Search, or even maintaining a
2232 0 : // global list of all protected ranges instead of having to peer into every
2233 0 : // snapshot.
2234 0 : for i := range efos.protectedRanges {
2235 0 : if efos.protectedRanges[i].OverlapsKeyRange(d.cmp, exciseSpan) {
2236 0 : efos.excised.Store(true)
2237 0 : break
2238 : }
2239 : }
2240 : }
2241 : }
2242 : }
2243 1 : if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
2244 1 : return d.getInProgressCompactionInfoLocked(nil)
2245 1 : }); err != nil {
2246 0 : return nil, err
2247 0 : }
2248 :
2249 1 : d.mu.versions.metrics.Ingest.Count++
2250 1 :
2251 1 : d.updateReadStateLocked(d.opts.DebugCheck)
2252 1 : // updateReadStateLocked could have generated obsolete tables, schedule a
2253 1 : // cleanup job if necessary.
2254 1 : d.deleteObsoleteFiles(jobID)
2255 1 : d.updateTableStatsLocked(ve.NewFiles)
2256 1 : // The ingestion may have pushed a level over the threshold for compaction,
2257 1 : // so check to see if one is necessary and schedule it.
2258 1 : d.maybeScheduleCompaction()
2259 1 : var toValidate []manifest.NewFileEntry
2260 1 : dedup := make(map[base.DiskFileNum]struct{})
2261 1 : for _, entry := range ve.NewFiles {
2262 1 : if _, ok := dedup[entry.Meta.FileBacking.DiskFileNum]; !ok {
2263 1 : toValidate = append(toValidate, entry)
2264 1 : dedup[entry.Meta.FileBacking.DiskFileNum] = struct{}{}
2265 1 : }
2266 : }
2267 1 : d.maybeValidateSSTablesLocked(toValidate)
2268 1 : return ve, nil
2269 : }
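// A trivial, standalone restatement (hypothetical helper, not Pebble code) of
// the lazily-allocated per-level metrics map used in ingestApply above: an
// entry is created the first time a level receives an ingested file, then its
// counters are bumped once per file; excises later subtract the removed file
// and add back the virtual remnants.
//
//	metrics := make(map[int]*LevelMetrics)
//	recordIngestedFile := func(level int, size uint64) {
//		lm := metrics[level]
//		if lm == nil {
//			lm = &LevelMetrics{}
//			metrics[level] = lm
//		}
//		lm.NumFiles++
//		lm.Size += int64(size)
//		lm.BytesIngested += size
//		lm.TablesIngested++
//	}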
2270 :
2271 : // maybeValidateSSTablesLocked adds the slice of newFileEntrys to the pending
2272 : // queue of files to be validated, when the feature is enabled.
2273 : //
2274 : // Note that if two entries with the same backing file are added, the block
2275 : // checksums for that backing file will be validated twice.
2276 : //
2277 : // DB.mu must be locked when calling.
2278 1 : func (d *DB) maybeValidateSSTablesLocked(newFiles []newFileEntry) {
2279 1 : // Only add to the validation queue when the feature is enabled.
2280 1 : if !d.opts.Experimental.ValidateOnIngest {
2281 1 : return
2282 1 : }
2283 :
2284 1 : d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...)
2285 1 : if d.shouldValidateSSTablesLocked() {
2286 1 : go d.validateSSTables()
2287 1 : }
2288 : }
2289 :
2290 : // shouldValidateSSTablesLocked returns true if SSTable validation should run.
2291 : // DB.mu must be locked when calling.
2292 1 : func (d *DB) shouldValidateSSTablesLocked() bool {
2293 1 : return !d.mu.tableValidation.validating &&
2294 1 : d.closed.Load() == nil &&
2295 1 : d.opts.Experimental.ValidateOnIngest &&
2296 1 : len(d.mu.tableValidation.pending) > 0
2297 1 : }
2298 :
2299 : // validateSSTables runs a round of validation on the tables in the pending
2300 : // queue.
2301 1 : func (d *DB) validateSSTables() {
2302 1 : d.mu.Lock()
2303 1 : if !d.shouldValidateSSTablesLocked() {
2304 1 : d.mu.Unlock()
2305 1 : return
2306 1 : }
2307 :
2308 1 : pending := d.mu.tableValidation.pending
2309 1 : d.mu.tableValidation.pending = nil
2310 1 : d.mu.tableValidation.validating = true
2311 1 : jobID := d.mu.nextJobID
2312 1 : d.mu.nextJobID++
2313 1 : rs := d.loadReadState()
2314 1 :
2315 1 : // Drop DB.mu before performing IO.
2316 1 : d.mu.Unlock()
2317 1 :
2318 1 : // Validate all tables in the pending queue. This could lead to a situation
2319 1 : // where we are starving IO from other tasks due to having to page through
2320 1 : // all the blocks in all the sstables in the queue.
2321 1 : // TODO(travers): Add some form of pacing to avoid IO starvation.
2322 1 :
2323 1 : // If we fail to validate any files due to reasons other than uncovered
2324 1 : // corruption, accumulate them and re-queue them for another attempt.
2325 1 : var retry []manifest.NewFileEntry
2326 1 :
2327 1 : for _, f := range pending {
2328 1 : // The file may have been moved or deleted since it was ingested, in
2329 1 : // which case we skip.
2330 1 : if !rs.current.Contains(f.Level, d.cmp, f.Meta) {
2331 1 : // Assume the file was moved to a lower level. It is rare enough
2332 1 : // that a table is moved or deleted between the time it was ingested
2333 1 : // and the time the validation routine runs that the overall cost of
2334 1 : // this inner loop is tolerably low, when amortized over all
2335 1 : // ingested tables.
2336 1 : found := false
2337 1 : for i := f.Level + 1; i < numLevels; i++ {
2338 1 : if rs.current.Contains(i, d.cmp, f.Meta) {
2339 1 : found = true
2340 1 : break
2341 : }
2342 : }
2343 1 : if !found {
2344 1 : continue
2345 : }
2346 : }
2347 :
2348 1 : var err error
2349 1 : if f.Meta.Virtual {
2350 1 : err = d.tableCache.withVirtualReader(
2351 1 : f.Meta.VirtualMeta(), func(v sstable.VirtualReader) error {
2352 1 : return v.ValidateBlockChecksumsOnBacking()
2353 1 : })
2354 1 : } else {
2355 1 : err = d.tableCache.withReader(
2356 1 : f.Meta.PhysicalMeta(), func(r *sstable.Reader) error {
2357 1 : return r.ValidateBlockChecksums()
2358 1 : })
2359 : }
2360 :
2361 1 : if err != nil {
2362 0 : if IsCorruptionError(err) {
2363 0 : // TODO(travers): Hook into the corruption reporting pipeline, once
2364 0 : // available. See pebble#1192.
2365 0 : d.opts.Logger.Fatalf("pebble: encountered corruption during ingestion: %s", err)
2366 0 : } else {
2367 0 : // If there was some other, possibly transient, error that
2368 0 : // caused table validation to fail inform the EventListener and
2369 0 : // move on. We remember the table so that we can retry it in a
2370 0 : // subsequent table validation job.
2371 0 : //
2372 0 : // TODO(jackson): If the error is not transient, this will retry
2373 0 : // validation indefinitely. While not great, it's the same
2374 0 : // behavior as erroring flushes and compactions. We should
2375 0 : // address this as a part of #270.
2376 0 : d.opts.EventListener.BackgroundError(err)
2377 0 : retry = append(retry, f)
2378 0 : continue
2379 : }
2380 : }
2381 :
2382 1 : d.opts.EventListener.TableValidated(TableValidatedInfo{
2383 1 : JobID: jobID,
2384 1 : Meta: f.Meta,
2385 1 : })
2386 : }
2387 1 : rs.unref()
2388 1 : d.mu.Lock()
2389 1 : defer d.mu.Unlock()
2390 1 : d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, retry...)
2391 1 : d.mu.tableValidation.validating = false
2392 1 : d.mu.tableValidation.cond.Broadcast()
2393 1 : if d.shouldValidateSSTablesLocked() {
2394 1 : go d.validateSSTables()
2395 1 : }
2396 : }
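// A standalone sketch (hypothetical names, not Pebble code; assumes the usual
// package and "sync" import boilerplate) of the single-worker validation
// pattern above: producers append to a pending queue under a mutex and start a
// worker only if one is not already running; the worker snapshots the queue,
// drops the lock to do IO, then re-checks for newly arrived work before
// exiting.
//
//	type validator struct {
//		mu         sync.Mutex
//		pending    []string
//		validating bool
//	}
//
//	func (v *validator) enqueue(item string, validate func(string)) {
//		v.mu.Lock()
//		defer v.mu.Unlock()
//		v.pending = append(v.pending, item)
//		if !v.validating {
//			v.validating = true
//			go v.run(validate)
//		}
//	}
//
//	func (v *validator) run(validate func(string)) {
//		for {
//			v.mu.Lock()
//			batch := v.pending
//			v.pending = nil
//			if len(batch) == 0 {
//				v.validating = false
//				v.mu.Unlock()
//				return
//			}
//			v.mu.Unlock()
//			for _, item := range batch {
//				validate(item) // IO happens outside the mutex
//			}
//		}
//	}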
|