Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 :
11 : "github.com/cockroachdb/errors"
12 : "github.com/cockroachdb/pebble/internal/base"
13 : "github.com/cockroachdb/pebble/internal/invariants"
14 : "github.com/cockroachdb/pebble/internal/keyspan"
15 : "github.com/cockroachdb/pebble/internal/manifest"
16 : "github.com/cockroachdb/pebble/objstorage"
17 : "github.com/cockroachdb/pebble/objstorage/remote"
18 : "github.com/cockroachdb/pebble/sstable"
19 : )
20 :
const (
	// In skip-shared iteration mode, keys in levels sharedLevelsStart and greater
	// (i.e. lower in the LSM) are skipped. scanInternalImpl also uses this as
	// the first level eligible for shared-file visitation.
	sharedLevelsStart = remote.SharedLevelsStart
)

// ErrInvalidSkipSharedIteration is returned by ScanInternal if it was called
// with a shared file visitor function, and a file in a shareable level (i.e.
// level >= sharedLevelsStart) was found to not be in shared storage according
// to objstorage.Provider, or not shareable for another reason such as for
// containing keys newer than the snapshot sequence number.
var ErrInvalidSkipSharedIteration = errors.New("pebble: cannot use skip-shared iteration due to non-shareable files in lower levels")
33 :
// SharedSSTMeta represents an sstable on shared storage that can be ingested
// by another pebble instance. This struct must contain all fields that are
// required for a Pebble instance to ingest a foreign sstable on shared storage,
// including constructing any relevant objstorage.Provider / remoteobjcat.Catalog
// data structures, as well as creating virtual FileMetadatas.
//
// Note that the Pebble instance creating and returning a SharedSSTMeta might
// not be the one that created the underlying sstable on shared storage to begin
// with; it's possible for a Pebble instance to reshare an sstable that was
// shared to it.
type SharedSSTMeta struct {
	// Backing is the shared object underlying this SST. Can be attached to an
	// objstorage.Provider.
	Backing objstorage.RemoteObjectBackingHandle

	// Smallest and Largest internal keys for the overall bounds. The kind and
	// SeqNum of these will reflect what is physically present on the source Pebble
	// instance's view of the sstable; it's up to the ingesting instance to set the
	// sequence number in the trailer to match the read-time sequence numbers
	// reserved for the level this SST is being ingested into. The Kind is expected
	// to remain unchanged by the ingesting instance.
	//
	// Note that these bounds could be narrower than the bounds of the underlying
	// sstable; ScanInternal is expected to truncate sstable bounds to the user key
	// bounds passed into that method.
	Smallest, Largest InternalKey

	// SmallestRangeKey and LargestRangeKey are internal keys that denote the
	// range key bounds of this sstable. Must lie within [Smallest, Largest].
	// Zero-valued (empty UserKey) if the file has no range keys in bounds.
	SmallestRangeKey, LargestRangeKey InternalKey

	// SmallestPointKey and LargestPointKey are internal keys that denote the
	// point key bounds of this sstable. Must lie within [Smallest, Largest].
	// Zero-valued (empty UserKey) if the file has no point keys in bounds.
	SmallestPointKey, LargestPointKey InternalKey

	// Level denotes the level at which this file was present at read time.
	// For files visited by ScanInternal, this value will only be 5 or 6.
	Level uint8

	// Size contains an estimate of the size of this sstable. Always >= 1; see
	// truncateSharedFile, which rounds up zero estimates.
	Size uint64

	// fileNum at time of creation in the creator instance. Only used for
	// debugging/tests.
	fileNum base.FileNum
}
80 :
81 1 : func (s *SharedSSTMeta) cloneFromFileMeta(f *fileMetadata) {
82 1 : *s = SharedSSTMeta{
83 1 : Smallest: f.Smallest.Clone(),
84 1 : Largest: f.Largest.Clone(),
85 1 : SmallestRangeKey: f.SmallestRangeKey.Clone(),
86 1 : LargestRangeKey: f.LargestRangeKey.Clone(),
87 1 : SmallestPointKey: f.SmallestPointKey.Clone(),
88 1 : LargestPointKey: f.LargestPointKey.Clone(),
89 1 : Size: f.Size,
90 1 : fileNum: f.FileNum,
91 1 : }
92 1 : }
93 :
94 : type sharedByLevel []SharedSSTMeta
95 :
96 1 : func (s sharedByLevel) Len() int { return len(s) }
97 0 : func (s sharedByLevel) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
98 1 : func (s sharedByLevel) Less(i, j int) bool { return s[i].Level < s[j].Level }
99 :
// pcIterPos records where pointCollapsingIterator's underlying iterator is
// positioned relative to the entry most recently returned to the caller. See
// the comment on pointCollapsingIterator.savedKey for how the two positions
// interact with key ownership.
type pcIterPos int

