Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/invariants"
15 : "github.com/cockroachdb/pebble/internal/keyspan"
16 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
17 : "github.com/cockroachdb/pebble/internal/manifest"
18 : "github.com/cockroachdb/pebble/objstorage"
19 : "github.com/cockroachdb/pebble/objstorage/remote"
20 : "github.com/cockroachdb/pebble/sstable"
21 : )
22 :
23 : const (
24 : // In skip-shared iteration mode, keys in levels greater than
25 : // sharedLevelsStart (i.e. lower in the LSM) are skipped. Keys
26 : // in sharedLevelsStart are returned iff they are not in a
27 : // shared file.
28 : sharedLevelsStart = remote.SharedLevelsStart
29 :
30 : // In skip-external iteration mode, keys in levels greater
31 : // than externalSkipStart are skipped. Keys in
32 : // externalSkipStart are returned iff they are not in an
33 : // external file.
34 : externalSkipStart = 6
35 : )
36 :
37 : // ErrInvalidSkipSharedIteration is returned by ScanInternal if it was called
38 : // with a shared file visitor function, and a file in a shareable level (i.e.
39 : // level >= sharedLevelsStart) was found to not be in shared storage according
40 : // to objstorage.Provider, or not shareable for another reason such as for
41 : // containing keys newer than the snapshot sequence number.
42 : var ErrInvalidSkipSharedIteration = errors.New("pebble: cannot use skip-shared iteration due to non-shareable files in lower levels")
43 :
44 : // SharedSSTMeta represents an sstable on shared storage that can be ingested
45 : // by another pebble instance. This struct must contain all fields that are
46 : // required for a Pebble instance to ingest a foreign sstable on shared storage,
47 : // including constructing any relevant objstorage.Provider / remoteobjcat.Catalog
48 : // data structures, as well as creating virtual FileMetadatas.
49 : //
50 : // Note that the Pebble instance creating and returning a SharedSSTMeta might
51 : // not be the one that created the underlying sstable on shared storage to begin
52 : // with; it's possible for a Pebble instance to reshare an sstable that was
53 : // shared to it.
54 : type SharedSSTMeta struct {
55 : // Backing is the shared object underlying this SST. Can be attached to an
56 : // objstorage.Provider.
57 : Backing objstorage.RemoteObjectBackingHandle
58 :
59 : // Smallest and Largest internal keys for the overall bounds. The kind and
60 : // SeqNum of these will reflect what is physically present on the source Pebble
61 : // instance's view of the sstable; it's up to the ingesting instance to set the
62 : // sequence number in the trailer to match the read-time sequence numbers
63 : // reserved for the level this SST is being ingested into. The Kind is expected
64 : // to remain unchanged by the ingesting instance.
65 : //
66 : // Note that these bounds could be narrower than the bounds of the underlying
67 : // sstable; ScanInternal is expected to truncate sstable bounds to the user key
68 : // bounds passed into that method.
69 : Smallest, Largest InternalKey
70 :
71 : // SmallestRangeKey and LargestRangeKey are internal keys that denote the
72 : // range key bounds of this sstable. Must lie within [Smallest, Largest].
73 : SmallestRangeKey, LargestRangeKey InternalKey
74 :
75 : // SmallestPointKey and LargestPointKey are internal keys that denote the
76 : // point key bounds of this sstable. Must lie within [Smallest, Largest].
77 : SmallestPointKey, LargestPointKey InternalKey
78 :
79 : // Level denotes the level at which this file was present at read time.
80 : // For files visited by ScanInternal, this value will only be 5 or 6.
81 : Level uint8
82 :
83 : // Size contains an estimate of the size of this sstable.
84 : Size uint64
85 :
86 : // fileNum at time of creation in the creator instance. Only used for
87 : // debugging/tests.
88 : fileNum base.FileNum
89 : }
90 :
91 2 : func (s *SharedSSTMeta) cloneFromFileMeta(f *fileMetadata) {
92 2 : *s = SharedSSTMeta{
93 2 : Smallest: f.Smallest.Clone(),
94 2 : Largest: f.Largest.Clone(),
95 2 : SmallestRangeKey: f.SmallestRangeKey.Clone(),
96 2 : LargestRangeKey: f.LargestRangeKey.Clone(),
97 2 : SmallestPointKey: f.SmallestPointKey.Clone(),
98 2 : LargestPointKey: f.LargestPointKey.Clone(),
99 2 : Size: f.Size,
100 2 : fileNum: f.FileNum,
101 2 : }
102 2 : }
103 :
// sharedByLevel orders SharedSSTMeta values by ascending LSM level. Its
// Len/Swap/Less methods satisfy sort.Interface.
type sharedByLevel []SharedSSTMeta

func (s sharedByLevel) Len() int           { return len(s) }
func (s sharedByLevel) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s sharedByLevel) Less(i, j int) bool { return s[i].Level < s[j].Level }

// pcIterPos tracks where the underlying iterator sits relative to the key
// most recently returned by pointCollapsingIterator (see the field comments
// on pointCollapsingIterator for the exact invariants).
type pcIterPos int

