Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/keyspan"
13 : "github.com/cockroachdb/pebble/internal/manifest"
14 : "github.com/cockroachdb/pebble/sstable"
15 : "github.com/cockroachdb/pebble/sstable/block"
16 : )
17 :
18 : // NewExternalIter takes an input 2d array of sstable files which may overlap
19 : // across subarrays but not within a subarray (at least as far as points are
20 : // concerned; range keys are allowed to overlap arbitrarily even within a
21 : // subarray), and returns an Iterator over the merged contents of the sstables.
22 : // Input sstables may contain point keys, range keys, range deletions, etc. The
23 : // input files slice must be sorted in reverse chronological ordering. A key in a
24 : // file at a lower index subarray will shadow a key with an identical user key
25 : // contained within a file at a higher index subarray. Each subarray must be
26 : // sorted in internal key order, where lower index files contain keys that sort
27 : // left of files with higher indexes.
28 : //
29 : // Input sstables must only contain keys with the zero sequence number and must
30 : // not contain references to values in external blob files.
31 : //
32 : // Iterators constructed through NewExternalIter do not support all iterator
33 : // options, including block-property and table filters. NewExternalIter errors
34 : // if an incompatible option is set.
35 : func NewExternalIter(
36 : o *Options, iterOpts *IterOptions, files [][]sstable.ReadableFile,
37 1 : ) (it *Iterator, err error) {
38 1 : return NewExternalIterWithContext(context.Background(), o, iterOpts, files)
39 1 : }
40 :
41 : // NewExternalIterWithContext is like NewExternalIter, and additionally
42 : // accepts a context for tracing.
43 : func NewExternalIterWithContext(
44 : ctx context.Context, o *Options, iterOpts *IterOptions, files [][]sstable.ReadableFile,
45 1 : ) (it *Iterator, err error) {
46 1 : if iterOpts != nil {
47 1 : if err := validateExternalIterOpts(iterOpts); err != nil {
48 0 : return nil, err
49 0 : }
50 : }
51 :
52 1 : ro := o.MakeReaderOptions()
53 1 : var readers [][]*sstable.Reader
54 1 : for _, levelFiles := range files {
55 1 : subReaders, err := openExternalTables(ctx, levelFiles, ro)
56 1 : readers = append(readers, subReaders)
57 1 : if err != nil {
58 1 : // Close all the opened readers.
59 1 : for i := range readers {
60 1 : for j := range readers[i] {
61 1 : _ = readers[i][j].Close()
62 1 : }
63 : }
64 1 : return nil, err
65 : }
66 : }
67 :
68 1 : buf := iterAllocPool.Get().(*iterAlloc)
69 1 : dbi := &buf.dbi
70 1 : *dbi = Iterator{
71 1 : ctx: ctx,
72 1 : alloc: buf,
73 1 : merge: o.Merger.Merge,
74 1 : comparer: *o.Comparer,
75 1 : readState: nil,
76 1 : keyBuf: buf.keyBuf,
77 1 : prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
78 1 : boundsBuf: buf.boundsBuf,
79 1 : batch: nil,
80 1 : // Add the external iter state to the Iterator so that Close closes it,
81 1 : // and SetOptions can re-construct iterators using its state.
82 1 : externalIter: &externalIterState{readers: readers},
83 1 : newIters: func(context.Context, *manifest.TableMetadata, *IterOptions,
84 1 : internalIterOpts, iterKinds) (iterSet, error) {
85 0 : // NB: External iterators are currently constructed without any
86 0 : // `levelIters`. newIters should never be called. When we support
87 0 : // organizing multiple non-overlapping files into a single level
88 0 : // (see TODO below), we'll need to adjust this tableNewIters
89 0 : // implementation to open iterators by looking up f in a map
90 0 : // of readers indexed by *fileMetadata.
91 0 : panic("unreachable")
92 : },
93 : seqNum: base.SeqNumMax,
94 : }
95 1 : dbi.externalIter.bufferPool.Init(2)
96 1 :
97 1 : if iterOpts != nil {
98 1 : dbi.opts = *iterOpts
99 1 : dbi.processBounds(iterOpts.LowerBound, iterOpts.UpperBound)
100 1 : }
101 1 : if err := finishInitializingExternal(ctx, dbi); err != nil {
102 1 : _ = dbi.Close()
103 1 : return nil, err
104 1 : }
105 1 : return dbi, nil
106 : }
107 :
// externalIterState encapsulates state that is specific to external iterators.
// An external *pebble.Iterator maintains a pointer to the externalIterState and
// calls Close when the Iterator is Closed, providing an opportunity for the
// external iterator to release resources particular to external iterators.
type externalIterState struct {
	// bufferPool is handed to sstable reads through the block read
	// environment and is released by Close.
	bufferPool block.BufferPool
	// readers holds the opened sstable readers, laid out like the
	// two-dimensional files slice passed to NewExternalIter. Close closes
	// every reader.
	readers [][]*sstable.Reader
}
116 :
117 1 : func (e *externalIterState) Close() (err error) {
118 1 : for _, readers := range e.readers {
119 1 : for _, r := range readers {
120 1 : err = firstError(err, r.Close())
121 1 : }
122 : }
123 1 : e.bufferPool.Release()
124 1 : return err
125 : }
126 :
127 1 : func validateExternalIterOpts(iterOpts *IterOptions) error {
128 1 : switch {
129 0 : case iterOpts.PointKeyFilters != nil:
130 0 : return errors.Errorf("pebble: external iterator: PointKeyFilters unsupported")
131 0 : case iterOpts.RangeKeyFilters != nil:
132 0 : return errors.Errorf("pebble: external iterator: RangeKeyFilters unsupported")
133 0 : case iterOpts.OnlyReadGuaranteedDurable:
134 0 : return errors.Errorf("pebble: external iterator: OnlyReadGuaranteedDurable unsupported")
135 0 : case iterOpts.UseL6Filters:
136 0 : return errors.Errorf("pebble: external iterator: UseL6Filters unsupported")
137 : }
138 1 : return nil
139 : }
140 :
141 : func createExternalPointIter(
142 : ctx context.Context, it *Iterator, readEnv sstable.ReadEnv,
143 1 : ) (topLevelIterator, error) {
144 1 : // TODO(jackson): In some instances we could generate fewer levels by using
145 1 : // L0Sublevels code to organize nonoverlapping files into the same level.
146 1 : // This would allow us to use levelIters and keep a smaller set of data and
147 1 : // files in-memory. However, it would also require us to identify the bounds
148 1 : // of all the files upfront.
149 1 :
150 1 : if !it.opts.pointKeys() {
151 0 : return emptyIter, nil
152 1 : } else if it.pointIter != nil {
153 1 : return it.pointIter, nil
154 1 : }
155 1 : mlevels := it.alloc.mlevels[:0]
156 1 :
157 1 : if len(it.externalIter.readers) > cap(mlevels) {
158 0 : mlevels = make([]mergingIterLevel, 0, len(it.externalIter.readers))
159 0 : }
160 : // We set a synthetic sequence number, with lower levels having higer numbers.
161 1 : seqNum := 0
162 1 : for _, readers := range it.externalIter.readers {
163 1 : seqNum += len(readers)
164 1 : }
165 1 : for _, readers := range it.externalIter.readers {
166 1 : for _, r := range readers {
167 1 : var (
168 1 : rangeDelIter keyspan.FragmentIterator
169 1 : pointIter internalIterator
170 1 : err error
171 1 : )
172 1 : // We could set hideObsoletePoints=true, since we are reading at
173 1 : // InternalKeySeqNumMax, but we don't bother since these sstables should
174 1 : // not have obsolete points (so the performance optimization is
175 1 : // unnecessary), and we don't want to bother constructing a
176 1 : // BlockPropertiesFilterer that includes obsoleteKeyBlockPropertyFilter.
177 1 : transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
178 1 : seqNum--
179 1 : pointIter, err = r.NewPointIter(ctx, sstable.IterOptions{
180 1 : Lower: it.opts.LowerBound,
181 1 : Upper: it.opts.UpperBound,
182 1 : Transforms: transforms,
183 1 : FilterBlockSizeLimit: sstable.NeverUseFilterBlock,
184 1 : Env: readEnv,
185 1 : ReaderProvider: sstable.MakeTrivialReaderProvider(r),
186 1 : })
187 1 : if err == nil {
188 1 : rangeDelIter, err = r.NewRawRangeDelIter(ctx, sstable.FragmentIterTransforms{
189 1 : SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum),
190 1 : }, readEnv)
191 1 : }
192 1 : if err != nil {
193 1 : if pointIter != nil {
194 1 : _ = pointIter.Close()
195 1 : }
196 1 : for i := range mlevels {
197 1 : _ = mlevels[i].iter.Close()
198 1 : if mlevels[i].rangeDelIter != nil {
199 1 : mlevels[i].rangeDelIter.Close()
200 1 : }
201 : }
202 1 : return nil, err
203 : }
204 1 : mlevels = append(mlevels, mergingIterLevel{
205 1 : iter: pointIter,
206 1 : rangeDelIter: rangeDelIter,
207 1 : })
208 : }
209 : }
210 :
211 1 : it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
212 1 : it.alloc.merging.snapshot = base.SeqNumMax
213 1 : if len(mlevels) <= cap(it.alloc.levelsPositioned) {
214 1 : it.alloc.merging.levelsPositioned = it.alloc.levelsPositioned[:len(mlevels)]
215 1 : }
216 1 : return &it.alloc.merging, nil
217 : }
218 :
// finishInitializingExternal wires up the internal iterator stack of an
// external iterator.
//
// It builds the sstable read environment from the iterator's stats and the
// external state's buffer pool, obtains the point iterator from
// createExternalPointIter and installs it as it.pointIter and it.iter. When
// the iterator options request range keys, it additionally opens a raw range
// key iterator for every reader (unless it.rangeKey is already set) and, if
// any range key state exists, replaces it.iter with an interleaving iterator
// that combines the point iterator with the range keys.
//
// An error from createExternalPointIter or from opening a range key iterator
// is returned as is; range key iterators opened before the failure are closed.
func finishInitializingExternal(ctx context.Context, it *Iterator) error {
	readEnv := sstable.ReadEnv{
		Block: block.ReadEnv{
			Stats: &it.stats.InternalStats,
			// TODO(jackson): External iterators never provide categorized iterator
			// stats today because they exist outside the context of a *DB. If the
			// sstables being read are on the physical filesystem, we may still want to
			// thread a CategoryStatsCollector through so that we collect their stats.
			IterStats: nil,
			BufferPool: &it.externalIter.bufferPool,
		},
	}
	pointIter, err := createExternalPointIter(ctx, it, readEnv)
	if err != nil {
		return err
	}
	it.pointIter = pointIter
	it.iter = it.pointIter

	if it.opts.rangeKeys() {
		it.rangeKeyMasking.init(it, &it.comparer)
		var rangeKeyIters []keyspan.FragmentIterator
		if it.rangeKey == nil {
			// We could take advantage of the lack of overlaps in range keys within
			// each slice in it.externalIter.readers, and generate
			// keyspanimpl.LevelIters out of those. However, since range keys are
			// expected to be sparse to begin with, the performance gain might not be
			// significant enough to warrant it.
			//
			// TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not
			// operate on TableMetadatas (similar to simpleLevelIter), and implements
			// this optimization.
			// We set a synthetic sequence number, with lower levels having higher numbers.
			seqNum := 0
			for _, readers := range it.externalIter.readers {
				seqNum += len(readers)
			}
			for _, readers := range it.externalIter.readers {
				for _, r := range readers {
					transforms := sstable.FragmentIterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
					seqNum--
					rki, err := r.NewRawRangeKeyIter(ctx, transforms, readEnv)
					if err != nil {
						// Close the range key iterators opened so far before
						// reporting the failure.
						for _, iter := range rangeKeyIters {
							iter.Close()
						}
						return err
					}
					// A nil iterator is skipped rather than added as a level;
					// presumably it means the table has no range keys — TODO
					// confirm.
					if rki != nil {
						rangeKeyIters = append(rangeKeyIters, rki)
					}
				}
			}
			// The range key state is only allocated when at least one reader
			// produced a range key iterator.
			if len(rangeKeyIters) > 0 {
				it.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
				it.rangeKey.init(it.comparer.Compare, it.comparer.Split, &it.opts)
				it.rangeKey.rangeKeyIter = it.rangeKey.iterConfig.Init(
					&it.comparer,
					base.SeqNumMax,
					it.opts.LowerBound, it.opts.UpperBound,
					&it.hasPrefix, &it.prefixOrFullSeekKey,
					false /* internalKeys */, &it.rangeKey.internal,
				)
				for i := range rangeKeyIters {
					it.rangeKey.iterConfig.AddLevel(rangeKeyIters[i])
				}
			}
		}
		// With range key state in place, interleave range keys with the point
		// iterator and make the interleaving iterator the top of the stack.
		if it.rangeKey != nil {
			it.rangeKey.iiter.Init(&it.comparer, it.iter, it.rangeKey.rangeKeyIter,
				keyspan.InterleavingIterOpts{
					Mask: &it.rangeKeyMasking,
					LowerBound: it.opts.LowerBound,
					UpperBound: it.opts.UpperBound,
				})
			it.iter = &it.rangeKey.iiter
		}
	}
	return nil
}
299 :
300 : func openExternalTables(
301 : ctx context.Context, files []sstable.ReadableFile, readerOpts sstable.ReaderOptions,
302 1 : ) (readers []*sstable.Reader, err error) {
303 1 : readers = make([]*sstable.Reader, 0, len(files))
304 1 : for i := range files {
305 1 : readable, err := sstable.NewSimpleReadable(files[i])
306 1 : if err != nil {
307 0 : return readers, err
308 0 : }
309 1 : r, err := sstable.NewReader(ctx, readable, readerOpts)
310 1 : if err != nil {
311 1 : return readers, err
312 1 : }
313 1 : if r.Attributes.Has(sstable.AttributeBlobValues) {
314 1 : return readers, errors.Newf("pebble: NewExternalIter does not support blob references")
315 1 : }
316 1 : readers = append(readers, r)
317 : }
318 1 : return readers, err
319 : }
|