Line data Source code
1 : // Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package pebble
6 :
7 : import (
8 : "context"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/keyspan"
13 : "github.com/cockroachdb/pebble/internal/manifest"
14 : "github.com/cockroachdb/pebble/sstable"
15 : "github.com/cockroachdb/pebble/sstable/block"
16 : )
17 :
18 : // NewExternalIter takes an input 2d array of sstable files which may overlap
19 : // across subarrays but not within a subarray (at least as far as points are
20 : // concerned; range keys are allowed to overlap arbitrarily even within a
21 : // subarray), and returns an Iterator over the merged contents of the sstables.
22 : // Input sstables may contain point keys, range keys, range deletions, etc. The
23 : // input files slice must be sorted in reverse chronological ordering. A key in a
24 : // file at a lower index subarray will shadow a key with an identical user key
25 : // contained within a file at a higher index subarray. Each subarray must be
26 : // sorted in internal key order, where lower index files contain keys that sort
27 : // left of files with higher indexes.
28 : //
29 : // Input sstables must only contain keys with the zero sequence number.
30 : //
31 : // Iterators constructed through NewExternalIter do not support all iterator
32 : // options, including block-property and table filters. NewExternalIter errors
33 : // if an incompatible option is set.
34 : func NewExternalIter(
35 : o *Options, iterOpts *IterOptions, files [][]sstable.ReadableFile,
36 0 : ) (it *Iterator, err error) {
37 0 : return NewExternalIterWithContext(context.Background(), o, iterOpts, files)
38 0 : }
39 :
40 : // NewExternalIterWithContext is like NewExternalIter, and additionally
41 : // accepts a context for tracing.
42 : func NewExternalIterWithContext(
43 : ctx context.Context, o *Options, iterOpts *IterOptions, files [][]sstable.ReadableFile,
44 0 : ) (it *Iterator, err error) {
45 0 : if iterOpts != nil {
46 0 : if err := validateExternalIterOpts(iterOpts); err != nil {
47 0 : return nil, err
48 0 : }
49 : }
50 :
51 0 : ro := o.MakeReaderOptions()
52 0 : var readers [][]*sstable.Reader
53 0 : for _, levelFiles := range files {
54 0 : subReaders, err := openExternalTables(ctx, levelFiles, ro)
55 0 : readers = append(readers, subReaders)
56 0 : if err != nil {
57 0 : // Close all the opened readers.
58 0 : for i := range readers {
59 0 : for j := range readers[i] {
60 0 : _ = readers[i][j].Close()
61 0 : }
62 : }
63 0 : return nil, err
64 : }
65 : }
66 :
67 0 : buf := iterAllocPool.Get().(*iterAlloc)
68 0 : dbi := &buf.dbi
69 0 : *dbi = Iterator{
70 0 : ctx: ctx,
71 0 : alloc: buf,
72 0 : merge: o.Merger.Merge,
73 0 : comparer: *o.Comparer,
74 0 : readState: nil,
75 0 : keyBuf: buf.keyBuf,
76 0 : prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
77 0 : boundsBuf: buf.boundsBuf,
78 0 : batch: nil,
79 0 : // Add the external iter state to the Iterator so that Close closes it,
80 0 : // and SetOptions can re-construct iterators using its state.
81 0 : externalIter: &externalIterState{readers: readers},
82 0 : newIters: func(context.Context, *manifest.FileMetadata, *IterOptions,
83 0 : internalIterOpts, iterKinds) (iterSet, error) {
84 0 : // NB: External iterators are currently constructed without any
85 0 : // `levelIters`. newIters should never be called. When we support
86 0 : // organizing multiple non-overlapping files into a single level
87 0 : // (see TODO below), we'll need to adjust this tableNewIters
88 0 : // implementation to open iterators by looking up f in a map
89 0 : // of readers indexed by *fileMetadata.
90 0 : panic("unreachable")
91 : },
92 : seqNum: base.SeqNumMax,
93 : }
94 0 : dbi.externalIter.bufferPool.Init(2)
95 0 :
96 0 : if iterOpts != nil {
97 0 : dbi.opts = *iterOpts
98 0 : dbi.processBounds(iterOpts.LowerBound, iterOpts.UpperBound)
99 0 : }
100 0 : if err := finishInitializingExternal(ctx, dbi); err != nil {
101 0 : dbi.Close()
102 0 : return nil, err
103 0 : }
104 0 : return dbi, nil
105 : }
106 :
107 : // externalIterState encapsulates state that is specific to external iterators.
108 : // An external *pebble.Iterator maintains a pointer to the externalIterState and
109 : // calls Close when the Iterator is Closed, providing an opportuntity for the
110 : // external iterator to release resources particular to external iterators.
111 : type externalIterState struct {
112 : bufferPool block.BufferPool
113 : readers [][]*sstable.Reader
114 : }
115 :
116 0 : func (e *externalIterState) Close() (err error) {
117 0 : for _, readers := range e.readers {
118 0 : for _, r := range readers {
119 0 : err = firstError(err, r.Close())
120 0 : }
121 : }
122 0 : e.bufferPool.Release()
123 0 : return err
124 : }
125 :
126 0 : func validateExternalIterOpts(iterOpts *IterOptions) error {
127 0 : switch {
128 0 : case iterOpts.PointKeyFilters != nil:
129 0 : return errors.Errorf("pebble: external iterator: PointKeyFilters unsupported")
130 0 : case iterOpts.RangeKeyFilters != nil:
131 0 : return errors.Errorf("pebble: external iterator: RangeKeyFilters unsupported")
132 0 : case iterOpts.OnlyReadGuaranteedDurable:
133 0 : return errors.Errorf("pebble: external iterator: OnlyReadGuaranteedDurable unsupported")
134 0 : case iterOpts.UseL6Filters:
135 0 : return errors.Errorf("pebble: external iterator: UseL6Filters unsupported")
136 : }
137 0 : return nil
138 : }
139 :
140 : func createExternalPointIter(
141 : ctx context.Context, it *Iterator, readEnv block.ReadEnv,
142 0 : ) (topLevelIterator, error) {
143 0 : // TODO(jackson): In some instances we could generate fewer levels by using
144 0 : // L0Sublevels code to organize nonoverlapping files into the same level.
145 0 : // This would allow us to use levelIters and keep a smaller set of data and
146 0 : // files in-memory. However, it would also require us to identify the bounds
147 0 : // of all the files upfront.
148 0 :
149 0 : if !it.opts.pointKeys() {
150 0 : return emptyIter, nil
151 0 : } else if it.pointIter != nil {
152 0 : return it.pointIter, nil
153 0 : }
154 0 : mlevels := it.alloc.mlevels[:0]
155 0 :
156 0 : if len(it.externalIter.readers) > cap(mlevels) {
157 0 : mlevels = make([]mergingIterLevel, 0, len(it.externalIter.readers))
158 0 : }
159 : // We set a synthetic sequence number, with lower levels having higer numbers.
160 0 : seqNum := 0
161 0 : for _, readers := range it.externalIter.readers {
162 0 : seqNum += len(readers)
163 0 : }
164 0 : for _, readers := range it.externalIter.readers {
165 0 : for _, r := range readers {
166 0 : var (
167 0 : rangeDelIter keyspan.FragmentIterator
168 0 : pointIter internalIterator
169 0 : err error
170 0 : )
171 0 : // We could set hideObsoletePoints=true, since we are reading at
172 0 : // InternalKeySeqNumMax, but we don't bother since these sstables should
173 0 : // not have obsolete points (so the performance optimization is
174 0 : // unnecessary), and we don't want to bother constructing a
175 0 : // BlockPropertiesFilterer that includes obsoleteKeyBlockPropertyFilter.
176 0 : transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
177 0 : seqNum--
178 0 : pointIter, err = r.NewPointIter(
179 0 : ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
180 0 : sstable.NeverUseFilterBlock, readEnv, sstable.MakeTrivialReaderProvider(r))
181 0 : if err != nil {
182 0 : return nil, err
183 0 : }
184 0 : rangeDelIter, err = r.NewRawRangeDelIter(ctx, sstable.FragmentIterTransforms{
185 0 : SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum),
186 0 : }, readEnv)
187 0 : if err != nil {
188 0 : return nil, err
189 0 : }
190 0 : mlevels = append(mlevels, mergingIterLevel{
191 0 : iter: pointIter,
192 0 : rangeDelIter: rangeDelIter,
193 0 : })
194 : }
195 : }
196 :
197 0 : it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
198 0 : it.alloc.merging.snapshot = base.SeqNumMax
199 0 : if len(mlevels) <= cap(it.alloc.levelsPositioned) {
200 0 : it.alloc.merging.levelsPositioned = it.alloc.levelsPositioned[:len(mlevels)]
201 0 : }
202 0 : return &it.alloc.merging, nil
203 : }
204 :
205 0 : func finishInitializingExternal(ctx context.Context, it *Iterator) error {
206 0 : readEnv := block.ReadEnv{
207 0 : Stats: &it.stats.InternalStats,
208 0 : // TODO(jackson): External iterators never provide categorized iterator
209 0 : // stats today because they exist outside the context of a *DB. If the
210 0 : // sstables being read are on the physical filesystem, we may still want to
211 0 : // thread a CategoryStatsCollector through so that we collect their stats.
212 0 : IterStats: nil,
213 0 : BufferPool: &it.externalIter.bufferPool,
214 0 : }
215 0 : pointIter, err := createExternalPointIter(ctx, it, readEnv)
216 0 : if err != nil {
217 0 : return err
218 0 : }
219 0 : it.pointIter = pointIter
220 0 : it.iter = it.pointIter
221 0 :
222 0 : if it.opts.rangeKeys() {
223 0 : it.rangeKeyMasking.init(it, &it.comparer)
224 0 : var rangeKeyIters []keyspan.FragmentIterator
225 0 : if it.rangeKey == nil {
226 0 : // We could take advantage of the lack of overlaps in range keys within
227 0 : // each slice in it.externalReaders, and generate keyspanimpl.LevelIters
228 0 : // out of those. However, since range keys are expected to be sparse to
229 0 : // begin with, the performance gain might not be significant enough to
230 0 : // warrant it.
231 0 : //
232 0 : // TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not
233 0 : // operate on FileMetadatas (similar to simpleLevelIter), and implements
234 0 : // this optimization.
235 0 : // We set a synthetic sequence number, with lower levels having higer numbers.
236 0 : seqNum := 0
237 0 : for _, readers := range it.externalIter.readers {
238 0 : seqNum += len(readers)
239 0 : }
240 0 : for _, readers := range it.externalIter.readers {
241 0 : for _, r := range readers {
242 0 : transforms := sstable.FragmentIterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
243 0 : seqNum--
244 0 : if rki, err := r.NewRawRangeKeyIter(ctx, transforms, readEnv); err != nil {
245 0 : return err
246 0 : } else if rki != nil {
247 0 : rangeKeyIters = append(rangeKeyIters, rki)
248 0 : }
249 : }
250 : }
251 0 : if len(rangeKeyIters) > 0 {
252 0 : it.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
253 0 : it.rangeKey.init(it.comparer.Compare, it.comparer.Split, &it.opts)
254 0 : it.rangeKey.rangeKeyIter = it.rangeKey.iterConfig.Init(
255 0 : &it.comparer,
256 0 : base.SeqNumMax,
257 0 : it.opts.LowerBound, it.opts.UpperBound,
258 0 : &it.hasPrefix, &it.prefixOrFullSeekKey,
259 0 : false /* internalKeys */, &it.rangeKey.internal,
260 0 : )
261 0 : for i := range rangeKeyIters {
262 0 : it.rangeKey.iterConfig.AddLevel(rangeKeyIters[i])
263 0 : }
264 : }
265 : }
266 0 : if it.rangeKey != nil {
267 0 : it.rangeKey.iiter.Init(&it.comparer, it.iter, it.rangeKey.rangeKeyIter,
268 0 : keyspan.InterleavingIterOpts{
269 0 : Mask: &it.rangeKeyMasking,
270 0 : LowerBound: it.opts.LowerBound,
271 0 : UpperBound: it.opts.UpperBound,
272 0 : })
273 0 : it.iter = &it.rangeKey.iiter
274 0 : }
275 : }
276 0 : return nil
277 : }
278 :
279 : func openExternalTables(
280 : ctx context.Context, files []sstable.ReadableFile, readerOpts sstable.ReaderOptions,
281 0 : ) (readers []*sstable.Reader, err error) {
282 0 : readers = make([]*sstable.Reader, 0, len(files))
283 0 : for i := range files {
284 0 : readable, err := sstable.NewSimpleReadable(files[i])
285 0 : if err != nil {
286 0 : return readers, err
287 0 : }
288 0 : r, err := sstable.NewReader(ctx, readable, readerOpts)
289 0 : if err != nil {
290 0 : return readers, err
291 0 : }
292 0 : readers = append(readers, r)
293 : }
294 0 : return readers, err
295 : }
|