Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "sort"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/keyspan"
15 : "github.com/cockroachdb/pebble/internal/manifest"
16 : "github.com/cockroachdb/pebble/sstable"
17 : )
18 :
// ExternalIterOption provides an interface to specify open-time options to
// NewExternalIter.
type ExternalIterOption interface {
	// iterApply is called on the iterator during opening in order to set internal
	// parameters.
	iterApply(*Iterator)
	// readerOptions returns any reader options added by this iter option.
	readerOptions() []sstable.ReaderOption
}
28 :
// externalIterReaderOptions is an ExternalIterOption that only carries
// sstable reader options; its iterApply does nothing to the Iterator.
type externalIterReaderOptions struct {
	// opts is handed back unchanged by readerOptions.
	opts []sstable.ReaderOption
}
32 :
33 0 : func (e *externalIterReaderOptions) iterApply(iterator *Iterator) {
34 0 : // Do nothing.
35 0 : }
36 :
// readerOptions implements ExternalIterOption by returning the reader options
// this value was constructed with.
func (e *externalIterReaderOptions) readerOptions() []sstable.ReaderOption {
	return e.opts
}
40 :
41 : // ExternalIterReaderOptions returns an ExternalIterOption that specifies
42 : // sstable.ReaderOptions to be applied on sstable readers in NewExternalIter.
43 0 : func ExternalIterReaderOptions(opts ...sstable.ReaderOption) ExternalIterOption {
44 0 : return &externalIterReaderOptions{opts: opts}
45 0 : }
46 :
47 : // NewExternalIter takes an input 2d array of sstable files which may overlap
48 : // across subarrays but not within a subarray (at least as far as points are
49 : // concerned; range keys are allowed to overlap arbitrarily even within a
50 : // subarray), and returns an Iterator over the merged contents of the sstables.
51 : // Input sstables may contain point keys, range keys, range deletions, etc. The
52 : // input files slice must be sorted in reverse chronological ordering. A key in a
53 : // file at a lower index subarray will shadow a key with an identical user key
54 : // contained within a file at a higher index subarray. Each subarray must be
55 : // sorted in internal key order, where lower index files contain keys that sort
56 : // left of files with higher indexes.
57 : //
58 : // Input sstables must only contain keys with the zero sequence number.
59 : //
60 : // Iterators constructed through NewExternalIter do not support all iterator
61 : // options, including block-property and table filters. NewExternalIter errors
62 : // if an incompatible option is set.
63 : func NewExternalIter(
64 : o *Options,
65 : iterOpts *IterOptions,
66 : files [][]sstable.ReadableFile,
67 : extraOpts ...ExternalIterOption,
68 1 : ) (it *Iterator, err error) {
69 1 : return NewExternalIterWithContext(context.Background(), o, iterOpts, files, extraOpts...)
70 1 : }
71 :
72 : // NewExternalIterWithContext is like NewExternalIter, and additionally
73 : // accepts a context for tracing.
74 : func NewExternalIterWithContext(
75 : ctx context.Context,
76 : o *Options,
77 : iterOpts *IterOptions,
78 : files [][]sstable.ReadableFile,
79 : extraOpts ...ExternalIterOption,
80 1 : ) (it *Iterator, err error) {
81 1 : if iterOpts != nil {
82 1 : if err := validateExternalIterOpts(iterOpts); err != nil {
83 0 : return nil, err
84 0 : }
85 : }
86 :
87 1 : var readers [][]*sstable.Reader
88 1 :
89 1 : seqNumOffset := 0
90 1 : var extraReaderOpts []sstable.ReaderOption
91 1 : for i := range extraOpts {
92 0 : extraReaderOpts = append(extraReaderOpts, extraOpts[i].readerOptions()...)
93 0 : }
94 1 : for _, levelFiles := range files {
95 1 : seqNumOffset += len(levelFiles)
96 1 : }
97 1 : for _, levelFiles := range files {
98 1 : seqNumOffset -= len(levelFiles)
99 1 : subReaders, err := openExternalTables(o, levelFiles, seqNumOffset, o.MakeReaderOptions(), extraReaderOpts...)
100 1 : readers = append(readers, subReaders)
101 1 : if err != nil {
102 0 : // Close all the opened readers.
103 0 : for i := range readers {
104 0 : for j := range readers[i] {
105 0 : _ = readers[i][j].Close()
106 0 : }
107 : }
108 0 : return nil, err
109 : }
110 : }
111 :
112 1 : buf := iterAllocPool.Get().(*iterAlloc)
113 1 : dbi := &buf.dbi
114 1 : *dbi = Iterator{
115 1 : ctx: ctx,
116 1 : alloc: buf,
117 1 : merge: o.Merger.Merge,
118 1 : comparer: *o.Comparer,
119 1 : readState: nil,
120 1 : keyBuf: buf.keyBuf,
121 1 : prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
122 1 : boundsBuf: buf.boundsBuf,
123 1 : batch: nil,
124 1 : // Add the readers to the Iterator so that Close closes them, and
125 1 : // SetOptions can re-construct iterators from them.
126 1 : externalReaders: readers,
127 1 : newIters: func(context.Context, *manifest.FileMetadata, *IterOptions,
128 1 : internalIterOpts, iterKinds) (iterSet, error) {
129 0 : // NB: External iterators are currently constructed without any
130 0 : // `levelIters`. newIters should never be called. When we support
131 0 : // organizing multiple non-overlapping files into a single level
132 0 : // (see TODO below), we'll need to adjust this tableNewIters
133 0 : // implementation to open iterators by looking up f in a map
134 0 : // of readers indexed by *fileMetadata.
135 0 : panic("unreachable")
136 : },
137 : seqNum: base.InternalKeySeqNumMax,
138 : }
139 1 : if iterOpts != nil {
140 1 : dbi.opts = *iterOpts
141 1 : dbi.processBounds(iterOpts.LowerBound, iterOpts.UpperBound)
142 1 : }
143 1 : for i := range extraOpts {
144 0 : extraOpts[i].iterApply(dbi)
145 0 : }
146 1 : if err := finishInitializingExternal(ctx, dbi); err != nil {
147 0 : dbi.Close()
148 0 : return nil, err
149 0 : }
150 1 : return dbi, nil
151 : }
152 :
153 1 : func validateExternalIterOpts(iterOpts *IterOptions) error {
154 1 : switch {
155 0 : case iterOpts.TableFilter != nil:
156 0 : return errors.Errorf("pebble: external iterator: TableFilter unsupported")
157 0 : case iterOpts.PointKeyFilters != nil:
158 0 : return errors.Errorf("pebble: external iterator: PointKeyFilters unsupported")
159 0 : case iterOpts.RangeKeyFilters != nil:
160 0 : return errors.Errorf("pebble: external iterator: RangeKeyFilters unsupported")
161 0 : case iterOpts.OnlyReadGuaranteedDurable:
162 0 : return errors.Errorf("pebble: external iterator: OnlyReadGuaranteedDurable unsupported")
163 0 : case iterOpts.UseL6Filters:
164 0 : return errors.Errorf("pebble: external iterator: UseL6Filters unsupported")
165 : }
166 1 : return nil
167 : }
168 :
169 1 : func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterator, error) {
170 1 : // TODO(jackson): In some instances we could generate fewer levels by using
171 1 : // L0Sublevels code to organize nonoverlapping files into the same level.
172 1 : // This would allow us to use levelIters and keep a smaller set of data and
173 1 : // files in-memory. However, it would also require us to identify the bounds
174 1 : // of all the files upfront.
175 1 :
176 1 : if !it.opts.pointKeys() {
177 0 : return emptyIter, nil
178 1 : } else if it.pointIter != nil {
179 1 : return it.pointIter, nil
180 1 : }
181 1 : mlevels := it.alloc.mlevels[:0]
182 1 :
183 1 : if len(it.externalReaders) > cap(mlevels) {
184 0 : mlevels = make([]mergingIterLevel, 0, len(it.externalReaders))
185 0 : }
186 : // We set a synthetic sequence number, with lower levels having higer numbers.
187 1 : seqNum := 0
188 1 : for _, readers := range it.externalReaders {
189 1 : seqNum += len(readers)
190 1 : }
191 1 : for _, readers := range it.externalReaders {
192 1 : var combinedIters []internalIterator
193 1 : for _, r := range readers {
194 1 : var (
195 1 : rangeDelIter keyspan.FragmentIterator
196 1 : pointIter internalIterator
197 1 : err error
198 1 : )
199 1 : // We could set hideObsoletePoints=true, since we are reading at
200 1 : // InternalKeySeqNumMax, but we don't bother since these sstables should
201 1 : // not have obsolete points (so the performance optimization is
202 1 : // unnecessary), and we don't want to bother constructing a
203 1 : // BlockPropertiesFilterer that includes obsoleteKeyBlockPropertyFilter.
204 1 : transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
205 1 : seqNum--
206 1 : pointIter, err = r.NewIterWithBlockPropertyFiltersAndContextEtc(
207 1 : ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
208 1 : false, /* useFilterBlock */
209 1 : &it.stats.InternalStats, it.opts.CategoryAndQoS, nil,
210 1 : sstable.TrivialReaderProvider{Reader: r})
211 1 : if err != nil {
212 0 : return nil, err
213 0 : }
214 1 : rangeDelIter, err = r.NewRawRangeDelIter(transforms)
215 1 : if err != nil {
216 0 : return nil, err
217 0 : }
218 1 : mlevels = append(mlevels, mergingIterLevel{
219 1 : iter: pointIter,
220 1 : rangeDelIter: rangeDelIter,
221 1 : })
222 : }
223 1 : if len(combinedIters) == 1 {
224 0 : mlevels = append(mlevels, mergingIterLevel{
225 0 : iter: combinedIters[0],
226 0 : })
227 1 : } else if len(combinedIters) > 1 {
228 0 : sli := &simpleLevelIter{
229 0 : cmp: it.cmp,
230 0 : iters: combinedIters,
231 0 : }
232 0 : sli.init(it.opts)
233 0 : mlevels = append(mlevels, mergingIterLevel{
234 0 : iter: sli,
235 0 : rangeDelIter: nil,
236 0 : })
237 0 : }
238 : }
239 :
240 1 : it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
241 1 : it.alloc.merging.snapshot = base.InternalKeySeqNumMax
242 1 : if len(mlevels) <= cap(it.alloc.levelsPositioned) {
243 1 : it.alloc.merging.levelsPositioned = it.alloc.levelsPositioned[:len(mlevels)]
244 1 : }
245 1 : return &it.alloc.merging, nil
246 : }
247 :
248 1 : func finishInitializingExternal(ctx context.Context, it *Iterator) error {
249 1 : pointIter, err := createExternalPointIter(ctx, it)
250 1 : if err != nil {
251 0 : return err
252 0 : }
253 1 : it.pointIter = pointIter
254 1 : it.iter = it.pointIter
255 1 :
256 1 : if it.opts.rangeKeys() {
257 1 : it.rangeKeyMasking.init(it, it.comparer.Compare, it.comparer.Split)
258 1 : var rangeKeyIters []keyspan.FragmentIterator
259 1 : if it.rangeKey == nil {
260 1 : // We could take advantage of the lack of overlaps in range keys within
261 1 : // each slice in it.externalReaders, and generate keyspanimpl.LevelIters
262 1 : // out of those. However, since range keys are expected to be sparse to
263 1 : // begin with, the performance gain might not be significant enough to
264 1 : // warrant it.
265 1 : //
266 1 : // TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not
267 1 : // operate on FileMetadatas (similar to simpleLevelIter), and implements
268 1 : // this optimization.
269 1 : // We set a synthetic sequence number, with lower levels having higer numbers.
270 1 : seqNum := 0
271 1 : for _, readers := range it.externalReaders {
272 1 : seqNum += len(readers)
273 1 : }
274 1 : for _, readers := range it.externalReaders {
275 1 : for _, r := range readers {
276 1 : transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
277 1 : seqNum--
278 1 : if rki, err := r.NewRawRangeKeyIter(transforms); err != nil {
279 0 : return err
280 1 : } else if rki != nil {
281 1 : rangeKeyIters = append(rangeKeyIters, rki)
282 1 : }
283 : }
284 : }
285 1 : if len(rangeKeyIters) > 0 {
286 1 : it.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
287 1 : it.rangeKey.init(it.comparer.Compare, it.comparer.Split, &it.opts)
288 1 : it.rangeKey.rangeKeyIter = it.rangeKey.iterConfig.Init(
289 1 : &it.comparer,
290 1 : base.InternalKeySeqNumMax,
291 1 : it.opts.LowerBound, it.opts.UpperBound,
292 1 : &it.hasPrefix, &it.prefixOrFullSeekKey,
293 1 : false /* internalKeys */, &it.rangeKey.internal,
294 1 : )
295 1 : for i := range rangeKeyIters {
296 1 : it.rangeKey.iterConfig.AddLevel(rangeKeyIters[i])
297 1 : }
298 : }
299 : }
300 1 : if it.rangeKey != nil {
301 1 : it.rangeKey.iiter.Init(&it.comparer, it.iter, it.rangeKey.rangeKeyIter,
302 1 : keyspan.InterleavingIterOpts{
303 1 : Mask: &it.rangeKeyMasking,
304 1 : LowerBound: it.opts.LowerBound,
305 1 : UpperBound: it.opts.UpperBound,
306 1 : })
307 1 : it.iter = &it.rangeKey.iiter
308 1 : }
309 : }
310 1 : return nil
311 : }
312 :
313 : func openExternalTables(
314 : o *Options,
315 : files []sstable.ReadableFile,
316 : seqNumOffset int,
317 : readerOpts sstable.ReaderOptions,
318 : extraReaderOpts ...sstable.ReaderOption,
319 1 : ) (readers []*sstable.Reader, err error) {
320 1 : readers = make([]*sstable.Reader, 0, len(files))
321 1 : for i := range files {
322 1 : readable, err := sstable.NewSimpleReadable(files[i])
323 1 : if err != nil {
324 0 : return readers, err
325 0 : }
326 1 : r, err := sstable.NewReader(readable, readerOpts, extraReaderOpts...)
327 1 : if err != nil {
328 0 : return readers, err
329 0 : }
330 1 : readers = append(readers, r)
331 : }
332 1 : return readers, err
333 : }
334 :
// simpleLevelIter is similar to a levelIter in that it merges the points
// from multiple point iterators that are non-overlapping in the key ranges
// they return. It is only expected to support forward iteration and forward
// regular seeking; reverse iteration and prefix seeking is not supported.
// Intended to be a low-overhead, non-FileMetadata dependent option for
// NewExternalIter. To optimize seeking and forward iteration, it maintains
// two slices of child iterators; one of all iterators, and a subset of it that
// contains just the iterators that contain point keys within the current
// bounds.
//
// Note that this levelIter does not support pausing at file boundaries
// in case of range tombstones in this file that could apply to points outside
// of this file (and outside of this level). This is sufficient for optimizing
// the main use cases of NewExternalIter, however for completeness it would make
// sense to build this pausing functionality in.
type simpleLevelIter struct {
	// cmp compares user keys; SeekGE uses it to binary-search firstKeys.
	cmp Compare
	// err is the recorded error. While it is non-nil the positioning methods
	// return nil keys; resetFilteredIters clears it.
	err error
	// lowerBound is the lower bound taken from IterOptions in init or from
	// SetBounds; nil means unbounded.
	lowerBound []byte
	// iters holds every child iterator; Close closes all of them.
	iters []internalIterator
	// filtered is the subset of iters that returned a key when positioned at
	// lowerBound (or at First when lowerBound is nil) by resetFilteredIters.
	filtered []internalIterator
	// firstKeys[i] is the user key filtered[i] returned during
	// resetFilteredIters. The entries are slices into firstKeysBuf.
	firstKeys [][]byte
	// firstKeysBuf is the backing buffer the firstKeys entries are copied into.
	firstKeysBuf []byte
	// currentIdx is the index into filtered of the current child iterator. The
	// methods treat an out-of-range value (SetBounds stores -1) as having no
	// current child.
	currentIdx int
}
360 :
361 : var _ internalIterator = &simpleLevelIter{}
362 :
363 : // init initializes this simpleLevelIter.
364 1 : func (s *simpleLevelIter) init(opts IterOptions) {
365 1 : s.currentIdx = 0
366 1 : s.lowerBound = opts.LowerBound
367 1 : s.resetFilteredIters()
368 1 : }
369 :
370 1 : func (s *simpleLevelIter) resetFilteredIters() {
371 1 : s.filtered = s.filtered[:0]
372 1 : s.firstKeys = s.firstKeys[:0]
373 1 : s.firstKeysBuf = s.firstKeysBuf[:0]
374 1 : s.err = nil
375 1 : for i := range s.iters {
376 1 : var iterKey *base.InternalKey
377 1 : if s.lowerBound != nil {
378 0 : iterKey, _ = s.iters[i].SeekGE(s.lowerBound, base.SeekGEFlagsNone)
379 1 : } else {
380 1 : iterKey, _ = s.iters[i].First()
381 1 : }
382 1 : if iterKey != nil {
383 1 : s.filtered = append(s.filtered, s.iters[i])
384 1 : bufStart := len(s.firstKeysBuf)
385 1 : s.firstKeysBuf = append(s.firstKeysBuf, iterKey.UserKey...)
386 1 : s.firstKeys = append(s.firstKeys, s.firstKeysBuf[bufStart:bufStart+len(iterKey.UserKey)])
387 1 : } else if err := s.iters[i].Error(); err != nil {
388 1 : s.err = err
389 1 : }
390 : }
391 : }
392 :
393 : func (s *simpleLevelIter) SeekGE(
394 : key []byte, flags base.SeekGEFlags,
395 1 : ) (*base.InternalKey, base.LazyValue) {
396 1 : if s.err != nil {
397 0 : return nil, base.LazyValue{}
398 0 : }
399 : // Find the first file that is entirely >= key. The file before that could
400 : // contain the key we're looking for.
401 1 : n := sort.Search(len(s.firstKeys), func(i int) bool {
402 1 : return s.cmp(key, s.firstKeys[i]) <= 0
403 1 : })
404 1 : if n > 0 {
405 1 : s.currentIdx = n - 1
406 1 : } else {
407 1 : s.currentIdx = n
408 1 : }
409 1 : if s.currentIdx < len(s.filtered) {
410 1 : if iterKey, val := s.filtered[s.currentIdx].SeekGE(key, flags); iterKey != nil {
411 1 : return iterKey, val
412 1 : }
413 1 : if err := s.filtered[s.currentIdx].Error(); err != nil {
414 0 : s.err = err
415 0 : }
416 1 : s.currentIdx++
417 : }
418 1 : return s.skipEmptyFileForward(key, flags)
419 : }
420 :
421 : func (s *simpleLevelIter) skipEmptyFileForward(
422 : seekKey []byte, flags base.SeekGEFlags,
423 1 : ) (*base.InternalKey, base.LazyValue) {
424 1 : var iterKey *base.InternalKey
425 1 : var val base.LazyValue
426 1 : for s.currentIdx >= 0 && s.currentIdx < len(s.filtered) && s.err == nil {
427 1 : if seekKey != nil {
428 1 : iterKey, val = s.filtered[s.currentIdx].SeekGE(seekKey, flags)
429 1 : } else if s.lowerBound != nil {
430 0 : iterKey, val = s.filtered[s.currentIdx].SeekGE(s.lowerBound, flags)
431 1 : } else {
432 1 : iterKey, val = s.filtered[s.currentIdx].First()
433 1 : }
434 1 : if iterKey != nil {
435 1 : return iterKey, val
436 1 : }
437 0 : if err := s.filtered[s.currentIdx].Error(); err != nil {
438 0 : s.err = err
439 0 : }
440 0 : s.currentIdx++
441 : }
442 1 : return nil, base.LazyValue{}
443 : }
444 :
// SeekPrefixGE is not supported by simpleLevelIter; calling it panics.
func (s *simpleLevelIter) SeekPrefixGE(
	prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}
