Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "bytes"
9 : "crypto/rand"
10 : "encoding/binary"
11 : "fmt"
12 : "io"
13 : "path/filepath"
14 : "strings"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble"
18 : "github.com/cockroachdb/pebble/internal/base"
19 : "github.com/cockroachdb/pebble/internal/keyspan"
20 : "github.com/cockroachdb/pebble/internal/private"
21 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
22 : "github.com/cockroachdb/pebble/sstable"
23 : "github.com/cockroachdb/pebble/vfs"
24 : "github.com/cockroachdb/pebble/vfs/errorfs"
25 : )
26 :
27 : // op defines the interface for a single operation, such as creating a batch,
28 : // or advancing an iterator.
29 : type op interface {
30 : String() string
31 : run(t *test, h historyRecorder)
32 :
33 : // receiver returns the object ID of the object the operation is performed
34 : // on. Every operation has a receiver (eg, batch0.Set(...) has `batch0` as
35 : // its receiver). Receivers are used for synchronization when running with
36 : // concurrency.
37 : receiver() objID
38 :
39 : // syncObjs returns an additional set of object IDs—excluding the
40 : // receiver—that the operation must synchronize with. At execution time,
41 : // the operation will run serially with respect to all other operations
42 : // that return these objects from their own syncObjs or receiver methods.
43 : syncObjs() objIDSlice
44 : }
45 :
46 : // initOp performs test initialization
47 : type initOp struct {
48 : batchSlots uint32
49 : iterSlots uint32
50 : snapshotSlots uint32
51 : }
52 :
53 0 : func (o *initOp) run(t *test, h historyRecorder) {
54 0 : t.batches = make([]*pebble.Batch, o.batchSlots)
55 0 : t.iters = make([]*retryableIter, o.iterSlots)
56 0 : t.snapshots = make([]readerCloser, o.snapshotSlots)
57 0 : h.Recordf("%s", o)
58 0 : }
59 :
60 1 : func (o *initOp) String() string {
61 1 : return fmt.Sprintf("Init(%d /* batches */, %d /* iters */, %d /* snapshots */)",
62 1 : o.batchSlots, o.iterSlots, o.snapshotSlots)
63 1 : }
64 :
65 0 : func (o *initOp) receiver() objID { return dbObjID }
66 0 : func (o *initOp) syncObjs() objIDSlice { return nil }
67 :
68 : // applyOp models a Writer.Apply operation.
69 : type applyOp struct {
70 : writerID objID
71 : batchID objID
72 : }
73 :
74 0 : func (o *applyOp) run(t *test, h historyRecorder) {
75 0 : b := t.getBatch(o.batchID)
76 0 : w := t.getWriter(o.writerID)
77 0 : var err error
78 0 : if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
79 0 : err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
80 0 : if err == nil {
81 0 : err = b.SyncWait()
82 0 : }
83 0 : } else {
84 0 : err = w.Apply(b, t.writeOpts)
85 0 : }
86 0 : h.Recordf("%s // %v", o, err)
87 : // batch will be closed by a closeOp which is guaranteed to be generated
88 : }
89 :
90 1 : func (o *applyOp) String() string { return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID) }
91 0 : func (o *applyOp) receiver() objID { return o.writerID }
92 0 : func (o *applyOp) syncObjs() objIDSlice {
93 0 : // Apply should not be concurrent with operations that are mutating the
94 0 : // batch.
95 0 : return []objID{o.batchID}
96 0 : }
97 :
98 : // checkpointOp models a DB.Checkpoint operation.
99 : type checkpointOp struct {
100 : // If non-empty, the checkpoint is restricted to these spans.
101 : spans []pebble.CheckpointSpan
102 : }
103 :
104 0 : func (o *checkpointOp) run(t *test, h historyRecorder) {
105 0 : // TODO(josh): db.Checkpoint does not work with shared storage yet.
106 0 : // It would be better to filter out ahead of calling run on the op,
107 0 : // by setting the weight that generator.go uses to zero, or similar.
108 0 : // But IIUC the ops are shared for ALL the metamorphic test runs, so
109 0 : // not sure how to do that easily:
110 0 : // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
111 0 : if t.testOpts.sharedStorageEnabled {
112 0 : h.Recordf("%s // %v", o, nil)
113 0 : return
114 0 : }
115 0 : var opts []pebble.CheckpointOption
116 0 : if len(o.spans) > 0 {
117 0 : opts = append(opts, pebble.WithRestrictToSpans(o.spans))
118 0 : }
119 0 : err := withRetries(func() error {
120 0 : return t.db.Checkpoint(o.dir(t.dir, h.op), opts...)
121 0 : })
122 0 : h.Recordf("%s // %v", o, err)
123 : }
124 :
125 0 : func (o *checkpointOp) dir(dataDir string, idx int) string {
126 0 : return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
127 0 : }
128 :
129 1 : func (o *checkpointOp) String() string {
130 1 : var spanStr bytes.Buffer
131 1 : for i, span := range o.spans {
132 1 : if i > 0 {
133 1 : spanStr.WriteString(",")
134 1 : }
135 1 : fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
136 : }
137 1 : return fmt.Sprintf("db.Checkpoint(%s)", spanStr.String())
138 : }
139 :
140 0 : func (o *checkpointOp) receiver() objID { return dbObjID }
141 0 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
142 :
143 : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
144 : type closeOp struct {
145 : objID objID
146 : }
147 :
148 0 : func (o *closeOp) run(t *test, h historyRecorder) {
149 0 : c := t.getCloser(o.objID)
150 0 : if o.objID.tag() == dbTag && t.opts.DisableWAL {
151 0 : // Special case: If WAL is disabled, do a flush right before DB Close. This
152 0 : // allows us to reuse this run's data directory as initial state for
153 0 : // future runs without losing any mutations.
154 0 : _ = t.db.Flush()
155 0 : }
156 0 : t.clearObj(o.objID)
157 0 : err := c.Close()
158 0 : h.Recordf("%s // %v", o, err)
159 : }
160 :
161 1 : func (o *closeOp) String() string { return fmt.Sprintf("%s.Close()", o.objID) }
162 0 : func (o *closeOp) receiver() objID { return o.objID }
163 0 : func (o *closeOp) syncObjs() objIDSlice {
164 0 : // Synchronize on the database so that we don't close the database before
165 0 : // all its iterators, snapshots and batches are closed.
166 0 : // TODO(jackson): It would be nice to relax this so that Close calls can
167 0 : // execute in parallel.
168 0 : if o.objID == dbObjID {
169 0 : return nil
170 0 : }
171 0 : return []objID{dbObjID}
172 : }
173 :
174 : // compactOp models a DB.Compact operation.
175 : type compactOp struct {
176 : start []byte
177 : end []byte
178 : parallelize bool
179 : }
180 :
181 0 : func (o *compactOp) run(t *test, h historyRecorder) {
182 0 : err := withRetries(func() error {
183 0 : return t.db.Compact(o.start, o.end, o.parallelize)
184 0 : })
185 0 : h.Recordf("%s // %v", o, err)
186 : }
187 :
188 1 : func (o *compactOp) String() string {
189 1 : return fmt.Sprintf("db.Compact(%q, %q, %t /* parallelize */)", o.start, o.end, o.parallelize)
190 1 : }
191 :
192 0 : func (o *compactOp) receiver() objID { return dbObjID }
193 0 : func (o *compactOp) syncObjs() objIDSlice { return nil }
194 :
195 : // deleteOp models a Write.Delete operation.
196 : type deleteOp struct {
197 : writerID objID
198 : key []byte
199 : }
200 :
201 0 : func (o *deleteOp) run(t *test, h historyRecorder) {
202 0 : w := t.getWriter(o.writerID)
203 0 : var err error
204 0 : if t.testOpts.deleteSized && t.isFMV(pebble.ExperimentalFormatDeleteSizedAndObsolete) {
205 0 : // Call DeleteSized with a deterministic size derived from the index.
206 0 : // The size does not need to be accurate for correctness.
207 0 : err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
208 0 : } else {
209 0 : err = w.Delete(o.key, t.writeOpts)
210 0 : }
211 0 : h.Recordf("%s // %v", o, err)
212 : }
213 :
214 0 : func hashSize(index int) uint32 {
215 0 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
216 0 : return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
217 0 : }
218 :
219 1 : func (o *deleteOp) String() string {
220 1 : return fmt.Sprintf("%s.Delete(%q)", o.writerID, o.key)
221 1 : }
222 0 : func (o *deleteOp) receiver() objID { return o.writerID }
223 0 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
224 :
225 : // singleDeleteOp models a Write.SingleDelete operation.
226 : type singleDeleteOp struct {
227 : writerID objID
228 : key []byte
229 : maybeReplaceDelete bool
230 : }
231 :
232 0 : func (o *singleDeleteOp) run(t *test, h historyRecorder) {
233 0 : w := t.getWriter(o.writerID)
234 0 : var err error
235 0 : if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
236 0 : err = w.Delete(o.key, t.writeOpts)
237 0 : } else {
238 0 : err = w.SingleDelete(o.key, t.writeOpts)
239 0 : }
240 : // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
241 : // write the former to the history log. The log line will indicate whether
242 : // or not the delete *could* have been replaced. The OPTIONS file should
243 : // also be consulted to determine what happened at runtime (i.e. by taking
244 : // the logical AND).
245 0 : h.Recordf("%s // %v", o, err)
246 : }
247 :
248 1 : func (o *singleDeleteOp) String() string {
249 1 : return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)", o.writerID, o.key, o.maybeReplaceDelete)
250 1 : }
251 :
252 0 : func (o *singleDeleteOp) receiver() objID { return o.writerID }
253 0 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
254 :
255 : // deleteRangeOp models a Write.DeleteRange operation.
256 : type deleteRangeOp struct {
257 : writerID objID
258 : start []byte
259 : end []byte
260 : }
261 :
262 0 : func (o *deleteRangeOp) run(t *test, h historyRecorder) {
263 0 : w := t.getWriter(o.writerID)
264 0 : err := w.DeleteRange(o.start, o.end, t.writeOpts)
265 0 : h.Recordf("%s // %v", o, err)
266 0 : }
267 :
268 1 : func (o *deleteRangeOp) String() string {
269 1 : return fmt.Sprintf("%s.DeleteRange(%q, %q)", o.writerID, o.start, o.end)
270 1 : }
271 :
272 0 : func (o *deleteRangeOp) receiver() objID { return o.writerID }
273 0 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
274 :
275 : // flushOp models a DB.Flush operation.
276 : type flushOp struct {
277 : }
278 :
279 0 : func (o *flushOp) run(t *test, h historyRecorder) {
280 0 : err := t.db.Flush()
281 0 : h.Recordf("%s // %v", o, err)
282 0 : }
283 :
284 1 : func (o *flushOp) String() string { return "db.Flush()" }
285 0 : func (o *flushOp) receiver() objID { return dbObjID }
286 0 : func (o *flushOp) syncObjs() objIDSlice { return nil }
287 :
288 : // mergeOp models a Write.Merge operation.
289 : type mergeOp struct {
290 : writerID objID
291 : key []byte
292 : value []byte
293 : }
294 :
295 0 : func (o *mergeOp) run(t *test, h historyRecorder) {
296 0 : w := t.getWriter(o.writerID)
297 0 : err := w.Merge(o.key, o.value, t.writeOpts)
298 0 : h.Recordf("%s // %v", o, err)
299 0 : }
300 :
301 1 : func (o *mergeOp) String() string { return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, o.key, o.value) }
302 0 : func (o *mergeOp) receiver() objID { return o.writerID }
303 0 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
304 :
305 : // setOp models a Write.Set operation.
306 : type setOp struct {
307 : writerID objID
308 : key []byte
309 : value []byte
310 : }
311 :
312 0 : func (o *setOp) run(t *test, h historyRecorder) {
313 0 : w := t.getWriter(o.writerID)
314 0 : err := w.Set(o.key, o.value, t.writeOpts)
315 0 : h.Recordf("%s // %v", o, err)
316 0 : }
317 :
318 1 : func (o *setOp) String() string { return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, o.key, o.value) }
319 0 : func (o *setOp) receiver() objID { return o.writerID }
320 0 : func (o *setOp) syncObjs() objIDSlice { return nil }
321 :
322 : // rangeKeyDeleteOp models a Write.RangeKeyDelete operation.
323 : type rangeKeyDeleteOp struct {
324 : writerID objID
325 : start []byte
326 : end []byte
327 : }
328 :
329 0 : func (o *rangeKeyDeleteOp) run(t *test, h historyRecorder) {
330 0 : w := t.getWriter(o.writerID)
331 0 : err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
332 0 : h.Recordf("%s // %v", o, err)
333 0 : }
334 :
335 1 : func (o *rangeKeyDeleteOp) String() string {
336 1 : return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)", o.writerID, o.start, o.end)
337 1 : }
338 :
339 0 : func (o *rangeKeyDeleteOp) receiver() objID { return o.writerID }
340 0 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
341 :
342 : // rangeKeySetOp models a Write.RangeKeySet operation.
343 : type rangeKeySetOp struct {
344 : writerID objID
345 : start []byte
346 : end []byte
347 : suffix []byte
348 : value []byte
349 : }
350 :
351 0 : func (o *rangeKeySetOp) run(t *test, h historyRecorder) {
352 0 : w := t.getWriter(o.writerID)
353 0 : err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
354 0 : h.Recordf("%s // %v", o, err)
355 0 : }
356 :
357 1 : func (o *rangeKeySetOp) String() string {
358 1 : return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
359 1 : o.writerID, o.start, o.end, o.suffix, o.value)
360 1 : }
361 :
362 0 : func (o *rangeKeySetOp) receiver() objID { return o.writerID }
363 0 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
364 :
365 : // rangeKeyUnsetOp models a Write.RangeKeyUnset operation.
366 : type rangeKeyUnsetOp struct {
367 : writerID objID
368 : start []byte
369 : end []byte
370 : suffix []byte
371 : }
372 :
373 0 : func (o *rangeKeyUnsetOp) run(t *test, h historyRecorder) {
374 0 : w := t.getWriter(o.writerID)
375 0 : err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
376 0 : h.Recordf("%s // %v", o, err)
377 0 : }
378 :
379 1 : func (o *rangeKeyUnsetOp) String() string {
380 1 : return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
381 1 : o.writerID, o.start, o.end, o.suffix)
382 1 : }
383 :
384 0 : func (o *rangeKeyUnsetOp) receiver() objID { return o.writerID }
385 0 : func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
386 :
387 : // newBatchOp models a Write.NewBatch operation.
388 : type newBatchOp struct {
389 : batchID objID
390 : }
391 :
392 0 : func (o *newBatchOp) run(t *test, h historyRecorder) {
393 0 : b := t.db.NewBatch()
394 0 : t.setBatch(o.batchID, b)
395 0 : h.Recordf("%s", o)
396 0 : }
397 :
398 1 : func (o *newBatchOp) String() string { return fmt.Sprintf("%s = db.NewBatch()", o.batchID) }
399 0 : func (o *newBatchOp) receiver() objID { return dbObjID }
400 0 : func (o *newBatchOp) syncObjs() objIDSlice {
401 0 : // NewBatch should not be concurrent with operations that interact with that
402 0 : // same batch.
403 0 : return []objID{o.batchID}
404 0 : }
405 :
406 : // newIndexedBatchOp models a Write.NewIndexedBatch operation.
407 : type newIndexedBatchOp struct {
408 : batchID objID
409 : }
410 :
411 0 : func (o *newIndexedBatchOp) run(t *test, h historyRecorder) {
412 0 : b := t.db.NewIndexedBatch()
413 0 : t.setBatch(o.batchID, b)
414 0 : h.Recordf("%s", o)
415 0 : }
416 :
417 1 : func (o *newIndexedBatchOp) String() string {
418 1 : return fmt.Sprintf("%s = db.NewIndexedBatch()", o.batchID)
419 1 : }
420 0 : func (o *newIndexedBatchOp) receiver() objID { return dbObjID }
421 0 : func (o *newIndexedBatchOp) syncObjs() objIDSlice {
422 0 : // NewIndexedBatch should not be concurrent with operations that interact
423 0 : // with that same batch.
424 0 : return []objID{o.batchID}
425 0 : }
426 :
427 : // batchCommitOp models a Batch.Commit operation.
428 : type batchCommitOp struct {
429 : batchID objID
430 : }
431 :
432 0 : func (o *batchCommitOp) run(t *test, h historyRecorder) {
433 0 : b := t.getBatch(o.batchID)
434 0 : err := b.Commit(t.writeOpts)
435 0 : h.Recordf("%s // %v", o, err)
436 0 : }
437 :
438 1 : func (o *batchCommitOp) String() string { return fmt.Sprintf("%s.Commit()", o.batchID) }
439 0 : func (o *batchCommitOp) receiver() objID { return o.batchID }
440 0 : func (o *batchCommitOp) syncObjs() objIDSlice {
441 0 : // Synchronize on the database so that NewIters wait for the commit.
442 0 : return []objID{dbObjID}
443 0 : }
444 :
445 : // ingestOp models a DB.Ingest operation.
446 : type ingestOp struct {
447 : batchIDs []objID
448 : }
449 :
450 0 : func (o *ingestOp) run(t *test, h historyRecorder) {
451 0 : // We can only use apply as an alternative for ingestion if we are ingesting
452 0 : // a single batch. If we are ingesting multiple batches, the batches may
453 0 : // overlap which would cause ingestion to fail but apply would succeed.
454 0 : if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 {
455 0 : id := o.batchIDs[0]
456 0 : b := t.getBatch(id)
457 0 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
458 0 : c, err := o.collapseBatch(t, iter, rangeDelIter, rangeKeyIter)
459 0 : if err == nil {
460 0 : w := t.getWriter(makeObjID(dbTag, 0))
461 0 : err = w.Apply(c, t.writeOpts)
462 0 : }
463 0 : _ = b.Close()
464 0 : _ = c.Close()
465 0 : t.clearObj(id)
466 0 : h.Recordf("%s // %v", o, err)
467 0 : return
468 : }
469 :
470 0 : var paths []string
471 0 : var err error
472 0 : for i, id := range o.batchIDs {
473 0 : b := t.getBatch(id)
474 0 : t.clearObj(id)
475 0 : path, err2 := o.build(t, h, b, i)
476 0 : if err2 != nil {
477 0 : h.Recordf("Build(%s) // %v", id, err2)
478 0 : }
479 0 : err = firstError(err, err2)
480 0 : if err2 == nil {
481 0 : paths = append(paths, path)
482 0 : }
483 0 : err = firstError(err, b.Close())
484 : }
485 :
486 0 : err = firstError(err, withRetries(func() error {
487 0 : return t.db.Ingest(paths)
488 0 : }))
489 :
490 0 : h.Recordf("%s // %v", o, err)
491 : }
492 :
493 0 : func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
494 0 : rootFS := vfs.Root(t.opts.FS)
495 0 : path := rootFS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d", i))
496 0 : f, err := rootFS.Create(path)
497 0 : if err != nil {
498 0 : return "", err
499 0 : }
500 :
501 0 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
502 0 : defer closeIters(iter, rangeDelIter, rangeKeyIter)
503 0 :
504 0 : equal := t.opts.Comparer.Equal
505 0 : tableFormat := t.db.FormatMajorVersion().MaxTableFormat()
506 0 : w := sstable.NewWriter(
507 0 : objstorageprovider.NewFileWritable(f),
508 0 : t.opts.MakeWriterOptions(0, tableFormat),
509 0 : )
510 0 :
511 0 : var lastUserKey []byte
512 0 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
513 0 : // Ignore duplicate keys.
514 0 : if equal(lastUserKey, key.UserKey) {
515 0 : continue
516 : }
517 : // NB: We don't have to copy the key or value since we're reading from a
518 : // batch which doesn't do prefix compression.
519 0 : lastUserKey = key.UserKey
520 0 :
521 0 : key.SetSeqNum(base.SeqNumZero)
522 0 : if err := w.Add(*key, value.InPlaceValue()); err != nil {
523 0 : return "", err
524 0 : }
525 : }
526 0 : if err := iter.Close(); err != nil {
527 0 : return "", err
528 0 : }
529 0 : iter = nil
530 0 :
531 0 : if rangeDelIter != nil {
532 0 : // NB: The range tombstones have already been fragmented by the Batch.
533 0 : for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
534 0 : // NB: We don't have to copy the key or value since we're reading from a
535 0 : // batch which doesn't do prefix compression.
536 0 : if err := w.DeleteRange(t.Start, t.End); err != nil {
537 0 : return "", err
538 0 : }
539 : }
540 0 : if err := rangeDelIter.Close(); err != nil {
541 0 : return "", err
542 0 : }
543 0 : rangeDelIter = nil
544 : }
545 :
546 0 : if err := w.Close(); err != nil {
547 0 : return "", err
548 0 : }
549 0 : return path, nil
550 : }
551 :
552 0 : func (o *ingestOp) receiver() objID { return dbObjID }
553 0 : func (o *ingestOp) syncObjs() objIDSlice {
554 0 : // Ingest should not be concurrent with mutating the batches that will be
555 0 : // ingested as sstables.
556 0 : return o.batchIDs
557 0 : }
558 :
559 : func closeIters(
560 : pointIter base.InternalIterator,
561 : rangeDelIter keyspan.FragmentIterator,
562 : rangeKeyIter keyspan.FragmentIterator,
563 0 : ) {
564 0 : if pointIter != nil {
565 0 : pointIter.Close()
566 0 : }
567 0 : if rangeDelIter != nil {
568 0 : rangeDelIter.Close()
569 0 : }
570 0 : if rangeKeyIter != nil {
571 0 : rangeKeyIter.Close()
572 0 : }
573 : }
574 :
575 : // collapseBatch collapses the mutations in a batch to be equivalent to an
576 : // sstable ingesting those mutations. Duplicate updates to a key are collapsed
577 : // so that only the latest update is performed. All range deletions are
578 : // performed first in the batch to match the semantics of ingestion where a
579 : // range deletion does not delete a point record contained in the sstable.
580 : func (o *ingestOp) collapseBatch(
581 : t *test, pointIter base.InternalIterator, rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
582 0 : ) (*pebble.Batch, error) {
583 0 : defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
584 0 : equal := t.opts.Comparer.Equal
585 0 : collapsed := t.db.NewBatch()
586 0 :
587 0 : if rangeDelIter != nil {
588 0 : // NB: The range tombstones have already been fragmented by the Batch.
589 0 : for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
590 0 : // NB: We don't have to copy the key or value since we're reading from a
591 0 : // batch which doesn't do prefix compression.
592 0 : if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
593 0 : return nil, err
594 0 : }
595 : }
596 0 : if err := rangeDelIter.Close(); err != nil {
597 0 : return nil, err
598 0 : }
599 0 : rangeDelIter = nil
600 : }
601 :
602 0 : if pointIter != nil {
603 0 : var lastUserKey []byte
604 0 : for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
605 0 : // Ignore duplicate keys.
606 0 : if equal(lastUserKey, key.UserKey) {
607 0 : continue
608 : }
609 : // NB: We don't have to copy the key or value since we're reading from a
610 : // batch which doesn't do prefix compression.
611 0 : lastUserKey = key.UserKey
612 0 :
613 0 : var err error
614 0 : switch key.Kind() {
615 0 : case pebble.InternalKeyKindDelete:
616 0 : err = collapsed.Delete(key.UserKey, nil)
617 0 : case pebble.InternalKeyKindDeleteSized:
618 0 : v, _ := binary.Uvarint(value.InPlaceValue())
619 0 : // Batch.DeleteSized takes just the length of the value being
620 0 : // deleted and adds the key's length to derive the overall entry
621 0 : // size of the value being deleted. This has already been done
622 0 : // to the key we're reading from the batch, so we must subtract
623 0 : // the key length from the encoded value before calling
624 0 : // collapsed.DeleteSized, which will again add the key length
625 0 : // before encoding.
626 0 : err = collapsed.DeleteSized(key.UserKey, uint32(v-uint64(len(key.UserKey))), nil)
627 0 : case pebble.InternalKeyKindSingleDelete:
628 0 : err = collapsed.SingleDelete(key.UserKey, nil)
629 0 : case pebble.InternalKeyKindSet:
630 0 : err = collapsed.Set(key.UserKey, value.InPlaceValue(), nil)
631 0 : case pebble.InternalKeyKindMerge:
632 0 : err = collapsed.Merge(key.UserKey, value.InPlaceValue(), nil)
633 0 : case pebble.InternalKeyKindLogData:
634 0 : err = collapsed.LogData(key.UserKey, nil)
635 0 : default:
636 0 : err = errors.Errorf("unknown batch record kind: %d", key.Kind())
637 : }
638 0 : if err != nil {
639 0 : return nil, err
640 0 : }
641 : }
642 0 : if err := pointIter.Close(); err != nil {
643 0 : return nil, err
644 0 : }
645 0 : pointIter = nil
646 : }
647 :
648 0 : return collapsed, nil
649 : }
650 :
651 1 : func (o *ingestOp) String() string {
652 1 : var buf strings.Builder
653 1 : buf.WriteString("db.Ingest(")
654 1 : for i, id := range o.batchIDs {
655 1 : if i > 0 {
656 1 : buf.WriteString(", ")
657 1 : }
658 1 : buf.WriteString(id.String())
659 : }
660 1 : buf.WriteString(")")
661 1 : return buf.String()
662 : }
663 :
664 : // getOp models a Reader.Get operation.
665 : type getOp struct {
666 : readerID objID
667 : key []byte
668 : }
669 :
670 0 : func (o *getOp) run(t *test, h historyRecorder) {
671 0 : r := t.getReader(o.readerID)
672 0 : var val []byte
673 0 : var closer io.Closer
674 0 : err := withRetries(func() (err error) {
675 0 : val, closer, err = r.Get(o.key)
676 0 : return err
677 0 : })
678 0 : h.Recordf("%s // [%q] %v", o, val, err)
679 0 : if closer != nil {
680 0 : closer.Close()
681 0 : }
682 : }
683 :
684 1 : func (o *getOp) String() string { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
685 0 : func (o *getOp) receiver() objID { return o.readerID }
686 0 : func (o *getOp) syncObjs() objIDSlice {
687 0 : if o.readerID == dbObjID {
688 0 : return nil
689 0 : }
690 : // batch.Get reads through to the current database state.
691 0 : return []objID{dbObjID}
692 : }
693 :
694 : // newIterOp models a Reader.NewIter operation.
695 : type newIterOp struct {
696 : readerID objID
697 : iterID objID
698 : iterOpts
699 : }
700 :
701 0 : func (o *newIterOp) run(t *test, h historyRecorder) {
702 0 : r := t.getReader(o.readerID)
703 0 : opts := iterOptions(o.iterOpts)
704 0 :
705 0 : var i *pebble.Iterator
706 0 : for {
707 0 : i, _ = r.NewIter(opts)
708 0 : if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
709 0 : break
710 : }
711 : // close this iter and retry NewIter
712 0 : _ = i.Close()
713 : }
714 0 : t.setIter(o.iterID, i, o.filterMin, o.filterMax)
715 0 :
716 0 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
717 0 : // the user-provided bounds.
718 0 : if opts != nil {
719 0 : rand.Read(opts.LowerBound[:])
720 0 : rand.Read(opts.UpperBound[:])
721 0 : }
722 0 : h.Recordf("%s // %v", o, i.Error())
723 : }
724 :
725 1 : func (o *newIterOp) String() string {
726 1 : return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
727 1 : o.iterID, o.readerID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
728 1 : }
729 :
730 0 : func (o *newIterOp) receiver() objID { return o.readerID }
731 0 : func (o *newIterOp) syncObjs() objIDSlice {
732 0 : // Prevent o.iterID ops from running before it exists.
733 0 : objs := []objID{o.iterID}
734 0 : // If reading through a batch, the new iterator will also observe database
735 0 : // state, and we must synchronize on the database state for a consistent
736 0 : // view.
737 0 : if o.readerID.tag() == batchTag {
738 0 : objs = append(objs, dbObjID)
739 0 : }
740 0 : return objs
741 : }
742 :
743 : // newIterUsingCloneOp models a Iterator.Clone operation.
744 : type newIterUsingCloneOp struct {
745 : existingIterID objID
746 : iterID objID
747 : refreshBatch bool
748 : iterOpts
749 :
750 : // derivedReaderID is the ID of the underlying reader that backs both the
751 : // existing iterator and the new iterator. The derivedReaderID is NOT
752 : // serialized by String and is derived from other operations during parse.
753 : derivedReaderID objID
754 : }
755 :
756 0 : func (o *newIterUsingCloneOp) run(t *test, h historyRecorder) {
757 0 : iter := t.getIter(o.existingIterID)
758 0 : cloneOpts := pebble.CloneOptions{
759 0 : IterOptions: iterOptions(o.iterOpts),
760 0 : RefreshBatchView: o.refreshBatch,
761 0 : }
762 0 : i, err := iter.iter.Clone(cloneOpts)
763 0 : if err != nil {
764 0 : panic(err)
765 : }
766 0 : filterMin, filterMax := o.filterMin, o.filterMax
767 0 : if cloneOpts.IterOptions == nil {
768 0 : // We're adopting the same block property filters as iter, so we need to
769 0 : // adopt the same run-time filters to ensure determinism.
770 0 : filterMin, filterMax = iter.filterMin, iter.filterMax
771 0 : }
772 0 : t.setIter(o.iterID, i, filterMin, filterMax)
773 0 : h.Recordf("%s // %v", o, i.Error())
774 : }
775 :
776 1 : func (o *newIterUsingCloneOp) String() string {
777 1 : return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
778 1 : o.iterID, o.existingIterID, o.refreshBatch, o.lower, o.upper,
779 1 : o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
780 1 : }
781 :
782 0 : func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
783 :
784 0 : func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
785 0 : objIDs := []objID{o.iterID}
786 0 : // If the underlying reader is a batch, we must synchronize with the batch.
787 0 : // If refreshBatch=true, synchronizing is necessary to observe all the
788 0 : // mutations up to until this op and no more. Even when refreshBatch=false,
789 0 : // we must synchronize because iterator construction may access state cached
790 0 : // on the indexed batch to avoid refragmenting range tombstones or range
791 0 : // keys.
792 0 : if o.derivedReaderID.tag() == batchTag {
793 0 : objIDs = append(objIDs, o.derivedReaderID)
794 0 : }
795 0 : return objIDs
796 : }
797 :
798 : // iterSetBoundsOp models an Iterator.SetBounds operation.
799 : type iterSetBoundsOp struct {
800 : iterID objID
801 : lower []byte
802 : upper []byte
803 : }
804 :
805 0 : func (o *iterSetBoundsOp) run(t *test, h historyRecorder) {
806 0 : i := t.getIter(o.iterID)
807 0 : var lower, upper []byte
808 0 : if o.lower != nil {
809 0 : lower = append(lower, o.lower...)
810 0 : }
811 0 : if o.upper != nil {
812 0 : upper = append(upper, o.upper...)
813 0 : }
814 0 : i.SetBounds(lower, upper)
815 0 :
816 0 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
817 0 : // the user-provided bounds.
818 0 : rand.Read(lower[:])
819 0 : rand.Read(upper[:])
820 0 :
821 0 : h.Recordf("%s // %v", o, i.Error())
822 : }
823 :
824 1 : func (o *iterSetBoundsOp) String() string {
825 1 : return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID, o.lower, o.upper)
826 1 : }
827 :
828 0 : func (o *iterSetBoundsOp) receiver() objID { return o.iterID }
829 0 : func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
830 :
831 : // iterSetOptionsOp models an Iterator.SetOptions operation.
832 : type iterSetOptionsOp struct {
833 : iterID objID
834 : iterOpts
835 :
836 : // derivedReaderID is the ID of the underlying reader that backs the
837 : // iterator. The derivedReaderID is NOT serialized by String and is derived
838 : // from other operations during parse.
839 : derivedReaderID objID
840 : }
841 :
842 0 : func (o *iterSetOptionsOp) run(t *test, h historyRecorder) {
843 0 : i := t.getIter(o.iterID)
844 0 :
845 0 : opts := iterOptions(o.iterOpts)
846 0 : if opts == nil {
847 0 : opts = &pebble.IterOptions{}
848 0 : }
849 0 : i.SetOptions(opts)
850 0 :
851 0 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
852 0 : // the user-provided bounds.
853 0 : rand.Read(opts.LowerBound[:])
854 0 : rand.Read(opts.UpperBound[:])
855 0 :
856 0 : // Adjust the iterator's filters.
857 0 : i.filterMin, i.filterMax = o.filterMin, o.filterMax
858 0 :
859 0 : h.Recordf("%s // %v", o, i.Error())
860 : }
861 :
862 1 : func (o *iterSetOptionsOp) String() string {
863 1 : return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
864 1 : o.iterID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
865 1 : }
866 :
867 0 : func iterOptions(o iterOpts) *pebble.IterOptions {
868 0 : if o.IsZero() {
869 0 : return nil
870 0 : }
871 0 : var lower, upper []byte
872 0 : if o.lower != nil {
873 0 : lower = append(lower, o.lower...)
874 0 : }
875 0 : if o.upper != nil {
876 0 : upper = append(upper, o.upper...)
877 0 : }
878 0 : opts := &pebble.IterOptions{
879 0 : LowerBound: lower,
880 0 : UpperBound: upper,
881 0 : KeyTypes: pebble.IterKeyType(o.keyTypes),
882 0 : RangeKeyMasking: pebble.RangeKeyMasking{
883 0 : Suffix: o.maskSuffix,
884 0 : },
885 0 : UseL6Filters: o.useL6Filters,
886 0 : }
887 0 : if opts.RangeKeyMasking.Suffix != nil {
888 0 : opts.RangeKeyMasking.Filter = func() pebble.BlockPropertyFilterMask {
889 0 : return sstable.NewTestKeysMaskingFilter()
890 0 : }
891 : }
892 0 : if o.filterMax > 0 {
893 0 : opts.PointKeyFilters = []pebble.BlockPropertyFilter{
894 0 : sstable.NewTestKeysBlockPropertyFilter(o.filterMin, o.filterMax),
895 0 : }
896 0 : }
897 0 : return opts
898 : }
899 :
900 0 : func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
901 :
902 0 : func (o *iterSetOptionsOp) syncObjs() objIDSlice {
903 0 : if o.derivedReaderID.tag() == batchTag {
904 0 : // If the underlying reader is a batch, we must synchronize with the
905 0 : // batch so that we observe all the mutations up until this operation
906 0 : // and no more.
907 0 : return []objID{o.derivedReaderID}
908 0 : }
909 0 : return nil
910 : }
911 :
912 : // iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
913 : type iterSeekGEOp struct {
914 : iterID objID
915 : key []byte
916 : limit []byte
917 :
918 : derivedReaderID objID
919 : }
920 :
921 0 : func iteratorPos(i *retryableIter) string {
922 0 : var buf bytes.Buffer
923 0 : fmt.Fprintf(&buf, "%q", i.Key())
924 0 : hasPoint, hasRange := i.HasPointAndRange()
925 0 : if hasPoint {
926 0 : fmt.Fprintf(&buf, ",%q", i.Value())
927 0 : } else {
928 0 : fmt.Fprint(&buf, ",<no point>")
929 0 : }
930 0 : if hasRange {
931 0 : start, end := i.RangeBounds()
932 0 : fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
933 0 : for i, rk := range i.RangeKeys() {
934 0 : if i > 0 {
935 0 : fmt.Fprint(&buf, ",")
936 0 : }
937 0 : fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
938 : }
939 0 : fmt.Fprint(&buf, "}")
940 0 : } else {
941 0 : fmt.Fprint(&buf, ",<no range>")
942 0 : }
943 0 : if i.RangeKeyChanged() {
944 0 : fmt.Fprint(&buf, "*")
945 0 : }
946 0 : return buf.String()
947 : }
948 :
949 0 : func validBoolToStr(valid bool) string {
950 0 : return fmt.Sprintf("%t", valid)
951 0 : }
952 :
953 0 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
954 0 : // We can't distinguish between IterExhausted and IterAtLimit in a
955 0 : // deterministic manner.
956 0 : switch validity {
957 0 : case pebble.IterExhausted, pebble.IterAtLimit:
958 0 : return false, "invalid"
959 0 : case pebble.IterValid:
960 0 : return true, "valid"
961 0 : default:
962 0 : panic("unknown validity")
963 : }
964 : }
965 :
966 0 : func (o *iterSeekGEOp) run(t *test, h historyRecorder) {
967 0 : i := t.getIter(o.iterID)
968 0 : var valid bool
969 0 : var validStr string
970 0 : if o.limit == nil {
971 0 : valid = i.SeekGE(o.key)
972 0 : validStr = validBoolToStr(valid)
973 0 : } else {
974 0 : valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
975 0 : }
976 0 : if valid {
977 0 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
978 0 : } else {
979 0 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
980 0 : }
981 : }
982 :
983 1 : func (o *iterSeekGEOp) String() string {
984 1 : return fmt.Sprintf("%s.SeekGE(%q, %q)", o.iterID, o.key, o.limit)
985 1 : }
986 0 : func (o *iterSeekGEOp) receiver() objID { return o.iterID }
987 0 : func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
988 :
989 0 : func onlyBatchIDs(ids ...objID) objIDSlice {
990 0 : var ret objIDSlice
991 0 : for _, id := range ids {
992 0 : if id.tag() == batchTag {
993 0 : ret = append(ret, id)
994 0 : }
995 : }
996 0 : return ret
997 : }
998 :
999 : // iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
1000 : type iterSeekPrefixGEOp struct {
1001 : iterID objID
1002 : key []byte
1003 :
1004 : derivedReaderID objID
1005 : }
1006 :
1007 0 : func (o *iterSeekPrefixGEOp) run(t *test, h historyRecorder) {
1008 0 : i := t.getIter(o.iterID)
1009 0 : valid := i.SeekPrefixGE(o.key)
1010 0 : if valid {
1011 0 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1012 0 : } else {
1013 0 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1014 0 : }
1015 : }
1016 :
1017 1 : func (o *iterSeekPrefixGEOp) String() string {
1018 1 : return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, o.key)
1019 1 : }
1020 0 : func (o *iterSeekPrefixGEOp) receiver() objID { return o.iterID }
1021 0 : func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1022 :
1023 : // iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
1024 : type iterSeekLTOp struct {
1025 : iterID objID
1026 : key []byte
1027 : limit []byte
1028 :
1029 : derivedReaderID objID
1030 : }
1031 :
1032 0 : func (o *iterSeekLTOp) run(t *test, h historyRecorder) {
1033 0 : i := t.getIter(o.iterID)
1034 0 : var valid bool
1035 0 : var validStr string
1036 0 : if o.limit == nil {
1037 0 : valid = i.SeekLT(o.key)
1038 0 : validStr = validBoolToStr(valid)
1039 0 : } else {
1040 0 : valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
1041 0 : }
1042 0 : if valid {
1043 0 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1044 0 : } else {
1045 0 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1046 0 : }
1047 : }
1048 :
1049 1 : func (o *iterSeekLTOp) String() string {
1050 1 : return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID, o.key, o.limit)
1051 1 : }
1052 :
1053 0 : func (o *iterSeekLTOp) receiver() objID { return o.iterID }
1054 0 : func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1055 :
1056 : // iterFirstOp models an Iterator.First operation.
1057 : type iterFirstOp struct {
1058 : iterID objID
1059 :
1060 : derivedReaderID objID
1061 : }
1062 :
1063 0 : func (o *iterFirstOp) run(t *test, h historyRecorder) {
1064 0 : i := t.getIter(o.iterID)
1065 0 : valid := i.First()
1066 0 : if valid {
1067 0 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1068 0 : } else {
1069 0 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1070 0 : }
1071 : }
1072 :
1073 1 : func (o *iterFirstOp) String() string { return fmt.Sprintf("%s.First()", o.iterID) }
1074 0 : func (o *iterFirstOp) receiver() objID { return o.iterID }
1075 0 : func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1076 :
1077 : // iterLastOp models an Iterator.Last operation.
1078 : type iterLastOp struct {
1079 : iterID objID
1080 :
1081 : derivedReaderID objID
1082 : }
1083 :
1084 0 : func (o *iterLastOp) run(t *test, h historyRecorder) {
1085 0 : i := t.getIter(o.iterID)
1086 0 : valid := i.Last()
1087 0 : if valid {
1088 0 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1089 0 : } else {
1090 0 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1091 0 : }
1092 : }
1093 :
1094 1 : func (o *iterLastOp) String() string { return fmt.Sprintf("%s.Last()", o.iterID) }
1095 0 : func (o *iterLastOp) receiver() objID { return o.iterID }
1096 0 : func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1097 :
1098 : // iterNextOp models an Iterator.Next[WithLimit] operation.
1099 : type iterNextOp struct {
1100 : iterID objID
1101 : limit []byte
1102 :
1103 : derivedReaderID objID
1104 : }
1105 :
1106 0 : func (o *iterNextOp) run(t *test, h historyRecorder) {
1107 0 : i := t.getIter(o.iterID)
1108 0 : var valid bool
1109 0 : var validStr string
1110 0 : if o.limit == nil {
1111 0 : valid = i.Next()
1112 0 : validStr = validBoolToStr(valid)
1113 0 : } else {
1114 0 : valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
1115 0 : }
1116 0 : if valid {
1117 0 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1118 0 : } else {
1119 0 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1120 0 : }
1121 : }
1122 :
1123 1 : func (o *iterNextOp) String() string { return fmt.Sprintf("%s.Next(%q)", o.iterID, o.limit) }
1124 0 : func (o *iterNextOp) receiver() objID { return o.iterID }
1125 0 : func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1126 :
1127 : // iterNextPrefixOp models an Iterator.NextPrefix operation.
1128 : type iterNextPrefixOp struct {
1129 : iterID objID
1130 :
1131 : derivedReaderID objID
1132 : }
1133 :
1134 0 : func (o *iterNextPrefixOp) run(t *test, h historyRecorder) {
1135 0 : i := t.getIter(o.iterID)
1136 0 : valid := i.NextPrefix()
1137 0 : validStr := validBoolToStr(valid)
1138 0 : if valid {
1139 0 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1140 0 : } else {
1141 0 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1142 0 : }
1143 : }
1144 :
1145 1 : func (o *iterNextPrefixOp) String() string { return fmt.Sprintf("%s.NextPrefix()", o.iterID) }
1146 0 : func (o *iterNextPrefixOp) receiver() objID { return o.iterID }
1147 0 : func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1148 :
1149 : // iterPrevOp models an Iterator.Prev[WithLimit] operation.
1150 : type iterPrevOp struct {
1151 : iterID objID
1152 : limit []byte
1153 :
1154 : derivedReaderID objID
1155 : }
1156 :
1157 0 : func (o *iterPrevOp) run(t *test, h historyRecorder) {
1158 0 : i := t.getIter(o.iterID)
1159 0 : var valid bool
1160 0 : var validStr string
1161 0 : if o.limit == nil {
1162 0 : valid = i.Prev()
1163 0 : validStr = validBoolToStr(valid)
1164 0 : } else {
1165 0 : valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
1166 0 : }
1167 0 : if valid {
1168 0 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1169 0 : } else {
1170 0 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1171 0 : }
1172 : }
1173 :
1174 1 : func (o *iterPrevOp) String() string { return fmt.Sprintf("%s.Prev(%q)", o.iterID, o.limit) }
1175 0 : func (o *iterPrevOp) receiver() objID { return o.iterID }
1176 0 : func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1177 :
1178 : // newSnapshotOp models a DB.NewSnapshot operation.
1179 : type newSnapshotOp struct {
1180 : snapID objID
1181 : // If nonempty, this snapshot must not be used to read any keys outside of
1182 : // the provided bounds. This allows some implementations to use 'Eventually
1183 : // file-only snapshots,' which require bounds.
1184 : bounds []pebble.KeyRange
1185 : }
1186 :
1187 0 : func (o *newSnapshotOp) run(t *test, h historyRecorder) {
1188 0 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
1189 0 : if len(o.bounds) > 0 && ((11400714819323198485*uint64(t.idx)*t.testOpts.seedEFOS)>>63) == 1 {
1190 0 : s := t.db.NewEventuallyFileOnlySnapshot(o.bounds)
1191 0 : t.setSnapshot(o.snapID, s)
1192 0 : } else {
1193 0 : s := t.db.NewSnapshot()
1194 0 : t.setSnapshot(o.snapID, s)
1195 0 : }
1196 0 : h.Recordf("%s", o)
1197 : }
1198 :
1199 1 : func (o *newSnapshotOp) String() string {
1200 1 : var buf bytes.Buffer
1201 1 : fmt.Fprintf(&buf, "%s = db.NewSnapshot(", o.snapID)
1202 1 : for i := range o.bounds {
1203 1 : if i > 0 {
1204 1 : fmt.Fprint(&buf, ", ")
1205 1 : }
1206 1 : fmt.Fprintf(&buf, "%q, %q", o.bounds[i].Start, o.bounds[i].End)
1207 : }
1208 1 : fmt.Fprint(&buf, ")")
1209 1 : return buf.String()
1210 : }
1211 0 : func (o *newSnapshotOp) receiver() objID { return dbObjID }
1212 0 : func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
1213 :
1214 : type dbRatchetFormatMajorVersionOp struct {
1215 : vers pebble.FormatMajorVersion
1216 : }
1217 :
1218 0 : func (o *dbRatchetFormatMajorVersionOp) run(t *test, h historyRecorder) {
1219 0 : var err error
1220 0 : // NB: We no-op the operation if we're already at or above the provided
1221 0 : // format major version. Different runs start at different format major
1222 0 : // versions, making the presence of an error and the error message itself
1223 0 : // non-deterministic if we attempt to upgrade to an older version.
1224 0 : //
1225 0 : //Regardless, subsequent operations should behave identically, which is what
1226 0 : //we're really aiming to test by including this format major version ratchet
1227 0 : //operation.
1228 0 : if t.db.FormatMajorVersion() < o.vers {
1229 0 : err = t.db.RatchetFormatMajorVersion(o.vers)
1230 0 : }
1231 0 : h.Recordf("%s // %v", o, err)
1232 : }
1233 :
1234 1 : func (o *dbRatchetFormatMajorVersionOp) String() string {
1235 1 : return fmt.Sprintf("db.RatchetFormatMajorVersion(%s)", o.vers)
1236 1 : }
1237 0 : func (o *dbRatchetFormatMajorVersionOp) receiver() objID { return dbObjID }
1238 0 : func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
1239 :
1240 : type dbRestartOp struct{}
1241 :
1242 0 : func (o *dbRestartOp) run(t *test, h historyRecorder) {
1243 0 : if err := t.restartDB(); err != nil {
1244 0 : h.Recordf("%s // %v", o, err)
1245 0 : h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
1246 0 : } else {
1247 0 : h.Recordf("%s", o)
1248 0 : }
1249 : }
1250 :
1251 1 : func (o *dbRestartOp) String() string { return "db.Restart()" }
1252 0 : func (o *dbRestartOp) receiver() objID { return dbObjID }
1253 0 : func (o *dbRestartOp) syncObjs() objIDSlice { return nil }
1254 :
1255 1 : func formatOps(ops []op) string {
1256 1 : var buf strings.Builder
1257 1 : for _, op := range ops {
1258 1 : fmt.Fprintf(&buf, "%s\n", op)
1259 1 : }
1260 1 : return buf.String()
1261 : }
|