Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/keyspan"
13 : "github.com/cockroachdb/pebble/internal/manifest"
14 : "github.com/cockroachdb/pebble/sstable"
15 : )
16 :
17 : // ExternalIterOption provide an interface to specify open-time options to
18 : // NewExternalIter.
19 : type ExternalIterOption interface {
20 : // iterApply is called on the iterator during opening in order to set internal
21 : // parameters.
22 : iterApply(*Iterator)
23 : // readerOptions returns any reader options added by this iter option.
24 : readerOptions() []sstable.ReaderOption
25 : }
26 :
27 : type externalIterReaderOptions struct {
28 : opts []sstable.ReaderOption
29 : }
30 :
31 0 : func (e *externalIterReaderOptions) iterApply(iterator *Iterator) {
32 0 : // Do nothing.
33 0 : }
34 :
35 0 : func (e *externalIterReaderOptions) readerOptions() []sstable.ReaderOption {
36 0 : return e.opts
37 0 : }
38 :
39 : // ExternalIterReaderOptions returns an ExternalIterOption that specifies
40 : // sstable.ReaderOptions to be applied on sstable readers in NewExternalIter.
41 0 : func ExternalIterReaderOptions(opts ...sstable.ReaderOption) ExternalIterOption {
42 0 : return &externalIterReaderOptions{opts: opts}
43 0 : }
44 :
45 : // NewExternalIter takes an input 2d array of sstable files which may overlap
46 : // across subarrays but not within a subarray (at least as far as points are
47 : // concerned; range keys are allowed to overlap arbitrarily even within a
48 : // subarray), and returns an Iterator over the merged contents of the sstables.
49 : // Input sstables may contain point keys, range keys, range deletions, etc. The
50 : // input files slice must be sorted in reverse chronological ordering. A key in a
51 : // file at a lower index subarray will shadow a key with an identical user key
52 : // contained within a file at a higher index subarray. Each subarray must be
53 : // sorted in internal key order, where lower index files contain keys that sort
54 : // left of files with higher indexes.
55 : //
56 : // Input sstables must only contain keys with the zero sequence number.
57 : //
58 : // Iterators constructed through NewExternalIter do not support all iterator
59 : // options, including block-property and table filters. NewExternalIter errors
60 : // if an incompatible option is set.
61 : func NewExternalIter(
62 : o *Options,
63 : iterOpts *IterOptions,
64 : files [][]sstable.ReadableFile,
65 : extraOpts ...ExternalIterOption,
66 1 : ) (it *Iterator, err error) {
67 1 : return NewExternalIterWithContext(context.Background(), o, iterOpts, files, extraOpts...)
68 1 : }
69 :
70 : // NewExternalIterWithContext is like NewExternalIter, and additionally
71 : // accepts a context for tracing.
72 : func NewExternalIterWithContext(
73 : ctx context.Context,
74 : o *Options,
75 : iterOpts *IterOptions,
76 : files [][]sstable.ReadableFile,
77 : extraOpts ...ExternalIterOption,
78 1 : ) (it *Iterator, err error) {
79 1 : if iterOpts != nil {
80 1 : if err := validateExternalIterOpts(iterOpts); err != nil {
81 0 : return nil, err
82 0 : }
83 : }
84 :
85 1 : var readers [][]*sstable.Reader
86 1 :
87 1 : seqNumOffset := 0
88 1 : var extraReaderOpts []sstable.ReaderOption
89 1 : for i := range extraOpts {
90 0 : extraReaderOpts = append(extraReaderOpts, extraOpts[i].readerOptions()...)
91 0 : }
92 1 : for _, levelFiles := range files {
93 1 : seqNumOffset += len(levelFiles)
94 1 : }
95 1 : for _, levelFiles := range files {
96 1 : seqNumOffset -= len(levelFiles)
97 1 : subReaders, err := openExternalTables(o, levelFiles, seqNumOffset, o.MakeReaderOptions(), extraReaderOpts...)
98 1 : readers = append(readers, subReaders)
99 1 : if err != nil {
100 0 : // Close all the opened readers.
101 0 : for i := range readers {
102 0 : for j := range readers[i] {
103 0 : _ = readers[i][j].Close()
104 0 : }
105 : }
106 0 : return nil, err
107 : }
108 : }
109 :
110 1 : buf := iterAllocPool.Get().(*iterAlloc)
111 1 : dbi := &buf.dbi
112 1 : *dbi = Iterator{
113 1 : ctx: ctx,
114 1 : alloc: buf,
115 1 : merge: o.Merger.Merge,
116 1 : comparer: *o.Comparer,
117 1 : readState: nil,
118 1 : keyBuf: buf.keyBuf,
119 1 : prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
120 1 : boundsBuf: buf.boundsBuf,
121 1 : batch: nil,
122 1 : // Add the readers to the Iterator so that Close closes them, and
123 1 : // SetOptions can re-construct iterators from them.
124 1 : externalReaders: readers,
125 1 : newIters: func(context.Context, *manifest.FileMetadata, *IterOptions,
126 1 : internalIterOpts, iterKinds) (iterSet, error) {
127 0 : // NB: External iterators are currently constructed without any
128 0 : // `levelIters`. newIters should never be called. When we support
129 0 : // organizing multiple non-overlapping files into a single level
130 0 : // (see TODO below), we'll need to adjust this tableNewIters
131 0 : // implementation to open iterators by looking up f in a map
132 0 : // of readers indexed by *fileMetadata.
133 0 : panic("unreachable")
134 : },
135 : seqNum: base.InternalKeySeqNumMax,
136 : }
137 1 : if iterOpts != nil {
138 1 : dbi.opts = *iterOpts
139 1 : dbi.processBounds(iterOpts.LowerBound, iterOpts.UpperBound)
140 1 : }
141 1 : for i := range extraOpts {
142 0 : extraOpts[i].iterApply(dbi)
143 0 : }
144 1 : if err := finishInitializingExternal(ctx, dbi); err != nil {
145 0 : dbi.Close()
146 0 : return nil, err
147 0 : }
148 1 : return dbi, nil
149 : }
150 :
151 1 : func validateExternalIterOpts(iterOpts *IterOptions) error {
152 1 : switch {
153 0 : case iterOpts.TableFilter != nil:
154 0 : return errors.Errorf("pebble: external iterator: TableFilter unsupported")
155 0 : case iterOpts.PointKeyFilters != nil:
156 0 : return errors.Errorf("pebble: external iterator: PointKeyFilters unsupported")
157 0 : case iterOpts.RangeKeyFilters != nil:
158 0 : return errors.Errorf("pebble: external iterator: RangeKeyFilters unsupported")
159 0 : case iterOpts.OnlyReadGuaranteedDurable:
160 0 : return errors.Errorf("pebble: external iterator: OnlyReadGuaranteedDurable unsupported")
161 0 : case iterOpts.UseL6Filters:
162 0 : return errors.Errorf("pebble: external iterator: UseL6Filters unsupported")
163 : }
164 1 : return nil
165 : }
166 :
167 1 : func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterator, error) {
168 1 : // TODO(jackson): In some instances we could generate fewer levels by using
169 1 : // L0Sublevels code to organize nonoverlapping files into the same level.
170 1 : // This would allow us to use levelIters and keep a smaller set of data and
171 1 : // files in-memory. However, it would also require us to identify the bounds
172 1 : // of all the files upfront.
173 1 :
174 1 : if !it.opts.pointKeys() {
175 0 : return emptyIter, nil
176 1 : } else if it.pointIter != nil {
177 1 : return it.pointIter, nil
178 1 : }
179 1 : mlevels := it.alloc.mlevels[:0]
180 1 :
181 1 : if len(it.externalReaders) > cap(mlevels) {
182 0 : mlevels = make([]mergingIterLevel, 0, len(it.externalReaders))
183 0 : }
184 : // We set a synthetic sequence number, with lower levels having higer numbers.
185 1 : seqNum := 0
186 1 : for _, readers := range it.externalReaders {
187 1 : seqNum += len(readers)
188 1 : }
189 1 : for _, readers := range it.externalReaders {
190 1 : for _, r := range readers {
191 1 : var (
192 1 : rangeDelIter keyspan.FragmentIterator
193 1 : pointIter internalIterator
194 1 : err error
195 1 : )
196 1 : // We could set hideObsoletePoints=true, since we are reading at
197 1 : // InternalKeySeqNumMax, but we don't bother since these sstables should
198 1 : // not have obsolete points (so the performance optimization is
199 1 : // unnecessary), and we don't want to bother constructing a
200 1 : // BlockPropertiesFilterer that includes obsoleteKeyBlockPropertyFilter.
201 1 : transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
202 1 : seqNum--
203 1 : pointIter, err = r.NewIterWithBlockPropertyFiltersAndContextEtc(
204 1 : ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
205 1 : false, /* useFilterBlock */
206 1 : &it.stats.InternalStats, it.opts.CategoryAndQoS, nil,
207 1 : sstable.TrivialReaderProvider{Reader: r})
208 1 : if err != nil {
209 0 : return nil, err
210 0 : }
211 1 : rangeDelIter, err = r.NewRawRangeDelIter(transforms)
212 1 : if err != nil {
213 0 : return nil, err
214 0 : }
215 1 : mlevels = append(mlevels, mergingIterLevel{
216 1 : iter: pointIter,
217 1 : rangeDelIter: rangeDelIter,
218 1 : })
219 : }
220 : }
221 :
222 1 : it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
223 1 : it.alloc.merging.snapshot = base.InternalKeySeqNumMax
224 1 : if len(mlevels) <= cap(it.alloc.levelsPositioned) {
225 1 : it.alloc.merging.levelsPositioned = it.alloc.levelsPositioned[:len(mlevels)]
226 1 : }
227 1 : return &it.alloc.merging, nil
228 : }
229 :
230 1 : func finishInitializingExternal(ctx context.Context, it *Iterator) error {
231 1 : pointIter, err := createExternalPointIter(ctx, it)
232 1 : if err != nil {
233 0 : return err
234 0 : }
235 1 : it.pointIter = pointIter
236 1 : it.iter = it.pointIter
237 1 :
238 1 : if it.opts.rangeKeys() {
239 1 : it.rangeKeyMasking.init(it, it.comparer.Compare, it.comparer.Split)
240 1 : var rangeKeyIters []keyspan.FragmentIterator
241 1 : if it.rangeKey == nil {
242 1 : // We could take advantage of the lack of overlaps in range keys within
243 1 : // each slice in it.externalReaders, and generate keyspanimpl.LevelIters
244 1 : // out of those. However, since range keys are expected to be sparse to
245 1 : // begin with, the performance gain might not be significant enough to
246 1 : // warrant it.
247 1 : //
248 1 : // TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not
249 1 : // operate on FileMetadatas (similar to simpleLevelIter), and implements
250 1 : // this optimization.
251 1 : // We set a synthetic sequence number, with lower levels having higer numbers.
252 1 : seqNum := 0
253 1 : for _, readers := range it.externalReaders {
254 1 : seqNum += len(readers)
255 1 : }
256 1 : for _, readers := range it.externalReaders {
257 1 : for _, r := range readers {
258 1 : transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
259 1 : seqNum--
260 1 : if rki, err := r.NewRawRangeKeyIter(transforms); err != nil {
261 0 : return err
262 1 : } else if rki != nil {
263 1 : rangeKeyIters = append(rangeKeyIters, rki)
264 1 : }
265 : }
266 : }
267 1 : if len(rangeKeyIters) > 0 {
268 1 : it.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
269 1 : it.rangeKey.init(it.comparer.Compare, it.comparer.Split, &it.opts)
270 1 : it.rangeKey.rangeKeyIter = it.rangeKey.iterConfig.Init(
271 1 : &it.comparer,
272 1 : base.InternalKeySeqNumMax,
273 1 : it.opts.LowerBound, it.opts.UpperBound,
274 1 : &it.hasPrefix, &it.prefixOrFullSeekKey,
275 1 : false /* internalKeys */, &it.rangeKey.internal,
276 1 : )
277 1 : for i := range rangeKeyIters {
278 1 : it.rangeKey.iterConfig.AddLevel(rangeKeyIters[i])
279 1 : }
280 : }
281 : }
282 1 : if it.rangeKey != nil {
283 1 : it.rangeKey.iiter.Init(&it.comparer, it.iter, it.rangeKey.rangeKeyIter,
284 1 : keyspan.InterleavingIterOpts{
285 1 : Mask: &it.rangeKeyMasking,
286 1 : LowerBound: it.opts.LowerBound,
287 1 : UpperBound: it.opts.UpperBound,
288 1 : })
289 1 : it.iter = &it.rangeKey.iiter
290 1 : }
291 : }
292 1 : return nil
293 : }
294 :
295 : func openExternalTables(
296 : o *Options,
297 : files []sstable.ReadableFile,
298 : seqNumOffset int,
299 : readerOpts sstable.ReaderOptions,
300 : extraReaderOpts ...sstable.ReaderOption,
301 1 : ) (readers []*sstable.Reader, err error) {
302 1 : readers = make([]*sstable.Reader, 0, len(files))
303 1 : for i := range files {
304 1 : readable, err := sstable.NewSimpleReadable(files[i])
305 1 : if err != nil {
306 0 : return readers, err
307 0 : }
308 1 : r, err := sstable.NewReader(readable, readerOpts, extraReaderOpts...)
309 1 : if err != nil {
310 0 : return readers, err
311 0 : }
312 1 : readers = append(readers, r)
313 : }
314 1 : return readers, err
315 : }
|