const (
	// pcIterPosCur: the underlying iterator is at the current key.
	pcIterPosCur pcIterPos = iota
	// pcIterPosNext: the underlying iterator has already advanced past the
	// current key.
	pcIterPosNext
)
116 :
// pointCollapsingIterator is an internalIterator that collapses point keys and
// returns at most one point internal key for each user key. Merges and
// SingleDels are not supported and result in a panic if encountered. Point keys
// deleted by rangedels are considered shadowed and not exposed.
//
// Only used in ScanInternal to return at most one internal key per user key.
type pointCollapsingIterator struct {
	iter keyspan.InterleavingIter
	pos  pcIterPos
	// comparer supplies user-key equality/comparison for detecting user-key
	// changes during collapsing.
	comparer *base.Comparer
	// NOTE(review): merge appears unreferenced by the methods in this file —
	// confirm whether it can be removed.
	merge base.Merge
	err   error
	// seqNum is the read snapshot; rangedels only shadow points that they
	// cover at this sequence number (see the CoversAt call in findNextEntry).
	seqNum uint64
	// The current position of `iter`. Always owned by the underlying iter.
	iterKey *InternalKey
	// The last saved key. findNextEntry and similar methods are expected to save
	// the current value of iterKey to savedKey if they're iterating away from the
	// current key but still need to retain it. See comments in findNextEntry on
	// how this field is used.
	//
	// At the end of a positioning call:
	// - if pos == pcIterPosNext, iterKey is pointing to the next user key owned
	//   by `iter` while savedKey is holding a copy to our current key.
	// - If pos == pcIterPosCur, iterKey is pointing to an `iter`-owned current
	//   key, and savedKey is either undefined or pointing to a version of the
	//   current key owned by this iterator (i.e. backed by savedKeyBuf).
	savedKey InternalKey
	// savedKeyBuf is the reusable backing storage for savedKey.UserKey.
	savedKeyBuf []byte
	// Value at the current iterator position, at iterKey.
	iterValue base.LazyValue
	// If fixedSeqNum is non-zero, all emitted points are verified to have this
	// fixed sequence number.
	fixedSeqNum uint64
}
151 :
// Span returns the span overlapping the current position, as surfaced by the
// underlying interleaving iterator (nil when no span covers the position).
func (p *pointCollapsingIterator) Span() *keyspan.Span {
	return p.iter.Span()
}
155 :
156 : // SeekPrefixGE implements the InternalIterator interface.
157 : func (p *pointCollapsingIterator) SeekPrefixGE(
158 : prefix, key []byte, flags base.SeekGEFlags,
159 0 : ) (*base.InternalKey, base.LazyValue) {
160 0 : p.resetKey()
161 0 : p.iterKey, p.iterValue = p.iter.SeekPrefixGE(prefix, key, flags)
162 0 : p.pos = pcIterPosCur
163 0 : if p.iterKey == nil {
164 0 : return nil, base.LazyValue{}
165 0 : }
166 0 : return p.findNextEntry()
167 : }
168 :
169 : // SeekGE implements the InternalIterator interface.
170 : func (p *pointCollapsingIterator) SeekGE(
171 : key []byte, flags base.SeekGEFlags,
172 2 : ) (*base.InternalKey, base.LazyValue) {
173 2 : p.resetKey()
174 2 : p.iterKey, p.iterValue = p.iter.SeekGE(key, flags)
175 2 : p.pos = pcIterPosCur
176 2 : if p.iterKey == nil {
177 2 : return nil, base.LazyValue{}
178 2 : }
179 2 : return p.findNextEntry()
180 : }
181 :
// SeekLT implements the InternalIterator interface. Reverse positioning is
// intentionally unsupported by pointCollapsingIterator.
func (p *pointCollapsingIterator) SeekLT(
	key []byte, flags base.SeekLTFlags,
) (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}
188 :
189 2 : func (p *pointCollapsingIterator) resetKey() {
190 2 : p.savedKey.UserKey = p.savedKeyBuf[:0]
191 2 : p.savedKey.Trailer = 0
192 2 : p.iterKey = nil
193 2 : p.pos = pcIterPosCur
194 2 : }
195 :
196 2 : func (p *pointCollapsingIterator) verifySeqNum(key *base.InternalKey) *base.InternalKey {
197 2 : if !invariants.Enabled {
198 0 : return key
199 0 : }
200 2 : if p.fixedSeqNum == 0 || key == nil || key.Kind() == InternalKeyKindRangeDelete {
201 2 : return key
202 2 : }
203 0 : if key.SeqNum() != p.fixedSeqNum {
204 0 : panic(fmt.Sprintf("expected foreign point key to have seqnum %d, got %d", p.fixedSeqNum, key.SeqNum()))
205 : }
206 0 : return key
207 : }
208 :
// findNextEntry is called to return the next key. p.iter must be positioned at the
// start of the first user key we are interested in.
//
// Returns (nil, LazyValue{}) when the underlying iterator is exhausted. Panics
// if a SINGLEDEL or MERGE is encountered, as those are unsupported here.
func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyValue) {
	p.saveKey()
	// Saves a comparison in the fast path
	firstIteration := true
	for p.iterKey != nil {
		// NB: p.savedKey is either the current key (iff p.iterKey == firstKey),
		// or the previous key.
		if !firstIteration && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) {
			// The underlying iterator moved to a new user key (e.g. after the
			// rangedel fast-forward below); re-anchor savedKey to it before
			// classifying its kind.
			p.saveKey()
			continue
		}
		firstIteration = false
		if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKey.SeqNum()) {
			// A rangedel visible at p.seqNum shadows this point.
			// All future keys for this user key must be deleted.
			if p.savedKey.Kind() == InternalKeyKindSingleDelete {
				panic("cannot process singledel key in point collapsing iterator")
			}
			// Fast forward to the next user key. The seqnum comparison is a
			// cheap pre-filter for "still on the same user key" (within one
			// user key, seqnums do not increase), avoiding some Equal calls.
			p.saveKey()
			p.iterKey, p.iterValue = p.iter.Next()
			for p.iterKey != nil && p.savedKey.SeqNum() >= p.iterKey.SeqNum() && p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) {
				p.iterKey, p.iterValue = p.iter.Next()
			}
			continue
		}
		switch p.savedKey.Kind() {
		case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
			// Note that we return SETs directly, even if they would otherwise get
			// compacted into a Del to turn into a SetWithDelete. This is a fast
			// path optimization that can break SINGLEDEL determinism. To lead to
			// consistent SINGLEDEL behaviour, this iterator should *not* be used for
			// a keyspace where SINGLEDELs could be in use. If this iterator observes
			// a SINGLEDEL as the first internal key for a user key, it will panic.
			//
			// As p.value is a lazy value owned by the child iterator, we can thread
			// it through without loading it into p.valueBuf.
			//
			// TODO(bilal): We can even avoid saving the key in this fast path if
			// we are in a block where setHasSamePrefix = false in a v3 sstable,
			// guaranteeing that there's only one internal key for each user key.
			// Thread this logic through the sstable iterators and/or consider
			// collapsing (ha) this logic into the sstable iterators that are aware
			// of blocks and can determine user key changes without doing key saves
			// or comparisons.
			p.pos = pcIterPosCur
			return p.verifySeqNum(p.iterKey), p.iterValue
		case InternalKeyKindSingleDelete:
			// Panic, as this iterator is not expected to observe single deletes.
			panic("cannot process singledel key in point collapsing iterator")
		case InternalKeyKindMerge:
			// Panic, as this iterator is not expected to observe merges.
			panic("cannot process merge key in point collapsing iterator")
		case InternalKeyKindRangeDelete:
			// These are interleaved by the interleaving iterator ahead of all points.
			// We should pass them as-is, but also account for any points ahead of
			// them.
			p.pos = pcIterPosCur
			return p.verifySeqNum(p.iterKey), p.iterValue
		default:
			panic(fmt.Sprintf("unexpected kind: %d", p.iterKey.Kind()))
		}
	}
	p.resetKey()
	return nil, base.LazyValue{}
}
276 :
277 : // First implements the InternalIterator interface.
278 1 : func (p *pointCollapsingIterator) First() (*base.InternalKey, base.LazyValue) {
279 1 : p.resetKey()
280 1 : p.iterKey, p.iterValue = p.iter.First()
281 1 : p.pos = pcIterPosCur
282 1 : if p.iterKey == nil {
283 0 : return nil, base.LazyValue{}
284 0 : }
285 1 : return p.findNextEntry()
286 : }
287 :
// Last implements the InternalIterator interface. Reverse positioning is
// intentionally unsupported by pointCollapsingIterator.
func (p *pointCollapsingIterator) Last() (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}
292 :
293 2 : func (p *pointCollapsingIterator) saveKey() {
294 2 : if p.iterKey == nil {
295 1 : p.savedKey = InternalKey{UserKey: p.savedKeyBuf[:0]}
296 1 : return
297 1 : }
298 2 : p.savedKeyBuf = append(p.savedKeyBuf[:0], p.iterKey.UserKey...)
299 2 : p.savedKey = InternalKey{UserKey: p.savedKeyBuf, Trailer: p.iterKey.Trailer}
300 : }
301 :
// Next implements the InternalIterator interface, advancing to the next
// collapsed entry (at most one point per user key, with interleaved rangedels
// surfaced as-is).
func (p *pointCollapsingIterator) Next() (*base.InternalKey, base.LazyValue) {
	switch p.pos {
	case pcIterPosCur:
		p.saveKey()
		if p.iterKey != nil && p.iterKey.Kind() == InternalKeyKindRangeDelete {
			// Step over the interleaved range delete and process the very next
			// internal key, even if it's at the same user key. This is because a
			// point for that user key has not been returned yet.
			p.iterKey, p.iterValue = p.iter.Next()
			break
		}
		// Fast forward to the next user key.
		key, val := p.iter.Next()
		// p.savedKey.SeqNum() >= key.SeqNum() is an optimization that allows us
		// to use p.savedKey.SeqNum() < key.SeqNum() as a sign that the user key
		// has changed, without needing to do the full key comparison.
		for key != nil && p.savedKey.SeqNum() >= key.SeqNum() &&
			p.comparer.Equal(p.savedKey.UserKey, key.UserKey) {
			key, val = p.iter.Next()
		}
		if key == nil {
			// There are no keys to return.
			p.resetKey()
			return nil, base.LazyValue{}
		}
		p.iterKey, p.iterValue = key, val
	case pcIterPosNext:
		// The underlying iterator is already on the next entry; just flip the
		// position state back to "current".
		p.pos = pcIterPosCur
	}
	if p.iterKey == nil {
		p.resetKey()
		return nil, base.LazyValue{}
	}
	return p.findNextEntry()
}
338 :
// NextPrefix implements the InternalIterator interface. Not supported by
// pointCollapsingIterator.
func (p *pointCollapsingIterator) NextPrefix(succKey []byte) (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}

