Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "context"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/bytealloc"
13 : "github.com/cockroachdb/pebble/objstorage"
14 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
15 : "github.com/cockroachdb/pebble/sstable/block"
16 : "github.com/cockroachdb/pebble/sstable/rowblk"
17 : )
18 :
19 : // CopySpan produces a copy of a approximate subset of an input sstable.
20 : //
21 : // The produced sstable contains all keys from the input sstable in the span
22 : // [start, end), as well as potentially some additional keys from the original
23 : // file that were adjacent to but outside that span.
24 : //
25 : // CopySpan differs from simply seeking a reader to start and iterating until
26 : // the end passing the results to a writer in that it does not write the new
27 : // sstable from scratch, key-by-key, recompressing each key into new blocks and
28 : // computing new filters and properties. Instead, it finds data _blocks_ that
29 : // intersect the requested span and copies those, whole, to the new file,
30 : // avoiding all decompression and recompression work. It then copies the
31 : // original bloom filter - this filter is valid for the subset of data as well,
32 : // just with potentially a higher false positive rate compared to one that would
33 : // be computed just from the keys in it.
34 : //
35 : // The resulting sstable will have no block properties.
36 : //
37 : // The function might return ErrEmptySpan if there are no blocks that could
38 : // include keys in the given range. See ErrEmptySpan for more details.
39 : //
40 : // Closes input and finishes or aborts output in all cases, including on errors.
41 : //
42 : // Note that CopySpan is not aware of any suffix or prefix replacement; the
43 : // caller must account for those when specifying the bounds.
44 : func CopySpan(
45 : ctx context.Context,
46 : input objstorage.Readable,
47 : rOpts ReaderOptions,
48 : output objstorage.Writable,
49 : o WriterOptions,
50 : start, end InternalKey,
51 1 : ) (size uint64, _ error) {
52 1 : r, err := NewReader(ctx, input, rOpts)
53 1 : if err != nil {
54 0 : input.Close()
55 0 : output.Abort()
56 0 : return 0, err
57 0 : }
58 1 : defer r.Close() // r.Close now owns calling input.Close().
59 1 :
60 1 : if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
61 1 : return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
62 1 : }
63 :
64 : // If our input has not filters, our output cannot have filters either.
65 1 : if r.tableFilter == nil {
66 1 : o.FilterPolicy = nil
67 1 : }
68 1 : o.TableFormat = r.tableFormat
69 1 : w := NewRawWriter(output, o)
70 1 :
71 1 : // We don't want the writer to attempt to write out block property data in
72 1 : // index blocks. This data won't be valid since we're not passing the actual
73 1 : // key data through the writer. We also remove the table-level properties
74 1 : // below.
75 1 : //
76 1 : // TODO(dt,radu): Figure out how to populate the prop collector state with
77 1 : // block props from the original sst.
78 1 : w.blockPropCollectors = nil
79 1 :
80 1 : defer func() {
81 1 : if w != nil {
82 0 : // set w.err to any non-nil error just so it aborts instead of finishing.
83 0 : w.err = base.ErrNotFound
84 0 : // w.Close now owns calling output.Abort().
85 0 : w.Close()
86 0 : }
87 : }()
88 :
89 1 : if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
90 0 : // We just checked for these conditions above.
91 0 : return 0, base.AssertionFailedf("cannot CopySpan sstables with value blocks or range keys")
92 0 : }
93 :
94 1 : var preallocRH objstorageprovider.PreallocatedReadHandle
95 1 : // ReadBeforeForIndexAndFilter attempts to read the top-level index, filter
96 1 : // and lower-level index blocks with one read.
97 1 : rh := objstorageprovider.UsePreallocatedReadHandle(
98 1 : r.readable, objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
99 1 : defer rh.Close()
100 1 : indexH, err := r.readIndex(ctx, rh, nil, nil)
101 1 : if err != nil {
102 0 : return 0, err
103 0 : }
104 1 : defer indexH.Release()
105 1 :
106 1 : // Set the filter block to be copied over if it exists. It will return false
107 1 : // positives for keys in blocks of the original file that we don't copy, but
108 1 : // filters can always have false positives, so this is fine.
109 1 : if r.tableFilter != nil {
110 1 : filterBlock, err := r.readFilter(ctx, rh, nil, nil)
111 1 : if err != nil {
112 0 : return 0, errors.Wrap(err, "reading filter")
113 0 : }
114 1 : filterBytes := append([]byte{}, filterBlock.Get()...)
115 1 : filterBlock.Release()
116 1 : w.filter = copyFilterWriter{
117 1 : origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBytes,
118 1 : }
119 : }
120 :
121 : // Copy all the props from the source file; we can't compute our own for many
122 : // that depend on seeing every key, such as total count or size so we copy the
123 : // original props instead. This will result in over-counts but that is safer
124 : // than under-counts.
125 1 : w.props = r.Properties
126 1 : // Remove all user properties to disable block properties, which we do not
127 1 : // calculate.
128 1 : w.props.UserProperties = nil
129 1 : // Reset props that we'll re-derive as we build our own index.
130 1 : w.props.IndexPartitions = 0
131 1 : w.props.TopLevelIndexSize = 0
132 1 : w.props.IndexSize = 0
133 1 : w.props.IndexType = 0
134 1 : if w.filter != nil {
135 1 : if err := checkWriterFilterMatchesReader(r, w); err != nil {
136 0 : return 0, err
137 0 : }
138 : }
139 :
140 : // Find the blocks that intersect our span.
141 1 : blocks, err := intersectingIndexEntries(ctx, r, rh, indexH, start, end)
142 1 : if err != nil {
143 0 : return 0, err
144 0 : }
145 :
146 : // In theory an empty SST is fine, but #3409 means they are not. We could make
147 : // a non-empty sst by copying something outside the span, but #3907 means that
148 : // the empty virtual span would still be a problem, so don't bother.
149 1 : if len(blocks) < 1 {
150 0 : return 0, ErrEmptySpan
151 0 : }
152 :
153 : // Find the span of the input file that contains all our blocks, and then copy
154 : // it byte-for-byte without doing any per-key processing.
155 1 : offset := blocks[0].bh.Offset
156 1 :
157 1 : // The block lengths don't include their trailers, which just sit after the
158 1 : // block length, before the next offset; We get the ones between the blocks
159 1 : // we copy implicitly but need to explicitly add the last trailer to length.
160 1 : length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + block.TrailerLen - offset
161 1 :
162 1 : if spanEnd := length + offset; spanEnd < offset {
163 0 : return 0, base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", offset, spanEnd)
164 0 : }
165 :
166 1 : if err := objstorage.Copy(ctx, r.readable, w.layout.writable, offset, length); err != nil {
167 0 : return 0, err
168 0 : }
169 1 : w.layout.offset += length
170 1 :
171 1 : // Update w.meta.Size so subsequently flushed metadata has correct offsets.
172 1 : w.meta.Size += length
173 1 :
174 1 : // Now we can setup index entries for all the blocks we just copied, pointing
175 1 : // into the copied span.
176 1 : for i := range blocks {
177 1 : blocks[i].bh.Offset -= offset
178 1 : if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
179 0 : return 0, err
180 0 : }
181 : }
182 :
183 : // TODO(dt): Copy range keys (the fact there are none is checked above).
184 : // TODO(dt): Copy valblocks keys (the fact there are none is checked above).
185 :
186 1 : if err := w.Close(); err != nil {
187 0 : w = nil
188 0 : return 0, err
189 0 : }
190 1 : wrote := w.meta.Size
191 1 : w = nil
192 1 : return wrote, nil
193 : }
194 :
// ErrEmptySpan is the error CopySpan returns when it finds no data block of the
// input sstable that could hold keys in the requested span.
//
// CopySpan decides overlap block-by-block rather than key-by-key, so this
// detection is best effort. A block may be copied even though none of its keys
// fall in the span, and in that case this error is not returned. Currently the
// error is only produced when the span's start lies beyond every key in the
// physical sstable.
var ErrEmptySpan = errors.New("cannot copy empty span")
203 :
204 : // indexEntry captures the two components of an sst index entry: the key and the
205 : // decoded block handle value.
206 : type indexEntry struct {
207 : sep InternalKey
208 : bh BlockHandleWithProperties
209 : }
210 :
211 : // intersectingIndexEntries returns the entries from the index with separator
212 : // keys contained by [start, end), i.e. the subset of the sst's index that
213 : // intersects the provided span.
214 : func intersectingIndexEntries(
215 : ctx context.Context,
216 : r *Reader,
217 : rh objstorage.ReadHandle,
218 : indexH block.BufferHandle,
219 : start, end InternalKey,
220 1 : ) ([]indexEntry, error) {
221 1 : top, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
222 1 : if err != nil {
223 0 : return nil, err
224 0 : }
225 1 : defer top.Close()
226 1 :
227 1 : var alloc bytealloc.A
228 1 : res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
229 1 : for kv := top.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = top.Next() {
230 1 : bh, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
231 1 : if err != nil {
232 0 : return nil, err
233 0 : }
234 1 : if r.Properties.IndexType != twoLevelIndex {
235 1 : entry := indexEntry{bh: bh, sep: kv.K}
236 1 : alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
237 1 : alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
238 1 : res = append(res, entry)
239 1 : } else {
240 1 : subBlk, err := r.readBlock(ctx, bh.Handle, nil, rh, nil, nil, nil)
241 1 : if err != nil {
242 0 : return nil, err
243 0 : }
244 1 : defer subBlk.Release() // in-loop, but it is a short loop.
245 1 :
246 1 : sub, err := rowblk.NewIter(r.Compare, r.Split, subBlk.Get(), NoTransforms)
247 1 : if err != nil {
248 0 : return nil, err
249 0 : }
250 1 : defer sub.Close() // in-loop, but it is a short loop.
251 1 :
252 1 : for kv := sub.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = sub.Next() {
253 1 : bh, err := decodeBlockHandleWithProperties(kv.InPlaceValue())
254 1 : if err != nil {
255 0 : return nil, err
256 0 : }
257 1 : entry := indexEntry{bh: bh, sep: kv.K}
258 1 : alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
259 1 : alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
260 1 : res = append(res, entry)
261 1 : if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
262 1 : break
263 : }
264 : }
265 1 : if err := sub.Error(); err != nil {
266 0 : return nil, err
267 0 : }
268 : }
269 1 : if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
270 1 : break
271 : }
272 : }
273 1 : return res, top.Error()
274 : }
275 :
276 : // copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
277 : // exists to ensure it is visible in profiles/stack traces if we are looking at
278 : // cluster copying more than expected.
279 : //
280 : // Finishes or Aborts output; does *not* Close input.
281 : func copyWholeFileBecauseOfUnsupportedFeature(
282 : ctx context.Context, input objstorage.Readable, output objstorage.Writable,
283 1 : ) (size uint64, _ error) {
284 1 : length := uint64(input.Size())
285 1 : if err := objstorage.Copy(ctx, input, output, 0, length); err != nil {
286 0 : output.Abort()
287 0 : return 0, err
288 0 : }
289 1 : return length, output.Finish()
290 : }
|