Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "crypto/rand"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "path"
15 : "path/filepath"
16 : "slices"
17 : "strings"
18 :
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/pebble"
21 : "github.com/cockroachdb/pebble/internal/base"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/private"
24 : "github.com/cockroachdb/pebble/internal/rangekey"
25 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
26 : "github.com/cockroachdb/pebble/sstable"
27 : "github.com/cockroachdb/pebble/vfs"
28 : "github.com/cockroachdb/pebble/vfs/errorfs"
29 : )
30 :
// Ops holds a sequence of operations to be executed by the metamorphic tests.
type Ops []op

// op defines the interface for a single operation, such as creating a batch,
// or advancing an iterator.
type op interface {
	// formattedString returns the operation rendered in the metamorphic
	// test's textual op language, using the given KeyFormat to render keys.
	formattedString(KeyFormat) string

	// run executes the operation against the test state, recording its
	// outcome in the history.
	run(t *Test, h historyRecorder)

	// receiver returns the object ID of the object the operation is performed
	// on. Every operation has a receiver (eg, batch0.Set(...) has `batch0` as
	// its receiver). Receivers are used for synchronization when running with
	// concurrency.
	receiver() objID

	// syncObjs returns an additional set of object IDs—excluding the
	// receiver—that the operation must synchronize with. At execution time,
	// the operation will run serially with respect to all other operations
	// that return these objects from their own syncObjs or receiver methods.
	syncObjs() objIDSlice

	// rewriteKeys invokes fn for every user key used by the operation. The key
	// is updated to be the result of fn(key).
	//
	// Used for simplification of operations for easier investigations.
	rewriteKeys(fn func(UserKey) UserKey)

	// diagramKeyRanges() returns key spans associated with this operation, to be
	// shown on an ASCII diagram of operations.
	diagramKeyRanges() []pebble.KeyRange
}

// A UserKey is a user key used by the metamorphic test. The format is
// determined by the KeyFormat used by the test.
type UserKey []byte

