Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/invariants"
15 : "github.com/cockroachdb/pebble/internal/keyspan"
16 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
17 : "github.com/cockroachdb/pebble/internal/manifest"
18 : "github.com/cockroachdb/pebble/internal/treeprinter"
19 : "github.com/cockroachdb/pebble/objstorage"
20 : "github.com/cockroachdb/pebble/objstorage/remote"
21 : "github.com/cockroachdb/pebble/sstable"
22 : )
23 :
const (
	// In skip-shared iteration mode, keys in levels greater than
	// sharedLevelsStart (i.e. lower in the LSM) are skipped. Keys
	// in sharedLevelsStart are returned iff they are not in a
	// shared file.
	sharedLevelsStart = remote.SharedLevelsStart

	// In skip-external iteration mode, keys in levels greater
	// than externalSkipStart are skipped. Keys in
	// externalSkipStart are returned iff they are not in an
	// external file.
	externalSkipStart = 6
)
37 :
// ErrInvalidSkipSharedIteration is returned by ScanInternal if it was called
// with a shared file visitor function, and a file in a shareable level (i.e.
// level >= sharedLevelsStart) was found to not be in shared storage according
// to objstorage.Provider, or not shareable for another reason such as for
// containing keys newer than the snapshot sequence number. Callers should
// compare against this sentinel with errors.Is.
var ErrInvalidSkipSharedIteration = errors.New("pebble: cannot use skip-shared iteration due to non-shareable files in lower levels")
44 :
// SharedSSTMeta represents an sstable on shared storage that can be ingested
// by another pebble instance. This struct must contain all fields that are
// required for a Pebble instance to ingest a foreign sstable on shared storage,
// including constructing any relevant objstorage.Provider / remoteobjcat.Catalog
// data structures, as well as creating virtual FileMetadatas.
//
// Note that the Pebble instance creating and returning a SharedSSTMeta might
// not be the one that created the underlying sstable on shared storage to begin
// with; it's possible for a Pebble instance to reshare an sstable that was
// shared to it.
type SharedSSTMeta struct {
	// Backing is the shared object underlying this SST. Can be attached to an
	// objstorage.Provider.
	Backing objstorage.RemoteObjectBackingHandle

	// Smallest and Largest internal keys for the overall bounds. The kind and
	// SeqNum of these will reflect what is physically present on the source Pebble
	// instance's view of the sstable; it's up to the ingesting instance to set the
	// sequence number in the trailer to match the read-time sequence numbers
	// reserved for the level this SST is being ingested into. The Kind is expected
	// to remain unchanged by the ingesting instance.
	//
	// Note that these bounds could be narrower than the bounds of the underlying
	// sstable; ScanInternal is expected to truncate sstable bounds to the user key
	// bounds passed into that method.
	Smallest, Largest InternalKey

	// SmallestRangeKey and LargestRangeKey are internal keys that denote the
	// range key bounds of this sstable. Must lie within [Smallest, Largest].
	SmallestRangeKey, LargestRangeKey InternalKey

	// SmallestPointKey and LargestPointKey are internal keys that denote the
	// point key bounds of this sstable. Must lie within [Smallest, Largest].
	SmallestPointKey, LargestPointKey InternalKey

	// Level denotes the level at which this file was present at read time.
	// For files visited by ScanInternal, this value will only be 5 or 6.
	Level uint8

	// Size contains an estimate of the size of this sstable.
	Size uint64

	// fileNum at time of creation in the creator instance. Only used for
	// debugging/tests.
	fileNum base.FileNum
}
91 :
92 1 : func (s *SharedSSTMeta) cloneFromFileMeta(f *fileMetadata) {
93 1 : *s = SharedSSTMeta{
94 1 : Smallest: f.Smallest.Clone(),
95 1 : Largest: f.Largest.Clone(),
96 1 : SmallestRangeKey: f.SmallestRangeKey.Clone(),
97 1 : LargestRangeKey: f.LargestRangeKey.Clone(),
98 1 : SmallestPointKey: f.SmallestPointKey.Clone(),
99 1 : LargestPointKey: f.LargestPointKey.Clone(),
100 1 : Size: f.Size,
101 1 : fileNum: f.FileNum,
102 1 : }
103 1 : }
104 :
// sharedByLevel sorts a slice of SharedSSTMeta by ascending Level. It
// implements sort.Interface.
type sharedByLevel []SharedSSTMeta

func (s sharedByLevel) Len() int           { return len(s) }
func (s sharedByLevel) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s sharedByLevel) Less(i, j int) bool { return s[i].Level < s[j].Level }
110 :
// pcIterPos describes where the underlying iterator of a
// pointCollapsingIterator is positioned relative to the key most recently
// returned to the caller.
type pcIterPos int

