Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/invariants"
15 : "github.com/cockroachdb/pebble/internal/keyspan"
16 : "github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
17 : "github.com/cockroachdb/pebble/internal/manifest"
18 : "github.com/cockroachdb/pebble/internal/treeprinter"
19 : "github.com/cockroachdb/pebble/objstorage"
20 : "github.com/cockroachdb/pebble/objstorage/remote"
21 : "github.com/cockroachdb/pebble/sstable/blob"
22 : "github.com/cockroachdb/pebble/sstable/block"
23 : )
24 :
const (
	// In skip-shared iteration mode, keys in levels greater than
	// sharedLevelsStart (i.e. lower in the LSM) are skipped. Keys
	// in sharedLevelsStart are returned iff they are not in a
	// shared file.
	sharedLevelsStart = remote.SharedLevelsStart

	// In skip-external iteration mode, keys in levels greater
	// than externalSkipStart are skipped. Keys in
	// externalSkipStart are returned iff they are not in an
	// external file. (6 is the bottommost LSM level, L6.)
	externalSkipStart = 6
)
38 :
// ErrInvalidSkipSharedIteration is returned by ScanInternal if it was called
// with a shared file visitor function, and a file in a shareable level (i.e.
// level >= sharedLevelsStart) was found to not be in shared storage according
// to objstorage.Provider, or not shareable for another reason such as for
// containing keys newer than the snapshot sequence number.
var ErrInvalidSkipSharedIteration = errors.New("pebble: cannot use skip-shared iteration due to non-shareable files in lower levels")
45 :
// SharedSSTMeta represents an sstable on shared storage that can be ingested
// by another pebble instance. This struct must contain all fields that are
// required for a Pebble instance to ingest a foreign sstable on shared storage,
// including constructing any relevant objstorage.Provider / remoteobjcat.Catalog
// data structures, as well as creating virtual TableMetadatas.
//
// Note that the Pebble instance creating and returning a SharedSSTMeta might
// not be the one that created the underlying sstable on shared storage to begin
// with; it's possible for a Pebble instance to reshare an sstable that was
// shared to it.
type SharedSSTMeta struct {
	// Backing is the shared object underlying this SST. Can be attached to an
	// objstorage.Provider.
	Backing objstorage.RemoteObjectBackingHandle

	// Smallest and Largest internal keys for the overall bounds. The kind and
	// SeqNum of these will reflect what is physically present on the source Pebble
	// instance's view of the sstable; it's up to the ingesting instance to set the
	// sequence number in the trailer to match the read-time sequence numbers
	// reserved for the level this SST is being ingested into. The Kind is expected
	// to remain unchanged by the ingesting instance.
	//
	// Note that these bounds could be narrower than the bounds of the underlying
	// sstable; ScanInternal is expected to truncate sstable bounds to the user key
	// bounds passed into that method.
	Smallest, Largest InternalKey

	// SmallestRangeKey and LargestRangeKey are internal keys that denote the
	// range key bounds of this sstable. Must lie within [Smallest, Largest].
	SmallestRangeKey, LargestRangeKey InternalKey

	// SmallestPointKey and LargestPointKey are internal keys that denote the
	// point key bounds of this sstable. Must lie within [Smallest, Largest].
	SmallestPointKey, LargestPointKey InternalKey

	// Level denotes the level at which this file was present at read time.
	// For files visited by ScanInternal, this value will only be 5 or 6.
	Level uint8

	// Size contains an estimate of the size of this sstable.
	Size uint64

	// tableNum at time of creation in the creator instance. Only used for
	// debugging/tests.
	tableNum base.TableNum
}
92 :
93 1 : func (s *SharedSSTMeta) cloneFromFileMeta(f *tableMetadata) {
94 1 : *s = SharedSSTMeta{
95 1 : Smallest: f.Smallest().Clone(),
96 1 : Largest: f.Largest().Clone(),
97 1 : SmallestPointKey: f.PointKeyBounds.Smallest().Clone(),
98 1 : LargestPointKey: f.PointKeyBounds.Largest().Clone(),
99 1 : Size: f.Size,
100 1 : tableNum: f.TableNum,
101 1 : }
102 1 : if f.HasRangeKeys {
103 1 : s.SmallestRangeKey = f.RangeKeyBounds.Smallest().Clone()
104 1 : s.LargestRangeKey = f.RangeKeyBounds.Largest().Clone()
105 1 : }
106 : }
107 :
108 : type sharedByLevel []SharedSSTMeta
109 :
110 1 : func (s sharedByLevel) Len() int { return len(s) }
111 0 : func (s sharedByLevel) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
112 1 : func (s sharedByLevel) Less(i, j int) bool { return s[i].Level < s[j].Level }
113 :
// pcIterPos describes where pointCollapsingIterator's underlying iterator sits
// relative to the iterator's externally-visible position.
type pcIterPos int