// A UserKeySuffix is the suffix of a user key used by the metamorphic test. The
// format is determined by the KeyFormat used by the test.
type UserKeySuffix []byte
71 :
72 : // initOp performs test initialization
73 : type initOp struct {
74 : dbSlots uint32
75 : batchSlots uint32
76 : iterSlots uint32
77 : snapshotSlots uint32
78 : externalObjSlots uint32
79 : }
80 :
81 1 : func (o *initOp) run(t *Test, h historyRecorder) {
82 1 : t.batches = make([]*pebble.Batch, o.batchSlots)
83 1 : t.iters = make([]*retryableIter, o.iterSlots)
84 1 : t.snapshots = make([]readerCloser, o.snapshotSlots)
85 1 : t.externalObjs = make([]externalObjMeta, o.externalObjSlots)
86 1 : h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
87 1 : }
88 :
89 1 : func (o *initOp) formattedString(kf KeyFormat) string {
90 1 : return fmt.Sprintf("Init(%d /* dbs */, %d /* batches */, %d /* iters */, %d /* snapshots */, %d /* externalObjs */)",
91 1 : o.dbSlots, o.batchSlots, o.iterSlots, o.snapshotSlots, o.externalObjSlots)
92 1 : }
93 :
94 1 : func (o *initOp) receiver() objID { return makeObjID(dbTag, 1) }
95 1 : func (o *initOp) syncObjs() objIDSlice {
96 1 : syncObjs := make([]objID, 0)
97 1 : // Add any additional DBs to syncObjs.
98 1 : for i := uint32(2); i < o.dbSlots+1; i++ {
99 0 : syncObjs = append(syncObjs, makeObjID(dbTag, i))
100 0 : }
101 1 : return syncObjs
102 : }
103 :
104 0 : func (o *initOp) rewriteKeys(func(UserKey) UserKey) {}
105 1 : func (o *initOp) diagramKeyRanges() []pebble.KeyRange { return nil }
106 :
107 : // applyOp models a Writer.Apply operation.
108 : type applyOp struct {
109 : writerID objID
110 : batchID objID
111 : }
112 :
113 1 : func (o *applyOp) run(t *Test, h historyRecorder) {
114 1 : b := t.getBatch(o.batchID)
115 1 : w := t.getWriter(o.writerID)
116 1 : var err error
117 1 : if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
118 0 : err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
119 0 : if err == nil {
120 0 : err = b.SyncWait()
121 0 : }
122 1 : } else {
123 1 : err = w.Apply(b, t.writeOpts)
124 1 : }
125 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
126 : // batch will be closed by a closeOp which is guaranteed to be generated
127 : }
128 :
129 1 : func (o *applyOp) formattedString(KeyFormat) string {
130 1 : return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID)
131 1 : }
132 1 : func (o *applyOp) receiver() objID { return o.writerID }
133 1 : func (o *applyOp) syncObjs() objIDSlice {
134 1 : // Apply should not be concurrent with operations that are mutating the
135 1 : // batch.
136 1 : return []objID{o.batchID}
137 1 : }
138 :
139 0 : func (o *applyOp) rewriteKeys(func(UserKey) UserKey) {}
140 0 : func (o *applyOp) diagramKeyRanges() []pebble.KeyRange { return nil }
141 :
142 : // checkpointOp models a DB.Checkpoint operation.
143 : type checkpointOp struct {
144 : dbID objID
145 : // If non-empty, the checkpoint is restricted to these spans.
146 : spans []pebble.CheckpointSpan
147 : }
148 :
149 1 : func (o *checkpointOp) run(t *Test, h historyRecorder) {
150 1 : // TODO(josh): db.Checkpoint does not work with shared storage yet.
151 1 : // It would be better to filter out ahead of calling run on the op,
152 1 : // by setting the weight that generator.go uses to zero, or similar.
153 1 : // But IIUC the ops are shared for ALL the metamorphic test runs, so
154 1 : // not sure how to do that easily:
155 1 : // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
156 1 : if t.testOpts.sharedStorageEnabled || t.testOpts.externalStorageEnabled {
157 0 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), nil)
158 0 : return
159 0 : }
160 1 : var opts []pebble.CheckpointOption
161 1 : if len(o.spans) > 0 {
162 1 : opts = append(opts, pebble.WithRestrictToSpans(o.spans))
163 1 : }
164 1 : db := t.getDB(o.dbID)
165 1 : err := t.withRetries(func() error {
166 1 : return db.Checkpoint(o.dir(t.dir, h.op), opts...)
167 1 : })
168 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
169 : }
170 :
171 1 : func (o *checkpointOp) dir(dataDir string, idx int) string {
172 1 : return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
173 1 : }
174 :
175 1 : func (o *checkpointOp) formattedString(kf KeyFormat) string {
176 1 : var spanStr bytes.Buffer
177 1 : for i, span := range o.spans {
178 1 : if i > 0 {
179 1 : spanStr.WriteString(",")
180 1 : }
181 1 : fmt.Fprintf(&spanStr, "%q,%q", kf.FormatKey(span.Start), kf.FormatKey(span.End))
182 : }
183 1 : return fmt.Sprintf("%s.Checkpoint(%s)", o.dbID, spanStr.String())
184 : }
185 :
186 1 : func (o *checkpointOp) receiver() objID { return o.dbID }
187 1 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
188 0 : func (o *checkpointOp) rewriteKeys(fn func(UserKey) UserKey) {
189 0 : for i := range o.spans {
190 0 : o.spans[i].Start = fn(o.spans[i].Start)
191 0 : o.spans[i].End = fn(o.spans[i].End)
192 0 : }
193 : }
194 :
195 0 : func (o *checkpointOp) diagramKeyRanges() []pebble.KeyRange {
196 0 : var res []pebble.KeyRange
197 0 : for i := range o.spans {
198 0 : res = append(res, pebble.KeyRange{
199 0 : Start: o.spans[i].Start,
200 0 : End: o.spans[i].End,
201 0 : })
202 0 : }
203 0 : return res
204 : }
205 :
206 : // downloadOp models a DB.Download operation.
207 : type downloadOp struct {
208 : dbID objID
209 : spans []pebble.DownloadSpan
210 : }
211 :
212 1 : func (o *downloadOp) run(t *Test, h historyRecorder) {
213 1 : if t.testOpts.disableDownloads {
214 0 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), nil)
215 0 : return
216 0 : }
217 1 : db := t.getDB(o.dbID)
218 1 : err := t.withRetries(func() error {
219 1 : return db.Download(context.Background(), o.spans)
220 1 : })
221 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
222 : }
223 :
224 1 : func (o *downloadOp) formattedString(kf KeyFormat) string {
225 1 : var spanStr bytes.Buffer
226 1 : for i, span := range o.spans {
227 1 : if i > 0 {
228 1 : spanStr.WriteString(", ")
229 1 : }
230 1 : fmt.Fprintf(&spanStr, "%q /* start */, %q /* end */, %v /* viaBackingFileDownload */",
231 1 : kf.FormatKey(span.StartKey),
232 1 : kf.FormatKey(span.EndKey),
233 1 : span.ViaBackingFileDownload)
234 : }
235 1 : return fmt.Sprintf("%s.Download(%s)", o.dbID, spanStr.String())
236 : }
237 :
238 1 : func (o *downloadOp) receiver() objID { return o.dbID }
239 1 : func (o downloadOp) syncObjs() objIDSlice { return nil }
240 :
241 0 : func (o *downloadOp) rewriteKeys(fn func(UserKey) UserKey) {
242 0 : for i := range o.spans {
243 0 : o.spans[i].StartKey = fn(o.spans[i].StartKey)
244 0 : o.spans[i].EndKey = fn(o.spans[i].EndKey)
245 0 : }
246 : }
247 :
248 0 : func (o *downloadOp) diagramKeyRanges() []pebble.KeyRange {
249 0 : var res []pebble.KeyRange
250 0 : for i := range o.spans {
251 0 : res = append(res, pebble.KeyRange{
252 0 : Start: o.spans[i].StartKey,
253 0 : End: o.spans[i].EndKey,
254 0 : })
255 0 : }
256 0 : return res
257 : }
258 :
259 : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
260 : type closeOp struct {
261 : objID objID
262 :
263 : // affectedObjects is the list of additional objects that are affected by this
264 : // operation, and which syncObjs() must return so that we don't perform the
265 : // close in parallel with other operations to affected objects.
266 : affectedObjects []objID
267 : }
268 :
269 1 : func (o *closeOp) run(t *Test, h historyRecorder) {
270 1 : c := t.getCloser(o.objID)
271 1 : if o.objID.tag() == dbTag && t.opts.DisableWAL {
272 1 : // Special case: If WAL is disabled, do a flush right before DB Close. This
273 1 : // allows us to reuse this run's data directory as initial state for
274 1 : // future runs without losing any mutations.
275 1 : _ = t.getDB(o.objID).Flush()
276 1 : }
277 1 : t.clearObj(o.objID)
278 1 : err := c.Close()
279 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
280 : }
281 :
282 1 : func (o *closeOp) formattedString(KeyFormat) string { return fmt.Sprintf("%s.Close()", o.objID) }
283 1 : func (o *closeOp) receiver() objID { return o.objID }
284 1 : func (o *closeOp) syncObjs() objIDSlice {
285 1 : return o.affectedObjects
286 1 : }
287 :
288 0 : func (o *closeOp) rewriteKeys(func(UserKey) UserKey) {}
289 0 : func (o *closeOp) diagramKeyRanges() []pebble.KeyRange { return nil }
290 :
291 : // compactOp models a DB.Compact operation.
292 : type compactOp struct {
293 : dbID objID
294 : start UserKey
295 : end UserKey
296 : parallelize bool
297 : }
298 :
299 1 : func (o *compactOp) run(t *Test, h historyRecorder) {
300 1 : err := t.withRetries(func() error {
301 1 : return t.getDB(o.dbID).Compact(o.start, o.end, o.parallelize)
302 1 : })
303 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
304 : }
305 :
306 1 : func (o *compactOp) formattedString(kf KeyFormat) string {
307 1 : return fmt.Sprintf("%s.Compact(%q, %q, %t /* parallelize */)",
308 1 : o.dbID, kf.FormatKey(o.start), kf.FormatKey(o.end), o.parallelize)
309 1 : }
310 :
311 1 : func (o *compactOp) receiver() objID { return o.dbID }
312 1 : func (o *compactOp) syncObjs() objIDSlice { return nil }
313 :
314 1 : func (o *compactOp) rewriteKeys(fn func(UserKey) UserKey) {
315 1 : o.start = fn(o.start)
316 1 : o.end = fn(o.end)
317 1 : }
318 :
319 1 : func (o *compactOp) diagramKeyRanges() []pebble.KeyRange {
320 1 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
321 1 : }
322 :
323 : // deleteOp models a Write.Delete operation.
324 : type deleteOp struct {
325 : writerID objID
326 : key UserKey
327 :
328 : derivedDBID objID
329 : }
330 :
331 1 : func (o *deleteOp) run(t *Test, h historyRecorder) {
332 1 : w := t.getWriter(o.writerID)
333 1 : var err error
334 1 : if t.testOpts.deleteSized && t.isFMV(o.derivedDBID, pebble.FormatDeleteSizedAndObsolete) {
335 1 : // Call DeleteSized with a deterministic size derived from the index.
336 1 : // The size does not need to be accurate for correctness.
337 1 : err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
338 1 : } else {
339 1 : err = w.Delete(o.key, t.writeOpts)
340 1 : }
341 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
342 : }
343 :
344 1 : func hashSize(index int) uint32 {
345 1 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
346 1 : return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
347 1 : }
348 :
349 1 : func (o *deleteOp) formattedString(kf KeyFormat) string {
350 1 : return fmt.Sprintf("%s.Delete(%q)", o.writerID, kf.FormatKey(o.key))
351 1 : }
352 1 : func (o *deleteOp) receiver() objID { return o.writerID }
353 1 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
354 :
355 0 : func (o *deleteOp) rewriteKeys(fn func(UserKey) UserKey) {
356 0 : o.key = fn(o.key)
357 0 : }
358 :
359 0 : func (o *deleteOp) diagramKeyRanges() []pebble.KeyRange {
360 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
361 0 : }
362 :
363 : // singleDeleteOp models a Write.SingleDelete operation.
364 : type singleDeleteOp struct {
365 : writerID objID
366 : key UserKey
367 : maybeReplaceDelete bool
368 : }
369 :
370 1 : func (o *singleDeleteOp) run(t *Test, h historyRecorder) {
371 1 : w := t.getWriter(o.writerID)
372 1 : var err error
373 1 : if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
374 1 : err = w.Delete(o.key, t.writeOpts)
375 1 : } else {
376 1 : err = w.SingleDelete(o.key, t.writeOpts)
377 1 : }
378 : // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
379 : // write the former to the history log. The log line will indicate whether
380 : // or not the delete *could* have been replaced. The OPTIONS file should
381 : // also be consulted to determine what happened at runtime (i.e. by taking
382 : // the logical AND).
383 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
384 : }
385 :
386 1 : func (o *singleDeleteOp) formattedString(kf KeyFormat) string {
387 1 : return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)",
388 1 : o.writerID, kf.FormatKey(o.key), o.maybeReplaceDelete)
389 1 : }
390 :
391 1 : func (o *singleDeleteOp) receiver() objID { return o.writerID }
392 1 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
393 :
394 0 : func (o *singleDeleteOp) rewriteKeys(fn func(UserKey) UserKey) {
395 0 : o.key = fn(o.key)
396 0 : }
397 :
398 0 : func (o *singleDeleteOp) diagramKeyRanges() []pebble.KeyRange {
399 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
400 0 : }
401 :
402 : // deleteRangeOp models a Write.DeleteRange operation.
403 : type deleteRangeOp struct {
404 : writerID objID
405 : start UserKey
406 : end UserKey
407 : }
408 :
409 1 : func (o *deleteRangeOp) run(t *Test, h historyRecorder) {
410 1 : w := t.getWriter(o.writerID)
411 1 : err := w.DeleteRange(o.start, o.end, t.writeOpts)
412 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
413 1 : }
414 :
415 1 : func (o *deleteRangeOp) formattedString(kf KeyFormat) string {
416 1 : return fmt.Sprintf("%s.DeleteRange(%q, %q)",
417 1 : o.writerID, kf.FormatKey(o.start), kf.FormatKey(o.end))
418 1 : }
419 :
420 1 : func (o *deleteRangeOp) receiver() objID { return o.writerID }
421 1 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
422 :
423 0 : func (o *deleteRangeOp) rewriteKeys(fn func(UserKey) UserKey) {
424 0 : o.start = fn(o.start)
425 0 : o.end = fn(o.end)
426 0 : }
427 :
428 0 : func (o *deleteRangeOp) diagramKeyRanges() []pebble.KeyRange {
429 0 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
430 0 : }
431 :
432 : // flushOp models a DB.Flush operation.
433 : type flushOp struct {
434 : db objID
435 : }
436 :
437 1 : func (o *flushOp) run(t *Test, h historyRecorder) {
438 1 : db := t.getDB(o.db)
439 1 : err := db.Flush()
440 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
441 1 : }
442 :
443 1 : func (o *flushOp) formattedString(KeyFormat) string { return fmt.Sprintf("%s.Flush()", o.db) }
444 1 : func (o *flushOp) receiver() objID { return o.db }
445 1 : func (o *flushOp) syncObjs() objIDSlice { return nil }
446 0 : func (o *flushOp) rewriteKeys(func(UserKey) UserKey) {}
447 0 : func (o *flushOp) diagramKeyRanges() []pebble.KeyRange { return nil }
448 :
449 : // mergeOp models a Write.Merge operation.
450 : type mergeOp struct {
451 : writerID objID
452 : key UserKey
453 : value []byte
454 : }
455 :
456 1 : func (o *mergeOp) run(t *Test, h historyRecorder) {
457 1 : w := t.getWriter(o.writerID)
458 1 : err := w.Merge(o.key, o.value, t.writeOpts)
459 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
460 1 : }
461 :
462 1 : func (o *mergeOp) formattedString(kf KeyFormat) string {
463 1 : return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, kf.FormatKey(o.key), o.value)
464 1 : }
465 1 : func (o *mergeOp) receiver() objID { return o.writerID }
466 1 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
467 :
468 0 : func (o *mergeOp) rewriteKeys(fn func(UserKey) UserKey) {
469 0 : o.key = fn(o.key)
470 0 : }
471 :
472 0 : func (o *mergeOp) diagramKeyRanges() []pebble.KeyRange {
473 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
474 0 : }
475 :
476 : // setOp models a Write.Set operation.
477 : type setOp struct {
478 : writerID objID
479 : key UserKey
480 : value []byte
481 : }
482 :
483 1 : func (o *setOp) run(t *Test, h historyRecorder) {
484 1 : w := t.getWriter(o.writerID)
485 1 : err := w.Set(o.key, o.value, t.writeOpts)
486 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
487 1 : }
488 :
489 1 : func (o *setOp) formattedString(kf KeyFormat) string {
490 1 : return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, kf.FormatKey(o.key), o.value)
491 1 : }
492 1 : func (o *setOp) receiver() objID { return o.writerID }
493 1 : func (o *setOp) syncObjs() objIDSlice { return nil }
494 :
495 0 : func (o *setOp) rewriteKeys(fn func(UserKey) UserKey) {
496 0 : o.key = fn(o.key)
497 0 : }
498 :
499 0 : func (o *setOp) diagramKeyRanges() []pebble.KeyRange {
500 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
501 0 : }
502 :
503 : // rangeKeyDeleteOp models a Write.RangeKeyDelete operation.
504 : type rangeKeyDeleteOp struct {
505 : writerID objID
506 : start []byte
507 : end []byte
508 : }
509 :
510 1 : func (o *rangeKeyDeleteOp) run(t *Test, h historyRecorder) {
511 1 : w := t.getWriter(o.writerID)
512 1 : err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
513 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
514 1 : }
515 :
516 1 : func (o *rangeKeyDeleteOp) formattedString(kf KeyFormat) string {
517 1 : return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)",
518 1 : o.writerID, kf.FormatKey(o.start), kf.FormatKey(o.end))
519 1 : }
520 :
521 1 : func (o *rangeKeyDeleteOp) receiver() objID { return o.writerID }
522 1 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
523 :
524 0 : func (o *rangeKeyDeleteOp) rewriteKeys(fn func(UserKey) UserKey) {
525 0 : o.start = fn(o.start)
526 0 : o.end = fn(o.end)
527 0 : }
528 :
529 1 : func (o *rangeKeyDeleteOp) diagramKeyRanges() []pebble.KeyRange {
530 1 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
531 1 : }
532 :
533 : // rangeKeySetOp models a Write.RangeKeySet operation.
534 : type rangeKeySetOp struct {
535 : writerID objID
536 : start []byte
537 : end []byte
538 : suffix []byte
539 : value []byte
540 : }
541 :
542 1 : func (o *rangeKeySetOp) run(t *Test, h historyRecorder) {
543 1 : w := t.getWriter(o.writerID)
544 1 : err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
545 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
546 1 : }
547 :
548 1 : func (o *rangeKeySetOp) formattedString(kf KeyFormat) string {
549 1 : return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
550 1 : o.writerID, kf.FormatKey(o.start), kf.FormatKey(o.end),
551 1 : kf.FormatKeySuffix(o.suffix), o.value)
552 1 : }
553 :
554 1 : func (o *rangeKeySetOp) receiver() objID { return o.writerID }
555 1 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
556 :
557 1 : func (o *rangeKeySetOp) rewriteKeys(fn func(UserKey) UserKey) {
558 1 : o.start = fn(o.start)
559 1 : o.end = fn(o.end)
560 1 : }
561 :
562 1 : func (o *rangeKeySetOp) diagramKeyRanges() []pebble.KeyRange {
563 1 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
564 1 : }
565 :
566 : // rangeKeyUnsetOp models a Write.RangeKeyUnset operation.
567 : type rangeKeyUnsetOp struct {
568 : writerID objID
569 : start UserKey
570 : end UserKey
571 : suffix UserKeySuffix
572 : }
573 :
574 1 : func (o *rangeKeyUnsetOp) run(t *Test, h historyRecorder) {
575 1 : w := t.getWriter(o.writerID)
576 1 : err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
577 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
578 1 : }
579 :
580 1 : func (o *rangeKeyUnsetOp) formattedString(kf KeyFormat) string {
581 1 : return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
582 1 : o.writerID, kf.FormatKey(o.start), kf.FormatKey(o.end),
583 1 : kf.FormatKeySuffix(o.suffix))
584 1 : }
585 :
586 1 : func (o *rangeKeyUnsetOp) receiver() objID { return o.writerID }
587 1 : func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
588 :
589 0 : func (o *rangeKeyUnsetOp) rewriteKeys(fn func(UserKey) UserKey) {
590 0 : o.start = fn(o.start)
591 0 : o.end = fn(o.end)
592 0 : }
593 :
594 0 : func (o *rangeKeyUnsetOp) diagramKeyRanges() []pebble.KeyRange {
595 0 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
596 0 : }
597 :
598 : // logDataOp models a Writer.LogData operation.
599 : type logDataOp struct {
600 : writerID objID
601 : data []byte
602 : }
603 :
604 1 : func (o *logDataOp) run(t *Test, h historyRecorder) {
605 1 : w := t.getWriter(o.writerID)
606 1 : err := w.LogData(o.data, t.writeOpts)
607 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
608 1 : }
609 :
610 1 : func (o *logDataOp) formattedString(KeyFormat) string {
611 1 : return fmt.Sprintf("%s.LogData(%q)", o.writerID, o.data)
612 1 : }
613 :
614 1 : func (o *logDataOp) receiver() objID { return o.writerID }
615 1 : func (o *logDataOp) syncObjs() objIDSlice { return nil }
616 0 : func (o *logDataOp) rewriteKeys(func(UserKey) UserKey) {}
617 0 : func (o *logDataOp) diagramKeyRanges() []pebble.KeyRange { return []pebble.KeyRange{} }
618 :
619 : // newBatchOp models a Write.NewBatch operation.
620 : type newBatchOp struct {
621 : dbID objID
622 : batchID objID
623 : }
624 :
625 1 : func (o *newBatchOp) run(t *Test, h historyRecorder) {
626 1 : b := t.getDB(o.dbID).NewBatch()
627 1 : t.setBatch(o.batchID, b)
628 1 : h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
629 1 : }
630 :
631 1 : func (o *newBatchOp) formattedString(KeyFormat) string {
632 1 : return fmt.Sprintf("%s = %s.NewBatch()", o.batchID, o.dbID)
633 1 : }
634 1 : func (o *newBatchOp) receiver() objID { return o.dbID }
635 1 : func (o *newBatchOp) syncObjs() objIDSlice {
636 1 : // NewBatch should not be concurrent with operations that interact with that
637 1 : // same batch.
638 1 : return []objID{o.batchID}
639 1 : }
640 :
641 0 : func (o *newBatchOp) rewriteKeys(fn func(UserKey) UserKey) {}
642 0 : func (o *newBatchOp) diagramKeyRanges() []pebble.KeyRange { return nil }
643 :
644 : // newIndexedBatchOp models a Write.NewIndexedBatch operation.
645 : type newIndexedBatchOp struct {
646 : dbID objID
647 : batchID objID
648 : }
649 :
650 1 : func (o *newIndexedBatchOp) run(t *Test, h historyRecorder) {
651 1 : b := t.getDB(o.dbID).NewIndexedBatch()
652 1 : t.setBatch(o.batchID, b)
653 1 : h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
654 1 : }
655 :
656 1 : func (o *newIndexedBatchOp) formattedString(KeyFormat) string {
657 1 : return fmt.Sprintf("%s = %s.NewIndexedBatch()", o.batchID, o.dbID)
658 1 : }
659 1 : func (o *newIndexedBatchOp) receiver() objID { return o.dbID }
660 1 : func (o *newIndexedBatchOp) syncObjs() objIDSlice {
661 1 : // NewIndexedBatch should not be concurrent with operations that interact
662 1 : // with that same batch.
663 1 : return []objID{o.batchID}
664 1 : }
665 :
666 0 : func (o *newIndexedBatchOp) rewriteKeys(func(UserKey) UserKey) {}
667 0 : func (o *newIndexedBatchOp) diagramKeyRanges() []pebble.KeyRange { return nil }
668 :
669 : // batchCommitOp models a Batch.Commit operation.
670 : type batchCommitOp struct {
671 : dbID objID
672 : batchID objID
673 : }
674 :
675 1 : func (o *batchCommitOp) run(t *Test, h historyRecorder) {
676 1 : b := t.getBatch(o.batchID)
677 1 : err := b.Commit(t.writeOpts)
678 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
679 1 : }
680 :
681 1 : func (o *batchCommitOp) formattedString(KeyFormat) string {
682 1 : return fmt.Sprintf("%s.Commit()", o.batchID)
683 1 : }
684 1 : func (o *batchCommitOp) receiver() objID { return o.batchID }
685 1 : func (o *batchCommitOp) syncObjs() objIDSlice {
686 1 : // Synchronize on the database so that NewIters wait for the commit.
687 1 : return []objID{o.dbID}
688 1 : }
689 :
690 0 : func (o *batchCommitOp) rewriteKeys(func(UserKey) UserKey) {}
691 0 : func (o *batchCommitOp) diagramKeyRanges() []pebble.KeyRange { return nil }
692 :
693 : // ingestOp models a DB.Ingest operation.
694 : type ingestOp struct {
695 : dbID objID
696 : batchIDs []objID
697 :
698 : derivedDBIDs []objID
699 : }
700 :
701 1 : func (o *ingestOp) run(t *Test, h historyRecorder) {
702 1 : // We can only use apply as an alternative for ingestion if we are ingesting
703 1 : // a single batch. If we are ingesting multiple batches, the batches may
704 1 : // overlap which would cause ingestion to fail but apply would succeed.
705 1 : if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 && o.derivedDBIDs[0] == o.dbID {
706 0 : id := o.batchIDs[0]
707 0 : b := t.getBatch(id)
708 0 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
709 0 : db := t.getDB(o.dbID)
710 0 : c, err := o.collapseBatch(t, db, iter, rangeDelIter, rangeKeyIter, b)
711 0 : if err == nil {
712 0 : err = db.Apply(c, t.writeOpts)
713 0 : }
714 0 : _ = b.Close()
715 0 : _ = c.Close()
716 0 : t.clearObj(id)
717 0 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
718 0 : return
719 : }
720 :
721 1 : var paths []string
722 1 : var err error
723 1 : for i, id := range o.batchIDs {
724 1 : b := t.getBatch(id)
725 1 : t.clearObj(id)
726 1 : path, _, err2 := buildForIngest(t, o.dbID, b, i)
727 1 : if err2 != nil {
728 0 : h.Recordf("Build(%s) // %v", id, err2)
729 0 : }
730 1 : err = firstError(err, err2)
731 1 : if err2 == nil {
732 1 : paths = append(paths, path)
733 1 : }
734 1 : err = firstError(err, b.Close())
735 : }
736 :
737 1 : err = firstError(err, t.withRetries(func() error {
738 1 : return t.getDB(o.dbID).Ingest(context.Background(), paths)
739 1 : }))
740 :
741 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
742 : }
743 :
744 1 : func (o *ingestOp) receiver() objID { return o.dbID }
745 1 : func (o *ingestOp) syncObjs() objIDSlice {
746 1 : // Ingest should not be concurrent with mutating the batches that will be
747 1 : // ingested as sstables.
748 1 : objs := make([]objID, 0, len(o.batchIDs)+1)
749 1 : objs = append(objs, o.batchIDs...)
750 1 : addedDBs := make(map[objID]struct{})
751 1 : for i := range o.derivedDBIDs {
752 1 : _, ok := addedDBs[o.derivedDBIDs[i]]
753 1 : if !ok && o.derivedDBIDs[i] != o.dbID {
754 0 : objs = append(objs, o.derivedDBIDs[i])
755 0 : addedDBs[o.derivedDBIDs[i]] = struct{}{}
756 0 : }
757 : }
758 1 : return objs
759 : }
760 :
761 : func closeIters(
762 : pointIter base.InternalIterator,
763 : rangeDelIter keyspan.FragmentIterator,
764 : rangeKeyIter keyspan.FragmentIterator,
765 0 : ) {
766 0 : if pointIter != nil {
767 0 : pointIter.Close()
768 0 : }
769 0 : if rangeDelIter != nil {
770 0 : rangeDelIter.Close()
771 0 : }
772 0 : if rangeKeyIter != nil {
773 0 : rangeKeyIter.Close()
774 0 : }
775 : }
776 :
777 : // collapseBatch collapses the mutations in a batch to be equivalent to an
778 : // sstable ingesting those mutations. Duplicate updates to a key are collapsed
779 : // so that only the latest update is performed. All range deletions are
780 : // performed first in the batch to match the semantics of ingestion where a
781 : // range deletion does not delete a point record contained in the sstable.
782 : func (o *ingestOp) collapseBatch(
783 : t *Test,
784 : db *pebble.DB,
785 : pointIter base.InternalIterator,
786 : rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
787 : b *pebble.Batch,
788 0 : ) (*pebble.Batch, error) {
789 0 : defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
790 0 : equal := t.opts.Comparer.Equal
791 0 : collapsed := db.NewBatch()
792 0 :
793 0 : if rangeDelIter != nil {
794 0 : // NB: The range tombstones have already been fragmented by the Batch.
795 0 : t, err := rangeDelIter.First()
796 0 : for ; t != nil; t, err = rangeDelIter.Next() {
797 0 : // NB: We don't have to copy the key or value since we're reading from a
798 0 : // batch which doesn't do prefix compression.
799 0 : if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
800 0 : return nil, err
801 0 : }
802 : }
803 0 : if err != nil {
804 0 : return nil, err
805 0 : }
806 0 : rangeDelIter.Close()
807 0 : rangeDelIter = nil
808 : }
809 :
810 0 : if pointIter != nil {
811 0 : var lastUserKey []byte
812 0 : for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
813 0 : // Ignore duplicate keys.
814 0 : //
815 0 : // Note: this is necessary due to MERGE keys, otherwise it would be
816 0 : // fine to include all the keys in the batch and let the normal
817 0 : // sequence number precedence determine which of the keys "wins".
818 0 : // But the code to build the ingested sstable will only keep the
819 0 : // most recent internal key and will not merge across internal keys.
820 0 : if equal(lastUserKey, kv.K.UserKey) {
821 0 : continue
822 : }
823 : // NB: We don't have to copy the key or value since we're reading from a
824 : // batch which doesn't do prefix compression.
825 0 : lastUserKey = kv.K.UserKey
826 0 :
827 0 : var err error
828 0 : switch kv.Kind() {
829 0 : case pebble.InternalKeyKindDelete:
830 0 : err = collapsed.Delete(kv.K.UserKey, nil)
831 0 : case pebble.InternalKeyKindDeleteSized:
832 0 : v, _ := binary.Uvarint(kv.InPlaceValue())
833 0 : // Batch.DeleteSized takes just the length of the value being
834 0 : // deleted and adds the key's length to derive the overall entry
835 0 : // size of the value being deleted. This has already been done
836 0 : // to the key we're reading from the batch, so we must subtract
837 0 : // the key length from the encoded value before calling
838 0 : // collapsed.DeleteSized, which will again add the key length
839 0 : // before encoding.
840 0 : err = collapsed.DeleteSized(kv.K.UserKey, uint32(v-uint64(len(kv.K.UserKey))), nil)
841 0 : case pebble.InternalKeyKindSingleDelete:
842 0 : err = collapsed.SingleDelete(kv.K.UserKey, nil)
843 0 : case pebble.InternalKeyKindSet:
844 0 : err = collapsed.Set(kv.K.UserKey, kv.InPlaceValue(), nil)
845 0 : case pebble.InternalKeyKindMerge:
846 0 : err = collapsed.Merge(kv.K.UserKey, kv.InPlaceValue(), nil)
847 0 : case pebble.InternalKeyKindLogData:
848 0 : err = collapsed.LogData(kv.K.UserKey, nil)
849 0 : default:
850 0 : err = errors.Errorf("unknown batch record kind: %d", kv.Kind())
851 : }
852 0 : if err != nil {
853 0 : return nil, err
854 0 : }
855 : }
856 0 : if err := pointIter.Close(); err != nil {
857 0 : return nil, err
858 0 : }
859 0 : pointIter = nil
860 : }
861 :
862 : // There's no equivalent of a MERGE operator for range keys, so there's no
863 : // need to collapse the range keys here. Rather than reading the range keys
864 : // from `rangeKeyIter`, which will already be fragmented, read the range
865 : // keys from the batch and copy them verbatim. This marginally improves our
866 : // test coverage over the alternative approach of pre-fragmenting and
867 : // pre-coalescing before writing to the batch.
868 : //
869 : // The `rangeKeyIter` is used only to determine if there are any range keys
870 : // in the batch at all, and only because we already have it handy from
871 : // private.BatchSort.
872 0 : if rangeKeyIter != nil {
873 0 : for r := b.Reader(); ; {
874 0 : kind, key, value, ok, err := r.Next()
875 0 : if !ok {
876 0 : if err != nil {
877 0 : return nil, err
878 0 : }
879 0 : break
880 0 : } else if !rangekey.IsRangeKey(kind) {
881 0 : continue
882 : }
883 0 : ik := base.MakeInternalKey(key, 0, kind)
884 0 : if err := collapsed.AddInternalKey(&ik, value, nil); err != nil {
885 0 : return nil, err
886 0 : }
887 : }
888 0 : rangeKeyIter.Close()
889 0 : rangeKeyIter = nil
890 : }
891 :
892 0 : return collapsed, nil
893 : }
894 :
895 1 : func (o *ingestOp) formattedString(KeyFormat) string {
896 1 : var buf strings.Builder
897 1 : buf.WriteString(o.dbID.String())
898 1 : buf.WriteString(".Ingest(")
899 1 : for i, id := range o.batchIDs {
900 1 : if i > 0 {
901 1 : buf.WriteString(", ")
902 1 : }
903 1 : buf.WriteString(id.String())
904 : }
905 1 : buf.WriteString(")")
906 1 : return buf.String()
907 : }
908 :
// rewriteKeys is a no-op: an ingestOp carries no keys of its own; the keys
// live in the referenced batches.
func (o *ingestOp) rewriteKeys(func(UserKey) UserKey) {}