const (
	// pcIterPosCur: the underlying iterator is at the currently-returned entry.
	pcIterPosCur pcIterPos = iota
	// pcIterPosNext: the underlying iterator has already advanced to the next
	// user key past the currently-returned entry.
	pcIterPosNext
)
106 :
// pointCollapsingIterator is an internalIterator that collapses point keys and
// returns at most one point internal key for each user key. Merges and
// SingleDels are not supported and result in a panic if encountered. Point keys
// deleted by rangedels are considered shadowed and not exposed.
//
// Only used in ScanInternal to return at most one internal key per user key.
type pointCollapsingIterator struct {
	// iter is the underlying interleaving iterator; it surfaces both point
	// keys and interleaved range-deletion boundary keys.
	iter keyspan.InterleavingIter
	// pos tracks where iter sits relative to the last returned entry.
	pos pcIterPos
	// comparer supplies user-key equality checks while collapsing.
	comparer *base.Comparer
	// merge is the DB's merge operator. NB: MERGE keys are not supported and
	// cause a panic in findNextEntry.
	merge base.Merge
	// err is a sticky error; Error() reports it in preference to iter's error.
	err error
	// seqNum is the read sequence number; used to decide whether a range
	// deletion covers (shadows) a point key (see Span.CoversAt).
	seqNum uint64
	// The current position of `iter`. Always owned by the underlying iter.
	iterKey *InternalKey
	// The last saved key. findNextEntry and similar methods are expected to save
	// the current value of iterKey to savedKey if they're iterating away from the
	// current key but still need to retain it. See comments in findNextEntry on
	// how this field is used.
	//
	// At the end of a positioning call:
	// - if pos == pcIterPosNext, iterKey is pointing to the next user key owned
	//   by `iter` while savedKey is holding a copy to our current key.
	// - If pos == pcIterPosCur, iterKey is pointing to an `iter`-owned current
	//   key, and savedKey is either undefined or pointing to a version of the
	//   current key owned by this iterator (i.e. backed by savedKeyBuf).
	savedKey InternalKey
	// savedKeyBuf is the reusable backing storage for savedKey.UserKey.
	savedKeyBuf []byte
	// Value at the current iterator position, at iterKey.
	iterValue base.LazyValue
	// If fixedSeqNum is non-zero, all emitted points are verified to have this
	// fixed sequence number.
	fixedSeqNum uint64
}
141 :
// Span returns the range-deletion span overlapping the current iterator
// position, if any, as tracked by the underlying interleaving iterator.
func (p *pointCollapsingIterator) Span() *keyspan.Span {
	return p.iter.Span()
}
145 :
146 : // SeekPrefixGE implements the InternalIterator interface.
147 : func (p *pointCollapsingIterator) SeekPrefixGE(
148 : prefix, key []byte, flags base.SeekGEFlags,
149 0 : ) (*base.InternalKey, base.LazyValue) {
150 0 : p.resetKey()
151 0 : p.iterKey, p.iterValue = p.iter.SeekPrefixGE(prefix, key, flags)
152 0 : p.pos = pcIterPosCur
153 0 : if p.iterKey == nil {
154 0 : return nil, base.LazyValue{}
155 0 : }
156 0 : return p.findNextEntry()
157 : }
158 :
159 : // SeekGE implements the InternalIterator interface.
160 : func (p *pointCollapsingIterator) SeekGE(
161 : key []byte, flags base.SeekGEFlags,
162 1 : ) (*base.InternalKey, base.LazyValue) {
163 1 : p.resetKey()
164 1 : p.iterKey, p.iterValue = p.iter.SeekGE(key, flags)
165 1 : p.pos = pcIterPosCur
166 1 : if p.iterKey == nil {
167 1 : return nil, base.LazyValue{}
168 1 : }
169 1 : return p.findNextEntry()
170 : }
171 :
// SeekLT implements the InternalIterator interface.
//
// Reverse positioning is not supported by this iterator and always panics.
func (p *pointCollapsingIterator) SeekLT(
	key []byte, flags base.SeekLTFlags,
) (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}
178 :
179 1 : func (p *pointCollapsingIterator) resetKey() {
180 1 : p.savedKey.UserKey = p.savedKeyBuf[:0]
181 1 : p.savedKey.Trailer = 0
182 1 : p.iterKey = nil
183 1 : p.pos = pcIterPosCur
184 1 : }
185 :
186 1 : func (p *pointCollapsingIterator) verifySeqNum(key *base.InternalKey) *base.InternalKey {
187 1 : if !invariants.Enabled {
188 0 : return key
189 0 : }
190 1 : if p.fixedSeqNum == 0 || key == nil || key.Kind() == InternalKeyKindRangeDelete {
191 1 : return key
192 1 : }
193 0 : if key.SeqNum() != p.fixedSeqNum {
194 0 : panic(fmt.Sprintf("expected foreign point key to have seqnum %d, got %d", p.fixedSeqNum, key.SeqNum()))
195 : }
196 0 : return key
197 : }
198 :
// findNextEntry is called to return the next key. p.iter must be positioned at the
// start of the first user key we are interested in.
//
// It returns the newest non-shadowed internal key for that user key (skipping
// any user keys fully covered by range deletions), or (nil, LazyValue{}) when
// the underlying iterator is exhausted.
func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyValue) {
	p.saveKey()
	// Saves a comparison in the fast path
	firstIteration := true
	for p.iterKey != nil {
		// NB: p.savedKey is either the current key (iff p.iterKey == firstKey),
		// or the previous key.
		if !firstIteration && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) {
			// We have stepped onto a new user key; save it and re-run the
			// shadowing checks below against this new key.
			p.saveKey()
			continue
		}
		firstIteration = false
		if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKey.SeqNum()) {
			// The current key is shadowed by an overlapping range deletion at
			// p.seqNum. All future keys for this user key must be deleted.
			if p.savedKey.Kind() == InternalKeyKindSingleDelete {
				panic("cannot process singledel key in point collapsing iterator")
			}
			// Fast forward to the next user key. The seqnum comparison below is
			// a cheap proxy: within one user key seqnums are non-increasing, so
			// seeing a larger seqnum implies a new user key without needing a
			// full key comparison.
			p.saveKey()
			p.iterKey, p.iterValue = p.iter.Next()
			for p.iterKey != nil && p.savedKey.SeqNum() >= p.iterKey.SeqNum() && p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) {
				p.iterKey, p.iterValue = p.iter.Next()
			}
			continue
		}
		switch p.savedKey.Kind() {
		case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
			// Note that we return SETs directly, even if they would otherwise get
			// compacted into a Del to turn into a SetWithDelete. This is a fast
			// path optimization that can break SINGLEDEL determinism. To lead to
			// consistent SINGLEDEL behaviour, this iterator should *not* be used for
			// a keyspace where SINGLEDELs could be in use. If this iterator observes
			// a SINGLEDEL as the first internal key for a user key, it will panic.
			//
			// As p.value is a lazy value owned by the child iterator, we can thread
			// it through without loading it into p.valueBuf.
			//
			// TODO(bilal): We can even avoid saving the key in this fast path if
			// we are in a block where setHasSamePrefix = false in a v3 sstable,
			// guaranteeing that there's only one internal key for each user key.
			// Thread this logic through the sstable iterators and/or consider
			// collapsing (ha) this logic into the sstable iterators that are aware
			// of blocks and can determine user key changes without doing key saves
			// or comparisons.
			p.pos = pcIterPosCur
			return p.verifySeqNum(p.iterKey), p.iterValue
		case InternalKeyKindSingleDelete:
			// Panic, as this iterator is not expected to observe single deletes.
			panic("cannot process singledel key in point collapsing iterator")
		case InternalKeyKindMerge:
			// Panic, as this iterator is not expected to observe merges.
			panic("cannot process merge key in point collapsing iterator")
		case InternalKeyKindRangeDelete:
			// These are interleaved by the interleaving iterator ahead of all points.
			// We should pass them as-is, but also account for any points ahead of
			// them.
			p.pos = pcIterPosCur
			return p.verifySeqNum(p.iterKey), p.iterValue
		default:
			panic(fmt.Sprintf("unexpected kind: %d", p.iterKey.Kind()))
		}
	}
	p.resetKey()
	return nil, base.LazyValue{}
}
266 :
267 : // First implements the InternalIterator interface.
268 1 : func (p *pointCollapsingIterator) First() (*base.InternalKey, base.LazyValue) {
269 1 : p.resetKey()
270 1 : p.iterKey, p.iterValue = p.iter.First()
271 1 : p.pos = pcIterPosCur
272 1 : if p.iterKey == nil {
273 0 : return nil, base.LazyValue{}
274 0 : }
275 1 : return p.findNextEntry()
276 : }
277 :
// Last implements the InternalIterator interface.
//
// Reverse positioning is not supported by this iterator and always panics.
func (p *pointCollapsingIterator) Last() (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}
282 :
283 1 : func (p *pointCollapsingIterator) saveKey() {
284 1 : if p.iterKey == nil {
285 1 : p.savedKey = InternalKey{UserKey: p.savedKeyBuf[:0]}
286 1 : return
287 1 : }
288 1 : p.savedKeyBuf = append(p.savedKeyBuf[:0], p.iterKey.UserKey...)
289 1 : p.savedKey = InternalKey{UserKey: p.savedKeyBuf, Trailer: p.iterKey.Trailer}
290 : }
291 :
// Next implements the InternalIterator interface. It advances past the
// currently-returned user key (or past an interleaved range delete) and
// returns the next collapsed entry.
func (p *pointCollapsingIterator) Next() (*base.InternalKey, base.LazyValue) {
	switch p.pos {
	case pcIterPosCur:
		p.saveKey()
		if p.iterKey != nil && p.iterKey.Kind() == InternalKeyKindRangeDelete {
			// Step over the interleaved range delete and process the very next
			// internal key, even if it's at the same user key. This is because a
			// point for that user key has not been returned yet.
			p.iterKey, p.iterValue = p.iter.Next()
			break
		}
		// Fast forward to the next user key.
		key, val := p.iter.Next()
		// p.savedKey.SeqNum() >= key.SeqNum() is an optimization that allows us
		// to use p.savedKey.SeqNum() < key.SeqNum() as a sign that the user key
		// has changed, without needing to do the full key comparison.
		for key != nil && p.savedKey.SeqNum() >= key.SeqNum() &&
			p.comparer.Equal(p.savedKey.UserKey, key.UserKey) {
			key, val = p.iter.Next()
		}
		if key == nil {
			// There are no keys to return.
			p.resetKey()
			return nil, base.LazyValue{}
		}
		p.iterKey, p.iterValue = key, val
	case pcIterPosNext:
		// The underlying iterator is already at the next user key; just flip
		// the position marker back to "current".
		p.pos = pcIterPosCur
	}
	if p.iterKey == nil {
		p.resetKey()
		return nil, base.LazyValue{}
	}
	return p.findNextEntry()
}
328 :
// NextPrefix implements the InternalIterator interface.
//
// Not supported by this iterator; always panics.
func (p *pointCollapsingIterator) NextPrefix(succKey []byte) (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}

