Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "context"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/bytealloc"
13 : "github.com/cockroachdb/pebble/objstorage"
14 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
15 : "github.com/cockroachdb/pebble/sstable/block"
16 : "github.com/cockroachdb/pebble/sstable/rowblk"
17 : )
18 :
19 : // CopySpan produces a copy of an approximate subset of an input sstable.
20 : //
21 : // The produced sstable contains all keys from the input sstable in the span
22 : // [start, end), as well as potentially some additional keys from the original
23 : // file that were adjacent to but outside that span.
24 : //
25 : // CopySpan differs from simply seeking a reader to start, iterating until
26 : // the end, and passing the results to a writer: it does not write the new
27 : // sstable from scratch key-by-key, recompressing each key into new blocks
28 : // and computing new filters and properties. Instead, it finds the data
29 : // _blocks_ that intersect the requested span and copies them, whole, to the
30 : // new file, avoiding all decompression and recompression work. It then
31 : // copies the original bloom filter; this filter remains valid for the copied
32 : // subset of data, just with a potentially higher false positive rate than
33 : // one computed only from the keys actually in that subset.
34 : //
35 : // The resulting sstable will have no block properties.
36 : //
37 : // The function might return ErrEmptySpan if there are no blocks that could
38 : // include keys in the given range. See ErrEmptySpan for more details.
39 : //
40 : // Closes input and finishes or aborts output in all cases, including on errors.
41 : //
42 : // Note that CopySpan is not aware of any suffix or prefix replacement; the
43 : // caller must account for those when specifying the bounds.
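//
// A hypothetical invocation (sketch only: openInput/openOutput and the key
// bounds startIK/endIK stand in for handles and keys the caller already has):
//
//	readable, _ := openInput()  // objstorage.Readable for the source sst
//	r, _ := NewReader(ctx, readable, rOpts)
//	writable, _ := openOutput() // objstorage.Writable for the new sst
//	size, err := CopySpan(ctx, readable, r, rOpts, writable, wOpts, startIK, endIK)
//	// CopySpan has already closed readable and finished (or aborted)
//	// writable; on success, size is the length in bytes of the new sstable.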
44 : func CopySpan(
45 : ctx context.Context,
46 : input objstorage.Readable,
47 : r *Reader,
48 : rOpts ReaderOptions,
49 : output objstorage.Writable,
50 : o WriterOptions,
51 : start, end InternalKey,
52 1 : ) (size uint64, _ error) {
53 1 : defer input.Close()
54 1 :
55 1 : if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
56 0 : return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
57 0 : }
58 :
59 : // If our input has no filters, our output cannot have filters either.
60 1 : if r.tableFilter == nil {
61 1 : o.FilterPolicy = nil
62 1 : }
63 1 : o.TableFormat = r.tableFormat
64 1 : w := NewRawWriter(output, o)
65 1 :
66 1 : // We don't want the writer to attempt to write out block property data in
67 1 : // index blocks. This data won't be valid since we're not passing the actual
68 1 : // key data through the writer. We also remove the table-level properties
69 1 : // below.
70 1 : //
71 1 : // TODO(dt,radu): Figure out how to populate the prop collector state with
72 1 : // block props from the original sst.
73 1 : w.blockPropCollectors = nil
74 1 :
75 1 : defer func() {
76 1 : if w != nil {
77 1 : // set w.err to any non-nil error just so it aborts instead of finishing.
78 1 : w.err = base.ErrNotFound
79 1 : // w.Close now owns calling output.Abort().
80 1 : w.Close()
81 1 : }
82 : }()
83 :
84 1 : if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
85 0 : // We just checked for these conditions above.
86 0 : return 0, base.AssertionFailedf("cannot CopySpan sstables with value blocks or range keys")
87 0 : }
88 :
89 1 : var preallocRH objstorageprovider.PreallocatedReadHandle
90 1 : // ReadBeforeForIndexAndFilter attempts to read the top-level index, filter
91 1 : // and lower-level index blocks with one read.
92 1 : rh := objstorageprovider.UsePreallocatedReadHandle(
93 1 : r.readable, objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
94 1 : defer rh.Close()
95 1 : rh.SetupForCompaction()
96 1 : indexH, err := r.readIndex(ctx, rh, nil, nil)
97 1 : if err != nil {
98 0 : return 0, err
99 0 : }
100 1 : defer indexH.Release()
101 1 :
102 1 : // Set the filter block to be copied over if it exists. It will return false
103 1 : // positives for keys in blocks of the original file that we don't copy, but
104 1 : // filters can always have false positives, so this is fine.
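// For example (illustrative): if the original file held keys a through z but
// we copy only the blocks covering [m, p), a lookup of "b" against the copied
// filter may still say "maybe present" and cost a wasted index/block probe,
// but the filter can never produce a false negative for a key we did copy.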
105 1 : if r.tableFilter != nil {
106 0 : filterBlock, err := r.readFilter(ctx, rh, nil, nil)
107 0 : if err != nil {
108 0 : return 0, errors.Wrap(err, "reading filter")
109 0 : }
110 0 : filterBytes := append([]byte{}, filterBlock.Get()...)
111 0 : filterBlock.Release()
112 0 : w.filter = copyFilterWriter{
113 0 : origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBytes,
114 0 : }
115 : }
116 :
117 : // Copy all the props from the source file; we can't compute our own for the
118 : // many that depend on seeing every key, such as total count or size, so we
119 : // copy the original props instead. This will result in over-counts, but that
120 : // is safer than under-counts.
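// For example (illustrative numbers): if the source sstable reported
// NumEntries=1000 and the copied blocks hold only ~100 of those keys, the
// copy still reports NumEntries=1000, so stats derived from these props are
// upper bounds for a span-copied sstable rather than exact values.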
121 1 : w.props = r.Properties
122 1 : // Remove all user properties to disable block properties, which we do not
123 1 : // calculate.
124 1 : w.props.UserProperties = nil
125 1 : // Reset props that we'll re-derive as we build our own index.
126 1 : w.props.IndexPartitions = 0
127 1 : w.props.TopLevelIndexSize = 0
128 1 : w.props.IndexSize = 0
129 1 : w.props.IndexType = 0
130 1 : if w.filter != nil {
131 0 : if err := checkWriterFilterMatchesReader(r, w); err != nil {
132 0 : return 0, err
133 0 : }
134 : }
135 :
136 : // Find the blocks that intersect our span.
137 1 : blocks, err := intersectingIndexEntries(ctx, r, rh, indexH, start, end)
138 1 : if err != nil {
139 0 : return 0, err
140 0 : }
141 :
142 : // In theory an empty SST is fine, but #3409 means it is not. We could make
143 : // a non-empty sst by copying something outside the span, but #3907 means the
144 : // empty virtual span would still be a problem, so don't bother.
145 1 : if len(blocks) < 1 {
146 1 : return 0, ErrEmptySpan
147 1 : }
148 :
149 : // Copy all blocks byte-for-byte without doing any per-key processing.
150 1 : var blocksNotInCache []indexEntry
151 1 :
152 1 : copyBlocksToFile := func(blocks []indexEntry) error {
153 1 : blockOffset := blocks[0].bh.Offset
154 1 : // The block lengths don't include their trailers, which sit after each
155 1 : // block, before the next offset; we pick up the trailers between the blocks
156 1 : // we copy implicitly, but need to explicitly add the last trailer to length.
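// Worked example (illustrative numbers; block.TrailerLen is 5 bytes): two
// adjacent blocks with bh.Offset=0, bh.Length=100 and bh.Offset=105,
// bh.Length=200 give length = 105 + 200 + 5 - 0 = 310, i.e. both blocks
// plus both 5-byte trailers are copied as one contiguous [0, 310) range.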
157 1 : length := blocks[len(blocks)-1].bh.Offset + blocks[len(blocks)-1].bh.Length + block.TrailerLen - blockOffset
158 1 : if spanEnd := length + blockOffset; spanEnd < blockOffset {
159 0 : return base.AssertionFailedf("invalid intersecting span for CopySpan [%d, %d)", blockOffset, spanEnd)
160 0 : }
161 1 : if err := objstorage.Copy(ctx, rh, w.layout.writable, blockOffset, length); err != nil {
162 0 : return err
163 0 : }
164 : // Update w.meta.Size so subsequently flushed metadata has correct offsets.
165 1 : w.meta.Size += length
166 1 : for i := range blocks {
167 1 : blocks[i].bh.Offset = w.layout.offset
168 1 : // blocks[i].bh.Length remains unmodified.
169 1 : if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
170 0 : return err
171 0 : }
172 1 : w.layout.offset += uint64(blocks[i].bh.Length) + block.TrailerLen
173 : }
174 1 : return nil
175 : }
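// Illustrative walk of the loop below: for blocks [b0 b1 b2 b3] where only
// b2 is cached, b0 and b1 accumulate in blocksNotInCache and are flushed by
// one objstorage.Copy via copyBlocksToFile when the b2 cache hit is seen; b2
// is then rewritten from its cached bytes by WriteDataBlock, and b3 is picked
// up by the final copyBlocksToFile call after the loop.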
176 1 : for i := range blocks {
177 1 : h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, blocks[i].bh.Offset)
178 1 : if h.Get() == nil {
179 1 : // Cache miss. Add this block to the list of blocks that are not in cache.
180 1 : blocksNotInCache = blocks[i-len(blocksNotInCache) : i+1]
181 1 : continue
182 : }
183 :
184 : // Cache hit.
185 1 : rh.RecordCacheHit(ctx, int64(blocks[i].bh.Offset), int64(blocks[i].bh.Length+block.TrailerLen))
186 1 : if len(blocksNotInCache) > 0 {
187 0 : // We have some blocks that were not in cache preceding this block.
188 0 : // Copy them using objstorage.Copy.
189 0 : if err := copyBlocksToFile(blocksNotInCache); err != nil {
190 0 : h.Release()
191 0 : return 0, err
192 0 : }
193 0 : blocksNotInCache = nil
194 : }
195 :
196 : // layout.WriteDataBlock keeps layout.offset up-to-date for us.
197 1 : bh, err := w.layout.WriteDataBlock(h.Get(), &w.dataBlockBuf.blockBuf)
198 1 : h.Release()
199 1 : if err != nil {
200 0 : return 0, err
201 0 : }
202 1 : blocks[i].bh.Handle = bh
203 1 : if err := w.addIndexEntrySep(blocks[i].sep, blocks[i].bh, w.dataBlockBuf.tmp[:]); err != nil {
204 0 : return 0, err
205 0 : }
206 1 : w.meta.Size += uint64(bh.Length) + block.TrailerLen
207 : }
208 :
209 1 : if len(blocksNotInCache) > 0 {
210 1 : // We have some remaining blocks that were not in cache. Copy them
211 1 : // using objstorage.Copy.
212 1 : if err := copyBlocksToFile(blocksNotInCache); err != nil {
213 0 : return 0, err
214 0 : }
215 1 : blocksNotInCache = nil
216 : }
217 :
218 : // TODO(dt): Copy range keys (the fact there are none is checked above).
219 : // TODO(dt): Copy value blocks (the fact there are none is checked above).
220 :
221 1 : if err := w.Close(); err != nil {
222 0 : w = nil
223 0 : return 0, err
224 0 : }
225 1 : wrote := w.meta.Size
226 1 : w = nil
227 1 : return wrote, nil
228 : }
229 :
230 : // ErrEmptySpan is returned by CopySpan if the input sstable has no keys in the
231 : // requested span.
232 : //
233 : // Note that CopySpan's determination of block overlap is best effort - we may
234 : // copy a block that doesn't actually contain any keys in the span, in which
235 : // case we won't generate this error. We currently only generate this error when
236 : // the span start is beyond all keys in the physical sstable.
237 : var ErrEmptySpan = errors.New("cannot copy empty span")
238 :
239 : // indexEntry captures the two components of an sst index entry: the key and the
240 : // decoded block handle value.
241 : type indexEntry struct {
242 : sep InternalKey
243 : bh block.HandleWithProperties
244 : }
245 :
246 : // intersectingIndexEntries returns the index entries for the data blocks that
247 : // could contain keys in [start, end), i.e. the subset of the sst's index that
248 : // intersects the provided span.
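//
// For example (illustrative): with data blocks whose separators are c, f, j
// and a span [d, g), SeekGE(d) lands on separator f (the first block that
// could contain d); the block with separator j is also returned, since a
// block whose separator is >= end can still hold keys below end, and the
// scan stops there because end <= j.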
249 : func intersectingIndexEntries(
250 : ctx context.Context,
251 : r *Reader,
252 : rh objstorage.ReadHandle,
253 : indexH block.BufferHandle,
254 : start, end InternalKey,
255 1 : ) ([]indexEntry, error) {
256 1 : top, err := rowblk.NewIter(r.Compare, r.Split, indexH.Get(), NoTransforms)
257 1 : if err != nil {
258 0 : return nil, err
259 0 : }
260 1 : defer top.Close()
261 1 :
262 1 : var alloc bytealloc.A
263 1 : res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
264 1 : for kv := top.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = top.Next() {
265 1 : bh, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
266 1 : if err != nil {
267 0 : return nil, err
268 0 : }
269 1 : if r.Properties.IndexType != twoLevelIndex {
270 1 : entry := indexEntry{bh: bh, sep: kv.K}
271 1 : alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
272 1 : alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
273 1 : res = append(res, entry)
274 1 : } else {
275 1 : subBlk, err := r.readBlock(ctx, bh.Handle, nil, rh, nil, nil, nil)
276 1 : if err != nil {
277 0 : return nil, err
278 0 : }
279 1 : defer subBlk.Release() // in-loop, but it is a short loop.
280 1 :
281 1 : sub, err := rowblk.NewIter(r.Compare, r.Split, subBlk.Get(), NoTransforms)
282 1 : if err != nil {
283 0 : return nil, err
284 0 : }
285 1 : defer sub.Close() // in-loop, but it is a short loop.
286 1 :
287 1 : for kv := sub.SeekGE(start.UserKey, base.SeekGEFlagsNone); kv != nil; kv = sub.Next() {
288 1 : bh, err := block.DecodeHandleWithProperties(kv.InPlaceValue())
289 1 : if err != nil {
290 0 : return nil, err
291 0 : }
292 1 : entry := indexEntry{bh: bh, sep: kv.K}
293 1 : alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
294 1 : alloc, entry.sep.UserKey = alloc.Copy(entry.sep.UserKey)
295 1 : res = append(res, entry)
296 1 : if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
297 1 : break
298 : }
299 : }
300 1 : if err := sub.Error(); err != nil {
301 0 : return nil, err
302 0 : }
303 : }
304 1 : if base.InternalCompare(r.Compare, end, kv.K) <= 0 {
305 1 : break
306 : }
307 : }
308 1 : return res, top.Error()
309 : }
310 :
311 : // copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
312 : // exists to ensure the whole-file fallback is visible in profiles/stack
313 : // traces if we see a cluster copying more than expected.
314 : //
315 : // Finishes or Aborts output; does *not* Close input.
316 : func copyWholeFileBecauseOfUnsupportedFeature(
317 : ctx context.Context, input objstorage.Readable, output objstorage.Writable,
318 0 : ) (size uint64, _ error) {
319 0 : length := uint64(input.Size())
320 0 : rh := input.NewReadHandle(objstorage.NoReadBefore)
321 0 : rh.SetupForCompaction()
322 0 : if err := objstorage.Copy(ctx, rh, output, 0, length); err != nil {
323 0 : output.Abort()
324 0 : return 0, err
325 0 : }
326 0 : return length, output.Finish()
327 : }