// Prev implements the InternalIterator interface. Reverse iteration is
// intentionally unsupported by pointCollapsingIterator.
func (p *pointCollapsingIterator) Prev() (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}
348 :
349 : // Error implements the InternalIterator interface.
350 2 : func (p *pointCollapsingIterator) Error() error {
351 2 : if p.err != nil {
352 0 : return p.err
353 0 : }
354 2 : return p.iter.Error()
355 : }
356 :
// Close implements the InternalIterator interface, closing the child iterator.
func (p *pointCollapsingIterator) Close() error {
	return p.iter.Close()
}

// SetBounds implements the InternalIterator interface.
func (p *pointCollapsingIterator) SetBounds(lower, upper []byte) {
	// Invalidate any saved position before changing bounds on the child iter.
	p.resetKey()
	p.iter.SetBounds(lower, upper)
}

// SetContext propagates a new context to the child iterator.
func (p *pointCollapsingIterator) SetContext(ctx context.Context) {
	p.iter.SetContext(ctx)
}

// String implements the InternalIterator interface, delegating to the child
// iterator.
func (p *pointCollapsingIterator) String() string {
	return p.iter.String()
}
377 : var _ internalIterator = &pointCollapsingIterator{}
378 :
// IteratorLevelKind is used to denote whether the current ScanInternal iterator
// is unknown, belongs to a flushable, or belongs to an LSM level type.
type IteratorLevelKind int8

const (
	// IteratorLevelUnknown indicates an unknown LSM level.
	IteratorLevelUnknown IteratorLevelKind = iota
	// IteratorLevelLSM indicates an LSM level.
	IteratorLevelLSM
	// IteratorLevelFlushable indicates a flushable (i.e. memtable).
	IteratorLevelFlushable
)

