Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/pebble"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/keyspan"
15 : "github.com/cockroachdb/pebble/internal/private"
16 : "github.com/cockroachdb/pebble/internal/rangekey"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
19 : "github.com/cockroachdb/pebble/sstable"
20 : "github.com/cockroachdb/pebble/vfs"
21 : )
22 :
23 : // writeSSTForIngestion writes an SST that is to be ingested, either directly or
24 : // as an external file. Returns the sstable metadata.
25 : //
26 : // Closes the iterators in all cases.
27 : func writeSSTForIngestion(
28 : t *Test,
29 : pointIter base.InternalIterator,
30 : rangeDelIter keyspan.FragmentIterator,
31 : rangeKeyIter keyspan.FragmentIterator,
32 : uniquePrefixes bool,
33 : syntheticSuffix sstable.SyntheticSuffix,
34 : syntheticPrefix sstable.SyntheticPrefix,
35 : writable objstorage.Writable,
36 : targetFMV pebble.FormatMajorVersion,
37 1 : ) (*sstable.WriterMetadata, error) {
38 1 : writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
39 1 : if t.testOpts.disableValueBlocksForIngestSSTables {
40 1 : writerOpts.DisableValueBlocks = true
41 1 : }
42 1 : w := sstable.NewWriter(writable, writerOpts)
43 1 : pointIterCloser := base.CloseHelper(pointIter)
44 1 : defer pointIterCloser.Close()
45 1 : rangeDelIterCloser := base.CloseHelper(rangeDelIter)
46 1 : defer rangeDelIterCloser.Close()
47 1 : rangeKeyIterCloser := base.CloseHelper(rangeKeyIter)
48 1 : defer rangeKeyIterCloser.Close()
49 1 :
50 1 : outputKey := func(key []byte) []byte {
51 1 : if !syntheticPrefix.IsSet() && !syntheticSuffix.IsSet() {
52 1 : return slices.Clone(key)
53 1 : }
54 1 : if syntheticPrefix.IsSet() {
55 1 : key = syntheticPrefix.Apply(key)
56 1 : }
57 1 : if syntheticSuffix.IsSet() {
58 1 : n := t.opts.Comparer.Split(key)
59 1 : key = append(key[:n:n], syntheticSuffix...)
60 1 : }
61 1 : return key
62 : }
63 :
64 1 : var lastUserKey []byte
65 1 : for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
66 1 : // Ignore duplicate keys.
67 1 : if lastUserKey != nil {
68 1 : last := lastUserKey
69 1 : this := kv.K.UserKey
70 1 : if uniquePrefixes {
71 1 : last = last[:t.opts.Comparer.Split(last)]
72 1 : this = this[:t.opts.Comparer.Split(this)]
73 1 : }
74 1 : if t.opts.Comparer.Equal(last, this) {
75 1 : continue
76 : }
77 : }
78 1 : lastUserKey = append(lastUserKey[:0], kv.K.UserKey...)
79 1 :
80 1 : k := *kv
81 1 : k.K.SetSeqNum(base.SeqNumZero)
82 1 : k.K.UserKey = outputKey(k.K.UserKey)
83 1 : value := kv.V
84 1 : // It's possible that we wrote the key on a batch from a db that supported
85 1 : // DeleteSized, but will be ingesting into a db that does not. Detect this
86 1 : // case and translate the key to an InternalKeyKindDelete.
87 1 : if targetFMV < pebble.FormatDeleteSizedAndObsolete && kv.Kind() == pebble.InternalKeyKindDeleteSized {
88 1 : value = pebble.LazyValue{}
89 1 : k.K.SetKind(pebble.InternalKeyKindDelete)
90 1 : }
91 1 : valBytes, _, err := value.Value(nil)
92 1 : if err != nil {
93 0 : return nil, err
94 0 : }
95 1 : if err := w.Add(k.K, valBytes); err != nil {
96 0 : return nil, err
97 0 : }
98 : }
99 1 : if err := pointIterCloser.Close(); err != nil {
100 0 : return nil, err
101 0 : }
102 :
103 1 : if rangeDelIter != nil {
104 1 : span, err := rangeDelIter.First()
105 1 : for ; span != nil; span, err = rangeDelIter.Next() {
106 1 : if err := w.DeleteRange(outputKey(span.Start), outputKey(span.End)); err != nil {
107 0 : return nil, err
108 0 : }
109 : }
110 1 : if err != nil {
111 0 : return nil, err
112 0 : }
113 1 : if err := rangeDelIterCloser.Close(); err != nil {
114 0 : return nil, err
115 0 : }
116 : }
117 :
118 1 : if rangeKeyIter != nil {
119 1 : span, err := rangeKeyIter.First()
120 1 : for ; span != nil; span, err = rangeKeyIter.Next() {
121 1 : // Coalesce the keys of this span and then zero the sequence
122 1 : // numbers. This is necessary in order to make the range keys within
123 1 : // the ingested sstable internally consistent at the sequence number
124 1 : // it's ingested at. The individual keys within a batch are
125 1 : // committed at unique sequence numbers, whereas all the keys of an
126 1 : // ingested sstable are given the same sequence number. A span
127 1 : // containing keys that both set and unset the same suffix at the
128 1 : // same sequence number is nonsensical, so we "coalesce" or collapse
129 1 : // the keys.
130 1 : collapsed := keyspan.Span{
131 1 : Start: outputKey(span.Start),
132 1 : End: outputKey(span.End),
133 1 : Keys: make([]keyspan.Key, 0, len(span.Keys)),
134 1 : }
135 1 : rangekey.Coalesce(
136 1 : t.opts.Comparer.Compare, t.opts.Comparer.Equal, span.Keys, &collapsed.Keys,
137 1 : )
138 1 : for i := range collapsed.Keys {
139 1 : collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
140 1 : }
141 1 : keyspan.SortKeysByTrailer(&collapsed.Keys)
142 1 : if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil {
143 0 : return nil, err
144 0 : }
145 : }
146 1 : if err != nil {
147 0 : return nil, err
148 0 : }
149 1 : if err := rangeKeyIterCloser.Close(); err != nil {
150 0 : return nil, err
151 0 : }
152 : }
153 :
154 1 : if err := w.Close(); err != nil {
155 0 : return nil, err
156 0 : }
157 1 : sstMeta, err := w.Metadata()
158 1 : if err != nil {
159 0 : return nil, err
160 0 : }
161 1 : return sstMeta, nil
162 : }
163 :
164 : // buildForIngest builds a local SST file containing the keys in the given batch
165 : // and returns its path and metadata.
166 : func buildForIngest(
167 : t *Test, dbID objID, b *pebble.Batch, i int,
168 1 : ) (path string, _ *sstable.WriterMetadata, _ error) {
169 1 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
170 1 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
171 1 : if err != nil {
172 0 : return "", nil, err
173 0 : }
174 1 : db := t.getDB(dbID)
175 1 :
176 1 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
177 1 :
178 1 : writable := objstorageprovider.NewFileWritable(f)
179 1 : meta, err := writeSSTForIngestion(
180 1 : t,
181 1 : iter, rangeDelIter, rangeKeyIter,
182 1 : false, /* uniquePrefixes */
183 1 : nil, /* syntheticSuffix */
184 1 : nil, /* syntheticPrefix */
185 1 : writable,
186 1 : db.FormatMajorVersion(),
187 1 : )
188 1 : return path, meta, err
189 : }
190 :
191 : // buildForIngest builds a local SST file containing the keys in the given
192 : // external object (truncated to the given bounds) and returns its path and
193 : // metadata.
194 : func buildForIngestExternalEmulation(
195 : t *Test,
196 : dbID objID,
197 : externalObjID objID,
198 : bounds pebble.KeyRange,
199 : syntheticSuffix sstable.SyntheticSuffix,
200 : syntheticPrefix sstable.SyntheticPrefix,
201 : i int,
202 1 : ) (path string, _ *sstable.WriterMetadata) {
203 1 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
204 1 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
205 1 : panicIfErr(err)
206 1 :
207 1 : reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
208 1 : defer reader.Close()
209 1 :
210 1 : writable := objstorageprovider.NewFileWritable(f)
211 1 : // The underlying file should already have unique prefixes. Plus we are
212 1 : // emulating the external ingestion path which won't remove duplicate prefixes
213 1 : // if they exist.
214 1 : const uniquePrefixes = false
215 1 : meta, err := writeSSTForIngestion(
216 1 : t,
217 1 : pointIter, rangeDelIter, rangeKeyIter,
218 1 : uniquePrefixes,
219 1 : syntheticSuffix,
220 1 : syntheticPrefix,
221 1 : writable,
222 1 : t.minFMV(),
223 1 : )
224 1 : if err != nil {
225 0 : panic(err)
226 : }
227 1 : return path, meta
228 : }
229 :
230 : func openExternalObj(
231 : t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
232 : ) (
233 : reader *sstable.Reader,
234 : pointIter base.InternalIterator,
235 : rangeDelIter keyspan.FragmentIterator,
236 : rangeKeyIter keyspan.FragmentIterator,
237 1 : ) {
238 1 : objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), externalObjName(externalObjID))
239 1 : panicIfErr(err)
240 1 : opts := sstable.ReaderOptions{
241 1 : Comparer: t.opts.Comparer,
242 1 : }
243 1 : reader, err = sstable.NewReader(objstorageprovider.NewRemoteReadable(objReader, objSize), opts)
244 1 : panicIfErr(err)
245 1 :
246 1 : start := bounds.Start
247 1 : end := bounds.End
248 1 : if syntheticPrefix.IsSet() {
249 1 : start = syntheticPrefix.Invert(start)
250 1 : end = syntheticPrefix.Invert(end)
251 1 : }
252 1 : pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
253 1 : panicIfErr(err)
254 1 :
255 1 : rangeDelIter, err = reader.NewRawRangeDelIter(sstable.NoTransforms)
256 1 : panicIfErr(err)
257 1 : if rangeDelIter != nil {
258 1 : rangeDelIter = keyspan.Truncate(
259 1 : t.opts.Comparer.Compare,
260 1 : rangeDelIter,
261 1 : base.UserKeyBoundsEndExclusive(start, end),
262 1 : )
263 1 : }
264 :
265 1 : rangeKeyIter, err = reader.NewRawRangeKeyIter(sstable.NoTransforms)
266 1 : panicIfErr(err)
267 1 : if rangeKeyIter != nil {
268 0 : rangeKeyIter = keyspan.Truncate(
269 0 : t.opts.Comparer.Compare,
270 0 : rangeKeyIter,
271 0 : base.UserKeyBoundsEndExclusive(start, end),
272 0 : )
273 0 : }
274 1 : return reader, pointIter, rangeDelIter, rangeKeyIter
275 : }
276 :
// panicIfErr panics with err when err is non-nil and does nothing otherwise.
// This file uses it wherever an error is treated as fatal.
func panicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
|