// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"context"
	"fmt"
	"slices"
	"sync"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/invariants"
	"github.com/cockroachdb/pebble/internal/keyspan"
	"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
	"github.com/cockroachdb/pebble/internal/manifest"
	"github.com/cockroachdb/pebble/internal/treeprinter"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/objstorage/remote"
	"github.com/cockroachdb/pebble/sstable"
	"github.com/cockroachdb/pebble/sstable/blob"
	"github.com/cockroachdb/pebble/sstable/block"
)

const (
	// In skip-shared iteration mode, keys in levels greater than
	// sharedLevelsStart (i.e. lower in the LSM) are skipped. Keys
	// in level sharedLevelsStart are returned iff they are not in a
	// shared file.
	sharedLevelsStart = remote.SharedLevelsStart

	// In skip-external iteration mode, keys in levels greater
	// than externalSkipStart are skipped. Keys in level
	// externalSkipStart are returned iff they are not in an
	// external file.
	externalSkipStart = 6
)

// ErrInvalidSkipSharedIteration is returned by ScanInternal if it was called
// with a shared file visitor function, and a file in a shareable level (i.e.
// level >= sharedLevelsStart) was found to not be in shared storage according
// to objstorage.Provider, or to not be shareable for another reason, such as
// containing keys newer than the snapshot sequence number.
var ErrInvalidSkipSharedIteration = errors.New("pebble: cannot use skip-shared iteration due to non-shareable files in lower levels")

// SharedSSTMeta represents an sstable on shared storage that can be ingested
// by another pebble instance. This struct must contain all fields that are
// required for a Pebble instance to ingest a foreign sstable on shared storage,
// including constructing any relevant objstorage.Provider / remoteobjcat.Catalog
// data structures, as well as creating virtual TableMetadatas.
//
// Note that the Pebble instance creating and returning a SharedSSTMeta might
// not be the one that created the underlying sstable on shared storage to begin
// with; it's possible for a Pebble instance to reshare an sstable that was
// shared to it.
type SharedSSTMeta struct {
	// Backing is the shared object underlying this SST. Can be attached to an
	// objstorage.Provider.
	Backing objstorage.RemoteObjectBackingHandle

	// Smallest and Largest internal keys for the overall bounds. The Kind and
	// SeqNum of these will reflect what is physically present on the source Pebble
	// instance's view of the sstable; it's up to the ingesting instance to set the
	// sequence number in the trailer to match the read-time sequence numbers
	// reserved for the level this SST is being ingested into. The Kind is expected
	// to remain unchanged by the ingesting instance.
	//
	// Note that these bounds could be narrower than the bounds of the underlying
	// sstable; ScanInternal is expected to truncate sstable bounds to the user key
	// bounds passed into that method.
	Smallest, Largest InternalKey

	// SmallestRangeKey and LargestRangeKey are internal keys that denote the
	// range key bounds of this sstable. Must lie within [Smallest, Largest].
	SmallestRangeKey, LargestRangeKey InternalKey

	// SmallestPointKey and LargestPointKey are internal keys that denote the
	// point key bounds of this sstable. Must lie within [Smallest, Largest].
	SmallestPointKey, LargestPointKey InternalKey

	// Level denotes the level at which this file was present at read time.
	// For files visited by ScanInternal, this value will only be 5 or 6.
	Level uint8

	// Size contains an estimate of the size of this sstable.
	Size uint64

	// tableNum at time of creation in the creator instance. Only used for
	// debugging/tests.
	tableNum base.TableNum
}

func (s *SharedSSTMeta) cloneFromFileMeta(f *manifest.TableMetadata) {
	*s = SharedSSTMeta{
		Smallest:         f.Smallest().Clone(),
		Largest:          f.Largest().Clone(),
		SmallestPointKey: f.PointKeyBounds.Smallest().Clone(),
		LargestPointKey:  f.PointKeyBounds.Largest().Clone(),
		Size:             f.Size,
		tableNum:         f.TableNum,
	}
	if f.HasRangeKeys {
		s.SmallestRangeKey = f.RangeKeyBounds.Smallest().Clone()
		s.LargestRangeKey = f.RangeKeyBounds.Largest().Clone()
	}
}

// ScanInternal scans all internal keys within the specified bounds, truncating
// any rangedels and rangekeys to those bounds if they span past them. For use
// when an external user needs to be aware of all internal keys that make up a
// key range.
//
// Keys deleted by range deletions must not be returned or exposed by this
// method, while the range deletion deleting that key must be exposed using
// visitRangeDel. Keys that would be masked by range key masking (if an
// appropriate prefix were set) should be exposed, alongside the range key
// that would have masked them. This method also collapses all point keys into
// one InternalKey; at most one internal key per user key is returned
// to visitPointKey.
//
// If visitSharedFile is not nil, ScanInternal iterates in skip-shared iteration
// mode. In this iteration mode, sstables in levels L5 and L6 are skipped, and
// their metadatas truncated to [lower, upper) and passed into visitSharedFile.
// ErrInvalidSkipSharedIteration is returned if visitSharedFile is not nil and an
// sstable in L5 or L6 is found that is not in shared storage according to
// provider.IsShared, or an sstable in those levels contains a key newer than the
// snapshot sequence number (only applicable for snapshot.ScanInternal). This
// could happen, for example, if Pebble started writing sstables before a
// creator ID was set (as creator IDs are necessary to enable shared storage),
// resulting in some lower level SSTs being on non-shared storage. Skip-shared
// iteration is invalid in those cases.
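//
// A minimal usage sketch (hypothetical caller; the field set shown is partial
// and the callback signatures are inferred from the call sites in this file):
//
//	err := db.ScanInternal(ctx, ScanInternalOptions{
//		IterOptions: IterOptions{LowerBound: lower, UpperBound: upper},
//		VisitPointKey: func(key *InternalKey, value LazyValue, info IteratorLevel) error {
//			// At most one point key is surfaced per user key.
//			return nil
//		},
//		VisitRangeDel: func(start, end []byte, seqNum base.SeqNum) error { return nil },
//		VisitRangeKey: func(start, end []byte, keys []keyspan.Key) error { return nil },
//	})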
func (d *DB) ScanInternal(ctx context.Context, opts ScanInternalOptions) (err error) {
	var iter *scanInternalIterator
	iter, err = d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, &opts)
	if err != nil {
		return err
	}
	defer func() { err = errors.CombineErrors(err, iter.Close()) }()
	return scanInternalImpl(ctx, iter, &opts)
}

// newInternalIter constructs and returns a new scanInternalIterator on this db.
// If the options specify skip-shared or skip-external iteration, levels below
// the corresponding start level are *not* added to the internal iterator.
//
// TODO(bilal): This method has a lot of similarities with db.newIter as well as
// finishInitializingIter. Both pairs of methods should be refactored to reduce
// this duplication.
func (d *DB) newInternalIter(
	ctx context.Context, sOpts snapshotIterOpts, o *ScanInternalOptions,
) (*scanInternalIterator, error) {
	if err := d.closed.Load(); err != nil {
		panic(err)
	}
	// Grab and reference the current readState. This prevents the underlying
	// files in the associated version from being deleted if there is a current
	// compaction. The readState is unref'd by Iterator.Close().
	var readState *readState
	var vers *manifest.Version
	if sOpts.vers == nil {
		if sOpts.readState != nil {
			readState = sOpts.readState
			readState.ref()
			vers = readState.current
		} else {
			readState = d.loadReadState()
			vers = readState.current
		}
	} else {
		vers = sOpts.vers
		sOpts.vers.Ref()
	}

	// Determine the seqnum to read at after grabbing the read state (current and
	// memtables) above.
	seqNum := sOpts.seqNum
	if seqNum == 0 {
		seqNum = d.mu.versions.visibleSeqNum.Load()
	}

	// Bundle various structures under a single umbrella in order to allocate
	// them together.
	buf := scanInternalIteratorIterAllocPool.Get().(*scanInternalIterAlloc)
	dbi := &buf.scanIter
	*dbi = scanInternalIterator{
		ctx:             ctx,
		db:              d,
		comparer:        d.opts.Comparer,
		merge:           d.opts.Merger.Merge,
		readState:       readState,
		version:         sOpts.vers,
		alloc:           buf,
		newIters:        d.newIters,
		newIterRangeKey: d.tableNewRangeKeyIter,
		seqNum:          seqNum,
		mergingIter:     &buf.iterAlloc.merging,
	}
	dbi.blobValueFetcher.Init(&vers.BlobFiles, d.fileCache, block.ReadEnv{},
		blob.SuggestedCachedReaders(vers.MaxReadAmp()))

	dbi.opts = *o
	dbi.opts.logger = d.opts.Logger
	if d.opts.private.disableLazyCombinedIteration {
		dbi.opts.disableLazyCombinedIteration = true
	}
	return finishInitializingInternalIter(buf, dbi)
}

type internalIterOpts struct {
	// If compaction is set, sstable-level iterators will be created using
	// NewCompactionIter; these iterators have a more constrained interface
	// and are optimized for the sequential scan of a compaction.
	compaction         bool
	readEnv            sstable.ReadEnv
	boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
	// blobValueFetcher is the base.ValueFetcher to use when constructing
	// internal values to represent values stored externally in blob files.
	blobValueFetcher base.ValueFetcher
}

func finishInitializingInternalIter(
	buf *scanInternalIterAlloc, i *scanInternalIterator,
) (*scanInternalIterator, error) {
	// Short-hand.
	var memtables flushableList
	if i.readState != nil {
		memtables = i.readState.memtables
	}
	// We only need to read from memtables which contain sequence numbers older
	// than seqNum. Trim off newer memtables.
	for j := len(memtables) - 1; j >= 0; j-- {
		if logSeqNum := memtables[j].logSeqNum; logSeqNum < i.seqNum {
			break
		}
		memtables = memtables[:j]
	}
	i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)

	if err := i.constructPointIter(i.opts.Category, memtables, &buf.iterAlloc); err != nil {
		return nil, err
	}

	// For internal iterators, we skip the lazy combined iteration optimization
	// entirely, and create the range key iterator stack directly.
	i.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
	if err := i.constructRangeKeyIter(); err != nil {
		return nil, err
	}

	// Wrap the point iterator (currently i.iter) with an interleaving
	// iterator that interleaves range keys pulled from
	// i.rangeKey.rangeKeyIter.
	i.rangeKey.iiter.Init(i.comparer, i.iter, i.rangeKey.rangeKeyIter,
		keyspan.InterleavingIterOpts{
			LowerBound: i.opts.LowerBound,
			UpperBound: i.opts.UpperBound,
		})
	i.iter = &i.rangeKey.iiter

	return i, nil
}

type sharedByLevel []SharedSSTMeta

func (s sharedByLevel) Len() int           { return len(s) }
func (s sharedByLevel) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s sharedByLevel) Less(i, j int) bool { return s[i].Level < s[j].Level }

type pcIterPos int

const (
	// pcIterPosCur indicates that the underlying iter is positioned at the
	// current key.
	pcIterPosCur pcIterPos = iota
	// pcIterPosNext indicates that the underlying iter has already moved on
	// to the next user key, while savedKey holds a copy of the current key.
	pcIterPosNext
)

// pointCollapsingIterator is an internalIterator that collapses point keys and
// returns at most one point internal key for each user key. Merges and
// SingleDels are not supported and result in a panic if encountered. Point keys
// deleted by rangedels are considered shadowed and not exposed.
//
// Only used in ScanInternal to return at most one internal key per user key.
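//
// For example (illustrative key notation), if the underlying levels contain
// a#5,SET, a#3,SET and b#2,SET, with an interleaved rangedel [b, c)#4 shadowing
// b#2, this iterator emits a#5,SET and the rangedel itself; the older a#3,SET
// and the shadowed b#2,SET are not surfaced.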
type pointCollapsingIterator struct {
	iter     keyspan.InterleavingIter
	pos      pcIterPos
	comparer *base.Comparer
	merge    base.Merge
	err      error
	seqNum   base.SeqNum
	// The current position of `iter`. Always owned by the underlying iter.
	iterKV *base.InternalKV
	// The last saved key. findNextEntry and similar methods are expected to save
	// the current value of iterKV.K to savedKey if they're iterating away from the
	// current key but still need to retain it. See comments in findNextEntry on
	// how this field is used.
	//
	// At the end of a positioning call:
	// - if pos == pcIterPosNext, iterKV is pointing to the next user key owned
	//   by `iter` while savedKey is holding a copy of our current key.
	// - If pos == pcIterPosCur, iterKV is pointing to an `iter`-owned current
	//   key, and savedKey is either undefined or pointing to a version of the
	//   current key owned by this iterator (i.e. backed by savedKeyBuf).
	savedKey    InternalKey
	savedKeyBuf []byte
	// If fixedSeqNum is non-zero, all emitted points are verified to have this
	// fixed sequence number.
	fixedSeqNum base.SeqNum
}

func (p *pointCollapsingIterator) Span() *keyspan.Span {
	return p.iter.Span()
}

// SeekPrefixGE implements the InternalIterator interface.
func (p *pointCollapsingIterator) SeekPrefixGE(
	prefix, key []byte, flags base.SeekGEFlags,
) *base.InternalKV {
	p.resetKey()
	p.iterKV = p.iter.SeekPrefixGE(prefix, key, flags)
	p.pos = pcIterPosCur
	if p.iterKV == nil {
		return nil
	}
	return p.findNextEntry()
}

// SeekGE implements the InternalIterator interface.
func (p *pointCollapsingIterator) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV {
	p.resetKey()
	p.iterKV = p.iter.SeekGE(key, flags)
	p.pos = pcIterPosCur
	if p.iterKV == nil {
		return nil
	}
	return p.findNextEntry()
}

// SeekLT implements the InternalIterator interface.
func (p *pointCollapsingIterator) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV {
	panic("unimplemented")
}

func (p *pointCollapsingIterator) resetKey() {
	p.savedKey.UserKey = p.savedKeyBuf[:0]
	p.savedKey.Trailer = 0
	p.iterKV = nil
	p.pos = pcIterPosCur
}

func (p *pointCollapsingIterator) verifySeqNum(kv *base.InternalKV) *base.InternalKV {
	if !invariants.Enabled {
		return kv
	}
	if p.fixedSeqNum == 0 || kv == nil || kv.Kind() == InternalKeyKindRangeDelete {
		return kv
	}
	if kv.SeqNum() != p.fixedSeqNum {
		panic(fmt.Sprintf("expected foreign point key to have seqnum %d, got %d", p.fixedSeqNum, kv.SeqNum()))
	}
	return kv
}

// findNextEntry is called to return the next key. p.iter must be positioned at the
// start of the first user key we are interested in.
func (p *pointCollapsingIterator) findNextEntry() *base.InternalKV {
	p.saveKey()
	// Saves a comparison in the fast path.
	firstIteration := true
	for p.iterKV != nil {
		// NB: p.savedKey is either the current key (on the first iteration)
		// or the previous key.
		if !firstIteration && !p.comparer.Equal(p.iterKV.K.UserKey, p.savedKey.UserKey) {
			p.saveKey()
			continue
		}
		firstIteration = false
		if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKV.SeqNum()) {
			// All future keys for this user key must be deleted.
			if p.savedKey.Kind() == InternalKeyKindSingleDelete {
				panic("cannot process singledel key in point collapsing iterator")
			}
			// Fast forward to the next user key. As in Next, the seqnum
			// comparison below is a cheap proxy for a user key change: within
			// a user key, sequence numbers are non-increasing.
			p.saveKey()
			p.iterKV = p.iter.Next()
			for p.iterKV != nil && p.savedKey.SeqNum() >= p.iterKV.SeqNum() && p.comparer.Equal(p.iterKV.K.UserKey, p.savedKey.UserKey) {
				p.iterKV = p.iter.Next()
			}
			continue
		}
		switch p.savedKey.Kind() {
		case InternalKeyKindSet, InternalKeyKindDelete, InternalKeyKindSetWithDelete, InternalKeyKindDeleteSized:
			// Note that we return SETs directly, even if they would otherwise get
			// compacted into a Del to turn into a SetWithDelete. This is a fast
			// path optimization that can break SINGLEDEL determinism. To preserve
			// consistent SINGLEDEL behaviour, this iterator should *not* be used for
			// a keyspace where SINGLEDELs could be in use. If this iterator observes
			// a SINGLEDEL as the first internal key for a user key, it will panic.
			//
			// As p.value is a lazy value owned by the child iterator, we can thread
			// it through without loading it into p.valueBuf.
			//
			// TODO(bilal): We can even avoid saving the key in this fast path if
			// we are in a block where setHasSamePrefix = false in a v3 sstable,
			// guaranteeing that there's only one internal key for each user key.
			// Thread this logic through the sstable iterators and/or consider
			// collapsing (ha) this logic into the sstable iterators that are aware
			// of blocks and can determine user key changes without doing key saves
			// or comparisons.
			p.pos = pcIterPosCur
			return p.verifySeqNum(p.iterKV)
		case InternalKeyKindSingleDelete:
			// Panic, as this iterator is not expected to observe single deletes.
			panic("cannot process singledel key in point collapsing iterator")
		case InternalKeyKindMerge:
			// Panic, as this iterator is not expected to observe merges.
			panic("cannot process merge key in point collapsing iterator")
		case InternalKeyKindRangeDelete:
			// These are interleaved by the interleaving iterator ahead of all points.
			// We should pass them as-is, but also account for any points ahead of
			// them.
			p.pos = pcIterPosCur
			return p.verifySeqNum(p.iterKV)
		default:
			panic(fmt.Sprintf("unexpected kind: %d", p.iterKV.Kind()))
		}
	}
	p.resetKey()
	return nil
}

// First implements the InternalIterator interface.
func (p *pointCollapsingIterator) First() *base.InternalKV {
	p.resetKey()
	p.iterKV = p.iter.First()
	p.pos = pcIterPosCur
	if p.iterKV == nil {
		return nil
	}
	return p.findNextEntry()
}

// Last implements the InternalIterator interface.
func (p *pointCollapsingIterator) Last() *base.InternalKV {
	panic("unimplemented")
}

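// saveKey copies the key at the underlying iterator's current position (if
// any) into savedKeyBuf, so that savedKey remains valid after the underlying
// iterator moves on; with no current position it resets savedKey to an empty
// key.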
func (p *pointCollapsingIterator) saveKey() {
	if p.iterKV == nil {
		p.savedKey = InternalKey{UserKey: p.savedKeyBuf[:0]}
		return
	}
	p.savedKeyBuf = append(p.savedKeyBuf[:0], p.iterKV.K.UserKey...)
	p.savedKey = InternalKey{UserKey: p.savedKeyBuf, Trailer: p.iterKV.K.Trailer}
}

// Next implements the InternalIterator interface.
func (p *pointCollapsingIterator) Next() *base.InternalKV {
	switch p.pos {
	case pcIterPosCur:
		p.saveKey()
		if p.iterKV != nil && p.iterKV.Kind() == InternalKeyKindRangeDelete {
			// Step over the interleaved range delete and process the very next
			// internal key, even if it's at the same user key. This is because a
			// point for that user key has not been returned yet.
			p.iterKV = p.iter.Next()
			break
		}
		// Fast forward to the next user key.
		kv := p.iter.Next()
		// p.iterKV.SeqNum() >= key.SeqNum() is an optimization that allows us to
		// use p.iterKV.SeqNum() < key.SeqNum() as a sign that the user key has
		// changed, without needing to do the full key comparison.
		for kv != nil && p.savedKey.SeqNum() >= kv.SeqNum() &&
			p.comparer.Equal(p.savedKey.UserKey, kv.K.UserKey) {
			kv = p.iter.Next()
		}
		if kv == nil {
			// There are no keys to return.
			p.resetKey()
			return nil
		}
		p.iterKV = kv
	case pcIterPosNext:
		p.pos = pcIterPosCur
	}
	if p.iterKV == nil {
		p.resetKey()
		return nil
	}
	return p.findNextEntry()
}

// NextPrefix implements the InternalIterator interface.
func (p *pointCollapsingIterator) NextPrefix(succKey []byte) *base.InternalKV {
	panic("unimplemented")
}

// Prev implements the InternalIterator interface.
func (p *pointCollapsingIterator) Prev() *base.InternalKV {
	panic("unimplemented")
}

// Error implements the InternalIterator interface.
func (p *pointCollapsingIterator) Error() error {
	if p.err != nil {
		return p.err
	}
	return p.iter.Error()
}

// Close implements the InternalIterator interface.
func (p *pointCollapsingIterator) Close() error {
	return p.iter.Close()
}

// SetBounds implements the InternalIterator interface.
func (p *pointCollapsingIterator) SetBounds(lower, upper []byte) {
	p.resetKey()
	p.iter.SetBounds(lower, upper)
}

func (p *pointCollapsingIterator) SetContext(ctx context.Context) {
	p.iter.SetContext(ctx)
}

// DebugTree is part of the InternalIterator interface.
func (p *pointCollapsingIterator) DebugTree(tp treeprinter.Node) {
	n := tp.Childf("%T(%p)", p, p)
	p.iter.DebugTree(n)
}

// String implements the InternalIterator interface.
func (p *pointCollapsingIterator) String() string {
	return p.iter.String()
}

var _ internalIterator = &pointCollapsingIterator{}

// IteratorLevelKind is used to denote whether the current ScanInternal iterator
// is unknown, belongs to a flushable, or belongs to an LSM level type.
type IteratorLevelKind int8

const (
	// IteratorLevelUnknown indicates an unknown LSM level.
	IteratorLevelUnknown IteratorLevelKind = iota
	// IteratorLevelLSM indicates an LSM level.
	IteratorLevelLSM
	// IteratorLevelFlushable indicates a flushable (i.e. memtable).
	IteratorLevelFlushable
)

// IteratorLevel is used with scanInternalIterator to surface additional
// iterator-specific info where possible. Note: this struct is only provided
// for point keys.
type IteratorLevel struct {
	Kind IteratorLevelKind
	// FlushableIndex indicates the position within the flushable queue of this
	// level. Only valid if Kind == IteratorLevelFlushable.
	FlushableIndex int
	// The level within the LSM. Only valid if Kind == IteratorLevelLSM.
	Level int
	// Sublevel is only valid if Kind == IteratorLevelLSM and Level == 0.
	Sublevel int
}

// scanInternalIterator is an iterator that returns all internal keys, including
// tombstones. For instance, an InternalKeyKindDelete would be returned as an
// InternalKeyKindDelete instead of the iterator skipping over to the next key.
// Internal keys within a user key are collapsed, e.g. if there are two SETs, the
// one with the higher sequence number is returned. Useful if an external user of
// Pebble needs to observe and rebuild Pebble's history of internal keys, such as
// in node-to-node replication. For use with {db,snapshot}.ScanInternal().
//
// scanInternalIterator is expected to ignore point keys deleted by range
// deletions, and range keys shadowed by a range key unset or delete; however it
// *must* return the range delete as well as the range key unset/delete that did
// the shadowing.
type scanInternalIterator struct {
	ctx              context.Context
	db               *DB
	opts             ScanInternalOptions
	comparer         *base.Comparer
	merge            Merge
	iter             internalIterator
	readState        *readState
	version          *manifest.Version
	rangeKey         *iteratorRangeKeyState
	pointKeyIter     internalIterator
	iterKV           *base.InternalKV
	alloc            *scanInternalIterAlloc
	newIters         tableNewIters
	newIterRangeKey  keyspanimpl.TableNewSpanIter
	seqNum           base.SeqNum
	iterLevels       []IteratorLevel
	mergingIter      *mergingIter
	blobValueFetcher blob.ValueFetcher

	// boundsBuf holds two buffers used to store the lower and upper bounds.
	// Whenever the InternalIterator's bounds change, the new bounds are copied
	// into boundsBuf[boundsBufIdx]. The two bounds share a slice to reduce
	// allocations. opts.LowerBound and opts.UpperBound point into this slice.
	boundsBuf    [2][]byte
	boundsBufIdx int
}

// truncateExternalFile truncates an External file's [SmallestUserKey,
// LargestUserKey] fields to [lower, upper). An ExternalFile is
// produced that is suitable for external consumption by other Pebble
// instances.
//
// truncateSharedFile reads the file to try to create the smallest
// possible bounds. Here, we blindly truncate them. This may mean we
// include this SST in iterations it isn't really needed in. Since we
// don't expect External files to be long-lived in the pebble
// instance, we think this is OK.
//
// TODO(ssd) 2024-01-26: Potentially de-duplicate with
// truncateSharedFile.
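//
// For example, a file spanning user keys [a, z] truncated to bounds [c, m)
// yields StartKey=c (lower exceeded the file's smallest key) and EndKey=m with
// EndKeyIsInclusive=false (upper fell short of the file's largest key).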
func (d *DB) truncateExternalFile(
	ctx context.Context,
	lower, upper []byte,
	level int,
	file *manifest.TableMetadata,
	objMeta objstorage.ObjectMetadata,
) (*ExternalFile, error) {
	cmp := d.cmp
	sst := &ExternalFile{
		Level:           uint8(level),
		ObjName:         objMeta.Remote.CustomObjectName,
		Locator:         objMeta.Remote.Locator,
		HasPointKey:     file.HasPointKeys,
		HasRangeKey:     file.HasRangeKeys,
		Size:            file.Size,
		SyntheticPrefix: slices.Clone(file.SyntheticPrefixAndSuffix.Prefix()),
		SyntheticSuffix: slices.Clone(file.SyntheticPrefixAndSuffix.Suffix()),
	}

	needsLowerTruncate := cmp(lower, file.Smallest().UserKey) > 0
	if needsLowerTruncate {
		sst.StartKey = slices.Clone(lower)
	} else {
		sst.StartKey = slices.Clone(file.Smallest().UserKey)
	}

	cmpUpper := cmp(upper, file.Largest().UserKey)
	needsUpperTruncate := cmpUpper < 0
	if needsUpperTruncate {
		sst.EndKey = slices.Clone(upper)
		sst.EndKeyIsInclusive = false
	} else {
		sst.EndKey = slices.Clone(file.Largest().UserKey)
		sst.EndKeyIsInclusive = !file.Largest().IsExclusiveSentinel()
	}

	if cmp(sst.StartKey, sst.EndKey) > 0 {
		return nil, base.AssertionFailedf("pebble: invalid external file bounds after truncation [%q, %q)", sst.StartKey, sst.EndKey)
	}

	if cmp(sst.StartKey, sst.EndKey) == 0 && !sst.EndKeyIsInclusive {
		return nil, base.AssertionFailedf("pebble: invalid external file bounds after truncation [%q, %q)", sst.StartKey, sst.EndKey)
	}

	return sst, nil
}

// truncateSharedFile truncates a shared file's [Smallest, Largest] fields to
// [lower, upper), potentially opening iterators on the file to find keys within
// the requested bounds. A SharedSSTMeta is produced that is suitable for
// external consumption by other Pebble instances. If shouldSkip is true, this
// file does not contain any keys in [lower, upper) and can be skipped.
//
// TODO(bilal): If opening iterators and doing reads in this method is too
// inefficient, consider producing non-tight file bounds instead.
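//
// Unlike truncateExternalFile above, this method seeks the file's point,
// rangedel and range key iterators to compute tight bounds; a file that
// overlaps [lower, upper) but contains no keys inside those bounds is
// reported via shouldSkip rather than returned with empty bounds.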
func (d *DB) truncateSharedFile(
	ctx context.Context,
	lower, upper []byte,
	level int,
	file *manifest.TableMetadata,
	objMeta objstorage.ObjectMetadata,
) (sst *SharedSSTMeta, shouldSkip bool, err error) {
	cmp := d.cmp
	sst = &SharedSSTMeta{}
	sst.cloneFromFileMeta(file)
	sst.Level = uint8(level)
	sst.Backing, err = d.objProvider.RemoteObjectBacking(&objMeta)
	if err != nil {
		return nil, false, err
	}
	needsLowerTruncate := cmp(lower, file.Smallest().UserKey) > 0
	needsUpperTruncate := cmp(upper, file.Largest().UserKey) < 0 || (cmp(upper, file.Largest().UserKey) == 0 && !file.Largest().IsExclusiveSentinel())
	// Fast path: file is entirely within [lower, upper).
	if !needsLowerTruncate && !needsUpperTruncate {
		return sst, false, nil
	}

	// We will need to truncate file bounds in at least one direction. Open all
	// relevant iterators.
	iters, err := d.newIters(ctx, file, &IterOptions{
		LowerBound: lower,
		UpperBound: upper,
		layer:      manifest.Level(level),
	}, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
	if err != nil {
		return nil, false, err
	}
	defer func() { _ = iters.CloseAll() }()
	iter := iters.point
	rangeDelIter := iters.rangeDeletion
	rangeKeyIter := iters.rangeKey
	if rangeDelIter != nil {
		rangeDelIter = keyspan.Truncate(cmp, rangeDelIter, base.UserKeyBoundsEndExclusive(lower, upper))
	}
	if rangeKeyIter != nil {
		rangeKeyIter = keyspan.Truncate(cmp, rangeKeyIter, base.UserKeyBoundsEndExclusive(lower, upper))
	}
	// Check if we need to truncate on the left side. This means finding a new
	// SmallestPointKey and SmallestRangeKey that is >= lower.
	if needsLowerTruncate {
		sst.SmallestPointKey.UserKey = sst.SmallestPointKey.UserKey[:0]
		sst.SmallestPointKey.Trailer = 0
		kv := iter.SeekGE(lower, base.SeekGEFlagsNone)
		foundPointKey := kv != nil
		if kv != nil {
			sst.SmallestPointKey.CopyFrom(kv.K)
		}
		if rangeDelIter != nil {
			if span, err := rangeDelIter.SeekGE(lower); err != nil {
				return nil, false, err
			} else if span != nil && (len(sst.SmallestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.SmallestKey(), sst.SmallestPointKey) < 0) {
				sst.SmallestPointKey.CopyFrom(span.SmallestKey())
				foundPointKey = true
			}
		}
		if !foundPointKey {
			// There are no point keys in the span we're interested in.
			sst.SmallestPointKey = InternalKey{}
			sst.LargestPointKey = InternalKey{}
		}
		sst.SmallestRangeKey.UserKey = sst.SmallestRangeKey.UserKey[:0]
		sst.SmallestRangeKey.Trailer = 0
		if rangeKeyIter != nil {
			span, err := rangeKeyIter.SeekGE(lower)
			switch {
			case err != nil:
				return nil, false, err
			case span != nil:
				sst.SmallestRangeKey.CopyFrom(span.SmallestKey())
			default:
				// There are no range keys in the span we're interested in.
				sst.SmallestRangeKey = InternalKey{}
				sst.LargestRangeKey = InternalKey{}
			}
		}
	}
	// Check if we need to truncate on the right side. This means finding a new
	// LargestPointKey and LargestRangeKey that is < upper.
	if needsUpperTruncate {
		sst.LargestPointKey.UserKey = sst.LargestPointKey.UserKey[:0]
		sst.LargestPointKey.Trailer = 0
		kv := iter.SeekLT(upper, base.SeekLTFlagsNone)
		foundPointKey := kv != nil
		if kv != nil {
			sst.LargestPointKey.CopyFrom(kv.K)
		}
		if rangeDelIter != nil {
			if span, err := rangeDelIter.SeekLT(upper); err != nil {
				return nil, false, err
			} else if span != nil && (len(sst.LargestPointKey.UserKey) == 0 || base.InternalCompare(cmp, span.LargestKey(), sst.LargestPointKey) > 0) {
				sst.LargestPointKey.CopyFrom(span.LargestKey())
				foundPointKey = true
			}
		}
		if !foundPointKey {
			// There are no point keys in the span we're interested in.
			sst.SmallestPointKey = InternalKey{}
			sst.LargestPointKey = InternalKey{}
		}
		sst.LargestRangeKey.UserKey = sst.LargestRangeKey.UserKey[:0]
		sst.LargestRangeKey.Trailer = 0
		if rangeKeyIter != nil {
			span, err := rangeKeyIter.SeekLT(upper)
			switch {
			case err != nil:
				return nil, false, err
			case span != nil:
				sst.LargestRangeKey.CopyFrom(span.LargestKey())
			default:
				// There are no range keys in the span we're interested in.
				sst.SmallestRangeKey = InternalKey{}
				sst.LargestRangeKey = InternalKey{}
			}
		}
	}
	// Set overall bounds based on {Smallest,Largest}{Point,Range}Key.
	switch {
	case len(sst.SmallestRangeKey.UserKey) == 0:
		sst.Smallest = sst.SmallestPointKey
	case len(sst.SmallestPointKey.UserKey) == 0:
		sst.Smallest = sst.SmallestRangeKey
	default:
		sst.Smallest = sst.SmallestPointKey
		if base.InternalCompare(cmp, sst.SmallestRangeKey, sst.SmallestPointKey) < 0 {
			sst.Smallest = sst.SmallestRangeKey
		}
	}
	switch {
	case len(sst.LargestRangeKey.UserKey) == 0:
		sst.Largest = sst.LargestPointKey
	case len(sst.LargestPointKey.UserKey) == 0:
		sst.Largest = sst.LargestRangeKey
	default:
		sst.Largest = sst.LargestPointKey
		if base.InternalCompare(cmp, sst.LargestRangeKey, sst.LargestPointKey) > 0 {
			sst.Largest = sst.LargestRangeKey
		}
	}
	// On rare occasion, a file might overlap with [lower, upper) but not actually
	// have any keys within those bounds. Skip such files.
	if len(sst.Smallest.UserKey) == 0 {
		return nil, true, nil
	}
	sst.Size, err = d.fileCache.estimateSize(file, sst.Smallest.UserKey, sst.Largest.UserKey)
	if err != nil {
		return nil, false, err
	}
	// On occasion, estimateSize gives us a low estimate, i.e. a 0 file size. This
	// can cause panics in places where we divide by file sizes. Correct for it
	// here.
	if sst.Size == 0 {
		sst.Size = 1
	}
	return sst, false, nil
}

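// scanInternalImpl drives a ScanInternal call: in skip-shared or skip-external
// mode it first surfaces shared/external files in the relevant levels through
// the configured visitors, then iterates over the remaining point keys,
// rangedels and range keys, dispatching each to the matching visitor callback.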
func scanInternalImpl(
	ctx context.Context, iter *scanInternalIterator, opts *ScanInternalOptions,
) error {
	if opts.VisitSharedFile != nil && (opts.LowerBound == nil || opts.UpperBound == nil) {
		panic("lower and upper bounds must be specified in skip-shared iteration mode")
	}
	if opts.VisitSharedFile != nil && opts.VisitExternalFile != nil {
		return base.AssertionFailedf("cannot provide both a shared-file and external-file visitor")
	}

	// Before starting iteration, check if any files in levels sharedLevelsStart
	// and below are *not* shared. Error out if that is the case, as skip-shared
	// iteration will not produce a consistent point-in-time view of this range
	// of keys. For files that are shared, call visitSharedFile with a truncated
	// version of that file.
	cmp := iter.comparer.Compare
	provider := iter.db.ObjProvider()
	seqNum := iter.seqNum
	current := iter.version
	if current == nil {
		current = iter.readState.current
	}

	if opts.VisitSharedFile != nil || opts.VisitExternalFile != nil {
		if provider == nil {
			panic("expected non-nil Provider in skip-shared iteration mode")
		}

		firstLevelWithRemote := opts.skipLevelForOpts()
		for level := firstLevelWithRemote; level < numLevels; level++ {
			files := current.Levels[level].Iter()
			for f := files.SeekGE(cmp, opts.LowerBound); f != nil && cmp(f.Smallest().UserKey, opts.UpperBound) < 0; f = files.Next() {
				if cmp(opts.LowerBound, f.Largest().UserKey) == 0 && f.Largest().IsExclusiveSentinel() {
					continue
				}

				var objMeta objstorage.ObjectMetadata
				var err error
				objMeta, err = provider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
				if err != nil {
					return err
				}

				// We allow a mix of files at the first level.
				if level != firstLevelWithRemote {
					if !objMeta.IsShared() && !objMeta.IsExternal() {
						return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared or external", objMeta.DiskFileNum)
					}
				}

				if objMeta.IsShared() && opts.VisitSharedFile == nil {
					return errors.Wrapf(ErrInvalidSkipSharedIteration, "shared file is present but no shared file visitor is defined")
				}

				if objMeta.IsExternal() && opts.VisitExternalFile == nil {
					return errors.Wrapf(ErrInvalidSkipSharedIteration, "external file is present but no external file visitor is defined")
				}

				if !base.Visible(f.LargestSeqNum, seqNum, base.SeqNumMax) {
					return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s contains keys newer than snapshot", objMeta.DiskFileNum)
				}

				if level != firstLevelWithRemote && (!objMeta.IsShared() && !objMeta.IsExternal()) {
					return errors.Wrapf(ErrInvalidSkipSharedIteration, "file %s is not shared or external", objMeta.DiskFileNum)
				}

				if objMeta.IsShared() {
					var sst *SharedSSTMeta
					var skip bool
					sst, skip, err = iter.db.truncateSharedFile(ctx, opts.LowerBound, opts.UpperBound, level, f, objMeta)
					if err != nil {
						return err
					}
					if skip {
						continue
					}
					if err = opts.VisitSharedFile(sst); err != nil {
						return err
					}
				} else if objMeta.IsExternal() {
					sst, err := iter.db.truncateExternalFile(ctx, opts.LowerBound, opts.UpperBound, level, f, objMeta)
					if err != nil {
						return err
					}
					if err := opts.VisitExternalFile(sst); err != nil {
						return err
					}
				}
			}
		}
	}

	for valid := iter.seekGE(opts.LowerBound); valid && iter.error() == nil; valid = iter.next() {
		key := iter.unsafeKey()

		if opts.RateLimitFunc != nil {
			if err := opts.RateLimitFunc(key, iter.lazyValue()); err != nil {
				return err
			}
		}

		switch key.Kind() {
		case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet:
			if opts.VisitRangeKey != nil {
				span := iter.unsafeSpan()
				// NB: The caller isn't interested in the sequence numbers of these
				// range keys. Rather, the caller wants them to be in trailer order
				// _after_ zeroing of sequence numbers. Copy span.Keys, sort it, and
				// then call visitRangeKey.
				keysCopy := make([]keyspan.Key, len(span.Keys))
				for i := range span.Keys {
					keysCopy[i].CopyFrom(span.Keys[i])
					keysCopy[i].Trailer = base.MakeTrailer(0, span.Keys[i].Kind())
				}
				keyspan.SortKeysByTrailer(keysCopy)
				if err := opts.VisitRangeKey(span.Start, span.End, keysCopy); err != nil {
					return err
				}
			}
		case InternalKeyKindRangeDelete:
			if opts.VisitRangeDel != nil {
				rangeDel := iter.unsafeRangeDel()
				if err := opts.VisitRangeDel(rangeDel.Start, rangeDel.End, rangeDel.LargestSeqNum()); err != nil {
					return err
				}
			}
		default:
			if opts.VisitPointKey != nil {
				var info IteratorLevel
				if len(iter.mergingIter.heap.items) > 0 {
					mergingIterIdx := iter.mergingIter.heap.items[0].index
					info = iter.iterLevels[mergingIterIdx]
				} else {
					info = IteratorLevel{Kind: IteratorLevelUnknown}
				}
				val := iter.lazyValue()
				if err := opts.VisitPointKey(key, val, info); err != nil {
					return err
				}
			}
		}
	}

	return nil
}

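// skipLevelForOpts returns the first level whose files are handled via the
// file visitors rather than iterated over directly: sharedLevelsStart when a
// shared-file visitor is set, externalSkipStart when an external-file visitor
// is set, and numLevels (i.e. no level is skipped) otherwise.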
func (opts *ScanInternalOptions) skipLevelForOpts() int {
	if opts.VisitSharedFile != nil {
		return sharedLevelsStart
	}
	if opts.VisitExternalFile != nil {
		return externalSkipStart
	}
	return numLevels
}

// constructPointIter constructs a merging iterator and sets i.iter to it.
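// Iterator levels are added newest to oldest: memtables first, then L0
// sublevels from highest to lowest, then the non-empty levels L1 and below
// (subject to skipLevelForOpts; at the skip-start level itself, shared or
// external files that will be surfaced via visitors are filtered out).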
func (i *scanInternalIterator) constructPointIter(
	category block.Category, memtables flushableList, buf *iterAlloc,
) error {
	// Merging levels and levels from iterAlloc.
	mlevels := buf.mlevels[:0]
	levels := buf.levels[:0]

	// We compute the number of levels needed ahead of time and reallocate a slice if
	// the array from the iterAlloc isn't large enough. Doing this allocation once
	// should improve the performance.
	numMergingLevels := len(memtables)
	numLevelIters := 0

	current := i.version
	if current == nil {
		current = i.readState.current
	}
	numMergingLevels += len(current.L0SublevelFiles)
	numLevelIters += len(current.L0SublevelFiles)

	skipStart := i.opts.skipLevelForOpts()
	for level := 1; level < len(current.Levels); level++ {
		if current.Levels[level].Empty() {
			continue
		}
		if level > skipStart {
			continue
		}
		numMergingLevels++
		numLevelIters++
	}

	if numMergingLevels > cap(mlevels) {
		mlevels = make([]mergingIterLevel, 0, numMergingLevels)
	}
	if numLevelIters > cap(levels) {
		levels = make([]levelIter, 0, numLevelIters)
	}
	// TODO(bilal): Push these into the iterAlloc buf.
	var rangeDelMiter keyspanimpl.MergingIter
	rangeDelIters := make([]keyspan.FragmentIterator, 0, numMergingLevels)
	rangeDelLevels := make([]keyspanimpl.LevelIter, 0, numLevelIters)

	i.iterLevels = make([]IteratorLevel, numMergingLevels)
	mlevelsIndex := 0

	// First are the memtables, newest first.
	for j := len(memtables) - 1; j >= 0; j-- {
		mem := memtables[j]
		mlevels = append(mlevels, mergingIterLevel{
			iter: mem.newIter(&i.opts.IterOptions),
		})
		i.iterLevels[mlevelsIndex] = IteratorLevel{
			Kind:           IteratorLevelFlushable,
			FlushableIndex: j,
		}
		mlevelsIndex++
		if rdi := mem.newRangeDelIter(&i.opts.IterOptions); rdi != nil {
			rangeDelIters = append(rangeDelIters, rdi)
		}
	}

	// Next are the file levels: L0 sub-levels followed by lower levels.
	levelsIndex := len(levels)
	mlevels = mlevels[:numMergingLevels]
	levels = levels[:numLevelIters]
	rangeDelLevels = rangeDelLevels[:numLevelIters]
	i.opts.IterOptions.snapshotForHideObsoletePoints = i.seqNum
	i.opts.IterOptions.Category = category

	internalOpts := internalIterOpts{
		blobValueFetcher: &i.blobValueFetcher,
	}

	addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Layer) {
		li := &levels[levelsIndex]
		rli := &rangeDelLevels[levelsIndex]

		li.init(i.ctx, i.opts.IterOptions, i.comparer, i.newIters, files, level, internalOpts)
		mlevels[mlevelsIndex].iter = li
		rli.Init(i.ctx, keyspan.SpanIterOptions{RangeKeyFilters: i.opts.RangeKeyFilters},
			i.comparer.Compare, tableNewRangeDelIter(i.newIters), files, level,
			manifest.KeyTypePoint)
		rangeDelIters = append(rangeDelIters, rli)

		levelsIndex++
		mlevelsIndex++
	}

	for j := len(current.L0SublevelFiles) - 1; j >= 0; j-- {
		i.iterLevels[mlevelsIndex] = IteratorLevel{
			Kind:     IteratorLevelLSM,
			Level:    0,
			Sublevel: j,
		}
		addLevelIterForFiles(current.L0SublevelFiles[j].Iter(), manifest.L0Sublevel(j))
	}
	// Add level iterators for the non-empty non-L0 levels.
	for level := 1; level < numLevels; level++ {
		if current.Levels[level].Empty() {
			continue
		}

		if level > skipStart {
			continue
		}
		i.iterLevels[mlevelsIndex] = IteratorLevel{Kind: IteratorLevelLSM, Level: level}
		levIter := current.Levels[level].Iter()
		if level == skipStart {
			nonRemoteFiles := make([]*manifest.TableMetadata, 0)
			for f := levIter.First(); f != nil; f = levIter.Next() {
				meta, err := i.db.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
				if err != nil {
					return err
				}
				if (meta.IsShared() && i.opts.VisitSharedFile != nil) ||
					(meta.IsExternal() && i.opts.VisitExternalFile != nil) {
					// Skip this file.
					continue
				}
				nonRemoteFiles = append(nonRemoteFiles, f)
			}
			levSlice := manifest.NewLevelSliceKeySorted(i.db.cmp, nonRemoteFiles)
			levIter = levSlice.Iter()
		}

		addLevelIterForFiles(levIter, manifest.Level(level))
	}

	buf.merging.init(&i.opts.IterOptions, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...)
	buf.merging.snapshot = i.seqNum
	rangeDelMiter.Init(i.comparer, keyspan.VisibleTransform(i.seqNum), new(keyspanimpl.MergingBuffers), rangeDelIters...)

	if i.opts.IncludeObsoleteKeys {
		iiter := &keyspan.InterleavingIter{}
		iiter.Init(i.comparer, &buf.merging, &rangeDelMiter,
			keyspan.InterleavingIterOpts{
				LowerBound: i.opts.LowerBound,
				UpperBound: i.opts.UpperBound,
			})
		i.pointKeyIter = iiter
	} else {
		pcIter := &pointCollapsingIterator{
			comparer: i.comparer,
			merge:    i.merge,
			seqNum:   i.seqNum,
		}
		pcIter.iter.Init(i.comparer, &buf.merging, &rangeDelMiter, keyspan.InterleavingIterOpts{
			LowerBound: i.opts.LowerBound,
			UpperBound: i.opts.UpperBound,
		})
		i.pointKeyIter = pcIter
	}
	i.iter = i.pointKeyIter
	return nil
}

// constructRangeKeyIter constructs the range-key iterator stack, populating
// i.rangeKey.rangeKeyIter with the resulting iterator. This is similar to
// Iterator.constructRangeKeyIter, except it doesn't handle batches and ensures
// iterConfig does *not* elide unsets/deletes.
func (i *scanInternalIterator) constructRangeKeyIter() error {
	// We want the bounded iter from iterConfig, but not the collapsing of
	// RangeKeyUnsets and RangeKeyDels.
	i.rangeKey.rangeKeyIter = i.rangeKey.iterConfig.Init(
		i.comparer, i.seqNum, i.opts.LowerBound, i.opts.UpperBound,
		nil /* hasPrefix */, nil /* prefix */, true, /* internalKeys */
		&i.rangeKey.rangeKeyBuffers.internal)

	// Next are the flushables: memtables and large batches.
	if i.readState != nil {
		for j := len(i.readState.memtables) - 1; j >= 0; j-- {
			mem := i.readState.memtables[j]
			// We only need to read from memtables which contain sequence numbers older
			// than seqNum.
			if logSeqNum := mem.logSeqNum; logSeqNum >= i.seqNum {
				continue
			}
			if rki := mem.newRangeKeyIter(&i.opts.IterOptions); rki != nil {
				i.rangeKey.iterConfig.AddLevel(rki)
			}
		}
	}

	current := i.version
	if current == nil {
		current = i.readState.current
	}
	// Next are the file levels: L0 sub-levels followed by lower levels.
	//
	// Add file-specific iterators for L0 files containing range keys. This is less
	// efficient than using levelIters for sublevels of L0 files containing
	// range keys, but range keys are expected to be sparse anyway, reducing the
	// cost benefit of maintaining a separate L0Sublevels instance for range key
	// files and then using it here.
	//
	// NB: We iterate L0's files in reverse order. They're sorted by
	// LargestSeqNum ascending, and we need to add them to the merging iterator
	// in LargestSeqNum descending to preserve the merging iterator's invariants
	// around internal key trailer ordering.
	iter := current.RangeKeyLevels[0].Iter()
	for f := iter.Last(); f != nil; f = iter.Prev() {
		spanIter, err := i.newIterRangeKey(i.ctx, f, i.opts.SpanIterOptions())
		if err != nil {
			return err
		}
		i.rangeKey.iterConfig.AddLevel(spanIter)
	}
	// Add level iterators for the non-empty non-L0 levels.
	skipStart := i.opts.skipLevelForOpts()
	for level := 1; level < len(current.RangeKeyLevels); level++ {
		if current.RangeKeyLevels[level].Empty() {
			continue
		}
		if level > skipStart {
			continue
		}
		li := i.rangeKey.iterConfig.NewLevelIter()
		spanIterOpts := i.opts.SpanIterOptions()
		levIter := current.RangeKeyLevels[level].Iter()
		if level == skipStart {
			nonRemoteFiles := make([]*manifest.TableMetadata, 0)
			for f := levIter.First(); f != nil; f = levIter.Next() {
				meta, err := i.db.objProvider.Lookup(base.FileTypeTable, f.TableBacking.DiskFileNum)
				if err != nil {
					return err
				}
				if (meta.IsShared() && i.opts.VisitSharedFile != nil) ||
					(meta.IsExternal() && i.opts.VisitExternalFile != nil) {
					// Skip this file.
					continue
				}
				nonRemoteFiles = append(nonRemoteFiles, f)
			}
			levSlice := manifest.NewLevelSliceKeySorted(i.db.cmp, nonRemoteFiles)
			levIter = levSlice.Iter()
		}
		li.Init(i.ctx, spanIterOpts, i.comparer.Compare, i.newIterRangeKey, levIter,
			manifest.Level(level), manifest.KeyTypeRange)
		i.rangeKey.iterConfig.AddLevel(li)
	}
	return nil
}

// seekGE seeks this iterator to the first key that's greater than or equal
// to the specified user key.
func (i *scanInternalIterator) seekGE(key []byte) bool {
	i.iterKV = i.iter.SeekGE(key, base.SeekGEFlagsNone)
	return i.iterKV != nil
}

// unsafeKey returns the unsafe InternalKey at the current position. Must only
// be called when the iterator is valid.
func (i *scanInternalIterator) unsafeKey() *InternalKey {
	return &i.iterKV.K
}

// lazyValue returns a value pointer to the value at the current iterator
// position. Behaviour is undefined if unsafeKey() returns a Range key or
// Rangedel kind key.
func (i *scanInternalIterator) lazyValue() LazyValue {
	return i.iterKV.LazyValue()
}

// unsafeRangeDel returns a rangedel span. Behaviour is undefined if unsafeKey()
// returns a non-rangedel kind.
func (i *scanInternalIterator) unsafeRangeDel() *keyspan.Span {
	type spanInternalIterator interface {
		Span() *keyspan.Span
	}
	return i.pointKeyIter.(spanInternalIterator).Span()
}

// unsafeSpan returns a range key span. Behaviour is undefined if unsafeKey()
// returns a non-rangekey kind.
func (i *scanInternalIterator) unsafeSpan() *keyspan.Span {
	return i.rangeKey.iiter.Span()
}

// next advances the iterator in the forward direction, and returns the
// iterator's new validity state.
func (i *scanInternalIterator) next() bool {
	i.iterKV = i.iter.Next()
	return i.iterKV != nil
}

// error returns an error from the internal iterator, if there's any.
func (i *scanInternalIterator) error() error {
	return i.iter.Error()
}

// Close closes this iterator, and releases any pooled objects.
func (i *scanInternalIterator) Close() error {
	err := i.iter.Close()
	err = errors.CombineErrors(err, i.blobValueFetcher.Close())
	if i.readState != nil {
		i.readState.unref()
	}
	if i.version != nil {
		i.version.Unref()
	}
	if i.rangeKey != nil {
		i.rangeKey.PrepareForReuse()
		*i.rangeKey = iteratorRangeKeyState{
			rangeKeyBuffers: i.rangeKey.rangeKeyBuffers,
		}
		iterRangeKeyStateAllocPool.Put(i.rangeKey)
		i.rangeKey = nil
	}
	if alloc := i.alloc; alloc != nil {
		for j := range i.boundsBuf {
			if cap(i.boundsBuf[j]) >= maxKeyBufCacheSize {
				alloc.iterAlloc.boundsBuf[j] = nil
			} else {
				alloc.iterAlloc.boundsBuf[j] = i.boundsBuf[j]
			}
		}
		keyBuf := alloc.iterAlloc.keyBuf[:0]
		boundsBuf := alloc.iterAlloc.boundsBuf
		prefixOrFullSeekKey := alloc.iterAlloc.prefixOrFullSeekKey[:0]
		*alloc = scanInternalIterAlloc{}
		alloc.iterAlloc.keyBuf = keyBuf
		alloc.iterAlloc.boundsBuf = boundsBuf
		alloc.iterAlloc.prefixOrFullSeekKey = prefixOrFullSeekKey
		scanInternalIteratorIterAllocPool.Put(alloc)
	}
	return err
}

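// initializeBoundBufs copies lower and upper into one of the two reusable
// bounds buffers (see boundsBuf), points opts.LowerBound and opts.UpperBound
// into that single allocation, and flips boundsBufIdx so that a subsequent
// call writes into the other buffer.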
func (i *scanInternalIterator) initializeBoundBufs(lower, upper []byte) {
	buf := i.boundsBuf[i.boundsBufIdx][:0]
	if lower != nil {
		buf = append(buf, lower...)
		i.opts.LowerBound = buf
	} else {
		i.opts.LowerBound = nil
	}
	if upper != nil {
		buf = append(buf, upper...)
		i.opts.UpperBound = buf[len(buf)-len(upper):]
	} else {
		i.opts.UpperBound = nil
	}
	i.boundsBuf[i.boundsBufIdx] = buf
	i.boundsBufIdx = 1 - i.boundsBufIdx
}

// scanInternalIterAlloc is a wrapper around iterAlloc that includes a
// scanInternalIterator.
type scanInternalIterAlloc struct {
	iterAlloc iterAlloc
	scanIter  scanInternalIterator
}

var scanInternalIteratorIterAllocPool = sync.Pool{
	New: func() interface{} {
		return &scanInternalIterAlloc{}
	},
}