const (
	// pcIterPosCur: the underlying iterator is at the current (returned) key.
	pcIterPosCur pcIterPos = iota
	// pcIterPosNext: the underlying iterator has already advanced past it.
	pcIterPosNext
)
117 :
// pointCollapsingIterator is an internalIterator that collapses point keys and
// returns at most one point internal key for each user key. Merges and
// SingleDels are not supported and result in a panic if encountered. Point keys
// deleted by rangedels are considered shadowed and not exposed.
//
// Only used in ScanInternal to return at most one internal key per user key.
type pointCollapsingIterator struct {
	iter     keyspan.InterleavingIter
	pos      pcIterPos
	comparer *base.Comparer
	merge    base.Merge
	err      error
	// seqNum is the snapshot sequence number used when deciding whether a
	// rangedel covers a point key (see CoversAt in findNextEntry).
	seqNum base.SeqNum
	// The current position of `iter`. Always owned by the underlying iter.
	iterKV *base.InternalKV
	// The last saved key. findNextEntry and similar methods are expected to save
	// the current value of iterKey to savedKey if they're iterating away from the
	// current key but still need to retain it. See comments in findNextEntry on
	// how this field is used.
	//
	// At the end of a positioning call:
	// - if pos == pcIterPosNext, iterKey is pointing to the next user key owned
	//   by `iter` while savedKey is holding a copy to our current key.
	// - If pos == pcIterPosCur, iterKey is pointing to an `iter`-owned current
	//   key, and savedKey is either undefined or pointing to a version of the
	//   current key owned by this iterator (i.e. backed by savedKeyBuf).
	savedKey    InternalKey
	savedKeyBuf []byte
	// If fixedSeqNum is non-zero, all emitted points are verified to have this
	// fixed sequence number.
	fixedSeqNum base.SeqNum
}
150 :
// Span returns the range-deletion/range-key span overlapping the current
// position, as surfaced by the underlying interleaving iterator.
func (p *pointCollapsingIterator) Span() *keyspan.Span {
	return p.iter.Span()
}
154 :
155 : // SeekPrefixGE implements the InternalIterator interface.
156 : func (p *pointCollapsingIterator) SeekPrefixGE(
157 : prefix, key []byte, flags base.SeekGEFlags,
158 0 : ) *base.InternalKV {
159 0 : p.resetKey()
160 0 : p.iterKV = p.iter.SeekPrefixGE(prefix, key, flags)
161 0 : p.pos = pcIterPosCur
162 0 : if p.iterKV == nil {
163 0 : return nil
164 0 : }
165 0 : return p.findNextEntry()
166 : }
167 :
168 : // SeekGE implements the InternalIterator interface.
169 1 : func (p *pointCollapsingIterator) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
170 1 : p.resetKey()
171 1 : p.iterKV = p.iter.SeekGE(key, flags)
172 1 : p.pos = pcIterPosCur
173 1 : if p.iterKV == nil {
174 1 : return nil
175 1 : }
176 1 : return p.findNextEntry()
177 : }
178 :
// SeekLT implements the InternalIterator interface. Reverse iteration is not
// supported by this iterator and always panics.
func (p *pointCollapsingIterator) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
	panic("unimplemented")
}
183 :
184 1 : func (p *pointCollapsingIterator) resetKey() {
185 1 : p.savedKey.UserKey = p.savedKeyBuf[:0]
186 1 : p.savedKey.Trailer = 0
187 1 : p.iterKV = nil
188 1 : p.pos = pcIterPosCur
189 1 : }
190 :
191 1 : func (p *pointCollapsingIterator) verifySeqNum(kv *base.InternalKV) *base.InternalKV {
192 1 : if !invariants.Enabled {
193 0 : return kv
194 0 : }
195 1 : if p.fixedSeqNum == 0 || kv == nil || kv.Kind() == InternalKeyKindRangeDelete {
196 1 : return kv
197 1 : }
198 0 : if kv.SeqNum() != p.fixedSeqNum {
199 0 : panic(fmt.Sprintf("expected foreign point key to have seqnum %d, got %d", p.fixedSeqNum, kv.SeqNum()))
200 : }
201 0 : return kv
202 : }
203 :
// findNextEntry is called to return the next key. p.iter must be positioned at the
// start of the first user key we are interested in. It skips keys shadowed by
// rangedels visible at p.seqNum and returns at most one internal key per user
// key, panicking if a MERGE or SINGLEDEL is encountered.
func (p *pointCollapsingIterator) findNextEntry() *base.InternalKV {
	p.saveKey()
	// Saves a comparison in the fast path
	firstIteration := true
	for p.iterKV != nil {
		// NB: p.savedKey is either the current key (iff p.iterKV == firstKey),
		// or the previous key.
		if !firstIteration && !p.comparer.Equal(p.iterKV.K.UserKey, p.savedKey.UserKey) {
			// We've moved onto a new user key; re-save it and re-evaluate.
			p.saveKey()
			continue
		}
		firstIteration = false
		if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKV.SeqNum()) {
			// All future keys for this user key must be deleted.
			if p.savedKey.Kind() == InternalKeyKindSingleDelete {
				panic("cannot process singledel key in point collapsing iterator")
			}
			// Fast forward to the next user key.
			p.saveKey()
			p.iterKV = p.iter.Next()
			// NB: the seqnum comparison is an optimization: seqnums decrease
			// within a user key, so a higher seqnum implies a new user key
			// without needing the full key comparison first.
			for p.iterKV != nil && p.savedKey.SeqNum() >= p.iterKV.SeqNum() && p.comparer.Equal(p.iterKV.K.UserKey, p.savedKey.UserKey) {
				p.iterKV = p.iter.Next()
			}
			continue
		}
		switch p.savedKey.Kind() {
		case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
			// Note that we return SETs directly, even if they would otherwise get
			// compacted into a Del to turn into a SetWithDelete. This is a fast
			// path optimization that can break SINGLEDEL determinism. To lead to
			// consistent SINGLEDEL behaviour, this iterator should *not* be used for
			// a keyspace where SINGLEDELs could be in use. If this iterator observes
			// a SINGLEDEL as the first internal key for a user key, it will panic.
			//
			// As p.value is a lazy value owned by the child iterator, we can thread
			// it through without loading it into p.valueBuf.
			//
			// TODO(bilal): We can even avoid saving the key in this fast path if
			// we are in a block where setHasSamePrefix = false in a v3 sstable,
			// guaranteeing that there's only one internal key for each user key.
			// Thread this logic through the sstable iterators and/or consider
			// collapsing (ha) this logic into the sstable iterators that are aware
			// of blocks and can determine user key changes without doing key saves
			// or comparisons.
			p.pos = pcIterPosCur
			return p.verifySeqNum(p.iterKV)
		case InternalKeyKindSingleDelete:
			// Panic, as this iterator is not expected to observe single deletes.
			panic("cannot process singledel key in point collapsing iterator")
		case InternalKeyKindMerge:
			// Panic, as this iterator is not expected to observe merges.
			panic("cannot process merge key in point collapsing iterator")
		case InternalKeyKindRangeDelete:
			// These are interleaved by the interleaving iterator ahead of all points.
			// We should pass them as-is, but also account for any points ahead of
			// them.
			p.pos = pcIterPosCur
			return p.verifySeqNum(p.iterKV)
		default:
			panic(fmt.Sprintf("unexpected kind: %d", p.iterKV.Kind()))
		}
	}
	// Exhausted the underlying iterator.
	p.resetKey()
	return nil
}
271 :
272 : // First implements the InternalIterator interface.
273 0 : func (p *pointCollapsingIterator) First() *base.InternalKV {
274 0 : p.resetKey()
275 0 : p.iterKV = p.iter.First()
276 0 : p.pos = pcIterPosCur
277 0 : if p.iterKV == nil {
278 0 : return nil
279 0 : }
280 0 : return p.findNextEntry()
281 : }
282 :
// Last implements the InternalIterator interface. Reverse iteration is not
// supported by this iterator and always panics.
func (p *pointCollapsingIterator) Last() *base.InternalKV {
	panic("unimplemented")
}
287 :
288 1 : func (p *pointCollapsingIterator) saveKey() {
289 1 : if p.iterKV == nil {
290 0 : p.savedKey = InternalKey{UserKey: p.savedKeyBuf[:0]}
291 0 : return
292 0 : }
293 1 : p.savedKeyBuf = append(p.savedKeyBuf[:0], p.iterKV.K.UserKey...)
294 1 : p.savedKey = InternalKey{UserKey: p.savedKeyBuf, Trailer: p.iterKV.K.Trailer}
295 : }
296 :
// Next implements the InternalIterator interface. It advances past all
// remaining internal keys for the current user key (unless the current entry
// is an interleaved rangedel, which only steps once) and then collapses
// forward to the next visible entry.
func (p *pointCollapsingIterator) Next() *base.InternalKV {
	switch p.pos {
	case pcIterPosCur:
		p.saveKey()
		if p.iterKV != nil && p.iterKV.Kind() == InternalKeyKindRangeDelete {
			// Step over the interleaved range delete and process the very next
			// internal key, even if it's at the same user key. This is because a
			// point for that user key has not been returned yet.
			p.iterKV = p.iter.Next()
			break
		}
		// Fast forward to the next user key.
		kv := p.iter.Next()
		// p.iterKV.SeqNum() >= key.SeqNum() is an optimization that allows us to
		// use p.iterKV.SeqNum() < key.SeqNum() as a sign that the user key has
		// changed, without needing to do the full key comparison.
		for kv != nil && p.savedKey.SeqNum() >= kv.SeqNum() &&
			p.comparer.Equal(p.savedKey.UserKey, kv.K.UserKey) {
			kv = p.iter.Next()
		}
		if kv == nil {
			// There are no keys to return.
			p.resetKey()
			return nil
		}
		p.iterKV = kv
	case pcIterPosNext:
		// The underlying iterator is already on the next entry.
		p.pos = pcIterPosCur
	}
	if p.iterKV == nil {
		p.resetKey()
		return nil
	}
	return p.findNextEntry()
}
333 :
// NextPrefix implements the InternalIterator interface. Not supported by this
// iterator; always panics.
func (p *pointCollapsingIterator) NextPrefix(succKey []byte) *base.InternalKV {
	panic("unimplemented")
}
338 :
// Prev implements the InternalIterator interface. Reverse iteration is not
// supported by this iterator and always panics.
func (p *pointCollapsingIterator) Prev() *base.InternalKV {
	panic("unimplemented")
}
343 :
344 : // Error implements the InternalIterator interface.
345 1 : func (p *pointCollapsingIterator) Error() error {
346 1 : if p.err != nil {
347 0 : return p.err
348 0 : }
349 1 : return p.iter.Error()
350 : }
351 :
// Close implements the InternalIterator interface, closing the underlying
// interleaving iterator.
func (p *pointCollapsingIterator) Close() error {
	return p.iter.Close()
}
356 :
// SetBounds implements the InternalIterator interface. The saved key is reset
// since it may fall outside the new bounds.
func (p *pointCollapsingIterator) SetBounds(lower, upper []byte) {
	p.resetKey()
	p.iter.SetBounds(lower, upper)
}
362 :
// SetContext is part of the InternalIterator interface; it forwards the
// context to the underlying iterator.
func (p *pointCollapsingIterator) SetContext(ctx context.Context) {
	p.iter.SetContext(ctx)
}
366 :
// DebugTree is part of the InternalIterator interface.
func (p *pointCollapsingIterator) DebugTree(tp treeprinter.Node) {
	n := tp.Childf("%T(%p)", p, p)
	p.iter.DebugTree(n)
}
372 :
// String implements the InternalIterator interface, delegating to the
// underlying iterator.
func (p *pointCollapsingIterator) String() string {
	return p.iter.String()
}
377 :
// Compile-time assertion that pointCollapsingIterator satisfies internalIterator.
var _ internalIterator = &pointCollapsingIterator{}
379 :
// IteratorLevelKind is used to denote whether the current ScanInternal iterator
// is unknown, belongs to a flushable, or belongs to an LSM level type.
type IteratorLevelKind int8

