Line data Source code
1 : // Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "slices"
11 :
12 : "github.com/cockroachdb/pebble"
13 : "github.com/cockroachdb/pebble/internal/base"
14 : "github.com/cockroachdb/pebble/internal/keyspan"
15 : "github.com/cockroachdb/pebble/internal/private"
16 : "github.com/cockroachdb/pebble/internal/rangekey"
17 : "github.com/cockroachdb/pebble/objstorage"
18 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
19 : "github.com/cockroachdb/pebble/sstable"
20 : "github.com/cockroachdb/pebble/vfs"
21 : )
22 :
23 : // writeSSTForIngestion writes an SST that is to be ingested, either directly or
24 : // as an external file. Returns the sstable metadata.
25 : //
26 : // Closes the iterators in all cases.
27 : func writeSSTForIngestion(
28 : t *Test,
29 : pointIter base.InternalIterator,
30 : rangeDelIter keyspan.FragmentIterator,
31 : rangeKeyIter keyspan.FragmentIterator,
32 : uniquePrefixes bool,
33 : syntheticSuffix sstable.SyntheticSuffix,
34 : syntheticPrefix sstable.SyntheticPrefix,
35 : writable objstorage.Writable,
36 : targetFMV pebble.FormatMajorVersion,
37 1 : ) (*sstable.WriterMetadata, error) {
38 1 : writerOpts := t.opts.MakeWriterOptions(0, targetFMV.MaxTableFormat())
39 1 : if t.testOpts.disableValueBlocksForIngestSSTables {
40 1 : writerOpts.DisableValueBlocks = true
41 1 : }
42 1 : w := sstable.NewWriter(writable, writerOpts)
43 1 : pointIterCloser := base.CloseHelper(pointIter)
44 1 : defer func() {
45 1 : _ = pointIterCloser.Close()
46 1 : if rangeDelIter != nil {
47 0 : rangeDelIter.Close()
48 0 : }
49 1 : if rangeKeyIter != nil {
50 0 : rangeKeyIter.Close()
51 0 : }
52 : }()
53 :
54 1 : outputKey := func(key []byte) []byte {
55 1 : if !syntheticPrefix.IsSet() && !syntheticSuffix.IsSet() {
56 1 : return slices.Clone(key)
57 1 : }
58 1 : if syntheticPrefix.IsSet() {
59 1 : key = syntheticPrefix.Apply(key)
60 1 : }
61 1 : if syntheticSuffix.IsSet() {
62 1 : n := t.opts.Comparer.Split(key)
63 1 : key = append(key[:n:n], syntheticSuffix...)
64 1 : }
65 1 : return key
66 : }
67 :
68 1 : var lastUserKey []byte
69 1 : for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
70 1 : // Ignore duplicate keys.
71 1 : if lastUserKey != nil {
72 1 : last := lastUserKey
73 1 : this := kv.K.UserKey
74 1 : if uniquePrefixes {
75 1 : last = last[:t.opts.Comparer.Split(last)]
76 1 : this = this[:t.opts.Comparer.Split(this)]
77 1 : }
78 1 : if t.opts.Comparer.Equal(last, this) {
79 1 : continue
80 : }
81 : }
82 1 : lastUserKey = append(lastUserKey[:0], kv.K.UserKey...)
83 1 :
84 1 : k := *kv
85 1 : k.K.SetSeqNum(base.SeqNumZero)
86 1 : k.K.UserKey = outputKey(k.K.UserKey)
87 1 : value := kv.V
88 1 : // It's possible that we wrote the key on a batch from a db that supported
89 1 : // DeleteSized, but will be ingesting into a db that does not. Detect this
90 1 : // case and translate the key to an InternalKeyKindDelete.
91 1 : if targetFMV < pebble.FormatDeleteSizedAndObsolete && kv.Kind() == pebble.InternalKeyKindDeleteSized {
92 1 : value = pebble.LazyValue{}
93 1 : k.K.SetKind(pebble.InternalKeyKindDelete)
94 1 : }
95 1 : valBytes, _, err := value.Value(nil)
96 1 : if err != nil {
97 0 : return nil, err
98 0 : }
99 1 : if err := w.Add(k.K, valBytes); err != nil {
100 0 : return nil, err
101 0 : }
102 : }
103 1 : if err := pointIterCloser.Close(); err != nil {
104 0 : return nil, err
105 0 : }
106 :
107 1 : if rangeDelIter != nil {
108 1 : span, err := rangeDelIter.First()
109 1 : for ; span != nil; span, err = rangeDelIter.Next() {
110 1 : if err := w.DeleteRange(outputKey(span.Start), outputKey(span.End)); err != nil {
111 0 : return nil, err
112 0 : }
113 : }
114 1 : if err != nil {
115 0 : return nil, err
116 0 : }
117 1 : rangeDelIter.Close()
118 1 : rangeDelIter = nil
119 : }
120 :
121 1 : if rangeKeyIter != nil {
122 1 : span, err := rangeKeyIter.First()
123 1 : for ; span != nil; span, err = rangeKeyIter.Next() {
124 1 : // Coalesce the keys of this span and then zero the sequence
125 1 : // numbers. This is necessary in order to make the range keys within
126 1 : // the ingested sstable internally consistent at the sequence number
127 1 : // it's ingested at. The individual keys within a batch are
128 1 : // committed at unique sequence numbers, whereas all the keys of an
129 1 : // ingested sstable are given the same sequence number. A span
130 1 : // containing keys that both set and unset the same suffix at the
131 1 : // same sequence number is nonsensical, so we "coalesce" or collapse
132 1 : // the keys.
133 1 : collapsed := keyspan.Span{
134 1 : Start: outputKey(span.Start),
135 1 : End: outputKey(span.End),
136 1 : Keys: make([]keyspan.Key, 0, len(span.Keys)),
137 1 : }
138 1 : rangekey.Coalesce(
139 1 : t.opts.Comparer.Compare, t.opts.Comparer.Equal, span.Keys, &collapsed.Keys,
140 1 : )
141 1 : for i := range collapsed.Keys {
142 1 : collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
143 1 : }
144 1 : keyspan.SortKeysByTrailer(&collapsed.Keys)
145 1 : if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil {
146 0 : return nil, err
147 0 : }
148 : }
149 1 : if err != nil {
150 0 : return nil, err
151 0 : }
152 1 : rangeKeyIter.Close()
153 1 : rangeKeyIter = nil
154 : }
155 :
156 1 : if err := w.Close(); err != nil {
157 0 : return nil, err
158 0 : }
159 1 : sstMeta, err := w.Metadata()
160 1 : if err != nil {
161 0 : return nil, err
162 0 : }
163 1 : return sstMeta, nil
164 : }
165 :
166 : // buildForIngest builds a local SST file containing the keys in the given batch
167 : // and returns its path and metadata.
168 : func buildForIngest(
169 : t *Test, dbID objID, b *pebble.Batch, i int,
170 1 : ) (path string, _ *sstable.WriterMetadata, _ error) {
171 1 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
172 1 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
173 1 : if err != nil {
174 0 : return "", nil, err
175 0 : }
176 1 : db := t.getDB(dbID)
177 1 :
178 1 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
179 1 :
180 1 : writable := objstorageprovider.NewFileWritable(f)
181 1 : meta, err := writeSSTForIngestion(
182 1 : t,
183 1 : iter, rangeDelIter, rangeKeyIter,
184 1 : false, /* uniquePrefixes */
185 1 : nil, /* syntheticSuffix */
186 1 : nil, /* syntheticPrefix */
187 1 : writable,
188 1 : db.FormatMajorVersion(),
189 1 : )
190 1 : return path, meta, err
191 : }
192 :
193 : // buildForIngest builds a local SST file containing the keys in the given
194 : // external object (truncated to the given bounds) and returns its path and
195 : // metadata.
196 : func buildForIngestExternalEmulation(
197 : t *Test,
198 : dbID objID,
199 : externalObjID objID,
200 : bounds pebble.KeyRange,
201 : syntheticSuffix sstable.SyntheticSuffix,
202 : syntheticPrefix sstable.SyntheticPrefix,
203 : i int,
204 1 : ) (path string, _ *sstable.WriterMetadata) {
205 1 : path = t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
206 1 : f, err := t.opts.FS.Create(path, vfs.WriteCategoryUnspecified)
207 1 : panicIfErr(err)
208 1 :
209 1 : reader, pointIter, rangeDelIter, rangeKeyIter := openExternalObj(t, externalObjID, bounds, syntheticPrefix)
210 1 : defer reader.Close()
211 1 :
212 1 : writable := objstorageprovider.NewFileWritable(f)
213 1 : // The underlying file should already have unique prefixes. Plus we are
214 1 : // emulating the external ingestion path which won't remove duplicate prefixes
215 1 : // if they exist.
216 1 : const uniquePrefixes = false
217 1 : meta, err := writeSSTForIngestion(
218 1 : t,
219 1 : pointIter, rangeDelIter, rangeKeyIter,
220 1 : uniquePrefixes,
221 1 : syntheticSuffix,
222 1 : syntheticPrefix,
223 1 : writable,
224 1 : t.minFMV(),
225 1 : )
226 1 : if err != nil {
227 0 : panic(err)
228 : }
229 1 : return path, meta
230 : }
231 :
232 : func openExternalObj(
233 : t *Test, externalObjID objID, bounds pebble.KeyRange, syntheticPrefix sstable.SyntheticPrefix,
234 : ) (
235 : reader *sstable.Reader,
236 : pointIter base.InternalIterator,
237 : rangeDelIter keyspan.FragmentIterator,
238 : rangeKeyIter keyspan.FragmentIterator,
239 1 : ) {
240 1 : objReader, objSize, err := t.externalStorage.ReadObject(context.Background(), externalObjName(externalObjID))
241 1 : panicIfErr(err)
242 1 : opts := sstable.ReaderOptions{
243 1 : Comparer: t.opts.Comparer,
244 1 : }
245 1 : reader, err = sstable.NewReader(objstorageprovider.NewRemoteReadable(objReader, objSize), opts)
246 1 : panicIfErr(err)
247 1 :
248 1 : start := bounds.Start
249 1 : end := bounds.End
250 1 : if syntheticPrefix.IsSet() {
251 1 : start = syntheticPrefix.Invert(start)
252 1 : end = syntheticPrefix.Invert(end)
253 1 : }
254 1 : pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
255 1 : panicIfErr(err)
256 1 :
257 1 : rangeDelIter, err = reader.NewRawRangeDelIter(sstable.NoTransforms)
258 1 : panicIfErr(err)
259 1 : if rangeDelIter != nil {
260 1 : rangeDelIter = keyspan.Truncate(
261 1 : t.opts.Comparer.Compare,
262 1 : rangeDelIter,
263 1 : base.UserKeyBoundsEndExclusive(start, end),
264 1 : )
265 1 : }
266 :
267 1 : rangeKeyIter, err = reader.NewRawRangeKeyIter(sstable.NoTransforms)
268 1 : panicIfErr(err)
269 1 : if rangeKeyIter != nil {
270 0 : rangeKeyIter = keyspan.Truncate(
271 0 : t.opts.Comparer.Compare,
272 0 : rangeKeyIter,
273 0 : base.UserKeyBoundsEndExclusive(start, end),
274 0 : )
275 0 : }
276 1 : return reader, pointIter, rangeDelIter, rangeKeyIter
277 : }
278 :
// panicIfErr panics with err as the panic value when err is non-nil and does
// nothing otherwise.
func panicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
|