Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/pebble"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/keyspan"
15 : "github.com/cockroachdb/pebble/internal/private"
16 : "github.com/cockroachdb/pebble/internal/rangekey"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
19 : "github.com/cockroachdb/pebble/sstable"
20 : "github.com/cockroachdb/pebble/vfs"
21 : )
22 :
23 : // writeSSTForIngestion writes an SST that is to be ingested, either directly or
24 : // as an external file. Returns the sstable metadata.
25 : //
26 : // Closes the iterators in all cases.
27 : func writeSSTForIngestion(
28 : t *Test,
29 : pointIter base.InternalIterator,
30 : rangeDelIter keyspan.FragmentIterator,
31 : rangeKeyIter keyspan.FragmentIterator,
32 : uniquePrefixes bool,
33 : syntheticSuffix sstable.SyntheticSuffix,
34 : syntheticPrefix sstable.SyntheticPrefix,
35 : writable objstorage.Writable,
36 : targetFMV pebble.FormatMajorVersion,
37 1 : ) (*sstable.WriterMetadata, error) {
38 1 : writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
39 1 : if t.testOpts.disableValueBlocksForIngestSSTables {
40 1 : writerOpts.DisableValueBlocks = true
41 1 : }
42 1 : w := sstable.NewWriter(writable, writerOpts)
43 1 : pointIterCloser := base.CloseHelper(pointIter)
44 1 : defer pointIterCloser.Close()
45 1 : rangeDelIterCloser := base.CloseHelper(rangeDelIter)
46 1 : defer rangeDelIterCloser.Close()
47 1 : rangeKeyIterCloser := base.CloseHelper(rangeKeyIter)
48 1 : defer rangeKeyIterCloser.Close()
49 1 :
50 1 : outputKey := func(key []byte) []byte {
51 1 : if !syntheticPrefix.IsSet() && !syntheticSuffix.IsSet() {
52 1 : return slices.Clone(key)
53 1 : }
54 0 : if syntheticPrefix.IsSet() {
55 0 : key = syntheticPrefix.Apply(key)
56 0 : }
57 0 : if syntheticSuffix.IsSet() {
58 0 : n := t.opts.Comparer.Split(key)
59 0 : key = append(key[:n:n], syntheticSuffix...)
60 0 : }
61 0 : return key
62 : }
63 :
64 1 : var lastUserKey []byte
65 1 : for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
66 1 : // Ignore duplicate keys.
67 1 : if lastUserKey != nil {
68 1 : last := lastUserKey
69 1 : this := key.UserKey
70 1 : if uniquePrefixes {
71 0 : last = last[:t.opts.Comparer.Split(last)]
72 0 : this = this[:t.opts.Comparer.Split(this)]
73 0 : }
74 1 : if t.opts.Comparer.Equal(last, this) {
75 1 : continue
76 : }
77 : }
78 1 : lastUserKey = append(lastUserKey[:0], key.UserKey...)
79 1 :
80 1 : key.SetSeqNum(base.SeqNumZero)
81 1 : // It's possible that we wrote the key on a batch from a db that supported
82 1 : // DeleteSized, but will be ingesting into a db that does not. Detect this
83 1 : // case and translate the key to an InternalKeyKindDelete.
84 1 : if targetFMV < pebble.FormatDeleteSizedAndObsolete && key.Kind() == pebble.InternalKeyKindDeleteSized {
85 0 : value = pebble.LazyValue{}
86 0 : key.SetKind(pebble.InternalKeyKindDelete)
87 0 : }
88 1 : valBytes, _, err := value.Value(nil)
89 1 : if err != nil {
90 0 : return nil, err
91 0 : }
92 1 : k := *key
93 1 : k.UserKey = outputKey(k.UserKey)
94 1 : if err := w.Add(k, valBytes); err != nil {
95 0 : return nil, err
96 0 : }
97 : }
98 1 : if err := pointIterCloser.Close(); err != nil {
99 0 : return nil, err
100 0 : }
101 :
102 1 : if rangeDelIter != nil {
103 1 : span, err := rangeDelIter.First()
104 1 : for ; span != nil; span, err = rangeDelIter.Next() {
105 1 : if err := w.DeleteRange(outputKey(span.Start), outputKey(span.End)); err != nil {
106 0 : return nil, err
107 0 : }
108 : }
109 1 : if err != nil {
110 0 : return nil, err
111 0 : }
112 1 : if err := rangeDelIterCloser.Close(); err != nil {
113 0 : return nil, err
114 0 : }
115 : }
116 :
117 1 : if rangeKeyIter != nil {
118 1 : span, err := rangeKeyIter.First()
119 1 : for ; span != nil; span, err = rangeKeyIter.Next() {
120 1 : // Coalesce the keys of this span and then zero the sequence
121 1 : // numbers. This is necessary in order to make the range keys within
122 1 : // the ingested sstable internally consistent at the sequence number
123 1 : // it's ingested at. The individual keys within a batch are
124 1 : // committed at unique sequence numbers, whereas all the keys of an
125 1 : // ingested sstable are given the same sequence number. A span
126 1 : // containing keys that both set and unset the same suffix at the
127 1 : // same sequence number is nonsensical, so we "coalesce" or collapse
128 1 : // the keys.
129 1 : collapsed := keyspan.Span{
130 1 : Start: outputKey(span.Start),
131 1 : End: outputKey(span.End),
132 1 : Keys: make([]keyspan.Key, 0, len(span.Keys)),
133 1 : }
134 1 : rangekey.Coalesce(
135 1 : t.opts.Comparer.Compare, t.opts.Comparer.Equal, span.Keys, &collapsed.Keys,
136 1 : )
137 1 : for i := range collapsed.Keys {
138 1 : collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
139 1 : }
140 1 : keyspan.SortKeysByTrailer(&collapsed.Keys)
141 1 : if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil {
142 0 : return nil, err
143 0 : }
144 : }
145 1 : if err != nil {
146 0 : return nil, err
147 0 : }
148 1 : if err := rangeKeyIterCloser.Close(); err != nil {
149 0 : return nil, err
150 0 : }
151 : }
152 :
153 1 : if err := w.Close(); err != nil {
154 0 : return nil, err
155 0 : }
156 1 : sstMeta, err := w.Metadata()
157 1 : if err != nil {
158 0 : return nil, err
159 0 : }
160 1 : return sstMeta, nil
161 : }
162 :
163 : // buildForIngest builds a local SST file containing the keys in the given batch
164 : // and returns its path and metadata.
165 : func buildForIngest(
166 : t *Test, dbID objID, b *pebble.Batch, i int,
167 1 : ) (path string, _ *sstable.WriterMetadata, _ error) {
168 1 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
169 1 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
170 1 : if err != nil {
171 0 : return "", nil, err
172 0 : }
173 1 : db := t.getDB(dbID)
174 1 :
175 1 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
176 1 :
177 1 : writable := objstorageprovider.NewFileWritable(f)
178 1 : meta, err := writeSSTForIngestion(
179 1 : t,
180 1 : iter, rangeDelIter, rangeKeyIter,
181 1 : false, /* uniquePrefixes */
182 1 : nil, /* syntheticSuffix */
183 1 : nil, /* syntheticPrefix */
184 1 : writable,
185 1 : db.FormatMajorVersion(),
186 1 : )
187 1 : return path, meta, err
188 : }
189 :
190 : // buildForIngest builds a local SST file containing the keys in the given
191 : // external object (truncated to the given bounds) and returns its path and
192 : // metadata.
193 : func buildForIngestExternalEmulation(
194 : t *Test,
195 : dbID objID,
196 : externalObjID objID,
197 : bounds pebble.KeyRange,
198 : syntheticSuffix sstable.SyntheticSuffix,
199 : syntheticPrefix sstable.SyntheticPrefix,
200 : i int,
201 0 : ) (path string, _ *sstable.WriterMetadata) {
202 0 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
203 0 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
204 0 : panicIfErr(err)
205 0 :
206 0 : reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
207 0 : defer reader.Close()
208 0 :
209 0 : writable := objstorageprovider.NewFileWritable(f)
210 0 : // The underlying file should already have unique prefixes. Plus we are
211 0 : // emulating the external ingestion path which won't remove duplicate prefixes
212 0 : // if they exist.
213 0 : const uniquePrefixes = false
214 0 : meta, err := writeSSTForIngestion(
215 0 : t,
216 0 : pointIter, rangeDelIter, rangeKeyIter,
217 0 : uniquePrefixes,
218 0 : syntheticSuffix,
219 0 : syntheticPrefix,
220 0 : writable,
221 0 : t.minFMV(),
222 0 : )
223 0 : if err != nil {
224 0 : panic(err)
225 : }
226 0 : return path, meta
227 : }
228 :
229 : func openExternalObj(
230 : t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
231 : ) (
232 : reader *sstable.Reader,
233 : pointIter base.InternalIterator,
234 : rangeDelIter keyspan.FragmentIterator,
235 : rangeKeyIter keyspan.FragmentIterator,
236 0 : ) {
237 0 : objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), externalObjName(externalObjID))
238 0 : panicIfErr(err)
239 0 : opts := sstable.ReaderOptions{
240 0 : Comparer: t.opts.Comparer,
241 0 : }
242 0 : reader, err = sstable.NewReader(objstorageprovider.NewRemoteReadable(objReader, objSize), opts)
243 0 : panicIfErr(err)
244 0 :
245 0 : start := bounds.Start
246 0 : end := bounds.End
247 0 : if syntheticPrefix.IsSet() {
248 0 : start = syntheticPrefix.Invert(start)
249 0 : end = syntheticPrefix.Invert(end)
250 0 : }
251 0 : pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
252 0 : panicIfErr(err)
253 0 :
254 0 : rangeDelIter, err = reader.NewRawRangeDelIter(sstable.NoTransforms)
255 0 : panicIfErr(err)
256 0 : if rangeDelIter != nil {
257 0 : rangeDelIter = keyspan.Truncate(
258 0 : t.opts.Comparer.Compare,
259 0 : rangeDelIter,
260 0 : base.UserKeyBoundsEndExclusive(start, end),
261 0 : )
262 0 : }
263 :
264 0 : rangeKeyIter, err = reader.NewRawRangeKeyIter(sstable.NoTransforms)
265 0 : panicIfErr(err)
266 0 : if rangeKeyIter != nil {
267 0 : rangeKeyIter = keyspan.Truncate(
268 0 : t.opts.Comparer.Compare,
269 0 : rangeKeyIter,
270 0 : base.UserKeyBoundsEndExclusive(start, end),
271 0 : )
272 0 : }
273 0 : return reader, pointIter, rangeDelIter, rangeKeyIter
274 : }
275 :
276 : // externalObjIsEmpty returns true if the given external object has no point or
277 : // range keys withing the given bounds.
278 : func externalObjIsEmpty(
279 : t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
280 0 : ) bool {
281 0 : reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
282 0 : defer reader.Close()
283 0 : defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
284 0 :
285 0 : key, _ := pointIter.First()
286 0 : panicIfErr(pointIter.Error())
287 0 : if key != nil {
288 0 : return false
289 0 : }
290 0 : for _, it := range []keyspan.FragmentIterator{rangeDelIter, rangeKeyIter} {
291 0 : if it != nil {
292 0 : span, err := it.First()
293 0 : panicIfErr(err)
294 0 : if span != nil {
295 0 : return false
296 0 : }
297 : }
298 : }
299 0 : return true
300 : }
301 :
// panicIfErr panics with err when err is non-nil and is a no-op otherwise.
func panicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
|