const (
	// IteratorLevelUnknown indicates an unknown LSM level.
	IteratorLevelUnknown IteratorLevelKind = iota
	// IteratorLevelLSM indicates an LSM level.
	IteratorLevelLSM
	// IteratorLevelFlushable indicates a flushable (i.e. memtable).
	IteratorLevelFlushable
)
392 :
// IteratorLevel is used with scanInternalIterator to surface additional iterator-specific info where possible.
// Note: this struct is only provided for point keys.
type IteratorLevel struct {
	Kind IteratorLevelKind
	// FlushableIndex indicates the position within the flushable queue of this level.
	// Only valid if kind == IteratorLevelFlushable.
	FlushableIndex int
	// The level within the LSM. Only valid if Kind == IteratorLevelLSM.
	Level int
	// Sublevel is only valid if Kind == IteratorLevelLSM and Level == 0.
	Sublevel int
}
405 :
// scanInternalIterator is an iterator that returns all internal keys, including
// tombstones. For instance, an InternalKeyKindDelete would be returned as an
// InternalKeyKindDelete instead of the iterator skipping over to the next key.
// Internal keys within a user key are collapsed, eg. if there are two SETs, the
// one with the higher sequence is returned. Useful if an external user of Pebble
// needs to observe and rebuild Pebble's history of internal keys, such as in
// node-to-node replication. For use with {db,snapshot}.ScanInternal().
//
// scanInternalIterator is expected to ignore point keys deleted by range
// deletions, and range keys shadowed by a range key unset or delete, however it
// *must* return the range delete as well as the range key unset/delete that did
// the shadowing.
type scanInternalIterator struct {
	ctx             context.Context
	db              *DB
	opts            scanInternalOptions
	comparer        *base.Comparer
	merge           Merge
	iter            internalIterator
	readState       *readState
	version         *version
	rangeKey        *iteratorRangeKeyState
	pointKeyIter    internalIterator
	iterKV          *base.InternalKV
	alloc           *iterAlloc
	newIters        tableNewIters
	newIterRangeKey keyspanimpl.TableNewSpanIter
	seqNum          base.SeqNum
	iterLevels      []IteratorLevel
	mergingIter     *mergingIter

	// boundsBuf holds two buffers used to store the lower and upper bounds.
	// Whenever the InternalIterator's bounds change, the new bounds are copied
	// into boundsBuf[boundsBufIdx]. The two bounds share a slice to reduce
	// allocations. opts.LowerBound and opts.UpperBound point into this slice.
	boundsBuf    [2][]byte
	boundsBufIdx int
}
444 :
445 : // truncateExternalFile truncates an External file's [SmallestUserKey,
446 : // LargestUserKey] fields to [lower, upper). A ExternalFile is
447 : // produced that is suitable for external consumption by other Pebble
448 : // instances.
449 : //
450 : // truncateSharedFile reads the file to try to create the smallest
451 : // possible bounds. Here, we blindly truncate them. This may mean we
452 : // include this SST in iterations it isn't really needed in. Since we
453 : // don't expect External files to be long-lived in the pebble
454 : // instance, We think this is OK.
455 : //
456 : // TODO(ssd) 2024-01-26: Potentially de-duplicate with
457 : // truncateSharedFile.
458 : func (d *DB) truncateExternalFile(
459 : ctx context.Context,
460 : lower, upper []byte,
461 : level int,
462 : file *fileMetadata,
463 : objMeta objstorage.ObjectMetadata,
464 0 : ) (*ExternalFile, error) {
465 0 : cmp := d.cmp
466 0 : sst := &ExternalFile{
467 0 : Level: uint8(level),
468 0 : ObjName: objMeta.Remote.CustomObjectName,
469 0 : Locator: objMeta.Remote.Locator,
470 0 : HasPointKey: file.HasPointKeys,
471 0 : HasRangeKey: file.HasRangeKeys,
472 0 : Size: file.Size,
473 0 : SyntheticSuffix: slices.Clone(file.SyntheticSuffix),
474 0 : }
475 0 : if file.SyntheticPrefix.IsSet() {
476 0 : sst.SyntheticPrefix = slices.Clone(sst.SyntheticPrefix)
477 0 : }
478 :
479 0 : needsLowerTruncate := cmp(lower, file.Smallest.UserKey) > 0
480 0 : if needsLowerTruncate {
481 0 : sst.StartKey = slices.Clone(lower)
482 0 : } else {
483 0 : sst.StartKey = slices.Clone(file.Smallest.UserKey)
484 0 : }
485 :
486 0 : cmpUpper := cmp(upper, file.Largest.UserKey)
487 0 : needsUpperTruncate := cmpUpper < 0
488 0 : if needsUpperTruncate {
489 0 : sst.EndKey = slices.Clone(upper)
490 0 : sst.EndKeyIsInclusive = false
491 0 : } else {
492 0 : sst.EndKey = slices.Clone(file.Largest.UserKey)
493 0 : sst.EndKeyIsInclusive = !file.Largest.IsExclusiveSentinel()
494 0 : }
495 :
496 0 : if cmp(sst.StartKey, sst.EndKey) > 0 {
497 0 : return nil, base.AssertionFailedf("pebble: invalid external file bounds after truncation [%q, %q)", sst.StartKey, sst.EndKey)
498 0 : }
499 :
500 0 : if cmp(sst.StartKey, sst.EndKey) == 0 && !sst.EndKeyIsInclusive {
501 0 : return nil, base.AssertionFailedf("pebble: invalid external file bounds after truncation [%q, %q)", sst.StartKey, sst.EndKey)
502 0 : }
503 :
504 0 : return sst, nil
505 : }
506 :
// truncateSharedFile truncates a shared file's [Smallest, Largest] fields to
// [lower, upper), potentially opening iterators on the file to find keys within
// the requested bounds. A SharedSSTMeta is produced that is suitable for
// external consumption by other Pebble instances. If shouldSkip is true, this
// file does not contain any keys in [lower, upper) and can be skipped.
//
// TODO(bilal): If opening iterators and doing reads in this method is too
// inefficient, consider producing non-tight file bounds instead.
func (d *DB) truncateSharedFile(
	ctx context.Context,
	lower, upper []byte,
	level int,
	file *fileMetadata,
	objMeta objstorage.ObjectMetadata,
) (sst *SharedSSTMeta, shouldSkip bool, err error) {
	cmp := d.cmp
	sst = &SharedSSTMeta{}
	sst.cloneFromFileMeta(file)
	sst.Level = uint8(level)
	sst.Backing, err = d.objProvider.RemoteObjectBacking(&objMeta)
	if err != nil {
		return nil, false, err
	}
	needsLowerTruncate := cmp(lower, file.Smallest.UserKey) > 0
	needsUpperTruncate := cmp(upper, file.Largest.UserKey) < 0 || (cmp(upper, file.Largest.UserKey) == 0 && !file.Largest.IsExclusiveSentinel())
	// Fast path: file is entirely within [lower, upper).
	if !needsLowerTruncate && !needsUpperTruncate {
		return sst, false, nil
	}

	// We will need to truncate file bounds in at least one direction. Open all
	// relevant iterators.
	iters, err := d.newIters(ctx, file, &IterOptions{
		LowerBound: lower,
		UpperBound: upper,
		layer:      manifest.Level(level),
	}, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
	if err != nil {
		return nil, false, err
	}
	defer iters.CloseAll()
	iter := iters.point
	rangeDelIter := iters.rangeDeletion
	rangeKeyIter := iters.rangeKey
	if rangeDelIter != nil {
		rangeDelIter = keyspan.Truncate(cmp, rangeDelIter, base.UserKeyBoundsEndExclusive(lower, upper))
	}
	if rangeKeyIter != nil {
		rangeKeyIter = keyspan.Truncate(cmp, rangeKeyIter, base.UserKeyBoundsEndExclusive(lower, upper))
	}
	// Check if we need to truncate on the left side. This means finding a new
	// SmallestPointKey and SmallestRangeKey that is >= lower.
	if needsLowerTruncate {
		sst.SmallestPointKey.UserKey = sst.SmallestPointKey.UserKey[:0]
		sst.SmallestPointKey.Trailer = 0
		kv := iter.SeekGE(lower, base.SeekGEFlagsNone)
		foundPointKey := kv != nil
		if kv != nil {
			sst.SmallestPointKey.CopyFrom(kv.K)
		}
		if rangeDelIter != nil {
			// A rangedel starting before the first point key also bounds the
			// point-key space.
			if span, err := rangeDelIter.SeekGE(lower); err != nil {
				return nil, false, err
			} else if span != nil && (len(sst.SmallestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.SmallestKey(), sst.SmallestPointKey) < 0) {
				sst.SmallestPointKey.CopyFrom(span.SmallestKey())
				foundPointKey = true
			}
		}
		if !foundPointKey {
			// There are no point keys in the span we're interested in.
			sst.SmallestPointKey = InternalKey{}
			sst.LargestPointKey = InternalKey{}
		}
		sst.SmallestRangeKey.UserKey = sst.SmallestRangeKey.UserKey[:0]
		sst.SmallestRangeKey.Trailer = 0
		if rangeKeyIter != nil {
			span, err := rangeKeyIter.SeekGE(lower)
			switch {
			case err != nil:
				return nil, false, err
			case span != nil:
				sst.SmallestRangeKey.CopyFrom(span.SmallestKey())
			default:
				// There are no range keys in the span we're interested in.
				sst.SmallestRangeKey = InternalKey{}
				sst.LargestRangeKey = InternalKey{}
			}
		}
	}
	// Check if we need to truncate on the right side. This means finding a new
	// LargestPointKey and LargestRangeKey that is < upper.
	if needsUpperTruncate {
		sst.LargestPointKey.UserKey = sst.LargestPointKey.UserKey[:0]
		sst.LargestPointKey.Trailer = 0
		kv := iter.SeekLT(upper, base.SeekLTFlagsNone)
		foundPointKey := kv != nil
		if kv != nil {
			sst.LargestPointKey.CopyFrom(kv.K)
		}
		if rangeDelIter != nil {
			// A rangedel ending after the last point key also bounds the
			// point-key space.
			if span, err := rangeDelIter.SeekLT(upper); err != nil {
				return nil, false, err
			} else if span != nil && (len(sst.LargestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.LargestKey(), sst.LargestPointKey) > 0) {
				sst.LargestPointKey.CopyFrom(span.LargestKey())
				foundPointKey = true
			}
		}
		if !foundPointKey {
			// There are no point keys in the span we're interested in.
			sst.SmallestPointKey = InternalKey{}
			sst.LargestPointKey = InternalKey{}
		}
		sst.LargestRangeKey.UserKey = sst.LargestRangeKey.UserKey[:0]
		sst.LargestRangeKey.Trailer = 0
		if rangeKeyIter != nil {
			span, err := rangeKeyIter.SeekLT(upper)
			switch {
			case err != nil:
				return nil, false, err
			case span != nil:
				sst.LargestRangeKey.CopyFrom(span.LargestKey())
			default:
				// There are no range keys in the span we're interested in.
				sst.SmallestRangeKey = InternalKey{}
				sst.LargestRangeKey = InternalKey{}
			}
		}
	}
	// Set overall bounds based on {Smallest,Largest}{Point,Range}Key.
	switch {
	case len(sst.SmallestRangeKey.UserKey) == 0:
		sst.Smallest = sst.SmallestPointKey
	case len(sst.SmallestPointKey.UserKey) == 0:
		sst.Smallest = sst.SmallestRangeKey
	default:
		sst.Smallest = sst.SmallestPointKey
		if base.InternalCompare(cmp, sst.SmallestRangeKey, sst.SmallestPointKey) < 0 {
			sst.Smallest = sst.SmallestRangeKey
		}
	}
	switch {
	case len(sst.LargestRangeKey.UserKey) == 0:
		sst.Largest = sst.LargestPointKey
	case len(sst.LargestPointKey.UserKey) == 0:
		sst.Largest = sst.LargestRangeKey
	default:
		sst.Largest = sst.LargestPointKey
		if base.InternalCompare(cmp, sst.LargestRangeKey, sst.LargestPointKey) > 0 {
			sst.Largest = sst.LargestRangeKey
		}
	}
	// On rare occasion, a file might overlap with [lower, upper) but not actually
	// have any keys within those bounds. Skip such files.
	if len(sst.Smallest.UserKey) == 0 {
		return nil, true, nil
	}
	sst.Size, err = d.tableCache.estimateSize(file, sst.Smallest.UserKey, sst.Largest.UserKey)
	if err != nil {
		return nil, false, err
	}
	// On occasion, estimateSize gives us a low estimate, i.e. a 0 file size. This
	// can cause panics in places where we divide by file sizes. Correct for it
	// here.
	if sst.Size == 0 {
		sst.Size = 1
	}
	return sst, false, nil
}
675 :
676 : func scanInternalImpl(
677 : ctx context.Context, lower, upper []byte, iter *scanInternalIterator, opts *scanInternalOptions,
678 1 : ) error {
679 1 : if opts.visitSharedFile != nil && (lower == nil || upper == nil) {
680 0 : panic("lower and upper bounds must be specified in skip-shared iteration mode")
681 : }
682 1 : if opts.visitSharedFile != nil && opts.visitExternalFile != nil {
683 0 : return base.AssertionFailedf("cannot provide both a shared-file and external-file visitor")
684 0 : }
685 :
686 : // Before starting iteration, check if any files in levels sharedLevelsStart
687 : // and below are *not* shared. Error out if that is the case, as skip-shared
688 : // iteration will not produce a consistent point-in-time view of this range
689 : // of keys. For files that are shared, call visitSharedFile with a truncated
690 : // version of that file.
691 1 : cmp := iter.comparer.Compare
692 1 : provider := iter.db.ObjProvider()
693 1 : seqNum := iter.seqNum
694 1 : current := iter.version
695 1 : if current == nil {
696 1 : current = iter.readState.current
697 1 : }
698 :
699 1 : if opts.visitSharedFile != nil || opts.visitExternalFile != nil {
700 1 : if provider == nil {
701 0 : panic("expected non-nil Provider in skip-shared iteration mode")
702 : }
703 :
704 1 : firstLevelWithRemote := opts.skipLevelForOpts()
705 1 : for level := firstLevelWithRemote; level < numLevels; level++ {
706 1 : files := current.Levels[level].Iter()
707 1 : for f := files.SeekGE(cmp, lower); f != nil && cmp(f.Smallest.UserKey, upper) < 0; f = files.Next() {
708 1 : if cmp(lower, f.Largest.UserKey) == 0 && f.Largest.IsExclusiveSentinel() {
709 0 : continue
710 : }
711 :
712 1 : var objMeta objstorage.ObjectMetadata
713 1 : var err error
714 1 : objMeta, err = provider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
715 1 : if err != nil {
716 0 : return err
717 0 : }
718 :
719 : // We allow a mix of files at the first level.
720 1 : if level != firstLevelWithRemote {
721 1 : if !objMeta.IsShared() && !objMeta.IsExternal() {
722 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared or external", objMeta.DiskFileNum)
723 0 : }
724 : }
725 :
726 1 : if objMeta.IsShared() && opts.visitSharedFile == nil {
727 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "shared file is present but no shared file visitor is defined")
728 0 : }
729 :
730 1 : if objMeta.IsExternal() && opts.visitExternalFile == nil {
731 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "external file is present but no external file visitor is defined")
732 0 : }
733 :
734 1 : if !base.Visible(f.LargestSeqNum, seqNum, base.SeqNumMax) {
735 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s contains keys newer than snapshot", objMeta.DiskFileNum)
736 0 : }
737 :
738 1 : if level != firstLevelWithRemote && (!objMeta.IsShared() && !objMeta.IsExternal()) {
739 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared or external", objMeta.DiskFileNum)
740 0 : }
741 :
742 1 : if objMeta.IsShared() {
743 1 : var sst *SharedSSTMeta
744 1 : var skip bool
745 1 : sst, skip, err = iter.db.truncateSharedFile(ctx, lower, upper, level, f, objMeta)
746 1 : if err != nil {
747 0 : return err
748 0 : }
749 1 : if skip {
750 1 : continue
751 : }
752 1 : if err = opts.visitSharedFile(sst); err != nil {
753 0 : return err
754 0 : }
755 0 : } else if objMeta.IsExternal() {
756 0 : sst, err := iter.db.truncateExternalFile(ctx, lower, upper, level, f, objMeta)
757 0 : if err != nil {
758 0 : return err
759 0 : }
760 0 : if err := opts.visitExternalFile(sst); err != nil {
761 0 : return err
762 0 : }
763 : }
764 :
765 : }
766 : }
767 : }
768 :
769 1 : for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() {
770 1 : key := iter.unsafeKey()
771 1 :
772 1 : if opts.rateLimitFunc != nil {
773 0 : if err := opts.rateLimitFunc(key, iter.lazyValue()); err != nil {
774 0 : return err
775 0 : }
776 : }
777 :
778 1 : switch key.Kind() {
779 1 : case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet:
780 1 : if opts.visitRangeKey != nil {
781 1 : span := iter.unsafeSpan()
782 1 : // NB: The caller isn't interested in the sequence numbers of these
783 1 : // range keys. Rather, the caller wants them to be in trailer order
784 1 : // _after_ zeroing of sequence numbers. Copy span.Keys, sort it, and then
785 1 : // call visitRangeKey.
786 1 : keysCopy := make([]keyspan.Key, len(span.Keys))
787 1 : for i := range span.Keys {
788 1 : keysCopy[i].CopyFrom(span.Keys[i])
789 1 : keysCopy[i].Trailer = base.MakeTrailer(0, span.Keys[i].Kind())
790 1 : }
791 1 : keyspan.SortKeysByTrailer(keysCopy)
792 1 : if err := opts.visitRangeKey(span.Start, span.End, keysCopy); err != nil {
793 0 : return err
794 0 : }
795 : }
796 1 : case InternalKeyKindRangeDelete:
797 1 : if opts.visitRangeDel != nil {
798 1 : rangeDel := iter.unsafeRangeDel()
799 1 : if err := opts.visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil {
800 0 : return err
801 0 : }
802 : }
803 1 : default:
804 1 : if opts.visitPointKey != nil {
805 1 : var info IteratorLevel
806 1 : if len(iter.mergingIter.heap.items) > 0 {
807 1 : mergingIterIdx := iter.mergingIter.heap.items[0].index
808 1 : info = iter.iterLevels[mergingIterIdx]
809 1 : } else {
810 0 : info = IteratorLevel{Kind: IteratorLevelUnknown}
811 0 : }
812 1 : val := iter.lazyValue()
813 1 : if err := opts.visitPointKey(key, val, info); err != nil {
814 0 : return err
815 0 : }
816 : }
817 : }
818 : }
819 :
820 1 : return nil
821 : }
822 :
823 1 : func (opts *scanInternalOptions) skipLevelForOpts() int {
824 1 : if opts.visitSharedFile != nil {
825 1 : return sharedLevelsStart
826 1 : }
827 0 : if opts.visitExternalFile != nil {
828 0 : return externalSkipStart
829 0 : }
830 0 : return numLevels
831 : }
832 :
// constructPointIter constructs a merging iterator and sets i.iter to it.
//
// The merging iterator combines, from most to least recent: the memtables,
// L0 sub-levels, and the non-empty levels 1..skipStart (levels deeper than
// skipStart are omitted; their contents are expected to be surfaced through
// the shared/external file visitors instead). A parallel stack of range-del
// iterators is assembled alongside and interleaved with the point keys at
// the end.
func (i *scanInternalIterator) constructPointIter(
	categoryAndQoS sstable.CategoryAndQoS, memtables flushableList, buf *iterAlloc,
) error {
	// Merging levels and levels from iterAlloc.
	mlevels := buf.mlevels[:0]
	levels := buf.levels[:0]

	// We compute the number of levels needed ahead of time and reallocate a slice if
	// the array from the iterAlloc isn't large enough. Doing this allocation once
	// should improve the performance.
	numMergingLevels := len(memtables)
	numLevelIters := 0

	// Prefer an explicitly pinned version if one was provided; otherwise read
	// from the current read state.
	current := i.version
	if current == nil {
		current = i.readState.current
	}
	numMergingLevels += len(current.L0SublevelFiles)
	numLevelIters += len(current.L0SublevelFiles)

	// Count the non-empty, non-skipped levels below L0. This counting loop
	// must mirror the construction loop further down.
	skipStart := i.opts.skipLevelForOpts()
	for level := 1; level < len(current.Levels); level++ {
		if current.Levels[level].Empty() {
			continue
		}
		if level > skipStart {
			continue
		}
		numMergingLevels++
		numLevelIters++
	}

	if numMergingLevels > cap(mlevels) {
		mlevels = make([]mergingIterLevel, 0, numMergingLevels)
	}
	if numLevelIters > cap(levels) {
		levels = make([]levelIter, 0, numLevelIters)
	}
	// TODO(bilal): Push these into the iterAlloc buf.
	var rangeDelMiter keyspanimpl.MergingIter
	rangeDelIters := make([]keyspan.FragmentIterator, 0, numMergingLevels)
	rangeDelLevels := make([]keyspanimpl.LevelIter, 0, numLevelIters)

	i.iterLevels = make([]IteratorLevel, numMergingLevels)
	mlevelsIndex := 0

	// Next are the memtables. Iterated in reverse index order so that later
	// (more recent) memtables are added to the merging iterator first —
	// NOTE(review): assumes flushableList is ordered oldest-first; confirm.
	for j := len(memtables) - 1; j >= 0; j-- {
		mem := memtables[j]
		mlevels = append(mlevels, mergingIterLevel{
			iter: mem.newIter(&i.opts.IterOptions),
		})
		i.iterLevels[mlevelsIndex] = IteratorLevel{
			Kind:           IteratorLevelFlushable,
			FlushableIndex: j,
		}
		mlevelsIndex++
		// Memtables may also carry range deletions; collect their iterators
		// for the range-del merging iterator.
		if rdi := mem.newRangeDelIter(&i.opts.IterOptions); rdi != nil {
			rangeDelIters = append(rangeDelIters, rdi)
		}
	}

	// Next are the file levels: L0 sub-levels followed by lower levels.
	levelsIndex := len(levels)
	mlevels = mlevels[:numMergingLevels]
	levels = levels[:numLevelIters]
	rangeDelLevels = rangeDelLevels[:numLevelIters]
	// Use the iterator's sequence number as the snapshot for hiding obsolete
	// points, and propagate the caller's category/QoS for stats attribution.
	i.opts.IterOptions.snapshotForHideObsoletePoints = i.seqNum
	i.opts.IterOptions.CategoryAndQoS = categoryAndQoS
	// addLevelIterForFiles wires up both a point-key level iterator and a
	// range-del level iterator for the given file set, storing them into the
	// slices sized above and advancing the shared indices.
	addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Layer) {
		li := &levels[levelsIndex]
		rli := &rangeDelLevels[levelsIndex]

		li.init(
			i.ctx, i.opts.IterOptions, i.comparer, i.newIters, files, level,
			internalIterOpts{})
		mlevels[mlevelsIndex].iter = li
		rli.Init(i.ctx, keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters},
			i.comparer.Compare, tableNewRangeDelIter(i.newIters), files, level,
			manifest.KeyTypePoint)
		rangeDelIters = append(rangeDelIters, rli)

		levelsIndex++
		mlevelsIndex++
	}

	// L0 sub-levels, newest (highest sub-level index) first.
	for j := len(current.L0SublevelFiles) - 1; j >= 0; j-- {
		i.iterLevels[mlevelsIndex] = IteratorLevel{
			Kind:     IteratorLevelLSM,
			Level:    0,
			Sublevel: j,
		}
		addLevelIterForFiles(current.L0SublevelFiles[j].Iter(), manifest.L0Sublevel(j))
	}
	// Add level iterators for the non-empty non-L0 levels.
	for level := 1; level < numLevels; level++ {
		if current.Levels[level].Empty() {
			continue
		}

		// Must mirror the skip logic in the counting loop above.
		if level > skipStart {
			continue
		}
		i.iterLevels[mlevelsIndex] = IteratorLevel{Kind: IteratorLevelLSM, Level: level}
		levIter := current.Levels[level].Iter()
		if level == skipStart {
			// At the boundary level itself, files that will be surfaced via
			// the shared/external visitors are excluded from the point
			// iterator; only the remaining files are read here.
			nonRemoteFiles := make([]*manifest.FileMetadata, 0)
			for f := levIter.First(); f != nil; f = levIter.Next() {
				meta, err := i.db.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
				if err != nil {
					return err
				}
				if (meta.IsShared() && i.opts.visitSharedFile != nil) ||
					(meta.IsExternal() && i.opts.visitExternalFile != nil) {
					// Skip this file.
					continue
				}
				nonRemoteFiles = append(nonRemoteFiles, f)
			}
			levSlice := manifest.NewLevelSliceKeySorted(i.db.cmp, nonRemoteFiles)
			levIter = levSlice.Iter()
		}

		addLevelIterForFiles(levIter, manifest.Level(level))
	}

	buf.merging.init(&i.opts.IterOptions, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...)
	buf.merging.snapshot = i.seqNum
	rangeDelMiter.Init(i.comparer, keyspan.VisibleTransform(i.seqNum), new(keyspanimpl.MergingBuffers), rangeDelIters...)

	if i.opts.includeObsoleteKeys {
		// When obsolete keys are requested, interleave range dels with the
		// merged point keys without any collapsing of shadowed points.
		iiter := &keyspan.InterleavingIter{}
		iiter.Init(i.comparer, &buf.merging, &rangeDelMiter,
			keyspan.InterleavingIterOpts{
				LowerBound: i.opts.LowerBound,
				UpperBound: i.opts.UpperBound,
			})
		i.pointKeyIter = iiter
	} else {
		// Otherwise wrap the interleaved stream in a pointCollapsingIterator,
		// which collapses point keys at the iterator's sequence number.
		pcIter := &pointCollapsingIterator{
			comparer: i.comparer,
			merge:    i.merge,
			seqNum:   i.seqNum,
		}
		pcIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, keyspan.InterleavingIterOpts{
			LowerBound: i.opts.LowerBound,
			UpperBound: i.opts.UpperBound,
		})
		i.pointKeyIter = pcIter
	}
	i.iter = i.pointKeyIter
	return nil
}
987 :
// constructRangeKeyIter constructs the range-key iterator stack, populating
// i.rangeKey.rangeKeyIter with the resulting iterator. This is similar to
// Iterator.constructRangeKeyIter, except it doesn't handle batches and ensures
// iterConfig does *not* elide unsets/deletes.
//
// Levels deeper than skipLevelForOpts() are omitted, matching
// constructPointIter: their contents are expected to be surfaced through the
// shared/external file visitors.
func (i *scanInternalIterator) constructRangeKeyIter() error {
	// We want the bounded iter from iterConfig, but not the collapsing of
	// RangeKeyUnsets and RangeKeyDels.
	i.rangeKey.rangeKeyIter = i.rangeKey.iterConfig.Init(
		i.comparer, i.seqNum, i.opts.LowerBound, i.opts.UpperBound,
		nil /* hasPrefix */, nil /* prefix */, true, /* internalKeys */
		&i.rangeKey.rangeKeyBuffers.internal)

	// Next are the flushables: memtables and large batches.
	if i.readState != nil {
		for j := len(i.readState.memtables) - 1; j >= 0; j-- {
			mem := i.readState.memtables[j]
			// We only need to read from memtables which contain sequence numbers older
			// than seqNum.
			if logSeqNum := mem.logSeqNum; logSeqNum >= i.seqNum {
				continue
			}
			// Not every memtable contains range keys; only add a level when
			// one exists.
			if rki := mem.newRangeKeyIter(&i.opts.IterOptions); rki != nil {
				i.rangeKey.iterConfig.AddLevel(rki)
			}
		}
	}

	// Prefer an explicitly pinned version if one was provided; otherwise read
	// from the current read state.
	current := i.version
	if current == nil {
		current = i.readState.current
	}
	// Next are the file levels: L0 sub-levels followed by lower levels.
	//
	// Add file-specific iterators for L0 files containing range keys. This is less
	// efficient than using levelIters for sublevels of L0 files containing
	// range keys, but range keys are expected to be sparse anyway, reducing the
	// cost benefit of maintaining a separate L0Sublevels instance for range key
	// files and then using it here.
	//
	// NB: We iterate L0's files in reverse order. They're sorted by
	// LargestSeqNum ascending, and we need to add them to the merging iterator
	// in LargestSeqNum descending to preserve the merging iterator's invariants
	// around Key InternalKeyTrailer order.
	iter := current.RangeKeyLevels[0].Iter()
	for f := iter.Last(); f != nil; f = iter.Prev() {
		spanIter, err := i.newIterRangeKey(i.ctx, f, i.opts.SpanIterOptions())
		if err != nil {
			return err
		}
		i.rangeKey.iterConfig.AddLevel(spanIter)
	}
	// Add level iterators for the non-empty non-L0 levels.
	skipStart := i.opts.skipLevelForOpts()
	for level := 1; level < len(current.RangeKeyLevels); level++ {
		if current.RangeKeyLevels[level].Empty() {
			continue
		}
		if level > skipStart {
			continue
		}
		li := i.rangeKey.iterConfig.NewLevelIter()
		spanIterOpts := i.opts.SpanIterOptions()
		levIter := current.RangeKeyLevels[level].Iter()
		if level == skipStart {
			// At the boundary level itself, files handled by the
			// shared/external visitors are excluded; iterate only the
			// remaining files. Mirrors the filtering in constructPointIter.
			nonRemoteFiles := make([]*manifest.FileMetadata, 0)
			for f := levIter.First(); f != nil; f = levIter.Next() {
				meta, err := i.db.objProvider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum)
				if err != nil {
					return err
				}
				if (meta.IsShared() && i.opts.visitSharedFile != nil) ||
					(meta.IsExternal() && i.opts.visitExternalFile != nil) {
					// Skip this file.
					continue
				}
				nonRemoteFiles = append(nonRemoteFiles, f)
			}
			levSlice := manifest.NewLevelSliceKeySorted(i.db.cmp, nonRemoteFiles)
			levIter = levSlice.Iter()
		}
		li.Init(i.ctx, spanIterOpts, i.comparer.Compare, i.newIterRangeKey, levIter,
			manifest.Level(level), manifest.KeyTypeRange)
		i.rangeKey.iterConfig.AddLevel(li)
	}
	return nil
}
1074 :
1075 : // seekGE seeks this iterator to the first key that's greater than or equal
1076 : // to the specified user key.
1077 1 : func (i *scanInternalIterator) seekGE(key []byte) bool {
1078 1 : i.iterKV = i.iter.SeekGE(key, base.SeekGEFlagsNone)
1079 1 : return i.iterKV != nil
1080 1 : }
1081 :
1082 : // unsafeKey returns the unsafe InternalKey at the current position. The value
1083 : // is nil if the iterator is invalid or exhausted.
1084 1 : func (i *scanInternalIterator) unsafeKey() *InternalKey {
1085 1 : return &i.iterKV.K
1086 1 : }
1087 :
1088 : // lazyValue returns a value pointer to the value at the current iterator
1089 : // position. Behaviour undefined if unsafeKey() returns a Range key or Rangedel
1090 : // kind key.
1091 1 : func (i *scanInternalIterator) lazyValue() LazyValue {
1092 1 : return i.iterKV.V
1093 1 : }
1094 :
1095 : // unsafeRangeDel returns a range key span. Behaviour undefined if UnsafeKey returns
1096 : // a non-rangedel kind.
1097 1 : func (i *scanInternalIterator) unsafeRangeDel() *keyspan.Span {
1098 1 : type spanInternalIterator interface {
1099 1 : Span() *keyspan.Span
1100 1 : }
1101 1 : return i.pointKeyIter.(spanInternalIterator).Span()
1102 1 : }
1103 :
1104 : // unsafeSpan returns a range key span. Behaviour undefined if UnsafeKey returns
1105 : // a non-rangekey type.
1106 1 : func (i *scanInternalIterator) unsafeSpan() *keyspan.Span {
1107 1 : return i.rangeKey.iiter.Span()
1108 1 : }
1109 :
1110 : // next advances the iterator in the forward direction, and returns the
1111 : // iterator's new validity state.
1112 1 : func (i *scanInternalIterator) next() bool {
1113 1 : i.iterKV = i.iter.Next()
1114 1 : return i.iterKV != nil
1115 1 : }
1116 :
1117 : // error returns an error from the internal iterator, if there's any.
1118 1 : func (i *scanInternalIterator) error() error {
1119 1 : return i.iter.Error()
1120 1 : }
1121 :
// close closes this iterator, and releases any pooled objects.
//
// Order matters: the iterator stack is closed first; only on success are the
// LSM view references dropped and the reusable state returned to its pools.
func (i *scanInternalIterator) close() error {
	if err := i.iter.Close(); err != nil {
		// NB: on error we return without unref'ing the read state/version or
		// returning pooled objects below.
		return err
	}
	// Release whichever of readState/version was pinning the LSM view.
	if i.readState != nil {
		i.readState.unref()
	}
	if i.version != nil {
		i.version.Unref()
	}
	// Return the range-key state to its pool, retaining only the reusable
	// buffers.
	if i.rangeKey != nil {
		i.rangeKey.PrepareForReuse()
		*i.rangeKey = iteratorRangeKeyState{
			rangeKeyBuffers: i.rangeKey.rangeKeyBuffers,
		}
		iterRangeKeyStateAllocPool.Put(i.rangeKey)
		i.rangeKey = nil
	}
	// Return the iterAlloc to its pool. Bounds buffers at or above
	// maxKeyBufCacheSize are dropped rather than cached, so the pool doesn't
	// pin unusually large allocations.
	if alloc := i.alloc; alloc != nil {
		for j := range i.boundsBuf {
			if cap(i.boundsBuf[j]) >= maxKeyBufCacheSize {
				alloc.boundsBuf[j] = nil
			} else {
				alloc.boundsBuf[j] = i.boundsBuf[j]
			}
		}
		// Reset the alloc to a clean state (keeping backing storage where
		// safe) before pooling it.
		*alloc = iterAlloc{
			keyBuf:              alloc.keyBuf[:0],
			boundsBuf:           alloc.boundsBuf,
			prefixOrFullSeekKey: alloc.prefixOrFullSeekKey[:0],
		}
		iterAllocPool.Put(alloc)
		i.alloc = nil
	}
	return nil
}
1159 :
// initializeBoundBufs copies lower and upper into one of the iterator's two
// bounds buffers and points i.opts.LowerBound/UpperBound at the copies. Both
// bounds share a single backing buffer: LowerBound aliases its head and
// UpperBound aliases its tail. The buffers are used in alternation
// (i.boundsBufIdx flips each call) — NOTE(review): presumably so bounds from
// the previous call remain valid while new ones are installed; confirm with
// callers.
func (i *scanInternalIterator) initializeBoundBufs(lower, upper []byte) {
	buf := i.boundsBuf[i.boundsBufIdx][:0]
	if lower != nil {
		buf = append(buf, lower...)
		i.opts.LowerBound = buf
	} else {
		i.opts.LowerBound = nil
	}
	if upper != nil {
		buf = append(buf, upper...)
		// The upper bound occupies the tail of buf, after the lower bound.
		// NB: if this append grew the slice, LowerBound may still alias the
		// previous backing array; its contents remain a valid copy of lower.
		i.opts.UpperBound = buf[len(buf)-len(upper):]
	} else {
		i.opts.UpperBound = nil
	}
	// Save the (possibly reallocated) buffer and flip to the other buffer for
	// the next call.
	i.boundsBuf[i.boundsBufIdx] = buf
	i.boundsBufIdx = 1 - i.boundsBufIdx
}
|