// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package sstable

import (
	"context"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/bytealloc"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
)

// CopySpan produces a copy of an approximate subset of an input sstable.
//
// The produced sstable contains all keys from the input sstable in the span
// [start, end), as well as potentially some additional keys from the original
// file that were adjacent to but outside that span.
//
// CopySpan differs from simply seeking a reader to start, iterating until end,
// and passing the results to a writer in that it does not write the new
// sstable from scratch, key by key, recompressing each key into new blocks and
// computing new filters and properties. Instead, it finds the data _blocks_
// that intersect the requested span and copies them, whole, to the new file,
// avoiding all decompression and recompression work. It then copies the
// original bloom filter; this filter remains valid for the subset of the data
// as well, just with a potentially higher false positive rate than a filter
// computed from only the keys it contains.
//
// The resulting sstable will have no block properties.
//
// The function might return ErrEmptySpan if there are no blocks that could
// include keys in the given range. See ErrEmptySpan for more details.
//
// Closes input and finishes or aborts output in all cases, including on errors.
//
// Note that CopySpan is not aware of any suffix or prefix replacement; the
// caller must account for those when specifying the bounds.
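//
// A minimal usage sketch (hypothetical; ctx, src, dst, the reader/writer
// options, and the start/end internal keys are assumed to be constructed by
// the caller):
//
//	size, err := sstable.CopySpan(ctx, src, readerOpts, dst, writerOpts, start, end)
//	if err != nil {
//		// src is already closed and dst is already finished or aborted.
//		return err
//	}
//	_ = size // size of the new sstable, as reported by the writer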
func CopySpan(
	ctx context.Context,
	input objstorage.Readable,
	rOpts ReaderOptions,
	output objstorage.Writable,
	o WriterOptions,
	start, end InternalKey,
) (size uint64, _ error) {
	r, err := NewReader(input, rOpts)
	if err != nil {
		input.Close()
		output.Abort()
		return 0, err
	}
	defer r.Close() // r.Close now owns calling input.Close().

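	// Value blocks, range keys, and range deletions live in blocks outside the
	// contiguous run of data blocks copied below, so a block-level copy of the
	// span would silently drop them; fall back to copying the whole file when
	// any of them are present.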
	if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
		return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
	}

	// If our input has no filters, our output cannot have filters either.
	if r.tableFilter == nil {
		o.FilterPolicy = nil
	}
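	// The copied blocks (and the reused filter block) are not re-encoded, so the
	// output must be written in the same table format as the input for readers
	// to interpret them correctly.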
	o.TableFormat = r.tableFormat
	w := NewWriter(output, o)

	// We don't want the writer to attempt to write out block property data in
	// index blocks. This data won't be valid since we're not passing the actual
	// key data through the writer. We also remove the table-level properties
	// below.
	//
	// TODO(dt,radu): Figure out how to populate the prop collector state with
	// block props from the original sst.
	w.blockPropCollectors = nil

	defer func() {
		if w != nil {
			// set w.err to any non-nil error just so it aborts instead of finishing.
			w.err = base.ErrNotFound
			// w.Close now owns calling output.Abort().
			w.Close()
		}
	}()

	if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
		// We just checked for these conditions above.
		return 0, base.AssertionFailedf("cannot CopySpan sstables with value blocks or range keys")
	}

	var preallocRH objstorageprovider.PreallocatedReadHandle
	// ReadBeforeForIndexAndFilter attempts to read the top-level index, filter
	// and lower-level index blocks with one read.
	rh := objstorageprovider.UsePreallocatedReadHandle(
		ctx, r.readable, objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
	defer rh.Close()
	indexH, err := r.readIndex(ctx, rh, nil, nil)
	if err != nil {
		return 0, err
	}
	defer indexH.Release()

	// Set the filter block to be copied over if it exists. It will return false
	// positives for keys in blocks of the original file that we don't copy, but
	// filters can always have false positives, so this is fine.
	if r.tableFilter != nil {
		filterBlock, err := r.readFilter(ctx, rh, nil, nil)
		if err != nil {
			return 0, errors.Wrap(err, "reading filter")
		}
		filterBytes := append([]byte{}, filterBlock.Get()...)
		filterBlock.Release()
		w.filter = copyFilterWriter{
			origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBytes,
		}
	}

	// Copy all the props from the source file; we can't compute our own for many
	// that depend on seeing every key, such as total count or size, so we copy
	// the original props instead. This will result in over-counts, but that is
	// safer than under-counts.
	w.props = r.Properties
	// Remove all user properties to disable block properties, which we do not
	// calculate.
	w.props.UserProperties = nil
	// Reset props that we'll re-derive as we build our own index.
	w.props.IndexPartitions = 0
	w.props.TopLevelIndexSize = 0
	w.props.IndexSize = 0
	w.props.IndexType = 0
	if w.filter != nil {
		if err := checkWriterFilterMatchesReader(r, w); err != nil {
			return 0, err
		}
	}

	// Find the blocks that intersect our span.
	blocks, err := intersectingIndexEntries(ctx, r, rh, indexH, start, end)
	if err != nil {
		return 0, err
	}

	// In theory an empty SST is fine, but #3409 means it is not. We could make a
	// non-empty sst by copying something outside the span, but #3907 means that
	// the empty virtual span would still be a problem, so don't bother.
	if len(blocks) < 1 {
		return 0, ErrEmptySpan
	}

	// Find the span of the input file that contains all our blocks, and then copy
	// it byte-for-byte without doing any per-key processing.
	offset := blocks[0].bh.Offset

	// The block lengths don't include their trailers, which sit right after each
	// block's data, before the next block's offset; the trailers between the
	// blocks we copy are picked up implicitly, but we need to explicitly add the
	// last block's trailer to the length.
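	// For example (illustrative numbers only): if the first copied block starts
	// at offset 100, and the last copied block starts at offset 400 with length
	// 50, then offset = 100 and length = 400 + 50 + blockTrailerLen - 100, which
	// counts the final trailer while skipping the 100 bytes before the span.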
	length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + blockTrailerLen - offset

	if spanEnd := length + offset; spanEnd < offset {
		return 0, base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", offset, spanEnd)
	}

	if err := objstorage.Copy(ctx, r.readable, w.writable, offset, length); err != nil {
		return 0, err
	}
	// Update w.meta.Size so subsequently flushed metadata has correct offsets.
	w.meta.Size += length

	// Now we can set up index entries for all the blocks we just copied, pointing
	// into the copied span.
	for i := range blocks {
		blocks[i].bh.Offset -= offset
		if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
			return 0, err
		}
	}

	// TODO(dt): Copy range keys (the fact there are none is checked above).
	// TODO(dt): Copy value blocks (the fact there are none is checked above).

	if err := w.Close(); err != nil {
		w = nil
		return 0, err
	}
	wrote := w.meta.Size
	w = nil
	return wrote, nil
}

// ErrEmptySpan is returned by CopySpan if the input sstable has no keys in the
// requested span.
//
// Note that CopySpan's determination of block overlap is best effort - we may
// copy a block that doesn't actually contain any keys in the span, in which
// case we won't generate this error. We currently only generate this error when
// the span start is beyond all keys in the physical sstable.
var ErrEmptySpan = errors.New("cannot copy empty span")

// indexEntry captures the two components of an sst index entry: the separator
// key and the decoded block handle value.
type indexEntry struct {
	sep InternalKey
	bh  BlockHandleWithProperties
}

// intersectingIndexEntries returns the entries from the index with separator
// keys contained by [start, end), i.e. the subset of the sst's index that
// intersects the provided span.
func intersectingIndexEntries(
	ctx context.Context,
	r *Reader,
	rh objstorage.ReadHandle,
	indexH bufferHandle,
	start, end InternalKey,
) ([]indexEntry, error) {
	top, err := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
	if err != nil {
		return nil, err
	}
	defer top.Close()

	var alloc bytealloc.A
	res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
	for kv := top.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = top.Next() {
		bh, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
		if err != nil {
			return nil, err
		}
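		// In a single-level index, each entry points directly at a data block; in
		// a two-level index, each top-level entry points at a lower-level index
		// block that must itself be iterated to reach the data block handles.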
		if r.Properties.IndexType != twoLevelIndex {
			entry := indexEntry{bh: bh, sep: kv.K}
			alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
			alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
			res = append(res, entry)
		} else {
			subBlk, err := r.readBlock(ctx, bh.BlockHandle, nil, rh, nil, nil, nil)
			if err != nil {
				return nil, err
			}
			defer subBlk.Release() // in-loop, but it is a short loop.

			sub, err := newBlockIter(r.Compare, r.Split, subBlk.Get(), NoTransforms)
			if err != nil {
				return nil, err
			}
			defer sub.Close() // in-loop, but it is a short loop.

			for kv := sub.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = sub.Next() {
				bh, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
				if err != nil {
					return nil, err
				}
				entry := indexEntry{bh: bh, sep: kv.K}
				alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
				alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
				res = append(res, entry)
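				// The separator only bounds this block's keys from above, so a
				// block whose separator is at or past end may still contain keys
				// in [start, end); include it, then stop.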
				if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
					break
				}
			}
			if err := sub.Error(); err != nil {
				return nil, err
			}
		}
		if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
			break
		}
	}
	return res, top.Error()
}

// copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
// exists to ensure it is visible in profiles/stack traces if we see a cluster
// copying more than expected.
//
// Finishes or Aborts output; does *not* Close input.
func copyWholeFileBecauseOfUnsupportedFeature(
	ctx context.Context, input objstorage.Readable, output objstorage.Writable,
) (size uint64, _ error) {
	length := uint64(input.Size())
	if err := objstorage.Copy(ctx, input, output, 0, length); err != nil {
		output.Abort()
		return 0, err
	}
	return length, output.Finish()
}