Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package sstable
6 :
7 : import (
8 : "context"
9 :
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/bytealloc"
13 : "github.com/cockroachdb/pebble/objstorage"
14 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
15 : "github.com/cockroachdb/pebble/sstable/block"
16 : )
17 :
// CopySpan produces a copy of an approximate subset of an input sstable.
//
// The produced sstable contains all keys from the input sstable in the span
// [start, end), as well as potentially some additional keys from the original
// file that were adjacent to but outside that span.
//
// CopySpan differs from simply seeking a reader to start and iterating until
// the end passing the results to a writer in that it does not write the new
// sstable from scratch, key-by-key, recompressing each key into new blocks and
// computing new filters and properties. Instead, it finds data _blocks_ that
// intersect the requested span and copies those, whole, to the new file,
// avoiding all decompression and recompression work. It then copies the
// original bloom filter - this filter is valid for the subset of data as well,
// just with potentially a higher false positive rate compared to one that would
// be computed just from the keys in it.
//
// The resulting sstable will have no block properties.
//
// The function might return ErrEmptySpan if there are no blocks that could
// include keys in the given range. See ErrEmptySpan for more details.
//
// Closes input and finishes or aborts output in all cases, including on errors.
//
// Note that CopySpan is not aware of any suffix or prefix replacement; the
// caller must account for those when specifying the bounds.
func CopySpan(
	ctx context.Context,
	input objstorage.Readable,
	r *Reader,
	rOpts ReaderOptions,
	output objstorage.Writable,
	o WriterOptions,
	start, end InternalKey,
) (size uint64, _ error) {
	defer input.Close()

	// Features that require per-key processing (value blocks, range keys, range
	// deletions) cannot be block-copied; fall back to copying the file whole.
	if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
		return copyWholeFileBecauseOfUnsupportedFeature(ctx, input, output) // Finishes/Aborts output.
	}

	// If our input has no filters, our output cannot have filters either.
	if r.tableFilter == nil {
		o.FilterPolicy = nil
	}
	o.TableFormat = r.tableFormat
	// We don't want the writer to attempt to write out block property data in
	// index blocks. This data won't be valid since we're not passing the actual
	// key data through the writer. We also remove the table-level properties
	// below.
	//
	// TODO(dt,radu): Figure out how to populate the prop collector state with
	// block props from the original sst.
	o.BlockPropertyCollectors = nil
	o.disableObsoleteCollector = true
	w := NewRawWriter(output, o)

	// Abort the writer if we exit early with an error. On success (and on a
	// failed Close) w is set to nil below, which disarms this deferred Close.
	defer func() {
		if w != nil {
			w.Close()
		}
	}()

	// Defensive re-assertion of the precondition checked at the top of the
	// function, now that the writer exists.
	if r.Properties.NumValueBlocks > 0 || r.Properties.NumRangeKeys() > 0 || r.Properties.NumRangeDeletions > 0 {
		// We just checked for these conditions above.
		return 0, base.AssertionFailedf("cannot CopySpan sstables with value blocks or range keys")
	}

	var preallocRH objstorageprovider.PreallocatedReadHandle
	// ReadBeforeForIndexAndFilter attempts to read the top-level index, filter
	// and lower-level index blocks with one read.
	rh := objstorageprovider.UsePreallocatedReadHandle(
		r.readable, objstorage.ReadBeforeForIndexAndFilter, &preallocRH)
	defer rh.Close()
	rh.SetupForCompaction()
	indexH, err := r.readTopLevelIndexBlock(ctx, noEnv, rh)
	if err != nil {
		return 0, err
	}
	defer indexH.Release()

	// Set the filter block to be copied over if it exists. It will return false
	// positives for keys in blocks of the original file that we don't copy, but
	// filters can always have false positives, so this is fine.
	if r.tableFilter != nil {
		filterBlock, err := r.readFilterBlock(ctx, noEnv, rh, r.filterBH)
		if err != nil {
			return 0, errors.Wrap(err, "reading filter")
		}
		// Copy the filter bytes out of the cache-backed buffer before releasing
		// it; the writer retains filterBytes beyond this scope.
		filterBytes := append([]byte{}, filterBlock.BlockData()...)
		filterBlock.Release()
		w.copyFilter(filterBytes, r.Properties.FilterPolicyName)
	}

	// Copy all the props from the source file; we can't compute our own for many
	// that depend on seeing every key, such as total count or size so we copy the
	// original props instead. This will result in over-counts but that is safer
	// than under-counts.
	w.copyProperties(r.Properties)

	// Find the blocks that intersect our span.
	blocks, err := intersectingIndexEntries(ctx, r, rh, indexH, start, end)
	if err != nil {
		return 0, err
	}

	// In theory an empty SST is fine, but #3409 means they are not. We could make
	// a non-empty sst by copying something outside the span, but #3907 means that
	// the empty virtual span would still be a problem, so don't bother.
	if len(blocks) < 1 {
		return 0, ErrEmptySpan
	}

	// Copy all blocks byte-for-byte without doing any per-key processing.
	// Cached blocks are appended individually from memory; runs of consecutive
	// uncached blocks are batched and copied with a single ranged read.
	var blocksNotInCache []indexEntry

	for i := range blocks {
		h := r.cacheOpts.Cache.Get(r.cacheOpts.CacheID, r.cacheOpts.FileNum, blocks[i].bh.Offset)
		if !h.Valid() {
			// Cache miss. Add this block to the list of blocks that are not in cache.
			// blocksNotInCache is always a contiguous sub-slice of blocks ending at
			// index i, so this re-slice extends it by one element without copying.
			blocksNotInCache = blocks[i-len(blocksNotInCache) : i+1]
			continue
		}

		// Cache hit.
		rh.RecordCacheHit(ctx, int64(blocks[i].bh.Offset), int64(blocks[i].bh.Length+block.TrailerLen))
		if len(blocksNotInCache) > 0 {
			// We have some blocks that were not in cache preceding this block.
			// Copy them using objstorage.Copy.
			if err := w.copyDataBlocks(ctx, blocksNotInCache, rh); err != nil {
				// Release the cache handle acquired above before returning.
				h.Release()
				return 0, err
			}
			blocksNotInCache = nil
		}

		err := w.addDataBlock(block.CacheBufferHandle(h).BlockData(), blocks[i].sep, blocks[i].bh)
		h.Release()
		if err != nil {
			return 0, err
		}
	}

	if len(blocksNotInCache) > 0 {
		// We have some remaining blocks that were not in cache. Copy them
		// using objstorage.Copy.
		if err := w.copyDataBlocks(ctx, blocksNotInCache, rh); err != nil {
			return 0, err
		}
		blocksNotInCache = nil
	}

	// TODO(dt): Copy range keys (the fact there are none is checked above).
	// TODO(dt): Copy valblocks keys (the fact there are none is checked above).

	if err := w.Close(); err != nil {
		// Close already released the writer's resources; disarm the deferred
		// Close so it does not run a second time.
		w = nil
		return 0, err
	}
	meta, err := w.Metadata()
	if err != nil {
		return 0, err
	}
	wrote := meta.Size
	w = nil
	return wrote, nil
}
184 :
// ErrEmptySpan is returned by CopySpan if the input sstable has no keys in the
// requested span.
//
// Note that CopySpan's determination of block overlap is best effort - we may
// copy a block that doesn't actually contain any keys in the span, in which
// case we won't generate this error. We currently only generate this error when
// the span start is beyond all keys in the physical sstable.
var ErrEmptySpan = errors.New("cannot copy empty span")
193 :
// indexEntry captures the two components of an sst index entry: the key and the
// decoded block handle value.
type indexEntry struct {
	// sep is the separator key for the data block (an upper bound for its keys).
	sep []byte
	// bh is the decoded handle (offset/length plus properties) of the data block.
	bh block.HandleWithProperties
}
200 :
// intersectingIndexEntries returns the entries from the index with separator
// keys contained by [start, end), i.e. the subset of the sst's index that
// intersects the provided span.
//
// For single-level indexes the top-level iterator yields data-block entries
// directly; for two-level indexes each top-level entry's lower-level index
// block is read and iterated. Separator and property bytes are copied into an
// arena so they remain valid after the iterators and blocks are released.
func intersectingIndexEntries(
	ctx context.Context,
	r *Reader,
	rh objstorage.ReadHandle,
	indexH block.BufferHandle,
	start, end InternalKey,
) ([]indexEntry, error) {
	top := r.tableFormat.newIndexIter()
	err := top.Init(r.Comparer, indexH.BlockData(), NoTransforms)
	if err != nil {
		return nil, err
	}
	defer top.Close()

	var alloc bytealloc.A
	// NumDataBlocks is an upper bound on the result size (we stop at end).
	res := make([]indexEntry, 0, r.Properties.NumDataBlocks)
	for valid := top.SeekGE(start.UserKey); valid; valid = top.Next() {
		bh, err := top.BlockHandleWithProperties()
		if err != nil {
			return nil, err
		}
		if r.Properties.IndexType != twoLevelIndex {
			// Single-level index: this entry points directly at a data block.
			entry := indexEntry{bh: bh, sep: top.Separator()}
			// Copy into arena-owned memory; the iterator's returned slices are
			// not stable across subsequent iterator operations.
			alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
			alloc, entry.sep = alloc.Copy(entry.sep)
			res = append(res, entry)
		} else {
			// Two-level index: bh points at a lower-level index block; read and
			// iterate it to collect the data-block entries.
			subBlk, err := r.readIndexBlock(ctx, noEnv, rh, bh.Handle)
			if err != nil {
				return nil, err
			}
			defer subBlk.Release() // in-loop, but it is a short loop.

			sub := r.tableFormat.newIndexIter()
			err = sub.Init(r.Comparer, subBlk.BlockData(), NoTransforms)
			if err != nil {
				return nil, err
			}
			defer sub.Close() // in-loop, but it is a short loop.

			for valid := sub.SeekGE(start.UserKey); valid; valid = sub.Next() {
				bh, err := sub.BlockHandleWithProperties()
				if err != nil {
					return nil, err
				}
				entry := indexEntry{bh: bh, sep: sub.Separator()}
				alloc, entry.bh.Props = alloc.Copy(entry.bh.Props)
				alloc, entry.sep = alloc.Copy(entry.sep)
				res = append(res, entry)
				// The separator bounds this block's keys from above, so once it
				// reaches end we have covered the whole span.
				if r.Compare(end.UserKey, entry.sep) <= 0 {
					break
				}
			}
		}
		// Stop once the top-level separator is at or beyond end.
		if top.SeparatorGT(end.UserKey, true /* inclusively */) {
			break
		}
	}
	return res, nil
}
264 :
265 : // copyWholeFileBecauseOfUnsupportedFeature is a thin wrapper around Copy that
266 : // exists to ensure it is visible in profiles/stack traces if we are looking at
267 : // cluster copying more than expected.
268 : //
269 : // Finishes or Aborts output; does *not* Close input.
270 : func copyWholeFileBecauseOfUnsupportedFeature(
271 : ctx context.Context, input objstorage.Readable, output objstorage.Writable,
272 0 : ) (size uint64, _ error) {
273 0 : length := uint64(input.Size())
274 0 : rh := input.NewReadHandle(objstorage.NoReadBefore)
275 0 : rh.SetupForCompaction()
276 0 : if err := objstorage.Copy(ctx, rh, output, 0, length); err != nil {
277 0 : output.Abort()
278 0 : return 0, err
279 0 : }
280 0 : return length, output.Finish()
281 : }
|