Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "crypto/rand"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "path"
15 : "path/filepath"
16 : "slices"
17 : "strings"
18 :
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/pebble"
21 : "github.com/cockroachdb/pebble/internal/base"
22 : "github.com/cockroachdb/pebble/internal/keyspan"
23 : "github.com/cockroachdb/pebble/internal/private"
24 : "github.com/cockroachdb/pebble/internal/rangekey"
25 : "github.com/cockroachdb/pebble/internal/testkeys"
26 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
27 : "github.com/cockroachdb/pebble/sstable"
28 : "github.com/cockroachdb/pebble/vfs"
29 : "github.com/cockroachdb/pebble/vfs/errorfs"
30 : )
31 :
32 : // Ops holds a sequence of operations to be executed by the metamorphic tests.
33 : type Ops []op
34 :
35 : // op defines the interface for a single operation, such as creating a batch,
36 : // or advancing an iterator.
37 : type op interface {
38 : String() string
39 :
40 : run(t *Test, h historyRecorder)
41 :
42 : // receiver returns the object ID of the object the operation is performed
43 : // on. Every operation has a receiver (e.g., batch0.Set(...) has `batch0` as
44 : // its receiver). Receivers are used for synchronization when running with
45 : // concurrency.
46 : receiver() objID
47 :
48 : // syncObjs returns an additional set of object IDs—excluding the
49 : // receiver—that the operation must synchronize with. At execution time,
50 : // the operation will run serially with respect to all other operations
51 : // that return these objects from their own syncObjs or receiver methods.
52 : syncObjs() objIDSlice
53 :
54 : // keys returns all user keys used by the operation, as pointers to slices.
55 : // The caller can then modify these slices to rewrite the keys.
56 : //
57 : // Used for simplification of operations for easier investigations.
58 : keys() []*[]byte
59 :
60 : // diagramKeyRanges() returns key spans associated with this operation, to be
61 : // shown on an ASCII diagram of operations.
62 : diagramKeyRanges() []pebble.KeyRange
63 : }
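// For example, an applyOp whose writerID refers to a DB and whose batchID is
// batch0 reports the DB as its receiver and [batch0] from syncObjs, so at
// execution time it runs serially with every other operation whose receiver
// or syncObjs include either object.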
64 :
65 : // initOp performs test initialization.
66 : type initOp struct {
67 : dbSlots uint32
68 : batchSlots uint32
69 : iterSlots uint32
70 : snapshotSlots uint32
71 : externalObjSlots uint32
72 : }
73 :
74 2 : func (o *initOp) run(t *Test, h historyRecorder) {
75 2 : t.batches = make([]*pebble.Batch, o.batchSlots)
76 2 : t.iters = make([]*retryableIter, o.iterSlots)
77 2 : t.snapshots = make([]readerCloser, o.snapshotSlots)
78 2 : t.externalObjs = make([]externalObjMeta, o.externalObjSlots)
79 2 : h.Recordf("%s", o)
80 2 : }
81 :
82 2 : func (o *initOp) String() string {
83 2 : return fmt.Sprintf("Init(%d /* dbs */, %d /* batches */, %d /* iters */, %d /* snapshots */, %d /* externalObjs */)",
84 2 : o.dbSlots, o.batchSlots, o.iterSlots, o.snapshotSlots, o.externalObjSlots)
85 2 : }
86 :
87 2 : func (o *initOp) receiver() objID { return makeObjID(dbTag, 1) }
88 2 : func (o *initOp) syncObjs() objIDSlice {
89 2 : syncObjs := make([]objID, 0)
90 2 : // Add any additional DBs to syncObjs.
91 2 : for i := uint32(2); i < o.dbSlots+1; i++ {
92 1 : syncObjs = append(syncObjs, makeObjID(dbTag, i))
93 1 : }
94 2 : return syncObjs
95 : }
96 :
97 0 : func (o *initOp) keys() []*[]byte { return nil }
98 1 : func (o *initOp) diagramKeyRanges() []pebble.KeyRange { return nil }
99 :
100 : // applyOp models a Writer.Apply operation.
101 : type applyOp struct {
102 : writerID objID
103 : batchID objID
104 : }
105 :
106 2 : func (o *applyOp) run(t *Test, h historyRecorder) {
107 2 : b := t.getBatch(o.batchID)
108 2 : w := t.getWriter(o.writerID)
109 2 : var err error
110 2 : if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
111 1 : err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
112 1 : if err == nil {
113 1 : err = b.SyncWait()
114 1 : }
115 2 : } else {
116 2 : err = w.Apply(b, t.writeOpts)
117 2 : }
118 2 : h.Recordf("%s // %v", o, err)
119 : // The batch will be closed by a closeOp, which is guaranteed to be generated.
120 : }
121 :
122 2 : func (o *applyOp) String() string { return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID) }
123 2 : func (o *applyOp) receiver() objID { return o.writerID }
124 2 : func (o *applyOp) syncObjs() objIDSlice {
125 2 : // Apply should not be concurrent with operations that are mutating the
126 2 : // batch.
127 2 : return []objID{o.batchID}
128 2 : }
129 :
130 0 : func (o *applyOp) keys() []*[]byte { return nil }
131 0 : func (o *applyOp) diagramKeyRanges() []pebble.KeyRange { return nil }
132 :
133 : // checkpointOp models a DB.Checkpoint operation.
134 : type checkpointOp struct {
135 : dbID objID
136 : // If non-empty, the checkpoint is restricted to these spans.
137 : spans []pebble.CheckpointSpan
138 : }
139 :
140 2 : func (o *checkpointOp) run(t *Test, h historyRecorder) {
141 2 : // TODO(josh): db.Checkpoint does not work with shared storage yet.
142 2 : // It would be better to filter such ops out before calling run on them,
143 2 : // by setting the weight that generator.go uses to zero, or similar.
144 2 : // But as far as we understand, the ops are shared across ALL the metamorphic
145 2 : // test runs, so it's not clear how to do that easily:
146 2 : // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
147 2 : if t.testOpts.sharedStorageEnabled || t.testOpts.externalStorageEnabled {
148 2 : h.Recordf("%s // %v", o, nil)
149 2 : return
150 2 : }
151 2 : var opts []pebble.CheckpointOption
152 2 : if len(o.spans) > 0 {
153 2 : opts = append(opts, pebble.WithRestrictToSpans(o.spans))
154 2 : }
155 2 : db := t.getDB(o.dbID)
156 2 : err := t.withRetries(func() error {
157 2 : return db.Checkpoint(o.dir(t.dir, h.op), opts...)
158 2 : })
159 2 : h.Recordf("%s // %v", o, err)
160 : }
161 :
162 2 : func (o *checkpointOp) dir(dataDir string, idx int) string {
163 2 : return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
164 2 : }
165 :
166 2 : func (o *checkpointOp) String() string {
167 2 : var spanStr bytes.Buffer
168 2 : for i, span := range o.spans {
169 2 : if i > 0 {
170 2 : spanStr.WriteString(",")
171 2 : }
172 2 : fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
173 : }
174 2 : return fmt.Sprintf("%s.Checkpoint(%s)", o.dbID, spanStr.String())
175 : }
176 :
177 2 : func (o *checkpointOp) receiver() objID { return o.dbID }
178 2 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
179 :
180 0 : func (o *checkpointOp) keys() []*[]byte {
181 0 : var res []*[]byte
182 0 : for i := range o.spans {
183 0 : res = append(res, &o.spans[i].Start, &o.spans[i].End)
184 0 : }
185 0 : return res
186 : }
187 :
188 0 : func (o *checkpointOp) diagramKeyRanges() []pebble.KeyRange {
189 0 : var res []pebble.KeyRange
190 0 : for i := range o.spans {
191 0 : res = append(res, pebble.KeyRange{
192 0 : Start: o.spans[i].Start,
193 0 : End: o.spans[i].End,
194 0 : })
195 0 : }
196 0 : return res
197 : }
198 :
199 : // downloadOp models a DB.Download operation.
200 : type downloadOp struct {
201 : dbID objID
202 : spans []pebble.DownloadSpan
203 : }
204 :
205 2 : func (o *downloadOp) run(t *Test, h historyRecorder) {
206 2 : db := t.getDB(o.dbID)
207 2 : err := t.withRetries(func() error {
208 2 : return db.Download(context.Background(), o.spans)
209 2 : })
210 2 : h.Recordf("%s // %v", o, err)
211 : }
212 :
213 2 : func (o *downloadOp) String() string {
214 2 : var spanStr bytes.Buffer
215 2 : for i, span := range o.spans {
216 2 : if i > 0 {
217 2 : spanStr.WriteString(", ")
218 2 : }
219 2 : fmt.Fprintf(&spanStr, "%q /* start */, %q /* end */, %v /* viaBackingFileDownload */",
220 2 : span.StartKey, span.EndKey, span.ViaBackingFileDownload)
221 : }
222 2 : return fmt.Sprintf("%s.Download(%s)", o.dbID, spanStr.String())
223 : }
224 :
225 2 : func (o *downloadOp) receiver() objID { return o.dbID }
226 2 : func (o *downloadOp) syncObjs() objIDSlice { return nil }
227 :
228 0 : func (o *downloadOp) keys() []*[]byte {
229 0 : var res []*[]byte
230 0 : for i := range o.spans {
231 0 : res = append(res, &o.spans[i].StartKey, &o.spans[i].EndKey)
232 0 : }
233 0 : return res
234 : }
235 :
236 0 : func (o *downloadOp) diagramKeyRanges() []pebble.KeyRange {
237 0 : var res []pebble.KeyRange
238 0 : for i := range o.spans {
239 0 : res = append(res, pebble.KeyRange{
240 0 : Start: o.spans[i].StartKey,
241 0 : End: o.spans[i].EndKey,
242 0 : })
243 0 : }
244 0 : return res
245 : }
246 :
247 : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
248 : type closeOp struct {
249 : objID objID
250 :
251 : // affectedObjects is the list of additional objects that are affected by this
252 : // operation, and which syncObjs() must return so that we don't perform the
253 : // close in parallel with other operations to affected objects.
254 : affectedObjects []objID
255 : }
256 :
257 2 : func (o *closeOp) run(t *Test, h historyRecorder) {
258 2 : c := t.getCloser(o.objID)
259 2 : if o.objID.tag() == dbTag && t.opts.DisableWAL {
260 2 : // Special case: If WAL is disabled, do a flush right before DB Close. This
261 2 : // allows us to reuse this run's data directory as initial state for
262 2 : // future runs without losing any mutations.
263 2 : _ = t.getDB(o.objID).Flush()
264 2 : }
265 2 : t.clearObj(o.objID)
266 2 : err := c.Close()
267 2 : h.Recordf("%s // %v", o, err)
268 : }
269 :
270 2 : func (o *closeOp) String() string { return fmt.Sprintf("%s.Close()", o.objID) }
271 2 : func (o *closeOp) receiver() objID { return o.objID }
272 2 : func (o *closeOp) syncObjs() objIDSlice {
273 2 : return o.affectedObjects
274 2 : }
275 :
276 0 : func (o *closeOp) keys() []*[]byte { return nil }
277 0 : func (o *closeOp) diagramKeyRanges() []pebble.KeyRange { return nil }
278 :
279 : // compactOp models a DB.Compact operation.
280 : type compactOp struct {
281 : dbID objID
282 : start []byte
283 : end []byte
284 : parallelize bool
285 : }
286 :
287 2 : func (o *compactOp) run(t *Test, h historyRecorder) {
288 2 : err := t.withRetries(func() error {
289 2 : return t.getDB(o.dbID).Compact(o.start, o.end, o.parallelize)
290 2 : })
291 2 : h.Recordf("%s // %v", o, err)
292 : }
293 :
294 2 : func (o *compactOp) String() string {
295 2 : return fmt.Sprintf("%s.Compact(%q, %q, %t /* parallelize */)", o.dbID, o.start, o.end, o.parallelize)
296 2 : }
297 :
298 2 : func (o *compactOp) receiver() objID { return o.dbID }
299 2 : func (o *compactOp) syncObjs() objIDSlice { return nil }
300 :
301 1 : func (o *compactOp) keys() []*[]byte {
302 1 : return []*[]byte{&o.start, &o.end}
303 1 : }
304 :
305 1 : func (o *compactOp) diagramKeyRanges() []pebble.KeyRange {
306 1 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
307 1 : }
308 :
309 : // deleteOp models a Writer.Delete operation.
310 : type deleteOp struct {
311 : writerID objID
312 : key []byte
313 :
314 : derivedDBID objID
315 : }
316 :
317 2 : func (o *deleteOp) run(t *Test, h historyRecorder) {
318 2 : w := t.getWriter(o.writerID)
319 2 : var err error
320 2 : if t.testOpts.deleteSized && t.isFMV(o.derivedDBID, pebble.FormatDeleteSizedAndObsolete) {
321 2 : // Call DeleteSized with a deterministic size derived from the index.
322 2 : // The size does not need to be accurate for correctness.
323 2 : err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
324 2 : } else {
325 2 : err = w.Delete(o.key, t.writeOpts)
326 2 : }
327 2 : h.Recordf("%s // %v", o, err)
328 : }
329 :
330 2 : func hashSize(index int) uint32 {
331 2 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
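// The multiplier is the 64-bit Fibonacci hashing constant (roughly 2^64
// divided by the golden ratio), so consecutive indexes are scattered across
// the uint64 range before the reduction modulo maxValueSize.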
332 2 : return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
333 2 : }
334 :
335 2 : func (o *deleteOp) String() string {
336 2 : return fmt.Sprintf("%s.Delete(%q)", o.writerID, o.key)
337 2 : }
338 2 : func (o *deleteOp) receiver() objID { return o.writerID }
339 2 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
340 :
341 0 : func (o *deleteOp) keys() []*[]byte {
342 0 : return []*[]byte{&o.key}
343 0 : }
344 :
345 0 : func (o *deleteOp) diagramKeyRanges() []pebble.KeyRange {
346 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
347 0 : }
348 :
349 : // singleDeleteOp models a Writer.SingleDelete operation.
350 : type singleDeleteOp struct {
351 : writerID objID
352 : key []byte
353 : maybeReplaceDelete bool
354 : }
355 :
356 2 : func (o *singleDeleteOp) run(t *Test, h historyRecorder) {
357 2 : w := t.getWriter(o.writerID)
358 2 : var err error
359 2 : if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
360 2 : err = w.Delete(o.key, t.writeOpts)
361 2 : } else {
362 2 : err = w.SingleDelete(o.key, t.writeOpts)
363 2 : }
364 : // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
365 : // write the former to the history log. The log line will indicate whether
366 : // or not the delete *could* have been replaced. The OPTIONS file should
367 : // also be consulted to determine what happened at runtime (i.e. by taking
368 : // the logical AND).
369 2 : h.Recordf("%s // %v", o, err)
370 : }
371 :
372 2 : func (o *singleDeleteOp) String() string {
373 2 : return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)", o.writerID, o.key, o.maybeReplaceDelete)
374 2 : }
375 :
376 2 : func (o *singleDeleteOp) receiver() objID { return o.writerID }
377 2 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
378 :
379 0 : func (o *singleDeleteOp) keys() []*[]byte {
380 0 : return []*[]byte{&o.key}
381 0 : }
382 :
383 0 : func (o *singleDeleteOp) diagramKeyRanges() []pebble.KeyRange {
384 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
385 0 : }
386 :
387 : // deleteRangeOp models a Writer.DeleteRange operation.
388 : type deleteRangeOp struct {
389 : writerID objID
390 : start []byte
391 : end []byte
392 : }
393 :
394 2 : func (o *deleteRangeOp) run(t *Test, h historyRecorder) {
395 2 : w := t.getWriter(o.writerID)
396 2 : err := w.DeleteRange(o.start, o.end, t.writeOpts)
397 2 : h.Recordf("%s // %v", o, err)
398 2 : }
399 :
400 2 : func (o *deleteRangeOp) String() string {
401 2 : return fmt.Sprintf("%s.DeleteRange(%q, %q)", o.writerID, o.start, o.end)
402 2 : }
403 :
404 2 : func (o *deleteRangeOp) receiver() objID { return o.writerID }
405 2 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
406 :
407 0 : func (o *deleteRangeOp) keys() []*[]byte {
408 0 : return []*[]byte{&o.start, &o.end}
409 0 : }
410 :
411 0 : func (o *deleteRangeOp) diagramKeyRanges() []pebble.KeyRange {
412 0 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
413 0 : }
414 :
415 : // flushOp models a DB.Flush operation.
416 : type flushOp struct {
417 : db objID
418 : }
419 :
420 2 : func (o *flushOp) run(t *Test, h historyRecorder) {
421 2 : db := t.getDB(o.db)
422 2 : err := db.Flush()
423 2 : h.Recordf("%s // %v", o, err)
424 2 : }
425 :
426 2 : func (o *flushOp) String() string { return fmt.Sprintf("%s.Flush()", o.db) }
427 2 : func (o *flushOp) receiver() objID { return o.db }
428 2 : func (o *flushOp) syncObjs() objIDSlice { return nil }
429 0 : func (o *flushOp) keys() []*[]byte { return nil }
430 0 : func (o *flushOp) diagramKeyRanges() []pebble.KeyRange { return nil }
431 :
432 : // mergeOp models a Writer.Merge operation.
433 : type mergeOp struct {
434 : writerID objID
435 : key []byte
436 : value []byte
437 : }
438 :
439 2 : func (o *mergeOp) run(t *Test, h historyRecorder) {
440 2 : w := t.getWriter(o.writerID)
441 2 : err := w.Merge(o.key, o.value, t.writeOpts)
442 2 : h.Recordf("%s // %v", o, err)
443 2 : }
444 :
445 2 : func (o *mergeOp) String() string { return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, o.key, o.value) }
446 2 : func (o *mergeOp) receiver() objID { return o.writerID }
447 2 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
448 :
449 0 : func (o *mergeOp) keys() []*[]byte {
450 0 : return []*[]byte{&o.key}
451 0 : }
452 :
453 0 : func (o *mergeOp) diagramKeyRanges() []pebble.KeyRange {
454 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
455 0 : }
456 :
457 : // setOp models a Writer.Set operation.
458 : type setOp struct {
459 : writerID objID
460 : key []byte
461 : value []byte
462 : }
463 :
464 2 : func (o *setOp) run(t *Test, h historyRecorder) {
465 2 : w := t.getWriter(o.writerID)
466 2 : err := w.Set(o.key, o.value, t.writeOpts)
467 2 : h.Recordf("%s // %v", o, err)
468 2 : }
469 :
470 2 : func (o *setOp) String() string { return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, o.key, o.value) }
471 2 : func (o *setOp) receiver() objID { return o.writerID }
472 2 : func (o *setOp) syncObjs() objIDSlice { return nil }
473 :
474 0 : func (o *setOp) keys() []*[]byte {
475 0 : return []*[]byte{&o.key}
476 0 : }
477 :
478 0 : func (o *setOp) diagramKeyRanges() []pebble.KeyRange {
479 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
480 0 : }
481 :
482 : // rangeKeyDeleteOp models a Writer.RangeKeyDelete operation.
483 : type rangeKeyDeleteOp struct {
484 : writerID objID
485 : start []byte
486 : end []byte
487 : }
488 :
489 2 : func (o *rangeKeyDeleteOp) run(t *Test, h historyRecorder) {
490 2 : w := t.getWriter(o.writerID)
491 2 : err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
492 2 : h.Recordf("%s // %v", o, err)
493 2 : }
494 :
495 2 : func (o *rangeKeyDeleteOp) String() string {
496 2 : return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)", o.writerID, o.start, o.end)
497 2 : }
498 :
499 2 : func (o *rangeKeyDeleteOp) receiver() objID { return o.writerID }
500 2 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
501 :
502 0 : func (o *rangeKeyDeleteOp) keys() []*[]byte {
503 0 : return []*[]byte{&o.start, &o.end}
504 0 : }
505 :
506 1 : func (o *rangeKeyDeleteOp) diagramKeyRanges() []pebble.KeyRange {
507 1 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
508 1 : }
509 :
510 : // rangeKeySetOp models a Writer.RangeKeySet operation.
511 : type rangeKeySetOp struct {
512 : writerID objID
513 : start []byte
514 : end []byte
515 : suffix []byte
516 : value []byte
517 : }
518 :
519 2 : func (o *rangeKeySetOp) run(t *Test, h historyRecorder) {
520 2 : w := t.getWriter(o.writerID)
521 2 : err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
522 2 : h.Recordf("%s // %v", o, err)
523 2 : }
524 :
525 2 : func (o *rangeKeySetOp) String() string {
526 2 : return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
527 2 : o.writerID, o.start, o.end, o.suffix, o.value)
528 2 : }
529 :
530 2 : func (o *rangeKeySetOp) receiver() objID { return o.writerID }
531 2 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
532 :
533 1 : func (o *rangeKeySetOp) keys() []*[]byte {
534 1 : return []*[]byte{&o.start, &o.end}
535 1 : }
536 :
537 1 : func (o *rangeKeySetOp) diagramKeyRanges() []pebble.KeyRange {
538 1 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
539 1 : }
540 :
541 : // rangeKeyUnsetOp models a Writer.RangeKeyUnset operation.
542 : type rangeKeyUnsetOp struct {
543 : writerID objID
544 : start []byte
545 : end []byte
546 : suffix []byte
547 : }
548 :
549 2 : func (o *rangeKeyUnsetOp) run(t *Test, h historyRecorder) {
550 2 : w := t.getWriter(o.writerID)
551 2 : err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
552 2 : h.Recordf("%s // %v", o, err)
553 2 : }
554 :
555 2 : func (o *rangeKeyUnsetOp) String() string {
556 2 : return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
557 2 : o.writerID, o.start, o.end, o.suffix)
558 2 : }
559 :
560 2 : func (o *rangeKeyUnsetOp) receiver() objID { return o.writerID }
561 2 : func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
562 :
563 0 : func (o *rangeKeyUnsetOp) keys() []*[]byte {
564 0 : return []*[]byte{&o.start, &o.end}
565 0 : }
566 :
567 0 : func (o *rangeKeyUnsetOp) diagramKeyRanges() []pebble.KeyRange {
568 0 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
569 0 : }
570 :
571 : // logDataOp models a Writer.LogData operation.
572 : type logDataOp struct {
573 : writerID objID
574 : data []byte
575 : }
576 :
577 2 : func (o *logDataOp) run(t *Test, h historyRecorder) {
578 2 : w := t.getWriter(o.writerID)
579 2 : err := w.LogData(o.data, t.writeOpts)
580 2 : h.Recordf("%s // %v", o, err)
581 2 : }
582 :
583 2 : func (o *logDataOp) String() string {
584 2 : return fmt.Sprintf("%s.LogData(%q)", o.writerID, o.data)
585 2 : }
586 :
587 2 : func (o *logDataOp) receiver() objID { return o.writerID }
588 2 : func (o *logDataOp) syncObjs() objIDSlice { return nil }
589 0 : func (o *logDataOp) keys() []*[]byte { return []*[]byte{} }
590 0 : func (o *logDataOp) diagramKeyRanges() []pebble.KeyRange { return []pebble.KeyRange{} }
591 :
592 : // newBatchOp models a DB.NewBatch operation.
593 : type newBatchOp struct {
594 : dbID objID
595 : batchID objID
596 : }
597 :
598 2 : func (o *newBatchOp) run(t *Test, h historyRecorder) {
599 2 : b := t.getDB(o.dbID).NewBatch()
600 2 : t.setBatch(o.batchID, b)
601 2 : h.Recordf("%s", o)
602 2 : }
603 :
604 2 : func (o *newBatchOp) String() string { return fmt.Sprintf("%s = %s.NewBatch()", o.batchID, o.dbID) }
605 2 : func (o *newBatchOp) receiver() objID { return o.dbID }
606 2 : func (o *newBatchOp) syncObjs() objIDSlice {
607 2 : // NewBatch should not be concurrent with operations that interact with that
608 2 : // same batch.
609 2 : return []objID{o.batchID}
610 2 : }
611 :
612 0 : func (o *newBatchOp) keys() []*[]byte { return nil }
613 0 : func (o *newBatchOp) diagramKeyRanges() []pebble.KeyRange { return nil }
614 :
615 : // newIndexedBatchOp models a DB.NewIndexedBatch operation.
616 : type newIndexedBatchOp struct {
617 : dbID objID
618 : batchID objID
619 : }
620 :
621 2 : func (o *newIndexedBatchOp) run(t *Test, h historyRecorder) {
622 2 : b := t.getDB(o.dbID).NewIndexedBatch()
623 2 : t.setBatch(o.batchID, b)
624 2 : h.Recordf("%s", o)
625 2 : }
626 :
627 2 : func (o *newIndexedBatchOp) String() string {
628 2 : return fmt.Sprintf("%s = %s.NewIndexedBatch()", o.batchID, o.dbID)
629 2 : }
630 2 : func (o *newIndexedBatchOp) receiver() objID { return o.dbID }
631 2 : func (o *newIndexedBatchOp) syncObjs() objIDSlice {
632 2 : // NewIndexedBatch should not be concurrent with operations that interact
633 2 : // with that same batch.
634 2 : return []objID{o.batchID}
635 2 : }
636 :
637 0 : func (o *newIndexedBatchOp) keys() []*[]byte { return nil }
638 0 : func (o *newIndexedBatchOp) diagramKeyRanges() []pebble.KeyRange { return nil }
639 :
640 : // batchCommitOp models a Batch.Commit operation.
641 : type batchCommitOp struct {
642 : dbID objID
643 : batchID objID
644 : }
645 :
646 2 : func (o *batchCommitOp) run(t *Test, h historyRecorder) {
647 2 : b := t.getBatch(o.batchID)
648 2 : err := b.Commit(t.writeOpts)
649 2 : h.Recordf("%s // %v", o, err)
650 2 : }
651 :
652 2 : func (o *batchCommitOp) String() string { return fmt.Sprintf("%s.Commit()", o.batchID) }
653 2 : func (o *batchCommitOp) receiver() objID { return o.batchID }
654 2 : func (o *batchCommitOp) syncObjs() objIDSlice {
655 2 : // Synchronize on the database so that NewIters wait for the commit.
656 2 : return []objID{o.dbID}
657 2 : }
658 :
659 0 : func (o *batchCommitOp) keys() []*[]byte { return nil }
660 0 : func (o *batchCommitOp) diagramKeyRanges() []pebble.KeyRange { return nil }
661 :
662 : // ingestOp models a DB.Ingest operation.
663 : type ingestOp struct {
664 : dbID objID
665 : batchIDs []objID
666 :
667 : derivedDBIDs []objID
668 : }
669 :
670 2 : func (o *ingestOp) run(t *Test, h historyRecorder) {
671 2 : // We can only use apply as an alternative for ingestion if we are ingesting
672 2 : // a single batch. If we are ingesting multiple batches, the batches may
673 2 : // overlap which would cause ingestion to fail but apply would succeed.
674 2 : if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 && o.derivedDBIDs[0] == o.dbID {
675 2 : id := o.batchIDs[0]
676 2 : b := t.getBatch(id)
677 2 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
678 2 : db := t.getDB(o.dbID)
679 2 : c, err := o.collapseBatch(t, db, iter, rangeDelIter, rangeKeyIter, b)
680 2 : if err == nil {
681 2 : err = db.Apply(c, t.writeOpts)
682 2 : }
683 2 : _ = b.Close()
684 2 : _ = c.Close()
685 2 : t.clearObj(id)
686 2 : h.Recordf("%s // %v", o, err)
687 2 : return
688 : }
689 :
690 2 : var paths []string
691 2 : var err error
692 2 : for i, id := range o.batchIDs {
693 2 : b := t.getBatch(id)
694 2 : t.clearObj(id)
695 2 : path, _, err2 := buildForIngest(t, o.dbID, b, i)
696 2 : if err2 != nil {
697 0 : h.Recordf("Build(%s) // %v", id, err2)
698 0 : }
699 2 : err = firstError(err, err2)
700 2 : if err2 == nil {
701 2 : paths = append(paths, path)
702 2 : }
703 2 : err = firstError(err, b.Close())
704 : }
705 :
706 2 : err = firstError(err, t.withRetries(func() error {
707 2 : return t.getDB(o.dbID).Ingest(paths)
708 2 : }))
709 :
710 2 : h.Recordf("%s // %v", o, err)
711 : }
712 :
713 2 : func (o *ingestOp) receiver() objID { return o.dbID }
714 2 : func (o *ingestOp) syncObjs() objIDSlice {
715 2 : // Ingest should not be concurrent with mutating the batches that will be
716 2 : // ingested as sstables.
717 2 : objs := make([]objID, 0, len(o.batchIDs)+1)
718 2 : objs = append(objs, o.batchIDs...)
719 2 : addedDBs := make(map[objID]struct{})
720 2 : for i := range o.derivedDBIDs {
721 2 : _, ok := addedDBs[o.derivedDBIDs[i]]
722 2 : if !ok && o.derivedDBIDs[i] != o.dbID {
723 1 : objs = append(objs, o.derivedDBIDs[i])
724 1 : addedDBs[o.derivedDBIDs[i]] = struct{}{}
725 1 : }
726 : }
727 2 : return objs
728 : }
729 :
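// closeIters closes whichever of the given point, range-del, and range-key
// iterators are non-nil.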
730 : func closeIters(
731 : pointIter base.InternalIterator,
732 : rangeDelIter keyspan.FragmentIterator,
733 : rangeKeyIter keyspan.FragmentIterator,
734 2 : ) {
735 2 : if pointIter != nil {
736 2 : pointIter.Close()
737 2 : }
738 2 : if rangeDelIter != nil {
739 2 : rangeDelIter.Close()
740 2 : }
741 2 : if rangeKeyIter != nil {
742 2 : rangeKeyIter.Close()
743 2 : }
744 : }
745 :
746 : // collapseBatch collapses the mutations in a batch so that applying them is
747 : // equivalent to ingesting those mutations as an sstable. Duplicate updates to
748 : // a key are collapsed so that only the latest update is performed. All range
749 : // deletions are performed first in the batch to match the semantics of
750 : // ingestion, where a range deletion does not delete a point in the same sstable.
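//
// For example (illustrative): a batch containing DeleteRange("b", "d") and
// Set("c", "x") collapses to a batch that applies the range deletion first
// and then the Set, so "c" remains visible, just as it would if the original
// batch had been ingested as an sstable.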
751 : func (o *ingestOp) collapseBatch(
752 : t *Test,
753 : db *pebble.DB,
754 : pointIter base.InternalIterator,
755 : rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
756 : b *pebble.Batch,
757 2 : ) (*pebble.Batch, error) {
758 2 : defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
759 2 : equal := t.opts.Comparer.Equal
760 2 : collapsed := db.NewBatch()
761 2 :
762 2 : if rangeDelIter != nil {
763 2 : // NB: The range tombstones have already been fragmented by the Batch.
764 2 : t, err := rangeDelIter.First()
765 2 : for ; t != nil; t, err = rangeDelIter.Next() {
766 2 : // NB: We don't have to copy the key or value since we're reading from a
767 2 : // batch which doesn't do prefix compression.
768 2 : if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
769 0 : return nil, err
770 0 : }
771 : }
772 2 : if err != nil {
773 0 : return nil, err
774 0 : }
775 2 : rangeDelIter.Close()
776 2 : rangeDelIter = nil
777 : }
778 :
779 2 : if pointIter != nil {
780 2 : var lastUserKey []byte
781 2 : for kv := pointIter.First(); kv != nil; kv = pointIter.Next() {
782 2 : // Ignore duplicate keys.
783 2 : //
784 2 : // Note: this is necessary due to MERGE keys, otherwise it would be
785 2 : // fine to include all the keys in the batch and let the normal
786 2 : // sequence number precedence determine which of the keys "wins".
787 2 : // But the code to build the ingested sstable will only keep the
788 2 : // most recent internal key and will not merge across internal keys.
789 2 : if equal(lastUserKey, kv.K.UserKey) {
790 2 : continue
791 : }
792 : // NB: We don't have to copy the key or value since we're reading from a
793 : // batch which doesn't do prefix compression.
794 2 : lastUserKey = kv.K.UserKey
795 2 :
796 2 : var err error
797 2 : switch kv.Kind() {
798 2 : case pebble.InternalKeyKindDelete:
799 2 : err = collapsed.Delete(kv.K.UserKey, nil)
800 2 : case pebble.InternalKeyKindDeleteSized:
801 2 : v, _ := binary.Uvarint(kv.InPlaceValue())
802 2 : // Batch.DeleteSized takes just the length of the value being
803 2 : // deleted and adds the key's length to derive the overall entry
804 2 : // size of the value being deleted. This has already been done
805 2 : // to the key we're reading from the batch, so we must subtract
806 2 : // the key length from the encoded value before calling
807 2 : // collapsed.DeleteSized, which will again add the key length
808 2 : // before encoding.
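// For example (illustrative): DeleteSized("foo", 10) was stored in the
// source batch as 10+len("foo") = 13, so passing 13-len("foo") = 10 here
// lets collapsed.DeleteSized re-derive the same total.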
809 2 : err = collapsed.DeleteSized(kv.K.UserKey, uint32(v-uint64(len(kv.K.UserKey))), nil)
810 2 : case pebble.InternalKeyKindSingleDelete:
811 2 : err = collapsed.SingleDelete(kv.K.UserKey, nil)
812 2 : case pebble.InternalKeyKindSet:
813 2 : err = collapsed.Set(kv.K.UserKey, kv.InPlaceValue(), nil)
814 2 : case pebble.InternalKeyKindMerge:
815 2 : err = collapsed.Merge(kv.K.UserKey, kv.InPlaceValue(), nil)
816 0 : case pebble.InternalKeyKindLogData:
817 0 : err = collapsed.LogData(kv.K.UserKey, nil)
818 0 : default:
819 0 : err = errors.Errorf("unknown batch record kind: %d", kv.Kind())
820 : }
821 2 : if err != nil {
822 0 : return nil, err
823 0 : }
824 : }
825 2 : if err := pointIter.Close(); err != nil {
826 0 : return nil, err
827 0 : }
828 2 : pointIter = nil
829 : }
830 :
831 : // There's no equivalent of a MERGE operator for range keys, so there's no
832 : // need to collapse the range keys here. Rather than reading the range keys
833 : // from `rangeKeyIter`, which will already be fragmented, read the range
834 : // keys from the batch and copy them verbatim. This marginally improves our
835 : // test coverage over the alternative approach of pre-fragmenting and
836 : // pre-coalescing before writing to the batch.
837 : //
838 : // The `rangeKeyIter` is used only to determine if there are any range keys
839 : // in the batch at all, and only because we already have it handy from
840 : // private.BatchSort.
841 2 : if rangeKeyIter != nil {
842 2 : for r := b.Reader(); ; {
843 2 : kind, key, value, ok, err := r.Next()
844 2 : if !ok {
845 2 : if err != nil {
846 0 : return nil, err
847 0 : }
848 2 : break
849 2 : } else if !rangekey.IsRangeKey(kind) {
850 2 : continue
851 : }
852 2 : ik := base.MakeInternalKey(key, 0, kind)
853 2 : if err := collapsed.AddInternalKey(&ik, value, nil); err != nil {
854 0 : return nil, err
855 0 : }
856 : }
857 2 : rangeKeyIter.Close()
858 2 : rangeKeyIter = nil
859 : }
860 :
861 2 : return collapsed, nil
862 : }
863 :
864 2 : func (o *ingestOp) String() string {
865 2 : var buf strings.Builder
866 2 : buf.WriteString(o.dbID.String())
867 2 : buf.WriteString(".Ingest(")
868 2 : for i, id := range o.batchIDs {
869 2 : if i > 0 {
870 2 : buf.WriteString(", ")
871 2 : }
872 2 : buf.WriteString(id.String())
873 : }
874 2 : buf.WriteString(")")
875 2 : return buf.String()
876 : }
877 :
878 0 : func (o *ingestOp) keys() []*[]byte { return nil }
879 0 : func (o *ingestOp) diagramKeyRanges() []pebble.KeyRange { return nil }
880 :
881 : type ingestAndExciseOp struct {
882 : dbID objID
883 : batchID objID
884 : derivedDBID objID
885 : exciseStart, exciseEnd []byte
886 : sstContainsExciseTombstone bool
887 : }
888 :
889 2 : func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) {
890 2 : var err error
891 2 : b := t.getBatch(o.batchID)
892 2 : t.clearObj(o.batchID)
893 2 : if t.testOpts.Opts.Comparer.Compare(o.exciseEnd, o.exciseStart) <= 0 {
894 0 : panic("non-well-formed excise span")
895 : }
896 2 : db := t.getDB(o.dbID)
897 2 : if b.Empty() {
898 2 : h.Recordf("%s // %v", o, o.simulateExcise(db, t))
899 2 : return
900 2 : }
901 :
902 2 : if o.sstContainsExciseTombstone {
903 2 : // Add a rangedel and rangekeydel to the batch. This ensures they'll end up
904 2 : // inside the sstable. Note that all entries in the sstable will have the
905 2 : // same sequence number, so the ordering within the batch doesn't matter.
906 2 : err = firstError(err, b.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts))
907 2 : err = firstError(err, b.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts))
908 2 : }
909 2 : path, writerMeta, err2 := buildForIngest(t, o.dbID, b, 0 /* i */)
910 2 : if err2 != nil {
911 0 : h.Recordf("Build(%s) // %v", o.batchID, err2)
912 0 : return
913 0 : }
914 2 : err = firstError(err, b.Close())
915 2 :
916 2 : if writerMeta.Properties.NumEntries == 0 && writerMeta.Properties.NumRangeKeys() == 0 {
917 2 : h.Recordf("%s // %v", o, o.simulateExcise(db, t))
918 2 : return
919 2 : }
920 :
921 2 : if t.testOpts.useExcise {
922 2 : err = firstError(err, t.withRetries(func() error {
923 2 : _, err := db.IngestAndExcise([]string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{
924 2 : Start: o.exciseStart,
925 2 : End: o.exciseEnd,
926 2 : }, o.sstContainsExciseTombstone)
927 2 : return err
928 2 : }))
929 2 : } else {
930 2 : err = firstError(err, o.simulateExcise(db, t))
931 2 : err = firstError(err, t.withRetries(func() error {
932 2 : return db.Ingest([]string{path})
933 2 : }))
934 : }
935 :
936 2 : h.Recordf("%s // %v", o, err)
937 : }
938 :
939 2 : func (o *ingestAndExciseOp) simulateExcise(db *pebble.DB, t *Test) error {
940 2 : // Simulate the excise using a DeleteRange and RangeKeyDelete.
941 2 : return errors.CombineErrors(
942 2 : db.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts),
943 2 : db.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts),
944 2 : )
945 2 : }
946 :
947 2 : func (o *ingestAndExciseOp) receiver() objID { return o.dbID }
948 2 : func (o *ingestAndExciseOp) syncObjs() objIDSlice {
949 2 : // Ingest should not be concurrent with mutating the batches that will be
950 2 : // ingested as sstables.
951 2 : objs := []objID{o.batchID}
952 2 : if o.derivedDBID != o.dbID {
953 1 : objs = append(objs, o.derivedDBID)
954 1 : }
955 2 : return objs
956 : }
957 :
958 2 : func (o *ingestAndExciseOp) String() string {
959 2 : return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q, %t /* sstContainsExciseTombstone */)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd, o.sstContainsExciseTombstone)
960 2 : }
961 :
962 0 : func (o *ingestAndExciseOp) keys() []*[]byte {
963 0 : return []*[]byte{&o.exciseStart, &o.exciseEnd}
964 0 : }
965 :
966 0 : func (o *ingestAndExciseOp) diagramKeyRanges() []pebble.KeyRange {
967 0 : return []pebble.KeyRange{{Start: o.exciseStart, End: o.exciseEnd}}
968 0 : }
969 :
970 : // ingestExternalFilesOp models a DB.IngestExternalFiles operation.
971 : //
972 : // When remote storage is not enabled, the operation is emulated using the
973 : // regular DB.Ingest; this serves as a cross-check of the result.
974 : type ingestExternalFilesOp struct {
975 : dbID objID
976 : // The bounds of the objects cannot overlap.
977 : objs []externalObjWithBounds
978 : }
979 :
980 : type externalObjWithBounds struct {
981 : externalObjID objID
982 :
983 : // bounds for the external object. These bounds apply after keys undergo
984 : // any prefix or suffix transforms.
985 : bounds pebble.KeyRange
986 :
987 : syntheticPrefix sstable.SyntheticPrefix
988 : syntheticSuffix sstable.SyntheticSuffix
989 : }
990 :
991 2 : func (o *ingestExternalFilesOp) run(t *Test, h historyRecorder) {
992 2 : db := t.getDB(o.dbID)
993 2 :
994 2 : // Verify the objects exist (useful for --try-to-reduce).
995 2 : for i := range o.objs {
996 2 : t.getExternalObj(o.objs[i].externalObjID)
997 2 : }
998 :
999 2 : var err error
1000 2 : if !t.testOpts.externalStorageEnabled {
1001 2 : // Emulate the operation by creating local, truncated SST files and ingesting
1002 2 : // them.
1003 2 : var paths []string
1004 2 : for i, obj := range o.objs {
1005 2 : // Make sure the object exists and is not empty.
1006 2 : path, sstMeta := buildForIngestExternalEmulation(
1007 2 : t, o.dbID, obj.externalObjID, obj.bounds, obj.syntheticSuffix, obj.syntheticPrefix, i,
1008 2 : )
1009 2 : if sstMeta.HasPointKeys || sstMeta.HasRangeKeys || sstMeta.HasRangeDelKeys {
1010 2 : paths = append(paths, path)
1011 2 : }
1012 : }
1013 2 : if len(paths) > 0 {
1014 2 : err = db.Ingest(paths)
1015 2 : }
1016 2 : } else {
1017 2 : external := make([]pebble.ExternalFile, len(o.objs))
1018 2 : for i, obj := range o.objs {
1019 2 : meta := t.getExternalObj(obj.externalObjID)
1020 2 : external[i] = pebble.ExternalFile{
1021 2 : Locator: "external",
1022 2 : ObjName: externalObjName(obj.externalObjID),
1023 2 : Size: meta.sstMeta.Size,
1024 2 : StartKey: obj.bounds.Start,
1025 2 : EndKey: obj.bounds.End,
1026 2 : EndKeyIsInclusive: false,
1027 2 : // Note: if the table has point/range keys, we don't know for sure whether
1028 2 : // this particular range has any, but that's acceptable.
1029 2 : HasPointKey: meta.sstMeta.HasPointKeys || meta.sstMeta.HasRangeDelKeys,
1030 2 : HasRangeKey: meta.sstMeta.HasRangeKeys,
1031 2 : SyntheticSuffix: obj.syntheticSuffix,
1032 2 : }
1033 2 : if obj.syntheticPrefix.IsSet() {
1034 2 : external[i].SyntheticPrefix = obj.syntheticPrefix
1035 2 : }
1036 : }
1037 2 : _, err = db.IngestExternalFiles(external)
1038 : }
1039 :
1040 2 : h.Recordf("%s // %v", o, err)
1041 : }
1042 :
1043 2 : func (o *ingestExternalFilesOp) receiver() objID { return o.dbID }
1044 2 : func (o *ingestExternalFilesOp) syncObjs() objIDSlice {
1045 2 : res := make(objIDSlice, len(o.objs))
1046 2 : for i := range res {
1047 2 : res[i] = o.objs[i].externalObjID
1048 2 : }
1049 : // Deduplicate the IDs.
1050 2 : slices.Sort(res)
1051 2 : return slices.Compact(res)
1052 : }
1053 :
1054 2 : func (o *ingestExternalFilesOp) String() string {
1055 2 : strs := make([]string, len(o.objs))
1056 2 : for i, obj := range o.objs {
1057 2 : strs[i] = fmt.Sprintf("%s, %q /* start */, %q /* end */, %q /* syntheticSuffix */, %q /* syntheticPrefix */",
1058 2 : obj.externalObjID, obj.bounds.Start, obj.bounds.End, obj.syntheticSuffix, obj.syntheticPrefix,
1059 2 : )
1060 2 : }
1061 2 : return fmt.Sprintf("%s.IngestExternalFiles(%s)", o.dbID, strings.Join(strs, ", "))
1062 : }
1063 :
1064 0 : func (o *ingestExternalFilesOp) keys() []*[]byte {
1065 0 : // If any of the objects have synthetic prefixes, we can't allow modification
1066 0 : // of external object bounds.
1067 0 : for i := range o.objs {
1068 0 : if o.objs[i].syntheticPrefix.IsSet() {
1069 0 : return nil
1070 0 : }
1071 : }
1072 :
1073 0 : var res []*[]byte
1074 0 : for i := range o.objs {
1075 0 : res = append(res, &o.objs[i].bounds.Start, &o.objs[i].bounds.End)
1076 0 : }
1077 0 : return res
1078 : }
1079 :
1080 0 : func (o *ingestExternalFilesOp) diagramKeyRanges() []pebble.KeyRange {
1081 0 : ranges := make([]pebble.KeyRange, len(o.objs))
1082 0 : for i, obj := range o.objs {
1083 0 : ranges[i] = obj.bounds
1084 0 : }
1085 0 : return ranges
1086 : }
1087 :
1088 : // getOp models a Reader.Get operation.
1089 : type getOp struct {
1090 : readerID objID
1091 : key []byte
1092 : derivedDBID objID
1093 : }
1094 :
1095 2 : func (o *getOp) run(t *Test, h historyRecorder) {
1096 2 : r := t.getReader(o.readerID)
1097 2 : var val []byte
1098 2 : var closer io.Closer
1099 2 : err := t.withRetries(func() (err error) {
1100 2 : val, closer, err = r.Get(o.key)
1101 2 : return err
1102 2 : })
1103 2 : h.Recordf("%s // [%q] %v", o, val, err)
1104 2 : if closer != nil {
1105 2 : closer.Close()
1106 2 : }
1107 : }
1108 :
1109 2 : func (o *getOp) String() string { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
1110 2 : func (o *getOp) receiver() objID { return o.readerID }
1111 2 : func (o *getOp) syncObjs() objIDSlice {
1112 2 : if o.readerID.tag() == dbTag {
1113 2 : return nil
1114 2 : }
1115 : // batch.Get reads through to the current database state.
1116 2 : if o.derivedDBID != 0 {
1117 2 : return []objID{o.derivedDBID}
1118 2 : }
1119 0 : return nil
1120 : }
1121 :
1122 0 : func (o *getOp) keys() []*[]byte {
1123 0 : return []*[]byte{&o.key}
1124 0 : }
1125 :
1126 0 : func (o *getOp) diagramKeyRanges() []pebble.KeyRange {
1127 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
1128 0 : }
1129 :
1130 : // newIterOp models a Reader.NewIter operation.
1131 : type newIterOp struct {
1132 : readerID objID
1133 : iterID objID
1134 : iterOpts
1135 : derivedDBID objID
1136 : }
1137 :
1138 : // Set debugIterators to true to enable debug logging of range key iterator operations.
1139 : const debugIterators = false
1140 :
1141 2 : func (o *newIterOp) run(t *Test, h historyRecorder) {
1142 2 : r := t.getReader(o.readerID)
1143 2 : opts := iterOptions(o.iterOpts)
1144 2 : if debugIterators {
1145 0 : opts.DebugRangeKeyStack = true
1146 0 : }
1147 :
1148 2 : var i *pebble.Iterator
1149 2 : for {
1150 2 : i, _ = r.NewIter(opts)
1151 2 : if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
1152 2 : break
1153 : }
1154 : // close this iter and retry NewIter
1155 0 : _ = i.Close()
1156 : }
1157 2 : t.setIter(o.iterID, i)
1158 2 :
1159 2 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
1160 2 : // the user-provided bounds.
1161 2 : if opts != nil {
1162 2 : rand.Read(opts.LowerBound[:])
1163 2 : rand.Read(opts.UpperBound[:])
1164 2 : }
1165 2 : h.Recordf("%s // %v", o, i.Error())
1166 : }
1167 :
1168 2 : func (o *newIterOp) String() string {
1169 2 : return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
1170 2 : o.iterID, o.readerID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
1171 2 : }
1172 :
1173 2 : func (o *newIterOp) receiver() objID { return o.readerID }
1174 2 : func (o *newIterOp) syncObjs() objIDSlice {
1175 2 : // Prevent o.iterID ops from running before it exists.
1176 2 : objs := []objID{o.iterID}
1177 2 : // If reading through a batch or snapshot, the new iterator will also observe database
1178 2 : // state, and we must synchronize on the database state for a consistent
1179 2 : // view.
1180 2 : if o.readerID.tag() == batchTag || o.readerID.tag() == snapTag {
1181 2 : objs = append(objs, o.derivedDBID)
1182 2 : }
1183 2 : return objs
1184 : }
1185 :
1186 0 : func (o *newIterOp) keys() []*[]byte {
1187 0 : var res []*[]byte
1188 0 : if o.lower != nil {
1189 0 : res = append(res, &o.lower)
1190 0 : }
1191 0 : if o.upper != nil {
1192 0 : res = append(res, &o.upper)
1193 0 : }
1194 0 : return res
1195 : }
1196 :
1197 1 : func (o *newIterOp) diagramKeyRanges() []pebble.KeyRange {
1198 1 : var res []pebble.KeyRange
1199 1 : if o.lower != nil {
1200 0 : res = append(res, pebble.KeyRange{Start: o.lower, End: o.lower})
1201 0 : }
1202 1 : if o.upper != nil {
1203 0 : res = append(res, pebble.KeyRange{Start: o.upper, End: o.upper})
1204 0 : }
1205 1 : return res
1206 : }
1207 :
1208 : // newIterUsingCloneOp models an Iterator.Clone operation.
1209 : type newIterUsingCloneOp struct {
1210 : existingIterID objID
1211 : iterID objID
1212 : refreshBatch bool
1213 : iterOpts
1214 :
1215 : // derivedReaderID is the ID of the underlying reader that backs both the
1216 : // existing iterator and the new iterator. The derivedReaderID is NOT
1217 : // serialized by String and is derived from other operations during parse.
1218 : derivedReaderID objID
1219 : }
1220 :
1221 2 : func (o *newIterUsingCloneOp) run(t *Test, h historyRecorder) {
1222 2 : iter := t.getIter(o.existingIterID)
1223 2 : cloneOpts := pebble.CloneOptions{
1224 2 : IterOptions: iterOptions(o.iterOpts),
1225 2 : RefreshBatchView: o.refreshBatch,
1226 2 : }
1227 2 : i, err := iter.iter.Clone(cloneOpts)
1228 2 : if err != nil {
1229 0 : panic(err)
1230 : }
1231 2 : t.setIter(o.iterID, i)
1232 2 : h.Recordf("%s // %v", o, i.Error())
1233 : }
1234 :
1235 2 : func (o *newIterUsingCloneOp) String() string {
1236 2 : return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
1237 2 : o.iterID, o.existingIterID, o.refreshBatch, o.lower, o.upper,
1238 2 : o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
1239 2 : }
1240 :
1241 2 : func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
1242 :
1243 2 : func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
1244 2 : objIDs := []objID{o.iterID}
1245 2 : // If the underlying reader is a batch, we must synchronize with the batch.
1246 2 : // If refreshBatch=true, synchronizing is necessary to observe all the
1247 2 : // mutations up until this op and no more. Even when refreshBatch=false,
1248 2 : // we must synchronize because iterator construction may access state cached
1249 2 : // on the indexed batch to avoid refragmenting range tombstones or range
1250 2 : // keys.
1251 2 : if o.derivedReaderID.tag() == batchTag {
1252 1 : objIDs = append(objIDs, o.derivedReaderID)
1253 1 : }
1254 2 : return objIDs
1255 : }
1256 :
1257 0 : func (o *newIterUsingCloneOp) keys() []*[]byte { return nil }
1258 0 : func (o *newIterUsingCloneOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1259 :
1260 : // iterSetBoundsOp models an Iterator.SetBounds operation.
1261 : type iterSetBoundsOp struct {
1262 : iterID objID
1263 : lower []byte
1264 : upper []byte
1265 : }
1266 :
1267 2 : func (o *iterSetBoundsOp) run(t *Test, h historyRecorder) {
1268 2 : i := t.getIter(o.iterID)
1269 2 : var lower, upper []byte
1270 2 : if o.lower != nil {
1271 2 : lower = append(lower, o.lower...)
1272 2 : }
1273 2 : if o.upper != nil {
1274 2 : upper = append(upper, o.upper...)
1275 2 : }
1276 2 : i.SetBounds(lower, upper)
1277 2 :
1278 2 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
1279 2 : // the user-provided bounds.
1280 2 : rand.Read(lower[:])
1281 2 : rand.Read(upper[:])
1282 2 :
1283 2 : h.Recordf("%s // %v", o, i.Error())
1284 : }
1285 :
1286 2 : func (o *iterSetBoundsOp) String() string {
1287 2 : return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID, o.lower, o.upper)
1288 2 : }
1289 :
1290 2 : func (o *iterSetBoundsOp) receiver() objID { return o.iterID }
1291 2 : func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
1292 :
1293 0 : func (o *iterSetBoundsOp) keys() []*[]byte {
1294 0 : return []*[]byte{&o.lower, &o.upper}
1295 0 : }
1296 :
1297 0 : func (o *iterSetBoundsOp) diagramKeyRanges() []pebble.KeyRange {
1298 0 : return []pebble.KeyRange{{Start: o.lower, End: o.upper}}
1299 0 : }
1300 :
1301 : // iterSetOptionsOp models an Iterator.SetOptions operation.
1302 : type iterSetOptionsOp struct {
1303 : iterID objID
1304 : iterOpts
1305 :
1306 : // derivedReaderID is the ID of the underlying reader that backs the
1307 : // iterator. The derivedReaderID is NOT serialized by String and is derived
1308 : // from other operations during parse.
1309 : derivedReaderID objID
1310 : }
1311 :
1312 2 : func (o *iterSetOptionsOp) run(t *Test, h historyRecorder) {
1313 2 : i := t.getIter(o.iterID)
1314 2 :
1315 2 : opts := iterOptions(o.iterOpts)
1316 2 : if opts == nil {
1317 2 : opts = &pebble.IterOptions{}
1318 2 : }
1319 2 : i.SetOptions(opts)
1320 2 :
1321 2 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
1322 2 : // the user-provided bounds.
1323 2 : rand.Read(opts.LowerBound[:])
1324 2 : rand.Read(opts.UpperBound[:])
1325 2 :
1326 2 : h.Recordf("%s // %v", o, i.Error())
1327 : }
1328 :
1329 2 : func (o *iterSetOptionsOp) String() string {
1330 2 : return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
1331 2 : o.iterID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
1332 2 : }
1333 :
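// iterOptions converts the test's iterOpts into a *pebble.IterOptions. It
// returns nil when the options are zero-valued (and iterator debugging is
// disabled); otherwise it copies any bounds and, when a timestamp filter
// range is set, installs a block-property filter plus a SkipPoint function.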
1334 2 : func iterOptions(o iterOpts) *pebble.IterOptions {
1335 2 : if o.IsZero() && !debugIterators {
1336 2 : return nil
1337 2 : }
1338 2 : var lower, upper []byte
1339 2 : if o.lower != nil {
1340 2 : lower = append(lower, o.lower...)
1341 2 : }
1342 2 : if o.upper != nil {
1343 2 : upper = append(upper, o.upper...)
1344 2 : }
1345 2 : opts := &pebble.IterOptions{
1346 2 : LowerBound: lower,
1347 2 : UpperBound: upper,
1348 2 : KeyTypes: pebble.IterKeyType(o.keyTypes),
1349 2 : RangeKeyMasking: pebble.RangeKeyMasking{
1350 2 : Suffix: o.maskSuffix,
1351 2 : },
1352 2 : UseL6Filters: o.useL6Filters,
1353 2 : DebugRangeKeyStack: debugIterators,
1354 2 : }
1355 2 : if opts.RangeKeyMasking.Suffix != nil {
1356 2 : opts.RangeKeyMasking.Filter = func() pebble.BlockPropertyFilterMask {
1357 2 : return sstable.NewTestKeysMaskingFilter()
1358 2 : }
1359 : }
1360 2 : if o.filterMax > 0 {
1361 2 : opts.PointKeyFilters = []pebble.BlockPropertyFilter{
1362 2 : sstable.NewTestKeysBlockPropertyFilter(o.filterMin, o.filterMax),
1363 2 : }
1364 2 : // Enforce the timestamp bounds in SkipPoint, so that the iterator never
1365 2 : // returns a key whose suffix timestamp is outside [filterMin, filterMax).
1366 2 : // This provides deterministic iteration.
1367 2 : opts.SkipPoint = func(k []byte) (skip bool) {
1368 2 : n := testkeys.Comparer.Split(k)
1369 2 : if n == len(k) {
1370 2 : // No suffix, don't skip it.
1371 2 : return false
1372 2 : }
1373 2 : v, err := testkeys.ParseSuffix(k[n:])
1374 2 : if err != nil {
1375 0 : panic(err)
1376 : }
1377 2 : ts := uint64(v)
1378 2 : return ts < o.filterMin || ts >= o.filterMax
1379 : }
1380 : }
1381 2 : return opts
1382 : }
1383 :
1384 2 : func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
1385 :
1386 2 : func (o *iterSetOptionsOp) syncObjs() objIDSlice {
1387 2 : if o.derivedReaderID.tag() == batchTag {
1388 1 : // If the underlying reader is a batch, we must synchronize with the
1389 1 : // batch so that we observe all the mutations up until this operation
1390 1 : // and no more.
1391 1 : return []objID{o.derivedReaderID}
1392 1 : }
1393 2 : return nil
1394 : }
1395 :
1396 0 : func (o *iterSetOptionsOp) keys() []*[]byte { return nil }
1397 0 : func (o *iterSetOptionsOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1398 :
1399 : // iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
1400 : type iterSeekGEOp struct {
1401 : iterID objID
1402 : key []byte
1403 : limit []byte
1404 :
1405 : derivedReaderID objID
1406 : }
1407 :
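// iteratorPos renders the iterator's current position for the history log:
// the key, the point value (or "<no point>"), the range key bounds and
// suffix/value pairs (or "<no range>"), and a trailing "*" when
// RangeKeyChanged reports true.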
1408 2 : func iteratorPos(i *retryableIter) string {
1409 2 : var buf bytes.Buffer
1410 2 : fmt.Fprintf(&buf, "%q", i.Key())
1411 2 : hasPoint, hasRange := i.HasPointAndRange()
1412 2 : if hasPoint {
1413 2 : fmt.Fprintf(&buf, ",%q", i.Value())
1414 2 : } else {
1415 2 : fmt.Fprint(&buf, ",<no point>")
1416 2 : }
1417 2 : if hasRange {
1418 2 : start, end := i.RangeBounds()
1419 2 : fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
1420 2 : for i, rk := range i.RangeKeys() {
1421 2 : if i > 0 {
1422 2 : fmt.Fprint(&buf, ",")
1423 2 : }
1424 2 : fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
1425 : }
1426 2 : fmt.Fprint(&buf, "}")
1427 2 : } else {
1428 2 : fmt.Fprint(&buf, ",<no range>")
1429 2 : }
1430 2 : if i.RangeKeyChanged() {
1431 2 : fmt.Fprint(&buf, "*")
1432 2 : }
1433 2 : return buf.String()
1434 : }
1435 :
1436 2 : func validBoolToStr(valid bool) string {
1437 2 : return fmt.Sprintf("%t", valid)
1438 2 : }
1439 :
1440 2 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
1441 2 : // We can't distinguish between IterExhausted and IterAtLimit in a
1442 2 : // deterministic manner.
1443 2 : switch validity {
1444 2 : case pebble.IterExhausted, pebble.IterAtLimit:
1445 2 : return false, "invalid"
1446 2 : case pebble.IterValid:
1447 2 : return true, "valid"
1448 0 : default:
1449 0 : panic("unknown validity")
1450 : }
1451 : }
1452 :
1453 2 : func (o *iterSeekGEOp) run(t *Test, h historyRecorder) {
1454 2 : i := t.getIter(o.iterID)
1455 2 : var valid bool
1456 2 : var validStr string
1457 2 : if o.limit == nil {
1458 2 : valid = i.SeekGE(o.key)
1459 2 : validStr = validBoolToStr(valid)
1460 2 : } else {
1461 2 : valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
1462 2 : }
1463 2 : if valid {
1464 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1465 2 : } else {
1466 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1467 2 : }
1468 : }
1469 :
1470 2 : func (o *iterSeekGEOp) String() string {
1471 2 : return fmt.Sprintf("%s.SeekGE(%q, %q)", o.iterID, o.key, o.limit)
1472 2 : }
1473 2 : func (o *iterSeekGEOp) receiver() objID { return o.iterID }
1474 2 : func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1475 :
1476 0 : func (o *iterSeekGEOp) keys() []*[]byte {
1477 0 : return []*[]byte{&o.key}
1478 0 : }
1479 :
1480 1 : func (o *iterSeekGEOp) diagramKeyRanges() []pebble.KeyRange {
1481 1 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
1482 1 : }
1483 :
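// onlyBatchIDs filters ids down to those that refer to batches; iterator ops
// use it so that they synchronize with an indexed batch backing the iterator.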
1484 2 : func onlyBatchIDs(ids ...objID) objIDSlice {
1485 2 : var ret objIDSlice
1486 2 : for _, id := range ids {
1487 2 : if id.tag() == batchTag {
1488 1 : ret = append(ret, id)
1489 1 : }
1490 : }
1491 2 : return ret
1492 : }
1493 :
1494 : // iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
1495 : type iterSeekPrefixGEOp struct {
1496 : iterID objID
1497 : key []byte
1498 :
1499 : derivedReaderID objID
1500 : }
1501 :
1502 2 : func (o *iterSeekPrefixGEOp) run(t *Test, h historyRecorder) {
1503 2 : i := t.getIter(o.iterID)
1504 2 : valid := i.SeekPrefixGE(o.key)
1505 2 : if valid {
1506 2 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1507 2 : } else {
1508 2 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1509 2 : }
1510 : }
1511 :
1512 2 : func (o *iterSeekPrefixGEOp) String() string {
1513 2 : return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, o.key)
1514 2 : }
1515 2 : func (o *iterSeekPrefixGEOp) receiver() objID { return o.iterID }
1516 2 : func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1517 :
1518 0 : func (o *iterSeekPrefixGEOp) keys() []*[]byte {
1519 0 : return []*[]byte{&o.key}
1520 0 : }
1521 :
1522 0 : func (o *iterSeekPrefixGEOp) diagramKeyRanges() []pebble.KeyRange {
1523 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
1524 0 : }
1525 :
1526 : // iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
1527 : type iterSeekLTOp struct {
1528 : iterID objID
1529 : key []byte
1530 : limit []byte
1531 :
1532 : derivedReaderID objID
1533 : }
1534 :
1535 2 : func (o *iterSeekLTOp) run(t *Test, h historyRecorder) {
1536 2 : i := t.getIter(o.iterID)
1537 2 : var valid bool
1538 2 : var validStr string
1539 2 : if o.limit == nil {
1540 2 : valid = i.SeekLT(o.key)
1541 2 : validStr = validBoolToStr(valid)
1542 2 : } else {
1543 2 : valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
1544 2 : }
1545 2 : if valid {
1546 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1547 2 : } else {
1548 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1549 2 : }
1550 : }
1551 :
1552 2 : func (o *iterSeekLTOp) String() string {
1553 2 : return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID, o.key, o.limit)
1554 2 : }
1555 :
1556 2 : func (o *iterSeekLTOp) receiver() objID { return o.iterID }
1557 2 : func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1558 :
1559 0 : func (o *iterSeekLTOp) keys() []*[]byte {
1560 0 : return []*[]byte{&o.key}
1561 0 : }
1562 :
1563 0 : func (o *iterSeekLTOp) diagramKeyRanges() []pebble.KeyRange {
1564 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
1565 0 : }
1566 :
1567 : // iterFirstOp models an Iterator.First operation.
1568 : type iterFirstOp struct {
1569 : iterID objID
1570 :
1571 : derivedReaderID objID
1572 : }
1573 :
1574 2 : func (o *iterFirstOp) run(t *Test, h historyRecorder) {
1575 2 : i := t.getIter(o.iterID)
1576 2 : valid := i.First()
1577 2 : if valid {
1578 2 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1579 2 : } else {
1580 2 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1581 2 : }
1582 : }
1583 :
1584 2 : func (o *iterFirstOp) String() string { return fmt.Sprintf("%s.First()", o.iterID) }
1585 2 : func (o *iterFirstOp) receiver() objID { return o.iterID }
1586 2 : func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1587 :
1588 0 : func (o *iterFirstOp) keys() []*[]byte { return nil }
1589 0 : func (o *iterFirstOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1590 :
1591 : // iterLastOp models an Iterator.Last operation.
1592 : type iterLastOp struct {
1593 : iterID objID
1594 :
1595 : derivedReaderID objID
1596 : }
1597 :
1598 2 : func (o *iterLastOp) run(t *Test, h historyRecorder) {
1599 2 : i := t.getIter(o.iterID)
1600 2 : valid := i.Last()
1601 2 : if valid {
1602 2 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1603 2 : } else {
1604 2 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1605 2 : }
1606 : }
1607 :
1608 2 : func (o *iterLastOp) String() string { return fmt.Sprintf("%s.Last()", o.iterID) }
1609 2 : func (o *iterLastOp) receiver() objID { return o.iterID }
1610 2 : func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1611 :
1612 0 : func (o *iterLastOp) keys() []*[]byte { return nil }
1613 0 : func (o *iterLastOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1614 :
1615 : // iterNextOp models an Iterator.Next[WithLimit] operation.
1616 : type iterNextOp struct {
1617 : iterID objID
1618 : limit []byte
1619 :
1620 : derivedReaderID objID
1621 : }
1622 :
1623 2 : func (o *iterNextOp) run(t *Test, h historyRecorder) {
1624 2 : i := t.getIter(o.iterID)
1625 2 : var valid bool
1626 2 : var validStr string
1627 2 : if o.limit == nil {
1628 2 : valid = i.Next()
1629 2 : validStr = validBoolToStr(valid)
1630 2 : } else {
1631 2 : valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
1632 2 : }
1633 2 : if valid {
1634 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1635 2 : } else {
1636 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1637 2 : }
1638 : }
1639 :
1640 2 : func (o *iterNextOp) String() string { return fmt.Sprintf("%s.Next(%q)", o.iterID, o.limit) }
1641 2 : func (o *iterNextOp) receiver() objID { return o.iterID }
1642 2 : func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1643 :
1644 0 : func (o *iterNextOp) keys() []*[]byte { return nil }
1645 0 : func (o *iterNextOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1646 :
1647 : // iterNextPrefixOp models an Iterator.NextPrefix operation.
1648 : type iterNextPrefixOp struct {
1649 : iterID objID
1650 :
1651 : derivedReaderID objID
1652 : }
1653 :
1654 2 : func (o *iterNextPrefixOp) run(t *Test, h historyRecorder) {
1655 2 : i := t.getIter(o.iterID)
1656 2 : valid := i.NextPrefix()
1657 2 : validStr := validBoolToStr(valid)
1658 2 : if valid {
1659 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1660 2 : } else {
1661 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1662 2 : }
1663 : }
1664 :
1665 2 : func (o *iterNextPrefixOp) String() string { return fmt.Sprintf("%s.NextPrefix()", o.iterID) }
1666 2 : func (o *iterNextPrefixOp) receiver() objID { return o.iterID }
1667 2 : func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1668 :
1669 0 : func (o *iterNextPrefixOp) keys() []*[]byte { return nil }
1670 0 : func (o *iterNextPrefixOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1671 :
1672 : // iterCanSingleDelOp models a call to CanDeterministicallySingleDelete with an
1673 : // Iterator.
1674 : type iterCanSingleDelOp struct {
1675 : iterID objID
1676 :
1677 : derivedReaderID objID
1678 : }
1679 :
1680 2 : func (o *iterCanSingleDelOp) run(t *Test, h historyRecorder) {
1681 2 : // TODO(jackson): When we perform error injection, we'll need to rethink
1682 2 : // this.
1683 2 : _, err := pebble.CanDeterministicallySingleDelete(t.getIter(o.iterID).iter)
1684 2 : // The return value of CanDeterministicallySingleDelete is dependent on
1685 2 : // internal LSM state and non-deterministic, so we don't record it.
1686 2 : // Including the operation within the metamorphic test at all helps ensure
1687 2 : // that it does not change the result of any other Iterator operation that
1688 2 : // should be deterministic, regardless of its own outcome.
1689 2 : //
1690 2 : // We still record the value of the error because it's deterministic, at
1691 2 : // least for now. The possible error cases are:
1692 2 : // - The iterator was already in an error state when the operation ran.
1693 2 : // - The operation is deterministically invalid (like using an InternalNext
1694 2 : //   to change directions).
1695 2 : h.Recordf("%s // %v", o, err)
1696 2 : }
1697 :
1698 2 : func (o *iterCanSingleDelOp) String() string { return fmt.Sprintf("%s.InternalNext()", o.iterID) }
1699 2 : func (o *iterCanSingleDelOp) receiver() objID { return o.iterID }
1700 2 : func (o *iterCanSingleDelOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1701 :
1702 0 : func (o *iterCanSingleDelOp) keys() []*[]byte { return nil }
1703 0 : func (o *iterCanSingleDelOp) diagramKeyRanges() []pebble.KeyRange { return nil }
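// Hedged usage sketch (not part of the listing above): outside of the
// metamorphic test, CanDeterministicallySingleDelete is typically consulted
// while an Iterator is positioned on the key of interest, falling back to a
// regular Delete when a SingleDelete would not be deterministic. The helper
// name, batch, iterator, and key below are hypothetical.
func deleteKey(b *pebble.Batch, it *pebble.Iterator, key []byte) error {
	ok, err := pebble.CanDeterministicallySingleDelete(it)
	if err != nil {
		return err
	}
	if ok {
		// A SingleDelete is safe: it will deterministically remove the key.
		return b.SingleDelete(key, nil)
	}
	// Otherwise use an ordinary Delete tombstone.
	return b.Delete(key, nil)
}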
1704 :
1705 : // iterPrevOp models an Iterator.Prev[WithLimit] operation.
1706 : type iterPrevOp struct {
1707 : iterID objID
1708 : limit []byte
1709 :
1710 : derivedReaderID objID
1711 : }
1712 :
1713 2 : func (o *iterPrevOp) run(t *Test, h historyRecorder) {
1714 2 : i := t.getIter(o.iterID)
1715 2 : var valid bool
1716 2 : var validStr string
1717 2 : if o.limit == nil {
1718 2 : valid = i.Prev()
1719 2 : validStr = validBoolToStr(valid)
1720 2 : } else {
1721 2 : valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
1722 2 : }
1723 2 : if valid {
1724 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1725 2 : } else {
1726 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1727 2 : }
1728 : }
1729 :
1730 2 : func (o *iterPrevOp) String() string { return fmt.Sprintf("%s.Prev(%q)", o.iterID, o.limit) }
1731 2 : func (o *iterPrevOp) receiver() objID { return o.iterID }
1732 2 : func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1733 :
1734 0 : func (o *iterPrevOp) keys() []*[]byte { return nil }
1735 0 : func (o *iterPrevOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1736 :
1737 : // newSnapshotOp models a DB.NewSnapshot operation.
1738 : type newSnapshotOp struct {
1739 : dbID objID
1740 : snapID objID
1741 : // If nonempty, this snapshot must not be used to read any keys outside of
1742 : // the provided bounds. This allows some implementations to use 'Eventually
1743 : // file-only snapshots,' which require bounds.
1744 : bounds []pebble.KeyRange
1745 : }
1746 :
1747 2 : func (o *newSnapshotOp) run(t *Test, h historyRecorder) {
1748 2 : bounds := o.bounds
1749 2 : if len(bounds) == 0 {
1750 0 : panic("bounds unexpectedly unset for newSnapshotOp")
1751 : }
1752 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
1753 2 : createEfos := ((11400714819323198485 * uint64(t.idx) * t.testOpts.seedEFOS) >> 63) == 1
1754 2 : // If any of these options is true, an EFOS _must_ be created, regardless
1755 2 : // of what the Fibonacci hash returned.
1756 2 : excisePossible := t.testOpts.useSharedReplicate || t.testOpts.useExternalReplicate || t.testOpts.useExcise
1757 2 : if createEfos || excisePossible {
1758 2 : s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(bounds)
1759 2 : t.setSnapshot(o.snapID, s)
1760 2 : } else {
1761 2 : s := t.getDB(o.dbID).NewSnapshot()
1762 2 : t.setSnapshot(o.snapID, s)
1763 2 : }
1764 2 : h.Recordf("%s", o)
1765 : }
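// Illustrative sketch (not part of the listing above): the EFOS decision in
// newSnapshotOp.run is a deterministic pseudo-random coin flip derived from
// Fibonacci hashing. Multiplying by the 64-bit Fibonacci constant (~2^64
// divided by the golden ratio) scrambles the high bits of the product, and
// shifting right by 63 keeps only the most significant bit, so roughly half
// of all (test index, seed) combinations choose an
// EventuallyFileOnlySnapshot. The helper name and parameters below are
// hypothetical; the expression mirrors the one in run above.
func wouldCreateEFOS(testIdx, seedEFOS uint64) bool {
	const fibonacciConstant = 11400714819323198485 // ~2^64 / golden ratio
	return (fibonacciConstant*testIdx*seedEFOS)>>63 == 1
}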
1766 :
1767 2 : func (o *newSnapshotOp) String() string {
1768 2 : var buf bytes.Buffer
1769 2 : fmt.Fprintf(&buf, "%s = %s.NewSnapshot(", o.snapID, o.dbID)
1770 2 : for i := range o.bounds {
1771 2 : if i > 0 {
1772 2 : fmt.Fprint(&buf, ", ")
1773 2 : }
1774 2 : fmt.Fprintf(&buf, "%q, %q", o.bounds[i].Start, o.bounds[i].End)
1775 : }
1776 2 : fmt.Fprint(&buf, ")")
1777 2 : return buf.String()
1778 : }
1779 2 : func (o *newSnapshotOp) receiver() objID { return o.dbID }
1780 2 : func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
1781 :
1782 1 : func (o *newSnapshotOp) keys() []*[]byte {
1783 1 : var res []*[]byte
1784 1 : for i := range o.bounds {
1785 1 : res = append(res, &o.bounds[i].Start, &o.bounds[i].End)
1786 1 : }
1787 1 : return res
1788 : }
1789 :
1790 1 : func (o *newSnapshotOp) diagramKeyRanges() []pebble.KeyRange {
1791 1 : return o.bounds
1792 1 : }
1793 :
1794 : // newExternalObjOp models the creation of an external object (an sstable on external storage) from a batch's contents.
1795 : type newExternalObjOp struct {
1796 : batchID objID
1797 : externalObjID objID
1798 : }
1799 :
1800 2 : func externalObjName(externalObjID objID) string {
1801 2 : if externalObjID.tag() != externalObjTag {
1802 0 : panic(fmt.Sprintf("invalid externalObjID %s", externalObjID))
1803 : }
1804 2 : return fmt.Sprintf("external-for-ingest-%d.sst", externalObjID.slot())
1805 : }
1806 :
1807 2 : func (o *newExternalObjOp) run(t *Test, h historyRecorder) {
1808 2 : b := t.getBatch(o.batchID)
1809 2 : t.clearObj(o.batchID)
1810 2 :
1811 2 : writeCloser, err := t.externalStorage.CreateObject(externalObjName(o.externalObjID))
1812 2 : if err != nil {
1813 0 : panic(err)
1814 : }
1815 2 : writable := objstorageprovider.NewRemoteWritable(writeCloser)
1816 2 :
1817 2 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
1818 2 :
1819 2 : sstMeta, err := writeSSTForIngestion(
1820 2 : t,
1821 2 : iter, rangeDelIter, rangeKeyIter,
1822 2 : true, /* uniquePrefixes */
1823 2 : nil, /* syntheticSuffix */
1824 2 : nil, /* syntheticPrefix */
1825 2 : writable,
1826 2 : t.minFMV(),
1827 2 : )
1828 2 : if err != nil {
1829 0 : panic(err)
1830 : }
1831 2 : if !sstMeta.HasPointKeys && !sstMeta.HasRangeDelKeys && !sstMeta.HasRangeKeys {
1832 0 : // This can occur when using --try-to-reduce.
1833 0 : panic("metamorphic test internal error: external object empty")
1834 : }
1835 2 : t.setExternalObj(o.externalObjID, externalObjMeta{
1836 2 : sstMeta: sstMeta,
1837 2 : })
1838 2 : h.Recordf("%s", o)
1839 : }
1840 :
1841 2 : func (o *newExternalObjOp) String() string {
1842 2 : return fmt.Sprintf("%s = %s.NewExternalObj()", o.externalObjID, o.batchID)
1843 2 : }
1844 2 : func (o *newExternalObjOp) receiver() objID { return o.batchID }
1845 2 : func (o *newExternalObjOp) syncObjs() objIDSlice { return []objID{o.externalObjID} }
1846 :
1847 0 : func (o *newExternalObjOp) keys() []*[]byte { return nil }
1848 0 : func (o *newExternalObjOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1849 :
1850 : type dbRatchetFormatMajorVersionOp struct {
1851 : dbID objID
1852 : vers pebble.FormatMajorVersion
1853 : }
1854 :
1855 2 : func (o *dbRatchetFormatMajorVersionOp) run(t *Test, h historyRecorder) {
1856 2 : var err error
1857 2 : // NB: We no-op the operation if we're already at or above the provided
1858 2 : // format major version. Different runs start at different format major
1859 2 : // versions, making the presence of an error and the error message itself
1860 2 : // non-deterministic if we attempt to upgrade to an older version.
1861 2 : //
1862 2 : // Regardless, subsequent operations should behave identically, which is what
1863 2 : // we're really aiming to test by including this format major version ratchet
1864 2 : // operation.
1865 2 : if t.getDB(o.dbID).FormatMajorVersion() < o.vers {
1866 2 : err = t.getDB(o.dbID).RatchetFormatMajorVersion(o.vers)
1867 2 : }
1868 2 : h.Recordf("%s // %v", o, err)
1869 : }
1870 :
1871 2 : func (o *dbRatchetFormatMajorVersionOp) String() string {
1872 2 : return fmt.Sprintf("%s.RatchetFormatMajorVersion(%s)", o.dbID, o.vers)
1873 2 : }
1874 2 : func (o *dbRatchetFormatMajorVersionOp) receiver() objID { return o.dbID }
1875 2 : func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
1876 :
1877 0 : func (o *dbRatchetFormatMajorVersionOp) keys() []*[]byte { return nil }
1878 0 : func (o *dbRatchetFormatMajorVersionOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1879 :
1880 : type dbRestartOp struct {
1881 : dbID objID
1882 :
1883 : // affectedObjects is the list of additional objects that are affected by this
1884 : // operation, and which syncObjs() must return so that we don't perform the
1885 : // restart in parallel with other operations on the affected objects.
1886 : affectedObjects []objID
1887 : }
1888 :
1889 2 : func (o *dbRestartOp) run(t *Test, h historyRecorder) {
1890 2 : if err := t.restartDB(o.dbID); err != nil {
1891 0 : h.Recordf("%s // %v", o, err)
1892 0 : h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
1893 2 : } else {
1894 2 : h.Recordf("%s", o)
1895 2 : }
1896 : }
1897 :
1898 2 : func (o *dbRestartOp) String() string { return fmt.Sprintf("%s.Restart()", o.dbID) }
1899 2 : func (o *dbRestartOp) receiver() objID { return o.dbID }
1900 2 : func (o *dbRestartOp) syncObjs() objIDSlice { return o.affectedObjects }
1901 :
1902 0 : func (o *dbRestartOp) keys() []*[]byte { return nil }
1903 0 : func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1904 :
1905 1 : func formatOps(ops []op) string {
1906 1 : var buf strings.Builder
1907 1 : for _, op := range ops {
1908 1 : fmt.Fprintf(&buf, "%s\n", op)
1909 1 : }
1910 1 : return buf.String()
1911 : }
1912 :
1913 : // replicateOp models an operation that copies keys from one DB to another
1914 : // through either an IngestAndExcise or a plain Ingest.
1915 : type replicateOp struct {
1916 : source, dest objID
1917 : start, end []byte
1918 : }
1919 :
1920 : func (r *replicateOp) runSharedReplicate(
1921 : t *Test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
1922 1 : ) {
1923 1 : var sharedSSTs []pebble.SharedSSTMeta
1924 1 : var err error
1925 1 : err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
1926 1 : func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
1927 1 : val, _, err := value.Value(nil)
1928 1 : if err != nil {
1929 0 : panic(err)
1930 : }
1931 1 : return w.Raw().Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
1932 : },
1933 1 : func(start, end []byte, seqNum base.SeqNum) error {
1934 1 : return w.DeleteRange(start, end)
1935 1 : },
1936 1 : func(start, end []byte, keys []keyspan.Key) error {
1937 1 : return w.Raw().EncodeSpan(keyspan.Span{
1938 1 : Start: start,
1939 1 : End: end,
1940 1 : Keys: keys,
1941 1 : })
1942 1 : },
1943 1 : func(sst *pebble.SharedSSTMeta) error {
1944 1 : sharedSSTs = append(sharedSSTs, *sst)
1945 1 : return nil
1946 1 : },
1947 : nil,
1948 : )
1949 1 : if err != nil {
1950 0 : h.Recordf("%s // %v", r, err)
1951 0 : return
1952 0 : }
1953 :
1954 1 : err = w.Close()
1955 1 : if err != nil {
1956 0 : h.Recordf("%s // %v", r, err)
1957 0 : return
1958 0 : }
1959 1 : meta, err := w.Raw().Metadata()
1960 1 : if err != nil {
1961 0 : h.Recordf("%s // %v", r, err)
1962 0 : return
1963 0 : }
1964 1 : if len(sharedSSTs) == 0 && meta.Properties.NumEntries == 0 && meta.Properties.NumRangeKeys() == 0 {
1965 1 : // IngestAndExcise below will be a no-op. We should do a
1966 1 : // DeleteRange+RangeKeyDel to mimic the behaviour of the non-shared-replicate
1967 1 : // case.
1968 1 : //
1969 1 : // TODO(bilal): Remove this when we support excises with no matching ingests.
1970 1 : if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
1971 0 : h.Recordf("%s // %v", r, err)
1972 0 : return
1973 0 : }
1974 1 : err := dest.DeleteRange(r.start, r.end, t.writeOpts)
1975 1 : h.Recordf("%s // %v", r, err)
1976 1 : return
1977 : }
1978 :
1979 1 : _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false /* sstContainsExciseTombstone */)
1980 1 : h.Recordf("%s // %v", r, err)
1981 : }
1982 :
1983 : func (r *replicateOp) runExternalReplicate(
1984 : t *Test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
1985 0 : ) {
1986 0 : var externalSSTs []pebble.ExternalFile
1987 0 : var err error
1988 0 : err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
1989 0 : func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
1990 0 : val, _, err := value.Value(nil)
1991 0 : if err != nil {
1992 0 : panic(err)
1993 : }
1994 0 : return w.Raw().Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
1995 : },
1996 0 : func(start, end []byte, seqNum base.SeqNum) error {
1997 0 : return w.DeleteRange(start, end)
1998 0 : },
1999 0 : func(start, end []byte, keys []keyspan.Key) error {
2000 0 : return w.Raw().EncodeSpan(keyspan.Span{
2001 0 : Start: start,
2002 0 : End: end,
2003 0 : Keys: keys,
2004 0 : })
2005 0 : },
2006 : nil,
2007 0 : func(sst *pebble.ExternalFile) error {
2008 0 : externalSSTs = append(externalSSTs, *sst)
2009 0 : return nil
2010 0 : },
2011 : )
2012 0 : if err != nil {
2013 0 : h.Recordf("%s // %v", r, err)
2014 0 : return
2015 0 : }
2016 :
2017 0 : err = w.Close()
2018 0 : if err != nil {
2019 0 : h.Recordf("%s // %v", r, err)
2020 0 : return
2021 0 : }
2022 0 : meta, err := w.Raw().Metadata()
2023 0 : if err != nil {
2024 0 : h.Recordf("%s // %v", r, err)
2025 0 : return
2026 0 : }
2027 0 : if len(externalSSTs) == 0 && meta.Properties.NumEntries == 0 && meta.Properties.NumRangeKeys() == 0 {
2028 0 : // IngestAndExcise below will be a no-op. We should do a
2029 0 : // DeleteRange+RangeKeyDel to mimic the behaviour of the non-external-replicate
2030 0 : // case.
2031 0 : //
2032 0 : // TODO(bilal): Remove this when we support excises with no matching ingests.
2033 0 : if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
2034 0 : h.Recordf("%s // %v", r, err)
2035 0 : return
2036 0 : }
2037 0 : err := dest.DeleteRange(r.start, r.end, t.writeOpts)
2038 0 : h.Recordf("%s // %v", r, err)
2039 0 : return
2040 : }
2041 :
2042 0 : _, err = dest.IngestAndExcise([]string{sstPath}, nil, externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false /* sstContainsExciseTombstone */)
2043 0 : h.Recordf("%s // %v", r, err)
2044 : }
2045 :
2046 1 : func (r *replicateOp) run(t *Test, h historyRecorder) {
2047 1 : // Shared replication only works if shared storage is enabled.
2048 1 : useSharedIngest := t.testOpts.useSharedReplicate && t.testOpts.sharedStorageEnabled
2049 1 : useExternalIngest := t.testOpts.useExternalReplicate && t.testOpts.externalStorageEnabled
2050 1 :
2051 1 : source := t.getDB(r.source)
2052 1 : dest := t.getDB(r.dest)
2053 1 : sstPath := path.Join(t.tmpDir, fmt.Sprintf("ext-replicate%d.sst", t.idx))
2054 1 : f, err := t.opts.FS.Create(sstPath, vfs.WriteCategoryUnspecified)
2055 1 : if err != nil {
2056 0 : h.Recordf("%s // %v", r, err)
2057 0 : return
2058 0 : }
2059 1 : w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), t.opts.MakeWriterOptions(0, dest.FormatMajorVersion().MaxTableFormat()))
2060 1 :
2061 1 : // NB: In practice we'll either do shared replicate or external replicate,
2062 1 : // as ScanInternal does not support both. We arbitrarily choose to prioritize
2063 1 : // external replication if both are enabled, as those are likely to hit
2064 1 : // widespread usage first.
2065 1 : if useExternalIngest {
2066 0 : r.runExternalReplicate(t, h, source, dest, w, sstPath)
2067 0 : return
2068 0 : }
2069 1 : if useSharedIngest {
2070 1 : r.runSharedReplicate(t, h, source, dest, w, sstPath)
2071 1 : return
2072 1 : }
2073 :
2074 : // First, do a RangeKeyDelete and DeleteRange on the whole span.
2075 1 : if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
2076 0 : h.Recordf("%s // %v", r, err)
2077 0 : return
2078 0 : }
2079 1 : if err := dest.DeleteRange(r.start, r.end, t.writeOpts); err != nil {
2080 0 : h.Recordf("%s // %v", r, err)
2081 0 : return
2082 0 : }
2083 1 : iter, err := source.NewIter(&pebble.IterOptions{
2084 1 : LowerBound: r.start,
2085 1 : UpperBound: r.end,
2086 1 : KeyTypes: pebble.IterKeyTypePointsAndRanges,
2087 1 : })
2088 1 : if err != nil {
2089 0 : panic(err)
2090 : }
2091 1 : defer iter.Close()
2092 1 :
2093 1 : for ok := iter.SeekGE(r.start); ok && iter.Error() == nil; ok = iter.Next() {
2094 1 : hasPoint, hasRange := iter.HasPointAndRange()
2095 1 : if hasPoint {
2096 1 : val, err := iter.ValueAndErr()
2097 1 : if err != nil {
2098 0 : panic(err)
2099 : }
2100 1 : if err := w.Set(iter.Key(), val); err != nil {
2101 0 : panic(err)
2102 : }
2103 : }
2104 1 : if hasRange && iter.RangeKeyChanged() {
2105 1 : rangeKeys := iter.RangeKeys()
2106 1 : rkStart, rkEnd := iter.RangeBounds()
2107 1 :
2108 1 : span := keyspan.Span{Start: rkStart, End: rkEnd, Keys: make([]keyspan.Key, len(rangeKeys))}
2109 1 : for i := range rangeKeys {
2110 1 : span.Keys[i] = keyspan.Key{
2111 1 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
2112 1 : Suffix: rangeKeys[i].Suffix,
2113 1 : Value: rangeKeys[i].Value,
2114 1 : }
2115 1 : }
2116 1 : keyspan.SortKeysByTrailer(span.Keys)
2117 1 : if err := w.Raw().EncodeSpan(span); err != nil {
2118 0 : panic(err)
2119 : }
2120 : }
2121 : }
2122 1 : if err := iter.Error(); err != nil {
2123 0 : h.Recordf("%s // %v", r, err)
2124 0 : return
2125 0 : }
2126 1 : if err := w.Close(); err != nil {
2127 0 : panic(err)
2128 : }
2129 :
2130 1 : err = dest.Ingest([]string{sstPath})
2131 1 : h.Recordf("%s // %v", r, err)
2132 : }
2133 :
2134 2 : func (r *replicateOp) String() string {
2135 2 : return fmt.Sprintf("%s.Replicate(%s, %q, %q)", r.source, r.dest, r.start, r.end)
2136 2 : }
2137 :
2138 1 : func (r *replicateOp) receiver() objID { return r.source }
2139 1 : func (r *replicateOp) syncObjs() objIDSlice { return objIDSlice{r.dest} }
2140 :
2141 0 : func (r *replicateOp) keys() []*[]byte {
2142 0 : return []*[]byte{&r.start, &r.end}
2143 0 : }
2144 :
2145 1 : func (r *replicateOp) diagramKeyRanges() []pebble.KeyRange {
2146 1 : return []pebble.KeyRange{{Start: r.start, End: r.end}}
2147 1 : }
|