// diagramKeyRanges returns nil because the op has no key bounds of its own.
func (o *ingestOp) diagramKeyRanges() []pebble.KeyRange { return nil }
911 :
// ingestAndExciseOp models a DB.IngestAndExcise operation: the contents of a
// single batch are ingested while the excise span is removed from the DB.
type ingestAndExciseOp struct {
	dbID    objID // the DB ingested into
	batchID objID // the batch whose contents are built into an sstable
	// derivedDBID is presumably the DB the batch was created from; it is
	// synchronized with when it differs from dbID (see syncObjs).
	derivedDBID objID
	// exciseStart/exciseEnd bound the excised span; run panics unless
	// exciseEnd > exciseStart.
	exciseStart, exciseEnd UserKey
}
918 :
// run executes the ingest-and-excise. If the batch is empty or the built
// sstable contains no entries, only the excise is simulated. When the test
// options do not enable a real excise, the excise is emulated with
// DeleteRange+RangeKeyDelete followed by a plain Ingest.
func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) {
	var err error
	b := t.getBatch(o.batchID)
	t.clearObj(o.batchID)
	// The excise span must be well-formed (non-empty).
	if t.testOpts.Opts.Comparer.Compare(o.exciseEnd, o.exciseStart) <= 0 {
		panic("non-well-formed excise span")
	}
	db := t.getDB(o.dbID)
	if b.Empty() {
		// Nothing to ingest; record the result of the simulated excise alone.
		h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat),
			o.simulateExcise(db, t))
		return
	}

	// Build an sstable from the batch contents.
	path, writerMeta, err2 := buildForIngest(t, o.dbID, b, 0 /* i */)
	if err2 != nil {
		h.Recordf("Build(%s) // %v", o.batchID, err2)
		return
	}
	err = firstError(err, b.Close())

	if writerMeta.Properties.NumEntries == 0 && writerMeta.Properties.NumRangeKeys() == 0 {
		// The built sstable is empty; only the excise has any effect.
		h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), o.simulateExcise(db, t))
		return
	}

	if t.testOpts.useExcise {
		err = firstError(err, t.withRetries(func() error {
			_, err := db.IngestAndExcise(context.Background(), []string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{
				Start: o.exciseStart,
				End: o.exciseEnd,
			})
			return err
		}))
	} else {
		// Emulate the excise, then ingest normally.
		err = firstError(err, o.simulateExcise(db, t))
		err = firstError(err, t.withRetries(func() error {
			return db.Ingest(context.Background(), []string{path})
		}))
	}

	h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
}
962 :
963 1 : func (o *ingestAndExciseOp) simulateExcise(db *pebble.DB, t *Test) error {
964 1 : // Simulate the excise using a DeleteRange and RangeKeyDelete.
965 1 : return errors.CombineErrors(
966 1 : db.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts),
967 1 : db.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts),
968 1 : )
969 1 : }
970 :
971 1 : func (o *ingestAndExciseOp) receiver() objID { return o.dbID }
972 1 : func (o *ingestAndExciseOp) syncObjs() objIDSlice {
973 1 : // Ingest should not be concurrent with mutating the batches that will be
974 1 : // ingested as sstables.
975 1 : objs := []objID{o.batchID}
976 1 : if o.derivedDBID != o.dbID {
977 0 : objs = append(objs, o.derivedDBID)
978 0 : }
979 1 : return objs
980 : }
981 :
// formattedString returns the op in its serialized test syntax, including the
// excise span bounds.
func (o *ingestAndExciseOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q)",
		o.dbID, o.batchID, kf.FormatKey(o.exciseStart), kf.FormatKey(o.exciseEnd))
}