const (
	// pcIterPosCur: the underlying iterator is at the current (last returned)
	// key.
	pcIterPosCur pcIterPos = iota
	// pcIterPosNext: the underlying iterator has already advanced past the
	// current key to the next user key.
	pcIterPosNext
)
120 :
// pointCollapsingIterator is an internalIterator that collapses point keys and
// returns at most one point internal key for each user key. Merges and
// SingleDels are not supported and result in a panic if encountered. Point keys
// deleted by rangedels are considered shadowed and not exposed.
//
// Only used in ScanInternal to return at most one internal key per user key.
type pointCollapsingIterator struct {
	// iter is the underlying iterator; range deletions are interleaved into
	// the point stream so shadowed points can be detected via iter.Span().
	iter keyspan.InterleavingIter
	// pos tracks whether iter sits on the current or the next user key.
	pos      pcIterPos
	comparer *base.Comparer
	merge    base.Merge
	// err holds a sticky error, surfaced by Error() before iter's error.
	err error
	// seqNum is the read sequence number; used to decide whether a rangedel
	// span covers a point key (see findNextEntry).
	seqNum base.SeqNum
	// The current position of `iter`. Always owned by the underlying iter.
	iterKV *base.InternalKV
	// The last saved key. findNextEntry and similar methods are expected to save
	// the current value of iterKey to savedKey if they're iterating away from the
	// current key but still need to retain it. See comments in findNextEntry on
	// how this field is used.
	//
	// At the end of a positioning call:
	// - if pos == pcIterPosNext, iterKey is pointing to the next user key owned
	//   by `iter` while savedKey is holding a copy to our current key.
	// - If pos == pcIterPosCur, iterKey is pointing to an `iter`-owned current
	//   key, and savedKey is either undefined or pointing to a version of the
	//   current key owned by this iterator (i.e. backed by savedKeyBuf).
	savedKey InternalKey
	// savedKeyBuf is the reusable backing storage for savedKey.UserKey.
	savedKeyBuf []byte
	// If fixedSeqNum is non-zero, all emitted points are verified to have this
	// fixed sequence number.
	fixedSeqNum base.SeqNum
}
153 :
154 1 : func (p *pointCollapsingIterator) Span() *keyspan.Span {
155 1 : return p.iter.Span()
156 1 : }
157 :
158 : // SeekPrefixGE implements the InternalIterator interface.
159 : func (p *pointCollapsingIterator) SeekPrefixGE(
160 : prefix, key []byte, flags base.SeekGEFlags,
161 0 : ) *base.InternalKV {
162 0 : p.resetKey()
163 0 : p.iterKV = p.iter.SeekPrefixGE(prefix, key, flags)
164 0 : p.pos = pcIterPosCur
165 0 : if p.iterKV == nil {
166 0 : return nil
167 0 : }
168 0 : return p.findNextEntry()
169 : }
170 :
171 : // SeekGE implements the InternalIterator interface.
172 1 : func (p *pointCollapsingIterator) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
173 1 : p.resetKey()
174 1 : p.iterKV = p.iter.SeekGE(key, flags)
175 1 : p.pos = pcIterPosCur
176 1 : if p.iterKV == nil {
177 1 : return nil
178 1 : }
179 1 : return p.findNextEntry()
180 : }
181 :
// SeekLT implements the InternalIterator interface. Reverse iteration is not
// supported by pointCollapsingIterator; ScanInternal only iterates forward.
func (p *pointCollapsingIterator) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
	panic("unimplemented")
}
186 :
187 1 : func (p *pointCollapsingIterator) resetKey() {
188 1 : p.savedKey.UserKey = p.savedKeyBuf[:0]
189 1 : p.savedKey.Trailer = 0
190 1 : p.iterKV = nil
191 1 : p.pos = pcIterPosCur
192 1 : }
193 :
194 1 : func (p *pointCollapsingIterator) verifySeqNum(kv *base.InternalKV) *base.InternalKV {
195 1 : if !invariants.Enabled {
196 0 : return kv
197 0 : }
198 1 : if p.fixedSeqNum == 0 || kv == nil || kv.Kind() == InternalKeyKindRangeDelete {
199 1 : return kv
200 1 : }
201 0 : if kv.SeqNum() != p.fixedSeqNum {
202 0 : panic(fmt.Sprintf("expected foreign point key to have seqnum %d, got %d", p.fixedSeqNum, kv.SeqNum()))
203 : }
204 0 : return kv
205 : }
206 :
// findNextEntry is called to return the next key. p.iter must be positioned at the
// start of the first user key we are interested in. It advances until it finds
// an exposable point (or interleaved rangedel), skipping any points shadowed by
// range deletions visible at p.seqNum.
func (p *pointCollapsingIterator) findNextEntry() *base.InternalKV {
	p.saveKey()
	// Saves a comparison in the fast path
	firstIteration := true
	for p.iterKV != nil {
		// NB: p.savedKey is either the current key (iff p.iterKV == firstKey),
		// or the previous key.
		if !firstIteration && !p.comparer.Equal(p.iterKV.K.UserKey, p.savedKey.UserKey) {
			// The user key changed; re-save and re-evaluate from the top.
			p.saveKey()
			continue
		}
		firstIteration = false
		if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKV.SeqNum()) {
			// All future keys for this user key must be deleted.
			if p.savedKey.Kind() == InternalKeyKindSingleDelete {
				panic("cannot process singledel key in point collapsing iterator")
			}
			// Fast forward to the next user key. The seqnum comparison is a
			// cheap monotonicity check that avoids a full key comparison in
			// most iterations (seqnums decrease within a user key).
			p.saveKey()
			p.iterKV = p.iter.Next()
			for p.iterKV != nil && p.savedKey.SeqNum() >= p.iterKV.SeqNum() && p.comparer.Equal(p.iterKV.K.UserKey, p.savedKey.UserKey) {
				p.iterKV = p.iter.Next()
			}
			continue
		}
		switch p.savedKey.Kind() {
		case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
			// Note that we return SETs directly, even if they would otherwise get
			// compacted into a Del to turn into a SetWithDelete. This is a fast
			// path optimization that can break SINGLEDEL determinism. To lead to
			// consistent SINGLEDEL behaviour, this iterator should *not* be used for
			// a keyspace where SINGLEDELs could be in use. If this iterator observes
			// a SINGLEDEL as the first internal key for a user key, it will panic.
			//
			// As p.value is a lazy value owned by the child iterator, we can thread
			// it through without loading it into p.valueBuf.
			//
			// TODO(bilal): We can even avoid saving the key in this fast path if
			// we are in a block where setHasSamePrefix = false in a v3 sstable,
			// guaranteeing that there's only one internal key for each user key.
			// Thread this logic through the sstable iterators and/or consider
			// collapsing (ha) this logic into the sstable iterators that are aware
			// of blocks and can determine user key changes without doing key saves
			// or comparisons.
			p.pos = pcIterPosCur
			return p.verifySeqNum(p.iterKV)
		case InternalKeyKindSingleDelete:
			// Panic, as this iterator is not expected to observe single deletes.
			panic("cannot process singledel key in point collapsing iterator")
		case InternalKeyKindMerge:
			// Panic, as this iterator is not expected to observe merges.
			panic("cannot process merge key in point collapsing iterator")
		case InternalKeyKindRangeDelete:
			// These are interleaved by the interleaving iterator ahead of all points.
			// We should pass them as-is, but also account for any points ahead of
			// them.
			p.pos = pcIterPosCur
			return p.verifySeqNum(p.iterKV)
		default:
			panic(fmt.Sprintf("unexpected kind: %d", p.iterKV.Kind()))
		}
	}
	// Exhausted: clear state and signal no key.
	p.resetKey()
	return nil
}
274 :
275 : // First implements the InternalIterator interface.
276 1 : func (p *pointCollapsingIterator) First() *base.InternalKV {
277 1 : p.resetKey()
278 1 : p.iterKV = p.iter.First()
279 1 : p.pos = pcIterPosCur
280 1 : if p.iterKV == nil {
281 0 : return nil
282 0 : }
283 1 : return p.findNextEntry()
284 : }
285 :
// Last implements the InternalIterator interface. Reverse iteration is not
// supported by pointCollapsingIterator; ScanInternal only iterates forward.
func (p *pointCollapsingIterator) Last() *base.InternalKV {
	panic("unimplemented")
}
290 :
291 1 : func (p *pointCollapsingIterator) saveKey() {
292 1 : if p.iterKV == nil {
293 1 : p.savedKey = InternalKey{UserKey: p.savedKeyBuf[:0]}
294 1 : return
295 1 : }
296 1 : p.savedKeyBuf = append(p.savedKeyBuf[:0], p.iterKV.K.UserKey...)
297 1 : p.savedKey = InternalKey{UserKey: p.savedKeyBuf, Trailer: p.iterKV.K.Trailer}
298 : }
299 :
// Next implements the InternalIterator interface. It advances past all
// remaining internal keys for the current user key (unless the current entry
// is an interleaved rangedel, which shares its user key with a pending point),
// then collapses forward via findNextEntry.
func (p *pointCollapsingIterator) Next() *base.InternalKV {
	switch p.pos {
	case pcIterPosCur:
		p.saveKey()
		if p.iterKV != nil && p.iterKV.Kind() == InternalKeyKindRangeDelete {
			// Step over the interleaved range delete and process the very next
			// internal key, even if it's at the same user key. This is because a
			// point for that user key has not been returned yet.
			p.iterKV = p.iter.Next()
			break
		}
		// Fast forward to the next user key.
		kv := p.iter.Next()
		// p.iterKV.SeqNum() >= key.SeqNum() is an optimization that allows us to
		// use p.iterKV.SeqNum() < key.SeqNum() as a sign that the user key has
		// changed, without needing to do the full key comparison.
		for kv != nil && p.savedKey.SeqNum() >= kv.SeqNum() &&
			p.comparer.Equal(p.savedKey.UserKey, kv.K.UserKey) {
			kv = p.iter.Next()
		}
		if kv == nil {
			// There are no keys to return.
			p.resetKey()
			return nil
		}
		p.iterKV = kv
	case pcIterPosNext:
		// The underlying iterator is already at the next user key; just flip
		// the position state.
		p.pos = pcIterPosCur
	}
	if p.iterKV == nil {
		p.resetKey()
		return nil
	}
	return p.findNextEntry()
}
336 :
// NextPrefix implements the InternalIterator interface. Not needed by
// ScanInternal, which uses absolute positioning plus Next.
func (p *pointCollapsingIterator) NextPrefix(succKey []byte) *base.InternalKV {
	panic("unimplemented")
}
341 :
// Prev implements the InternalIterator interface. Reverse iteration is not
// supported by pointCollapsingIterator; ScanInternal only iterates forward.
func (p *pointCollapsingIterator) Prev() *base.InternalKV {
	panic("unimplemented")
}
346 :
347 : // Error implements the InternalIterator interface.
348 1 : func (p *pointCollapsingIterator) Error() error {
349 1 : if p.err != nil {
350 0 : return p.err
351 0 : }
352 1 : return p.iter.Error()
353 : }
354 :
355 : // Close implements the InternalIterator interface.
356 1 : func (p *pointCollapsingIterator) Close() error {
357 1 : return p.iter.Close()
358 1 : }
359 :
360 : // SetBounds implements the InternalIterator interface.
361 0 : func (p *pointCollapsingIterator) SetBounds(lower, upper []byte) {
362 0 : p.resetKey()
363 0 : p.iter.SetBounds(lower, upper)
364 0 : }
365 :
366 0 : func (p *pointCollapsingIterator) SetContext(ctx context.Context) {
367 0 : p.iter.SetContext(ctx)
368 0 : }
369 :
370 : // DebugTree is part of the InternalIterator interface.
371 0 : func (p *pointCollapsingIterator) DebugTree(tp treeprinter.Node) {
372 0 : n := tp.Childf("%T(%p)", p, p)
373 0 : p.iter.DebugTree(n)
374 0 : }
375 :
376 : // String implements the InternalIterator interface.
377 0 : func (p *pointCollapsingIterator) String() string {
378 0 : return p.iter.String()
379 0 : }
380 :
381 : var _ internalIterator = &pointCollapsingIterator{}
382 :
// IteratorLevelKind is used to denote whether the current ScanInternal iterator
// is unknown, belongs to a flushable, or belongs to an LSM level type.
type IteratorLevelKind int8

