Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/pebble"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/keyspan"
15 : "github.com/cockroachdb/pebble/internal/private"
16 : "github.com/cockroachdb/pebble/internal/rangekey"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
19 : "github.com/cockroachdb/pebble/sstable"
20 : "github.com/cockroachdb/pebble/vfs"
21 : )
22 :
23 : // writeSSTForIngestion writes an SST that is to be ingested, either directly or
24 : // as an external file. Returns the sstable metadata.
25 : //
26 : // Closes the iterators in all cases.
27 : func writeSSTForIngestion(
28 : t *Test,
29 : pointIter base.InternalIterator,
30 : rangeDelIter keyspan.FragmentIterator,
31 : rangeKeyIter keyspan.FragmentIterator,
32 : uniquePrefixes bool,
33 : syntheticSuffix sstable.SyntheticSuffix,
34 : syntheticPrefix sstable.SyntheticPrefix,
35 : writable objstorage.Writable,
36 : targetFMV pebble.FormatMajorVersion,
37 2 : ) (*sstable.WriterMetadata, error) {
38 2 : writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
39 2 : if t.testOpts.disableValueBlocksForIngestSSTables {
40 2 : writerOpts.DisableValueBlocks = true
41 2 : }
42 2 : w := sstable.NewWriter(writable, writerOpts)
43 2 : pointIterCloser := base.CloseHelper(pointIter)
44 2 : defer pointIterCloser.Close()
45 2 : rangeDelIterCloser := base.CloseHelper(rangeDelIter)
46 2 : defer rangeDelIterCloser.Close()
47 2 : rangeKeyIterCloser := base.CloseHelper(rangeKeyIter)
48 2 : defer rangeKeyIterCloser.Close()
49 2 :
50 2 : outputKey := func(key []byte) []byte {
51 2 : if !syntheticPrefix.IsSet() && !syntheticSuffix.IsSet() {
52 2 : return slices.Clone(key)
53 2 : }
54 2 : if syntheticPrefix.IsSet() {
55 2 : key = syntheticPrefix.Apply(key)
56 2 : }
57 2 : if syntheticSuffix.IsSet() {
58 2 : n := t.opts.Comparer.Split(key)
59 2 : key = append(key[:n:n], syntheticSuffix...)
60 2 : }
61 2 : return key
62 : }
63 :
64 2 : var lastUserKey []byte
65 2 : for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
66 2 : // Ignore duplicate keys.
67 2 : if lastUserKey != nil {
68 2 : last := lastUserKey
69 2 : this := kv.K.UserKey
70 2 : if uniquePrefixes {
71 2 : last = last[:t.opts.Comparer.Split(last)]
72 2 : this = this[:t.opts.Comparer.Split(this)]
73 2 : }
74 2 : if t.opts.Comparer.Equal(last, this) {
75 1 : continue
76 : }
77 : }
78 2 : lastUserKey = append(lastUserKey[:0], kv.K.UserKey...)
79 2 :
80 2 : k := *kv
81 2 : k.K.SetSeqNum(base.SeqNumZero)
82 2 : k.K.UserKey = outputKey(k.K.UserKey)
83 2 : value := kv.V
84 2 : // It's possible that we wrote the key on a batch from a db that supported
85 2 : // DeleteSized, but will be ingesting into a db that does not. Detect this
86 2 : // case and translate the key to an InternalKeyKindDelete.
87 2 : if targetFMV < pebble.FormatDeleteSizedAndObsolete && kv.Kind() == pebble.InternalKeyKindDeleteSized {
88 1 : value = pebble.LazyValue{}
89 1 : k.K.SetKind(pebble.InternalKeyKindDelete)
90 1 : }
91 2 : valBytes, _, err := value.Value(nil)
92 2 : if err != nil {
93 0 : return nil, err
94 0 : }
95 2 : if err := w.Add(k.K, valBytes); err != nil {
96 0 : return nil, err
97 0 : }
98 : }
99 2 : if err := pointIterCloser.Close(); err != nil {
100 0 : return nil, err
101 0 : }
102 :
103 2 : if rangeDelIter != nil {
104 2 : span, err := rangeDelIter.First()
105 2 : for ; span != nil; span, err = rangeDelIter.Next() {
106 2 : if err := w.DeleteRange(outputKey(span.Start), outputKey(span.End)); err != nil {
107 0 : return nil, err
108 0 : }
109 : }
110 2 : if err != nil {
111 0 : return nil, err
112 0 : }
113 2 : if err := rangeDelIterCloser.Close(); err != nil {
114 0 : return nil, err
115 0 : }
116 : }
117 :
118 2 : if rangeKeyIter != nil {
119 2 : span, err := rangeKeyIter.First()
120 2 : for ; span != nil; span, err = rangeKeyIter.Next() {
121 2 : // Coalesce the keys of this span and then zero the sequence
122 2 : // numbers. This is necessary in order to make the range keys within
123 2 : // the ingested sstable internally consistent at the sequence number
124 2 : // it's ingested at. The individual keys within a batch are
125 2 : // committed at unique sequence numbers, whereas all the keys of an
126 2 : // ingested sstable are given the same sequence number. A span
127 2 : // containing keys that both set and unset the same suffix at the
128 2 : // same sequence number is nonsensical, so we "coalesce" or collapse
129 2 : // the keys.
130 2 : collapsed := keyspan.Span{
131 2 : Start: outputKey(span.Start),
132 2 : End: outputKey(span.End),
133 2 : Keys: make([]keyspan.Key, 0, len(span.Keys)),
134 2 : }
135 2 : rangekey.Coalesce(
136 2 : t.opts.Comparer.Compare, t.opts.Comparer.Equal, span.Keys, &collapsed.Keys,
137 2 : )
138 2 : for i := range collapsed.Keys {
139 2 : collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
140 2 : }
141 2 : keyspan.SortKeysByTrailer(&collapsed.Keys)
142 2 : if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil {
143 0 : return nil, err
144 0 : }
145 : }
146 2 : if err != nil {
147 0 : return nil, err
148 0 : }
149 2 : if err := rangeKeyIterCloser.Close(); err != nil {
150 0 : return nil, err
151 0 : }
152 : }
153 :
154 2 : if err := w.Close(); err != nil {
155 0 : return nil, err
156 0 : }
157 2 : sstMeta, err := w.Metadata()
158 2 : if err != nil {
159 0 : return nil, err
160 0 : }
161 2 : return sstMeta, nil
162 : }
163 :
164 : // buildForIngest builds a local SST file containing the keys in the given batch
165 : // and returns its path and metadata.
166 : func buildForIngest(
167 : t *Test, dbID objID, b *pebble.Batch, i int,
168 2 : ) (path string, _ *sstable.WriterMetadata, _ error) {
169 2 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
170 2 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
171 2 : if err != nil {
172 0 : return "", nil, err
173 0 : }
174 2 : db := t.getDB(dbID)
175 2 :
176 2 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
177 2 :
178 2 : writable := objstorageprovider.NewFileWritable(f)
179 2 : meta, err := writeSSTForIngestion(
180 2 : t,
181 2 : iter, rangeDelIter, rangeKeyIter,
182 2 : false, /* uniquePrefixes */
183 2 : nil, /* syntheticSuffix */
184 2 : nil, /* syntheticPrefix */
185 2 : writable,
186 2 : db.FormatMajorVersion(),
187 2 : )
188 2 : return path, meta, err
189 : }
190 :
191 : // buildForIngest builds a local SST file containing the keys in the given
192 : // external object (truncated to the given bounds) and returns its path and
193 : // metadata.
194 : func buildForIngestExternalEmulation(
195 : t *Test,
196 : dbID objID,
197 : externalObjID objID,
198 : bounds pebble.KeyRange,
199 : syntheticSuffix sstable.SyntheticSuffix,
200 : syntheticPrefix sstable.SyntheticPrefix,
201 : i int,
202 2 : ) (path string, _ *sstable.WriterMetadata) {
203 2 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
204 2 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
205 2 : panicIfErr(err)
206 2 :
207 2 : reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
208 2 : defer reader.Close()
209 2 :
210 2 : writable := objstorageprovider.NewFileWritable(f)
211 2 : // The underlying file should already have unique prefixes. Plus we are
212 2 : // emulating the external ingestion path which won't remove duplicate prefixes
213 2 : // if they exist.
214 2 : const uniquePrefixes = false
215 2 : meta, err := writeSSTForIngestion(
216 2 : t,
217 2 : pointIter, rangeDelIter, rangeKeyIter,
218 2 : uniquePrefixes,
219 2 : syntheticSuffix,
220 2 : syntheticPrefix,
221 2 : writable,
222 2 : t.minFMV(),
223 2 : )
224 2 : if err != nil {
225 0 : panic(err)
226 : }
227 2 : return path, meta
228 : }
229 :
230 : func openExternalObj(
231 : t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
232 : ) (
233 : reader *sstable.Reader,
234 : pointIter base.InternalIterator,
235 : rangeDelIter keyspan.FragmentIterator,
236 : rangeKeyIter keyspan.FragmentIterator,
237 2 : ) {
238 2 : objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), externalObjName(externalObjID))
239 2 : panicIfErr(err)
240 2 : opts := sstable.ReaderOptions{
241 2 : Comparer: t.opts.Comparer,
242 2 : }
243 2 : reader, err = sstable.NewReader(objstorageprovider.NewRemoteReadable(objReader, objSize), opts)
244 2 : panicIfErr(err)
245 2 :
246 2 : start := bounds.Start
247 2 : end := bounds.End
248 2 : if syntheticPrefix.IsSet() {
249 2 : start = syntheticPrefix.Invert(start)
250 2 : end = syntheticPrefix.Invert(end)
251 2 : }
252 2 : pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
253 2 : panicIfErr(err)
254 2 :
255 2 : rangeDelIter, err = reader.NewRawRangeDelIter(sstable.NoTransforms)
256 2 : panicIfErr(err)
257 2 : if rangeDelIter != nil {
258 1 : rangeDelIter = keyspan.Truncate(
259 1 : t.opts.Comparer.Compare,
260 1 : rangeDelIter,
261 1 : base.UserKeyBoundsEndExclusive(start, end),
262 1 : )
263 1 : }
264 :
265 2 : rangeKeyIter, err = reader.NewRawRangeKeyIter(sstable.NoTransforms)
266 2 : panicIfErr(err)
267 2 : if rangeKeyIter != nil {
268 0 : rangeKeyIter = keyspan.Truncate(
269 0 : t.opts.Comparer.Compare,
270 0 : rangeKeyIter,
271 0 : base.UserKeyBoundsEndExclusive(start, end),
272 0 : )
273 0 : }
274 2 : return reader, pointIter, rangeDelIter, rangeKeyIter
275 : }
276 :
// panicIfErr panics with err as the panic value when err is non-nil, and does
// nothing otherwise.
func panicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
|