// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package sstable

import (
	"context"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/bytealloc"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
	"github.com/cockroachdb/pebble/sstable/block"
	"github.com/cockroachdb/pebble/sstable/rowblk"
)

// CopySpan produces a copy of an approximate subset of an input sstable.
//
// The produced sstable contains all keys from the input sstable in the span
// [start, end), as well as potentially some additional keys from the original
// file that were adjacent to but outside that span.
//
// CopySpan differs from simply seeking a reader to start, iterating until
// end, and passing the results to a writer: it does not write the new sstable
// from scratch, key-by-key, recompressing each key into new blocks and
// computing new filters and properties. Instead, it finds the data _blocks_
// that intersect the requested span and copies those, whole, to the new file,
// avoiding all decompression and recompression work. It then copies the
// original bloom filter - this filter is valid for the subset of data as
// well, albeit with a potentially higher false positive rate than one
// computed from only the keys it actually contains.
//
// The resulting sstable will have no block properties.
//
// The function might return ErrEmptySpan if there are no blocks that could
// include keys in the given range. See ErrEmptySpan for more details.
//
// Closes input and finishes or aborts output in all cases, including on
// errors.
//
// Note that CopySpan is not aware of any suffix or prefix replacement; the
// caller must account for those when specifying the bounds.
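//
// A minimal, hypothetical call sketch (readable, reader, writable, startIK,
// and endIK are caller-supplied placeholders, not values provided by this
// package):
//
//	n, err := CopySpan(ctx, readable, reader, ReaderOptions{}, writable,
//		WriterOptions{}, startIK, endIK)
//	if err != nil {
//		return err // input has been closed and output aborted.
//	}
//	_ = n // n is the approximate size of the new sstable in bytes.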
func CopySpan(
	ctx context.Context,
	input objstorage.Readable,
	r *Reader,
	rOpts ReaderOptions,
	output objstorage.Writable,
	o WriterOptions,
	start, end InternalKey,
) (size uint64, _ error) {
	defer input.Close()

	if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
		return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
	}

	// If our input has no filters, our output cannot have filters either.
	if r.tableFilter == nil {
		o.FilterPolicy = nil
	}
	o.TableFormat = r.tableFormat
	w := newRowWriter(output, o)

	// We don't want the writer to attempt to write out block property data in
	// index blocks. This data won't be valid since we're not passing the actual
	// key data through the writer. We also remove the table-level properties
	// below.
	//
	// TODO(dt,radu): Figure out how to populate the prop collector state with
	// block props from the original sst.
	w.blockPropCollectors = nil

	defer func() {
		if w != nil {
			// set w.err to any non-nil error just so it aborts instead of
			// finishing.
			w.err = base.ErrNotFound
			// w.Close now owns calling output.Abort().
			w.Close()
		}
	}()

	if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
		// We just checked for these conditions above.
		return 0, base.AssertionFailedf("cannot CopySpan sstables with value blocks or range keys")
	}

	var preallocRH objstorageprovider.PreallocatedReadHandle
	// ReadBeforeForIndexAndFilter attempts to read the top-level index, filter
	// and lower-level index blocks with one read.
	rh := objstorageprovider.UsePreallocatedReadHandle(
		r.readable, objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
	defer rh.Close()
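	// We will read the index and data blocks sequentially, in offset order,
	// so hint the read handle for sequential (compaction-style) access.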
	rh.SetupForCompaction()
	indexH, err := r.readIndex(ctx, rh, nil, nil)
	if err != nil {
		return 0, err
	}
	defer indexH.Release()

	// Set the filter block to be copied over if it exists. It will return false
	// positives for keys in blocks of the original file that we don't copy, but
	// filters can always have false positives, so this is fine.
	if r.tableFilter != nil {
		filterBlock, err := r.readFilter(ctx, rh, nil, nil)
		if err != nil {
			return 0, errors.Wrap(err, "reading filter")
		}
		filterBytes := append([]byte{}, filterBlock.Get()...)
		filterBlock.Release()
		w.filter = copyFilterWriter{
			origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBytes,
		}
	}

	// Copy all the props from the source file; we can't compute many of them
	// ourselves because they depend on seeing every key, such as total count
	// or size, so we copy the original props instead. This will result in
	// over-counts, but that is safer than under-counts.
	w.props = r.Properties
	// Remove all user properties to disable block properties, which we do not
	// calculate.
	w.props.UserProperties = nil
	// Reset props that we'll re-derive as we build our own index.
	w.props.IndexPartitions = 0
	w.props.TopLevelIndexSize = 0
	w.props.IndexSize = 0
	w.props.IndexType = 0
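	// Sanity-check that the filter block we copied matches the filter policy
	// the writer believes it is using.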
	if w.filter != nil {
		if err := checkWriterFilterMatchesReader(r, w); err != nil {
			return 0, err
		}
	}

	// Find the blocks that intersect our span.
	blocks, err := intersectingIndexEntries(ctx, r, rh, indexH, start, end)
	if err != nil {
		return 0, err
	}

	// In theory an empty sstable is fine, but #3409 means it is not. We could
	// make a non-empty sst by copying something outside the span, but #3907
	// means that the empty virtual span would still be a problem, so don't
	// bother.
	if len(blocks) < 1 {
		return 0, ErrEmptySpan
	}

	// Copy all blocks byte-for-byte without doing any per-key processing.
	var blocksNotInCache []indexEntry

	copyBlocksToFile := func(blocks []indexEntry) error {
		blockOffset := blocks[0].bh.Offset
		// The block lengths don't include their trailers, which sit just
		// after each block, before the next offset; the trailers between the
		// blocks we copy come along implicitly, but we need to explicitly add
		// the last block's trailer to the length.
		length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + block.TrailerLen - blockOffset
		if spanEnd := length + blockOffset; spanEnd < blockOffset {
			return base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", blockOffset, spanEnd)
		}
		if err := objstorage.Copy(ctx, rh, w.layout.writable, blockOffset, length); err != nil {
			return err
		}
		// Update w.meta.Size so subsequently flushed metadata has correct
		// offsets.
		w.meta.Size += length
		for i := range blocks {
			blocks[i].bh.Offset = w.layout.offset
			// blocks[i].bh.Length remains unmodified.
			if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
				return err
			}
			w.layout.offset += uint64(blocks[i].bh.Length) + block.TrailerLen
		}
		return nil
	}
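	// Each block takes one of two paths: a block already in the block cache
	// is written out through the writer's layout from its cached contents,
	// while contiguous runs of uncached blocks are accumulated and copied
	// byte-for-byte from the source file by copyBlocksToFile, with no
	// per-block parsing.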
	for i := range blocks {
		h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, blocks[i].bh.Offset)
		if h.Get() == nil {
			// Cache miss. Add this block to the list of blocks that are not in cache.
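			// Re-slice blocks itself so the pending run stays a single
			// contiguous slice: it covers the len(blocksNotInCache) misses
			// seen so far plus this block.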
			blocksNotInCache = blocks[i-len(blocksNotInCache) : i+1]
			continue
		}

		// Cache hit.
		rh.RecordCacheHit(ctx, int64(blocks[i].bh.Offset), int64(blocks[i].bh.Length+block.TrailerLen))
		if len(blocksNotInCache) > 0 {
			// We have some blocks that were not in cache preceding this block.
			// Copy them using objstorage.Copy.
			if err := copyBlocksToFile(blocksNotInCache); err != nil {
				h.Release()
				return 0, err
			}
			blocksNotInCache = nil
		}

		// layout.WriteDataBlock keeps layout.offset up-to-date for us.
		bh, err := w.layout.WriteDataBlock(h.Get(), &w.dataBlockBuf.blockBuf)
		h.Release()
		if err != nil {
			return 0, err
		}
		blocks[i].bh.Handle = bh
		if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
			return 0, err
		}
		w.meta.Size += uint64(bh.Length) + block.TrailerLen
	}

	if len(blocksNotInCache) > 0 {
		// We have some remaining blocks that were not in cache. Copy them
		// using objstorage.Copy.
		if err := copyBlocksToFile(blocksNotInCache); err != nil {
			return 0, err
		}
		blocksNotInCache = nil
	}

	// TODO(dt): Copy range keys (the fact there are none is checked above).
	// TODO(dt): Copy value blocks (the fact there are none is checked above).

	if err := w.Close(); err != nil {
		w = nil
		return 0, err
	}
	wrote := w.meta.Size
	w = nil
	return wrote, nil
}

// ErrEmptySpan is returned by CopySpan if the input sstable has no keys in the
// requested span.
//
// Note that CopySpan's determination of block overlap is best effort - we may
// copy a block that doesn't actually contain any keys in the span, in which
// case we won't generate this error. We currently only generate this error
// when the span start is beyond all keys in the physical sstable.
var ErrEmptySpan = errors.New("cannot copy empty span")

// indexEntry captures the two components of an sst index entry: the key and
// the decoded block handle value.
type indexEntry struct {
	sep InternalKey
	bh  block.HandleWithProperties
}

// intersectingIndexEntries returns the entries from the index with separator
// keys contained by [start, end), i.e. the subset of the sst's index that
// intersects the provided span.
func intersectingIndexEntries(
	ctx context.Context,
	r *Reader,
	rh objstorage.ReadHandle,
	indexH block.BufferHandle,
	start, end InternalKey,
) ([]indexEntry, error) {
	top, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
	if err != nil {
		return nil, err
	}
	defer top.Close()

	var alloc bytealloc.A
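	// Preallocate for the worst case, in which every data block in the file
	// intersects the span.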
	res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
	for kv := top.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = top.Next() {
		bh, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
		if err != nil {
			return nil, err
		}
		if r.Properties.IndexType != twoLevelIndex {
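			// Single-level index: this entry points directly at a data block.
			// Copy the separator key and props out of the index block's
			// buffer so they remain valid after the block is released.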
			entry := indexEntry{bh: bh, sep: kv.K}
			alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
			alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
			res = append(res, entry)
		} else {
			subBlk, err := r.readBlock(ctx, bh.Handle, nil, rh, nil, nil, nil)
			if err != nil {
				return nil, err
			}
			defer subBlk.Release() // in-loop, but it is a short loop.

			sub, err := rowblk.NewIter(r.Compare, r.Split, subBlk.Get(), NoTransforms)
			if err != nil {
				return nil, err
			}
			defer sub.Close() // in-loop, but it is a short loop.

			for kv := sub.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = sub.Next() {
				bh, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
				if err != nil {
					return nil, err
				}
				entry := indexEntry{bh: bh, sep: kv.K}
				alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
				alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
				res = append(res, entry)
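				// We just appended the block with the first separator >= end;
				// later blocks can only contain keys >= that separator, so
				// none of them can hold keys in [start, end).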
				if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
					break
				}
			}
			if err := sub.Error(); err != nil {
				return nil, err
			}
		}
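		// Likewise at the top level: once the current separator is at or past
		// end, no later index entry can lead to keys in the span.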
		if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
			break
		}
	}
	return res, top.Error()
}

// copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
// exists to ensure it is visible in profiles/stack traces if we are looking
// at a cluster copying more than expected.
//
// Finishes or Aborts output; does *not* Close input.
func copyWholeFileBecauseOfUnsupportedFeature(
	ctx context.Context, input objstorage.Readable, output objstorage.Writable,
) (size uint64, _ error) {
	length := uint64(input.Size())
	rh := input.NewReadHandle(objstorage.NoReadBefore)
	defer rh.Close()
	rh.SetupForCompaction()
	if err := objstorage.Copy(ctx, rh, output, 0, length); err != nil {
		output.Abort()
		return 0, err
	}
	return length, output.Finish()
}