// rewriteKeys maps the excise bounds through fn; the ingested keys themselves
// belong to the batch and are rewritten there.
func (o *ingestAndExciseOp) rewriteKeys(fn func(UserKey) UserKey) {
	o.exciseStart = fn(o.exciseStart)
	o.exciseEnd = fn(o.exciseEnd)
}

// diagramKeyRanges returns the excise span for test diagrams.
func (o *ingestAndExciseOp) diagramKeyRanges() []pebble.KeyRange {
	return []pebble.KeyRange{{Start: o.exciseStart, End: o.exciseEnd}}
}
995 :
// ingestExternalFilesOp models a DB.IngestExternalFiles operation.
//
// When remote storage is not enabled, the operation is emulated using the
// regular DB.Ingest; this serves as a cross-check of the result.
type ingestExternalFilesOp struct {
	dbID objID
	// The bounds of the objects cannot overlap.
	objs []externalObjWithBounds
}

// externalObjWithBounds pairs an external object with the key bounds and
// synthetic prefix/suffix used when ingesting it.
type externalObjWithBounds struct {
	externalObjID objID

	// bounds for the external object. These bounds apply after keys undergo
	// any prefix or suffix transforms.
	bounds pebble.KeyRange

	syntheticPrefix sstable.SyntheticPrefix
	syntheticSuffix sstable.SyntheticSuffix
}
1016 :
// run performs the external-file ingest. With external storage disabled it
// builds local, bounds-truncated copies of the objects and ingests those;
// otherwise it issues a real DB.IngestExternalFiles call.
func (o *ingestExternalFilesOp) run(t *Test, h historyRecorder) {
	db := t.getDB(o.dbID)

	// Verify the objects exist (useful for --try-to-reduce).
	for i := range o.objs {
		t.getExternalObj(o.objs[i].externalObjID)
	}

	var err error
	if !t.testOpts.externalStorageEnabled {
		// Emulate the operation by creating local, truncated SST files and ingesting
		// them.
		var paths []string
		for i, obj := range o.objs {
			// Make sure the object exists and is not empty.
			path, sstMeta := buildForIngestExternalEmulation(
				t, o.dbID, obj.externalObjID, obj.bounds, obj.syntheticSuffix, obj.syntheticPrefix, i,
			)
			// Only ingest files that actually contain keys; empty sstables
			// are skipped.
			if sstMeta.HasPointKeys || sstMeta.HasRangeKeys || sstMeta.HasRangeDelKeys {
				paths = append(paths, path)
			}
		}
		if len(paths) > 0 {
			err = db.Ingest(context.Background(), paths)
		}
	} else {
		external := make([]pebble.ExternalFile, len(o.objs))
		for i, obj := range o.objs {
			meta := t.getExternalObj(obj.externalObjID)
			external[i] = pebble.ExternalFile{
				Locator: "external",
				ObjName: externalObjName(obj.externalObjID),
				Size: meta.sstMeta.Size,
				StartKey: obj.bounds.Start,
				EndKey: obj.bounds.End,
				EndKeyIsInclusive: false,
				// Note: if the table has point/range keys, we don't know for sure whether
				// this particular range has any, but that's acceptable.
				HasPointKey: meta.sstMeta.HasPointKeys || meta.sstMeta.HasRangeDelKeys,
				HasRangeKey: meta.sstMeta.HasRangeKeys,
				SyntheticSuffix: obj.syntheticSuffix,
			}
			if obj.syntheticPrefix.IsSet() {
				external[i].SyntheticPrefix = obj.syntheticPrefix
			}
		}
		_, err = db.IngestExternalFiles(context.Background(), external)
	}

	h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
}
1068 :
1069 1 : func (o *ingestExternalFilesOp) receiver() objID { return o.dbID }
1070 1 : func (o *ingestExternalFilesOp) syncObjs() objIDSlice {
1071 1 : res := make(objIDSlice, len(o.objs))
1072 1 : for i := range res {
1073 1 : res[i] = o.objs[i].externalObjID
1074 1 : }
1075 : // Deduplicate the IDs.
1076 1 : slices.Sort(res)
1077 1 : return slices.Compact(res)
1078 : }
1079 :
1080 1 : func (o *ingestExternalFilesOp) formattedString(kf KeyFormat) string {
1081 1 : strs := make([]string, len(o.objs))
1082 1 : for i, obj := range o.objs {
1083 1 : strs[i] = fmt.Sprintf("%s, %q /* start */, %q /* end */, %q /* syntheticSuffix */, %q /* syntheticPrefix */",
1084 1 : obj.externalObjID, kf.FormatKey(obj.bounds.Start), kf.FormatKey(obj.bounds.End),
1085 1 : kf.FormatKeySuffix(UserKeySuffix(obj.syntheticSuffix)), kf.FormatKey(UserKey(obj.syntheticPrefix)),
1086 1 : )
1087 1 : }
1088 1 : return fmt.Sprintf("%s.IngestExternalFiles(%s)", o.dbID, strings.Join(strs, ", "))
1089 : }
1090 :
1091 0 : func (o *ingestExternalFilesOp) rewriteKeys(fn func(UserKey) UserKey) {
1092 0 : // If any of the objects have synthetic prefixes, we can't allow
1093 0 : // modification of external object bounds.
1094 0 : allowModification := true
1095 0 : for i := range o.objs {
1096 0 : allowModification = allowModification && !o.objs[i].syntheticPrefix.IsSet()
1097 0 : }
1098 :
1099 0 : for i := range o.objs {
1100 0 : s, e := fn(o.objs[i].bounds.Start), fn(o.objs[i].bounds.End)
1101 0 : if allowModification {
1102 0 : o.objs[i].bounds.Start = s
1103 0 : o.objs[i].bounds.End = e
1104 0 : }
1105 : }
1106 : }
1107 :
1108 0 : func (o *ingestExternalFilesOp) diagramKeyRanges() []pebble.KeyRange {
1109 0 : ranges := make([]pebble.KeyRange, len(o.objs))
1110 0 : for i, obj := range o.objs {
1111 0 : ranges[i] = obj.bounds
1112 0 : }
1113 0 : return ranges
1114 : }
1115 :
// getOp models a Reader.Get operation.
type getOp struct {
	readerID objID   // the reader (DB, batch, or snapshot) queried
	key      UserKey // the key looked up
	// derivedDBID is the DB underlying a non-DB reader; used only for
	// synchronization (see syncObjs). Zero when not applicable.
	derivedDBID objID
}
1122 :
// run performs the Get (with retries around injected errors) and records the
// returned value and error in the history.
func (o *getOp) run(t *Test, h historyRecorder) {
	r := t.getReader(o.readerID)
	var val []byte
	var closer io.Closer
	err := t.withRetries(func() (err error) {
		val, closer, err = r.Get(o.key)
		return err
	})
	h.Recordf("%s // [%q] %v", o.formattedString(t.testOpts.KeyFormat), val, err)
	// Release the value's backing storage, if any.
	if closer != nil {
		closer.Close()
	}
}
1136 :
// formattedString returns the op in its serialized test syntax.
func (o *getOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s.Get(%q)", o.readerID, kf.FormatKey(o.key))
}
func (o *getOp) receiver() objID { return o.readerID }
func (o *getOp) syncObjs() objIDSlice {
	if o.readerID.tag() == dbTag {
		return nil
	}
	// batch.Get reads through to the current database state.
	if o.derivedDBID != 0 {
		return []objID{o.derivedDBID}
	}
	return nil
}
1151 :
// rewriteKeys maps the looked-up key through fn.
func (o *getOp) rewriteKeys(fn func(UserKey) UserKey) {
	o.key = fn(o.key)
}