const (
	// IteratorLevelUnknown indicates an unknown LSM level.
	IteratorLevelUnknown IteratorLevelKind = iota
	// IteratorLevelLSM indicates an LSM level.
	IteratorLevelLSM
	// IteratorLevelFlushable indicates a flushable (i.e. memtable).
	IteratorLevelFlushable
)
395 :
// IteratorLevel is used with scanInternalIterator to surface additional iterator-specific info where possible.
// Note: this struct is only provided for point keys.
type IteratorLevel struct {
	Kind IteratorLevelKind
	// FlushableIndex indicates the position within the flushable queue of this level.
	// Only valid if kind == IteratorLevelFlushable.
	FlushableIndex int
	// The level within the LSM. Only valid if Kind == IteratorLevelLSM.
	Level int
	// Sublevel is only valid if Kind == IteratorLevelLSM and Level == 0.
	Sublevel int
}
408 :
// scanInternalIterator is an iterator that returns all internal keys, including
// tombstones. For instance, an InternalKeyKindDelete would be returned as an
// InternalKeyKindDelete instead of the iterator skipping over to the next key.
// Internal keys within a user key are collapsed, eg. if there are two SETs, the
// one with the higher sequence is returned. Useful if an external user of Pebble
// needs to observe and rebuild Pebble's history of internal keys, such as in
// node-to-node replication. For use with {db,snapshot}.ScanInternal().
//
// scanInternalIterator is expected to ignore point keys deleted by range
// deletions, and range keys shadowed by a range key unset or delete, however it
// *must* return the range delete as well as the range key unset/delete that did
// the shadowing.
type scanInternalIterator struct {
	ctx      context.Context
	db       *DB
	opts     scanInternalOptions
	comparer *base.Comparer
	merge    Merge
	// iter is the top-level iterator driving the scan.
	iter internalIterator
	// readState/version: exactly one is the source of the LSM view; version is
	// preferred when non-nil (see scanInternalImpl).
	readState    *readState
	version      *version
	rangeKey     *iteratorRangeKeyState
	pointKeyIter internalIterator
	iterKV       *base.InternalKV
	alloc        *iterAlloc
	newIters     tableNewIters
	newIterRangeKey keyspanimpl.TableNewSpanIter
	// seqNum is the read snapshot's sequence number.
	seqNum base.SeqNum
	// iterLevels maps mergingIter child indexes to level information, used to
	// report IteratorLevel for point keys.
	iterLevels  []IteratorLevel
	mergingIter *mergingIter
	blobValueFetcher blob.ValueFetcher

	// boundsBuf holds two buffers used to store the lower and upper bounds.
	// Whenever the InternalIterator's bounds change, the new bounds are copied
	// into boundsBuf[boundsBufIdx]. The two bounds share a slice to reduce
	// allocations. opts.LowerBound and opts.UpperBound point into this slice.
	boundsBuf    [2][]byte
	boundsBufIdx int
}
448 :
449 : // truncateExternalFile truncates an External file's [SmallestUserKey,
450 : // LargestUserKey] fields to [lower, upper). A ExternalFile is
451 : // produced that is suitable for external consumption by other Pebble
452 : // instances.
453 : //
454 : // truncateSharedFile reads the file to try to create the smallest
455 : // possible bounds. Here, we blindly truncate them. This may mean we
456 : // include this SST in iterations it isn't really needed in. Since we
457 : // don't expect External files to be long-lived in the pebble
458 : // instance, We think this is OK.
459 : //
460 : // TODO(ssd) 2024-01-26: Potentially de-duplicate with
461 : // truncateSharedFile.
462 : func (d *DB) truncateExternalFile(
463 : ctx context.Context,
464 : lower, upper []byte,
465 : level int,
466 : file *tableMetadata,
467 : objMeta objstorage.ObjectMetadata,
468 1 : ) (*ExternalFile, error) {
469 1 : cmp := d.cmp
470 1 : sst := &ExternalFile{
471 1 : Level: uint8(level),
472 1 : ObjName: objMeta.Remote.CustomObjectName,
473 1 : Locator: objMeta.Remote.Locator,
474 1 : HasPointKey: file.HasPointKeys,
475 1 : HasRangeKey: file.HasRangeKeys,
476 1 : Size: file.Size,
477 1 : SyntheticPrefix: slices.Clone(file.SyntheticPrefixAndSuffix.Prefix()),
478 1 : SyntheticSuffix: slices.Clone(file.SyntheticPrefixAndSuffix.Suffix()),
479 1 : }
480 1 :
481 1 : needsLowerTruncate := cmp(lower, file.Smallest().UserKey) > 0
482 1 : if needsLowerTruncate {
483 1 : sst.StartKey = slices.Clone(lower)
484 1 : } else {
485 1 : sst.StartKey = slices.Clone(file.Smallest().UserKey)
486 1 : }
487 :
488 1 : cmpUpper := cmp(upper, file.Largest().UserKey)
489 1 : needsUpperTruncate := cmpUpper < 0
490 1 : if needsUpperTruncate {
491 0 : sst.EndKey = slices.Clone(upper)
492 0 : sst.EndKeyIsInclusive = false
493 1 : } else {
494 1 : sst.EndKey = slices.Clone(file.Largest().UserKey)
495 1 : sst.EndKeyIsInclusive = !file.Largest().IsExclusiveSentinel()
496 1 : }
497 :
498 1 : if cmp(sst.StartKey, sst.EndKey) > 0 {
499 0 : return nil, base.AssertionFailedf("pebble: invalid external file bounds after truncation [%q, %q)", sst.StartKey, sst.EndKey)
500 0 : }
501 :
502 1 : if cmp(sst.StartKey, sst.EndKey) == 0 && !sst.EndKeyIsInclusive {
503 0 : return nil, base.AssertionFailedf("pebble: invalid external file bounds after truncation [%q, %q)", sst.StartKey, sst.EndKey)
504 0 : }
505 :
506 1 : return sst, nil
507 : }
508 :
// truncateSharedFile truncates a shared file's [Smallest, Largest] fields to
// [lower, upper), potentially opening iterators on the file to find keys within
// the requested bounds. A SharedSSTMeta is produced that is suitable for
// external consumption by other Pebble instances. If shouldSkip is true, this
// file does not contain any keys in [lower, upper) and can be skipped.
//
// TODO(bilal): If opening iterators and doing reads in this method is too
// inefficient, consider producing non-tight file bounds instead.
func (d *DB) truncateSharedFile(
	ctx context.Context,
	lower, upper []byte,
	level int,
	file *tableMetadata,
	objMeta objstorage.ObjectMetadata,
) (sst *SharedSSTMeta, shouldSkip bool, err error) {
	cmp := d.cmp
	sst = &SharedSSTMeta{}
	sst.cloneFromFileMeta(file)
	sst.Level = uint8(level)
	sst.Backing, err = d.objProvider.RemoteObjectBacking(&objMeta)
	if err != nil {
		return nil, false, err
	}
	needsLowerTruncate := cmp(lower, file.Smallest().UserKey) > 0
	needsUpperTruncate := cmp(upper, file.Largest().UserKey) < 0 || (cmp(upper, file.Largest().UserKey) == 0 && !file.Largest().IsExclusiveSentinel())
	// Fast path: file is entirely within [lower, upper).
	if !needsLowerTruncate && !needsUpperTruncate {
		return sst, false, nil
	}

	// We will need to truncate file bounds in at least one direction. Open all
	// relevant iterators.
	iters, err := d.newIters(ctx, file, &IterOptions{
		LowerBound: lower,
		UpperBound: upper,
		layer:      manifest.Level(level),
	}, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
	if err != nil {
		return nil, false, err
	}
	defer func() { _ = iters.CloseAll() }()
	iter := iters.point
	rangeDelIter := iters.rangeDeletion
	rangeKeyIter := iters.rangeKey
	// Constrain the span iterators to [lower, upper) so the bounds we derive
	// below cannot extend outside the requested range.
	if rangeDelIter != nil {
		rangeDelIter = keyspan.Truncate(cmp, rangeDelIter, base.UserKeyBoundsEndExclusive(lower, upper))
	}
	if rangeKeyIter != nil {
		rangeKeyIter = keyspan.Truncate(cmp, rangeKeyIter, base.UserKeyBoundsEndExclusive(lower, upper))
	}
	// Check if we need to truncate on the left side. This means finding a new
	// SmallestPointKey and SmallestRangeKey that are >= lower.
	// (NOTE(review): the original comment said "LargestPointKey and
	// LargestRangeKey"; the code computes the Smallest bounds here.)
	if needsLowerTruncate {
		sst.SmallestPointKey.UserKey = sst.SmallestPointKey.UserKey[:0]
		sst.SmallestPointKey.Trailer = 0
		kv := iter.SeekGE(lower, base.SeekGEFlagsNone)
		foundPointKey := kv != nil
		if kv != nil {
			sst.SmallestPointKey.CopyFrom(kv.K)
		}
		if rangeDelIter != nil {
			// A rangedel starting before the first point key becomes the
			// smallest point-key bound.
			if span, err := rangeDelIter.SeekGE(lower); err != nil {
				return nil, false, err
			} else if span != nil && (len(sst.SmallestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.SmallestKey(), sst.SmallestPointKey) < 0) {
				sst.SmallestPointKey.CopyFrom(span.SmallestKey())
				foundPointKey = true
			}
		}
		if !foundPointKey {
			// There are no point keys in the span we're interested in.
			sst.SmallestPointKey = InternalKey{}
			sst.LargestPointKey = InternalKey{}
		}
		sst.SmallestRangeKey.UserKey = sst.SmallestRangeKey.UserKey[:0]
		sst.SmallestRangeKey.Trailer = 0
		if rangeKeyIter != nil {
			span, err := rangeKeyIter.SeekGE(lower)
			switch {
			case err != nil:
				return nil, false, err
			case span != nil:
				sst.SmallestRangeKey.CopyFrom(span.SmallestKey())
			default:
				// There are no range keys in the span we're interested in.
				sst.SmallestRangeKey = InternalKey{}
				sst.LargestRangeKey = InternalKey{}
			}
		}
	}
	// Check if we need to truncate on the right side. This means finding a new
	// LargestPointKey and LargestRangeKey that is < upper.
	if needsUpperTruncate {
		sst.LargestPointKey.UserKey = sst.LargestPointKey.UserKey[:0]
		sst.LargestPointKey.Trailer = 0
		kv := iter.SeekLT(upper, base.SeekLTFlagsNone)
		foundPointKey := kv != nil
		if kv != nil {
			sst.LargestPointKey.CopyFrom(kv.K)
		}
		if rangeDelIter != nil {
			// A rangedel ending after the last point key becomes the largest
			// point-key bound.
			if span, err := rangeDelIter.SeekLT(upper); err != nil {
				return nil, false, err
			} else if span != nil && (len(sst.LargestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.LargestKey(), sst.LargestPointKey) > 0) {
				sst.LargestPointKey.CopyFrom(span.LargestKey())
				foundPointKey = true
			}
		}
		if !foundPointKey {
			// There are no point keys in the span we're interested in.
			sst.SmallestPointKey = InternalKey{}
			sst.LargestPointKey = InternalKey{}
		}
		sst.LargestRangeKey.UserKey = sst.LargestRangeKey.UserKey[:0]
		sst.LargestRangeKey.Trailer = 0
		if rangeKeyIter != nil {
			span, err := rangeKeyIter.SeekLT(upper)
			switch {
			case err != nil:
				return nil, false, err
			case span != nil:
				sst.LargestRangeKey.CopyFrom(span.LargestKey())
			default:
				// There are no range keys in the span we're interested in.
				sst.SmallestRangeKey = InternalKey{}
				sst.LargestRangeKey = InternalKey{}
			}
		}
	}
	// Set overall bounds based on {Smallest,Largest}{Point,Range}Key.
	switch {
	case len(sst.SmallestRangeKey.UserKey) == 0:
		sst.Smallest = sst.SmallestPointKey
	case len(sst.SmallestPointKey.UserKey) == 0:
		sst.Smallest = sst.SmallestRangeKey
	default:
		sst.Smallest = sst.SmallestPointKey
		if base.InternalCompare(cmp, sst.SmallestRangeKey, sst.SmallestPointKey) < 0 {
			sst.Smallest = sst.SmallestRangeKey
		}
	}
	switch {
	case len(sst.LargestRangeKey.UserKey) == 0:
		sst.Largest = sst.LargestPointKey
	case len(sst.LargestPointKey.UserKey) == 0:
		sst.Largest = sst.LargestRangeKey
	default:
		sst.Largest = sst.LargestPointKey
		if base.InternalCompare(cmp, sst.LargestRangeKey, sst.LargestPointKey) > 0 {
			sst.Largest = sst.LargestRangeKey
		}
	}
	// On rare occasion, a file might overlap with [lower, upper) but not actually
	// have any keys within those bounds. Skip such files.
	if len(sst.Smallest.UserKey) == 0 {
		return nil, true, nil
	}
	sst.Size, err = d.fileCache.estimateSize(file, sst.Smallest.UserKey, sst.Largest.UserKey)
	if err != nil {
		return nil, false, err
	}
	// On occasion, estimateSize gives us a low estimate, i.e. a 0 file size. This
	// can cause panics in places where we divide by file sizes. Correct for it
	// here.
	if sst.Size == 0 {
		sst.Size = 1
	}
	return sst, false, nil
}
677 :
678 : func scanInternalImpl(
679 : ctx context.Context, lower, upper []byte, iter *scanInternalIterator, opts *scanInternalOptions,
680 1 : ) error {
681 1 : if opts.visitSharedFile != nil && (lower == nil || upper == nil) {
682 0 : panic("lower and upper bounds must be specified in skip-shared iteration mode")
683 : }
684 1 : if opts.visitSharedFile != nil && opts.visitExternalFile != nil {
685 0 : return base.AssertionFailedf("cannot provide both a shared-file and external-file visitor")
686 0 : }
687 :
688 : // Before starting iteration, check if any files in levels sharedLevelsStart
689 : // and below are *not* shared. Error out if that is the case, as skip-shared
690 : // iteration will not produce a consistent point-in-time view of this range
691 : // of keys. For files that are shared, call visitSharedFile with a truncated
692 : // version of that file.
693 1 : cmp := iter.comparer.Compare
694 1 : provider := iter.db.ObjProvider()
695 1 : seqNum := iter.seqNum
696 1 : current := iter.version
697 1 : if current == nil {
698 1 : current = iter.readState.current
699 1 : }
700 :
701 1 : if opts.visitSharedFile != nil || opts.visitExternalFile != nil {
702 1 : if provider == nil {
703 0 : panic("expected non-nil Provider in skip-shared iteration mode")
704 : }
705 :
706 1 : firstLevelWithRemote := opts.skipLevelForOpts()
707 1 : for level := firstLevelWithRemote; level < numLevels; level++ {
708 1 : files := current.Levels[level].Iter()
709 1 : for f := files.SeekGE(cmp, lower); f != nil && cmp(f.Smallest().UserKey, upper) < 0; f = files.Next() {
710 1 : if cmp(lower, f.Largest().UserKey) == 0 && f.Largest().IsExclusiveSentinel() {
711 0 : continue
712 : }
713 :
714 1 : var objMeta objstorage.ObjectMetadata
715 1 : var err error
716 1 : objMeta, err = provider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
717 1 : if err != nil {
718 0 : return err
719 0 : }
720 :
721 : // We allow a mix of files at the first level.
722 1 : if level != firstLevelWithRemote {
723 1 : if !objMeta.IsShared() && !objMeta.IsExternal() {
724 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared or external", objMeta.DiskFileNum)
725 0 : }
726 : }
727 :
728 1 : if objMeta.IsShared() && opts.visitSharedFile == nil {
729 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "shared file is present but no shared file visitor is defined")
730 0 : }
731 :
732 1 : if objMeta.IsExternal() && opts.visitExternalFile == nil {
733 1 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "external file is present but no external file visitor is defined")
734 1 : }
735 :
736 1 : if !base.Visible(f.LargestSeqNum, seqNum, base.SeqNumMax) {
737 1 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s contains keys newer than snapshot", objMeta.DiskFileNum)
738 1 : }
739 :
740 1 : if level != firstLevelWithRemote && (!objMeta.IsShared() && !objMeta.IsExternal()) {
741 0 : return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared or external", objMeta.DiskFileNum)
742 0 : }
743 :
744 1 : if objMeta.IsShared() {
745 1 : var sst *SharedSSTMeta
746 1 : var skip bool
747 1 : sst, skip, err = iter.db.truncateSharedFile(ctx, lower, upper, level, f, objMeta)
748 1 : if err != nil {
749 0 : return err
750 0 : }
751 1 : if skip {
752 1 : continue
753 : }
754 1 : if err = opts.visitSharedFile(sst); err != nil {
755 0 : return err
756 0 : }
757 1 : } else if objMeta.IsExternal() {
758 1 : sst, err := iter.db.truncateExternalFile(ctx, lower, upper, level, f, objMeta)
759 1 : if err != nil {
760 0 : return err
761 0 : }
762 1 : if err := opts.visitExternalFile(sst); err != nil {
763 0 : return err
764 0 : }
765 : }
766 :
767 : }
768 : }
769 : }
770 :
771 1 : for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() {
772 1 : key := iter.unsafeKey()
773 1 :
774 1 : if opts.rateLimitFunc != nil {
775 0 : if err := opts.rateLimitFunc(key, iter.lazyValue()); err != nil {
776 0 : return err
777 0 : }
778 : }
779 :
780 1 : switch key.Kind() {
781 1 : case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet:
782 1 : if opts.visitRangeKey != nil {
783 1 : span := iter.unsafeSpan()
784 1 : // NB: The caller isn't interested in the sequence numbers of these
785 1 : // range keys. Rather, the caller wants them to be in trailer order
786 1 : // _after_ zeroing of sequence numbers. Copy span.Keys, sort it, and then
787 1 : // call visitRangeKey.
788 1 : keysCopy := make([]keyspan.Key, len(span.Keys))
789 1 : for i := range span.Keys {
790 1 : keysCopy[i].CopyFrom(span.Keys[i])
791 1 : keysCopy[i].Trailer = base.MakeTrailer(0, span.Keys[i].Kind())
792 1 : }
793 1 : keyspan.SortKeysByTrailer(keysCopy)
794 1 : if err := opts.visitRangeKey(span.Start, span.End, keysCopy); err != nil {
795 0 : return err
796 0 : }
797 : }
798 1 : case InternalKeyKindRangeDelete:
799 1 : if opts.visitRangeDel != nil {
800 1 : rangeDel := iter.unsafeRangeDel()
801 1 : if err := opts.visitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil {
802 0 : return err
803 0 : }
804 : }
805 1 : default:
806 1 : if opts.visitPointKey != nil {
807 1 : var info IteratorLevel
808 1 : if len(iter.mergingIter.heap.items) > 0 {
809 1 : mergingIterIdx := iter.mergingIter.heap.items[0].index
810 1 : info = iter.iterLevels[mergingIterIdx]
811 1 : } else {
812 0 : info = IteratorLevel{Kind: IteratorLevelUnknown}
813 0 : }
814 1 : val := iter.lazyValue()
815 1 : if err := opts.visitPointKey(key, val, info); err != nil {
816 0 : return err
817 0 : }
818 : }
819 : }
820 : }
821 :
822 1 : return nil
823 : }
824 :
825 1 : func (opts *scanInternalOptions) skipLevelForOpts() int {
826 1 : if opts.visitSharedFile != nil {
827 1 : return sharedLevelsStart
828 1 : }
829 1 : if opts.visitExternalFile != nil {
830 1 : return externalSkipStart
831 1 : }
832 1 : return numLevels
833 : }
834 :
// constructPointIter constructs a merging iterator and sets i.iter to it.
// Point keys are merged from the provided memtables and from every LSM level
// not skipped by the scan options (see skipLevelForOpts); range deletions
// from the same sources are collected into a separate keyspan merging
// iterator so they can be interleaved with (or used to collapse) point keys
// depending on i.opts.includeObsoleteKeys.
func (i *scanInternalIterator) constructPointIter(
	category block.Category, memtables flushableList, buf *iterAlloc,
) error {
	// Merging levels and levels from iterAlloc.
	mlevels := buf.mlevels[:0]
	levels := buf.levels[:0]

	// We compute the number of levels needed ahead of time and reallocate a slice if
	// the array from the iterAlloc isn't large enough. Doing this allocation once
	// should improve the performance.
	numMergingLevels := len(memtables)
	numLevelIters := 0

	// Use the pinned version if one was provided; otherwise read from the
	// current read state.
	current := i.version
	if current == nil {
		current = i.readState.current
	}
	numMergingLevels += len(current.L0SublevelFiles)
	numLevelIters += len(current.L0SublevelFiles)

	// Count the non-empty levels below L0 that aren't skipped entirely by
	// the configured visitors.
	skipStart := i.opts.skipLevelForOpts()
	for level := 1; level < len(current.Levels); level++ {
		if current.Levels[level].Empty() {
			continue
		}
		if level > skipStart {
			continue
		}
		numMergingLevels++
		numLevelIters++
	}

	if numMergingLevels > cap(mlevels) {
		mlevels = make([]mergingIterLevel, 0, numMergingLevels)
	}
	if numLevelIters > cap(levels) {
		levels = make([]levelIter, 0, numLevelIters)
	}
	// TODO(bilal): Push these into the iterAlloc buf.
	var rangeDelMiter keyspanimpl.MergingIter
	rangeDelIters := make([]keyspan.FragmentIterator, 0, numMergingLevels)
	rangeDelLevels := make([]keyspanimpl.LevelIter, 0, numLevelIters)

	i.iterLevels = make([]IteratorLevel, numMergingLevels)
	mlevelsIndex := 0

	// Next are the memtables.
	for j := len(memtables) - 1; j >= 0; j-- {
		mem := memtables[j]
		mlevels = append(mlevels, mergingIterLevel{
			iter: mem.newIter(&i.opts.IterOptions),
		})
		i.iterLevels[mlevelsIndex] = IteratorLevel{
			Kind:           IteratorLevelFlushable,
			FlushableIndex: j,
		}
		mlevelsIndex++
		if rdi := mem.newRangeDelIter(&i.opts.IterOptions); rdi != nil {
			rangeDelIters = append(rangeDelIters, rdi)
		}
	}

	// Next are the file levels: L0 sub-levels followed by lower levels.
	levelsIndex := len(levels)
	mlevels = mlevels[:numMergingLevels]
	levels = levels[:numLevelIters]
	rangeDelLevels = rangeDelLevels[:numLevelIters]
	i.opts.IterOptions.snapshotForHideObsoletePoints = i.seqNum
	i.opts.IterOptions.Category = category

	internalOpts := internalIterOpts{
		blobValueFetcher: &i.blobValueFetcher,
	}

	// addLevelIterForFiles wires up both a point-key level iterator and a
	// range-del level iterator for the given file set, advancing the shared
	// levelsIndex/mlevelsIndex cursors.
	addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Layer) {
		li := &levels[levelsIndex]
		rli := &rangeDelLevels[levelsIndex]

		li.init(i.ctx, i.opts.IterOptions, i.comparer, i.newIters, files, level, internalOpts)
		mlevels[mlevelsIndex].iter = li
		rli.Init(i.ctx, keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters},
			i.comparer.Compare, tableNewRangeDelIter(i.newIters), files, level,
			manifest.KeyTypePoint)
		rangeDelIters = append(rangeDelIters, rli)

		levelsIndex++
		mlevelsIndex++
	}

	// L0 sub-levels are added newest-first (highest sub-level index first).
	for j := len(current.L0SublevelFiles) - 1; j >= 0; j-- {
		i.iterLevels[mlevelsIndex] = IteratorLevel{
			Kind:     IteratorLevelLSM,
			Level:    0,
			Sublevel: j,
		}
		addLevelIterForFiles(current.L0SublevelFiles[j].Iter(), manifest.L0Sublevel(j))
	}
	// Add level iterators for the non-empty non-L0 levels.
	for level := 1; level < numLevels; level++ {
		if current.Levels[level].Empty() {
			continue
		}

		if level > skipStart {
			continue
		}
		i.iterLevels[mlevelsIndex] = IteratorLevel{Kind: IteratorLevelLSM, Level: level}
		levIter := current.Levels[level].Iter()
		if level == skipStart {
			// The boundary level may contain a mix of remote and local files.
			// Files that will be surfaced through the shared/external file
			// visitors are excluded here so they aren't iterated twice.
			nonRemoteFiles := make([]*manifest.TableMetadata, 0)
			for f := levIter.First(); f != nil; f = levIter.Next() {
				meta, err := i.db.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
				if err != nil {
					return err
				}
				if (meta.IsShared() && i.opts.visitSharedFile != nil) ||
					(meta.IsExternal() && i.opts.visitExternalFile != nil) {
					// Skip this file.
					continue
				}
				nonRemoteFiles = append(nonRemoteFiles, f)
			}
			levSlice := manifest.NewLevelSliceKeySorted(i.db.cmp, nonRemoteFiles)
			levIter = levSlice.Iter()
		}

		addLevelIterForFiles(levIter, manifest.Level(level))
	}

	buf.merging.init(&i.opts.IterOptions, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...)
	buf.merging.snapshot = i.seqNum
	rangeDelMiter.Init(i.comparer, keyspan.VisibleTransform(i.seqNum), new(keyspanimpl.MergingBuffers), rangeDelIters...)

	if i.opts.includeObsoleteKeys {
		// When obsolete keys are requested, range deletions are interleaved
		// as keys so the caller observes them alongside shadowed points.
		iiter := &keyspan.InterleavingIter{}
		iiter.Init(i.comparer, &buf.merging, &rangeDelMiter,
			keyspan.InterleavingIterOpts{
				LowerBound: i.opts.LowerBound,
				UpperBound: i.opts.UpperBound,
			})
		i.pointKeyIter = iiter
	} else {
		// Otherwise, point keys are collapsed through a pointCollapsingIterator
		// built on the same interleaving of range deletions.
		pcIter := &pointCollapsingIterator{
			comparer: i.comparer,
			merge:    i.merge,
			seqNum:   i.seqNum,
		}
		pcIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, keyspan.InterleavingIterOpts{
			LowerBound: i.opts.LowerBound,
			UpperBound: i.opts.UpperBound,
		})
		i.pointKeyIter = pcIter
	}
	i.iter = i.pointKeyIter
	return nil
}
992 :
// constructRangeKeyIter constructs the range-key iterator stack, populating
// i.rangeKey.rangeKeyIter with the resulting iterator. This is similar to
// Iterator.constructRangeKeyIter, except it doesn't handle batches and ensures
// iterConfig does *not* elide unsets/deletes. Returns an error only if an
// object lookup or file-iterator construction fails.
func (i *scanInternalIterator) constructRangeKeyIter() error {
	// We want the bounded iter from iterConfig, but not the collapsing of
	// RangeKeyUnsets and RangeKeyDels.
	i.rangeKey.rangeKeyIter = i.rangeKey.iterConfig.Init(
		i.comparer, i.seqNum, i.opts.LowerBound, i.opts.UpperBound,
		nil /* hasPrefix */, nil /* prefix */, true, /* internalKeys */
		&i.rangeKey.rangeKeyBuffers.internal)

	// Next are the flushables: memtables and large batches.
	if i.readState != nil {
		for j := len(i.readState.memtables) - 1; j >= 0; j-- {
			mem := i.readState.memtables[j]
			// We only need to read from memtables which contain sequence numbers older
			// than seqNum.
			if logSeqNum := mem.logSeqNum; logSeqNum >= i.seqNum {
				continue
			}
			if rki := mem.newRangeKeyIter(&i.opts.IterOptions); rki != nil {
				i.rangeKey.iterConfig.AddLevel(rki)
			}
		}
	}

	// Use the pinned version if one was provided; otherwise read from the
	// current read state.
	current := i.version
	if current == nil {
		current = i.readState.current
	}
	// Next are the file levels: L0 sub-levels followed by lower levels.
	//
	// Add file-specific iterators for L0 files containing range keys. This is less
	// efficient than using levelIters for sublevels of L0 files containing
	// range keys, but range keys are expected to be sparse anyway, reducing the
	// cost benefit of maintaining a separate L0Sublevels instance for range key
	// files and then using it here.
	//
	// NB: We iterate L0's files in reverse order. They're sorted by
	// LargestSeqNum ascending, and we need to add them to the merging iterator
	// in LargestSeqNum descending to preserve the merging iterator's invariants
	// around Key InternalKeyTrailer order.
	iter := current.RangeKeyLevels[0].Iter()
	for f := iter.Last(); f != nil; f = iter.Prev() {
		spanIter, err := i.newIterRangeKey(i.ctx, f, i.opts.SpanIterOptions())
		if err != nil {
			return err
		}
		i.rangeKey.iterConfig.AddLevel(spanIter)
	}
	// Add level iterators for the non-empty non-L0 levels.
	skipStart := i.opts.skipLevelForOpts()
	for level := 1; level < len(current.RangeKeyLevels); level++ {
		if current.RangeKeyLevels[level].Empty() {
			continue
		}
		// Levels deeper than skipStart are handled via the visitor
		// callbacks, not iterated here.
		if level > skipStart {
			continue
		}
		li := i.rangeKey.iterConfig.NewLevelIter()
		spanIterOpts := i.opts.SpanIterOptions()
		levIter := current.RangeKeyLevels[level].Iter()
		if level == skipStart {
			// The boundary level may contain a mix of remote and local files.
			// Files that will be surfaced through the shared/external file
			// visitors are excluded here so they aren't iterated twice.
			nonRemoteFiles := make([]*manifest.TableMetadata, 0)
			for f := levIter.First(); f != nil; f = levIter.Next() {
				meta, err := i.db.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
				if err != nil {
					return err
				}
				if (meta.IsShared() && i.opts.visitSharedFile != nil) ||
					(meta.IsExternal() && i.opts.visitExternalFile != nil) {
					// Skip this file.
					continue
				}
				nonRemoteFiles = append(nonRemoteFiles, f)
			}
			levSlice := manifest.NewLevelSliceKeySorted(i.db.cmp, nonRemoteFiles)
			levIter = levSlice.Iter()
		}
		li.Init(i.ctx, spanIterOpts, i.comparer.Compare, i.newIterRangeKey, levIter,
			manifest.Level(level), manifest.KeyTypeRange)
		i.rangeKey.iterConfig.AddLevel(li)
	}
	return nil
}
1079 :
1080 : // seekGE seeks this iterator to the first key that's greater than or equal
1081 : // to the specified user key.
1082 1 : func (i *scanInternalIterator) seekGE(key []byte) bool {
1083 1 : i.iterKV = i.iter.SeekGE(key, base.SeekGEFlagsNone)
1084 1 : return i.iterKV != nil
1085 1 : }
1086 :
1087 : // unsafeKey returns the unsafe InternalKey at the current position. The value
1088 : // is nil if the iterator is invalid or exhausted.
1089 1 : func (i *scanInternalIterator) unsafeKey() *InternalKey {
1090 1 : return &i.iterKV.K
1091 1 : }
1092 :
1093 : // lazyValue returns a value pointer to the value at the current iterator
1094 : // position. Behaviour undefined if unsafeKey() returns a Range key or Rangedel
1095 : // kind key.
1096 1 : func (i *scanInternalIterator) lazyValue() LazyValue {
1097 1 : return i.iterKV.LazyValue()
1098 1 : }
1099 :
1100 : // unsafeRangeDel returns a range key span. Behaviour undefined if UnsafeKey returns
1101 : // a non-rangedel kind.
1102 1 : func (i *scanInternalIterator) unsafeRangeDel() *keyspan.Span {
1103 1 : type spanInternalIterator interface {
1104 1 : Span() *keyspan.Span
1105 1 : }
1106 1 : return i.pointKeyIter.(spanInternalIterator).Span()
1107 1 : }
1108 :
1109 : // unsafeSpan returns a range key span. Behaviour undefined if UnsafeKey returns
1110 : // a non-rangekey type.
1111 1 : func (i *scanInternalIterator) unsafeSpan() *keyspan.Span {
1112 1 : return i.rangeKey.iiter.Span()
1113 1 : }
1114 :
1115 : // next advances the iterator in the forward direction, and returns the
1116 : // iterator's new validity state.
1117 1 : func (i *scanInternalIterator) next() bool {
1118 1 : i.iterKV = i.iter.Next()
1119 1 : return i.iterKV != nil
1120 1 : }
1121 :
1122 : // error returns an error from the internal iterator, if there's any.
1123 1 : func (i *scanInternalIterator) error() error {
1124 1 : return i.iter.Error()
1125 1 : }
1126 :
// close closes this iterator, and releases any pooled objects: the read
// state / version reference, the range-key iteration state, the blob value
// fetcher, and the iterAlloc buffers.
func (i *scanInternalIterator) close() {
	// NB: the Close error is discarded here.
	_ = i.iter.Close()
	if i.readState != nil {
		i.readState.unref()
	}
	if i.version != nil {
		i.version.Unref()
	}
	if i.rangeKey != nil {
		// Recycle the range-key state, preserving only its reusable buffers,
		// and return it to the pool.
		i.rangeKey.PrepareForReuse()
		*i.rangeKey = iteratorRangeKeyState{
			rangeKeyBuffers: i.rangeKey.rangeKeyBuffers,
		}
		iterRangeKeyStateAllocPool.Put(i.rangeKey)
		i.rangeKey = nil
	}
	_ = i.blobValueFetcher.Close()
	if alloc := i.alloc; alloc != nil {
		// Hand the bounds buffers back to the alloc struct for reuse, unless
		// a buffer has grown past maxKeyBufCacheSize, in which case it is
		// dropped rather than cached.
		for j := range i.boundsBuf {
			if cap(i.boundsBuf[j]) >= maxKeyBufCacheSize {
				alloc.boundsBuf[j] = nil
			} else {
				alloc.boundsBuf[j] = i.boundsBuf[j]
			}
		}
		// Reset the alloc struct (retaining slice capacities) before
		// returning it to the pool.
		*alloc = iterAlloc{
			keyBuf:              alloc.keyBuf[:0],
			boundsBuf:           alloc.boundsBuf,
			prefixOrFullSeekKey: alloc.prefixOrFullSeekKey[:0],
		}
		iterAllocPool.Put(alloc)
		i.alloc = nil
	}
}
1162 :
// initializeBoundBufs copies the given lower and upper bounds into one of the
// iterator's two bounds buffers, pointing i.opts.LowerBound/UpperBound at the
// copies (nil bounds stay nil). The two buffers are alternated via
// boundsBufIdx so new bounds can be staged while the previously staged set is
// still referenced.
func (i *scanInternalIterator) initializeBoundBufs(lower, upper []byte) {
	buf := i.boundsBuf[i.boundsBufIdx][:0]
	if lower != nil {
		buf = append(buf, lower...)
		i.opts.LowerBound = buf
	} else {
		i.opts.LowerBound = nil
	}
	if upper != nil {
		buf = append(buf, upper...)
		// Both bounds share one backing buffer; the upper bound is the tail
		// of buf, after the (possibly empty) lower-bound copy.
		i.opts.UpperBound = buf[len(buf)-len(upper):]
	} else {
		i.opts.UpperBound = nil
	}
	i.boundsBuf[i.boundsBufIdx] = buf
	// Flip to the other buffer for the next call.
	i.boundsBufIdx = 1 - i.boundsBufIdx
}
|