// Prev implements the InternalIterator interface.
//
// Reverse iteration is not supported by this iterator; always panics.
func (p *pointCollapsingIterator) Prev() (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}
338 :
339 : // Error implements the InternalIterator interface.
340 1 : func (p *pointCollapsingIterator) Error() error {
341 1 : if p.err != nil {
342 0 : return p.err
343 0 : }
344 1 : return p.iter.Error()
345 : }
346 :
// Close implements the InternalIterator interface, closing the underlying
// interleaving iterator.
func (p *pointCollapsingIterator) Close() error {
	return p.iter.Close()
}

// SetBounds implements the InternalIterator interface. Any saved position is
// invalidated before the new bounds are propagated downward.
func (p *pointCollapsingIterator) SetBounds(lower, upper []byte) {
	p.resetKey()
	p.iter.SetBounds(lower, upper)
}

// SetContext propagates the context to the underlying iterator.
func (p *pointCollapsingIterator) SetContext(ctx context.Context) {
	p.iter.SetContext(ctx)
}

// String implements the InternalIterator interface.
func (p *pointCollapsingIterator) String() string {
	return p.iter.String()
}

// Compile-time check that pointCollapsingIterator satisfies internalIterator.
var _ internalIterator = &pointCollapsingIterator{}
368 :
// IteratorLevelKind is used to denote whether the current ScanInternal iterator
// is unknown, belongs to a flushable, or belongs to an LSM level type. It is
// surfaced via IteratorLevel.Kind.
type IteratorLevelKind int8