// diagramKeyRanges returns the single key as a degenerate range.
func (o *getOp) diagramKeyRanges() []pebble.KeyRange {
	return []pebble.KeyRange{{Start: o.key, End: o.key}}
}
1159 :
// newIterOp models a Reader.NewIter operation.
type newIterOp struct {
	readerID objID // the reader the iterator is opened on
	iterID   objID // the ID assigned to the new iterator
	iterOpts       // embedded iterator options (bounds, key types, filters, masking)
	// derivedDBID is the DB backing a batch/snapshot reader; used for
	// synchronization (see syncObjs).
	derivedDBID objID
}

// Enable this to enable debug logging of range key iterator operations.
const debugIterators = false
1170 :
// run opens the iterator, retrying construction for as long as the returned
// iterator carries an injected error, then registers it under o.iterID.
func (o *newIterOp) run(t *Test, h historyRecorder) {
	r := t.getReader(o.readerID)
	opts := iterOptions(t.testOpts.KeyFormat, o.iterOpts)
	if debugIterators {
		opts.DebugRangeKeyStack = true
	}

	var i *pebble.Iterator
	for {
		i, _ = r.NewIter(opts)
		// Retry only on the deliberately injected error; any other error is
		// recorded below via i.Error().
		if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
			break
		}
		// close this iter and retry NewIter
		_ = i.Close()
	}
	t.setIter(o.iterID, i)

	// Trash the bounds to ensure that Pebble doesn't rely on the stability of
	// the user-provided bounds. (Errors from rand.Read are ignored; the
	// trashing is best-effort.)
	if opts != nil {
		rand.Read(opts.LowerBound[:])
		rand.Read(opts.UpperBound[:])
	}
	h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), i.Error())
}
1197 :
// formattedString returns the op in its serialized test syntax, listing all
// embedded iterator options.
func (o *newIterOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %q, %q, %t /* use L6 filters */, %q /* masking suffix */)",
		o.iterID, o.readerID, kf.FormatKey(o.lower), kf.FormatKey(o.upper),
		o.keyTypes, kf.FormatKeySuffix(o.filterMin), kf.FormatKeySuffix(o.filterMax),
		o.useL6Filters, kf.FormatKeySuffix(o.maskSuffix))
}
1204 :
func (o *newIterOp) receiver() objID { return o.readerID }
func (o *newIterOp) syncObjs() objIDSlice {
	// Prevent o.iterID ops from running before it exists.
	objs := []objID{o.iterID}
	// If reading through a batch or snapshot, the new iterator will also observe database
	// state, and we must synchronize on the database state for a consistent
	// view.
	if o.readerID.tag() == batchTag || o.readerID.tag() == snapTag {
		objs = append(objs, o.derivedDBID)
	}
	return objs
}
1217 :
1218 0 : func (o *newIterOp) rewriteKeys(fn func(UserKey) UserKey) {
1219 0 : if o.lower != nil {
1220 0 : o.lower = fn(o.lower)
1221 0 : }
1222 0 : if o.upper != nil {
1223 0 : o.upper = fn(o.upper)
1224 0 : }
1225 : }
1226 :
1227 1 : func (o *newIterOp) diagramKeyRanges() []pebble.KeyRange {
1228 1 : var res []pebble.KeyRange
1229 1 : if o.lower != nil {
1230 0 : res = append(res, pebble.KeyRange{Start: o.lower, End: o.lower})
1231 0 : }
1232 1 : if o.upper != nil {
1233 0 : res = append(res, pebble.KeyRange{Start: o.upper, End: o.upper})
1234 0 : }
1235 1 : return res
1236 : }
1237 :
// newIterUsingCloneOp models a Iterator.Clone operation.
type newIterUsingCloneOp struct {
	existingIterID objID // the iterator being cloned
	iterID         objID // the ID assigned to the clone
	refreshBatch   bool  // whether the clone refreshes its view of the batch
	iterOpts

	// derivedReaderID is the ID of the underlying reader that backs both the
	// existing iterator and the new iterator. The derivedReaderID is NOT
	// serialized by String and is derived from other operations during parse.
	derivedReaderID objID
}
1250 :
// run clones the existing iterator with the op's options and registers the
// clone under o.iterID. Clone failure is a test bug, hence the panic.
func (o *newIterUsingCloneOp) run(t *Test, h historyRecorder) {
	iter := t.getIter(o.existingIterID)
	cloneOpts := pebble.CloneOptions{
		IterOptions: iterOptions(t.testOpts.KeyFormat, o.iterOpts),
		RefreshBatchView: o.refreshBatch,
	}
	i, err := iter.iter.Clone(cloneOpts)
	if err != nil {
		panic(err)
	}
	t.setIter(o.iterID, i)
	h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), i.Error())
}
1264 :
// formattedString returns the op in its serialized test syntax, including the
// refreshBatch flag and all iterator options.
func (o *newIterUsingCloneOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %q, %q, %t /* use L6 filters */, %q /* masking suffix */)",
		o.iterID, o.existingIterID, o.refreshBatch, kf.FormatKey(o.lower),
		kf.FormatKey(o.upper), o.keyTypes, kf.FormatKeySuffix(o.filterMin),
		kf.FormatKeySuffix(o.filterMax), o.useL6Filters, kf.FormatKeySuffix(o.maskSuffix))
}

func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
1273 :
func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
	objIDs := []objID{o.iterID}
	// If the underlying reader is a batch, we must synchronize with the batch.
	// If refreshBatch=true, synchronizing is necessary to observe all the
	// mutations up to until this op and no more. Even when refreshBatch=false,
	// we must synchronize because iterator construction may access state cached
	// on the indexed batch to avoid refragmenting range tombstones or range
	// keys.
	if o.derivedReaderID.tag() == batchTag {
		objIDs = append(objIDs, o.derivedReaderID)
	}
	return objIDs
}

// rewriteKeys is a no-op: the clone's bounds come from iterOpts, which this
// op does not rewrite.
func (o *newIterUsingCloneOp) rewriteKeys(fn func(UserKey) UserKey) {}
func (o *newIterUsingCloneOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1290 :
// iterSetBoundsOp models an Iterator.SetBounds operation.
type iterSetBoundsOp struct {
	iterID objID   // the iterator whose bounds are updated
	lower  UserKey // new lower bound; nil means unbounded
	upper  UserKey // new upper bound; nil means unbounded
}
1297 :
// run sets the iterator bounds using private copies of the op's keys, then
// overwrites those copies to verify Pebble does not retain them.
func (o *iterSetBoundsOp) run(t *Test, h historyRecorder) {
	i := t.getIter(o.iterID)
	// Copy the bounds so the op's own keys stay intact for later formatting.
	var lower, upper []byte
	if o.lower != nil {
		lower = append(lower, o.lower...)
	}
	if o.upper != nil {
		upper = append(upper, o.upper...)
	}
	i.SetBounds(lower, upper)

	// Trash the bounds to ensure that Pebble doesn't rely on the stability of
	// the user-provided bounds. (rand.Read on a nil slice is a no-op.)
	rand.Read(lower[:])
	rand.Read(upper[:])

	h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), i.Error())
}
1316 :
// formattedString returns the op in its serialized test syntax.
func (o *iterSetBoundsOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID,
		kf.FormatKey(o.lower), kf.FormatKey(o.upper))
}

func (o *iterSetBoundsOp) receiver() objID { return o.iterID }
func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
1324 :
// rewriteKeys maps both bounds through fn.
func (o *iterSetBoundsOp) rewriteKeys(fn func(UserKey) UserKey) {
	o.lower = fn(o.lower)
	o.upper = fn(o.upper)
}

// diagramKeyRanges returns the bounded span for test diagrams.
func (o *iterSetBoundsOp) diagramKeyRanges() []pebble.KeyRange {
	return []pebble.KeyRange{{Start: o.lower, End: o.upper}}
}
1333 :
// iterSetOptionsOp models an Iterator.SetOptions operation.
type iterSetOptionsOp struct {
	iterID objID // the iterator whose options are replaced
	iterOpts

	// derivedReaderID is the ID of the underlying reader that backs the
	// iterator. The derivedReaderID is NOT serialized by String and is derived
	// from other operations during parse.
	derivedReaderID objID
}
1344 :
// run replaces the iterator's options, then overwrites the bound slices to
// verify Pebble does not retain the caller's memory.
func (o *iterSetOptionsOp) run(t *Test, h historyRecorder) {
	i := t.getIter(o.iterID)

	opts := iterOptions(t.testOpts.KeyFormat, o.iterOpts)
	if opts == nil {
		// SetOptions requires a non-nil options struct, unlike NewIter.
		opts = &pebble.IterOptions{}
	}
	i.SetOptions(opts)

	// Trash the bounds to ensure that Pebble doesn't rely on the stability of
	// the user-provided bounds. (rand.Read on a nil slice is a no-op.)
	rand.Read(opts.LowerBound[:])
	rand.Read(opts.UpperBound[:])

	h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), i.Error())
}
1361 :
// formattedString returns the op in its serialized test syntax, listing all
// embedded iterator options.
func (o *iterSetOptionsOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %q, %q, %t /* use L6 filters */, %q /* masking suffix */)",
		o.iterID, kf.FormatKey(o.lower), kf.FormatKey(o.upper),
		o.keyTypes, kf.FormatKeySuffix(o.filterMin), kf.FormatKeySuffix(o.filterMax),
		o.useL6Filters, kf.FormatKeySuffix(o.maskSuffix))
}
1368 :
// iterOptions converts the test's iterOpts into a *pebble.IterOptions,
// copying bounds into fresh slices (so callers may trash them afterwards) and
// wiring up range-key masking and block-property filtering. Returns nil when
// the options are zero and iterator debugging is disabled.
func iterOptions(kf KeyFormat, o iterOpts) *pebble.IterOptions {
	if o.IsZero() && !debugIterators {
		return nil
	}
	// Copy the bounds so the op retains its own keys.
	var lower, upper []byte
	if o.lower != nil {
		lower = append(lower, o.lower...)
	}
	if o.upper != nil {
		upper = append(upper, o.upper...)
	}
	opts := &pebble.IterOptions{
		LowerBound: lower,
		UpperBound: upper,
		KeyTypes: pebble.IterKeyType(o.keyTypes),
		RangeKeyMasking: pebble.RangeKeyMasking{
			Suffix: o.maskSuffix,
		},
		UseL6Filters: o.useL6Filters,
		DebugRangeKeyStack: debugIterators,
	}
	if opts.RangeKeyMasking.Suffix != nil {
		// Masking is only meaningful with a filter constructor.
		opts.RangeKeyMasking.Filter = kf.NewSuffixFilterMask
	}
	if o.filterMax != nil {
		opts.PointKeyFilters = []pebble.BlockPropertyFilter{
			kf.NewSuffixBlockPropertyFilter(o.filterMin, o.filterMax),
		}
		// Enforce the timestamp bounds in SkipPoint, so that the iterator never
		// returns a key outside the filterMin, filterMax bounds. This provides
		// deterministic iteration.
		opts.SkipPoint = func(k []byte) (skip bool) {
			n := kf.Comparer.Split(k)
			if n == len(k) {
				// No suffix, don't skip it.
				return false
			}
			if kf.Comparer.ComparePointSuffixes(k[n:], o.filterMin) < 0 {
				return true
			}
			if kf.Comparer.ComparePointSuffixes(k[n:], o.filterMax) >= 0 {
				return true
			}
			return false
		}
	}
	return opts
}
1417 :
func (o *iterSetOptionsOp) receiver() objID { return o.iterID }