450 :
// SeekLT is not supported by simpleLevelIter (forward iteration only); calling
// it panics.
func (s *simpleLevelIter) SeekLT(
	key []byte, flags base.SeekLTFlags,
) (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}
456 :
457 1 : func (s *simpleLevelIter) First() (*base.InternalKey, base.LazyValue) {
458 1 : if s.err != nil {
459 1 : return nil, base.LazyValue{}
460 1 : }
461 1 : s.currentIdx = 0
462 1 : return s.skipEmptyFileForward(nil /* seekKey */, base.SeekGEFlagsNone)
463 : }
464 :
// Last is not supported by simpleLevelIter (forward iteration only); calling
// it panics.
func (s *simpleLevelIter) Last() (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}
468 :
469 1 : func (s *simpleLevelIter) Next() (*base.InternalKey, base.LazyValue) {
470 1 : if s.err != nil {
471 0 : return nil, base.LazyValue{}
472 0 : }
473 1 : if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) {
474 1 : return nil, base.LazyValue{}
475 1 : }
476 1 : if iterKey, val := s.filtered[s.currentIdx].Next(); iterKey != nil {
477 1 : return iterKey, val
478 1 : }
479 1 : s.currentIdx++
480 1 : return s.skipEmptyFileForward(nil /* seekKey */, base.SeekGEFlagsNone)
481 : }
482 :
483 0 : func (s *simpleLevelIter) NextPrefix(succKey []byte) (*base.InternalKey, base.LazyValue) {
484 0 : if s.err != nil {
485 0 : return nil, base.LazyValue{}
486 0 : }
487 0 : if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) {
488 0 : return nil, base.LazyValue{}
489 0 : }
490 0 : if iterKey, val := s.filtered[s.currentIdx].NextPrefix(succKey); iterKey != nil {
491 0 : return iterKey, val
492 0 : }
493 0 : s.currentIdx++
494 0 : return s.skipEmptyFileForward(succKey /* seekKey */, base.SeekGEFlagsNone)
495 : }
496 :
// Prev is not supported by simpleLevelIter (forward iteration only); calling
// it panics.
func (s *simpleLevelIter) Prev() (*base.InternalKey, base.LazyValue) {
	panic("unimplemented")
}
500 :
501 1 : func (s *simpleLevelIter) Error() error {
502 1 : if s.currentIdx >= 0 && s.currentIdx < len(s.filtered) {
503 0 : s.err = firstError(s.err, s.filtered[s.currentIdx].Error())
504 0 : }
505 1 : return s.err
506 : }
507 :
508 1 : func (s *simpleLevelIter) Close() error {
509 1 : var err error
510 1 : for i := range s.iters {
511 1 : err = firstError(err, s.iters[i].Close())
512 1 : }
513 1 : return err
514 : }
515 :
516 0 : func (s *simpleLevelIter) SetBounds(lower, upper []byte) {
517 0 : s.currentIdx = -1
518 0 : s.lowerBound = lower
519 0 : for i := range s.iters {
520 0 : s.iters[i].SetBounds(lower, upper)
521 0 : }
522 0 : s.resetFilteredIters()
523 : }
524 :
// SetContext is a no-op: the supplied context is ignored and is not passed on
// to the child iterators.
func (s *simpleLevelIter) SetContext(_ context.Context) {}
526 :
527 0 : func (s *simpleLevelIter) String() string {
528 0 : if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) {
529 0 : return "simpleLevelIter: current=<nil>"
530 0 : }
531 0 : return fmt.Sprintf("simpleLevelIter: current=%s", s.filtered[s.currentIdx])
532 : }
533 :
// Compile-time check that *simpleLevelIter implements internalIterator.
// NOTE(review): this repeats the identical assertion that follows the
// simpleLevelIter type declaration; one of the two can be dropped.
var _ internalIterator = &simpleLevelIter{}
|