const (
	// IteratorLevelUnknown indicates an unknown LSM level.
	IteratorLevelUnknown IteratorLevelKind = iota
	// IteratorLevelLSM indicates an LSM level.
	IteratorLevelLSM
	// IteratorLevelFlushable indicates a flushable (i.e. memtable).
	IteratorLevelFlushable
)
381 :
// IteratorLevel is used with scanInternalIterator to surface additional iterator-specific info where possible.
// Note: this struct is only provided for point keys.
type IteratorLevel struct {
	// Kind selects which of the fields below are meaningful.
	Kind IteratorLevelKind
	// FlushableIndex indicates the position within the flushable queue of this level.
	// Only valid if kind == IteratorLevelFlushable.
	FlushableIndex int
	// The level within the LSM. Only valid if Kind == IteratorLevelLSM.
	Level int
	// Sublevel is only valid if Kind == IteratorLevelLSM and Level == 0.
	Sublevel int
}
394 :
// scanInternalIterator is an iterator that returns all internal keys, including
// tombstones. For instance, an InternalKeyKindDelete would be returned as an
// InternalKeyKindDelete instead of the iterator skipping over to the next key.
// Internal keys within a user key are collapsed, eg. if there are two SETs, the
// one with the higher sequence is returned. Useful if an external user of Pebble
// needs to observe and rebuild Pebble's history of internal keys, such as in
// node-to-node replication. For use with {db,snapshot}.ScanInternal().
//
// scanInternalIterator is expected to ignore point keys deleted by range
// deletions, and range keys shadowed by a range key unset or delete, however it
// *must* return the range delete as well as the range key unset/delete that did
// the shadowing.
type scanInternalIterator struct {
	ctx      context.Context
	db       *DB
	opts     scanInternalOptions
	comparer *base.Comparer
	merge    Merge
	// iter is the top-level internal iterator driving the scan.
	iter internalIterator
	// readState provides the LSM view when version is nil; exactly one of
	// readState/version is consulted (see scanInternalImpl).
	readState *readState
	// version, if non-nil, pins an explicit LSM version to iterate over.
	version  *version
	rangeKey *iteratorRangeKeyState
	// pointKeyIter is the point-key sub-iterator feeding iter.
	pointKeyIter internalIterator
	iterKey      *InternalKey
	iterValue    LazyValue
	alloc        *iterAlloc
	newIters     tableNewIters
	// newIterRangeKey constructs range-key span iterators for sstables.
	newIterRangeKey keyspan.TableNewSpanIter
	// seqNum is the visibility sequence number for this scan.
	seqNum uint64
	// iterLevels maps a mergingIter level index to iterator-level info; it is
	// indexed by mergingIter.heap item indices (see scanInternalImpl).
	iterLevels  []IteratorLevel
	mergingIter *mergingIter

	// boundsBuf holds two buffers used to store the lower and upper bounds.
	// Whenever the InternalIterator's bounds change, the new bounds are copied
	// into boundsBuf[boundsBufIdx]. The two bounds share a slice to reduce
	// allocations. opts.LowerBound and opts.UpperBound point into this slice.
	boundsBuf    [2][]byte
	boundsBufIdx int
}
434 :
// truncateSharedFile truncates a shared file's [Smallest, Largest] fields to
// [lower, upper), potentially opening iterators on the file to find keys within
// the requested bounds. A SharedSSTMeta is produced that is suitable for
// external consumption by other Pebble instances. If shouldSkip is true, this
// file does not contain any keys in [lower, upper) and can be skipped.
//
// TODO(bilal): If opening iterators and doing reads in this method is too
// inefficient, consider producing non-tight file bounds instead.
func (d *DB) truncateSharedFile(
	ctx context.Context,
	lower, upper []byte,
	level int,
	file *fileMetadata,
	objMeta objstorage.ObjectMetadata,
) (sst *SharedSSTMeta, shouldSkip bool, err error) {
	cmp := d.cmp
	sst = &SharedSSTMeta{}
	sst.cloneFromFileMeta(file)
	sst.Level = uint8(level)
	sst.Backing, err = d.objProvider.RemoteObjectBacking(&objMeta)
	if err != nil {
		return nil, false, err
	}
	needsLowerTruncate := cmp(lower, file.Smallest.UserKey) > 0
	// The file needs upper truncation unless its largest key is strictly below
	// upper (or equal but an exclusive sentinel, which is out of bounds anyway).
	needsUpperTruncate := cmp(upper, file.Largest.UserKey) < 0 || (cmp(upper, file.Largest.UserKey) == 0 && !file.Largest.IsExclusiveSentinel())
	// Fast path: file is entirely within [lower, upper).
	if !needsLowerTruncate && !needsUpperTruncate {
		return sst, false, nil
	}

	// We will need to truncate file bounds in at least one direction. Open all
	// relevant iterators.
	iter, rangeDelIter, err := d.newIters.TODO(ctx, file, &IterOptions{
		LowerBound: lower,
		UpperBound: upper,
		level:      manifest.Level(level),
	}, internalIterOpts{})
	if err != nil {
		return nil, false, err
	}
	defer iter.Close()
	if rangeDelIter != nil {
		rangeDelIter = keyspan.Truncate(
			cmp, rangeDelIter, lower, upper, nil, nil,
			false, /* panicOnUpperTruncate */
		)
		defer rangeDelIter.Close()
	}
	rangeKeyIter, err := d.tableNewRangeKeyIter(file, keyspan.SpanIterOptions{})
	if err != nil {
		return nil, false, err
	}
	if rangeKeyIter != nil {
		rangeKeyIter = keyspan.Truncate(
			cmp, rangeKeyIter, lower, upper, nil, nil,
			false, /* panicOnUpperTruncate */
		)
		defer rangeKeyIter.Close()
	}
	// Check if we need to truncate on the left side. This means finding a new
	// SmallestPointKey and SmallestRangeKey that are >= lower.
	if needsLowerTruncate {
		sst.SmallestPointKey.UserKey = sst.SmallestPointKey.UserKey[:0]
		sst.SmallestPointKey.Trailer = 0
		key, _ := iter.SeekGE(lower, base.SeekGEFlagsNone)
		foundPointKey := key != nil
		if key != nil {
			sst.SmallestPointKey.CopyFrom(*key)
		}
		if rangeDelIter != nil {
			// A range deletion's start key may precede the first point key and
			// also counts as a point-key bound.
			if span, err := rangeDelIter.SeekGE(lower); err != nil {
				return nil, false, err
			} else if span != nil && (len(sst.SmallestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.SmallestKey(), sst.SmallestPointKey) < 0) {
				sst.SmallestPointKey.CopyFrom(span.SmallestKey())
				foundPointKey = true
			}
		}
		if !foundPointKey {
			// There are no point keys in the span we're interested in.
			sst.SmallestPointKey = InternalKey{}
			sst.LargestPointKey = InternalKey{}
		}
		sst.SmallestRangeKey.UserKey = sst.SmallestRangeKey.UserKey[:0]
		sst.SmallestRangeKey.Trailer = 0
		if rangeKeyIter != nil {
			span, err := rangeKeyIter.SeekGE(lower)
			switch {
			case err != nil:
				return nil, false, err
			case span != nil:
				sst.SmallestRangeKey.CopyFrom(span.SmallestKey())
			default:
				// There are no range keys in the span we're interested in.
				sst.SmallestRangeKey = InternalKey{}
				sst.LargestRangeKey = InternalKey{}
			}
		}
	}
	// Check if we need to truncate on the right side. This means finding a new
	// LargestPointKey and LargestRangeKey that is < upper.
	if needsUpperTruncate {
		sst.LargestPointKey.UserKey = sst.LargestPointKey.UserKey[:0]
		sst.LargestPointKey.Trailer = 0
		key, _ := iter.SeekLT(upper, base.SeekLTFlagsNone)
		foundPointKey := key != nil
		if key != nil {
			sst.LargestPointKey.CopyFrom(*key)
		}
		if rangeDelIter != nil {
			// A range deletion's end key may follow the last point key and also
			// counts as a point-key bound.
			if span, err := rangeDelIter.SeekLT(upper); err != nil {
				return nil, false, err
			} else if span != nil && (len(sst.LargestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.LargestKey(), sst.LargestPointKey) > 0) {
				sst.LargestPointKey.CopyFrom(span.LargestKey())
				foundPointKey = true
			}
		}
		if !foundPointKey {
			// There are no point keys in the span we're interested in.
			sst.SmallestPointKey = InternalKey{}
			sst.LargestPointKey = InternalKey{}
		}
		sst.LargestRangeKey.UserKey = sst.LargestRangeKey.UserKey[:0]
		sst.LargestRangeKey.Trailer = 0
		if rangeKeyIter != nil {
			span, err := rangeKeyIter.SeekLT(upper)
			switch {
			case err != nil:
				return nil, false, err
			case span != nil:
				sst.LargestRangeKey.CopyFrom(span.LargestKey())
			default:
				// There are no range keys in the span we're interested in.
				sst.SmallestRangeKey = InternalKey{}
				sst.LargestRangeKey = InternalKey{}
			}
		}
	}
	// Set overall bounds based on {Smallest,Largest}{Point,Range}Key.
	switch {
	case len(sst.SmallestRangeKey.UserKey) == 0:
		sst.Smallest = sst.SmallestPointKey
	case len(sst.SmallestPointKey.UserKey) == 0:
		sst.Smallest = sst.SmallestRangeKey
	default:
		sst.Smallest = sst.SmallestPointKey
		if base.InternalCompare(cmp, sst.SmallestRangeKey, sst.SmallestPointKey) < 0 {
			sst.Smallest = sst.SmallestRangeKey
		}
	}
	switch {
	case len(sst.LargestRangeKey.UserKey) == 0:
		sst.Largest = sst.LargestPointKey
	case len(sst.LargestPointKey.UserKey) == 0:
		sst.Largest = sst.LargestRangeKey
	default:
		sst.Largest = sst.LargestPointKey
		if base.InternalCompare(cmp, sst.LargestRangeKey, sst.LargestPointKey) > 0 {
			sst.Largest = sst.LargestRangeKey
		}
	}
	// On rare occasion, a file might overlap with [lower, upper) but not actually
	// have any keys within those bounds. Skip such files.
	if len(sst.Smallest.UserKey) == 0 {
		return nil, true, nil
	}
	sst.Size, err = d.tableCache.estimateSize(file, sst.Smallest.UserKey, sst.Largest.UserKey)
	if err != nil {
		return nil, false, err
	}
	// On occasion, estimateSize gives us a low estimate, i.e. a 0 file size. This
	// can cause panics in places where we divide by file sizes. Correct for it
	// here.
	if sst.Size == 0 {
		sst.Size = 1
	}
	return sst, false, nil
}
612 :
613 : func scanInternalImpl(
614 : ctx context.Context, lower, upper []byte, iter *scanInternalIterator, opts *scanInternalOptions,
615 1 : ) error {
616 1 : if opts.visitSharedFile != nil && (lower == nil || upper == nil) {
617 0 : panic("lower and upper bounds must be specified in skip-shared iteration mode")
618 : }
619 : // Before starting iteration, check if any files in levels sharedLevelsStart
620 : // and below are *not* shared. Error out if that is the case, as skip-shared
621 : // iteration will not produce a consistent point-in-time view of this range
622 : // of keys. For files that are shared, call visitSharedFile with a truncated
623 : // version of that file.
624 1 : cmp := iter.comparer.Compare
625 1 : provider := iter.db.ObjProvider()
626 1 : seqNum := iter.seqNum
627 1 : current := iter.version
628 1 : if current == nil {
629 1 : current = iter.readState.current
630 1 : }
631 1 : if opts.visitSharedFile != nil {
632 1 : if provider == nil {
633 0 : panic("expected non-nil Provider in skip-shared iteration mode")
634 : }
635 1 : for level := sharedLevelsStart; level < numLevels; level++ {
636 1 : files := current.Levels[level].Iter()
637 1 : for f := files.SeekGE(cmp, lower); f != nil && cmp(f.Smallest.UserKey, upper) < 0; f = files.Next() {
638 1 : var objMeta objstorage.ObjectMetadata
639 1 : var err error
640 1 : objMeta, err = provider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
641 1 : if err != nil {
642 0 : return err
643 0 : }
644 1 : if !objMeta.IsShared() {
645 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared", objMeta.DiskFileNum)
646 0 : }
647 1 : if !base.Visible(f.LargestSeqNum, seqNum, base.InternalKeySeqNumMax) {
648 1 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s contains keys newer than snapshot", objMeta.DiskFileNum)
649 1 : }
650 1 : var sst *SharedSSTMeta
651 1 : var skip bool
652 1 : sst, skip, err = iter.db.truncateSharedFile(ctx, lower, upper, level, f, objMeta)
653 1 : if err != nil {
654 0 : return err
655 0 : }
656 1 : if skip {
657 1 : continue
658 : }
659 1 : if err = opts.visitSharedFile(sst); err != nil {
660 0 : return err
661 0 : }
662 : }
663 : }
664 : }
665 :
666 1 : for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() {
667 1 : key := iter.unsafeKey()
668 1 :
669 1 : if opts.rateLimitFunc != nil {
670 0 : if err := opts.rateLimitFunc(key, iter.lazyValue()); err != nil {
671 0 : return err
672 0 : }
673 : }
674 :
675 1 : switch key.Kind() {
676 1 : case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet:
677 1 : if opts.visitRangeKey != nil {
678 1 : span := iter.unsafeSpan()
679 1 : // NB: The caller isn't interested in the sequence numbers of these
680 1 : // range keys. Rather, the caller wants them to be in trailer order
681 1 : // _after_ zeroing of sequence numbers. Copy span.Keys, sort it, and then
682 1 : // call visitRangeKey.
683 1 : keysCopy := make([]keyspan.Key, len(span.Keys))
684 1 : for i := range span.Keys {
685 1 : keysCopy[i] = span.Keys[i]
686 1 : keysCopy[i].Trailer = base.MakeTrailer(0, span.Keys[i].Kind())
687 1 : }
688 1 : keyspan.SortKeysByTrailer(&keysCopy)
689 1 : if err := opts.visitRangeKey(span.Start, span.End, keysCopy); err != nil {
690 0 : return err
691 0 : }
692 : }
693 1 : case InternalKeyKindRangeDelete:
694 1 : if opts.visitRangeDel != nil {
695 1 : rangeDel := iter.unsafeRangeDel()
696 1 : if err := opts.visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil {
697 0 : return err
698 0 : }
699 : }
700 1 : default:
701 1 : if opts.visitPointKey != nil {
702 1 : var info IteratorLevel
703 1 : if len(iter.mergingIter.heap.items) > 0 {
704 1 : mergingIterIdx := iter.mergingIter.heap.items[0].index
705 1 : info = iter.iterLevels[mergingIterIdx]
706 1 : } else {
707 0 : info = IteratorLevel{Kind: IteratorLevelUnknown}
708 0 : }
709 1 : val := iter.lazyValue()
710 1 : if err := opts.visitPointKey(key, val, info); err != nil {
711 0 : return err
712 0 : }
713 : }
714 : }
715 : }
716 :
717 1 : return nil
718 : }
719 :
720 : // constructPointIter constructs a merging iterator and sets i.iter to it.
721 : func (i *scanInternalIterator) constructPointIter(
722 : categoryAndQoS sstable.CategoryAndQoS, memtables flushableList, buf *iterAlloc,
723 1 : ) {
724 1 : // Merging levels and levels from iterAlloc.
725 1 : mlevels := buf.mlevels[:0]
726 1 : levels := buf.levels[:0]
727 1 :
728 1 : // We compute the number of levels needed ahead of time and reallocate a slice if
729 1 : // the array from the iterAlloc isn't large enough. Doing this allocation once
730 1 : // should improve the performance.
731 1 : numMergingLevels := len(memtables)
732 1 : numLevelIters := 0
733 1 :
734 1 : current := i.version
735 1 : if current == nil {
736 1 : current = i.readState.current
737 1 : }
738 1 : numMergingLevels += len(current.L0SublevelFiles)
739 1 : numLevelIters += len(current.L0SublevelFiles)
740 1 :
741 1 : for level := 1; level < len(current.Levels); level++ {
742 1 : if current.Levels[level].Empty() {
743 1 : continue
744 : }
745 1 : if i.opts.skipSharedLevels && level >= sharedLevelsStart {
746 1 : continue
747 : }
748 1 : numMergingLevels++
749 1 : numLevelIters++
750 : }
751 :
752 1 : if numMergingLevels > cap(mlevels) {
753 0 : mlevels = make([]mergingIterLevel, 0, numMergingLevels)
754 0 : }
755 1 : if numLevelIters > cap(levels) {
756 0 : levels = make([]levelIter, 0, numLevelIters)
757 0 : }
758 : // TODO(bilal): Push these into the iterAlloc buf.
759 1 : var rangeDelMiter keyspan.MergingIter
760 1 : rangeDelIters := make([]keyspan.FragmentIterator, 0, numMergingLevels)
761 1 : rangeDelLevels := make([]keyspan.LevelIter, 0, numLevelIters)
762 1 :
763 1 : i.iterLevels = make([]IteratorLevel, numMergingLevels)
764 1 : mlevelsIndex := 0
765 1 :
766 1 : // Next are the memtables.
767 1 : for j := len(memtables) - 1; j >= 0; j-- {
768 1 : mem := memtables[j]
769 1 : mlevels = append(mlevels, mergingIterLevel{
770 1 : iter: mem.newIter(&i.opts.IterOptions),
771 1 : })
772 1 : i.iterLevels[mlevelsIndex] = IteratorLevel{
773 1 : Kind: IteratorLevelFlushable,
774 1 : FlushableIndex: j,
775 1 : }
776 1 : mlevelsIndex++
777 1 : if rdi := mem.newRangeDelIter(&i.opts.IterOptions); rdi != nil {
778 1 : rangeDelIters = append(rangeDelIters, rdi)
779 1 : }
780 : }
781 :
782 : // Next are the file levels: L0 sub-levels followed by lower levels.
783 1 : levelsIndex := len(levels)
784 1 : mlevels = mlevels[:numMergingLevels]
785 1 : levels = levels[:numLevelIters]
786 1 : rangeDelLevels = rangeDelLevels[:numLevelIters]
787 1 : i.opts.IterOptions.snapshotForHideObsoletePoints = i.seqNum
788 1 : i.opts.IterOptions.CategoryAndQoS = categoryAndQoS
789 1 : addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
790 1 : li := &levels[levelsIndex]
791 1 : rli := &rangeDelLevels[levelsIndex]
792 1 :
793 1 : li.init(
794 1 : i.ctx, i.opts.IterOptions, i.comparer, i.newIters, files, level,
795 1 : internalIterOpts{})
796 1 : li.initBoundaryContext(&mlevels[mlevelsIndex].levelIterBoundaryContext)
797 1 : mlevels[mlevelsIndex].iter = li
798 1 : rli.Init(keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters},
799 1 : i.comparer.Compare, tableNewRangeDelIter(i.ctx, i.newIters), files, level,
800 1 : manifest.KeyTypePoint)
801 1 : rangeDelIters = append(rangeDelIters, rli)
802 1 :
803 1 : levelsIndex++
804 1 : mlevelsIndex++
805 1 : }
806 :
807 1 : for j := len(current.L0SublevelFiles) - 1; j >= 0; j-- {
808 1 : i.iterLevels[mlevelsIndex] = IteratorLevel{
809 1 : Kind: IteratorLevelLSM,
810 1 : Level: 0,
811 1 : Sublevel: j,
812 1 : }
813 1 : addLevelIterForFiles(current.L0SublevelFiles[j].Iter(), manifest.L0Sublevel(j))
814 1 : }
815 : // Add level iterators for the non-empty non-L0 levels.
816 1 : for level := 1; level < numLevels; level++ {
817 1 : if current.Levels[level].Empty() {
818 1 : continue
819 : }
820 1 : if i.opts.skipSharedLevels && level >= sharedLevelsStart {
821 1 : continue
822 : }
823 1 : i.iterLevels[mlevelsIndex] = IteratorLevel{Kind: IteratorLevelLSM, Level: level}
824 1 : addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
825 : }
826 :
827 1 : buf.merging.init(&i.opts.IterOptions, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...)
828 1 : buf.merging.snapshot = i.seqNum
829 1 : rangeDelMiter.Init(i.comparer.Compare, keyspan.VisibleTransform(i.seqNum), new(keyspan.MergingBuffers), rangeDelIters...)
830 1 :
831 1 : if i.opts.includeObsoleteKeys {
832 1 : iiter := &keyspan.InterleavingIter{}
833 1 : iiter.Init(i.comparer, &buf.merging, &rangeDelMiter,
834 1 : keyspan.InterleavingIterOpts{
835 1 : LowerBound: i.opts.LowerBound,
836 1 : UpperBound: i.opts.UpperBound,
837 1 : })
838 1 : i.pointKeyIter = iiter
839 1 : } else {
840 1 : pcIter := &pointCollapsingIterator{
841 1 : comparer: i.comparer,
842 1 : merge: i.merge,
843 1 : seqNum: i.seqNum,
844 1 : }
845 1 : pcIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, keyspan.InterleavingIterOpts{
846 1 : LowerBound: i.opts.LowerBound,
847 1 : UpperBound: i.opts.UpperBound,
848 1 : })
849 1 : i.pointKeyIter = pcIter
850 1 : }
851 1 : i.iter = i.pointKeyIter
852 : }
853 :
// constructRangeKeyIter constructs the range-key iterator stack, populating
// i.rangeKey.rangeKeyIter with the resulting iterator. This is similar to
// Iterator.constructRangeKeyIter, except it doesn't handle batches and ensures
// iterConfig does *not* elide unsets/deletes.
//
// The stack is assembled newest-to-oldest: flushables first, then L0 files in
// descending LargestSeqNum order, then the lower levels. Returns an error only
// if opening a file-backed range-key iterator fails.
func (i *scanInternalIterator) constructRangeKeyIter() error {
	// We want the bounded iter from iterConfig, but not the collapsing of
	// RangeKeyUnsets and RangeKeyDels.
	i.rangeKey.rangeKeyIter = i.rangeKey.iterConfig.Init(
		i.comparer, i.seqNum, i.opts.LowerBound, i.opts.UpperBound,
		nil /* hasPrefix */, nil /* prefix */, true, /* internalKeys */
		&i.rangeKey.rangeKeyBuffers.internal)

	// Next are the flushables: memtables and large batches.
	if i.readState != nil {
		for j := len(i.readState.memtables) - 1; j >= 0; j-- {
			mem := i.readState.memtables[j]
			// We only need to read from memtables which contain sequence numbers older
			// than seqNum.
			if logSeqNum := mem.logSeqNum; logSeqNum >= i.seqNum {
				continue
			}
			if rki := mem.newRangeKeyIter(&i.opts.IterOptions); rki != nil {
				i.rangeKey.iterConfig.AddLevel(rki)
			}
		}
	}

	// A pinned version (if set) takes precedence; otherwise read from the
	// read state's current version.
	current := i.version
	if current == nil {
		current = i.readState.current
	}
	// Next are the file levels: L0 sub-levels followed by lower levels.
	//
	// Add file-specific iterators for L0 files containing range keys. This is less
	// efficient than using levelIters for sublevels of L0 files containing
	// range keys, but range keys are expected to be sparse anyway, reducing the
	// cost benefit of maintaining a separate L0Sublevels instance for range key
	// files and then using it here.
	//
	// NB: We iterate L0's files in reverse order. They're sorted by
	// LargestSeqNum ascending, and we need to add them to the merging iterator
	// in LargestSeqNum descending to preserve the merging iterator's invariants
	// around Key Trailer order.
	iter := current.RangeKeyLevels[0].Iter()
	for f := iter.Last(); f != nil; f = iter.Prev() {
		spanIter, err := i.newIterRangeKey(f, i.opts.SpanIterOptions())
		if err != nil {
			return err
		}
		i.rangeKey.iterConfig.AddLevel(spanIter)
	}

	// Add level iterators for the non-empty non-L0 levels.
	for level := 1; level < len(current.RangeKeyLevels); level++ {
		if current.RangeKeyLevels[level].Empty() {
			continue
		}
		// In skip-shared mode, levels at or below sharedLevelsStart are
		// surfaced to the caller as shared files instead of being iterated.
		if i.opts.skipSharedLevels && level >= sharedLevelsStart {
			continue
		}
		li := i.rangeKey.iterConfig.NewLevelIter()
		spanIterOpts := i.opts.SpanIterOptions()
		li.Init(spanIterOpts, i.comparer.Compare, i.newIterRangeKey, current.RangeKeyLevels[level].Iter(),
			manifest.Level(level), manifest.KeyTypeRange)
		i.rangeKey.iterConfig.AddLevel(li)
	}
	return nil
}
922 :
923 : // seekGE seeks this iterator to the first key that's greater than or equal
924 : // to the specified user key.
925 1 : func (i *scanInternalIterator) seekGE(key []byte) bool {
926 1 : i.iterKey, i.iterValue = i.iter.SeekGE(key, base.SeekGEFlagsNone)
927 1 : return i.iterKey != nil
928 1 : }
929 :
930 : // unsafeKey returns the unsafe InternalKey at the current position. The value
931 : // is nil if the iterator is invalid or exhausted.
932 1 : func (i *scanInternalIterator) unsafeKey() *InternalKey {
933 1 : return i.iterKey
934 1 : }
935 :
936 : // lazyValue returns a value pointer to the value at the current iterator
937 : // position. Behaviour undefined if unsafeKey() returns a Range key or Rangedel
938 : // kind key.
939 1 : func (i *scanInternalIterator) lazyValue() LazyValue {
940 1 : return i.iterValue
941 1 : }
942 :
943 : // unsafeRangeDel returns a range key span. Behaviour undefined if UnsafeKey returns
944 : // a non-rangedel kind.
945 1 : func (i *scanInternalIterator) unsafeRangeDel() *keyspan.Span {
946 1 : type spanInternalIterator interface {
947 1 : Span() *keyspan.Span
948 1 : }
949 1 : return i.pointKeyIter.(spanInternalIterator).Span()
950 1 : }
951 :
952 : // unsafeSpan returns a range key span. Behaviour undefined if UnsafeKey returns
953 : // a non-rangekey type.
954 1 : func (i *scanInternalIterator) unsafeSpan() *keyspan.Span {
955 1 : return i.rangeKey.iiter.Span()
956 1 : }
957 :
958 : // next advances the iterator in the forward direction, and returns the
959 : // iterator's new validity state.
960 1 : func (i *scanInternalIterator) next() bool {
961 1 : i.iterKey, i.iterValue = i.iter.Next()
962 1 : return i.iterKey != nil
963 1 : }
964 :
965 : // error returns an error from the internal iterator, if there's any.
966 1 : func (i *scanInternalIterator) error() error {
967 1 : return i.iter.Error()
968 1 : }
969 :
// close closes this iterator, and releases any pooled objects.
//
// Ordering matters: the iterator stack is closed first (it may still reference
// files pinned by the read state / version), then the read state or version is
// released, and finally reusable allocations are returned to their pools.
func (i *scanInternalIterator) close() error {
	if err := i.iter.Close(); err != nil {
		return err
	}
	// Exactly one of readState/version is held, mirroring the fallback in
	// construction; release whichever pinned this iterator's view of the LSM.
	if i.readState != nil {
		i.readState.unref()
	}
	if i.version != nil {
		i.version.Unref()
	}
	if i.rangeKey != nil {
		i.rangeKey.PrepareForReuse()
		// Zero all fields except the buffers, which are retained so the pooled
		// state can be reused without reallocating.
		*i.rangeKey = iteratorRangeKeyState{
			rangeKeyBuffers: i.rangeKey.rangeKeyBuffers,
		}
		iterRangeKeyStateAllocPool.Put(i.rangeKey)
		i.rangeKey = nil
	}
	if alloc := i.alloc; alloc != nil {
		for j := range i.boundsBuf {
			// Drop oversized bounds buffers rather than caching them, so the
			// pool doesn't pin large allocations indefinitely.
			if cap(i.boundsBuf[j]) >= maxKeyBufCacheSize {
				alloc.boundsBuf[j] = nil
			} else {
				alloc.boundsBuf[j] = i.boundsBuf[j]
			}
		}
		// Reset the alloc to its zero state, keeping only the (truncated)
		// reusable byte slices, before returning it to the pool.
		*alloc = iterAlloc{
			keyBuf: alloc.keyBuf[:0],
			boundsBuf: alloc.boundsBuf,
			prefixOrFullSeekKey: alloc.prefixOrFullSeekKey[:0],
		}
		iterAllocPool.Put(alloc)
		i.alloc = nil
	}
	return nil
}
1007 :
// initializeBoundBufs copies the lower and upper iteration bounds into one of
// the iterator's two reusable bounds buffers and points i.opts.LowerBound /
// i.opts.UpperBound at the copies. Two buffers are alternated (boundsBufIdx)
// so that new bounds can be staged while the previous bounds are still
// referenced by the iterator stack.
func (i *scanInternalIterator) initializeBoundBufs(lower, upper []byte) {
	buf := i.boundsBuf[i.boundsBufIdx][:0]
	if lower != nil {
		buf = append(buf, lower...)
		// NB: LowerBound captures the slice header here (len == len(lower)).
		// Even if the append of upper below grows and relocates buf, the old
		// backing array still holds lower's bytes, so this view stays valid.
		i.opts.LowerBound = buf
	} else {
		i.opts.LowerBound = nil
	}
	if upper != nil {
		buf = append(buf, upper...)
		// UpperBound is the tail of buf: the bytes appended for upper.
		i.opts.UpperBound = buf[len(buf)-len(upper):]
	} else {
		i.opts.UpperBound = nil
	}
	i.boundsBuf[i.boundsBufIdx] = buf
	// Flip to the other buffer for the next call.
	i.boundsBufIdx = 1 - i.boundsBufIdx
}
|