// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package sstable

import (
	"bytes"
	"cmp"
	"context"
	"math"
	"slices"
	"sync"

	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/bytealloc"
	"github.com/cockroachdb/pebble/internal/invariants"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/sstable/block"
)

// RewriteKeySuffixesAndReturnFormat copies the content of the passed SSTable
// bytes to a new sstable, written to `out`, in which the suffix `from` is
// replaced with `to` in every key. The input sstable must consist only of
// Sets or RangeKeySets, and every key must have `from` as its suffix as
// determined by the Split function of the Comparer in the passed
// WriterOptions. Range deletes must not exist in this sstable, as they will
// be ignored.
//
// Data blocks are rewritten in parallel by `concurrency` workers and then
// assembled into a final SST. Filters are copied from the original SST without
// modification as they are not affected by the suffix, while block and table
// properties are only minimally recomputed.
//
// TODO(sumeer): document limitations, if any, due to this limited
// re-computation of properties (is there any loss of fidelity?).
//
// Any block property collectors configured in the WriterOptions must implement
// AddCollectedWithSuffixChange.
//
// The WriterOptions.TableFormat is ignored, and the output sstable has the
// same TableFormat as the input, which is returned in case the caller wants
// to do some error checking. Suffix rewriting is meant to be efficient, and
// allowing changes in the TableFormat detracts from that efficiency.
//
// Any obsolete bits that key-value pairs may be annotated with are ignored
// and lost during the rewrite. Additionally, the output sstable has the
// pebble.obsolete.is_strict property set to false. These limitations could be
// removed if needed. The current use case for
// RewriteKeySuffixesAndReturnFormat in CockroachDB is MVCC-compliant file
// ingestion, where these files do not contain RANGEDELs and have one
// key-value pair per userkey -- so they trivially satisfy the strict
// criteria, and we don't need the obsolete bit as a performance optimization.
// For disaggregated storage, strict obsolete sstables are needed for L5 and
// L6, but at the time of writing, we expect such MVCC-compliant file
// ingestion to only ingest into levels L4 and higher. If this changes, we can
// do one of two things to get rid of this limitation:
//   - Validate that there are no duplicate userkeys and no RANGEDELs/MERGEs
//     in the sstable to be rewritten. Validating no duplicate userkeys is
//     non-trivial when rewriting blocks in parallel, so we could encode the
//     pre-existing condition in the (existing) SnapshotPinnedKeys property --
//     we need to update the external sst writer to calculate and encode this
//     property.
//   - Preserve the obsolete bit (with changes to the blockIter).
func RewriteKeySuffixesAndReturnFormat(
	sst []byte,
	rOpts ReaderOptions,
	out objstorage.Writable,
	o WriterOptions,
	from, to []byte,
	concurrency int,
) (*WriterMetadata, TableFormat, error) {
	r, err := NewMemReader(sst, rOpts)
	if err != nil {
		return nil, TableFormatUnspecified, err
	}
	defer r.Close()
	return rewriteKeySuffixesInBlocks(r, out, o, from, to, concurrency)
}
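
// A minimal usage sketch; sstBytes, dst, and cmp are placeholders supplied by
// the caller, not names fixed by this API:
//
//	meta, format, err := RewriteKeySuffixesAndReturnFormat(
//		sstBytes, ReaderOptions{Comparer: cmp},
//		dst, WriterOptions{Comparer: cmp},
//		[]byte("@old"), []byte("@new"), 4 /* concurrency */)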

func rewriteKeySuffixesInBlocks(
	r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte, concurrency int,
) (*WriterMetadata, TableFormat, error) {
	o = o.ensureDefaults()
	switch {
	case concurrency < 1:
		return nil, TableFormatUnspecified, errors.New("concurrency must be >= 1")
	case r.Properties.NumValueBlocks > 0 || r.Properties.NumValuesInValueBlocks > 0:
		return nil, TableFormatUnspecified,
			errors.New("sstable with a single suffix should not have value blocks")
	case r.Properties.ComparerName != o.Comparer.Name:
		return nil, TableFormatUnspecified, errors.Errorf("mismatched Comparer %s vs %s, replacement requires same splitter to copy filters",
			r.Properties.ComparerName, o.Comparer.Name)
	case o.FilterPolicy != nil && r.Properties.FilterPolicyName != o.FilterPolicy.Name():
		return nil, TableFormatUnspecified, errors.New("mismatched filters")
	}

	o.TableFormat = r.tableFormat
	w := NewRawWriter(out, o)
	defer func() {
		if w != nil {
			w.Close()
		}
	}()

	if err := w.rewriteSuffixes(r, o, from, to, concurrency); err != nil {
		return nil, TableFormatUnspecified, err
	}

	if err := w.Close(); err != nil {
		w = nil
		return nil, TableFormatUnspecified, err
	}
	writerMeta, err := w.Metadata()
	w = nil
	return writerMeta, r.tableFormat, err
}

var errBadKind = errors.New("key does not have expected kind (set)")

type blockWithSpan struct {
	start, end base.InternalKey
	physical   block.PhysicalBlock
}

type blockRewriter interface {
	RewriteSuffixes(
		input []byte, from []byte, to []byte,
	) (start, end base.InternalKey, rewritten []byte, err error)
}

func rewriteDataBlocksInParallel(
	r *Reader,
	opts WriterOptions,
	input []block.HandleWithProperties,
	from, to []byte,
	concurrency int,
	newDataBlockRewriter func() blockRewriter,
) ([]blockWithSpan, error) {
	if r.Properties.NumEntries == 0 {
		// No point keys.
		return nil, nil
	}
	output := make([]blockWithSpan, len(input))

	g := &sync.WaitGroup{}
	g.Add(concurrency)
	type workerErr struct {
		worker int
		err    error
	}
	errCh := make(chan workerErr, concurrency)
	for j := 0; j < concurrency; j++ {
		worker := j
		go func() {
			defer g.Done()
			rw := newDataBlockRewriter()
			var blockAlloc bytealloc.A
			var compressedBuf []byte
			var inputBlock, inputBlockBuf []byte
			checksummer := block.Checksummer{Type: opts.Checksum}
			// We assume all blocks are roughly equal in size, so a static
			// round-robin partition, with each worker handling every i-th
			// block, should balance the load well enough.
			err := func() error {
				for i := worker; i < len(input); i += concurrency {
					bh := input[i]
					var err error
					inputBlock, inputBlockBuf, err = readBlockBuf(r, bh.Handle, inputBlockBuf)
					if err != nil {
						return err
					}
					var outputBlock []byte
					output[i].start, output[i].end, outputBlock, err =
						rw.RewriteSuffixes(inputBlock, from, to)
					if err != nil {
						return err
					}
					compressedBuf = compressedBuf[:cap(compressedBuf)]
					finished := block.CompressAndChecksum(&compressedBuf, outputBlock, opts.Compression, &checksummer)
					output[i].physical = finished.CloneWithByteAlloc(&blockAlloc)
				}
				return nil
			}()
			if err != nil {
				errCh <- workerErr{worker: worker, err: err}
			}
		}()
	}
	g.Wait()
	close(errCh)
	if werr, ok := <-errCh; ok {
		// Collect errors from all workers and sort them by worker for determinism.
		werrs := []workerErr{werr}
		for werr := range errCh {
			werrs = append(werrs, werr)
		}
		slices.SortFunc(werrs, func(a, b workerErr) int { return cmp.Compare(a.worker, b.worker) })
		return nil, werrs[0].err
	}
	return output, nil
}
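
// For illustration only: with len(input) == 8 and concurrency == 3, the
// static round-robin partition above assigns input blocks to workers as
//
//	worker 0: blocks 0, 3, 6
//	worker 1: blocks 1, 4, 7
//	worker 2: blocks 2, 5
//
// so each output element is written by exactly one worker and no cross-worker
// coordination is needed.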

func rewriteRangeKeyBlockToWriter(r *Reader, w RawWriter, from, to []byte) error {
	iter, err := r.NewRawRangeKeyIter(context.TODO(), NoFragmentTransforms)
	if err != nil {
		return err
	}
	if iter == nil {
		// No range keys.
		return nil
	}
	defer iter.Close()

	s, err := iter.First()
	for ; s != nil; s, err = iter.Next() {
		if !s.Valid() {
			break
		}
		for i := range s.Keys {
			if s.Keys[i].Kind() != base.InternalKeyKindRangeKeySet {
				return errBadKind
			}
			if !bytes.Equal(s.Keys[i].Suffix, from) {
				return errors.Errorf("key has suffix %q, expected %q", s.Keys[i].Suffix, from)
			}
			s.Keys[i].Suffix = to
		}

		if err := w.EncodeSpan(*s); err != nil {
			return err
		}
	}
	return err
}

// getShortIDs returns a slice keyed by the shortIDs of the block property
// collectors in r, with each value holding the new shortID corresponding to
// the index of the matching block property collector in collectors.
//
// getShortIDs returns an error if any of the collectors is not found in the
// sstable.
func getShortIDs(r *Reader, collectors []BlockPropertyCollector) ([]shortID, error) {
	if len(collectors) == 0 {
		return nil, nil
	}
	shortIDs := make([]shortID, math.MaxUint8)
	for i := range shortIDs {
		shortIDs[i] = invalidShortID
	}
	for i, p := range collectors {
		prop, ok := r.Properties.UserProperties[p.Name()]
		if !ok {
			return nil, errors.Errorf("sstable does not contain property %s", p.Name())
		}
		shortIDs[shortID(prop[0])] = shortID(i)
	}
	return shortIDs, nil
}
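
// A sketch of the remapping, using a hypothetical collector name: if the
// sstable encoded collector "x" under shortID 2 (prop[0] == 2) and "x" sits at
// index 0 of collectors, then shortIDs[2] == 0 after getShortIDs returns,
// while every shortID the sstable did not use maps to invalidShortID.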

type copyFilterWriter struct {
	origMetaName   string
	origPolicyName string
	data           []byte
}

func (copyFilterWriter) addKey(key []byte)         { panic("unimplemented") }
func (c copyFilterWriter) finish() ([]byte, error) { return c.data, nil }
func (c copyFilterWriter) metaName() string        { return c.origMetaName }
func (c copyFilterWriter) policyName() string      { return c.origPolicyName }

// RewriteKeySuffixesViaWriter is similar to RewriteKeySuffixes but uses just a
// single loop over the Reader that writes each key to the Writer with the new
// suffix. This is significantly slower than the parallelized rewriter, and does
// more work to rederive filters, props, etc.
//
// Any obsolete bits that key-value pairs may be annotated with are ignored
// and lost during the rewrite. Some of the obsolete bits may be recreated --
// specifically when there are multiple keys with the same user key.
// Additionally, the output sstable has the pebble.obsolete.is_strict property
// set to false. See the longer comment at RewriteKeySuffixesAndReturnFormat.
func RewriteKeySuffixesViaWriter(
	r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte,
) (*WriterMetadata, error) {
	if o.Comparer == nil || o.Comparer.Split == nil {
		return nil, errors.New("a valid splitter is required to rewrite suffixes")
	}

	o.IsStrictObsolete = false
	w := newRowWriter(out, o)
	defer func() {
		if w != nil {
			w.Close()
		}
	}()
	i, err := r.NewIter(NoTransforms, nil, nil)
	if err != nil {
		return nil, err
	}
	defer i.Close()

	kv := i.First()
	var scratch InternalKey
	for kv != nil {
		if kv.Kind() != InternalKeyKindSet {
			return nil, errors.New("invalid key type")
		}
		oldSuffix := kv.K.UserKey[r.Split(kv.K.UserKey):]
		if !bytes.Equal(oldSuffix, from) {
			return nil, errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
		}
		scratch.UserKey = append(scratch.UserKey[:0], kv.K.UserKey[:len(kv.K.UserKey)-len(from)]...)
		scratch.UserKey = append(scratch.UserKey, to...)
		scratch.Trailer = kv.K.Trailer

		val, _, err := kv.Value(nil)
		if err != nil {
			return nil, err
		}
		w.addPoint(scratch, val, false)
		kv = i.Next()
	}
	if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		w = nil
		return nil, err
	}
	writerMeta, err := w.Metadata()
	w = nil
	return writerMeta, err
}
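
// A minimal usage sketch of this slower path; r, dst, opts, from, and to are
// placeholders supplied by the caller, and opts.Comparer must provide Split:
//
//	meta, err := RewriteKeySuffixesViaWriter(r, dst, opts, from, to)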

// NewMemReader opens a reader over the SST stored in the passed []byte.
func NewMemReader(sst []byte, o ReaderOptions) (*Reader, error) {
	// Since all operations are from memory, plumbing a context here is not useful.
	return NewReader(context.Background(), newMemReader(sst), o)
}
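
// A minimal usage sketch; sstBytes is a placeholder assumed to hold a
// complete, valid sstable:
//
//	r, err := NewMemReader(sstBytes, ReaderOptions{})
//	if err != nil {
//		return err
//	}
//	defer r.Close()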

// readBlockBuf reads the raw block designated by bh directly out of the
// memReader's backing slice, verifies its checksum, and decompresses it into
// buf if it is compressed. It returns the block contents, along with the
// (possibly grown) scratch buffer for reuse by the next call.
func readBlockBuf(r *Reader, bh block.Handle, buf []byte) ([]byte, []byte, error) {
	// The raw bytes include the block trailer (compression indicator plus
	// checksum) that follows the bh.Length bytes of block data.
	raw := r.readable.(*memReader).b[bh.Offset : bh.Offset+bh.Length+block.TrailerLen]
	if err := checkChecksum(r.checksumType, raw, bh, 0); err != nil {
		return nil, buf, err
	}
	algo := block.CompressionIndicator(raw[bh.Length])
	raw = raw[:bh.Length]
	if algo == block.NoCompressionIndicator {
		return raw, buf, nil
	}
	decompressedLen, prefix, err := block.DecompressedLen(algo, raw)
	if err != nil {
		return nil, buf, err
	}
	if cap(buf) < decompressedLen {
		buf = make([]byte, decompressedLen)
	}
	dst := buf[:decompressedLen]
	err = block.DecompressInto(algo, raw[prefix:], dst)
	return dst, buf, err
}
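
// For reference, the framing readBlockBuf relies on, inferred from the
// slicing above (the authoritative layout lives in the block package): the
// bh.Length bytes of (possibly compressed) block data are followed by a
// block.TrailerLen-byte trailer whose first byte, raw[bh.Length], is the
// compression indicator, with the checksum occupying the remainder.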

// memReader is a thin wrapper around a []byte such that it can be passed to
// sstable.Reader. It supports concurrent use, and does so without locking in
// contrast to the heavier read/write vfs.MemFile.
type memReader struct {
	b  []byte
	r  *bytes.Reader
	rh objstorage.NoopReadHandle
}

var _ objstorage.Readable = (*memReader)(nil)

func newMemReader(b []byte) *memReader {
	r := &memReader{
		b: b,
		r: bytes.NewReader(b),
	}
	r.rh = objstorage.MakeNoopReadHandle(r)
	return r
}

// ReadAt is part of objstorage.Readable.
func (m *memReader) ReadAt(_ context.Context, p []byte, off int64) error {
	n, err := m.r.ReadAt(p, off)
	if invariants.Enabled && err == nil && n != len(p) {
		panic("short read")
	}
	return err
}

// Close is part of objstorage.Readable.
func (*memReader) Close() error {
	return nil
}

// Size is part of objstorage.Readable.
func (m *memReader) Size() int64 {
	return int64(len(m.b))
}

// NewReadHandle is part of objstorage.Readable.
func (m *memReader) NewReadHandle(readBeforeSize objstorage.ReadBeforeSize) objstorage.ReadHandle {
	return &m.rh
}