Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "context"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/bytealloc"
13 : "github.com/cockroachdb/pebble/objstorage"
14 : )
15 :
16 : // CopySpan produces a copy of an approximate subset of an input sstable.
17 : //
18 : // The produced sstable contains all keys from the input sstable in the span
19 : // [start, end), as well as potentially some additional keys from the original
20 : // file that were adjacent to but outside that span.
21 : //
22 : // CopySpan differs from simply seeking a reader to start, iterating until
23 : // end, and passing the results to a writer: it does not write the new
24 : // sstable from scratch, key-by-key, recompressing each key into new blocks and
25 : // computing new filters and properties. Instead, it finds the data _blocks_
26 : // that intersect the requested span and copies them, whole, to the new file,
27 : // avoiding all decompression and recompression work. It then copies the
28 : // original bloom filter - this filter remains valid for the copied subset of
29 : // data, just with a potentially higher false positive rate than one computed
30 : // only from the keys actually present in it.
31 : //
32 : // The resulting sstable will have no block properties.
33 : //
34 : // Closes input and finishes or aborts output in all cases, including on error.
35 : //
36 : // Note that CopySpan is not aware of any suffix or prefix replacement; the
37 : // caller must account for those when specifying the bounds.
38 : func CopySpan(
39 : ctx context.Context,
40 : input objstorage.Readable,
41 : rOpts ReaderOptions,
42 : output objstorage.Writable,
43 : o WriterOptions,
44 : start, end InternalKey,
45 2 : ) (size uint64, _ error) {
46 2 : r, err := NewReader(input, rOpts)
47 2 : if err != nil {
48 0 : input.Close()
49 0 : output.Abort()
50 0 : return 0, err
51 0 : }
52 2 : defer r.Close() // r.Close now owns calling input.Close().
53 2 :
54 2 : if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
55 0 : return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
56 0 : }
57 :
58 : // If our input has no filters, our output cannot have filters either.
59 2 : if r.tableFilter == nil {
60 2 : o.FilterPolicy = nil
61 2 : }
62 2 : o.TableFormat = r.tableFormat
63 2 : w := NewWriter(output, o)
64 2 :
65 2 : // We don't want the writer to attempt to write out block property data in
66 2 : // index blocks. This data won't be valid since we're not passing the actual
67 2 : // key data through the writer. We also remove the table-level properties
68 2 : // below.
69 2 : //
70 2 : // TODO(dt,radu): Figure out how to populate the prop collector state with
71 2 : // block props from the original sst.
72 2 : w.blockPropCollectors = nil
73 2 :
74 2 : defer func() {
75 2 : if w != nil {
76 0 : // Set w.err to an arbitrary non-nil error so Close aborts instead of finishing.
77 0 : w.err = base.ErrNotFound
78 0 : // w.Close now owns calling output.Abort().
79 0 : w.Close()
80 0 : }
81 : }()
82 :
83 2 : if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
84 0 : return 0, errors.New("cannot CopySpan sstables with value blocks, range keys, or range deletions")
85 0 : }
86 :
87 : // Set the filter block to be copied over if it exists. It will return false
88 : // positives for keys in blocks of the original file that we don't copy, but
89 : // filters can always have false positives, so this is fine.
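// (A bloom filter built over a superset of the keys can only add false
// positives, never false negatives, so reusing the original filter for the
// copied subset is sound.)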
90 2 : if r.tableFilter != nil {
91 1 : filterBlock, err := r.readFilter(ctx, nil, nil)
92 1 : if err != nil {
93 0 : return 0, errors.Wrap(err, "reading filter")
94 0 : }
95 1 : filterBytes := append([]byte{}, filterBlock.Get()...)
96 1 : filterBlock.Release()
97 1 : w.filter = copyFilterWriter{
98 1 : origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBytes,
99 1 : }
100 : }
101 :
102 : // Copy all the props from the source file; we can't compute many of them
103 : // ourselves since they depend on seeing every key, such as total count or
104 : // size. This will result in over-counts, but that is safer than
105 : // under-counts.
106 2 : w.props = r.Properties
107 2 : // Remove all user properties to disable block properties, which we do not
108 2 : // calculate.
109 2 : w.props.UserProperties = nil
110 2 : // Reset props that we'll re-derive as we build our own index.
111 2 : w.props.IndexPartitions = 0
112 2 : w.props.TopLevelIndexSize = 0
113 2 : w.props.IndexSize = 0
114 2 : w.props.IndexType = 0
115 2 : if w.filter != nil {
116 1 : if err := checkWriterFilterMatchesReader(r, w); err != nil {
117 0 : return 0, err
118 0 : }
119 : }
120 :
121 : // Find the blocks that intersect our span.
122 2 : blocks, err := intersectingIndexEntries(ctx, r, start, end)
123 2 : if err != nil {
124 0 : return 0, err
125 0 : }
126 :
127 : // In theory an empty SST is fine, but #3409 means it is not. We could make
128 : // a non-empty sst by copying something outside the span, but #3907 means that
129 : // the empty virtual span would still be a problem, so don't bother.
130 2 : if len(blocks) < 1 {
131 0 : return 0, errors.Newf("CopySpan cannot copy empty span %s %s", start, end)
132 0 : }
133 :
134 : // Find the span of the input file that contains all our blocks, and then copy
135 : // it byte-for-byte without doing any per-key processing.
136 2 : offset := blocks[0].bh.Offset
137 2 :
138 2 : // The block lengths don't include their trailers, which sit just after each
139 2 : // block, before the next block's offset; the trailers between the blocks we
140 2 : // copy are included implicitly, but we must explicitly add the last trailer.
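// For example (hypothetical offsets): if the first intersecting block starts
// at offset 100 and the last starts at offset 500 with length 200, then with a
// 5-byte block trailer the copied span is [100, 705) and
// length = 500 + 200 + 5 - 100 = 605.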
141 2 : length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + blockTrailerLen - offset
142 2 :
143 2 : if spanEnd := length + offset; spanEnd < offset {
144 0 : return 0, base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", offset, spanEnd)
145 0 : }
146 :
147 2 : if err := objstorage.Copy(ctx, r.readable, w.writable, offset, length); err != nil {
148 0 : return 0, err
149 0 : }
150 : // Update w.meta.Size so subsequently flushed metadata has correct offsets.
151 2 : w.meta.Size += length
152 2 :
153 2 : // Now we can set up index entries for all the blocks we just copied, pointing
154 2 : // into the copied span.
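// Nothing has been written to the output ahead of the copied bytes, so the
// copied data begins at offset 0 of the new file and each handle's offset is
// simply shifted down by the offset of the first copied block.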
155 2 : for i := range blocks {
156 2 : blocks[i].bh.Offset -= offset
157 2 : if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
158 0 : return 0, err
159 0 : }
160 : }
161 :
162 : // TODO(dt): Copy range keys (the fact there are none is checked above).
163 : // TODO(dt): Copy value blocks (the fact there are none is checked above).
164 :
165 2 : if err := w.Close(); err != nil {
166 0 : w = nil
167 0 : return 0, err
168 0 : }
169 2 : wrote := w.meta.Size
170 2 : w = nil
171 2 : return wrote, nil
172 : }
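// Illustrative sketch (hypothetical helper, not an API of this package): one
// way a caller might invoke CopySpan for the user-key span [lo, hi), assuming
// the objstorage handles and the reader/writer options are set up elsewhere.
// Only the CopySpan signature above is relied upon; base.MakeSearchKey is used
// here as one convenient way to build the InternalKey bounds.
func copyUserSpanExample(
	ctx context.Context,
	in objstorage.Readable, out objstorage.Writable,
	rOpts ReaderOptions, wOpts WriterOptions,
	lo, hi []byte,
) (uint64, error) {
	// CopySpan takes InternalKey bounds; wrapping each user key with
	// MakeSearchKey makes the bound sort before all real keys with that user
	// key, matching the [start, end) semantics documented above.
	start := base.MakeSearchKey(lo)
	end := base.MakeSearchKey(hi)
	// CopySpan closes in and finishes or aborts out, so neither handle should
	// be reused by the caller afterwards.
	return CopySpan(ctx, in, rOpts, out, wOpts, start, end)
}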
173 :
174 : // indexEntry captures the two components of an sst index entry: the key and the
175 : // decoded block handle value.
176 : type indexEntry struct {
177 : sep InternalKey
178 : bh BlockHandleWithProperties
179 : }
180 :
181 : // intersectingIndexEntries returns the entries of the sst's index whose data
182 : // blocks intersect the span [start, end): every entry from the first separator
183 : // >= start up to and including the first separator >= end.
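//
// For illustration (hypothetical separators): if the index holds separators a,
// c, e, g and the requested span is [b, f), the entries for c, e, and g are
// returned; the blocks for c and g may also contain keys outside [b, f), which
// is why CopySpan may copy some keys adjacent to but outside the requested
// span.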
184 : func intersectingIndexEntries(
185 : ctx context.Context, r *Reader, start, end InternalKey,
186 2 : ) ([]indexEntry, error) {
187 2 : indexH, err := r.readIndex(ctx, nil, nil)
188 2 : if err != nil {
189 0 : return nil, err
190 0 : }
191 2 : defer indexH.Release()
192 2 : top, err := newBlockIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
193 2 : if err != nil {
194 0 : return nil, err
195 0 : }
196 2 : defer top.Close()
197 2 :
198 2 : var rh objstorage.ReadHandle
199 2 : if r.Properties.IndexType == twoLevelIndex {
200 2 : rh = r.readable.NewReadHandle(ctx)
201 2 : defer rh.Close()
202 2 : }
203 :
204 2 : var alloc bytealloc.A
205 2 : res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
206 2 : for key, value := top.SeekGE(start.UserKey, base.SeekGEFlagsNone); key != nil; key, value = top.Next() {
207 2 : bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
208 2 : if err != nil {
209 0 : return nil, err
210 0 : }
211 2 : if r.Properties.IndexType != twoLevelIndex {
212 2 : entry := indexEntry{bh: bh, sep: *key}
213 2 : alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
214 2 : alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
215 2 : res = append(res, entry)
216 2 : } else {
217 2 : subBlk, err := r.readBlock(ctx, bh.BlockHandle, nil, rh, nil, nil, nil)
218 2 : if err != nil {
219 0 : return nil, err
220 0 : }
221 2 : defer subBlk.Release() // in-loop, but it is a short loop.
222 2 :
223 2 : sub, err := newBlockIter(r.Compare, r.Split, subBlk.Get(), NoTransforms)
224 2 : if err != nil {
225 0 : return nil, err
226 0 : }
227 2 : defer sub.Close() // in-loop, but it is a short loop.
228 2 :
229 2 : for key, value := sub.SeekGE(start.UserKey, base.SeekGEFlagsNone); key != nil; key, value = sub.Next() {
230 2 : bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
231 2 : if err != nil {
232 0 : return nil, err
233 0 : }
234 2 : entry := indexEntry{bh: bh, sep: *key}
235 2 : alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
236 2 : alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
237 2 : res = append(res, entry)
238 2 : if base.InternalCompare(r.Compare, end, *key) <= 0 {
239 2 : break
240 : }
241 : }
242 2 : if err := sub.Error(); err != nil {
243 0 : return nil, err
244 0 : }
245 : }
246 2 : if base.InternalCompare(r.Compare, end, *key) <= 0 {
247 2 : break
248 : }
249 : }
250 2 : return res, top.Error()
251 : }
252 :
253 : // copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
254 : // exists so that it shows up in profiles/stack traces if we observe a cluster
255 : // copying more than expected.
256 : //
257 : // Finishes or Aborts output; does *not* Close input.
258 : func copyWholeFileBecauseOfUnsupportedFeature(
259 : ctx context.Context, input objstorage.Readable, output objstorage.Writable,
260 0 : ) (size uint64, _ error) {
261 0 : length := uint64(input.Size())
262 0 : if err := objstorage.Copy(ctx, input, output, 0, length); err != nil {
263 0 : output.Abort()
264 0 : return 0, err
265 0 : }
266 0 : return length, output.Finish()
267 : }