Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "bytes"
9 : "crypto/rand"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "path/filepath"
14 : "strings"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/keyspan"
20 : "github.com/cockroachdb/pebble/internal/private"
21 : "github.com/cockroachdb/pebble/internal/testkeys"
22 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
23 : "github.com/cockroachdb/pebble/sstable"
24 : "github.com/cockroachdb/pebble/vfs"
25 : "github.com/cockroachdb/pebble/vfs/errorfs"
26 : )
27 :
28 : // op defines the interface for a single operation, such as creating a batch,
29 : // or advancing an iterator.
30 : type op interface {
31 : String() string
32 : run(t *test, h historyRecorder)
33 :
34 : // receiver returns the object ID of the object the operation is performed
35 : // on. Every operation has a receiver (eg, batch0.Set(...) has `batch0` as
36 : // its receiver). Receivers are used for synchronization when running with
37 : // concurrency.
38 : receiver() objID
39 :
40 : // syncObjs returns an additional set of object IDs—excluding the
41 : // receiver—that the operation must synchronize with. At execution time,
42 : // the operation will run serially with respect to all other operations
43 : // that return these objects from their own syncObjs or receiver methods.
44 : syncObjs() objIDSlice
45 : }
46 :
47 : // initOp performs test initialization
48 : type initOp struct {
49 : batchSlots uint32
50 : iterSlots uint32
51 : snapshotSlots uint32
52 : }
53 :
54 2 : func (o *initOp) run(t *test, h historyRecorder) {
55 2 : t.batches = make([]*pebble.Batch, o.batchSlots)
56 2 : t.iters = make([]*retryableIter, o.iterSlots)
57 2 : t.snapshots = make([]readerCloser, o.snapshotSlots)
58 2 : h.Recordf("%s", o)
59 2 : }
60 :
61 2 : func (o *initOp) String() string {
62 2 : return fmt.Sprintf("Init(%d /* batches */, %d /* iters */, %d /* snapshots */)",
63 2 : o.batchSlots, o.iterSlots, o.snapshotSlots)
64 2 : }
65 :
66 2 : func (o *initOp) receiver() objID { return dbObjID }
67 0 : func (o *initOp) syncObjs() objIDSlice { return nil }
68 :
69 : // applyOp models a Writer.Apply operation.
70 : type applyOp struct {
71 : writerID objID
72 : batchID objID
73 : }
74 :
75 2 : func (o *applyOp) run(t *test, h historyRecorder) {
76 2 : b := t.getBatch(o.batchID)
77 2 : w := t.getWriter(o.writerID)
78 2 : var err error
79 2 : if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
80 1 : err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
81 1 : if err == nil {
82 1 : err = b.SyncWait()
83 1 : }
84 2 : } else {
85 2 : err = w.Apply(b, t.writeOpts)
86 2 : }
87 2 : h.Recordf("%s // %v", o, err)
88 : // batch will be closed by a closeOp which is guaranteed to be generated
89 : }
90 :
91 2 : func (o *applyOp) String() string { return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID) }
92 2 : func (o *applyOp) receiver() objID { return o.writerID }
93 2 : func (o *applyOp) syncObjs() objIDSlice {
94 2 : // Apply should not be concurrent with operations that are mutating the
95 2 : // batch.
96 2 : return []objID{o.batchID}
97 2 : }
98 :
99 : // checkpointOp models a DB.Checkpoint operation.
100 : type checkpointOp struct {
101 : // If non-empty, the checkpoint is restricted to these spans.
102 : spans []pebble.CheckpointSpan
103 : }
104 :
105 2 : func (o *checkpointOp) run(t *test, h historyRecorder) {
106 2 : // TODO(josh): db.Checkpoint does not work with shared storage yet.
107 2 : // It would be better to filter out ahead of calling run on the op,
108 2 : // by setting the weight that generator.go uses to zero, or similar.
109 2 : // But IIUC the ops are shared for ALL the metamorphic test runs, so
110 2 : // not sure how to do that easily:
111 2 : // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
112 2 : if t.testOpts.sharedStorageEnabled {
113 1 : h.Recordf("%s // %v", o, nil)
114 1 : return
115 1 : }
116 2 : var opts []pebble.CheckpointOption
117 2 : if len(o.spans) > 0 {
118 2 : opts = append(opts, pebble.WithRestrictToSpans(o.spans))
119 2 : }
120 2 : err := withRetries(func() error {
121 2 : return t.db.Checkpoint(o.dir(t.dir, h.op), opts...)
122 2 : })
123 2 : h.Recordf("%s // %v", o, err)
124 : }
125 :
126 2 : func (o *checkpointOp) dir(dataDir string, idx int) string {
127 2 : return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
128 2 : }
129 :
130 2 : func (o *checkpointOp) String() string {
131 2 : var spanStr bytes.Buffer
132 2 : for i, span := range o.spans {
133 2 : if i > 0 {
134 2 : spanStr.WriteString(",")
135 2 : }
136 2 : fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
137 : }
138 2 : return fmt.Sprintf("db.Checkpoint(%s)", spanStr.String())
139 : }
140 :
141 2 : func (o *checkpointOp) receiver() objID { return dbObjID }
142 2 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
143 :
144 : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
145 : type closeOp struct {
146 : objID objID
147 : }
148 :
149 2 : func (o *closeOp) run(t *test, h historyRecorder) {
150 2 : c := t.getCloser(o.objID)
151 2 : if o.objID.tag() == dbTag && t.opts.DisableWAL {
152 1 : // Special case: If WAL is disabled, do a flush right before DB Close. This
153 1 : // allows us to reuse this run's data directory as initial state for
154 1 : // future runs without losing any mutations.
155 1 : _ = t.db.Flush()
156 1 : }
157 2 : t.clearObj(o.objID)
158 2 : err := c.Close()
159 2 : h.Recordf("%s // %v", o, err)
160 : }
161 :
162 2 : func (o *closeOp) String() string { return fmt.Sprintf("%s.Close()", o.objID) }
163 2 : func (o *closeOp) receiver() objID { return o.objID }
164 2 : func (o *closeOp) syncObjs() objIDSlice {
165 2 : // Synchronize on the database so that we don't close the database before
166 2 : // all its iterators, snapshots and batches are closed.
167 2 : // TODO(jackson): It would be nice to relax this so that Close calls can
168 2 : // execute in parallel.
169 2 : if o.objID == dbObjID {
170 2 : return nil
171 2 : }
172 2 : return []objID{dbObjID}
173 : }
174 :
175 : // compactOp models a DB.Compact operation.
176 : type compactOp struct {
177 : start []byte
178 : end []byte
179 : parallelize bool
180 : }
181 :
182 2 : func (o *compactOp) run(t *test, h historyRecorder) {
183 2 : err := withRetries(func() error {
184 2 : return t.db.Compact(o.start, o.end, o.parallelize)
185 2 : })
186 2 : h.Recordf("%s // %v", o, err)
187 : }
188 :
189 2 : func (o *compactOp) String() string {
190 2 : return fmt.Sprintf("db.Compact(%q, %q, %t /* parallelize */)", o.start, o.end, o.parallelize)
191 2 : }
192 :
193 2 : func (o *compactOp) receiver() objID { return dbObjID }
194 2 : func (o *compactOp) syncObjs() objIDSlice { return nil }
195 :
196 : // deleteOp models a Write.Delete operation.
197 : type deleteOp struct {
198 : writerID objID
199 : key []byte
200 : }
201 :
202 2 : func (o *deleteOp) run(t *test, h historyRecorder) {
203 2 : w := t.getWriter(o.writerID)
204 2 : var err error
205 2 : if t.testOpts.deleteSized && t.isFMV(pebble.FormatDeleteSizedAndObsolete) {
206 1 : // Call DeleteSized with a deterministic size derived from the index.
207 1 : // The size does not need to be accurate for correctness.
208 1 : err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
209 2 : } else {
210 2 : err = w.Delete(o.key, t.writeOpts)
211 2 : }
212 2 : h.Recordf("%s // %v", o, err)
213 : }
214 :
215 1 : func hashSize(index int) uint32 {
216 1 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
217 1 : return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
218 1 : }
219 :
220 2 : func (o *deleteOp) String() string {
221 2 : return fmt.Sprintf("%s.Delete(%q)", o.writerID, o.key)
222 2 : }
223 2 : func (o *deleteOp) receiver() objID { return o.writerID }
224 2 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
225 :
226 : // singleDeleteOp models a Write.SingleDelete operation.
227 : type singleDeleteOp struct {
228 : writerID objID
229 : key []byte
230 : maybeReplaceDelete bool
231 : }
232 :
233 2 : func (o *singleDeleteOp) run(t *test, h historyRecorder) {
234 2 : w := t.getWriter(o.writerID)
235 2 : var err error
236 2 : if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
237 1 : err = w.Delete(o.key, t.writeOpts)
238 2 : } else {
239 2 : err = w.SingleDelete(o.key, t.writeOpts)
240 2 : }
241 : // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
242 : // write the former to the history log. The log line will indicate whether
243 : // or not the delete *could* have been replaced. The OPTIONS file should
244 : // also be consulted to determine what happened at runtime (i.e. by taking
245 : // the logical AND).
246 2 : h.Recordf("%s // %v", o, err)
247 : }
248 :
249 2 : func (o *singleDeleteOp) String() string {
250 2 : return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)", o.writerID, o.key, o.maybeReplaceDelete)
251 2 : }
252 :
253 2 : func (o *singleDeleteOp) receiver() objID { return o.writerID }
254 2 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
255 :
256 : // deleteRangeOp models a Write.DeleteRange operation.
257 : type deleteRangeOp struct {
258 : writerID objID
259 : start []byte
260 : end []byte
261 : }
262 :
263 2 : func (o *deleteRangeOp) run(t *test, h historyRecorder) {
264 2 : w := t.getWriter(o.writerID)
265 2 : err := w.DeleteRange(o.start, o.end, t.writeOpts)
266 2 : h.Recordf("%s // %v", o, err)
267 2 : }
268 :
269 2 : func (o *deleteRangeOp) String() string {
270 2 : return fmt.Sprintf("%s.DeleteRange(%q, %q)", o.writerID, o.start, o.end)
271 2 : }
272 :
273 2 : func (o *deleteRangeOp) receiver() objID { return o.writerID }
274 2 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
275 :
276 : // flushOp models a DB.Flush operation.
277 : type flushOp struct {
278 : }
279 :
280 2 : func (o *flushOp) run(t *test, h historyRecorder) {
281 2 : err := t.db.Flush()
282 2 : h.Recordf("%s // %v", o, err)
283 2 : }
284 :
285 2 : func (o *flushOp) String() string { return "db.Flush()" }
286 2 : func (o *flushOp) receiver() objID { return dbObjID }
287 2 : func (o *flushOp) syncObjs() objIDSlice { return nil }
288 :
289 : // mergeOp models a Write.Merge operation.
290 : type mergeOp struct {
291 : writerID objID
292 : key []byte
293 : value []byte
294 : }
295 :
296 2 : func (o *mergeOp) run(t *test, h historyRecorder) {
297 2 : w := t.getWriter(o.writerID)
298 2 : err := w.Merge(o.key, o.value, t.writeOpts)
299 2 : h.Recordf("%s // %v", o, err)
300 2 : }
301 :
302 2 : func (o *mergeOp) String() string { return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, o.key, o.value) }
303 2 : func (o *mergeOp) receiver() objID { return o.writerID }
304 2 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
305 :
306 : // setOp models a Write.Set operation.
307 : type setOp struct {
308 : writerID objID
309 : key []byte
310 : value []byte
311 : }
312 :
313 2 : func (o *setOp) run(t *test, h historyRecorder) {
314 2 : w := t.getWriter(o.writerID)
315 2 : err := w.Set(o.key, o.value, t.writeOpts)
316 2 : h.Recordf("%s // %v", o, err)
317 2 : }
318 :
319 2 : func (o *setOp) String() string { return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, o.key, o.value) }
320 2 : func (o *setOp) receiver() objID { return o.writerID }
321 2 : func (o *setOp) syncObjs() objIDSlice { return nil }
322 :
323 : // rangeKeyDeleteOp models a Write.RangeKeyDelete operation.
324 : type rangeKeyDeleteOp struct {
325 : writerID objID
326 : start []byte
327 : end []byte
328 : }
329 :
330 2 : func (o *rangeKeyDeleteOp) run(t *test, h historyRecorder) {
331 2 : w := t.getWriter(o.writerID)
332 2 : err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
333 2 : h.Recordf("%s // %v", o, err)
334 2 : }
335 :
336 2 : func (o *rangeKeyDeleteOp) String() string {
337 2 : return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)", o.writerID, o.start, o.end)
338 2 : }
339 :
340 2 : func (o *rangeKeyDeleteOp) receiver() objID { return o.writerID }
341 2 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
342 :
343 : // rangeKeySetOp models a Write.RangeKeySet operation.
344 : type rangeKeySetOp struct {
345 : writerID objID
346 : start []byte
347 : end []byte
348 : suffix []byte
349 : value []byte
350 : }
351 :
352 2 : func (o *rangeKeySetOp) run(t *test, h historyRecorder) {
353 2 : w := t.getWriter(o.writerID)
354 2 : err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
355 2 : h.Recordf("%s // %v", o, err)
356 2 : }
357 :
358 2 : func (o *rangeKeySetOp) String() string {
359 2 : return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
360 2 : o.writerID, o.start, o.end, o.suffix, o.value)
361 2 : }
362 :
363 2 : func (o *rangeKeySetOp) receiver() objID { return o.writerID }
364 2 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
365 :
366 : // rangeKeyUnsetOp models a Write.RangeKeyUnset operation.
367 : type rangeKeyUnsetOp struct {
368 : writerID objID
369 : start []byte
370 : end []byte
371 : suffix []byte
372 : }
373 :
374 2 : func (o *rangeKeyUnsetOp) run(t *test, h historyRecorder) {
375 2 : w := t.getWriter(o.writerID)
376 2 : err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
377 2 : h.Recordf("%s // %v", o, err)
378 2 : }
379 :
380 2 : func (o *rangeKeyUnsetOp) String() string {
381 2 : return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
382 2 : o.writerID, o.start, o.end, o.suffix)
383 2 : }
384 :
385 2 : func (o *rangeKeyUnsetOp) receiver() objID { return o.writerID }
386 2 : func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
387 :
388 : // newBatchOp models a Write.NewBatch operation.
389 : type newBatchOp struct {
390 : batchID objID
391 : }
392 :
393 2 : func (o *newBatchOp) run(t *test, h historyRecorder) {
394 2 : b := t.db.NewBatch()
395 2 : t.setBatch(o.batchID, b)
396 2 : h.Recordf("%s", o)
397 2 : }
398 :
399 2 : func (o *newBatchOp) String() string { return fmt.Sprintf("%s = db.NewBatch()", o.batchID) }
400 2 : func (o *newBatchOp) receiver() objID { return dbObjID }
401 2 : func (o *newBatchOp) syncObjs() objIDSlice {
402 2 : // NewBatch should not be concurrent with operations that interact with that
403 2 : // same batch.
404 2 : return []objID{o.batchID}
405 2 : }
406 :
407 : // newIndexedBatchOp models a Write.NewIndexedBatch operation.
408 : type newIndexedBatchOp struct {
409 : batchID objID
410 : }
411 :
412 2 : func (o *newIndexedBatchOp) run(t *test, h historyRecorder) {
413 2 : b := t.db.NewIndexedBatch()
414 2 : t.setBatch(o.batchID, b)
415 2 : h.Recordf("%s", o)
416 2 : }
417 :
418 2 : func (o *newIndexedBatchOp) String() string {
419 2 : return fmt.Sprintf("%s = db.NewIndexedBatch()", o.batchID)
420 2 : }
421 2 : func (o *newIndexedBatchOp) receiver() objID { return dbObjID }
422 2 : func (o *newIndexedBatchOp) syncObjs() objIDSlice {
423 2 : // NewIndexedBatch should not be concurrent with operations that interact
424 2 : // with that same batch.
425 2 : return []objID{o.batchID}
426 2 : }
427 :
428 : // batchCommitOp models a Batch.Commit operation.
429 : type batchCommitOp struct {
430 : batchID objID
431 : }
432 :
433 1 : func (o *batchCommitOp) run(t *test, h historyRecorder) {
434 1 : b := t.getBatch(o.batchID)
435 1 : err := b.Commit(t.writeOpts)
436 1 : h.Recordf("%s // %v", o, err)
437 1 : }
438 :
439 2 : func (o *batchCommitOp) String() string { return fmt.Sprintf("%s.Commit()", o.batchID) }
440 1 : func (o *batchCommitOp) receiver() objID { return o.batchID }
441 1 : func (o *batchCommitOp) syncObjs() objIDSlice {
442 1 : // Synchronize on the database so that NewIters wait for the commit.
443 1 : return []objID{dbObjID}
444 1 : }
445 :
446 : // ingestOp models a DB.Ingest operation.
447 : type ingestOp struct {
448 : batchIDs []objID
449 : }
450 :
451 2 : func (o *ingestOp) run(t *test, h historyRecorder) {
452 2 : // We can only use apply as an alternative for ingestion if we are ingesting
453 2 : // a single batch. If we are ingesting multiple batches, the batches may
454 2 : // overlap which would cause ingestion to fail but apply would succeed.
455 2 : if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 {
456 1 : id := o.batchIDs[0]
457 1 : b := t.getBatch(id)
458 1 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
459 1 : c, err := o.collapseBatch(t, iter, rangeDelIter, rangeKeyIter)
460 1 : if err == nil {
461 1 : w := t.getWriter(makeObjID(dbTag, 0))
462 1 : err = w.Apply(c, t.writeOpts)
463 1 : }
464 1 : _ = b.Close()
465 1 : _ = c.Close()
466 1 : t.clearObj(id)
467 1 : h.Recordf("%s // %v", o, err)
468 1 : return
469 : }
470 :
471 2 : var paths []string
472 2 : var err error
473 2 : for i, id := range o.batchIDs {
474 2 : b := t.getBatch(id)
475 2 : t.clearObj(id)
476 2 : path, err2 := o.build(t, h, b, i)
477 2 : if err2 != nil {
478 0 : h.Recordf("Build(%s) // %v", id, err2)
479 0 : }
480 2 : err = firstError(err, err2)
481 2 : if err2 == nil {
482 2 : paths = append(paths, path)
483 2 : }
484 2 : err = firstError(err, b.Close())
485 : }
486 :
487 2 : err = firstError(err, withRetries(func() error {
488 2 : return t.db.Ingest(paths)
489 2 : }))
490 :
491 2 : h.Recordf("%s // %v", o, err)
492 : }
493 :
494 2 : func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
495 2 : rootFS := vfs.Root(t.opts.FS)
496 2 : path := rootFS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d", i))
497 2 : f, err := rootFS.Create(path)
498 2 : if err != nil {
499 0 : return "", err
500 0 : }
501 :
502 2 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
503 2 : defer closeIters(iter, rangeDelIter, rangeKeyIter)
504 2 :
505 2 : equal := t.opts.Comparer.Equal
506 2 : tableFormat := t.db.FormatMajorVersion().MaxTableFormat()
507 2 : w := sstable.NewWriter(
508 2 : objstorageprovider.NewFileWritable(f),
509 2 : t.opts.MakeWriterOptions(0, tableFormat),
510 2 : )
511 2 :
512 2 : var lastUserKey []byte
513 2 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
514 2 : // Ignore duplicate keys.
515 2 : if equal(lastUserKey, key.UserKey) {
516 2 : continue
517 : }
518 : // NB: We don't have to copy the key or value since we're reading from a
519 : // batch which doesn't do prefix compression.
520 2 : lastUserKey = key.UserKey
521 2 :
522 2 : key.SetSeqNum(base.SeqNumZero)
523 2 : if err := w.Add(*key, value.InPlaceValue()); err != nil {
524 0 : return "", err
525 0 : }
526 : }
527 2 : if err := iter.Close(); err != nil {
528 0 : return "", err
529 0 : }
530 2 : iter = nil
531 2 :
532 2 : if rangeDelIter != nil {
533 2 : // NB: The range tombstones have already been fragmented by the Batch.
534 2 : for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
535 2 : // NB: We don't have to copy the key or value since we're reading from a
536 2 : // batch which doesn't do prefix compression.
537 2 : if err := w.DeleteRange(t.Start, t.End); err != nil {
538 0 : return "", err
539 0 : }
540 : }
541 2 : if err := rangeDelIter.Close(); err != nil {
542 0 : return "", err
543 0 : }
544 2 : rangeDelIter = nil
545 : }
546 :
547 2 : if err := w.Close(); err != nil {
548 0 : return "", err
549 0 : }
550 2 : return path, nil
551 : }
552 :
553 2 : func (o *ingestOp) receiver() objID { return dbObjID }
554 2 : func (o *ingestOp) syncObjs() objIDSlice {
555 2 : // Ingest should not be concurrent with mutating the batches that will be
556 2 : // ingested as sstables.
557 2 : return o.batchIDs
558 2 : }
559 :
560 : func closeIters(
561 : pointIter base.InternalIterator,
562 : rangeDelIter keyspan.FragmentIterator,
563 : rangeKeyIter keyspan.FragmentIterator,
564 2 : ) {
565 2 : if pointIter != nil {
566 2 : pointIter.Close()
567 2 : }
568 2 : if rangeDelIter != nil {
569 2 : rangeDelIter.Close()
570 2 : }
571 2 : if rangeKeyIter != nil {
572 2 : rangeKeyIter.Close()
573 2 : }
574 : }
575 :
576 : // collapseBatch collapses the mutations in a batch to be equivalent to an
577 : // sstable ingesting those mutations. Duplicate updates to a key are collapsed
578 : // so that only the latest update is performed. All range deletions are
579 : // performed first in the batch to match the semantics of ingestion where a
580 : // range deletion does not delete a point record contained in the sstable.
581 : func (o *ingestOp) collapseBatch(
582 : t *test, pointIter base.InternalIterator, rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
583 1 : ) (*pebble.Batch, error) {
584 1 : defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
585 1 : equal := t.opts.Comparer.Equal
586 1 : collapsed := t.db.NewBatch()
587 1 :
588 1 : if rangeDelIter != nil {
589 1 : // NB: The range tombstones have already been fragmented by the Batch.
590 1 : for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
591 1 : // NB: We don't have to copy the key or value since we're reading from a
592 1 : // batch which doesn't do prefix compression.
593 1 : if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
594 0 : return nil, err
595 0 : }
596 : }
597 1 : if err := rangeDelIter.Close(); err != nil {
598 0 : return nil, err
599 0 : }
600 1 : rangeDelIter = nil
601 : }
602 :
603 1 : if pointIter != nil {
604 1 : var lastUserKey []byte
605 1 : for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
606 1 : // Ignore duplicate keys.
607 1 : if equal(lastUserKey, key.UserKey) {
608 1 : continue
609 : }
610 : // NB: We don't have to copy the key or value since we're reading from a
611 : // batch which doesn't do prefix compression.
612 1 : lastUserKey = key.UserKey
613 1 :
614 1 : var err error
615 1 : switch key.Kind() {
616 1 : case pebble.InternalKeyKindDelete:
617 1 : err = collapsed.Delete(key.UserKey, nil)
618 1 : case pebble.InternalKeyKindDeleteSized:
619 1 : v, _ := binary.Uvarint(value.InPlaceValue())
620 1 : // Batch.DeleteSized takes just the length of the value being
621 1 : // deleted and adds the key's length to derive the overall entry
622 1 : // size of the value being deleted. This has already been done
623 1 : // to the key we're reading from the batch, so we must subtract
624 1 : // the key length from the encoded value before calling
625 1 : // collapsed.DeleteSized, which will again add the key length
626 1 : // before encoding.
627 1 : err = collapsed.DeleteSized(key.UserKey, uint32(v-uint64(len(key.UserKey))), nil)
628 1 : case pebble.InternalKeyKindSingleDelete:
629 1 : err = collapsed.SingleDelete(key.UserKey, nil)
630 1 : case pebble.InternalKeyKindSet:
631 1 : err = collapsed.Set(key.UserKey, value.InPlaceValue(), nil)
632 1 : case pebble.InternalKeyKindMerge:
633 1 : err = collapsed.Merge(key.UserKey, value.InPlaceValue(), nil)
634 0 : case pebble.InternalKeyKindLogData:
635 0 : err = collapsed.LogData(key.UserKey, nil)
636 0 : default:
637 0 : err = errors.Errorf("unknown batch record kind: %d", key.Kind())
638 : }
639 1 : if err != nil {
640 0 : return nil, err
641 0 : }
642 : }
643 1 : if err := pointIter.Close(); err != nil {
644 0 : return nil, err
645 0 : }
646 1 : pointIter = nil
647 : }
648 :
649 1 : return collapsed, nil
650 : }
651 :
652 2 : func (o *ingestOp) String() string {
653 2 : var buf strings.Builder
654 2 : buf.WriteString("db.Ingest(")
655 2 : for i, id := range o.batchIDs {
656 2 : if i > 0 {
657 2 : buf.WriteString(", ")
658 2 : }
659 2 : buf.WriteString(id.String())
660 : }
661 2 : buf.WriteString(")")
662 2 : return buf.String()
663 : }
664 :
665 : // getOp models a Reader.Get operation.
666 : type getOp struct {
667 : readerID objID
668 : key []byte
669 : }
670 :
671 2 : func (o *getOp) run(t *test, h historyRecorder) {
672 2 : r := t.getReader(o.readerID)
673 2 : var val []byte
674 2 : var closer io.Closer
675 2 : err := withRetries(func() (err error) {
676 2 : val, closer, err = r.Get(o.key)
677 2 : return err
678 2 : })
679 2 : h.Recordf("%s // [%q] %v", o, val, err)
680 2 : if closer != nil {
681 2 : closer.Close()
682 2 : }
683 : }
684 :
685 2 : func (o *getOp) String() string { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
686 2 : func (o *getOp) receiver() objID { return o.readerID }
687 2 : func (o *getOp) syncObjs() objIDSlice {
688 2 : if o.readerID == dbObjID {
689 2 : return nil
690 2 : }
691 : // batch.Get reads through to the current database state.
692 2 : return []objID{dbObjID}
693 : }
694 :
695 : // newIterOp models a Reader.NewIter operation.
696 : type newIterOp struct {
697 : readerID objID
698 : iterID objID
699 : iterOpts
700 : }
701 :
702 2 : func (o *newIterOp) run(t *test, h historyRecorder) {
703 2 : r := t.getReader(o.readerID)
704 2 : opts := iterOptions(o.iterOpts)
705 2 :
706 2 : var i *pebble.Iterator
707 2 : for {
708 2 : i, _ = r.NewIter(opts)
709 2 : if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
710 2 : break
711 : }
712 : // close this iter and retry NewIter
713 0 : _ = i.Close()
714 : }
715 2 : t.setIter(o.iterID, i)
716 2 :
717 2 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
718 2 : // the user-provided bounds.
719 2 : if opts != nil {
720 2 : rand.Read(opts.LowerBound[:])
721 2 : rand.Read(opts.UpperBound[:])
722 2 : }
723 2 : h.Recordf("%s // %v", o, i.Error())
724 : }
725 :
726 2 : func (o *newIterOp) String() string {
727 2 : return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
728 2 : o.iterID, o.readerID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
729 2 : }
730 :
731 2 : func (o *newIterOp) receiver() objID { return o.readerID }
732 2 : func (o *newIterOp) syncObjs() objIDSlice {
733 2 : // Prevent o.iterID ops from running before it exists.
734 2 : objs := []objID{o.iterID}
735 2 : // If reading through a batch, the new iterator will also observe database
736 2 : // state, and we must synchronize on the database state for a consistent
737 2 : // view.
738 2 : if o.readerID.tag() == batchTag {
739 1 : objs = append(objs, dbObjID)
740 1 : }
741 2 : return objs
742 : }
743 :
744 : // newIterUsingCloneOp models a Iterator.Clone operation.
745 : type newIterUsingCloneOp struct {
746 : existingIterID objID
747 : iterID objID
748 : refreshBatch bool
749 : iterOpts
750 :
751 : // derivedReaderID is the ID of the underlying reader that backs both the
752 : // existing iterator and the new iterator. The derivedReaderID is NOT
753 : // serialized by String and is derived from other operations during parse.
754 : derivedReaderID objID
755 : }
756 :
757 2 : func (o *newIterUsingCloneOp) run(t *test, h historyRecorder) {
758 2 : iter := t.getIter(o.existingIterID)
759 2 : cloneOpts := pebble.CloneOptions{
760 2 : IterOptions: iterOptions(o.iterOpts),
761 2 : RefreshBatchView: o.refreshBatch,
762 2 : }
763 2 : i, err := iter.iter.Clone(cloneOpts)
764 2 : if err != nil {
765 0 : panic(err)
766 : }
767 2 : t.setIter(o.iterID, i)
768 2 : h.Recordf("%s // %v", o, i.Error())
769 : }
770 :
771 2 : func (o *newIterUsingCloneOp) String() string {
772 2 : return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
773 2 : o.iterID, o.existingIterID, o.refreshBatch, o.lower, o.upper,
774 2 : o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
775 2 : }
776 :
777 2 : func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
778 :
779 2 : func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
780 2 : objIDs := []objID{o.iterID}
781 2 : // If the underlying reader is a batch, we must synchronize with the batch.
782 2 : // If refreshBatch=true, synchronizing is necessary to observe all the
783 2 : // mutations up to until this op and no more. Even when refreshBatch=false,
784 2 : // we must synchronize because iterator construction may access state cached
785 2 : // on the indexed batch to avoid refragmenting range tombstones or range
786 2 : // keys.
787 2 : if o.derivedReaderID.tag() == batchTag {
788 0 : objIDs = append(objIDs, o.derivedReaderID)
789 0 : }
790 2 : return objIDs
791 : }
792 :
793 : // iterSetBoundsOp models an Iterator.SetBounds operation.
794 : type iterSetBoundsOp struct {
795 : iterID objID
796 : lower []byte
797 : upper []byte
798 : }
799 :
800 2 : func (o *iterSetBoundsOp) run(t *test, h historyRecorder) {
801 2 : i := t.getIter(o.iterID)
802 2 : var lower, upper []byte
803 2 : if o.lower != nil {
804 2 : lower = append(lower, o.lower...)
805 2 : }
806 2 : if o.upper != nil {
807 2 : upper = append(upper, o.upper...)
808 2 : }
809 2 : i.SetBounds(lower, upper)
810 2 :
811 2 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
812 2 : // the user-provided bounds.
813 2 : rand.Read(lower[:])
814 2 : rand.Read(upper[:])
815 2 :
816 2 : h.Recordf("%s // %v", o, i.Error())
817 : }
818 :
819 2 : func (o *iterSetBoundsOp) String() string {
820 2 : return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID, o.lower, o.upper)
821 2 : }
822 :
823 2 : func (o *iterSetBoundsOp) receiver() objID { return o.iterID }
824 2 : func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
825 :
826 : // iterSetOptionsOp models an Iterator.SetOptions operation.
827 : type iterSetOptionsOp struct {
828 : iterID objID
829 : iterOpts
830 :
831 : // derivedReaderID is the ID of the underlying reader that backs the
832 : // iterator. The derivedReaderID is NOT serialized by String and is derived
833 : // from other operations during parse.
834 : derivedReaderID objID
835 : }
836 :
837 2 : func (o *iterSetOptionsOp) run(t *test, h historyRecorder) {
838 2 : i := t.getIter(o.iterID)
839 2 :
840 2 : opts := iterOptions(o.iterOpts)
841 2 : if opts == nil {
842 2 : opts = &pebble.IterOptions{}
843 2 : }
844 2 : i.SetOptions(opts)
845 2 :
846 2 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
847 2 : // the user-provided bounds.
848 2 : rand.Read(opts.LowerBound[:])
849 2 : rand.Read(opts.UpperBound[:])
850 2 :
851 2 : h.Recordf("%s // %v", o, i.Error())
852 : }
853 :
854 2 : func (o *iterSetOptionsOp) String() string {
855 2 : return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
856 2 : o.iterID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
857 2 : }
858 :
859 2 : func iterOptions(o iterOpts) *pebble.IterOptions {
860 2 : if o.IsZero() {
861 2 : return nil
862 2 : }
863 2 : var lower, upper []byte
864 2 : if o.lower != nil {
865 2 : lower = append(lower, o.lower...)
866 2 : }
867 2 : if o.upper != nil {
868 2 : upper = append(upper, o.upper...)
869 2 : }
870 2 : opts := &pebble.IterOptions{
871 2 : LowerBound: lower,
872 2 : UpperBound: upper,
873 2 : KeyTypes: pebble.IterKeyType(o.keyTypes),
874 2 : RangeKeyMasking: pebble.RangeKeyMasking{
875 2 : Suffix: o.maskSuffix,
876 2 : },
877 2 : UseL6Filters: o.useL6Filters,
878 2 : }
879 2 : if opts.RangeKeyMasking.Suffix != nil {
880 2 : opts.RangeKeyMasking.Filter = func() pebble.BlockPropertyFilterMask {
881 2 : return sstable.NewTestKeysMaskingFilter()
882 2 : }
883 : }
884 2 : if o.filterMax > 0 {
885 2 : opts.PointKeyFilters = []pebble.BlockPropertyFilter{
886 2 : sstable.NewTestKeysBlockPropertyFilter(o.filterMin, o.filterMax),
887 2 : }
888 2 : // Enforce the timestamp bounds in SkipPoint, so that the iterator never
889 2 : // returns a key outside the filterMin, filterMax bounds. This provides
890 2 : // deterministic iteration.
891 2 : opts.SkipPoint = func(k []byte) (skip bool) {
892 2 : n := testkeys.Comparer.Split(k)
893 2 : if n == len(k) {
894 2 : // No suffix, don't skip it.
895 2 : return false
896 2 : }
897 2 : v, err := testkeys.ParseSuffix(k[n:])
898 2 : if err != nil {
899 0 : panic(err)
900 : }
901 2 : ts := uint64(v)
902 2 : return ts < o.filterMin || ts >= o.filterMax
903 : }
904 : }
905 2 : return opts
906 : }
907 :
908 2 : func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
909 :
910 2 : func (o *iterSetOptionsOp) syncObjs() objIDSlice {
911 2 : if o.derivedReaderID.tag() == batchTag {
912 1 : // If the underlying reader is a batch, we must synchronize with the
913 1 : // batch so that we observe all the mutations up until this operation
914 1 : // and no more.
915 1 : return []objID{o.derivedReaderID}
916 1 : }
917 2 : return nil
918 : }
919 :
920 : // iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
921 : type iterSeekGEOp struct {
922 : iterID objID
923 : key []byte
924 : limit []byte
925 :
926 : derivedReaderID objID
927 : }
928 :
929 2 : func iteratorPos(i *retryableIter) string {
930 2 : var buf bytes.Buffer
931 2 : fmt.Fprintf(&buf, "%q", i.Key())
932 2 : hasPoint, hasRange := i.HasPointAndRange()
933 2 : if hasPoint {
934 2 : fmt.Fprintf(&buf, ",%q", i.Value())
935 2 : } else {
936 2 : fmt.Fprint(&buf, ",<no point>")
937 2 : }
938 2 : if hasRange {
939 2 : start, end := i.RangeBounds()
940 2 : fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
941 2 : for i, rk := range i.RangeKeys() {
942 2 : if i > 0 {
943 2 : fmt.Fprint(&buf, ",")
944 2 : }
945 2 : fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
946 : }
947 2 : fmt.Fprint(&buf, "}")
948 2 : } else {
949 2 : fmt.Fprint(&buf, ",<no range>")
950 2 : }
951 2 : if i.RangeKeyChanged() {
952 2 : fmt.Fprint(&buf, "*")
953 2 : }
954 2 : return buf.String()
955 : }
956 :
// validBoolToStr returns the textual form of valid: "true" or "false".
func validBoolToStr(valid bool) string {
	// The default formatting of a bool is the same text that %t produces.
	return fmt.Sprint(valid)
}
960 :
961 2 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
962 2 : // We can't distinguish between IterExhausted and IterAtLimit in a
963 2 : // deterministic manner.
964 2 : switch validity {
965 2 : case pebble.IterExhausted, pebble.IterAtLimit:
966 2 : return false, "invalid"
967 2 : case pebble.IterValid:
968 2 : return true, "valid"
969 0 : default:
970 0 : panic("unknown validity")
971 : }
972 : }
973 :
974 2 : func (o *iterSeekGEOp) run(t *test, h historyRecorder) {
975 2 : i := t.getIter(o.iterID)
976 2 : var valid bool
977 2 : var validStr string
978 2 : if o.limit == nil {
979 2 : valid = i.SeekGE(o.key)
980 2 : validStr = validBoolToStr(valid)
981 2 : } else {
982 2 : valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
983 2 : }
984 2 : if valid {
985 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
986 2 : } else {
987 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
988 2 : }
989 : }
990 :
991 2 : func (o *iterSeekGEOp) String() string {
992 2 : return fmt.Sprintf("%s.SeekGE(%q, %q)", o.iterID, o.key, o.limit)
993 2 : }
994 2 : func (o *iterSeekGEOp) receiver() objID { return o.iterID }
995 2 : func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
996 :
997 2 : func onlyBatchIDs(ids ...objID) objIDSlice {
998 2 : var ret objIDSlice
999 2 : for _, id := range ids {
1000 2 : if id.tag() == batchTag {
1001 1 : ret = append(ret, id)
1002 1 : }
1003 : }
1004 2 : return ret
1005 : }
1006 :
1007 : // iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
1008 : type iterSeekPrefixGEOp struct {
1009 : iterID objID
1010 : key []byte
1011 :
1012 : derivedReaderID objID
1013 : }
1014 :
1015 2 : func (o *iterSeekPrefixGEOp) run(t *test, h historyRecorder) {
1016 2 : i := t.getIter(o.iterID)
1017 2 : valid := i.SeekPrefixGE(o.key)
1018 2 : if valid {
1019 2 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1020 2 : } else {
1021 2 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1022 2 : }
1023 : }
1024 :
1025 2 : func (o *iterSeekPrefixGEOp) String() string {
1026 2 : return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, o.key)
1027 2 : }
1028 2 : func (o *iterSeekPrefixGEOp) receiver() objID { return o.iterID }
1029 2 : func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1030 :
1031 : // iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
1032 : type iterSeekLTOp struct {
1033 : iterID objID
1034 : key []byte
1035 : limit []byte
1036 :
1037 : derivedReaderID objID
1038 : }
1039 :
1040 2 : func (o *iterSeekLTOp) run(t *test, h historyRecorder) {
1041 2 : i := t.getIter(o.iterID)
1042 2 : var valid bool
1043 2 : var validStr string
1044 2 : if o.limit == nil {
1045 2 : valid = i.SeekLT(o.key)
1046 2 : validStr = validBoolToStr(valid)
1047 2 : } else {
1048 2 : valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
1049 2 : }
1050 2 : if valid {
1051 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1052 2 : } else {
1053 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1054 2 : }
1055 : }
1056 :
1057 2 : func (o *iterSeekLTOp) String() string {
1058 2 : return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID, o.key, o.limit)
1059 2 : }
1060 :
1061 2 : func (o *iterSeekLTOp) receiver() objID { return o.iterID }
1062 2 : func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1063 :
1064 : // iterFirstOp models an Iterator.First operation.
1065 : type iterFirstOp struct {
1066 : iterID objID
1067 :
1068 : derivedReaderID objID
1069 : }
1070 :
1071 2 : func (o *iterFirstOp) run(t *test, h historyRecorder) {
1072 2 : i := t.getIter(o.iterID)
1073 2 : valid := i.First()
1074 2 : if valid {
1075 2 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1076 2 : } else {
1077 2 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1078 2 : }
1079 : }
1080 :
1081 2 : func (o *iterFirstOp) String() string { return fmt.Sprintf("%s.First()", o.iterID) }
1082 2 : func (o *iterFirstOp) receiver() objID { return o.iterID }
1083 2 : func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1084 :
1085 : // iterLastOp models an Iterator.Last operation.
1086 : type iterLastOp struct {
1087 : iterID objID
1088 :
1089 : derivedReaderID objID
1090 : }
1091 :
1092 2 : func (o *iterLastOp) run(t *test, h historyRecorder) {
1093 2 : i := t.getIter(o.iterID)
1094 2 : valid := i.Last()
1095 2 : if valid {
1096 2 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1097 2 : } else {
1098 2 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1099 2 : }
1100 : }
1101 :
1102 2 : func (o *iterLastOp) String() string { return fmt.Sprintf("%s.Last()", o.iterID) }
1103 2 : func (o *iterLastOp) receiver() objID { return o.iterID }
1104 2 : func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1105 :
1106 : // iterNextOp models an Iterator.Next[WithLimit] operation.
1107 : type iterNextOp struct {
1108 : iterID objID
1109 : limit []byte
1110 :
1111 : derivedReaderID objID
1112 : }
1113 :
1114 2 : func (o *iterNextOp) run(t *test, h historyRecorder) {
1115 2 : i := t.getIter(o.iterID)
1116 2 : var valid bool
1117 2 : var validStr string
1118 2 : if o.limit == nil {
1119 2 : valid = i.Next()
1120 2 : validStr = validBoolToStr(valid)
1121 2 : } else {
1122 2 : valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
1123 2 : }
1124 2 : if valid {
1125 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1126 2 : } else {
1127 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1128 2 : }
1129 : }
1130 :
1131 2 : func (o *iterNextOp) String() string { return fmt.Sprintf("%s.Next(%q)", o.iterID, o.limit) }
1132 2 : func (o *iterNextOp) receiver() objID { return o.iterID }
1133 2 : func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1134 :
1135 : // iterNextPrefixOp models an Iterator.NextPrefix operation.
1136 : type iterNextPrefixOp struct {
1137 : iterID objID
1138 :
1139 : derivedReaderID objID
1140 : }
1141 :
1142 2 : func (o *iterNextPrefixOp) run(t *test, h historyRecorder) {
1143 2 : i := t.getIter(o.iterID)
1144 2 : valid := i.NextPrefix()
1145 2 : validStr := validBoolToStr(valid)
1146 2 : if valid {
1147 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1148 2 : } else {
1149 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1150 2 : }
1151 : }
1152 :
1153 2 : func (o *iterNextPrefixOp) String() string { return fmt.Sprintf("%s.NextPrefix()", o.iterID) }
1154 2 : func (o *iterNextPrefixOp) receiver() objID { return o.iterID }
1155 2 : func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1156 :
1157 : // iterPrevOp models an Iterator.Prev[WithLimit] operation.
1158 : type iterPrevOp struct {
1159 : iterID objID
1160 : limit []byte
1161 :
1162 : derivedReaderID objID
1163 : }
1164 :
1165 2 : func (o *iterPrevOp) run(t *test, h historyRecorder) {
1166 2 : i := t.getIter(o.iterID)
1167 2 : var valid bool
1168 2 : var validStr string
1169 2 : if o.limit == nil {
1170 2 : valid = i.Prev()
1171 2 : validStr = validBoolToStr(valid)
1172 2 : } else {
1173 2 : valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
1174 2 : }
1175 2 : if valid {
1176 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1177 2 : } else {
1178 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1179 2 : }
1180 : }
1181 :
1182 2 : func (o *iterPrevOp) String() string { return fmt.Sprintf("%s.Prev(%q)", o.iterID, o.limit) }
1183 2 : func (o *iterPrevOp) receiver() objID { return o.iterID }
1184 2 : func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1185 :
1186 : // newSnapshotOp models a DB.NewSnapshot operation.
1187 : type newSnapshotOp struct {
1188 : snapID objID
1189 : // If nonempty, this snapshot must not be used to read any keys outside of
1190 : // the provided bounds. This allows some implementations to use 'Eventually
1191 : // file-only snapshots,' which require bounds.
1192 : bounds []pebble.KeyRange
1193 : }
1194 :
1195 2 : func (o *newSnapshotOp) run(t *test, h historyRecorder) {
1196 2 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
1197 2 : if len(o.bounds) > 0 && ((11400714819323198485*uint64(t.idx)*t.testOpts.seedEFOS)>>63) == 1 {
1198 1 : s := t.db.NewEventuallyFileOnlySnapshot(o.bounds)
1199 1 : t.setSnapshot(o.snapID, s)
1200 2 : } else {
1201 2 : s := t.db.NewSnapshot()
1202 2 : t.setSnapshot(o.snapID, s)
1203 2 : }
1204 2 : h.Recordf("%s", o)
1205 : }
1206 :
1207 2 : func (o *newSnapshotOp) String() string {
1208 2 : var buf bytes.Buffer
1209 2 : fmt.Fprintf(&buf, "%s = db.NewSnapshot(", o.snapID)
1210 2 : for i := range o.bounds {
1211 2 : if i > 0 {
1212 2 : fmt.Fprint(&buf, ", ")
1213 2 : }
1214 2 : fmt.Fprintf(&buf, "%q, %q", o.bounds[i].Start, o.bounds[i].End)
1215 : }
1216 2 : fmt.Fprint(&buf, ")")
1217 2 : return buf.String()
1218 : }
1219 2 : func (o *newSnapshotOp) receiver() objID { return dbObjID }
1220 2 : func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
1221 :
1222 : type dbRatchetFormatMajorVersionOp struct {
1223 : vers pebble.FormatMajorVersion
1224 : }
1225 :
1226 2 : func (o *dbRatchetFormatMajorVersionOp) run(t *test, h historyRecorder) {
1227 2 : var err error
1228 2 : // NB: We no-op the operation if we're already at or above the provided
1229 2 : // format major version. Different runs start at different format major
1230 2 : // versions, making the presence of an error and the error message itself
1231 2 : // non-deterministic if we attempt to upgrade to an older version.
1232 2 : //
1233 2 : //Regardless, subsequent operations should behave identically, which is what
1234 2 : //we're really aiming to test by including this format major version ratchet
1235 2 : //operation.
1236 2 : if t.db.FormatMajorVersion() < o.vers {
1237 2 : err = t.db.RatchetFormatMajorVersion(o.vers)
1238 2 : }
1239 2 : h.Recordf("%s // %v", o, err)
1240 : }
1241 :
1242 2 : func (o *dbRatchetFormatMajorVersionOp) String() string {
1243 2 : return fmt.Sprintf("db.RatchetFormatMajorVersion(%s)", o.vers)
1244 2 : }
1245 2 : func (o *dbRatchetFormatMajorVersionOp) receiver() objID { return dbObjID }
1246 2 : func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
1247 :
1248 : type dbRestartOp struct{}
1249 :
1250 2 : func (o *dbRestartOp) run(t *test, h historyRecorder) {
1251 2 : if err := t.restartDB(); err != nil {
1252 0 : h.Recordf("%s // %v", o, err)
1253 0 : h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
1254 2 : } else {
1255 2 : h.Recordf("%s", o)
1256 2 : }
1257 : }
1258 :
1259 2 : func (o *dbRestartOp) String() string { return "db.Restart()" }
1260 2 : func (o *dbRestartOp) receiver() objID { return dbObjID }
1261 2 : func (o *dbRestartOp) syncObjs() objIDSlice { return nil }
1262 :
1263 1 : func formatOps(ops []op) string {
1264 1 : var buf strings.Builder
1265 1 : for _, op := range ops {
1266 1 : fmt.Fprintf(&buf, "%s\n", op)
1267 1 : }
1268 1 : return buf.String()
1269 : }
|