Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/pebble"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/keyspan"
15 : "github.com/cockroachdb/pebble/internal/private"
16 : "github.com/cockroachdb/pebble/internal/rangekey"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
19 : "github.com/cockroachdb/pebble/sstable"
20 : "github.com/cockroachdb/pebble/vfs"
21 : )
22 :
23 : // writeSSTForIngestion writes an SST that is to be ingested, either directly or
24 : // as an external file. Returns the sstable metadata.
25 : //
26 : // Closes the iterators in all cases.
27 : func writeSSTForIngestion(
28 : t *Test,
29 : pointIter base.InternalIterator,
30 : rangeDelIter keyspan.FragmentIterator,
31 : rangeKeyIter keyspan.FragmentIterator,
32 : uniquePrefixes bool,
33 : syntheticSuffix sstable.SyntheticSuffix,
34 : syntheticPrefix sstable.SyntheticPrefix,
35 : writable objstorage.Writable,
36 : targetFMV pebble.FormatMajorVersion,
37 2 : ) (*sstable.WriterMetadata, error) {
38 2 : writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
39 2 : if t.testOpts.disableValueBlocksForIngestSSTables {
40 2 : writerOpts.DisableValueBlocks = true
41 2 : }
42 2 : w := sstable.NewWriter(writable, writerOpts)
43 2 : pointIterCloser := base.CloseHelper(pointIter)
44 2 : defer func() {
45 2 : _ = pointIterCloser.Close()
46 2 : if rangeDelIter != nil {
47 0 : rangeDelIter.Close()
48 0 : }
49 2 : if rangeKeyIter != nil {
50 0 : rangeKeyIter.Close()
51 0 : }
52 : }()
53 :
54 2 : outputKey := func(key []byte, syntheticSuffix sstable.SyntheticSuffix) []byte {
55 2 : if !syntheticPrefix.IsSet() && !syntheticSuffix.IsSet() {
56 2 : return slices.Clone(key)
57 2 : }
58 2 : if syntheticPrefix.IsSet() {
59 2 : key = syntheticPrefix.Apply(key)
60 2 : }
61 2 : if syntheticSuffix.IsSet() {
62 2 : n := t.opts.Comparer.Split(key)
63 2 : key = append(key[:n:n], syntheticSuffix...)
64 2 : }
65 2 : return key
66 : }
67 :
68 2 : var lastUserKey []byte
69 2 : for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
70 2 : // Ignore duplicate keys.
71 2 : if lastUserKey != nil {
72 2 : last := lastUserKey
73 2 : this := kv.K.UserKey
74 2 : if uniquePrefixes {
75 2 : last = last[:t.opts.Comparer.Split(last)]
76 2 : this = this[:t.opts.Comparer.Split(this)]
77 2 : }
78 2 : if t.opts.Comparer.Equal(last, this) {
79 2 : continue
80 : }
81 : }
82 2 : lastUserKey = append(lastUserKey[:0], kv.K.UserKey...)
83 2 :
84 2 : k := *kv
85 2 : k.K.SetSeqNum(base.SeqNumZero)
86 2 : k.K.UserKey = outputKey(k.K.UserKey, syntheticSuffix)
87 2 : value := kv.V
88 2 : // It's possible that we wrote the key on a batch from a db that supported
89 2 : // DeleteSized, but will be ingesting into a db that does not. Detect this
90 2 : // case and translate the key to an InternalKeyKindDelete.
91 2 : if targetFMV < pebble.FormatDeleteSizedAndObsolete && kv.Kind() == pebble.InternalKeyKindDeleteSized {
92 1 : value = pebble.LazyValue{}
93 1 : k.K.SetKind(pebble.InternalKeyKindDelete)
94 1 : }
95 2 : valBytes, _, err := value.Value(nil)
96 2 : if err != nil {
97 0 : return nil, err
98 0 : }
99 2 : if err := w.Raw().Add(k.K, valBytes); err != nil {
100 0 : return nil, err
101 0 : }
102 : }
103 2 : if err := pointIterCloser.Close(); err != nil {
104 0 : return nil, err
105 0 : }
106 :
107 2 : if rangeDelIter != nil {
108 2 : span, err := rangeDelIter.First()
109 2 : for ; span != nil; span, err = rangeDelIter.Next() {
110 2 : if syntheticSuffix.IsSet() {
111 0 : panic("synthetic suffix with RangeDel")
112 : }
113 2 : if err := w.DeleteRange(outputKey(span.Start, nil), outputKey(span.End, nil)); err != nil {
114 0 : return nil, err
115 0 : }
116 : }
117 2 : if err != nil {
118 0 : return nil, err
119 0 : }
120 2 : rangeDelIter.Close()
121 2 : rangeDelIter = nil
122 : }
123 :
124 2 : if rangeKeyIter != nil {
125 2 : span, err := rangeKeyIter.First()
126 2 : for ; span != nil; span, err = rangeKeyIter.Next() {
127 2 : // Coalesce the keys of this span and then zero the sequence
128 2 : // numbers. This is necessary in order to make the range keys within
129 2 : // the ingested sstable internally consistent at the sequence number
130 2 : // it's ingested at. The individual keys within a batch are
131 2 : // committed at unique sequence numbers, whereas all the keys of an
132 2 : // ingested sstable are given the same sequence number. A span
133 2 : // containing keys that both set and unset the same suffix at the
134 2 : // same sequence number is nonsensical, so we "coalesce" or collapse
135 2 : // the keys.
136 2 : collapsed := keyspan.Span{
137 2 : Start: outputKey(span.Start, nil),
138 2 : End: outputKey(span.End, nil),
139 2 : Keys: make([]keyspan.Key, 0, len(span.Keys)),
140 2 : }
141 2 : keys := span.Keys
142 2 : if syntheticSuffix.IsSet() {
143 0 : keys = slices.Clone(span.Keys)
144 0 : for i := range keys {
145 0 : if keys[i].Kind() == base.InternalKeyKindRangeKeyUnset {
146 0 : panic("RangeKeyUnset with synthetic suffix")
147 : }
148 0 : if len(keys[i].Suffix) > 0 {
149 0 : keys[i].Suffix = syntheticSuffix
150 0 : }
151 : }
152 : }
153 2 : rangekey.Coalesce(t.opts.Comparer.CompareSuffixes, keys, &collapsed.Keys)
154 2 : for i := range collapsed.Keys {
155 2 : collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
156 2 : }
157 2 : keyspan.SortKeysByTrailer(collapsed.Keys)
158 2 : if err := w.Raw().EncodeSpan(collapsed); err != nil {
159 0 : return nil, err
160 0 : }
161 : }
162 2 : if err != nil {
163 0 : return nil, err
164 0 : }
165 2 : rangeKeyIter.Close()
166 2 : rangeKeyIter = nil
167 : }
168 :
169 2 : if err := w.Close(); err != nil {
170 0 : return nil, err
171 0 : }
172 2 : sstMeta, err := w.Raw().Metadata()
173 2 : if err != nil {
174 0 : return nil, err
175 0 : }
176 2 : return sstMeta, nil
177 : }
178 :
179 : // buildForIngest builds a local SST file containing the keys in the given batch
180 : // and returns its path and metadata.
181 : func buildForIngest(
182 : t *Test, dbID objID, b *pebble.Batch, i int,
183 2 : ) (path string, _ *sstable.WriterMetadata, _ error) {
184 2 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
185 2 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
186 2 : if err != nil {
187 0 : return "", nil, err
188 0 : }
189 2 : db := t.getDB(dbID)
190 2 :
191 2 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
192 2 :
193 2 : writable := objstorageprovider.NewFileWritable(f)
194 2 : meta, err := writeSSTForIngestion(
195 2 : t,
196 2 : iter, rangeDelIter, rangeKeyIter,
197 2 : false, /* uniquePrefixes */
198 2 : nil, /* syntheticSuffix */
199 2 : nil, /* syntheticPrefix */
200 2 : writable,
201 2 : db.FormatMajorVersion(),
202 2 : )
203 2 : return path, meta, err
204 : }
205 :
206 : // buildForIngest builds a local SST file containing the keys in the given
207 : // external object (truncated to the given bounds) and returns its path and
208 : // metadata.
209 : func buildForIngestExternalEmulation(
210 : t *Test,
211 : dbID objID,
212 : externalObjID objID,
213 : bounds pebble.KeyRange,
214 : syntheticSuffix sstable.SyntheticSuffix,
215 : syntheticPrefix sstable.SyntheticPrefix,
216 : i int,
217 2 : ) (path string, _ *sstable.WriterMetadata) {
218 2 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
219 2 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
220 2 : panicIfErr(err)
221 2 :
222 2 : reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
223 2 : defer reader.Close()
224 2 :
225 2 : writable := objstorageprovider.NewFileWritable(f)
226 2 : // The underlying file should already have unique prefixes. Plus we are
227 2 : // emulating the external ingestion path which won't remove duplicate prefixes
228 2 : // if they exist.
229 2 : const uniquePrefixes = false
230 2 : meta, err := writeSSTForIngestion(
231 2 : t,
232 2 : pointIter, rangeDelIter, rangeKeyIter,
233 2 : uniquePrefixes,
234 2 : syntheticSuffix,
235 2 : syntheticPrefix,
236 2 : writable,
237 2 : t.minFMV(),
238 2 : )
239 2 : if err != nil {
240 0 : panic(err)
241 : }
242 2 : return path, meta
243 : }
244 :
245 : func openExternalObj(
246 : t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
247 : ) (
248 : reader *sstable.Reader,
249 : pointIter base.InternalIterator,
250 : rangeDelIter keyspan.FragmentIterator,
251 : rangeKeyIter keyspan.FragmentIterator,
252 2 : ) {
253 2 : objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), externalObjName(externalObjID))
254 2 : panicIfErr(err)
255 2 : opts := sstable.ReaderOptions{
256 2 : Comparer: t.opts.Comparer,
257 2 : }
258 2 : reader, err = sstable.NewReader(
259 2 : context.Background(),
260 2 : objstorageprovider.NewRemoteReadable(objReader, objSize),
261 2 : opts,
262 2 : )
263 2 : panicIfErr(err)
264 2 :
265 2 : start := bounds.Start
266 2 : end := bounds.End
267 2 : if syntheticPrefix.IsSet() {
268 2 : start = syntheticPrefix.Invert(start)
269 2 : end = syntheticPrefix.Invert(end)
270 2 : }
271 2 : pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
272 2 : panicIfErr(err)
273 2 :
274 2 : rangeDelIter, err = reader.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms)
275 2 : panicIfErr(err)
276 2 : if rangeDelIter != nil {
277 1 : rangeDelIter = keyspan.Truncate(
278 1 : t.opts.Comparer.Compare,
279 1 : rangeDelIter,
280 1 : base.UserKeyBoundsEndExclusive(start, end),
281 1 : )
282 1 : }
283 :
284 2 : rangeKeyIter, err = reader.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms)
285 2 : panicIfErr(err)
286 2 : if rangeKeyIter != nil {
287 0 : rangeKeyIter = keyspan.Truncate(
288 0 : t.opts.Comparer.Compare,
289 0 : rangeKeyIter,
290 0 : base.UserKeyBoundsEndExclusive(start, end),
291 0 : )
292 0 : }
293 2 : return reader, pointIter, rangeDelIter, rangeKeyIter
294 : }
295 :
// panicIfErr panics with err itself as the panic value when err is non-nil,
// and does nothing otherwise.
func panicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
|