// IteratorLevel is used with scanInternalIterator to surface additional iterator-specific info where possible.
// Note: this struct is only provided for point keys.
type IteratorLevel struct {
	Kind IteratorLevelKind
	// FlushableIndex indicates the position within the flushable queue of this level.
	// Only valid if kind == IteratorLevelFlushable.
	FlushableIndex int
	// The level within the LSM. Only valid if Kind == IteratorLevelLSM.
	Level int
	// Sublevel is only valid if Kind == IteratorLevelLSM and Level == 0.
	Sublevel int
}
404 :
// scanInternalIterator is an iterator that returns all internal keys, including
// tombstones. For instance, an InternalKeyKindDelete would be returned as an
// InternalKeyKindDelete instead of the iterator skipping over to the next key.
// Internal keys within a user key are collapsed, eg. if there are two SETs, the
// one with the higher sequence is returned. Useful if an external user of Pebble
// needs to observe and rebuild Pebble's history of internal keys, such as in
// node-to-node replication. For use with {db,snapshot}.ScanInternal().
//
// scanInternalIterator is expected to ignore point keys deleted by range
// deletions, and range keys shadowed by a range key unset or delete, however it
// *must* return the range delete as well as the range key unset/delete that did
// the shadowing.
type scanInternalIterator struct {
	ctx      context.Context
	db       *DB
	opts     scanInternalOptions
	comparer *base.Comparer
	merge    Merge
	iter     internalIterator
	// readState and version are alternative sources of the LSM view to
	// iterate over; scanInternalImpl falls back to readState.current when
	// version is nil.
	readState       *readState
	version         *version
	rangeKey        *iteratorRangeKeyState
	pointKeyIter    internalIterator
	iterKey         *InternalKey
	iterValue       LazyValue
	alloc           *iterAlloc
	newIters        tableNewIters
	newIterRangeKey keyspanimpl.TableNewSpanIter
	// seqNum is the snapshot sequence number this iteration reads at (used
	// for visibility checks against file LargestSeqNums).
	seqNum      uint64
	iterLevels  []IteratorLevel
	mergingIter *mergingIter

	// boundsBuf holds two buffers used to store the lower and upper bounds.
	// Whenever the InternalIterator's bounds change, the new bounds are copied
	// into boundsBuf[boundsBufIdx]. The two bounds share a slice to reduce
	// allocations. opts.LowerBound and opts.UpperBound point into this slice.
	boundsBuf    [2][]byte
	boundsBufIdx int
}
444 :
// truncateExternalFile truncates an External file's [SmallestUserKey,
// LargestUserKey] fields to [lower, upper). An ExternalFile is
// produced that is suitable for external consumption by other Pebble
// instances.
//
// truncateSharedFile reads the file to try to create the smallest
// possible bounds. Here, we blindly truncate them. This may mean we
// include this SST in iterations it isn't really needed in. Since we
// don't expect External files to be long-lived in the pebble
// instance, we think this is OK.
//
// TODO(ssd) 2024-01-26: Potentially de-duplicate with
// truncateSharedFile.
func (d *DB) truncateExternalFile(
	ctx context.Context,
	lower, upper []byte,
	level int,
	file *fileMetadata,
	objMeta objstorage.ObjectMetadata,
) (*ExternalFile, error) {
	cmp := d.cmp
	sst := &ExternalFile{
		Level:           uint8(level),
		ObjName:         objMeta.Remote.CustomObjectName,
		Locator:         objMeta.Remote.Locator,
		Bounds:          KeyRange{},
		HasPointKey:     file.HasPointKeys,
		HasRangeKey:     file.HasRangeKeys,
		Size:            file.Size,
		SyntheticSuffix: slices.Clone(file.SyntheticSuffix),
	}
	if pr := file.PrefixReplacement; pr != nil {
		sst.ContentPrefix = slices.Clone(pr.ContentPrefix)
		sst.SyntheticPrefix = slices.Clone(pr.SyntheticPrefix)
	}

	// Clamp the start bound up to lower when the file starts before it.
	needsLowerTruncate := cmp(lower, file.Smallest.UserKey) > 0
	if needsLowerTruncate {
		sst.Bounds.Start = slices.Clone(lower)
	} else {
		sst.Bounds.Start = slices.Clone(file.Smallest.UserKey)
	}

	// Clamp the end bound down to upper when the file reaches it; a file whose
	// largest key equals upper inclusively must also be clamped, since the
	// produced bounds are end-exclusive.
	cmpUpper := cmp(upper, file.Largest.UserKey)
	needsUpperTruncate := cmpUpper < 0 || (cmpUpper == 0 && !file.Largest.IsExclusiveSentinel())
	if needsUpperTruncate {
		sst.Bounds.End = slices.Clone(upper)
	} else {
		sst.Bounds.End = slices.Clone(file.Largest.UserKey)
	}

	if cmp(sst.Bounds.Start, sst.Bounds.End) >= 0 {
		return nil, errors.AssertionFailedf("pebble: invalid external file bounds after truncation [%q, %q)", sst.Bounds.Start, sst.Bounds.End)
	}

	return sst, nil
}
502 :
// truncateSharedFile truncates a shared file's [Smallest, Largest] fields to
// [lower, upper), potentially opening iterators on the file to find keys within
// the requested bounds. A SharedSSTMeta is produced that is suitable for
// external consumption by other Pebble instances. If shouldSkip is true, this
// file does not contain any keys in [lower, upper) and can be skipped.
//
// TODO(bilal): If opening iterators and doing reads in this method is too
// inefficient, consider producing non-tight file bounds instead.
func (d *DB) truncateSharedFile(
	ctx context.Context,
	lower, upper []byte,
	level int,
	file *fileMetadata,
	objMeta objstorage.ObjectMetadata,
) (sst *SharedSSTMeta, shouldSkip bool, err error) {
	cmp := d.cmp
	sst = &SharedSSTMeta{}
	sst.cloneFromFileMeta(file)
	sst.Level = uint8(level)
	sst.Backing, err = d.objProvider.RemoteObjectBacking(&objMeta)
	if err != nil {
		return nil, false, err
	}
	needsLowerTruncate := cmp(lower, file.Smallest.UserKey) > 0
	needsUpperTruncate := cmp(upper, file.Largest.UserKey) < 0 || (cmp(upper, file.Largest.UserKey) == 0 && !file.Largest.IsExclusiveSentinel())
	// Fast path: file is entirely within [lower, upper).
	if !needsLowerTruncate && !needsUpperTruncate {
		return sst, false, nil
	}

	// We will need to truncate file bounds in at least one direction. Open all
	// relevant iterators.
	iter, rangeDelIter, err := d.newIters.TODO(ctx, file, &IterOptions{
		LowerBound: lower,
		UpperBound: upper,
		level:      manifest.Level(level),
	}, internalIterOpts{})
	if err != nil {
		return nil, false, err
	}
	defer iter.Close()
	if rangeDelIter != nil {
		rangeDelIter = keyspan.Truncate(
			cmp, rangeDelIter, lower, upper, nil, nil,
			false, /* panicOnUpperTruncate */
		)
		defer rangeDelIter.Close()
	}
	rangeKeyIter, err := d.tableNewRangeKeyIter(file, keyspan.SpanIterOptions{})
	if err != nil {
		return nil, false, err
	}
	if rangeKeyIter != nil {
		rangeKeyIter = keyspan.Truncate(
			cmp, rangeKeyIter, lower, upper, nil, nil,
			false, /* panicOnUpperTruncate */
		)
		defer rangeKeyIter.Close()
	}
	// Check if we need to truncate on the left side. This means finding a new
	// SmallestPointKey and SmallestRangeKey that are >= lower.
	if needsLowerTruncate {
		sst.SmallestPointKey.UserKey = sst.SmallestPointKey.UserKey[:0]
		sst.SmallestPointKey.Trailer = 0
		key, _ := iter.SeekGE(lower, base.SeekGEFlagsNone)
		foundPointKey := key != nil
		if key != nil {
			sst.SmallestPointKey.CopyFrom(*key)
		}
		if rangeDelIter != nil {
			// A rangedel starting before the first point key also lowers the
			// smallest point-key bound.
			if span, err := rangeDelIter.SeekGE(lower); err != nil {
				return nil, false, err
			} else if span != nil && (len(sst.SmallestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.SmallestKey(), sst.SmallestPointKey) < 0) {
				sst.SmallestPointKey.CopyFrom(span.SmallestKey())
				foundPointKey = true
			}
		}
		if !foundPointKey {
			// There are no point keys in the span we're interested in.
			sst.SmallestPointKey = InternalKey{}
			sst.LargestPointKey = InternalKey{}
		}
		sst.SmallestRangeKey.UserKey = sst.SmallestRangeKey.UserKey[:0]
		sst.SmallestRangeKey.Trailer = 0
		if rangeKeyIter != nil {
			span, err := rangeKeyIter.SeekGE(lower)
			switch {
			case err != nil:
				return nil, false, err
			case span != nil:
				sst.SmallestRangeKey.CopyFrom(span.SmallestKey())
			default:
				// There are no range keys in the span we're interested in.
				sst.SmallestRangeKey = InternalKey{}
				sst.LargestRangeKey = InternalKey{}
			}
		}
	}
	// Check if we need to truncate on the right side. This means finding a new
	// LargestPointKey and LargestRangeKey that is < upper.
	if needsUpperTruncate {
		sst.LargestPointKey.UserKey = sst.LargestPointKey.UserKey[:0]
		sst.LargestPointKey.Trailer = 0
		key, _ := iter.SeekLT(upper, base.SeekLTFlagsNone)
		foundPointKey := key != nil
		if key != nil {
			sst.LargestPointKey.CopyFrom(*key)
		}
		if rangeDelIter != nil {
			// A rangedel ending after the last point key also raises the
			// largest point-key bound.
			if span, err := rangeDelIter.SeekLT(upper); err != nil {
				return nil, false, err
			} else if span != nil && (len(sst.LargestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.LargestKey(), sst.LargestPointKey) > 0) {
				sst.LargestPointKey.CopyFrom(span.LargestKey())
				foundPointKey = true
			}
		}
		if !foundPointKey {
			// There are no point keys in the span we're interested in.
			sst.SmallestPointKey = InternalKey{}
			sst.LargestPointKey = InternalKey{}
		}
		sst.LargestRangeKey.UserKey = sst.LargestRangeKey.UserKey[:0]
		sst.LargestRangeKey.Trailer = 0
		if rangeKeyIter != nil {
			span, err := rangeKeyIter.SeekLT(upper)
			switch {
			case err != nil:
				return nil, false, err
			case span != nil:
				sst.LargestRangeKey.CopyFrom(span.LargestKey())
			default:
				// There are no range keys in the span we're interested in.
				sst.SmallestRangeKey = InternalKey{}
				sst.LargestRangeKey = InternalKey{}
			}
		}
	}
	// Set overall bounds based on {Smallest,Largest}{Point,Range}Key.
	switch {
	case len(sst.SmallestRangeKey.UserKey) == 0:
		sst.Smallest = sst.SmallestPointKey
	case len(sst.SmallestPointKey.UserKey) == 0:
		sst.Smallest = sst.SmallestRangeKey
	default:
		sst.Smallest = sst.SmallestPointKey
		if base.InternalCompare(cmp, sst.SmallestRangeKey, sst.SmallestPointKey) < 0 {
			sst.Smallest = sst.SmallestRangeKey
		}
	}
	switch {
	case len(sst.LargestRangeKey.UserKey) == 0:
		sst.Largest = sst.LargestPointKey
	case len(sst.LargestPointKey.UserKey) == 0:
		sst.Largest = sst.LargestRangeKey
	default:
		sst.Largest = sst.LargestPointKey
		if base.InternalCompare(cmp, sst.LargestRangeKey, sst.LargestPointKey) > 0 {
			sst.Largest = sst.LargestRangeKey
		}
	}
	// On rare occasion, a file might overlap with [lower, upper) but not actually
	// have any keys within those bounds. Skip such files.
	if len(sst.Smallest.UserKey) == 0 {
		return nil, true, nil
	}
	sst.Size, err = d.tableCache.estimateSize(file, sst.Smallest.UserKey, sst.Largest.UserKey)
	if err != nil {
		return nil, false, err
	}
	// On occasion, estimateSize gives us a low estimate, i.e. a 0 file size. This
	// can cause panics in places where we divide by file sizes. Correct for it
	// here.
	if sst.Size == 0 {
		sst.Size = 1
	}
	return sst, false, nil
}
680 :
681 : func scanInternalImpl(
682 : ctx context.Context, lower, upper []byte, iter *scanInternalIterator, opts *scanInternalOptions,
683 2 : ) error {
684 2 : if opts.visitSharedFile != nil && (lower == nil || upper == nil) {
685 0 : panic("lower and upper bounds must be specified in skip-shared iteration mode")
686 : }
687 2 : if opts.visitSharedFile != nil && opts.visitExternalFile != nil {
688 0 : return errors.AssertionFailedf("cannot provide both a shared-file and external-file visitor")
689 0 : }
690 :
691 : // Before starting iteration, check if any files in levels sharedLevelsStart
692 : // and below are *not* shared. Error out if that is the case, as skip-shared
693 : // iteration will not produce a consistent point-in-time view of this range
694 : // of keys. For files that are shared, call visitSharedFile with a truncated
695 : // version of that file.
696 2 : cmp := iter.comparer.Compare
697 2 : provider := iter.db.ObjProvider()
698 2 : seqNum := iter.seqNum
699 2 : current := iter.version
700 2 : if current == nil {
701 2 : current = iter.readState.current
702 2 : }
703 :
704 2 : if opts.visitSharedFile != nil || opts.visitExternalFile != nil {
705 2 : if provider == nil {
706 0 : panic("expected non-nil Provider in skip-shared iteration mode")
707 : }
708 :
709 2 : firstLevelWithRemote := opts.skipLevelForOpts()
710 2 : for level := firstLevelWithRemote; level < numLevels; level++ {
711 2 : files := current.Levels[level].Iter()
712 2 : for f := files.SeekGE(cmp, lower); f != nil && cmp(f.Smallest.UserKey, upper) < 0; f = files.Next() {
713 2 : if cmp(lower, f.Largest.UserKey) == 0 && f.Largest.IsExclusiveSentinel() {
714 2 : continue
715 : }
716 :
717 2 : var objMeta objstorage.ObjectMetadata
718 2 : var err error
719 2 : objMeta, err = provider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
720 2 : if err != nil {
721 0 : return err
722 0 : }
723 :
724 : // We allow a mix of files at the first level.
725 2 : if level != firstLevelWithRemote {
726 2 : if !objMeta.IsShared() && !objMeta.IsExternal() {
727 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared or external", objMeta.DiskFileNum)
728 0 : }
729 : }
730 :
731 2 : if objMeta.IsShared() && opts.visitSharedFile == nil {
732 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "shared file is present but no shared file visitor is defined")
733 0 : }
734 :
735 2 : if objMeta.IsExternal() && opts.visitExternalFile == nil {
736 1 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "external file is present but no external file visitor is defined")
737 1 : }
738 :
739 2 : if !base.Visible(f.LargestSeqNum, seqNum, base.InternalKeySeqNumMax) {
740 1 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s contains keys newer than snapshot", objMeta.DiskFileNum)
741 1 : }
742 :
743 2 : if level != firstLevelWithRemote && (!objMeta.IsShared() && !objMeta.IsExternal()) {
744 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared or external", objMeta.DiskFileNum)
745 0 : }
746 :
747 2 : if objMeta.IsShared() {
748 2 : var sst *SharedSSTMeta
749 2 : var skip bool
750 2 : sst, skip, err = iter.db.truncateSharedFile(ctx, lower, upper, level, f, objMeta)
751 2 : if err != nil {
752 0 : return err
753 0 : }
754 2 : if skip {
755 2 : continue
756 : }
757 2 : if err = opts.visitSharedFile(sst); err != nil {
758 0 : return err
759 0 : }
760 1 : } else if objMeta.IsExternal() {
761 1 : sst, err := iter.db.truncateExternalFile(ctx, lower, upper, level, f, objMeta)
762 1 : if err != nil {
763 0 : return err
764 0 : }
765 1 : if err := opts.visitExternalFile(sst); err != nil {
766 0 : return err
767 0 : }
768 : }
769 :
770 : }
771 : }
772 : }
773 :
774 2 : for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() {
775 2 : key := iter.unsafeKey()
776 2 :
777 2 : if opts.rateLimitFunc != nil {
778 0 : if err := opts.rateLimitFunc(key, iter.lazyValue()); err != nil {
779 0 : return err
780 0 : }
781 : }
782 :
783 2 : switch key.Kind() {
784 2 : case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet:
785 2 : if opts.visitRangeKey != nil {
786 2 : span := iter.unsafeSpan()
787 2 : // NB: The caller isn't interested in the sequence numbers of these
788 2 : // range keys. Rather, the caller wants them to be in trailer order
789 2 : // _after_ zeroing of sequence numbers. Copy span.Keys, sort it, and then
790 2 : // call visitRangeKey.
791 2 : keysCopy := make([]keyspan.Key, len(span.Keys))
792 2 : for i := range span.Keys {
793 2 : keysCopy[i] = span.Keys[i]
794 2 : keysCopy[i].Trailer = base.MakeTrailer(0, span.Keys[i].Kind())
795 2 : }
796 2 : keyspan.SortKeysByTrailer(&keysCopy)
797 2 : if err := opts.visitRangeKey(span.Start, span.End, keysCopy); err != nil {
798 0 : return err
799 0 : }
800 : }
801 2 : case InternalKeyKindRangeDelete:
802 2 : if opts.visitRangeDel != nil {
803 2 : rangeDel := iter.unsafeRangeDel()
804 2 : if err := opts.visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil {
805 0 : return err
806 0 : }
807 : }
808 2 : default:
809 2 : if opts.visitPointKey != nil {
810 2 : var info IteratorLevel
811 2 : if len(iter.mergingIter.heap.items) > 0 {
812 2 : mergingIterIdx := iter.mergingIter.heap.items[0].index
813 2 : info = iter.iterLevels[mergingIterIdx]
814 2 : } else {
815 0 : info = IteratorLevel{Kind: IteratorLevelUnknown}
816 0 : }
817 2 : val := iter.lazyValue()
818 2 : if err := opts.visitPointKey(key, val, info); err != nil {
819 0 : return err
820 0 : }
821 : }
822 : }
823 : }
824 :
825 2 : return nil
826 : }
827 :
828 2 : func (opts *scanInternalOptions) skipLevelForOpts() int {
829 2 : if opts.visitSharedFile != nil {
830 2 : return sharedLevelsStart
831 2 : }
832 1 : if opts.visitExternalFile != nil {
833 1 : return externalSkipStart
834 1 : }
835 1 : return numLevels
836 : }
837 :
// constructPointIter constructs a merging iterator and sets i.iter to it.
//
// The stack is assembled in merging-iterator order: memtables (newest
// first), then L0 sub-levels (highest sub-level first), then the non-empty
// non-L0 levels up to (and including) skipStart. Levels beyond skipStart are
// omitted entirely; their contents are expected to be reported via the
// shared/external file visitors rather than iterated here.
func (i *scanInternalIterator) constructPointIter(
	categoryAndQoS sstable.CategoryAndQoS, memtables flushableList, buf *iterAlloc,
) error {
	// Merging levels and levels from iterAlloc.
	mlevels := buf.mlevels[:0]
	levels := buf.levels[:0]

	// We compute the number of levels needed ahead of time and reallocate a slice if
	// the array from the iterAlloc isn't large enough. Doing this allocation once
	// should improve the performance.
	numMergingLevels := len(memtables)
	numLevelIters := 0

	current := i.version
	if current == nil {
		current = i.readState.current
	}
	// Each L0 sub-level contributes one merging level and one level iter.
	numMergingLevels += len(current.L0SublevelFiles)
	numLevelIters += len(current.L0SublevelFiles)

	// Count only the non-empty levels at or above skipStart; deeper levels
	// are skipped (see skipLevelForOpts). This loop must mirror the
	// construction loop below so the precomputed capacities are exact.
	skipStart := i.opts.skipLevelForOpts()
	for level := 1; level < len(current.Levels); level++ {
		if current.Levels[level].Empty() {
			continue
		}
		if level > skipStart {
			continue
		}
		numMergingLevels++
		numLevelIters++
	}

	if numMergingLevels > cap(mlevels) {
		mlevels = make([]mergingIterLevel, 0, numMergingLevels)
	}
	if numLevelIters > cap(levels) {
		levels = make([]levelIter, 0, numLevelIters)
	}
	// TODO(bilal): Push these into the iterAlloc buf.
	var rangeDelMiter keyspanimpl.MergingIter
	rangeDelIters := make([]keyspan.FragmentIterator, 0, numMergingLevels)
	rangeDelLevels := make([]keyspanimpl.LevelIter, 0, numLevelIters)

	i.iterLevels = make([]IteratorLevel, numMergingLevels)
	mlevelsIndex := 0

	// Next are the memtables. Iterate in reverse so the newest memtable is
	// added first (merging-iterator levels go from newest to oldest).
	for j := len(memtables) - 1; j >= 0; j-- {
		mem := memtables[j]
		mlevels = append(mlevels, mergingIterLevel{
			iter: mem.newIter(&i.opts.IterOptions),
		})
		i.iterLevels[mlevelsIndex] = IteratorLevel{
			Kind:           IteratorLevelFlushable,
			FlushableIndex: j,
		}
		mlevelsIndex++
		if rdi := mem.newRangeDelIter(&i.opts.IterOptions); rdi != nil {
			rangeDelIters = append(rangeDelIters, rdi)
		}
	}

	// Next are the file levels: L0 sub-levels followed by lower levels.
	//
	// Extend the slices to their precomputed lengths so addLevelIterForFiles
	// can initialize entries in place via levelsIndex/mlevelsIndex.
	levelsIndex := len(levels)
	mlevels = mlevels[:numMergingLevels]
	levels = levels[:numLevelIters]
	rangeDelLevels = rangeDelLevels[:numLevelIters]
	i.opts.IterOptions.snapshotForHideObsoletePoints = i.seqNum
	i.opts.IterOptions.CategoryAndQoS = categoryAndQoS
	// addLevelIterForFiles wires up both the point-key level iterator and the
	// corresponding range-del level iterator for one file level, advancing
	// the shared indices as a side effect.
	addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
		li := &levels[levelsIndex]
		rli := &rangeDelLevels[levelsIndex]

		li.init(
			i.ctx, i.opts.IterOptions, i.comparer, i.newIters, files, level,
			internalIterOpts{})
		li.initBoundaryContext(&mlevels[mlevelsIndex].levelIterBoundaryContext)
		mlevels[mlevelsIndex].iter = li
		rli.Init(keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters},
			i.comparer.Compare, tableNewRangeDelIter(i.ctx, i.newIters), files, level,
			manifest.KeyTypePoint)
		rangeDelIters = append(rangeDelIters, rli)

		levelsIndex++
		mlevelsIndex++
	}

	// L0 sub-levels, highest sub-level (newest data) first.
	for j := len(current.L0SublevelFiles) - 1; j >= 0; j-- {
		i.iterLevels[mlevelsIndex] = IteratorLevel{
			Kind:     IteratorLevelLSM,
			Level:    0,
			Sublevel: j,
		}
		addLevelIterForFiles(current.L0SublevelFiles[j].Iter(), manifest.L0Sublevel(j))
	}
	// Add level iterators for the non-empty non-L0 levels.
	for level := 1; level < numLevels; level++ {
		if current.Levels[level].Empty() {
			continue
		}

		if level > skipStart {
			continue
		}
		i.iterLevels[mlevelsIndex] = IteratorLevel{Kind: IteratorLevelLSM, Level: level}
		levIter := current.Levels[level].Iter()
		if level == skipStart {
			// The boundary level may hold a mix of remote and local files.
			// Files that will be reported through a configured visitor are
			// excluded from iteration; the remainder are re-packaged into a
			// fresh key-sorted slice for the level iterator.
			nonRemoteFiles := make([]*manifest.FileMetadata, 0)
			for f := levIter.First(); f != nil; f = levIter.Next() {
				meta, err := i.db.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
				if err != nil {
					return err
				}
				if (meta.IsShared() && i.opts.visitSharedFile != nil) ||
					(meta.IsExternal() && i.opts.visitExternalFile != nil) {
					// Skip this file.
					continue
				}
				nonRemoteFiles = append(nonRemoteFiles, f)
			}
			levSlice := manifest.NewLevelSliceKeySorted(i.db.cmp, nonRemoteFiles)
			levIter = levSlice.Iter()
		}

		addLevelIterForFiles(levIter, manifest.Level(level))
	}

	buf.merging.init(&i.opts.IterOptions, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...)
	buf.merging.snapshot = i.seqNum
	rangeDelMiter.Init(i.comparer.Compare, keyspan.VisibleTransform(i.seqNum), new(keyspanimpl.MergingBuffers), rangeDelIters...)

	// With includeObsoleteKeys, interleave range deletions with the point
	// keys but do not collapse shadowed versions; otherwise wrap the stack in
	// a pointCollapsingIterator that surfaces only the latest visible version
	// of each user key.
	if i.opts.includeObsoleteKeys {
		iiter := &keyspan.InterleavingIter{}
		iiter.Init(i.comparer, &buf.merging, &rangeDelMiter,
			keyspan.InterleavingIterOpts{
				LowerBound: i.opts.LowerBound,
				UpperBound: i.opts.UpperBound,
			})
		i.pointKeyIter = iiter
	} else {
		pcIter := &pointCollapsingIterator{
			comparer: i.comparer,
			merge:    i.merge,
			seqNum:   i.seqNum,
		}
		pcIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, keyspan.InterleavingIterOpts{
			LowerBound: i.opts.LowerBound,
			UpperBound: i.opts.UpperBound,
		})
		i.pointKeyIter = pcIter
	}
	i.iter = i.pointKeyIter
	return nil
}
993 :
// constructRangeKeyIter constructs the range-key iterator stack, populating
// i.rangeKey.rangeKeyIter with the resulting iterator. This is similar to
// Iterator.constructRangeKeyIter, except it doesn't handle batches and ensures
// iterConfig does *not* elide unsets/deletes.
//
// Returns an error if a per-file range-key iterator could not be opened or a
// remote-object lookup fails; on error, levels added so far remain registered
// with iterConfig (the caller is expected to tear down via close).
func (i *scanInternalIterator) constructRangeKeyIter() error {
	// We want the bounded iter from iterConfig, but not the collapsing of
	// RangeKeyUnsets and RangeKeyDels.
	i.rangeKey.rangeKeyIter = i.rangeKey.iterConfig.Init(
		i.comparer, i.seqNum, i.opts.LowerBound, i.opts.UpperBound,
		nil /* hasPrefix */, nil /* prefix */, true, /* internalKeys */
		&i.rangeKey.rangeKeyBuffers.internal)

	// Next are the flushables: memtables and large batches.
	if i.readState != nil {
		for j := len(i.readState.memtables) - 1; j >= 0; j-- {
			mem := i.readState.memtables[j]
			// We only need to read from memtables which contain sequence numbers older
			// than seqNum.
			if logSeqNum := mem.logSeqNum; logSeqNum >= i.seqNum {
				continue
			}
			if rki := mem.newRangeKeyIter(&i.opts.IterOptions); rki != nil {
				i.rangeKey.iterConfig.AddLevel(rki)
			}
		}
	}

	// Either a pinned version or the read state's current version supplies
	// the file levels; exactly one of the two is set.
	current := i.version
	if current == nil {
		current = i.readState.current
	}
	// Next are the file levels: L0 sub-levels followed by lower levels.
	//
	// Add file-specific iterators for L0 files containing range keys. This is less
	// efficient than using levelIters for sublevels of L0 files containing
	// range keys, but range keys are expected to be sparse anyway, reducing the
	// cost benefit of maintaining a separate L0Sublevels instance for range key
	// files and then using it here.
	//
	// NB: We iterate L0's files in reverse order. They're sorted by
	// LargestSeqNum ascending, and we need to add them to the merging iterator
	// in LargestSeqNum descending to preserve the merging iterator's invariants
	// around Key Trailer order.
	iter := current.RangeKeyLevels[0].Iter()
	for f := iter.Last(); f != nil; f = iter.Prev() {
		spanIter, err := i.newIterRangeKey(f, i.opts.SpanIterOptions())
		if err != nil {
			return err
		}
		i.rangeKey.iterConfig.AddLevel(spanIter)
	}
	// Add level iterators for the non-empty non-L0 levels. Levels deeper than
	// skipStart are omitted: their contents are surfaced via the
	// shared/external file visitors rather than iterated here.
	skipStart := i.opts.skipLevelForOpts()
	for level := 1; level < len(current.RangeKeyLevels); level++ {
		if current.RangeKeyLevels[level].Empty() {
			continue
		}
		if level > skipStart {
			continue
		}
		li := i.rangeKey.iterConfig.NewLevelIter()
		spanIterOpts := i.opts.SpanIterOptions()
		levIter := current.RangeKeyLevels[level].Iter()
		if level == skipStart {
			// The boundary level may hold a mix of remote and local files;
			// exclude files that a configured visitor will report, and build
			// a fresh key-sorted slice from the remainder.
			nonRemoteFiles := make([]*manifest.FileMetadata, 0)
			for f := levIter.First(); f != nil; f = levIter.Next() {
				meta, err := i.db.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
				if err != nil {
					return err
				}
				if (meta.IsShared() && i.opts.visitSharedFile != nil) ||
					(meta.IsExternal() && i.opts.visitExternalFile != nil) {
					// Skip this file.
					continue
				}
				nonRemoteFiles = append(nonRemoteFiles, f)
			}
			levSlice := manifest.NewLevelSliceKeySorted(i.db.cmp, nonRemoteFiles)
			levIter = levSlice.Iter()
		}
		li.Init(spanIterOpts, i.comparer.Compare, i.newIterRangeKey, levIter,
			manifest.Level(level), manifest.KeyTypeRange)
		i.rangeKey.iterConfig.AddLevel(li)
	}
	return nil
}
1080 :
1081 : // seekGE seeks this iterator to the first key that's greater than or equal
1082 : // to the specified user key.
1083 2 : func (i *scanInternalIterator) seekGE(key []byte) bool {
1084 2 : i.iterKey, i.iterValue = i.iter.SeekGE(key, base.SeekGEFlagsNone)
1085 2 : return i.iterKey != nil
1086 2 : }
1087 :
// unsafeKey returns the unsafe InternalKey at the current position. The value
// is nil if the iterator is invalid or exhausted.
//
// NOTE(review): the "unsafe" prefix suggests the returned key is only valid
// until the next positioning call on the iterator — confirm against callers.
func (i *scanInternalIterator) unsafeKey() *InternalKey {
	return i.iterKey
}
1093 :
// lazyValue returns a value pointer to the value at the current iterator
// position. Behaviour undefined if unsafeKey() returns a Range key or Rangedel
// kind key.
//
// The value is whatever the last seekGE/next call produced alongside iterKey.
func (i *scanInternalIterator) lazyValue() LazyValue {
	return i.iterValue
}
1100 :
1101 : // unsafeRangeDel returns a range key span. Behaviour undefined if UnsafeKey returns
1102 : // a non-rangedel kind.
1103 2 : func (i *scanInternalIterator) unsafeRangeDel() *keyspan.Span {
1104 2 : type spanInternalIterator interface {
1105 2 : Span() *keyspan.Span
1106 2 : }
1107 2 : return i.pointKeyIter.(spanInternalIterator).Span()
1108 2 : }
1109 :
// unsafeSpan returns a range key span. Behaviour undefined if UnsafeKey returns
// a non-rangekey type.
//
// The span is read from the range-key interleaving iterator's current state.
func (i *scanInternalIterator) unsafeSpan() *keyspan.Span {
	return i.rangeKey.iiter.Span()
}
1115 :
1116 : // next advances the iterator in the forward direction, and returns the
1117 : // iterator's new validity state.
1118 2 : func (i *scanInternalIterator) next() bool {
1119 2 : i.iterKey, i.iterValue = i.iter.Next()
1120 2 : return i.iterKey != nil
1121 2 : }
1122 :
// error returns an error from the internal iterator, if there's any.
func (i *scanInternalIterator) error() error {
	return i.iter.Error()
}
1127 :
// close closes this iterator, and releases any pooled objects.
//
// Teardown order: close the iterator stack, drop the readState/version
// references, then return the range-key state and the iterAlloc to their
// pools so subsequent iterators can reuse their buffers.
func (i *scanInternalIterator) close() error {
	if err := i.iter.Close(); err != nil {
		return err
	}
	// Exactly one of readState/version holds a reference we must release.
	if i.readState != nil {
		i.readState.unref()
	}
	if i.version != nil {
		i.version.Unref()
	}
	if i.rangeKey != nil {
		i.rangeKey.PrepareForReuse()
		// Reset the struct to its zero value, keeping only the reusable
		// buffers, before returning it to the pool.
		*i.rangeKey = iteratorRangeKeyState{
			rangeKeyBuffers: i.rangeKey.rangeKeyBuffers,
		}
		iterRangeKeyStateAllocPool.Put(i.rangeKey)
		i.rangeKey = nil
	}
	if alloc := i.alloc; alloc != nil {
		// Avoid caching the key buf if it is overly large. The constant is
		// fairly arbitrary. Oversized buffers are dropped (nil) so the pool
		// doesn't pin large allocations; normal-sized ones are handed back
		// for reuse.
		for j := range i.boundsBuf {
			if cap(i.boundsBuf[j]) >= maxKeyBufCacheSize {
				alloc.boundsBuf[j] = nil
			} else {
				alloc.boundsBuf[j] = i.boundsBuf[j]
			}
		}
		// Zero the alloc while retaining the reusable backing storage
		// (truncated to length 0) before pooling it.
		*alloc = iterAlloc{
			keyBuf:              alloc.keyBuf[:0],
			boundsBuf:           alloc.boundsBuf,
			prefixOrFullSeekKey: alloc.prefixOrFullSeekKey[:0],
		}
		iterAllocPool.Put(alloc)
		i.alloc = nil
	}
	return nil
}
1165 :
1166 2 : func (i *scanInternalIterator) initializeBoundBufs(lower, upper []byte) {
1167 2 : buf := i.boundsBuf[i.boundsBufIdx][:0]
1168 2 : if lower != nil {
1169 2 : buf = append(buf, lower...)
1170 2 : i.opts.LowerBound = buf
1171 2 : } else {
1172 1 : i.opts.LowerBound = nil
1173 1 : }
1174 2 : if upper != nil {
1175 2 : buf = append(buf, upper...)
1176 2 : i.opts.UpperBound = buf[len(buf)-len(upper):]
1177 2 : } else {
1178 1 : i.opts.UpperBound = nil
1179 1 : }
1180 2 : i.boundsBuf[i.boundsBufIdx] = buf
1181 2 : i.boundsBufIdx = 1 - i.boundsBufIdx
1182 : }
|