Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/pebble"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/keyspan"
15 : "github.com/cockroachdb/pebble/internal/private"
16 : "github.com/cockroachdb/pebble/internal/rangekey"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
19 : "github.com/cockroachdb/pebble/sstable"
20 : )
21 :
22 : // writeSSTForIngestion writes an SST that is to be ingested, either directly or
23 : // as an external file. Returns the sstable metadata.
24 : //
25 : // Closes the iterators in all cases.
26 : func writeSSTForIngestion(
27 : t *Test,
28 : pointIter base.InternalIterator,
29 : rangeDelIter keyspan.FragmentIterator,
30 : rangeKeyIter keyspan.FragmentIterator,
31 : uniquePrefixes bool,
32 : syntheticSuffix sstable.SyntheticSuffix,
33 : syntheticPrefix sstable.SyntheticPrefix,
34 : writable objstorage.Writable,
35 : targetFMV pebble.FormatMajorVersion,
36 1 : ) (*sstable.WriterMetadata, error) {
37 1 : writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
38 1 : if t.testOpts.disableValueBlocksForIngestSSTables {
39 1 : writerOpts.DisableValueBlocks = true
40 1 : }
41 1 : w := sstable.NewWriter(writable, writerOpts)
42 1 : pointIterCloser := base.CloseHelper(pointIter)
43 1 : defer pointIterCloser.Close()
44 1 : rangeDelIterCloser := base.CloseHelper(rangeDelIter)
45 1 : defer rangeDelIterCloser.Close()
46 1 : rangeKeyIterCloser := base.CloseHelper(rangeKeyIter)
47 1 : defer rangeKeyIterCloser.Close()
48 1 :
49 1 : outputKey := func(key []byte) []byte {
50 1 : if !syntheticPrefix.IsSet() && !syntheticSuffix.IsSet() {
51 1 : return slices.Clone(key)
52 1 : }
53 1 : if syntheticPrefix.IsSet() {
54 1 : key = syntheticPrefix.Apply(key)
55 1 : }
56 1 : if syntheticSuffix.IsSet() {
57 1 : n := t.opts.Comparer.Split(key)
58 1 : key = append(key[:n:n], syntheticSuffix...)
59 1 : }
60 1 : return key
61 : }
62 :
63 1 : var lastUserKey []byte
64 1 : for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
65 1 : // Ignore duplicate keys.
66 1 : if lastUserKey != nil {
67 1 : last := lastUserKey
68 1 : this := key.UserKey
69 1 : if uniquePrefixes {
70 1 : last = last[:t.opts.Comparer.Split(last)]
71 1 : this = this[:t.opts.Comparer.Split(this)]
72 1 : }
73 1 : if t.opts.Comparer.Equal(last, this) {
74 1 : continue
75 : }
76 : }
77 1 : lastUserKey = append(lastUserKey[:0], key.UserKey...)
78 1 :
79 1 : key.SetSeqNum(base.SeqNumZero)
80 1 : // It's possible that we wrote the key on a batch from a db that supported
81 1 : // DeleteSized, but will be ingesting into a db that does not. Detect this
82 1 : // case and translate the key to an InternalKeyKindDelete.
83 1 : if targetFMV < pebble.FormatDeleteSizedAndObsolete && key.Kind() == pebble.InternalKeyKindDeleteSized {
84 1 : value = pebble.LazyValue{}
85 1 : key.SetKind(pebble.InternalKeyKindDelete)
86 1 : }
87 1 : valBytes, _, err := value.Value(nil)
88 1 : if err != nil {
89 0 : return nil, err
90 0 : }
91 1 : k := *key
92 1 : k.UserKey = outputKey(k.UserKey)
93 1 : if err := w.Add(k, valBytes); err != nil {
94 0 : return nil, err
95 0 : }
96 : }
97 1 : if err := pointIterCloser.Close(); err != nil {
98 0 : return nil, err
99 0 : }
100 :
101 1 : if rangeDelIter != nil {
102 1 : span, err := rangeDelIter.First()
103 1 : for ; span != nil; span, err = rangeDelIter.Next() {
104 1 : if err := w.DeleteRange(outputKey(span.Start), outputKey(span.End)); err != nil {
105 0 : return nil, err
106 0 : }
107 : }
108 1 : if err != nil {
109 0 : return nil, err
110 0 : }
111 1 : if err := rangeDelIterCloser.Close(); err != nil {
112 0 : return nil, err
113 0 : }
114 : }
115 :
116 1 : if rangeKeyIter != nil {
117 1 : span, err := rangeKeyIter.First()
118 1 : for ; span != nil; span, err = rangeKeyIter.Next() {
119 1 : // Coalesce the keys of this span and then zero the sequence
120 1 : // numbers. This is necessary in order to make the range keys within
121 1 : // the ingested sstable internally consistent at the sequence number
122 1 : // it's ingested at. The individual keys within a batch are
123 1 : // committed at unique sequence numbers, whereas all the keys of an
124 1 : // ingested sstable are given the same sequence number. A span
125 1 : // containing keys that both set and unset the same suffix at the
126 1 : // same sequence number is nonsensical, so we "coalesce" or collapse
127 1 : // the keys.
128 1 : collapsed := keyspan.Span{
129 1 : Start: outputKey(span.Start),
130 1 : End: outputKey(span.End),
131 1 : Keys: make([]keyspan.Key, 0, len(span.Keys)),
132 1 : }
133 1 : rangekey.Coalesce(
134 1 : t.opts.Comparer.Compare, t.opts.Comparer.Equal, span.Keys, &collapsed.Keys,
135 1 : )
136 1 : for i := range collapsed.Keys {
137 1 : collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
138 1 : }
139 1 : keyspan.SortKeysByTrailer(&collapsed.Keys)
140 1 : if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil {
141 0 : return nil, err
142 0 : }
143 : }
144 1 : if err != nil {
145 0 : return nil, err
146 0 : }
147 1 : if err := rangeKeyIterCloser.Close(); err != nil {
148 0 : return nil, err
149 0 : }
150 : }
151 :
152 1 : if err := w.Close(); err != nil {
153 0 : return nil, err
154 0 : }
155 1 : sstMeta, err := w.Metadata()
156 1 : if err != nil {
157 0 : return nil, err
158 0 : }
159 1 : return sstMeta, nil
160 : }
161 :
162 : // buildForIngest builds a local SST file containing the keys in the given batch
163 : // and returns its path and metadata.
164 : func buildForIngest(
165 : t *Test, dbID objID, b *pebble.Batch, i int,
166 1 : ) (path string, _ *sstable.WriterMetadata, _ error) {
167 1 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
168 1 : f, err := t.opts.FS.Create(path)
169 1 : if err != nil {
170 0 : return "", nil, err
171 0 : }
172 1 : db := t.getDB(dbID)
173 1 :
174 1 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
175 1 :
176 1 : writable := objstorageprovider.NewFileWritable(f)
177 1 : meta, err := writeSSTForIngestion(
178 1 : t,
179 1 : iter, rangeDelIter, rangeKeyIter,
180 1 : false, /* uniquePrefixes */
181 1 : nil, /* syntheticSuffix */
182 1 : nil, /* syntheticPrefix */
183 1 : writable,
184 1 : db.FormatMajorVersion(),
185 1 : )
186 1 : return path, meta, err
187 : }
188 :
189 : // buildForIngest builds a local SST file containing the keys in the given
190 : // external object (truncated to the given bounds) and returns its path and
191 : // metadata.
192 : func buildForIngestExternalEmulation(
193 : t *Test,
194 : dbID objID,
195 : externalObjID objID,
196 : bounds pebble.KeyRange,
197 : syntheticSuffix sstable.SyntheticSuffix,
198 : syntheticPrefix sstable.SyntheticPrefix,
199 : i int,
200 1 : ) (path string, _ *sstable.WriterMetadata) {
201 1 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
202 1 : f, err := t.opts.FS.Create(path)
203 1 : panicIfErr(err)
204 1 :
205 1 : reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
206 1 : defer reader.Close()
207 1 :
208 1 : writable := objstorageprovider.NewFileWritable(f)
209 1 : // The underlying file should already have unique prefixes. Plus we are
210 1 : // emulating the external ingestion path which won't remove duplicate prefixes
211 1 : // if they exist.
212 1 : const uniquePrefixes = false
213 1 : meta, err := writeSSTForIngestion(
214 1 : t,
215 1 : pointIter, rangeDelIter, rangeKeyIter,
216 1 : uniquePrefixes,
217 1 : syntheticSuffix,
218 1 : syntheticPrefix,
219 1 : writable,
220 1 : t.minFMV(),
221 1 : )
222 1 : if err != nil {
223 0 : panic(err)
224 : }
225 1 : return path, meta
226 : }
227 :
228 : func openExternalObj(
229 : t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
230 : ) (
231 : reader *sstable.Reader,
232 : pointIter base.InternalIterator,
233 : rangeDelIter keyspan.FragmentIterator,
234 : rangeKeyIter keyspan.FragmentIterator,
235 1 : ) {
236 1 : objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), externalObjName(externalObjID))
237 1 : panicIfErr(err)
238 1 : opts := sstable.ReaderOptions{
239 1 : Comparer: t.opts.Comparer,
240 1 : }
241 1 : reader, err = sstable.NewReader(objstorageprovider.NewRemoteReadable(objReader, objSize), opts)
242 1 : panicIfErr(err)
243 1 :
244 1 : start := bounds.Start
245 1 : end := bounds.End
246 1 : if syntheticPrefix.IsSet() {
247 1 : start = syntheticPrefix.Invert(start)
248 1 : end = syntheticPrefix.Invert(end)
249 1 : }
250 1 : pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
251 1 : panicIfErr(err)
252 1 :
253 1 : rangeDelIter, err = reader.NewRawRangeDelIter(sstable.NoTransforms)
254 1 : panicIfErr(err)
255 1 : if rangeDelIter != nil {
256 1 : rangeDelIter = keyspan.Truncate(
257 1 : t.opts.Comparer.Compare,
258 1 : rangeDelIter,
259 1 : base.UserKeyBoundsEndExclusive(start, end),
260 1 : )
261 1 : }
262 :
263 1 : rangeKeyIter, err = reader.NewRawRangeKeyIter(sstable.NoTransforms)
264 1 : panicIfErr(err)
265 1 : if rangeKeyIter != nil {
266 0 : rangeKeyIter = keyspan.Truncate(
267 0 : t.opts.Comparer.Compare,
268 0 : rangeKeyIter,
269 0 : base.UserKeyBoundsEndExclusive(start, end),
270 0 : )
271 0 : }
272 1 : return reader, pointIter, rangeDelIter, rangeKeyIter
273 : }
274 :
275 : // externalObjIsEmpty returns true if the given external object has no point or
276 : // range keys withing the given bounds.
277 : func externalObjIsEmpty(
278 : t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
279 1 : ) bool {
280 1 : reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
281 1 : defer reader.Close()
282 1 : defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
283 1 :
284 1 : key, _ := pointIter.First()
285 1 : panicIfErr(pointIter.Error())
286 1 : if key != nil {
287 1 : return false
288 1 : }
289 1 : for _, it := range []keyspan.FragmentIterator{rangeDelIter, rangeKeyIter} {
290 1 : if it != nil {
291 1 : span, err := it.First()
292 1 : panicIfErr(err)
293 1 : if span != nil {
294 1 : return false
295 1 : }
296 : }
297 : }
298 1 : return true
299 : }
300 :
// panicIfErr panics with err when it is non-nil and does nothing otherwise.
func panicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
|