func (o *iterSetOptionsOp) syncObjs() objIDSlice {
	if o.derivedReaderID.tag() == batchTag {
		// If the underlying reader is a batch, we must synchronize with the
		// batch so that we observe all the mutations up until this operation
		// and no more.
		return []objID{o.derivedReaderID}
	}
	return nil
}

// rewriteKeys is a no-op: the bounds live in iterOpts, which this op does not
// rewrite.
func (o *iterSetOptionsOp) rewriteKeys(fn func(UserKey) UserKey) {}
func (o *iterSetOptionsOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1432 :
// iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
type iterSeekGEOp struct {
	iterID objID   // the iterator being repositioned
	key    UserKey // the seek key
	limit  UserKey // nil selects plain SeekGE; non-nil selects SeekGEWithLimit

	derivedReaderID objID // the reader backing the iterator (for syncObjs)
}
1441 :
1442 1 : func iteratorPos(i *retryableIter) string {
1443 1 : var buf bytes.Buffer
1444 1 : fmt.Fprintf(&buf, "%q", i.Key())
1445 1 : hasPoint, hasRange := i.HasPointAndRange()
1446 1 : if hasPoint {
1447 1 : fmt.Fprintf(&buf, ",%q", i.Value())
1448 1 : } else {
1449 1 : fmt.Fprint(&buf, ",<no point>")
1450 1 : }
1451 1 : if hasRange {
1452 1 : start, end := i.RangeBounds()
1453 1 : fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
1454 1 : for i, rk := range i.RangeKeys() {
1455 1 : if i > 0 {
1456 1 : fmt.Fprint(&buf, ",")
1457 1 : }
1458 1 : fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
1459 : }
1460 1 : fmt.Fprint(&buf, "}")
1461 1 : } else {
1462 1 : fmt.Fprint(&buf, ",<no range>")
1463 1 : }
1464 1 : if i.RangeKeyChanged() {
1465 1 : fmt.Fprint(&buf, "*")
1466 1 : }
1467 1 : return buf.String()
1468 : }
1469 :
// validBoolToStr renders an iterator validity bool as "true" or "false",
// matching fmt's %t verb.
func validBoolToStr(valid bool) string {
	return fmt.Sprint(valid)
}
1473 :
1474 1 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
1475 1 : // We can't distinguish between IterExhausted and IterAtLimit in a
1476 1 : // deterministic manner.
1477 1 : switch validity {
1478 1 : case pebble.IterExhausted, pebble.IterAtLimit:
1479 1 : return false, "invalid"
1480 1 : case pebble.IterValid:
1481 1 : return true, "valid"
1482 0 : default:
1483 0 : panic("unknown validity")
1484 : }
1485 : }
1486 :
// run performs SeekGE (or SeekGEWithLimit when a limit is set) and records
// the resulting validity and, if valid, the iterator position.
func (o *iterSeekGEOp) run(t *Test, h historyRecorder) {
	i := t.getIter(o.iterID)
	var valid bool
	var validStr string
	if o.limit == nil {
		valid = i.SeekGE(o.key)
		validStr = validBoolToStr(valid)
	} else {
		valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
	}
	if valid {
		h.Recordf("%s // [%s,%s] %v", o.formattedString(t.testOpts.KeyFormat),
			validStr, iteratorPos(i), i.Error())
	} else {
		h.Recordf("%s // [%s] %v",
			o.formattedString(t.testOpts.KeyFormat), validStr, i.Error())
	}
}
1505 :
// formattedString returns the op in its serialized test syntax.
func (o *iterSeekGEOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s.SeekGE(%q, %q)",
		o.iterID, kf.FormatKey(o.key), kf.FormatKey(o.limit))
}
func (o *iterSeekGEOp) receiver() objID { return o.iterID }
func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }

// rewriteKeys maps only the seek key through fn.
// NOTE(review): o.limit is not rewritten here (nor in iterSeekLTOp/iterNextOp)
// — presumably intentional, but worth confirming against the key-rewriting
// callers.
func (o *iterSeekGEOp) rewriteKeys(fn func(UserKey) UserKey) {
	o.key = fn(o.key)
}

// diagramKeyRanges returns the seek key as a degenerate range.
func (o *iterSeekGEOp) diagramKeyRanges() []pebble.KeyRange {
	return []pebble.KeyRange{{Start: o.key, End: o.key}}
}
1520 :
1521 1 : func onlyBatchIDs(ids ...objID) objIDSlice {
1522 1 : var ret objIDSlice
1523 1 : for _, id := range ids {
1524 1 : if id.tag() == batchTag {
1525 1 : ret = append(ret, id)
1526 1 : }
1527 : }
1528 1 : return ret
1529 : }
1530 :
// iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
type iterSeekPrefixGEOp struct {
	iterID objID   // the iterator being repositioned
	key    UserKey // the seek key; prefix iteration is bounded by its prefix

	derivedReaderID objID // the reader backing the iterator (for syncObjs)
}
1538 :
// run performs SeekPrefixGE and records the resulting validity and, if valid,
// the iterator position.
func (o *iterSeekPrefixGEOp) run(t *Test, h historyRecorder) {
	i := t.getIter(o.iterID)
	valid := i.SeekPrefixGE(o.key)
	if valid {
		h.Recordf("%s // [%t,%s] %v", o.formattedString(t.testOpts.KeyFormat),
			valid, iteratorPos(i), i.Error())
	} else {
		h.Recordf("%s // [%t] %v", o.formattedString(t.testOpts.KeyFormat), valid, i.Error())
	}
}
1549 :
// formattedString returns the op in its serialized test syntax.
func (o *iterSeekPrefixGEOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, kf.FormatKey(o.key))
}
func (o *iterSeekPrefixGEOp) receiver() objID { return o.iterID }
func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }

// rewriteKeys maps the seek key through fn.
func (o *iterSeekPrefixGEOp) rewriteKeys(fn func(UserKey) UserKey) {
	o.key = fn(o.key)
}

// diagramKeyRanges returns the seek key as a degenerate range.
func (o *iterSeekPrefixGEOp) diagramKeyRanges() []pebble.KeyRange {
	return []pebble.KeyRange{{Start: o.key, End: o.key}}
}
1563 :
// iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
type iterSeekLTOp struct {
	iterID objID   // the iterator being repositioned
	key    UserKey // the seek key
	limit  UserKey // nil selects plain SeekLT; non-nil selects SeekLTWithLimit

	derivedReaderID objID // the reader backing the iterator (for syncObjs)
}
1572 :
// run performs SeekLT (or SeekLTWithLimit when a limit is set) and records
// the resulting validity and, if valid, the iterator position.
func (o *iterSeekLTOp) run(t *Test, h historyRecorder) {
	i := t.getIter(o.iterID)
	var valid bool
	var validStr string
	if o.limit == nil {
		valid = i.SeekLT(o.key)
		validStr = validBoolToStr(valid)
	} else {
		valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
	}
	if valid {
		h.Recordf("%s // [%s,%s] %v", o.formattedString(t.testOpts.KeyFormat),
			validStr, iteratorPos(i), i.Error())
	} else {
		h.Recordf("%s // [%s] %v", o.formattedString(t.testOpts.KeyFormat),
			validStr, i.Error())
	}
}
1591 :
// formattedString returns the op in its serialized test syntax.
func (o *iterSeekLTOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID,
		kf.FormatKey(o.key), kf.FormatKey(o.limit))
}

func (o *iterSeekLTOp) receiver() objID { return o.iterID }
func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }

// rewriteKeys maps only the seek key through fn (the limit is left as-is,
// mirroring iterSeekGEOp).
func (o *iterSeekLTOp) rewriteKeys(fn func(UserKey) UserKey) {
	o.key = fn(o.key)
}

// diagramKeyRanges returns the seek key as a degenerate range.
func (o *iterSeekLTOp) diagramKeyRanges() []pebble.KeyRange {
	return []pebble.KeyRange{{Start: o.key, End: o.key}}
}
1607 :
// iterFirstOp models an Iterator.First operation.
type iterFirstOp struct {
	iterID objID // the iterator being repositioned

	derivedReaderID objID // the reader backing the iterator (for syncObjs)
}
1614 :
// run performs First and records the resulting validity and, if valid, the
// iterator position.
func (o *iterFirstOp) run(t *Test, h historyRecorder) {
	i := t.getIter(o.iterID)
	valid := i.First()
	if valid {
		h.Recordf("%s // [%t,%s] %v", o.formattedString(t.testOpts.KeyFormat),
			valid, iteratorPos(i), i.Error())
	} else {
		h.Recordf("%s // [%t] %v", o.formattedString(t.testOpts.KeyFormat),
			valid, i.Error())
	}
}
1626 :
// formattedString returns the op in its serialized test syntax.
func (o *iterFirstOp) formattedString(KeyFormat) string { return fmt.Sprintf("%s.First()", o.iterID) }
func (o *iterFirstOp) receiver() objID { return o.iterID }
func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }

// rewriteKeys is a no-op: First takes no keys.
func (o *iterFirstOp) rewriteKeys(func(UserKey) UserKey) {}
func (o *iterFirstOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1633 :
// iterLastOp models an Iterator.Last operation.
type iterLastOp struct {
	iterID objID // the iterator being repositioned

	derivedReaderID objID // the reader backing the iterator (for syncObjs)
}
1640 :
// run performs Last and records the resulting validity and, if valid, the
// iterator position.
func (o *iterLastOp) run(t *Test, h historyRecorder) {
	i := t.getIter(o.iterID)
	valid := i.Last()
	if valid {
		h.Recordf("%s // [%t,%s] %v", o.formattedString(t.testOpts.KeyFormat),
			valid, iteratorPos(i), i.Error())
	} else {
		h.Recordf("%s // [%t] %v", o.formattedString(t.testOpts.KeyFormat),
			valid, i.Error())
	}
}
1652 :
// formattedString returns the op in its serialized test syntax.
func (o *iterLastOp) formattedString(KeyFormat) string { return fmt.Sprintf("%s.Last()", o.iterID) }
func (o *iterLastOp) receiver() objID { return o.iterID }
func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }

// rewriteKeys is a no-op: Last takes no keys.
func (o *iterLastOp) rewriteKeys(func(UserKey) UserKey) {}
func (o *iterLastOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1659 :
// iterNextOp models an Iterator.Next[WithLimit] operation.
type iterNextOp struct {
	iterID objID   // the iterator being advanced
	limit  UserKey // nil selects plain Next; non-nil selects NextWithLimit

	derivedReaderID objID // the reader backing the iterator (for syncObjs)
}
1667 :
// run performs Next (or NextWithLimit when a limit is set) and records the
// resulting validity and, if valid, the iterator position.
func (o *iterNextOp) run(t *Test, h historyRecorder) {
	i := t.getIter(o.iterID)
	var valid bool
	var validStr string
	if o.limit == nil {
		valid = i.Next()
		validStr = validBoolToStr(valid)
	} else {
		valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
	}
	if valid {
		h.Recordf("%s // [%s,%s] %v", o.formattedString(t.testOpts.KeyFormat),
			validStr, iteratorPos(i), i.Error())
	} else {
		h.Recordf("%s // [%s] %v", o.formattedString(t.testOpts.KeyFormat),
			validStr, i.Error())
	}
}
1686 :
// formattedString returns the op in its serialized test syntax.
func (o *iterNextOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s.Next(%q)", o.iterID, kf.FormatKey(o.limit))
}
func (o *iterNextOp) receiver() objID { return o.iterID }
func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }

// rewriteKeys is a no-op (the limit, if any, is not rewritten — consistent
// with the seek ops).
func (o *iterNextOp) rewriteKeys(func(UserKey) UserKey) {}
func (o *iterNextOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1695 :
// iterNextPrefixOp models an Iterator.NextPrefix operation.
type iterNextPrefixOp struct {
	iterID objID // the iterator being advanced

	derivedReaderID objID // the reader backing the iterator (for syncObjs)
}
1702 :
// run performs NextPrefix and records the resulting validity and, if valid,
// the iterator position.
func (o *iterNextPrefixOp) run(t *Test, h historyRecorder) {
	i := t.getIter(o.iterID)
	valid := i.NextPrefix()
	validStr := validBoolToStr(valid)
	if valid {
		h.Recordf("%s // [%s,%s] %v", o.formattedString(t.testOpts.KeyFormat),
			validStr, iteratorPos(i), i.Error())
	} else {
		h.Recordf("%s // [%s] %v", o.formattedString(t.testOpts.KeyFormat),
			validStr, i.Error())
	}
}
1715 :
// formattedString returns the op in its serialized test syntax.
func (o *iterNextPrefixOp) formattedString(KeyFormat) string {
	return fmt.Sprintf("%s.NextPrefix()", o.iterID)
}
func (o *iterNextPrefixOp) receiver() objID { return o.iterID }
func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }

// rewriteKeys is a no-op: NextPrefix takes no keys.
func (o *iterNextPrefixOp) rewriteKeys(func(UserKey) UserKey) {}
func (o *iterNextPrefixOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1724 :
// iterCanSingleDelOp models a call to CanDeterministicallySingleDelete with an
// Iterator.
type iterCanSingleDelOp struct {
	iterID objID // the iterator probed

	derivedReaderID objID // the reader backing the iterator (for syncObjs)
}
1732 :
// run invokes CanDeterministicallySingleDelete on the underlying iterator and
// records only the op and the (deterministic) error in the history; the
// boolean result is deliberately discarded because it depends on internal LSM
// state.
func (o *iterCanSingleDelOp) run(t *Test, h historyRecorder) {
	// TODO(jackson): When we perform error injection, we'll need to rethink
	// this.
	_, err := pebble.CanDeterministicallySingleDelete(t.getIter(o.iterID).iter)
	// The return value of CanDeterministicallySingleDelete is dependent on
	// internal LSM state and non-deterministic, so we don't record it.
	// Including the operation within the metamorphic test at all helps ensure
	// that it does not change the result of any other Iterator operation that
	// should be deterministic, regardless of its own outcome.
	//
	// We still record the value of the error because it's deterministic, at
	// least for now. The possible error cases are:
	// - The iterator was already in an error state when the operation ran.
	// - The operation is deterministically invalid (like using an InternalNext
	//   to change directions.)
	h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
}
1750 :
// formattedString renders the op. NOTE(review): the rendered name is
// "InternalNext()", not "CanSingleDelete" — presumably kept for op-string
// parser compatibility; confirm against the op parser before changing this
// literal.
func (o *iterCanSingleDelOp) formattedString(KeyFormat) string {
	return fmt.Sprintf("%s.InternalNext()", o.iterID)
}

// The iterator is the op's receiver; syncObjs additionally synchronizes with
// the derived reader, restricted to batch IDs (onlyBatchIDs).
func (o *iterCanSingleDelOp) receiver() objID      { return o.iterID }
func (o *iterCanSingleDelOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }

// The op carries no user keys and contributes no diagram ranges.
func (o *iterCanSingleDelOp) rewriteKeys(func(UserKey) UserKey)   {}
func (o *iterCanSingleDelOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1759 :
// iterPrevOp models an Iterator.Prev[WithLimit] operation.
type iterPrevOp struct {
	// iterID identifies the iterator the operation is performed on.
	iterID objID
	// limit, when non-nil, makes run use PrevWithLimit(limit) instead of
	// Prev().
	limit []byte

	// derivedReaderID is the ID of the reader (e.g. a batch) the iterator was
	// constructed from; used for synchronization in syncObjs.
	derivedReaderID objID
}
1767 :
1768 1 : func (o *iterPrevOp) run(t *Test, h historyRecorder) {
1769 1 : i := t.getIter(o.iterID)
1770 1 : var valid bool
1771 1 : var validStr string
1772 1 : if o.limit == nil {
1773 1 : valid = i.Prev()
1774 1 : validStr = validBoolToStr(valid)
1775 1 : } else {
1776 1 : valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
1777 1 : }
1778 1 : if valid {
1779 1 : h.Recordf("%s // [%s,%s] %v", o.formattedString(t.testOpts.KeyFormat),
1780 1 : validStr, iteratorPos(i), i.Error())
1781 1 : } else {
1782 1 : h.Recordf("%s // [%s] %v", o.formattedString(t.testOpts.KeyFormat),
1783 1 : validStr, i.Error())
1784 1 : }
1785 : }
1786 :
// formattedString renders the op as "<iter>.Prev(<limit>)", formatting the
// (possibly nil) limit key with the test's key format.
func (o *iterPrevOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s.Prev(%q)", o.iterID, kf.FormatKey(o.limit))
}

// The iterator is the op's receiver; syncObjs additionally synchronizes with
// the derived reader, restricted to batch IDs (onlyBatchIDs).
func (o *iterPrevOp) receiver() objID      { return o.iterID }
func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }

// Prev's limit is not rewritten and the op contributes no diagram ranges.
func (o *iterPrevOp) rewriteKeys(func(UserKey) UserKey)   {}
func (o *iterPrevOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1795 :
// newSnapshotOp models a DB.NewSnapshot operation.
type newSnapshotOp struct {
	// dbID identifies the database the snapshot is taken from.
	dbID objID
	// snapID is the object ID assigned to the new snapshot.
	snapID objID
	// If nonempty, this snapshot must not be used to read any keys outside of
	// the provided bounds. This allows some implementations to use 'Eventually
	// file-only snapshots,' which require bounds.
	bounds []pebble.KeyRange
}
1805 :
// run creates either an EventuallyFileOnlySnapshot or a regular Snapshot on
// the target DB and registers it under o.snapID. Bounds are mandatory: the
// EFOS path requires them.
func (o *newSnapshotOp) run(t *Test, h historyRecorder) {
	bounds := o.bounds
	if len(bounds) == 0 {
		panic("bounds unexpectedly unset for newSnapshotOp")
	}
	// Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
	// Mixes the op index with the EFOS seed so that roughly half the runs, per
	// op, take the EFOS path.
	createEfos := ((11400714819323198485 * uint64(t.idx) * t.testOpts.seedEFOS) >> 63) == 1
	// If either of these options is true, an EFOS _must_ be created, regardless
	// of what the fibonacci hash returned.
	excisePossible := t.testOpts.useSharedReplicate || t.testOpts.useExternalReplicate || t.testOpts.useExcise
	if createEfos || excisePossible {
		s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(bounds)
		t.setSnapshot(o.snapID, s)
	} else {
		s := t.getDB(o.dbID).NewSnapshot()
		t.setSnapshot(o.snapID, s)
	}
	h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
}
1825 :
1826 1 : func (o *newSnapshotOp) formattedString(kf KeyFormat) string {
1827 1 : var buf bytes.Buffer
1828 1 : fmt.Fprintf(&buf, "%s = %s.NewSnapshot(", o.snapID, o.dbID)
1829 1 : for i := range o.bounds {
1830 1 : if i > 0 {
1831 1 : fmt.Fprint(&buf, ", ")
1832 1 : }
1833 1 : fmt.Fprintf(&buf, "%q, %q", kf.FormatKey(o.bounds[i].Start), kf.FormatKey(o.bounds[i].End))
1834 : }
1835 1 : fmt.Fprint(&buf, ")")
1836 1 : return buf.String()
1837 : }
// The database is the op's receiver; syncObjs synchronizes with the snapshot
// object being created.
func (o *newSnapshotOp) receiver() objID      { return o.dbID }
func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }

// rewriteKeys applies fn to both endpoints of every snapshot bound in place.
func (o *newSnapshotOp) rewriteKeys(fn func(UserKey) UserKey) {
	for i := range o.bounds {
		o.bounds[i].Start = fn(o.bounds[i].Start)
		o.bounds[i].End = fn(o.bounds[i].End)
	}
}

// diagramKeyRanges exposes the snapshot bounds for op diagrams.
func (o *newSnapshotOp) diagramKeyRanges() []pebble.KeyRange {
	return o.bounds
}
1851 :
// newExternalObjOp models a DB.NewExternalObj operation: it drains a batch
// into an sstable stored on external storage.
type newExternalObjOp struct {
	// batchID identifies the batch whose contents are written out; the batch
	// object is consumed (cleared) by the op.
	batchID objID
	// externalObjID is the object ID assigned to the new external object.
	externalObjID objID
}
1857 :
// externalObjName returns the deterministic external-storage file name for the
// given external-object ID ("external-for-ingest-<slot>.sst"). Panics if the
// ID does not carry the externalObj tag — a programmer error.
func externalObjName(externalObjID objID) string {
	if externalObjID.tag() != externalObjTag {
		panic(fmt.Sprintf("invalid externalObjID %s", externalObjID))
	}
	return fmt.Sprintf("external-for-ingest-%d.sst", externalObjID.slot())
}
1864 :
// run materializes the batch's contents as an sstable on external storage:
// it sorts the batch, writes point keys, range dels, and range keys into a
// new external object, and registers the resulting metadata under
// o.externalObjID. The batch object is cleared and may not be used again.
func (o *newExternalObjOp) run(t *Test, h historyRecorder) {
	b := t.getBatch(o.batchID)
	t.clearObj(o.batchID)

	writeCloser, err := t.externalStorage.CreateObject(externalObjName(o.externalObjID))
	if err != nil {
		panic(err)
	}
	writable := objstorageprovider.NewRemoteWritable(writeCloser)

	// Batch contents must be sorted before they can be written as an sstable.
	iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)

	sstMeta, err := writeSSTForIngestion(
		t,
		iter, rangeDelIter, rangeKeyIter,
		true, /* uniquePrefixes */
		nil,  /* syntheticSuffix */
		nil,  /* syntheticPrefix */
		writable,
		t.minFMV(),
	)
	if err != nil {
		panic(err)
	}
	if !sstMeta.HasPointKeys && !sstMeta.HasRangeDelKeys && !sstMeta.HasRangeKeys {
		// This can occur when using --try-to-reduce.
		panic("metamorphic test internal error: external object empty")
	}
	t.setExternalObj(o.externalObjID, externalObjMeta{
		sstMeta: sstMeta,
	})
	h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
}
1898 :
// formattedString renders the op as "<ext> = <batch>.NewExternalObj()".
func (o *newExternalObjOp) formattedString(KeyFormat) string {
	return fmt.Sprintf("%s = %s.NewExternalObj()", o.externalObjID, o.batchID)
}

// The consumed batch is the op's receiver; syncObjs synchronizes with the
// external object being created.
func (o *newExternalObjOp) receiver() objID      { return o.batchID }
func (o *newExternalObjOp) syncObjs() objIDSlice { return []objID{o.externalObjID} }

