Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/errors"
13 : "github.com/cockroachdb/pebble"
14 : "github.com/cockroachdb/pebble/internal/base"
15 : "github.com/cockroachdb/pebble/internal/keyspan"
16 : "github.com/cockroachdb/pebble/internal/private"
17 : "github.com/cockroachdb/pebble/internal/rangekey"
18 : "github.com/cockroachdb/pebble/objstorage"
19 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
20 : "github.com/cockroachdb/pebble/sstable"
21 : "github.com/cockroachdb/pebble/vfs"
22 : )
23 :
24 : // writeSSTForIngestion writes an SST that is to be ingested, either directly or
25 : // as an external file. Returns the sstable metadata.
26 : //
27 : // Closes the iterators in all cases.
28 : func writeSSTForIngestion(
29 : t *Test,
30 : pointIter base.InternalIterator,
31 : rangeDelIter keyspan.FragmentIterator,
32 : rangeKeyIter keyspan.FragmentIterator,
33 : uniquePrefixes bool,
34 : syntheticSuffix sstable.SyntheticSuffix,
35 : syntheticPrefix sstable.SyntheticPrefix,
36 : writable objstorage.Writable,
37 : targetFMV pebble.FormatMajorVersion,
38 1 : ) (*sstable.WriterMetadata, error) {
39 1 : writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
40 1 : if t.testOpts.disableValueBlocksForIngestSSTables {
41 1 : writerOpts.DisableValueBlocks = true
42 1 : }
43 1 : w := sstable.NewWriter(writable, writerOpts)
44 1 : pointIterCloser := base.CloseHelper(pointIter)
45 1 : defer func() {
46 1 : _ = pointIterCloser.Close()
47 1 : if rangeDelIter != nil {
48 0 : rangeDelIter.Close()
49 0 : }
50 1 : if rangeKeyIter != nil {
51 0 : rangeKeyIter.Close()
52 0 : }
53 : }()
54 :
55 1 : outputKey := func(key []byte, syntheticSuffix sstable.SyntheticSuffix) []byte {
56 1 : if !syntheticPrefix.IsSet() && !syntheticSuffix.IsSet() {
57 1 : return slices.Clone(key)
58 1 : }
59 1 : if syntheticPrefix.IsSet() {
60 1 : key = syntheticPrefix.Apply(key)
61 1 : }
62 1 : if syntheticSuffix.IsSet() {
63 1 : n := t.opts.Comparer.Split(key)
64 1 : key = append(key[:n:n], syntheticSuffix...)
65 1 : }
66 1 : return key
67 : }
68 :
69 1 : var lastUserKey []byte
70 1 : for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
71 1 : // Ignore duplicate keys.
72 1 : if lastUserKey != nil {
73 1 : last := lastUserKey
74 1 : this := kv.K.UserKey
75 1 : if uniquePrefixes {
76 1 : last = last[:t.opts.Comparer.Split(last)]
77 1 : this = this[:t.opts.Comparer.Split(this)]
78 1 : }
79 1 : if t.opts.Comparer.Equal(last, this) {
80 0 : continue
81 : }
82 : }
83 1 : lastUserKey = append(lastUserKey[:0], kv.K.UserKey...)
84 1 :
85 1 : k := *kv
86 1 : k.K.SetSeqNum(base.SeqNumZero)
87 1 : k.K.UserKey = outputKey(k.K.UserKey, syntheticSuffix)
88 1 : value := kv.LazyValue()
89 1 : // It's possible that we wrote the key on a batch from a db that supported
90 1 : // DeleteSized, but will be ingesting into a db that does not. Detect this
91 1 : // case and translate the key to an InternalKeyKindDelete.
92 1 : if targetFMV < pebble.FormatDeleteSizedAndObsolete && kv.Kind() == pebble.InternalKeyKindDeleteSized {
93 0 : value = pebble.LazyValue{}
94 0 : k.K.SetKind(pebble.InternalKeyKindDelete)
95 0 : }
96 1 : valBytes, _, err := value.Value(nil)
97 1 : if err != nil {
98 0 : return nil, err
99 0 : }
100 1 : t.opts.Comparer.ValidateKey.MustValidate(k.K.UserKey)
101 1 : if err := w.Raw().Add(k.K, valBytes, false); err != nil {
102 0 : return nil, err
103 0 : }
104 : }
105 1 : if err := pointIterCloser.Close(); err != nil {
106 0 : return nil, err
107 0 : }
108 :
109 1 : if rangeDelIter != nil {
110 1 : span, err := rangeDelIter.First()
111 1 : for ; span != nil; span, err = rangeDelIter.Next() {
112 1 : if syntheticSuffix.IsSet() {
113 0 : panic("synthetic suffix with RangeDel")
114 : }
115 1 : start := outputKey(span.Start, nil)
116 1 : end := outputKey(span.End, nil)
117 1 : t.opts.Comparer.ValidateKey.MustValidate(start)
118 1 : t.opts.Comparer.ValidateKey.MustValidate(end)
119 1 : if err := w.DeleteRange(start, end); err != nil {
120 0 : return nil, err
121 0 : }
122 : }
123 1 : if err != nil {
124 0 : return nil, err
125 0 : }
126 1 : rangeDelIter.Close()
127 1 : rangeDelIter = nil
128 : }
129 :
130 1 : if rangeKeyIter != nil {
131 1 : span, err := rangeKeyIter.First()
132 1 : for ; span != nil; span, err = rangeKeyIter.Next() {
133 1 : // Coalesce the keys of this span and then zero the sequence
134 1 : // numbers. This is necessary in order to make the range keys within
135 1 : // the ingested sstable internally consistent at the sequence number
136 1 : // it's ingested at. The individual keys within a batch are
137 1 : // committed at unique sequence numbers, whereas all the keys of an
138 1 : // ingested sstable are given the same sequence number. A span
139 1 : // containing keys that both set and unset the same suffix at the
140 1 : // same sequence number is nonsensical, so we "coalesce" or collapse
141 1 : // the keys.
142 1 : collapsed := keyspan.Span{
143 1 : Start: outputKey(span.Start, nil),
144 1 : End: outputKey(span.End, nil),
145 1 : Keys: make([]keyspan.Key, 0, len(span.Keys)),
146 1 : }
147 1 : t.opts.Comparer.ValidateKey.MustValidate(collapsed.Start)
148 1 : t.opts.Comparer.ValidateKey.MustValidate(collapsed.End)
149 1 : keys := span.Keys
150 1 : if syntheticSuffix.IsSet() {
151 0 : keys = slices.Clone(span.Keys)
152 0 : for i := range keys {
153 0 : if keys[i].Kind() == base.InternalKeyKindRangeKeyUnset {
154 0 : panic("RangeKeyUnset with synthetic suffix")
155 : }
156 0 : if len(keys[i].Suffix) > 0 {
157 0 : keys[i].Suffix = syntheticSuffix
158 0 : }
159 : }
160 : }
161 1 : rangekey.Coalesce(t.opts.Comparer.CompareRangeSuffixes, keys, &collapsed.Keys)
162 1 : for i := range collapsed.Keys {
163 1 : collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
164 1 : }
165 1 : keyspan.SortKeysByTrailer(collapsed.Keys)
166 1 : if err := w.Raw().EncodeSpan(collapsed); err != nil {
167 0 : return nil, err
168 0 : }
169 : }
170 1 : if err != nil {
171 0 : return nil, err
172 0 : }
173 1 : rangeKeyIter.Close()
174 1 : rangeKeyIter = nil
175 : }
176 :
177 1 : if err := w.Close(); err != nil {
178 0 : return nil, err
179 0 : }
180 1 : sstMeta, err := w.Raw().Metadata()
181 1 : if err != nil {
182 0 : return nil, err
183 0 : }
184 1 : return sstMeta, nil
185 : }
186 :
187 : // buildForIngest builds a local SST file containing the keys in the given batch
188 : // and returns its path and metadata.
189 : func buildForIngest(
190 : t *Test, dbID objID, b *pebble.Batch, i int,
191 1 : ) (path string, _ *sstable.WriterMetadata, _ error) {
192 1 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
193 1 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
194 1 : if err != nil {
195 0 : return "", nil, err
196 0 : }
197 1 : db := t.getDB(dbID)
198 1 :
199 1 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
200 1 :
201 1 : writable := objstorageprovider.NewFileWritable(f)
202 1 : meta, err := writeSSTForIngestion(
203 1 : t,
204 1 : iter, rangeDelIter, rangeKeyIter,
205 1 : false, /* uniquePrefixes */
206 1 : nil, /* syntheticSuffix */
207 1 : nil, /* syntheticPrefix */
208 1 : writable,
209 1 : db.FormatMajorVersion(),
210 1 : )
211 1 : return path, meta, err
212 : }
213 :
214 : // buildForIngest builds a local SST file containing the keys in the given
215 : // external object (truncated to the given bounds) and returns its path and
216 : // metadata.
217 : func buildForIngestExternalEmulation(
218 : t *Test,
219 : dbID objID,
220 : externalObjID objID,
221 : bounds pebble.KeyRange,
222 : syntheticSuffix sstable.SyntheticSuffix,
223 : syntheticPrefix sstable.SyntheticPrefix,
224 : i int,
225 1 : ) (path string, _ *sstable.WriterMetadata) {
226 1 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
227 1 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
228 1 : panicIfErr(err)
229 1 :
230 1 : reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
231 1 : defer func() { _ = reader.Close() }()
232 :
233 1 : writable := objstorageprovider.NewFileWritable(f)
234 1 : // The underlying file should already have unique prefixes. Plus we are
235 1 : // emulating the external ingestion path which won't remove duplicate prefixes
236 1 : // if they exist.
237 1 : const uniquePrefixes = false
238 1 : meta, err := writeSSTForIngestion(
239 1 : t,
240 1 : pointIter, rangeDelIter, rangeKeyIter,
241 1 : uniquePrefixes,
242 1 : syntheticSuffix,
243 1 : syntheticPrefix,
244 1 : writable,
245 1 : t.minFMV(),
246 1 : )
247 1 : if err != nil {
248 0 : panic(err)
249 : }
250 1 : return path, meta
251 : }
252 :
253 : func openExternalObj(
254 : t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
255 : ) (
256 : reader *sstable.Reader,
257 : pointIter base.InternalIterator,
258 : rangeDelIter keyspan.FragmentIterator,
259 : rangeKeyIter keyspan.FragmentIterator,
260 1 : ) {
261 1 : objMeta := t.getExternalObj(externalObjID)
262 1 : objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), objMeta.objName)
263 1 : panicIfErr(err)
264 1 : opts := t.opts.MakeReaderOptions()
265 1 : reader, err = sstable.NewReader(
266 1 : context.Background(),
267 1 : objstorageprovider.NewRemoteReadable(objReader, objSize),
268 1 : opts,
269 1 : )
270 1 : if err != nil {
271 0 : panic(errors.CombineErrors(err, objReader.Close()))
272 : }
273 :
274 1 : start := bounds.Start
275 1 : end := bounds.End
276 1 : if syntheticPrefix.IsSet() {
277 1 : start = syntheticPrefix.Invert(start)
278 1 : end = syntheticPrefix.Invert(end)
279 1 : }
280 1 : pointIter, err = reader.NewIter(sstable.NoTransforms, start, end, sstable.AssertNoBlobHandles)
281 1 : panicIfErr(err)
282 1 :
283 1 : rangeDelIter, err = reader.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms, sstable.NoReadEnv)
284 1 : panicIfErr(err)
285 1 : if rangeDelIter != nil {
286 1 : rangeDelIter = keyspan.Truncate(
287 1 : t.opts.Comparer.Compare,
288 1 : rangeDelIter,
289 1 : base.UserKeyBoundsEndExclusive(start, end),
290 1 : )
291 1 : }
292 :
293 1 : rangeKeyIter, err = reader.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms, sstable.NoReadEnv)
294 1 : panicIfErr(err)
295 1 : if rangeKeyIter != nil {
296 0 : rangeKeyIter = keyspan.Truncate(
297 0 : t.opts.Comparer.Compare,
298 0 : rangeKeyIter,
299 0 : base.UserKeyBoundsEndExclusive(start, end),
300 0 : )
301 0 : }
302 1 : return reader, pointIter, rangeDelIter, rangeKeyIter
303 : }
304 :
// panicIfErr panics with err if it is non-nil and does nothing otherwise.
func panicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
|