Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/pebble"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/keyspan"
15 : "github.com/cockroachdb/pebble/internal/private"
16 : "github.com/cockroachdb/pebble/internal/rangekey"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
19 : "github.com/cockroachdb/pebble/sstable"
20 : "github.com/cockroachdb/pebble/vfs"
21 : )
22 :
23 : // writeSSTForIngestion writes an SST that is to be ingested, either directly or
24 : // as an external file. Returns the sstable metadata.
25 : //
26 : // Closes the iterators in all cases.
27 : func writeSSTForIngestion(
28 : t *Test,
29 : pointIter base.InternalIterator,
30 : rangeDelIter keyspan.FragmentIterator,
31 : rangeKeyIter keyspan.FragmentIterator,
32 : uniquePrefixes bool,
33 : syntheticSuffix sstable.SyntheticSuffix,
34 : syntheticPrefix sstable.SyntheticPrefix,
35 : writable objstorage.Writable,
36 : targetFMV pebble.FormatMajorVersion,
37 1 : ) (*sstable.WriterMetadata, error) {
38 1 : writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
39 1 : if t.testOpts.disableValueBlocksForIngestSSTables {
40 1 : writerOpts.DisableValueBlocks = true
41 1 : }
42 1 : w := sstable.NewWriter(writable, writerOpts)
43 1 : pointIterCloser := base.CloseHelper(pointIter)
44 1 : defer func() {
45 1 : _ = pointIterCloser.Close()
46 1 : if rangeDelIter != nil {
47 0 : rangeDelIter.Close()
48 0 : }
49 1 : if rangeKeyIter != nil {
50 0 : rangeKeyIter.Close()
51 0 : }
52 : }()
53 :
54 1 : outputKey := func(key []byte, syntheticSuffix sstable.SyntheticSuffix) []byte {
55 1 : if !syntheticPrefix.IsSet() && !syntheticSuffix.IsSet() {
56 1 : return slices.Clone(key)
57 1 : }
58 1 : if syntheticPrefix.IsSet() {
59 1 : key = syntheticPrefix.Apply(key)
60 1 : }
61 1 : if syntheticSuffix.IsSet() {
62 1 : n := t.opts.Comparer.Split(key)
63 1 : key = append(key[:n:n], syntheticSuffix...)
64 1 : }
65 1 : return key
66 : }
67 :
68 1 : var lastUserKey []byte
69 1 : for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
70 1 : // Ignore duplicate keys.
71 1 : if lastUserKey != nil {
72 1 : last := lastUserKey
73 1 : this := kv.K.UserKey
74 1 : if uniquePrefixes {
75 1 : last = last[:t.opts.Comparer.Split(last)]
76 1 : this = this[:t.opts.Comparer.Split(this)]
77 1 : }
78 1 : if t.opts.Comparer.Equal(last, this) {
79 1 : continue
80 : }
81 : }
82 1 : lastUserKey = append(lastUserKey[:0], kv.K.UserKey...)
83 1 :
84 1 : k := *kv
85 1 : k.K.SetSeqNum(base.SeqNumZero)
86 1 : k.K.UserKey = outputKey(k.K.UserKey, syntheticSuffix)
87 1 : value := kv.V
88 1 : // It's possible that we wrote the key on a batch from a db that supported
89 1 : // DeleteSized, but will be ingesting into a db that does not. Detect this
90 1 : // case and translate the key to an InternalKeyKindDelete.
91 1 : if targetFMV < pebble.FormatDeleteSizedAndObsolete && kv.Kind() == pebble.InternalKeyKindDeleteSized {
92 1 : value = pebble.LazyValue{}
93 1 : k.K.SetKind(pebble.InternalKeyKindDelete)
94 1 : }
95 1 : valBytes, _, err := value.Value(nil)
96 1 : if err != nil {
97 0 : return nil, err
98 0 : }
99 1 : if err := w.Raw().AddWithForceObsolete(k.K, valBytes, false); err != nil {
100 0 : return nil, err
101 0 : }
102 : }
103 1 : if err := pointIterCloser.Close(); err != nil {
104 0 : return nil, err
105 0 : }
106 :
107 1 : if rangeDelIter != nil {
108 1 : span, err := rangeDelIter.First()
109 1 : for ; span != nil; span, err = rangeDelIter.Next() {
110 1 : if syntheticSuffix.IsSet() {
111 0 : panic("synthetic suffix with RangeDel")
112 : }
113 1 : if err := w.DeleteRange(outputKey(span.Start, nil), outputKey(span.End, nil)); err != nil {
114 0 : return nil, err
115 0 : }
116 : }
117 1 : if err != nil {
118 0 : return nil, err
119 0 : }
120 1 : rangeDelIter.Close()
121 1 : rangeDelIter = nil
122 : }
123 :
124 1 : if rangeKeyIter != nil {
125 1 : span, err := rangeKeyIter.First()
126 1 : for ; span != nil; span, err = rangeKeyIter.Next() {
127 1 : // Coalesce the keys of this span and then zero the sequence
128 1 : // numbers. This is necessary in order to make the range keys within
129 1 : // the ingested sstable internally consistent at the sequence number
130 1 : // it's ingested at. The individual keys within a batch are
131 1 : // committed at unique sequence numbers, whereas all the keys of an
132 1 : // ingested sstable are given the same sequence number. A span
133 1 : // containing keys that both set and unset the same suffix at the
134 1 : // same sequence number is nonsensical, so we "coalesce" or collapse
135 1 : // the keys.
136 1 : collapsed := keyspan.Span{
137 1 : Start: outputKey(span.Start, nil),
138 1 : End: outputKey(span.End, nil),
139 1 : Keys: make([]keyspan.Key, 0, len(span.Keys)),
140 1 : }
141 1 : keys := span.Keys
142 1 : if syntheticSuffix.IsSet() {
143 0 : keys = slices.Clone(span.Keys)
144 0 : for i := range keys {
145 0 : if keys[i].Kind() == base.InternalKeyKindRangeKeyUnset {
146 0 : panic("RangeKeyUnset with synthetic suffix")
147 : }
148 0 : if len(keys[i].Suffix) > 0 {
149 0 : keys[i].Suffix = syntheticSuffix
150 0 : }
151 : }
152 : }
153 1 : rangekey.Coalesce(t.opts.Comparer.CompareRangeSuffixes, keys, &collapsed.Keys)
154 1 : for i := range collapsed.Keys {
155 1 : collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
156 1 : }
157 1 : keyspan.SortKeysByTrailer(collapsed.Keys)
158 1 : if err := w.Raw().EncodeSpan(collapsed); err != nil {
159 0 : return nil, err
160 0 : }
161 : }
162 1 : if err != nil {
163 0 : return nil, err
164 0 : }
165 1 : rangeKeyIter.Close()
166 1 : rangeKeyIter = nil
167 : }
168 :
169 1 : if err := w.Close(); err != nil {
170 0 : return nil, err
171 0 : }
172 1 : sstMeta, err := w.Raw().Metadata()
173 1 : if err != nil {
174 0 : return nil, err
175 0 : }
176 1 : return sstMeta, nil
177 : }
178 :
179 : // buildForIngest builds a local SST file containing the keys in the given batch
180 : // and returns its path and metadata.
181 : func buildForIngest(
182 : t *Test, dbID objID, b *pebble.Batch, i int,
183 1 : ) (path string, _ *sstable.WriterMetadata, _ error) {
184 1 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
185 1 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
186 1 : if err != nil {
187 0 : return "", nil, err
188 0 : }
189 1 : db := t.getDB(dbID)
190 1 :
191 1 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
192 1 :
193 1 : writable := objstorageprovider.NewFileWritable(f)
194 1 : meta, err := writeSSTForIngestion(
195 1 : t,
196 1 : iter, rangeDelIter, rangeKeyIter,
197 1 : false, /* uniquePrefixes */
198 1 : nil, /* syntheticSuffix */
199 1 : nil, /* syntheticPrefix */
200 1 : writable,
201 1 : db.FormatMajorVersion(),
202 1 : )
203 1 : return path, meta, err
204 : }
205 :
206 : // buildForIngest builds a local SST file containing the keys in the given
207 : // external object (truncated to the given bounds) and returns its path and
208 : // metadata.
209 : func buildForIngestExternalEmulation(
210 : t *Test,
211 : dbID objID,
212 : externalObjID objID,
213 : bounds pebble.KeyRange,
214 : syntheticSuffix sstable.SyntheticSuffix,
215 : syntheticPrefix sstable.SyntheticPrefix,
216 : i int,
217 1 : ) (path string, _ *sstable.WriterMetadata) {
218 1 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
219 1 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
220 1 : panicIfErr(err)
221 1 :
222 1 : reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
223 1 : defer reader.Close()
224 1 :
225 1 : writable := objstorageprovider.NewFileWritable(f)
226 1 : // The underlying file should already have unique prefixes. Plus we are
227 1 : // emulating the external ingestion path which won't remove duplicate prefixes
228 1 : // if they exist.
229 1 : const uniquePrefixes = false
230 1 : meta, err := writeSSTForIngestion(
231 1 : t,
232 1 : pointIter, rangeDelIter, rangeKeyIter,
233 1 : uniquePrefixes,
234 1 : syntheticSuffix,
235 1 : syntheticPrefix,
236 1 : writable,
237 1 : t.minFMV(),
238 1 : )
239 1 : if err != nil {
240 0 : panic(err)
241 : }
242 1 : return path, meta
243 : }
244 :
245 : func openExternalObj(
246 : t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
247 : ) (
248 : reader *sstable.Reader,
249 : pointIter base.InternalIterator,
250 : rangeDelIter keyspan.FragmentIterator,
251 : rangeKeyIter keyspan.FragmentIterator,
252 1 : ) {
253 1 : objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), externalObjName(externalObjID))
254 1 : panicIfErr(err)
255 1 : opts := t.opts.MakeReaderOptions()
256 1 : reader, err = sstable.NewReader(
257 1 : context.Background(),
258 1 : objstorageprovider.NewRemoteReadable(objReader, objSize),
259 1 : opts,
260 1 : )
261 1 : panicIfErr(err)
262 1 :
263 1 : start := bounds.Start
264 1 : end := bounds.End
265 1 : if syntheticPrefix.IsSet() {
266 1 : start = syntheticPrefix.Invert(start)
267 1 : end = syntheticPrefix.Invert(end)
268 1 : }
269 1 : pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
270 1 : panicIfErr(err)
271 1 :
272 1 : rangeDelIter, err = reader.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms)
273 1 : panicIfErr(err)
274 1 : if rangeDelIter != nil {
275 1 : rangeDelIter = keyspan.Truncate(
276 1 : t.opts.Comparer.Compare,
277 1 : rangeDelIter,
278 1 : base.UserKeyBoundsEndExclusive(start, end),
279 1 : )
280 1 : }
281 :
282 1 : rangeKeyIter, err = reader.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms)
283 1 : panicIfErr(err)
284 1 : if rangeKeyIter != nil {
285 0 : rangeKeyIter = keyspan.Truncate(
286 0 : t.opts.Comparer.Compare,
287 0 : rangeKeyIter,
288 0 : base.UserKeyBoundsEndExclusive(start, end),
289 0 : )
290 0 : }
291 1 : return reader, pointIter, rangeDelIter, rangeKeyIter
292 : }
293 :
// panicIfErr panics with err when err is non-nil and does nothing otherwise.
func panicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
|