// The op carries no user keys and contributes no diagram ranges.
func (o *newExternalObjOp) rewriteKeys(func(UserKey) UserKey)   {}
func (o *newExternalObjOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1907 :
// dbRatchetFormatMajorVersionOp models a DB.RatchetFormatMajorVersion
// operation.
type dbRatchetFormatMajorVersionOp struct {
	// dbID identifies the database to upgrade.
	dbID objID
	// vers is the format major version to ratchet up to.
	vers pebble.FormatMajorVersion
}
1912 :
1913 1 : func (o *dbRatchetFormatMajorVersionOp) run(t *Test, h historyRecorder) {
1914 1 : var err error
1915 1 : // NB: We no-op the operation if we're already at or above the provided
1916 1 : // format major version. Different runs start at different format major
1917 1 : // versions, making the presence of an error and the error message itself
1918 1 : // non-deterministic if we attempt to upgrade to an older version.
1919 1 : //
1920 1 : //Regardless, subsequent operations should behave identically, which is what
1921 1 : //we're really aiming to test by including this format major version ratchet
1922 1 : //operation.
1923 1 : if t.getDB(o.dbID).FormatMajorVersion() < o.vers {
1924 1 : err = t.getDB(o.dbID).RatchetFormatMajorVersion(o.vers)
1925 1 : }
1926 1 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
1927 : }
1928 :
// formattedString renders the op as "<db>.RatchetFormatMajorVersion(<vers>)".
func (o *dbRatchetFormatMajorVersionOp) formattedString(KeyFormat) string {
	return fmt.Sprintf("%s.RatchetFormatMajorVersion(%s)", o.dbID, o.vers)
}

// The database is the op's receiver; no additional synchronization is needed.
func (o *dbRatchetFormatMajorVersionOp) receiver() objID      { return o.dbID }
func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }

// The op carries no user keys and contributes no diagram ranges.
func (o *dbRatchetFormatMajorVersionOp) rewriteKeys(func(UserKey) UserKey)   {}
func (o *dbRatchetFormatMajorVersionOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1937 :
// dbRestartOp models a DB close-and-reopen, exercising recovery paths.
type dbRestartOp struct {
	// dbID identifies the database to restart.
	dbID objID

	// affectedObjects is the list of additional objects that are affected by this
	// operation, and which syncObjs() must return so that we don't perform the
	// restart in parallel with other operations to affected objects.
	affectedObjects []objID
}
1946 :
1947 1 : func (o *dbRestartOp) run(t *Test, h historyRecorder) {
1948 1 : if err := t.restartDB(o.dbID); err != nil {
1949 0 : h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
1950 0 : h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
1951 1 : } else {
1952 1 : h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
1953 1 : }
1954 : }
1955 :
// formattedString renders the op as "<db>.Restart()".
func (o *dbRestartOp) formattedString(KeyFormat) string { return fmt.Sprintf("%s.Restart()", o.dbID) }

// The database is the op's receiver; syncObjs returns every object affected
// by the restart so no operation on them runs concurrently with it.
func (o *dbRestartOp) receiver() objID      { return o.dbID }
func (o *dbRestartOp) syncObjs() objIDSlice { return o.affectedObjects }

// The op carries no user keys and contributes no diagram ranges.
func (o *dbRestartOp) rewriteKeys(func(UserKey) UserKey)   {}
func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1962 :
1963 1 : func formatOps(kf KeyFormat, ops []op) string {
1964 1 : var buf strings.Builder
1965 1 : for _, op := range ops {
1966 1 : fmt.Fprintf(&buf, "%s\n", op.formattedString(kf))
1967 1 : }
1968 1 : return buf.String()
1969 : }
1970 :
// replicateOp models an operation that could copy keys from one db to
// another through either an IngestAndExcise, or an Ingest.
type replicateOp struct {
	// source and dest are the databases copied from and into.
	source, dest objID
	// start and end bound the replicated keyspace: [start, end).
	start, end UserKey
}
1977 :
// runSharedReplicate replicates [r.start, r.end) from source into dest via
// ScanInternal with shared-sstable visitation: point keys, range dels, and
// range keys that must be rewritten go into the local sstable w, while
// shareable sstables are collected as SharedSSTMeta and passed to
// IngestAndExcise. Every outcome (including errors) is recorded in h.
func (r *replicateOp) runSharedReplicate(
	t *Test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
) {
	var sharedSSTs []pebble.SharedSSTMeta
	var err error
	err = source.ScanInternal(context.TODO(), sstable.CategoryUnknown, r.start, r.end,
		func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
			val, _, err := value.Value(nil)
			if err != nil {
				panic(err)
			}
			// Rewrite the point key at seqnum zero into the local sstable.
			return w.Raw().AddWithForceObsolete(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false)
		},
		func(start, end []byte, seqNum base.SeqNum) error {
			return w.DeleteRange(start, end)
		},
		func(start, end []byte, keys []keyspan.Key) error {
			return w.Raw().EncodeSpan(keyspan.Span{
				Start: start,
				End:   end,
				Keys:  keys,
			})
		},
		func(sst *pebble.SharedSSTMeta) error {
			sharedSSTs = append(sharedSSTs, *sst)
			return nil
		},
		nil,
	)
	if err != nil {
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}

	err = w.Close()
	if err != nil {
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}
	meta, err := w.Raw().Metadata()
	if err != nil {
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}
	if len(sharedSSTs) == 0 && meta.Properties.NumEntries == 0 && meta.Properties.NumRangeKeys() == 0 {
		// IngestAndExcise below will be a no-op. We should do a
		// DeleteRange+RangeKeyDel to mimic the behaviour of the non-shared-replicate
		// case.
		//
		// TODO(bilal): Remove this when we support excises with no matching ingests.
		if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
			h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
			return
		}
		err := dest.DeleteRange(r.start, r.end, t.writeOpts)
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}

	_, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end})
	h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
}
2040 :
// runExternalReplicate replicates [r.start, r.end) from source into dest via
// ScanInternal with external-file visitation: point keys, range dels, and
// range keys that must be rewritten go into the local sstable w, while
// external files are collected as ExternalFile metadata and passed to
// IngestAndExcise. Mirrors runSharedReplicate, differing only in which
// visitor slot is populated. Every outcome (including errors) is recorded
// in h.
func (r *replicateOp) runExternalReplicate(
	t *Test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
) {
	var externalSSTs []pebble.ExternalFile
	var err error
	err = source.ScanInternal(context.TODO(), sstable.CategoryUnknown, r.start, r.end,
		func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
			val, _, err := value.Value(nil)
			if err != nil {
				panic(err)
			}
			// Rewrite the point key at seqnum zero into the local sstable.
			return w.Raw().AddWithForceObsolete(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false)
		},
		func(start, end []byte, seqNum base.SeqNum) error {
			return w.DeleteRange(start, end)
		},
		func(start, end []byte, keys []keyspan.Key) error {
			return w.Raw().EncodeSpan(keyspan.Span{
				Start: start,
				End:   end,
				Keys:  keys,
			})
		},
		nil,
		func(sst *pebble.ExternalFile) error {
			externalSSTs = append(externalSSTs, *sst)
			return nil
		},
	)
	if err != nil {
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}

	err = w.Close()
	if err != nil {
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}
	meta, err := w.Raw().Metadata()
	if err != nil {
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}
	if len(externalSSTs) == 0 && meta.Properties.NumEntries == 0 && meta.Properties.NumRangeKeys() == 0 {
		// IngestAndExcise below will be a no-op. We should do a
		// DeleteRange+RangeKeyDel to mimic the behaviour of the non-external-replicate
		// case.
		//
		// TODO(bilal): Remove this when we support excises with no matching ingests.
		if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
			h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
			return
		}
		err := dest.DeleteRange(r.start, r.end, t.writeOpts)
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}

	_, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, nil, externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end})
	h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
}
2103 :
// run copies the keyspace [r.start, r.end) from r.source into r.dest.
// Depending on enabled test options it dispatches to external replication,
// shared replication, or the fallback path: clear dest's span with
// RangeKeyDelete+DeleteRange, scan source with a regular iterator into a
// local sstable, and Ingest it.
func (r *replicateOp) run(t *Test, h historyRecorder) {
	// Shared replication only works if shared storage is enabled.
	useSharedIngest := t.testOpts.useSharedReplicate && t.testOpts.sharedStorageEnabled
	useExternalIngest := t.testOpts.useExternalReplicate && t.testOpts.externalStorageEnabled

	source := t.getDB(r.source)
	dest := t.getDB(r.dest)
	sstPath := path.Join(t.tmpDir, fmt.Sprintf("ext-replicate%d.sst", t.idx))
	f, err := t.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified)
	if err != nil {
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}
	w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), t.opts.MakeWriterOptions(0, dest.TableFormat()))

	// NB: In practice we'll either do shared replicate or external replicate,
	// as ScanInternal does not support both. We arbitrarily choose to prioritize
	// external replication if both are enabled, as those are likely to hit
	// widespread usage first.
	if useExternalIngest {
		r.runExternalReplicate(t, h, source, dest, w, sstPath)
		return
	}
	if useSharedIngest {
		r.runSharedReplicate(t, h, source, dest, w, sstPath)
		return
	}

	// First, do a RangeKeyDelete and DeleteRange on the whole span.
	if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}
	if err := dest.DeleteRange(r.start, r.end, t.writeOpts); err != nil {
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}
	// Scan both point and range keys within the span and rewrite them into
	// the local sstable.
	iter, err := source.NewIter(&pebble.IterOptions{
		LowerBound: r.start,
		UpperBound: r.end,
		KeyTypes:   pebble.IterKeyTypePointsAndRanges,
	})
	if err != nil {
		panic(err)
	}
	defer iter.Close()

	for ok := iter.SeekGE(r.start); ok && iter.Error() == nil; ok = iter.Next() {
		hasPoint, hasRange := iter.HasPointAndRange()
		if hasPoint {
			val, err := iter.ValueAndErr()
			if err != nil {
				panic(err)
			}
			if err := w.Set(iter.Key(), val); err != nil {
				panic(err)
			}
		}
		// Only emit each range-key span once, when the iterator first enters it.
		if hasRange && iter.RangeKeyChanged() {
			rangeKeys := iter.RangeKeys()
			rkStart, rkEnd := iter.RangeBounds()

			// Re-encode the visible range keys as RangeKeySets at seqnum zero.
			span := keyspan.Span{Start: rkStart, End: rkEnd, Keys: make([]keyspan.Key, len(rangeKeys))}
			for i := range rangeKeys {
				span.Keys[i] = keyspan.Key{
					Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
					Suffix:  rangeKeys[i].Suffix,
					Value:   rangeKeys[i].Value,
				}
			}
			keyspan.SortKeysByTrailer(span.Keys)
			if err := w.Raw().EncodeSpan(span); err != nil {
				panic(err)
			}
		}
	}
	if err := iter.Error(); err != nil {
		h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
		return
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	err = dest.Ingest(context.Background(), []string{sstPath})
	h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
}
2191 :
// formattedString renders the op as
// "<source>.Replicate(<dest>, <start>, <end>)".
func (r *replicateOp) formattedString(kf KeyFormat) string {
	return fmt.Sprintf("%s.Replicate(%s, %q, %q)", r.source, r.dest,
		kf.FormatKey(r.start), kf.FormatKey(r.end))
}

// The source database is the op's receiver; syncObjs synchronizes with the
// destination database.
func (r *replicateOp) receiver() objID      { return r.source }
func (r *replicateOp) syncObjs() objIDSlice { return objIDSlice{r.dest} }

// rewriteKeys applies fn to the replicated span's endpoints in place.
func (r *replicateOp) rewriteKeys(fn func(UserKey) UserKey) {
	r.start = fn(r.start)
	r.end = fn(r.end)
}

// diagramKeyRanges exposes the replicated span for op diagrams.
func (r *replicateOp) diagramKeyRanges() []pebble.KeyRange {
	return []pebble.KeyRange{{Start: r.start, End: r.end}}
}
|