Line data Source code
1 : package sstable
2 :
3 : import (
4 : "bytes"
5 : "context"
6 : "math"
7 : "sync"
8 :
9 : "github.com/cespare/xxhash/v2"
10 : "github.com/cockroachdb/errors"
11 : "github.com/cockroachdb/pebble/internal/base"
12 : "github.com/cockroachdb/pebble/internal/bytealloc"
13 : "github.com/cockroachdb/pebble/internal/invariants"
14 : "github.com/cockroachdb/pebble/internal/rangekey"
15 : "github.com/cockroachdb/pebble/objstorage"
16 : )
17 :
18 : // RewriteKeySuffixes is deprecated.
19 : //
20 : // TODO(sumeer): remove after switching CockroachDB to RewriteKeySuffixesAndReturnFormat.
21 : func RewriteKeySuffixes(
22 : sst []byte,
23 : rOpts ReaderOptions,
24 : out objstorage.Writable,
25 : o WriterOptions,
26 : from, to []byte,
27 : concurrency int,
28 0 : ) (*WriterMetadata, error) {
29 0 : meta, _, err := RewriteKeySuffixesAndReturnFormat(sst, rOpts, out, o, from, to, concurrency)
30 0 : return meta, err
31 0 : }
32 :
33 : // RewriteKeySuffixesAndReturnFormat copies the content of the passed SSTable
34 : // bytes to a new sstable, written to `out`, in which the suffix `from` is
35 : // replaced with `to` in every key. The input sstable must consist of only
36 : // Sets or RangeKeySets and every key must have `from` as its suffix as
37 : // determined by the Split function of the Comparer in the passed
38 : // WriterOptions. Range deletes must not exist in this sstable, as they will
39 : // be ignored.
40 : //
41 : // Data blocks are rewritten in parallel by `concurrency` workers and then
42 : // assembled into a final SST. Filters are copied from the original SST without
43 : // modification as they are not affected by the suffix, while block and table
44 : // properties are only minimally recomputed.
45 : //
46 : // TODO(sumeer): document limitations, if any, due to this limited
47 : // re-computation of properties (is there any loss of fidelity?).
48 : //
49 : // Any block property collectors configured in the WriterOptions must implement
50 : // SuffixReplaceableBlockCollector.
51 : //
52 : // The WriterOptions.TableFormat is ignored, and the output sstable has the
53 : // same TableFormat as the input, which is returned in case the caller wants
54 : // to do some error checking. Suffix rewriting is meant to be efficient, and
55 : // allowing changes in the TableFormat detracts from that efficiency.
56 : //
57 : // Any obsolete bits that key-value pairs may be annotated with are ignored
58 : // and lost during the rewrite. Additionally, the output sstable has the
59 : // pebble.obsolete.is_strict property set to false. These limitations could be
60 : // removed if needed. The current use case for
61 : // RewriteKeySuffixesAndReturnFormat in CockroachDB is for MVCC-compliant file
62 : // ingestion, where these files do not contain RANGEDELs and have one
63 : // key-value pair per userkey -- so they trivially satisfy the strict
64 : // criteria, and we don't need the obsolete bit as a performance optimization.
65 : // For disaggregated storage, strict obsolete sstables are needed for L5 and
66 : // L6, but at the time of writing, we expect such MVCC-compliant file
67 : // ingestion to only ingest into levels L4 and higher. If this changes, we can
68 : // do one of two things to get rid of this limitation:
69 : // - Validate that there are no duplicate userkeys and no RANGEDELs/MERGEs
70 : // in the sstable to be rewritten. Validating no duplicate userkeys is
71 : // non-trivial when rewriting blocks in parallel, so we could encode the
72 : // pre-existing condition in the (existing) SnapshotPinnedKeys property --
73 : // we need to update the external sst writer to calculate and encode this
74 : // property.
75 : // - Preserve the obsolete bit (with changes to the blockIter).
76 : func RewriteKeySuffixesAndReturnFormat(
77 : sst []byte,
78 : rOpts ReaderOptions,
79 : out objstorage.Writable,
80 : o WriterOptions,
81 : from, to []byte,
82 : concurrency int,
83 0 : ) (*WriterMetadata, TableFormat, error) {
84 0 : r, err := NewMemReader(sst, rOpts)
85 0 : if err != nil {
86 0 : return nil, TableFormatUnspecified, err
87 0 : }
88 0 : defer r.Close()
89 0 : return rewriteKeySuffixesInBlocks(r, out, o, from, to, concurrency)
90 : }
91 :
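// The following helper is a minimal usage sketch, not part of the original
// file: it shows how a caller might rewrite every key's suffix in an
// in-memory sstable and inspect the returned format. The helper name and the
// fixed worker count are illustrative assumptions.
func rewriteSuffixesExample(
	sst []byte, rOpts ReaderOptions, out objstorage.Writable, o WriterOptions, from, to []byte,
) (*WriterMetadata, TableFormat, error) {
	// o.Comparer.Split must isolate the suffix, and any block property
	// collectors in o must implement SuffixReplaceableBlockCollector.
	// concurrency=4 is an arbitrary choice for this sketch; any value >= 1
	// is accepted.
	meta, format, err := RewriteKeySuffixesAndReturnFormat(sst, rOpts, out, o, from, to, 4)
	if err != nil {
		return nil, TableFormatUnspecified, err
	}
	// The returned format always mirrors the input sstable's format,
	// regardless of o.TableFormat.
	return meta, format, nil
}
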
92 : func rewriteKeySuffixesInBlocks(
93 : r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte, concurrency int,
94 0 : ) (*WriterMetadata, TableFormat, error) {
95 0 : if o.Comparer == nil || o.Comparer.Split == nil {
96 0 : return nil, TableFormatUnspecified,
97 0 : errors.New("a valid splitter is required to rewrite suffixes")
98 0 : }
99 0 : if concurrency < 1 {
100 0 : return nil, TableFormatUnspecified, errors.New("concurrency must be >= 1")
101 0 : }
102 : // Even though NumValueBlocks = 0 => NumValuesInValueBlocks = 0, check both
103 : // as a defensive measure.
104 0 : if r.Properties.NumValueBlocks > 0 || r.Properties.NumValuesInValueBlocks > 0 {
105 0 : return nil, TableFormatUnspecified,
106 0 : errors.New("sstable with a single suffix should not have value blocks")
107 0 : }
108 :
109 0 : tableFormat := r.tableFormat
110 0 : o.TableFormat = tableFormat
111 0 : w := NewWriter(out, o)
112 0 : defer func() {
113 0 : if w != nil {
114 0 : w.Close()
115 0 : }
116 : }()
117 :
118 0 : for _, c := range w.blockPropCollectors {
119 0 : if _, ok := c.(SuffixReplaceableBlockCollector); !ok {
120 0 : return nil, TableFormatUnspecified,
121 0 : errors.Errorf("block property collector %s does not support suffix replacement", c.Name())
122 0 : }
123 : }
124 :
125 0 : l, err := r.Layout()
126 0 : if err != nil {
127 0 : return nil, TableFormatUnspecified, errors.Wrap(err, "reading layout")
128 0 : }
129 :
130 0 : if err := rewriteDataBlocksToWriter(r, w, l.Data, from, to, w.split, concurrency); err != nil {
131 0 : return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting data blocks")
132 0 : }
133 :
134 : // Copy over the range key block and replace suffixes in it if it exists.
135 0 : if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
136 0 : return nil, TableFormatUnspecified, errors.Wrap(err, "rewriting range key blocks")
137 0 : }
138 :
139 : // Copy over the filter block if it exists (rewriteDataBlocksToWriter will
140 : // already have ensured this is valid if it exists).
141 0 : if w.filter != nil && l.Filter.Length > 0 {
142 0 : filterBlock, _, err := readBlockBuf(r, l.Filter, nil)
143 0 : if err != nil {
144 0 : return nil, TableFormatUnspecified, errors.Wrap(err, "reading filter")
145 0 : }
146 0 : w.filter = copyFilterWriter{
147 0 : origPolicyName: w.filter.policyName(), origMetaName: w.filter.metaName(), data: filterBlock,
148 0 : }
149 : }
150 :
151 0 : if err := w.Close(); err != nil {
152 0 : w = nil
153 0 : return nil, TableFormatUnspecified, err
154 0 : }
155 0 : writerMeta, err := w.Metadata()
156 0 : w = nil
157 0 : return writerMeta, tableFormat, err
158 : }
159 :
160 : var errBadKind = errors.New("key does not have expected kind (set)")
161 :
162 : type blockWithSpan struct {
163 : start, end InternalKey
164 : data []byte
165 : }
166 :
167 : func rewriteBlocks(
168 : r *Reader,
169 : restartInterval int,
170 : checksumType ChecksumType,
171 : compression Compression,
172 : input []BlockHandleWithProperties,
173 : output []blockWithSpan,
174 : totalWorkers, worker int,
175 : from, to []byte,
176 : split Split,
177 0 : ) error {
178 0 : bw := blockWriter{
179 0 : restartInterval: restartInterval,
180 0 : }
181 0 : buf := blockBuf{checksummer: checksummer{checksumType: checksumType}}
182 0 : if checksumType == ChecksumTypeXXHash {
183 0 : buf.checksummer.xxHasher = xxhash.New()
184 0 : }
185 :
186 0 : var blockAlloc bytealloc.A
187 0 : var keyAlloc bytealloc.A
188 0 : var scratch InternalKey
189 0 :
190 0 : var inputBlock, inputBlockBuf []byte
191 0 :
192 0 : iter := &blockIter{}
193 0 :
194 0 : // We'll assume all blocks are _roughly_ equal in size, so a static round-robin
195 0 : // partition, in which each worker handles every i-th block, is probably enough.
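// For example, with totalWorkers = 3, worker 0 rewrites blocks 0, 3, 6, ...,
// worker 1 rewrites blocks 1, 4, 7, ..., and worker 2 rewrites blocks 2, 5, 8, ....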
196 0 : for i := worker; i < len(input); i += totalWorkers {
197 0 : bh := input[i]
198 0 :
199 0 : var err error
200 0 : inputBlock, inputBlockBuf, err = readBlockBuf(r, bh.BlockHandle, inputBlockBuf)
201 0 : if err != nil {
202 0 : return err
203 0 : }
204 0 : if err := iter.init(r.Compare, r.Split, inputBlock, r.Properties.GlobalSeqNum, false, nil); err != nil {
205 0 : return err
206 0 : }
207 :
208 0 : if cap(bw.restarts) < int(iter.restarts) {
209 0 : bw.restarts = make([]uint32, 0, iter.restarts)
210 0 : }
211 0 : if cap(bw.buf) == 0 {
212 0 : bw.buf = make([]byte, 0, len(inputBlock))
213 0 : }
214 0 : if cap(bw.restarts) < int(iter.numRestarts) {
215 0 : bw.restarts = make([]uint32, 0, iter.numRestarts)
216 0 : }
217 :
218 0 : for key, val := iter.First(); key != nil; key, val = iter.Next() {
219 0 : if key.Kind() != InternalKeyKindSet {
220 0 : return errBadKind
221 0 : }
222 0 : si := split(key.UserKey)
223 0 : oldSuffix := key.UserKey[si:]
224 0 : if !bytes.Equal(oldSuffix, from) {
225 0 : err := errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
226 0 : return err
227 0 : }
228 0 : newLen := si + len(to)
229 0 : if cap(scratch.UserKey) < newLen {
230 0 : scratch.UserKey = make([]byte, 0, len(key.UserKey)*2+len(to)-len(from))
231 0 : }
232 :
233 0 : scratch.Trailer = key.Trailer
234 0 : scratch.UserKey = scratch.UserKey[:newLen]
235 0 : copy(scratch.UserKey, key.UserKey[:si])
236 0 : copy(scratch.UserKey[si:], to)
237 0 :
238 0 : // NB: for TableFormatPebblev3 and higher, since
239 0 : // !iter.lazyValueHandling.hasValuePrefix, it will return the raw value
240 0 : // in the block, which includes the 1-byte prefix. This is fine since bw
241 0 : // also does not know about the prefix and will preserve it in bw.add.
242 0 : v := val.InPlaceValue()
243 0 : if invariants.Enabled && r.tableFormat >= TableFormatPebblev3 &&
244 0 : key.Kind() == InternalKeyKindSet {
245 0 : if len(v) < 1 {
246 0 : return errors.Errorf("value has no prefix")
247 0 : }
248 0 : prefix := valuePrefix(v[0])
249 0 : if isValueHandle(prefix) {
250 0 : return errors.Errorf("value prefix is incorrect")
251 0 : }
252 0 : if setHasSamePrefix(prefix) {
253 0 : return errors.Errorf("multiple keys with same key prefix")
254 0 : }
255 : }
256 0 : bw.add(scratch, v)
257 0 : if output[i].start.UserKey == nil {
258 0 : keyAlloc, output[i].start = cloneKeyWithBuf(scratch, keyAlloc)
259 0 : }
260 : }
261 0 : *iter = iter.resetForReuse()
262 0 :
263 0 : keyAlloc, output[i].end = cloneKeyWithBuf(scratch, keyAlloc)
264 0 :
265 0 : finished := compressAndChecksum(bw.finish(), compression, &buf)
266 0 :
267 0 : // copy our finished block into the output buffer.
268 0 : blockAlloc, output[i].data = blockAlloc.Alloc(len(finished) + blockTrailerLen)
269 0 : copy(output[i].data, finished)
270 0 : copy(output[i].data[len(finished):], buf.tmp[:blockTrailerLen])
271 : }
272 0 : return nil
273 : }
274 :
275 : func rewriteDataBlocksToWriter(
276 : r *Reader,
277 : w *Writer,
278 : data []BlockHandleWithProperties,
279 : from, to []byte,
280 : split Split,
281 : concurrency int,
282 0 : ) error {
283 0 : if r.Properties.NumEntries == 0 {
284 0 : // No point keys.
285 0 : return nil
286 0 : }
287 0 : blocks := make([]blockWithSpan, len(data))
288 0 :
289 0 : if w.filter != nil {
290 0 : if r.Properties.FilterPolicyName != w.filter.policyName() {
291 0 : return errors.New("mismatched filters")
292 0 : }
293 0 : if was, is := r.Properties.ComparerName, w.props.ComparerName; was != is {
294 0 : return errors.Errorf("mismatched Comparer %s vs %s, replacement requires same splitter to copy filters", was, is)
295 0 : }
296 : }
297 :
298 0 : g := &sync.WaitGroup{}
299 0 : g.Add(concurrency)
300 0 : errCh := make(chan error, concurrency)
301 0 : for i := 0; i < concurrency; i++ {
302 0 : worker := i
303 0 : go func() {
304 0 : defer g.Done()
305 0 : err := rewriteBlocks(
306 0 : r,
307 0 : w.dataBlockBuf.dataBlock.restartInterval,
308 0 : w.blockBuf.checksummer.checksumType,
309 0 : w.compression,
310 0 : data,
311 0 : blocks,
312 0 : concurrency,
313 0 : worker,
314 0 : from, to,
315 0 : split,
316 0 : )
317 0 : if err != nil {
318 0 : errCh <- err
319 0 : }
320 : }()
321 : }
322 0 : g.Wait()
323 0 : close(errCh)
324 0 : if err, ok := <-errCh; ok {
325 0 : return err
326 0 : }
327 :
328 0 : var decoder blockPropertiesDecoder
329 0 : var oldShortIDs []shortID
330 0 : var oldProps [][]byte
331 0 : if len(w.blockPropCollectors) > 0 {
332 0 : oldProps = make([][]byte, len(w.blockPropCollectors))
333 0 : oldShortIDs = make([]shortID, math.MaxUint8)
334 0 : for i, p := range w.blockPropCollectors {
335 0 : if prop, ok := r.Properties.UserProperties[p.Name()]; ok {
336 0 : was, is := shortID(byte(prop[0])), shortID(i)
337 0 : oldShortIDs[was] = is
338 0 : }
339 : }
340 : }
341 :
342 0 : for i := range blocks {
343 0 : // Write the rewritten block to the file.
344 0 : if err := w.writable.Write(blocks[i].data); err != nil {
345 0 : return err
346 0 : }
347 :
348 0 : n := len(blocks[i].data)
349 0 : bh := BlockHandle{Offset: w.meta.Size, Length: uint64(n) - blockTrailerLen}
350 0 : // Update the overall size.
351 0 : w.meta.Size += uint64(n)
352 0 :
353 0 : // Load any previous values for our prop collectors into oldProps.
354 0 : for i := range oldProps {
355 0 : oldProps[i] = nil
356 0 : }
357 0 : decoder.props = data[i].Props
358 0 : for !decoder.done() {
359 0 : id, val, err := decoder.next()
360 0 : if err != nil {
361 0 : return err
362 0 : }
363 0 : oldProps[oldShortIDs[id]] = val
364 : }
365 :
366 0 : for i, p := range w.blockPropCollectors {
367 0 : if err := p.(SuffixReplaceableBlockCollector).UpdateKeySuffixes(oldProps[i], from, to); err != nil {
368 0 : return err
369 0 : }
370 : }
371 :
372 0 : bhp, err := w.maybeAddBlockPropertiesToBlockHandle(bh)
373 0 : if err != nil {
374 0 : return err
375 0 : }
376 0 : var nextKey InternalKey
377 0 : if i+1 < len(blocks) {
378 0 : nextKey = blocks[i+1].start
379 0 : }
380 0 : if err = w.addIndexEntrySync(blocks[i].end, nextKey, bhp, w.dataBlockBuf.tmp[:]); err != nil {
381 0 : return err
382 0 : }
383 : }
384 :
385 0 : w.meta.updateSeqNum(blocks[0].start.SeqNum())
386 0 : w.props.NumEntries = r.Properties.NumEntries
387 0 : w.props.RawKeySize = r.Properties.RawKeySize
388 0 : w.props.RawValueSize = r.Properties.RawValueSize
389 0 : w.meta.SetSmallestPointKey(blocks[0].start)
390 0 : w.meta.SetLargestPointKey(blocks[len(blocks)-1].end)
391 0 : return nil
392 : }
393 :
394 0 : func rewriteRangeKeyBlockToWriter(r *Reader, w *Writer, from, to []byte) error {
395 0 : iter, err := r.NewRawRangeKeyIter()
396 0 : if err != nil {
397 0 : return err
398 0 : }
399 0 : if iter == nil {
400 0 : // No range keys.
401 0 : return nil
402 0 : }
403 0 : defer iter.Close()
404 0 :
405 0 : s, err := iter.First()
406 0 : for ; s != nil; s, err = iter.Next() {
407 0 : if !s.Valid() {
408 0 : break
409 : }
410 0 : for i := range s.Keys {
411 0 : if s.Keys[i].Kind() != base.InternalKeyKindRangeKeySet {
412 0 : return errBadKind
413 0 : }
414 0 : if !bytes.Equal(s.Keys[i].Suffix, from) {
415 0 : return errors.Errorf("key has suffix %q, expected %q", s.Keys[i].Suffix, from)
416 0 : }
417 0 : s.Keys[i].Suffix = to
418 : }
419 :
420 0 : err = rangekey.Encode(s, func(k base.InternalKey, v []byte) error {
421 0 : // Calling AddRangeKey instead of addRangeKeySpan bypasses the fragmenter.
422 0 : // This is okay because the raw fragments off of `iter` are already
423 0 : // fragmented, and suffix replacement should not affect fragmentation.
424 0 : return w.AddRangeKey(k, v)
425 0 : })
426 0 : if err != nil {
427 0 : return err
428 0 : }
429 : }
430 0 : return err
431 : }
432 :
433 : type copyFilterWriter struct {
434 : origMetaName string
435 : origPolicyName string
436 : data []byte
437 : }
438 :
439 0 : func (copyFilterWriter) addKey(key []byte) { panic("unimplemented") }
440 0 : func (c copyFilterWriter) finish() ([]byte, error) { return c.data, nil }
441 0 : func (c copyFilterWriter) metaName() string { return c.origMetaName }
442 0 : func (c copyFilterWriter) policyName() string { return c.origPolicyName }
443 :
444 : // RewriteKeySuffixesViaWriter is similar to RewriteKeySuffixes but uses just a
445 : // single loop over the Reader that writes each key to the Writer with the new
446 : // suffix. This is significantly slower than the parallelized rewriter, and does
447 : // more work to rederive filters, props, etc.
448 : //
449 : // Any obsolete bits that key-value pairs may be annotated with are ignored
450 : // and lost during the rewrite. Some of the obsolete bits may be recreated --
451 : // specifically when there are multiple keys with the same user key.
452 : // Additionally, the output sstable has the pebble.obsolete.is_strict property
453 : // set to false. See the longer comment at RewriteKeySuffixesAndReturnFormat.
454 : func RewriteKeySuffixesViaWriter(
455 : r *Reader, out objstorage.Writable, o WriterOptions, from, to []byte,
456 0 : ) (*WriterMetadata, error) {
457 0 : if o.Comparer == nil || o.Comparer.Split == nil {
458 0 : return nil, errors.New("a valid splitter is required to rewrite suffixes")
459 0 : }
460 :
461 0 : o.IsStrictObsolete = false
462 0 : w := NewWriter(out, o)
463 0 : defer func() {
464 0 : if w != nil {
465 0 : w.Close()
466 0 : }
467 : }()
468 0 : i, err := r.NewIter(nil, nil)
469 0 : if err != nil {
470 0 : return nil, err
471 0 : }
472 0 : defer i.Close()
473 0 :
474 0 : k, v := i.First()
475 0 : var scratch InternalKey
476 0 : for k != nil {
477 0 : if k.Kind() != InternalKeyKindSet {
478 0 : return nil, errors.New("invalid key type")
479 0 : }
480 0 : oldSuffix := k.UserKey[r.Split(k.UserKey):]
481 0 : if !bytes.Equal(oldSuffix, from) {
482 0 : return nil, errors.Errorf("key has suffix %q, expected %q", oldSuffix, from)
483 0 : }
484 0 : scratch.UserKey = append(scratch.UserKey[:0], k.UserKey[:len(k.UserKey)-len(from)]...)
485 0 : scratch.UserKey = append(scratch.UserKey, to...)
486 0 : scratch.Trailer = k.Trailer
487 0 :
488 0 : val, _, err := v.Value(nil)
489 0 : if err != nil {
490 0 : return nil, err
491 0 : }
492 0 : if err := w.addPoint(scratch, val, false); err != nil {
493 0 : return nil, err
494 0 : }
495 0 : k, v = i.Next()
496 : }
497 0 : if err := rewriteRangeKeyBlockToWriter(r, w, from, to); err != nil {
498 0 : return nil, err
499 0 : }
500 0 : if err := w.Close(); err != nil {
501 0 : w = nil
502 0 : return nil, err
503 0 : }
504 0 : writerMeta, err := w.Metadata()
505 0 : w = nil
506 0 : return writerMeta, err
507 : }
508 :
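// The following helper is a hypothetical sketch, not part of the original
// file: it drives the slower, Writer-based rewrite from an in-memory sstable.
// This path does not require block property collectors to implement
// SuffixReplaceableBlockCollector, since properties are rederived by the
// Writer. The helper name is an assumption.
func rewriteSuffixesViaWriterExample(
	sst []byte, rOpts ReaderOptions, out objstorage.Writable, o WriterOptions, from, to []byte,
) (*WriterMetadata, error) {
	r, err := NewMemReader(sst, rOpts)
	if err != nil {
		return nil, err
	}
	defer r.Close()
	// As with the parallel block rewriter, every point key must be a Set and
	// every key's suffix must equal `from`.
	return RewriteKeySuffixesViaWriter(r, out, o, from, to)
}
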
509 : // NewMemReader opens a reader over the SST stored in the passed []byte.
510 0 : func NewMemReader(sst []byte, o ReaderOptions) (*Reader, error) {
511 0 : return NewReader(newMemReader(sst), o)
512 0 : }
513 :
514 0 : func readBlockBuf(r *Reader, bh BlockHandle, buf []byte) ([]byte, []byte, error) {
515 0 : raw := r.readable.(*memReader).b[bh.Offset : bh.Offset+bh.Length+blockTrailerLen]
516 0 : if err := checkChecksum(r.checksumType, raw, bh, 0); err != nil {
517 0 : return nil, buf, err
518 0 : }
519 0 : typ := blockType(raw[bh.Length])
520 0 : raw = raw[:bh.Length]
521 0 : if typ == noCompressionBlockType {
522 0 : return raw, buf, nil
523 0 : }
524 0 : decompressedLen, prefix, err := decompressedLen(typ, raw)
525 0 : if err != nil {
526 0 : return nil, buf, err
527 0 : }
528 0 : if cap(buf) < decompressedLen {
529 0 : buf = make([]byte, decompressedLen)
530 0 : }
531 0 : res, err := decompressInto(typ, raw[prefix:], buf[:decompressedLen])
532 0 : return res, buf, err
533 : }
534 :
535 : // memReader is a thin wrapper around a []byte such that it can be passed to
536 : // sstable.Reader. It supports concurrent use, and does so without locking in
537 : // contrast to the heavier read/write vfs.MemFile.
538 : type memReader struct {
539 : b []byte
540 : r *bytes.Reader
541 : rh objstorage.NoopReadHandle
542 : }
543 :
544 : var _ objstorage.Readable = (*memReader)(nil)
545 :
546 0 : func newMemReader(b []byte) *memReader {
547 0 : r := &memReader{
548 0 : b: b,
549 0 : r: bytes.NewReader(b),
550 0 : }
551 0 : r.rh = objstorage.MakeNoopReadHandle(r)
552 0 : return r
553 0 : }
554 :
555 : // ReadAt is part of objstorage.Readable.
556 0 : func (m *memReader) ReadAt(_ context.Context, p []byte, off int64) error {
557 0 : n, err := m.r.ReadAt(p, off)
558 0 : if invariants.Enabled && err == nil && n != len(p) {
559 0 : panic("short read")
560 : }
561 0 : return err
562 : }
563 :
564 : // Close is part of objstorage.Readable.
565 0 : func (*memReader) Close() error {
566 0 : return nil
567 0 : }
568 :
569 : // Size is part of objstorage.Readable.
570 0 : func (m *memReader) Size() int64 {
571 0 : return int64(len(m.b))
572 0 : }
573 :
574 : // NewReadHandle is part of objstorage.Readable.
575 0 : func (m *memReader) NewReadHandle(_ context.Context) objstorage.ReadHandle {
576 0 : return &m.rh
577 0 : }