Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "crypto/rand"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "path"
15 : "path/filepath"
16 : "strings"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble"
20 : "github.com/cockroachdb/pebble/internal/base"
21 : "github.com/cockroachdb/pebble/internal/keyspan"
22 : "github.com/cockroachdb/pebble/internal/private"
23 : "github.com/cockroachdb/pebble/internal/rangekey"
24 : "github.com/cockroachdb/pebble/internal/testkeys"
25 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
26 : "github.com/cockroachdb/pebble/sstable"
27 : "github.com/cockroachdb/pebble/vfs/errorfs"
28 : )
29 :
30 : // Ops holds a sequence of operations to be executed by the metamorphic tests.
31 : type Ops []op
32 :
33 : // op defines the interface for a single operation, such as creating a batch,
34 : // or advancing an iterator.
35 : type op interface {
36 : String() string
37 :
38 : run(t *Test, h historyRecorder)
39 :
40 : // receiver returns the object ID of the object the operation is performed
41 : // on. Every operation has a receiver (eg, batch0.Set(...) has `batch0` as
42 : // its receiver). Receivers are used for synchronization when running with
43 : // concurrency.
44 : receiver() objID
45 :
46 : // syncObjs returns an additional set of object IDs—excluding the
47 : // receiver—that the operation must synchronize with. At execution time,
48 : // the operation will run serially with respect to all other operations
49 : // that return these objects from their own syncObjs or receiver methods.
50 : syncObjs() objIDSlice
51 :
52 : // keys returns all user keys used by the operation, as pointers to slices.
53 : // The caller can then modify these slices to rewrite the keys.
54 : //
55 : // Used for simplification of operations for easier investigations.
56 : keys() []*[]byte
57 :
58 : // diagramKeyRanges() returns key spans associated with this operation, to be
59 : // shown on an ASCII diagram of operations.
60 : diagramKeyRanges() []pebble.KeyRange
61 : }
62 :
63 : // initOp performs test initialization
64 : type initOp struct {
65 : dbSlots uint32
66 : batchSlots uint32
67 : iterSlots uint32
68 : snapshotSlots uint32
69 : }
70 :
71 1 : func (o *initOp) run(t *Test, h historyRecorder) {
72 1 : t.batches = make([]*pebble.Batch, o.batchSlots)
73 1 : t.iters = make([]*retryableIter, o.iterSlots)
74 1 : t.snapshots = make([]readerCloser, o.snapshotSlots)
75 1 : h.Recordf("%s", o)
76 1 : }
77 :
78 1 : func (o *initOp) String() string {
79 1 : return fmt.Sprintf("Init(%d /* dbs */, %d /* batches */, %d /* iters */, %d /* snapshots */)",
80 1 : o.dbSlots, o.batchSlots, o.iterSlots, o.snapshotSlots)
81 1 : }
82 :
83 1 : func (o *initOp) receiver() objID { return makeObjID(dbTag, 1) }
84 1 : func (o *initOp) syncObjs() objIDSlice {
85 1 : syncObjs := make([]objID, 0)
86 1 : // Add any additional DBs to syncObjs.
87 1 : for i := uint32(2); i < o.dbSlots+1; i++ {
88 0 : syncObjs = append(syncObjs, makeObjID(dbTag, i))
89 0 : }
90 1 : return syncObjs
91 : }
92 :
93 0 : func (o *initOp) keys() []*[]byte { return nil }
94 1 : func (o *initOp) diagramKeyRanges() []pebble.KeyRange { return nil }
95 :
96 : // applyOp models a Writer.Apply operation.
97 : type applyOp struct {
98 : writerID objID
99 : batchID objID
100 : }
101 :
102 1 : func (o *applyOp) run(t *Test, h historyRecorder) {
103 1 : b := t.getBatch(o.batchID)
104 1 : w := t.getWriter(o.writerID)
105 1 : var err error
106 1 : if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
107 0 : err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
108 0 : if err == nil {
109 0 : err = b.SyncWait()
110 0 : }
111 1 : } else {
112 1 : err = w.Apply(b, t.writeOpts)
113 1 : }
114 1 : h.Recordf("%s // %v", o, err)
115 : // batch will be closed by a closeOp which is guaranteed to be generated
116 : }
117 :
118 1 : func (o *applyOp) String() string { return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID) }
119 1 : func (o *applyOp) receiver() objID { return o.writerID }
120 1 : func (o *applyOp) syncObjs() objIDSlice {
121 1 : // Apply should not be concurrent with operations that are mutating the
122 1 : // batch.
123 1 : return []objID{o.batchID}
124 1 : }
125 :
126 0 : func (o *applyOp) keys() []*[]byte { return nil }
127 0 : func (o *applyOp) diagramKeyRanges() []pebble.KeyRange { return nil }
128 :
129 : // checkpointOp models a DB.Checkpoint operation.
130 : type checkpointOp struct {
131 : dbID objID
132 : // If non-empty, the checkpoint is restricted to these spans.
133 : spans []pebble.CheckpointSpan
134 : }
135 :
136 1 : func (o *checkpointOp) run(t *Test, h historyRecorder) {
137 1 : // TODO(josh): db.Checkpoint does not work with shared storage yet.
138 1 : // It would be better to filter out ahead of calling run on the op,
139 1 : // by setting the weight that generator.go uses to zero, or similar.
140 1 : // But IIUC the ops are shared for ALL the metamorphic test runs, so
141 1 : // not sure how to do that easily:
142 1 : // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
143 1 : if t.testOpts.sharedStorageEnabled {
144 0 : h.Recordf("%s // %v", o, nil)
145 0 : return
146 0 : }
147 1 : var opts []pebble.CheckpointOption
148 1 : if len(o.spans) > 0 {
149 1 : opts = append(opts, pebble.WithRestrictToSpans(o.spans))
150 1 : }
151 1 : db := t.getDB(o.dbID)
152 1 : err := t.withRetries(func() error {
153 1 : return db.Checkpoint(o.dir(t.dir, h.op), opts...)
154 1 : })
155 1 : h.Recordf("%s // %v", o, err)
156 : }
157 :
158 1 : func (o *checkpointOp) dir(dataDir string, idx int) string {
159 1 : return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
160 1 : }
161 :
162 1 : func (o *checkpointOp) String() string {
163 1 : var spanStr bytes.Buffer
164 1 : for i, span := range o.spans {
165 1 : if i > 0 {
166 1 : spanStr.WriteString(",")
167 1 : }
168 1 : fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
169 : }
170 1 : return fmt.Sprintf("%s.Checkpoint(%s)", o.dbID, spanStr.String())
171 : }
172 :
173 1 : func (o *checkpointOp) receiver() objID { return o.dbID }
174 1 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
175 :
176 0 : func (o *checkpointOp) keys() []*[]byte {
177 0 : var res []*[]byte
178 0 : for i := range o.spans {
179 0 : res = append(res, &o.spans[i].Start, &o.spans[i].End)
180 0 : }
181 0 : return res
182 : }
183 :
184 0 : func (o *checkpointOp) diagramKeyRanges() []pebble.KeyRange {
185 0 : var res []pebble.KeyRange
186 0 : for i := range o.spans {
187 0 : res = append(res, pebble.KeyRange{
188 0 : Start: o.spans[i].Start,
189 0 : End: o.spans[i].End,
190 0 : })
191 0 : }
192 0 : return res
193 : }
194 :
195 : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
196 : type closeOp struct {
197 : objID objID
198 :
199 : // affectedObjects is the list of additional objects that are affected by this
200 : // operation, and which syncObjs() must return so that we don't perform the
201 : // close in parallel with other operations to affected objects.
202 : affectedObjects []objID
203 : }
204 :
205 1 : func (o *closeOp) run(t *Test, h historyRecorder) {
206 1 : c := t.getCloser(o.objID)
207 1 : if o.objID.tag() == dbTag && t.opts.DisableWAL {
208 1 : // Special case: If WAL is disabled, do a flush right before DB Close. This
209 1 : // allows us to reuse this run's data directory as initial state for
210 1 : // future runs without losing any mutations.
211 1 : _ = t.getDB(o.objID).Flush()
212 1 : }
213 1 : t.clearObj(o.objID)
214 1 : err := c.Close()
215 1 : h.Recordf("%s // %v", o, err)
216 : }
217 :
218 1 : func (o *closeOp) String() string { return fmt.Sprintf("%s.Close()", o.objID) }
219 1 : func (o *closeOp) receiver() objID { return o.objID }
220 1 : func (o *closeOp) syncObjs() objIDSlice {
221 1 : return o.affectedObjects
222 1 : }
223 :
224 0 : func (o *closeOp) keys() []*[]byte { return nil }
225 0 : func (o *closeOp) diagramKeyRanges() []pebble.KeyRange { return nil }
226 :
227 : // compactOp models a DB.Compact operation.
228 : type compactOp struct {
229 : dbID objID
230 : start []byte
231 : end []byte
232 : parallelize bool
233 : }
234 :
235 1 : func (o *compactOp) run(t *Test, h historyRecorder) {
236 1 : err := t.withRetries(func() error {
237 1 : return t.getDB(o.dbID).Compact(o.start, o.end, o.parallelize)
238 1 : })
239 1 : h.Recordf("%s // %v", o, err)
240 : }
241 :
242 1 : func (o *compactOp) String() string {
243 1 : return fmt.Sprintf("%s.Compact(%q, %q, %t /* parallelize */)", o.dbID, o.start, o.end, o.parallelize)
244 1 : }
245 :
246 1 : func (o *compactOp) receiver() objID { return o.dbID }
247 1 : func (o *compactOp) syncObjs() objIDSlice { return nil }
248 :
249 1 : func (o *compactOp) keys() []*[]byte {
250 1 : return []*[]byte{&o.start, &o.end}
251 1 : }
252 :
253 1 : func (o *compactOp) diagramKeyRanges() []pebble.KeyRange {
254 1 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
255 1 : }
256 :
257 : // deleteOp models a Write.Delete operation.
258 : type deleteOp struct {
259 : writerID objID
260 : key []byte
261 :
262 : derivedDBID objID
263 : }
264 :
265 1 : func (o *deleteOp) run(t *Test, h historyRecorder) {
266 1 : w := t.getWriter(o.writerID)
267 1 : var err error
268 1 : if t.testOpts.deleteSized && t.isFMV(o.derivedDBID, pebble.FormatDeleteSizedAndObsolete) {
269 0 : // Call DeleteSized with a deterministic size derived from the index.
270 0 : // The size does not need to be accurate for correctness.
271 0 : err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
272 1 : } else {
273 1 : err = w.Delete(o.key, t.writeOpts)
274 1 : }
275 1 : h.Recordf("%s // %v", o, err)
276 : }
277 :
278 0 : func hashSize(index int) uint32 {
279 0 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
280 0 : return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
281 0 : }
282 :
283 1 : func (o *deleteOp) String() string {
284 1 : return fmt.Sprintf("%s.Delete(%q)", o.writerID, o.key)
285 1 : }
286 1 : func (o *deleteOp) receiver() objID { return o.writerID }
287 1 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
288 :
289 0 : func (o *deleteOp) keys() []*[]byte {
290 0 : return []*[]byte{&o.key}
291 0 : }
292 :
293 0 : func (o *deleteOp) diagramKeyRanges() []pebble.KeyRange {
294 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
295 0 : }
296 :
297 : // singleDeleteOp models a Write.SingleDelete operation.
298 : type singleDeleteOp struct {
299 : writerID objID
300 : key []byte
301 : maybeReplaceDelete bool
302 : }
303 :
304 1 : func (o *singleDeleteOp) run(t *Test, h historyRecorder) {
305 1 : w := t.getWriter(o.writerID)
306 1 : var err error
307 1 : if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
308 1 : err = w.Delete(o.key, t.writeOpts)
309 1 : } else {
310 1 : err = w.SingleDelete(o.key, t.writeOpts)
311 1 : }
312 : // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
313 : // write the former to the history log. The log line will indicate whether
314 : // or not the delete *could* have been replaced. The OPTIONS file should
315 : // also be consulted to determine what happened at runtime (i.e. by taking
316 : // the logical AND).
317 1 : h.Recordf("%s // %v", o, err)
318 : }
319 :
320 1 : func (o *singleDeleteOp) String() string {
321 1 : return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)", o.writerID, o.key, o.maybeReplaceDelete)
322 1 : }
323 :
324 1 : func (o *singleDeleteOp) receiver() objID { return o.writerID }
325 1 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
326 :
327 0 : func (o *singleDeleteOp) keys() []*[]byte {
328 0 : return []*[]byte{&o.key}
329 0 : }
330 :
331 0 : func (o *singleDeleteOp) diagramKeyRanges() []pebble.KeyRange {
332 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
333 0 : }
334 :
335 : // deleteRangeOp models a Write.DeleteRange operation.
336 : type deleteRangeOp struct {
337 : writerID objID
338 : start []byte
339 : end []byte
340 : }
341 :
342 1 : func (o *deleteRangeOp) run(t *Test, h historyRecorder) {
343 1 : w := t.getWriter(o.writerID)
344 1 : err := w.DeleteRange(o.start, o.end, t.writeOpts)
345 1 : h.Recordf("%s // %v", o, err)
346 1 : }
347 :
348 1 : func (o *deleteRangeOp) String() string {
349 1 : return fmt.Sprintf("%s.DeleteRange(%q, %q)", o.writerID, o.start, o.end)
350 1 : }
351 :
352 1 : func (o *deleteRangeOp) receiver() objID { return o.writerID }
353 1 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
354 :
355 0 : func (o *deleteRangeOp) keys() []*[]byte {
356 0 : return []*[]byte{&o.start, &o.end}
357 0 : }
358 :
359 0 : func (o *deleteRangeOp) diagramKeyRanges() []pebble.KeyRange {
360 0 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
361 0 : }
362 :
363 : // flushOp models a DB.Flush operation.
364 : type flushOp struct {
365 : db objID
366 : }
367 :
368 1 : func (o *flushOp) run(t *Test, h historyRecorder) {
369 1 : db := t.getDB(o.db)
370 1 : err := db.Flush()
371 1 : h.Recordf("%s // %v", o, err)
372 1 : }
373 :
374 1 : func (o *flushOp) String() string { return fmt.Sprintf("%s.Flush()", o.db) }
375 1 : func (o *flushOp) receiver() objID { return o.db }
376 1 : func (o *flushOp) syncObjs() objIDSlice { return nil }
377 0 : func (o *flushOp) keys() []*[]byte { return nil }
378 0 : func (o *flushOp) diagramKeyRanges() []pebble.KeyRange { return nil }
379 :
380 : // mergeOp models a Write.Merge operation.
381 : type mergeOp struct {
382 : writerID objID
383 : key []byte
384 : value []byte
385 : }
386 :
387 1 : func (o *mergeOp) run(t *Test, h historyRecorder) {
388 1 : w := t.getWriter(o.writerID)
389 1 : err := w.Merge(o.key, o.value, t.writeOpts)
390 1 : h.Recordf("%s // %v", o, err)
391 1 : }
392 :
393 1 : func (o *mergeOp) String() string { return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, o.key, o.value) }
394 1 : func (o *mergeOp) receiver() objID { return o.writerID }
395 1 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
396 :
397 0 : func (o *mergeOp) keys() []*[]byte {
398 0 : return []*[]byte{&o.key}
399 0 : }
400 :
401 0 : func (o *mergeOp) diagramKeyRanges() []pebble.KeyRange {
402 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
403 0 : }
404 :
405 : // setOp models a Write.Set operation.
406 : type setOp struct {
407 : writerID objID
408 : key []byte
409 : value []byte
410 : }
411 :
412 1 : func (o *setOp) run(t *Test, h historyRecorder) {
413 1 : w := t.getWriter(o.writerID)
414 1 : err := w.Set(o.key, o.value, t.writeOpts)
415 1 : h.Recordf("%s // %v", o, err)
416 1 : }
417 :
418 1 : func (o *setOp) String() string { return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, o.key, o.value) }
419 1 : func (o *setOp) receiver() objID { return o.writerID }
420 1 : func (o *setOp) syncObjs() objIDSlice { return nil }
421 :
422 0 : func (o *setOp) keys() []*[]byte {
423 0 : return []*[]byte{&o.key}
424 0 : }
425 :
426 0 : func (o *setOp) diagramKeyRanges() []pebble.KeyRange {
427 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
428 0 : }
429 :
430 : // rangeKeyDeleteOp models a Write.RangeKeyDelete operation.
431 : type rangeKeyDeleteOp struct {
432 : writerID objID
433 : start []byte
434 : end []byte
435 : }
436 :
437 1 : func (o *rangeKeyDeleteOp) run(t *Test, h historyRecorder) {
438 1 : w := t.getWriter(o.writerID)
439 1 : err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
440 1 : h.Recordf("%s // %v", o, err)
441 1 : }
442 :
443 1 : func (o *rangeKeyDeleteOp) String() string {
444 1 : return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)", o.writerID, o.start, o.end)
445 1 : }
446 :
447 1 : func (o *rangeKeyDeleteOp) receiver() objID { return o.writerID }
448 1 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
449 :
450 0 : func (o *rangeKeyDeleteOp) keys() []*[]byte {
451 0 : return []*[]byte{&o.start, &o.end}
452 0 : }
453 :
454 1 : func (o *rangeKeyDeleteOp) diagramKeyRanges() []pebble.KeyRange {
455 1 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
456 1 : }
457 :
458 : // rangeKeySetOp models a Write.RangeKeySet operation.
459 : type rangeKeySetOp struct {
460 : writerID objID
461 : start []byte
462 : end []byte
463 : suffix []byte
464 : value []byte
465 : }
466 :
467 1 : func (o *rangeKeySetOp) run(t *Test, h historyRecorder) {
468 1 : w := t.getWriter(o.writerID)
469 1 : err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
470 1 : h.Recordf("%s // %v", o, err)
471 1 : }
472 :
473 1 : func (o *rangeKeySetOp) String() string {
474 1 : return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
475 1 : o.writerID, o.start, o.end, o.suffix, o.value)
476 1 : }
477 :
478 1 : func (o *rangeKeySetOp) receiver() objID { return o.writerID }
479 1 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
480 :
481 1 : func (o *rangeKeySetOp) keys() []*[]byte {
482 1 : return []*[]byte{&o.start, &o.end}
483 1 : }
484 :
485 1 : func (o *rangeKeySetOp) diagramKeyRanges() []pebble.KeyRange {
486 1 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
487 1 : }
488 :
489 : // rangeKeyUnsetOp models a Write.RangeKeyUnset operation.
490 : type rangeKeyUnsetOp struct {
491 : writerID objID
492 : start []byte
493 : end []byte
494 : suffix []byte
495 : }
496 :
497 1 : func (o *rangeKeyUnsetOp) run(t *Test, h historyRecorder) {
498 1 : w := t.getWriter(o.writerID)
499 1 : err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
500 1 : h.Recordf("%s // %v", o, err)
501 1 : }
502 :
503 1 : func (o *rangeKeyUnsetOp) String() string {
504 1 : return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
505 1 : o.writerID, o.start, o.end, o.suffix)
506 1 : }
507 :
508 1 : func (o *rangeKeyUnsetOp) receiver() objID { return o.writerID }
509 1 : func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
510 :
511 0 : func (o *rangeKeyUnsetOp) keys() []*[]byte {
512 0 : return []*[]byte{&o.start, &o.end}
513 0 : }
514 :
515 0 : func (o *rangeKeyUnsetOp) diagramKeyRanges() []pebble.KeyRange {
516 0 : return []pebble.KeyRange{{Start: o.start, End: o.end}}
517 0 : }
518 :
519 : // newBatchOp models a Write.NewBatch operation.
520 : type newBatchOp struct {
521 : dbID objID
522 : batchID objID
523 : }
524 :
525 1 : func (o *newBatchOp) run(t *Test, h historyRecorder) {
526 1 : b := t.getDB(o.dbID).NewBatch()
527 1 : t.setBatch(o.batchID, b)
528 1 : h.Recordf("%s", o)
529 1 : }
530 :
531 1 : func (o *newBatchOp) String() string { return fmt.Sprintf("%s = %s.NewBatch()", o.batchID, o.dbID) }
532 1 : func (o *newBatchOp) receiver() objID { return o.dbID }
533 1 : func (o *newBatchOp) syncObjs() objIDSlice {
534 1 : // NewBatch should not be concurrent with operations that interact with that
535 1 : // same batch.
536 1 : return []objID{o.batchID}
537 1 : }
538 :
539 0 : func (o *newBatchOp) keys() []*[]byte { return nil }
540 0 : func (o *newBatchOp) diagramKeyRanges() []pebble.KeyRange { return nil }
541 :
542 : // newIndexedBatchOp models a Write.NewIndexedBatch operation.
543 : type newIndexedBatchOp struct {
544 : dbID objID
545 : batchID objID
546 : }
547 :
548 1 : func (o *newIndexedBatchOp) run(t *Test, h historyRecorder) {
549 1 : b := t.getDB(o.dbID).NewIndexedBatch()
550 1 : t.setBatch(o.batchID, b)
551 1 : h.Recordf("%s", o)
552 1 : }
553 :
554 1 : func (o *newIndexedBatchOp) String() string {
555 1 : return fmt.Sprintf("%s = %s.NewIndexedBatch()", o.batchID, o.dbID)
556 1 : }
557 1 : func (o *newIndexedBatchOp) receiver() objID { return o.dbID }
558 1 : func (o *newIndexedBatchOp) syncObjs() objIDSlice {
559 1 : // NewIndexedBatch should not be concurrent with operations that interact
560 1 : // with that same batch.
561 1 : return []objID{o.batchID}
562 1 : }
563 :
564 0 : func (o *newIndexedBatchOp) keys() []*[]byte { return nil }
565 0 : func (o *newIndexedBatchOp) diagramKeyRanges() []pebble.KeyRange { return nil }
566 :
567 : // batchCommitOp models a Batch.Commit operation.
568 : type batchCommitOp struct {
569 : dbID objID
570 : batchID objID
571 : }
572 :
573 1 : func (o *batchCommitOp) run(t *Test, h historyRecorder) {
574 1 : b := t.getBatch(o.batchID)
575 1 : err := b.Commit(t.writeOpts)
576 1 : h.Recordf("%s // %v", o, err)
577 1 : }
578 :
579 1 : func (o *batchCommitOp) String() string { return fmt.Sprintf("%s.Commit()", o.batchID) }
580 1 : func (o *batchCommitOp) receiver() objID { return o.batchID }
581 1 : func (o *batchCommitOp) syncObjs() objIDSlice {
582 1 : // Synchronize on the database so that NewIters wait for the commit.
583 1 : return []objID{o.dbID}
584 1 : }
585 :
586 0 : func (o *batchCommitOp) keys() []*[]byte { return nil }
587 0 : func (o *batchCommitOp) diagramKeyRanges() []pebble.KeyRange { return nil }
588 :
589 : // ingestOp models a DB.Ingest operation.
590 : type ingestOp struct {
591 : dbID objID
592 : batchIDs []objID
593 :
594 : derivedDBIDs []objID
595 : }
596 :
597 1 : func (o *ingestOp) run(t *Test, h historyRecorder) {
598 1 : // We can only use apply as an alternative for ingestion if we are ingesting
599 1 : // a single batch. If we are ingesting multiple batches, the batches may
600 1 : // overlap which would cause ingestion to fail but apply would succeed.
601 1 : if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 && o.derivedDBIDs[0] == o.dbID {
602 0 : id := o.batchIDs[0]
603 0 : b := t.getBatch(id)
604 0 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
605 0 : db := t.getDB(o.dbID)
606 0 : c, err := o.collapseBatch(t, db, iter, rangeDelIter, rangeKeyIter, b)
607 0 : if err == nil {
608 0 : err = db.Apply(c, t.writeOpts)
609 0 : }
610 0 : _ = b.Close()
611 0 : _ = c.Close()
612 0 : t.clearObj(id)
613 0 : h.Recordf("%s // %v", o, err)
614 0 : return
615 : }
616 :
617 1 : var paths []string
618 1 : var err error
619 1 : for i, id := range o.batchIDs {
620 1 : b := t.getBatch(id)
621 1 : t.clearObj(id)
622 1 : path, err2 := o.build(t, h, b, i)
623 1 : if err2 != nil {
624 0 : h.Recordf("Build(%s) // %v", id, err2)
625 0 : }
626 1 : err = firstError(err, err2)
627 1 : if err2 == nil {
628 1 : paths = append(paths, path)
629 1 : }
630 1 : err = firstError(err, b.Close())
631 : }
632 :
633 1 : err = firstError(err, t.withRetries(func() error {
634 1 : return t.getDB(o.dbID).Ingest(paths)
635 1 : }))
636 :
637 1 : h.Recordf("%s // %v", o, err)
638 : }
639 :
640 : func buildForIngest(
641 : t *Test, dbID objID, h historyRecorder, b *pebble.Batch, i int,
642 1 : ) (string, *sstable.WriterMetadata, error) {
643 1 : path := t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
644 1 : f, err := t.opts.FS.Create(path)
645 1 : if err != nil {
646 0 : return "", nil, err
647 0 : }
648 1 : db := t.getDB(dbID)
649 1 :
650 1 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
651 1 : defer closeIters(iter, rangeDelIter, rangeKeyIter)
652 1 :
653 1 : equal := t.opts.Comparer.Equal
654 1 : tableFormat := db.FormatMajorVersion().MaxTableFormat()
655 1 : wOpts := t.opts.MakeWriterOptions(0, tableFormat)
656 1 : if t.testOpts.disableValueBlocksForIngestSSTables {
657 1 : wOpts.DisableValueBlocks = true
658 1 : }
659 1 : w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), wOpts)
660 1 :
661 1 : var lastUserKey []byte
662 1 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
663 1 : // Ignore duplicate keys.
664 1 : if equal(lastUserKey, key.UserKey) {
665 1 : continue
666 : }
667 : // NB: We don't have to copy the key or value since we're reading from a
668 : // batch which doesn't do prefix compression.
669 1 : lastUserKey = key.UserKey
670 1 :
671 1 : key.SetSeqNum(base.SeqNumZero)
672 1 : // It's possible that we wrote the key on a batch from a db that supported
673 1 : // DeleteSized, but are now ingesting into a db that does not. Detect
674 1 : // this case and translate the key to an InternalKeyKindDelete.
675 1 : if key.Kind() == pebble.InternalKeyKindDeleteSized && !t.isFMV(dbID, pebble.FormatDeleteSizedAndObsolete) {
676 0 : value = pebble.LazyValue{}
677 0 : key.SetKind(pebble.InternalKeyKindDelete)
678 0 : }
679 1 : if err := w.Add(*key, value.InPlaceValue()); err != nil {
680 0 : return "", nil, err
681 0 : }
682 : }
683 1 : if err := iter.Close(); err != nil {
684 0 : return "", nil, err
685 0 : }
686 1 : iter = nil
687 1 :
688 1 : if rangeDelIter != nil {
689 1 : // NB: The range tombstones have already been fragmented by the Batch.
690 1 : t, err := rangeDelIter.First()
691 1 : for ; t != nil; t, err = rangeDelIter.Next() {
692 1 : // NB: We don't have to copy the key or value since we're reading from a
693 1 : // batch which doesn't do prefix compression.
694 1 : if err := w.DeleteRange(t.Start, t.End); err != nil {
695 0 : return "", nil, err
696 0 : }
697 : }
698 1 : if err != nil {
699 0 : return "", nil, err
700 0 : }
701 1 : if err := rangeDelIter.Close(); err != nil {
702 0 : return "", nil, err
703 0 : }
704 1 : rangeDelIter = nil
705 : }
706 :
707 1 : if rangeKeyIter != nil {
708 1 : span, err := rangeKeyIter.First()
709 1 : for ; span != nil; span, err = rangeKeyIter.Next() {
710 1 : // Coalesce the keys of this span and then zero the sequence
711 1 : // numbers. This is necessary in order to make the range keys within
712 1 : // the ingested sstable internally consistent at the sequence number
713 1 : // it's ingested at. The individual keys within a batch are
714 1 : // committed at unique sequence numbers, whereas all the keys of an
715 1 : // ingested sstable are given the same sequence number. A span
716 1 : // contaning keys that both set and unset the same suffix at the
717 1 : // same sequence number is nonsensical, so we "coalesce" or collapse
718 1 : // the keys.
719 1 : collapsed := keyspan.Span{
720 1 : Start: span.Start,
721 1 : End: span.End,
722 1 : Keys: make([]keyspan.Key, 0, len(span.Keys)),
723 1 : }
724 1 : err = rangekey.Coalesce(t.opts.Comparer.Compare, equal, span.Keys, &collapsed.Keys)
725 1 : if err != nil {
726 0 : return "", nil, err
727 0 : }
728 1 : for i := range collapsed.Keys {
729 1 : collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
730 1 : }
731 1 : keyspan.SortKeysByTrailer(&collapsed.Keys)
732 1 : if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil {
733 0 : return "", nil, err
734 0 : }
735 : }
736 1 : if err != nil {
737 0 : return "", nil, err
738 0 : }
739 1 : if err := rangeKeyIter.Close(); err != nil {
740 0 : return "", nil, err
741 0 : }
742 1 : rangeKeyIter = nil
743 : }
744 :
745 1 : if err := w.Close(); err != nil {
746 0 : return "", nil, err
747 0 : }
748 1 : meta, err := w.Metadata()
749 1 : return path, meta, err
750 : }
751 :
752 1 : func (o *ingestOp) build(t *Test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
753 1 : path, _, err := buildForIngest(t, o.dbID, h, b, i)
754 1 : return path, err
755 1 : }
756 :
757 1 : func (o *ingestOp) receiver() objID { return o.dbID }
758 1 : func (o *ingestOp) syncObjs() objIDSlice {
759 1 : // Ingest should not be concurrent with mutating the batches that will be
760 1 : // ingested as sstables.
761 1 : objs := make([]objID, 0, len(o.batchIDs)+1)
762 1 : objs = append(objs, o.batchIDs...)
763 1 : addedDBs := make(map[objID]struct{})
764 1 : for i := range o.derivedDBIDs {
765 1 : _, ok := addedDBs[o.derivedDBIDs[i]]
766 1 : if !ok && o.derivedDBIDs[i] != o.dbID {
767 0 : objs = append(objs, o.derivedDBIDs[i])
768 0 : addedDBs[o.derivedDBIDs[i]] = struct{}{}
769 0 : }
770 : }
771 1 : return objs
772 : }
773 :
774 : func closeIters(
775 : pointIter base.InternalIterator,
776 : rangeDelIter keyspan.FragmentIterator,
777 : rangeKeyIter keyspan.FragmentIterator,
778 1 : ) {
779 1 : if pointIter != nil {
780 1 : pointIter.Close()
781 1 : }
782 1 : if rangeDelIter != nil {
783 1 : rangeDelIter.Close()
784 1 : }
785 1 : if rangeKeyIter != nil {
786 1 : rangeKeyIter.Close()
787 1 : }
788 : }
789 :
790 : // collapseBatch collapses the mutations in a batch to be equivalent to an
791 : // sstable ingesting those mutations. Duplicate updates to a key are collapsed
792 : // so that only the latest update is performed. All range deletions are
793 : // performed first in the batch to match the semantics of ingestion where a
794 : // range deletion does not delete a point record contained in the sstable.
795 : func (o *ingestOp) collapseBatch(
796 : t *Test,
797 : db *pebble.DB,
798 : pointIter base.InternalIterator,
799 : rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
800 : b *pebble.Batch,
801 0 : ) (*pebble.Batch, error) {
802 0 : defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
803 0 : equal := t.opts.Comparer.Equal
804 0 : collapsed := db.NewBatch()
805 0 :
806 0 : if rangeDelIter != nil {
807 0 : // NB: The range tombstones have already been fragmented by the Batch.
808 0 : t, err := rangeDelIter.First()
809 0 : for ; t != nil; t, err = rangeDelIter.Next() {
810 0 : // NB: We don't have to copy the key or value since we're reading from a
811 0 : // batch which doesn't do prefix compression.
812 0 : if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
813 0 : return nil, err
814 0 : }
815 : }
816 0 : if err != nil {
817 0 : return nil, err
818 0 : }
819 0 : if err := rangeDelIter.Close(); err != nil {
820 0 : return nil, err
821 0 : }
822 0 : rangeDelIter = nil
823 : }
824 :
825 0 : if pointIter != nil {
826 0 : var lastUserKey []byte
827 0 : for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
828 0 : // Ignore duplicate keys.
829 0 : //
830 0 : // Note: this is necessary due to MERGE keys, otherwise it would be
831 0 : // fine to include all the keys in the batch and let the normal
832 0 : // sequence number precedence determine which of the keys "wins".
833 0 : // But the code to build the ingested sstable will only keep the
834 0 : // most recent internal key and will not merge across internal keys.
835 0 : if equal(lastUserKey, key.UserKey) {
836 0 : continue
837 : }
838 : // NB: We don't have to copy the key or value since we're reading from a
839 : // batch which doesn't do prefix compression.
840 0 : lastUserKey = key.UserKey
841 0 :
842 0 : var err error
843 0 : switch key.Kind() {
844 0 : case pebble.InternalKeyKindDelete:
845 0 : err = collapsed.Delete(key.UserKey, nil)
846 0 : case pebble.InternalKeyKindDeleteSized:
847 0 : v, _ := binary.Uvarint(value.InPlaceValue())
848 0 : // Batch.DeleteSized takes just the length of the value being
849 0 : // deleted and adds the key's length to derive the overall entry
850 0 : // size of the value being deleted. This has already been done
851 0 : // to the key we're reading from the batch, so we must subtract
852 0 : // the key length from the encoded value before calling
853 0 : // collapsed.DeleteSized, which will again add the key length
854 0 : // before encoding.
855 0 : err = collapsed.DeleteSized(key.UserKey, uint32(v-uint64(len(key.UserKey))), nil)
856 0 : case pebble.InternalKeyKindSingleDelete:
857 0 : err = collapsed.SingleDelete(key.UserKey, nil)
858 0 : case pebble.InternalKeyKindSet:
859 0 : err = collapsed.Set(key.UserKey, value.InPlaceValue(), nil)
860 0 : case pebble.InternalKeyKindMerge:
861 0 : err = collapsed.Merge(key.UserKey, value.InPlaceValue(), nil)
862 0 : case pebble.InternalKeyKindLogData:
863 0 : err = collapsed.LogData(key.UserKey, nil)
864 0 : default:
865 0 : err = errors.Errorf("unknown batch record kind: %d", key.Kind())
866 : }
867 0 : if err != nil {
868 0 : return nil, err
869 0 : }
870 : }
871 0 : if err := pointIter.Close(); err != nil {
872 0 : return nil, err
873 0 : }
874 0 : pointIter = nil
875 : }
876 :
877 : // There's no equivalent of a MERGE operator for range keys, so there's no
878 : // need to collapse the range keys here. Rather than reading the range keys
879 : // from `rangeKeyIter`, which will already be fragmented, read the range
880 : // keys from the batch and copy them verbatim. This marginally improves our
881 : // test coverage over the alternative approach of pre-fragmenting and
882 : // pre-coalescing before writing to the batch.
883 : //
884 : // The `rangeKeyIter` is used only to determine if there are any range keys
885 : // in the batch at all, and only because we already have it handy from
886 : // private.BatchSort.
887 0 : if rangeKeyIter != nil {
888 0 : for r := b.Reader(); ; {
889 0 : kind, key, value, ok, err := r.Next()
890 0 : if !ok {
891 0 : if err != nil {
892 0 : return nil, err
893 0 : }
894 0 : break
895 0 : } else if !rangekey.IsRangeKey(kind) {
896 0 : continue
897 : }
898 0 : ik := base.MakeInternalKey(key, 0, kind)
899 0 : if err := collapsed.AddInternalKey(&ik, value, nil); err != nil {
900 0 : return nil, err
901 0 : }
902 : }
903 0 : if err := rangeKeyIter.Close(); err != nil {
904 0 : return nil, err
905 0 : }
906 0 : rangeKeyIter = nil
907 : }
908 :
909 0 : return collapsed, nil
910 : }
911 :
912 1 : func (o *ingestOp) String() string {
913 1 : var buf strings.Builder
914 1 : buf.WriteString(o.dbID.String())
915 1 : buf.WriteString(".Ingest(")
916 1 : for i, id := range o.batchIDs {
917 1 : if i > 0 {
918 1 : buf.WriteString(", ")
919 1 : }
920 1 : buf.WriteString(id.String())
921 : }
922 1 : buf.WriteString(")")
923 1 : return buf.String()
924 : }
925 :
926 0 : func (o *ingestOp) keys() []*[]byte { return nil }
927 0 : func (o *ingestOp) diagramKeyRanges() []pebble.KeyRange { return nil }
928 :
929 : type ingestAndExciseOp struct {
930 : dbID objID
931 : batchID objID
932 : derivedDBID objID
933 : exciseStart, exciseEnd []byte
934 : }
935 :
936 0 : func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) {
937 0 : var err error
938 0 : b := t.getBatch(o.batchID)
939 0 : t.clearObj(o.batchID)
940 0 : if t.testOpts.Opts.Comparer.Compare(o.exciseEnd, o.exciseStart) <= 0 {
941 0 : panic("non-well-formed excise span")
942 : }
943 0 : if b.Empty() {
944 0 : // No-op.
945 0 : h.Recordf("%s // %v", o, err)
946 0 : return
947 0 : }
948 0 : path, writerMeta, err2 := o.build(t, h, b, 0 /* i */)
949 0 : if err2 != nil {
950 0 : h.Recordf("Build(%s) // %v", o.batchID, err2)
951 0 : return
952 0 : }
953 0 : err = firstError(err, err2)
954 0 : err = firstError(err, b.Close())
955 0 :
956 0 : if writerMeta.Properties.NumEntries == 0 && writerMeta.Properties.NumRangeKeys() == 0 {
957 0 : // No-op.
958 0 : h.Recordf("%s // %v", o, err)
959 0 : return
960 0 : }
961 0 : db := t.getDB(o.dbID)
962 0 : if !t.testOpts.useExcise {
963 0 : // Do a rangedel and rangekeydel before the ingestion. This mimics the
964 0 : // behaviour of an excise.
965 0 : err = firstError(err, db.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts))
966 0 : err = firstError(err, db.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts))
967 0 : }
968 :
969 0 : if t.testOpts.useExcise {
970 0 : err = firstError(err, t.withRetries(func() error {
971 0 : _, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* sharedSSTs */, pebble.KeyRange{
972 0 : Start: o.exciseStart,
973 0 : End: o.exciseEnd,
974 0 : })
975 0 : return err
976 0 : }))
977 0 : } else {
978 0 : err = firstError(err, t.withRetries(func() error {
979 0 : return t.getDB(o.dbID).Ingest([]string{path})
980 0 : }))
981 : }
982 :
983 0 : h.Recordf("%s // %v", o, err)
984 : }
985 :
986 : func (o *ingestAndExciseOp) build(
987 : t *Test, h historyRecorder, b *pebble.Batch, i int,
988 0 : ) (string, *sstable.WriterMetadata, error) {
989 0 : return buildForIngest(t, o.dbID, h, b, i)
990 0 : }
991 :
992 0 : func (o *ingestAndExciseOp) receiver() objID { return o.dbID }
993 0 : func (o *ingestAndExciseOp) syncObjs() objIDSlice {
994 0 : // Ingest should not be concurrent with mutating the batches that will be
995 0 : // ingested as sstables.
996 0 : objs := []objID{o.batchID}
997 0 : if o.derivedDBID != o.dbID {
998 0 : objs = append(objs, o.derivedDBID)
999 0 : }
1000 0 : return objs
1001 : }
1002 :
1003 1 : func (o *ingestAndExciseOp) String() string {
1004 1 : return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd)
1005 1 : }
1006 :
1007 0 : func (o *ingestAndExciseOp) keys() []*[]byte {
1008 0 : return []*[]byte{&o.exciseStart, &o.exciseEnd}
1009 0 : }
1010 :
1011 0 : func (o *ingestAndExciseOp) diagramKeyRanges() []pebble.KeyRange {
1012 0 : return []pebble.KeyRange{{Start: o.exciseStart, End: o.exciseEnd}}
1013 0 : }
1014 :
1015 : // getOp models a Reader.Get operation.
1016 : type getOp struct {
1017 : readerID objID
1018 : key []byte
1019 : derivedDBID objID
1020 : }
1021 :
1022 1 : func (o *getOp) run(t *Test, h historyRecorder) {
1023 1 : r := t.getReader(o.readerID)
1024 1 : var val []byte
1025 1 : var closer io.Closer
1026 1 : err := t.withRetries(func() (err error) {
1027 1 : val, closer, err = r.Get(o.key)
1028 1 : return err
1029 1 : })
1030 1 : h.Recordf("%s // [%q] %v", o, val, err)
1031 1 : if closer != nil {
1032 1 : closer.Close()
1033 1 : }
1034 : }
1035 :
1036 1 : func (o *getOp) String() string { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
1037 1 : func (o *getOp) receiver() objID { return o.readerID }
1038 1 : func (o *getOp) syncObjs() objIDSlice {
1039 1 : if o.readerID.tag() == dbTag {
1040 1 : return nil
1041 1 : }
1042 : // batch.Get reads through to the current database state.
1043 1 : if o.derivedDBID != 0 {
1044 1 : return []objID{o.derivedDBID}
1045 1 : }
1046 0 : return nil
1047 : }
1048 :
1049 0 : func (o *getOp) keys() []*[]byte {
1050 0 : return []*[]byte{&o.key}
1051 0 : }
1052 :
1053 0 : func (o *getOp) diagramKeyRanges() []pebble.KeyRange {
1054 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
1055 0 : }
1056 :
1057 : // newIterOp models a Reader.NewIter operation.
1058 : type newIterOp struct {
1059 : readerID objID
1060 : iterID objID
1061 : iterOpts
1062 : derivedDBID objID
1063 : }
1064 :
1065 : // Enable this to enable debug logging of range key iterator operations.
1066 : const debugIterators = false
1067 :
1068 1 : func (o *newIterOp) run(t *Test, h historyRecorder) {
1069 1 : r := t.getReader(o.readerID)
1070 1 : opts := iterOptions(o.iterOpts)
1071 1 : if debugIterators {
1072 0 : opts.DebugRangeKeyStack = true
1073 0 : }
1074 :
1075 1 : var i *pebble.Iterator
1076 1 : for {
1077 1 : i, _ = r.NewIter(opts)
1078 1 : if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
1079 1 : break
1080 : }
1081 : // close this iter and retry NewIter
1082 0 : _ = i.Close()
1083 : }
1084 1 : t.setIter(o.iterID, i)
1085 1 :
1086 1 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
1087 1 : // the user-provided bounds.
1088 1 : if opts != nil {
1089 1 : rand.Read(opts.LowerBound[:])
1090 1 : rand.Read(opts.UpperBound[:])
1091 1 : }
1092 1 : h.Recordf("%s // %v", o, i.Error())
1093 : }
1094 :
1095 1 : func (o *newIterOp) String() string {
1096 1 : return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
1097 1 : o.iterID, o.readerID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
1098 1 : }
1099 :
1100 1 : func (o *newIterOp) receiver() objID { return o.readerID }
1101 1 : func (o *newIterOp) syncObjs() objIDSlice {
1102 1 : // Prevent o.iterID ops from running before it exists.
1103 1 : objs := []objID{o.iterID}
1104 1 : // If reading through a batch or snapshot, the new iterator will also observe database
1105 1 : // state, and we must synchronize on the database state for a consistent
1106 1 : // view.
1107 1 : if o.readerID.tag() == batchTag || o.readerID.tag() == snapTag {
1108 1 : objs = append(objs, o.derivedDBID)
1109 1 : }
1110 1 : return objs
1111 : }
1112 :
1113 0 : func (o *newIterOp) keys() []*[]byte { return nil }
1114 1 : func (o *newIterOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1115 :
1116 : // newIterUsingCloneOp models a Iterator.Clone operation.
1117 : type newIterUsingCloneOp struct {
1118 : existingIterID objID
1119 : iterID objID
1120 : refreshBatch bool
1121 : iterOpts
1122 :
1123 : // derivedReaderID is the ID of the underlying reader that backs both the
1124 : // existing iterator and the new iterator. The derivedReaderID is NOT
1125 : // serialized by String and is derived from other operations during parse.
1126 : derivedReaderID objID
1127 : }
1128 :
1129 1 : func (o *newIterUsingCloneOp) run(t *Test, h historyRecorder) {
1130 1 : iter := t.getIter(o.existingIterID)
1131 1 : cloneOpts := pebble.CloneOptions{
1132 1 : IterOptions: iterOptions(o.iterOpts),
1133 1 : RefreshBatchView: o.refreshBatch,
1134 1 : }
1135 1 : i, err := iter.iter.Clone(cloneOpts)
1136 1 : if err != nil {
1137 0 : panic(err)
1138 : }
1139 1 : t.setIter(o.iterID, i)
1140 1 : h.Recordf("%s // %v", o, i.Error())
1141 : }
1142 :
1143 1 : func (o *newIterUsingCloneOp) String() string {
1144 1 : return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
1145 1 : o.iterID, o.existingIterID, o.refreshBatch, o.lower, o.upper,
1146 1 : o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
1147 1 : }
1148 :
1149 1 : func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
1150 :
1151 1 : func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
1152 1 : objIDs := []objID{o.iterID}
1153 1 : // If the underlying reader is a batch, we must synchronize with the batch.
1154 1 : // If refreshBatch=true, synchronizing is necessary to observe all the
1155 1 : // mutations up to until this op and no more. Even when refreshBatch=false,
1156 1 : // we must synchronize because iterator construction may access state cached
1157 1 : // on the indexed batch to avoid refragmenting range tombstones or range
1158 1 : // keys.
1159 1 : if o.derivedReaderID.tag() == batchTag {
1160 0 : objIDs = append(objIDs, o.derivedReaderID)
1161 0 : }
1162 1 : return objIDs
1163 : }
1164 :
1165 0 : func (o *newIterUsingCloneOp) keys() []*[]byte { return nil }
1166 0 : func (o *newIterUsingCloneOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1167 :
1168 : // iterSetBoundsOp models an Iterator.SetBounds operation.
1169 : type iterSetBoundsOp struct {
1170 : iterID objID
1171 : lower []byte
1172 : upper []byte
1173 : }
1174 :
1175 1 : func (o *iterSetBoundsOp) run(t *Test, h historyRecorder) {
1176 1 : i := t.getIter(o.iterID)
1177 1 : var lower, upper []byte
1178 1 : if o.lower != nil {
1179 1 : lower = append(lower, o.lower...)
1180 1 : }
1181 1 : if o.upper != nil {
1182 1 : upper = append(upper, o.upper...)
1183 1 : }
1184 1 : i.SetBounds(lower, upper)
1185 1 :
1186 1 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
1187 1 : // the user-provided bounds.
1188 1 : rand.Read(lower[:])
1189 1 : rand.Read(upper[:])
1190 1 :
1191 1 : h.Recordf("%s // %v", o, i.Error())
1192 : }
1193 :
1194 1 : func (o *iterSetBoundsOp) String() string {
1195 1 : return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID, o.lower, o.upper)
1196 1 : }
1197 :
1198 1 : func (o *iterSetBoundsOp) receiver() objID { return o.iterID }
1199 1 : func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
1200 :
1201 0 : func (o *iterSetBoundsOp) keys() []*[]byte {
1202 0 : return []*[]byte{&o.lower, &o.upper}
1203 0 : }
1204 :
1205 0 : func (o *iterSetBoundsOp) diagramKeyRanges() []pebble.KeyRange {
1206 0 : return []pebble.KeyRange{{Start: o.lower, End: o.upper}}
1207 0 : }
1208 :
1209 : // iterSetOptionsOp models an Iterator.SetOptions operation.
1210 : type iterSetOptionsOp struct {
1211 : iterID objID
1212 : iterOpts
1213 :
1214 : // derivedReaderID is the ID of the underlying reader that backs the
1215 : // iterator. The derivedReaderID is NOT serialized by String and is derived
1216 : // from other operations during parse.
1217 : derivedReaderID objID
1218 : }
1219 :
1220 1 : func (o *iterSetOptionsOp) run(t *Test, h historyRecorder) {
1221 1 : i := t.getIter(o.iterID)
1222 1 :
1223 1 : opts := iterOptions(o.iterOpts)
1224 1 : if opts == nil {
1225 1 : opts = &pebble.IterOptions{}
1226 1 : }
1227 1 : i.SetOptions(opts)
1228 1 :
1229 1 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
1230 1 : // the user-provided bounds.
1231 1 : rand.Read(opts.LowerBound[:])
1232 1 : rand.Read(opts.UpperBound[:])
1233 1 :
1234 1 : h.Recordf("%s // %v", o, i.Error())
1235 : }
1236 :
1237 1 : func (o *iterSetOptionsOp) String() string {
1238 1 : return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
1239 1 : o.iterID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
1240 1 : }
1241 :
1242 1 : func iterOptions(o iterOpts) *pebble.IterOptions {
1243 1 : if o.IsZero() {
1244 1 : return nil
1245 1 : }
1246 1 : var lower, upper []byte
1247 1 : if o.lower != nil {
1248 1 : lower = append(lower, o.lower...)
1249 1 : }
1250 1 : if o.upper != nil {
1251 1 : upper = append(upper, o.upper...)
1252 1 : }
1253 1 : opts := &pebble.IterOptions{
1254 1 : LowerBound: lower,
1255 1 : UpperBound: upper,
1256 1 : KeyTypes: pebble.IterKeyType(o.keyTypes),
1257 1 : RangeKeyMasking: pebble.RangeKeyMasking{
1258 1 : Suffix: o.maskSuffix,
1259 1 : },
1260 1 : UseL6Filters: o.useL6Filters,
1261 1 : }
1262 1 : if opts.RangeKeyMasking.Suffix != nil {
1263 1 : opts.RangeKeyMasking.Filter = func() pebble.BlockPropertyFilterMask {
1264 1 : return sstable.NewTestKeysMaskingFilter()
1265 1 : }
1266 : }
1267 1 : if o.filterMax > 0 {
1268 1 : opts.PointKeyFilters = []pebble.BlockPropertyFilter{
1269 1 : sstable.NewTestKeysBlockPropertyFilter(o.filterMin, o.filterMax),
1270 1 : }
1271 1 : // Enforce the timestamp bounds in SkipPoint, so that the iterator never
1272 1 : // returns a key outside the filterMin, filterMax bounds. This provides
1273 1 : // deterministic iteration.
1274 1 : opts.SkipPoint = func(k []byte) (skip bool) {
1275 1 : n := testkeys.Comparer.Split(k)
1276 1 : if n == len(k) {
1277 1 : // No suffix, don't skip it.
1278 1 : return false
1279 1 : }
1280 1 : v, err := testkeys.ParseSuffix(k[n:])
1281 1 : if err != nil {
1282 0 : panic(err)
1283 : }
1284 1 : ts := uint64(v)
1285 1 : return ts < o.filterMin || ts >= o.filterMax
1286 : }
1287 : }
1288 1 : return opts
1289 : }
1290 :
1291 1 : func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
1292 :
1293 1 : func (o *iterSetOptionsOp) syncObjs() objIDSlice {
1294 1 : if o.derivedReaderID.tag() == batchTag {
1295 0 : // If the underlying reader is a batch, we must synchronize with the
1296 0 : // batch so that we observe all the mutations up until this operation
1297 0 : // and no more.
1298 0 : return []objID{o.derivedReaderID}
1299 0 : }
1300 1 : return nil
1301 : }
1302 :
1303 0 : func (o *iterSetOptionsOp) keys() []*[]byte { return nil }
1304 0 : func (o *iterSetOptionsOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1305 :
1306 : // iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
1307 : type iterSeekGEOp struct {
1308 : iterID objID
1309 : key []byte
1310 : limit []byte
1311 :
1312 : derivedReaderID objID
1313 : }
1314 :
1315 1 : func iteratorPos(i *retryableIter) string {
1316 1 : var buf bytes.Buffer
1317 1 : fmt.Fprintf(&buf, "%q", i.Key())
1318 1 : hasPoint, hasRange := i.HasPointAndRange()
1319 1 : if hasPoint {
1320 1 : fmt.Fprintf(&buf, ",%q", i.Value())
1321 1 : } else {
1322 1 : fmt.Fprint(&buf, ",<no point>")
1323 1 : }
1324 1 : if hasRange {
1325 1 : start, end := i.RangeBounds()
1326 1 : fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
1327 1 : for i, rk := range i.RangeKeys() {
1328 1 : if i > 0 {
1329 1 : fmt.Fprint(&buf, ",")
1330 1 : }
1331 1 : fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
1332 : }
1333 1 : fmt.Fprint(&buf, "}")
1334 1 : } else {
1335 1 : fmt.Fprint(&buf, ",<no range>")
1336 1 : }
1337 1 : if i.RangeKeyChanged() {
1338 1 : fmt.Fprint(&buf, "*")
1339 1 : }
1340 1 : return buf.String()
1341 : }
1342 :
1343 1 : func validBoolToStr(valid bool) string {
1344 1 : return fmt.Sprintf("%t", valid)
1345 1 : }
1346 :
1347 1 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
1348 1 : // We can't distinguish between IterExhausted and IterAtLimit in a
1349 1 : // deterministic manner.
1350 1 : switch validity {
1351 1 : case pebble.IterExhausted, pebble.IterAtLimit:
1352 1 : return false, "invalid"
1353 1 : case pebble.IterValid:
1354 1 : return true, "valid"
1355 0 : default:
1356 0 : panic("unknown validity")
1357 : }
1358 : }
1359 :
1360 1 : func (o *iterSeekGEOp) run(t *Test, h historyRecorder) {
1361 1 : i := t.getIter(o.iterID)
1362 1 : var valid bool
1363 1 : var validStr string
1364 1 : if o.limit == nil {
1365 1 : valid = i.SeekGE(o.key)
1366 1 : validStr = validBoolToStr(valid)
1367 1 : } else {
1368 1 : valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
1369 1 : }
1370 1 : if valid {
1371 1 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1372 1 : } else {
1373 1 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1374 1 : }
1375 : }
1376 :
1377 1 : func (o *iterSeekGEOp) String() string {
1378 1 : return fmt.Sprintf("%s.SeekGE(%q, %q)", o.iterID, o.key, o.limit)
1379 1 : }
1380 1 : func (o *iterSeekGEOp) receiver() objID { return o.iterID }
1381 1 : func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1382 :
1383 0 : func (o *iterSeekGEOp) keys() []*[]byte {
1384 0 : return []*[]byte{&o.key}
1385 0 : }
1386 :
1387 1 : func (o *iterSeekGEOp) diagramKeyRanges() []pebble.KeyRange {
1388 1 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
1389 1 : }
1390 :
1391 1 : func onlyBatchIDs(ids ...objID) objIDSlice {
1392 1 : var ret objIDSlice
1393 1 : for _, id := range ids {
1394 1 : if id.tag() == batchTag {
1395 0 : ret = append(ret, id)
1396 0 : }
1397 : }
1398 1 : return ret
1399 : }
1400 :
1401 : // iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
1402 : type iterSeekPrefixGEOp struct {
1403 : iterID objID
1404 : key []byte
1405 :
1406 : derivedReaderID objID
1407 : }
1408 :
1409 1 : func (o *iterSeekPrefixGEOp) run(t *Test, h historyRecorder) {
1410 1 : i := t.getIter(o.iterID)
1411 1 : valid := i.SeekPrefixGE(o.key)
1412 1 : if valid {
1413 1 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1414 1 : } else {
1415 1 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1416 1 : }
1417 : }
1418 :
1419 1 : func (o *iterSeekPrefixGEOp) String() string {
1420 1 : return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, o.key)
1421 1 : }
1422 1 : func (o *iterSeekPrefixGEOp) receiver() objID { return o.iterID }
1423 1 : func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1424 :
1425 0 : func (o *iterSeekPrefixGEOp) keys() []*[]byte {
1426 0 : return []*[]byte{&o.key}
1427 0 : }
1428 :
1429 0 : func (o *iterSeekPrefixGEOp) diagramKeyRanges() []pebble.KeyRange {
1430 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
1431 0 : }
1432 :
1433 : // iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
1434 : type iterSeekLTOp struct {
1435 : iterID objID
1436 : key []byte
1437 : limit []byte
1438 :
1439 : derivedReaderID objID
1440 : }
1441 :
1442 1 : func (o *iterSeekLTOp) run(t *Test, h historyRecorder) {
1443 1 : i := t.getIter(o.iterID)
1444 1 : var valid bool
1445 1 : var validStr string
1446 1 : if o.limit == nil {
1447 1 : valid = i.SeekLT(o.key)
1448 1 : validStr = validBoolToStr(valid)
1449 1 : } else {
1450 1 : valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
1451 1 : }
1452 1 : if valid {
1453 1 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1454 1 : } else {
1455 1 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1456 1 : }
1457 : }
1458 :
1459 1 : func (o *iterSeekLTOp) String() string {
1460 1 : return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID, o.key, o.limit)
1461 1 : }
1462 :
1463 1 : func (o *iterSeekLTOp) receiver() objID { return o.iterID }
1464 1 : func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1465 :
1466 0 : func (o *iterSeekLTOp) keys() []*[]byte {
1467 0 : return []*[]byte{&o.key}
1468 0 : }
1469 :
1470 0 : func (o *iterSeekLTOp) diagramKeyRanges() []pebble.KeyRange {
1471 0 : return []pebble.KeyRange{{Start: o.key, End: o.key}}
1472 0 : }
1473 :
1474 : // iterFirstOp models an Iterator.First operation.
1475 : type iterFirstOp struct {
1476 : iterID objID
1477 :
1478 : derivedReaderID objID
1479 : }
1480 :
1481 1 : func (o *iterFirstOp) run(t *Test, h historyRecorder) {
1482 1 : i := t.getIter(o.iterID)
1483 1 : valid := i.First()
1484 1 : if valid {
1485 1 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1486 1 : } else {
1487 1 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1488 1 : }
1489 : }
1490 :
1491 1 : func (o *iterFirstOp) String() string { return fmt.Sprintf("%s.First()", o.iterID) }
1492 1 : func (o *iterFirstOp) receiver() objID { return o.iterID }
1493 1 : func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1494 :
1495 0 : func (o *iterFirstOp) keys() []*[]byte { return nil }
1496 0 : func (o *iterFirstOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1497 :
1498 : // iterLastOp models an Iterator.Last operation.
1499 : type iterLastOp struct {
1500 : iterID objID
1501 :
1502 : derivedReaderID objID
1503 : }
1504 :
1505 1 : func (o *iterLastOp) run(t *Test, h historyRecorder) {
1506 1 : i := t.getIter(o.iterID)
1507 1 : valid := i.Last()
1508 1 : if valid {
1509 1 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1510 1 : } else {
1511 1 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1512 1 : }
1513 : }
1514 :
1515 1 : func (o *iterLastOp) String() string { return fmt.Sprintf("%s.Last()", o.iterID) }
1516 1 : func (o *iterLastOp) receiver() objID { return o.iterID }
1517 1 : func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1518 :
1519 0 : func (o *iterLastOp) keys() []*[]byte { return nil }
1520 0 : func (o *iterLastOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1521 :
1522 : // iterNextOp models an Iterator.Next[WithLimit] operation.
1523 : type iterNextOp struct {
1524 : iterID objID
1525 : limit []byte
1526 :
1527 : derivedReaderID objID
1528 : }
1529 :
1530 1 : func (o *iterNextOp) run(t *Test, h historyRecorder) {
1531 1 : i := t.getIter(o.iterID)
1532 1 : var valid bool
1533 1 : var validStr string
1534 1 : if o.limit == nil {
1535 1 : valid = i.Next()
1536 1 : validStr = validBoolToStr(valid)
1537 1 : } else {
1538 1 : valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
1539 1 : }
1540 1 : if valid {
1541 1 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1542 1 : } else {
1543 1 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1544 1 : }
1545 : }
1546 :
1547 1 : func (o *iterNextOp) String() string { return fmt.Sprintf("%s.Next(%q)", o.iterID, o.limit) }
1548 1 : func (o *iterNextOp) receiver() objID { return o.iterID }
1549 1 : func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1550 :
1551 0 : func (o *iterNextOp) keys() []*[]byte { return nil }
1552 0 : func (o *iterNextOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1553 :
1554 : // iterNextPrefixOp models an Iterator.NextPrefix operation.
1555 : type iterNextPrefixOp struct {
1556 : iterID objID
1557 :
1558 : derivedReaderID objID
1559 : }
1560 :
1561 1 : func (o *iterNextPrefixOp) run(t *Test, h historyRecorder) {
1562 1 : i := t.getIter(o.iterID)
1563 1 : valid := i.NextPrefix()
1564 1 : validStr := validBoolToStr(valid)
1565 1 : if valid {
1566 1 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1567 1 : } else {
1568 1 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1569 1 : }
1570 : }
1571 :
1572 1 : func (o *iterNextPrefixOp) String() string { return fmt.Sprintf("%s.NextPrefix()", o.iterID) }
1573 1 : func (o *iterNextPrefixOp) receiver() objID { return o.iterID }
1574 1 : func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1575 :
1576 0 : func (o *iterNextPrefixOp) keys() []*[]byte { return nil }
1577 0 : func (o *iterNextPrefixOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1578 :
1579 : // iterCanSingleDelOp models a call to CanDeterministicallySingleDelete with an
1580 : // Iterator.
1581 : type iterCanSingleDelOp struct {
1582 : iterID objID
1583 :
1584 : derivedReaderID objID
1585 : }
1586 :
1587 1 : func (o *iterCanSingleDelOp) run(t *Test, h historyRecorder) {
1588 1 : // TODO(jackson): When we perform error injection, we'll need to rethink
1589 1 : // this.
1590 1 : _, err := pebble.CanDeterministicallySingleDelete(t.getIter(o.iterID).iter)
1591 1 : // The return value of CanDeterministicallySingleDelete is dependent on
1592 1 : // internal LSM state and non-deterministic, so we don't record it.
1593 1 : // Including the operation within the metamorphic test at all helps ensure
1594 1 : // that it does not change the result of any other Iterator operation that
1595 1 : // should be deterministic, regardless of its own outcome.
1596 1 : //
1597 1 : // We still record the value of the error because it's deterministic, at
1598 1 : // least for now. The possible error cases are:
1599 1 : // - The iterator was already in an error state when the operation ran.
1600 1 : // - The operation is deterministically invalid (like using an InternalNext
1601 1 : // to change directions.)
1602 1 : h.Recordf("%s // %v", o, err)
1603 1 : }
1604 :
1605 1 : func (o *iterCanSingleDelOp) String() string { return fmt.Sprintf("%s.InternalNext()", o.iterID) }
1606 1 : func (o *iterCanSingleDelOp) receiver() objID { return o.iterID }
1607 1 : func (o *iterCanSingleDelOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1608 :
1609 0 : func (o *iterCanSingleDelOp) keys() []*[]byte { return nil }
1610 0 : func (o *iterCanSingleDelOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1611 :
1612 : // iterPrevOp models an Iterator.Prev[WithLimit] operation.
1613 : type iterPrevOp struct {
1614 : iterID objID
1615 : limit []byte
1616 :
1617 : derivedReaderID objID
1618 : }
1619 :
1620 1 : func (o *iterPrevOp) run(t *Test, h historyRecorder) {
1621 1 : i := t.getIter(o.iterID)
1622 1 : var valid bool
1623 1 : var validStr string
1624 1 : if o.limit == nil {
1625 1 : valid = i.Prev()
1626 1 : validStr = validBoolToStr(valid)
1627 1 : } else {
1628 1 : valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
1629 1 : }
1630 1 : if valid {
1631 1 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1632 1 : } else {
1633 1 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1634 1 : }
1635 : }
1636 :
1637 1 : func (o *iterPrevOp) String() string { return fmt.Sprintf("%s.Prev(%q)", o.iterID, o.limit) }
1638 1 : func (o *iterPrevOp) receiver() objID { return o.iterID }
1639 1 : func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1640 :
1641 0 : func (o *iterPrevOp) keys() []*[]byte { return nil }
1642 0 : func (o *iterPrevOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1643 :
1644 : // newSnapshotOp models a DB.NewSnapshot operation.
1645 : type newSnapshotOp struct {
1646 : dbID objID
1647 : snapID objID
1648 : // If nonempty, this snapshot must not be used to read any keys outside of
1649 : // the provided bounds. This allows some implementations to use 'Eventually
1650 : // file-only snapshots,' which require bounds.
1651 : bounds []pebble.KeyRange
1652 : }
1653 :
1654 1 : func (o *newSnapshotOp) run(t *Test, h historyRecorder) {
1655 1 : bounds := o.bounds
1656 1 : if len(bounds) == 0 {
1657 0 : panic("bounds unexpectedly unset for newSnapshotOp")
1658 : }
1659 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
1660 1 : createEfos := ((11400714819323198485 * uint64(t.idx) * t.testOpts.seedEFOS) >> 63) == 1
1661 1 : // If either of these options is true, an EFOS _must_ be created, regardless
1662 1 : // of what the fibonacci hash returned.
1663 1 : excisePossible := t.testOpts.useSharedReplicate || t.testOpts.useExcise
1664 1 : if createEfos || excisePossible {
1665 1 : s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(bounds)
1666 1 : t.setSnapshot(o.snapID, s)
1667 1 : // If the EFOS isn't guaranteed to always create iterators, we must force
1668 1 : // a flush on this DB so it transitions this EFOS into a file-only snapshot.
1669 1 : if excisePossible && !t.testOpts.efosAlwaysCreatesIters {
1670 0 : err := t.getDB(o.dbID).Flush()
1671 0 : if err != nil {
1672 0 : h.Recordf("%s // %v", o, err)
1673 0 : }
1674 : }
1675 1 : } else {
1676 1 : s := t.getDB(o.dbID).NewSnapshot()
1677 1 : t.setSnapshot(o.snapID, s)
1678 1 : }
1679 1 : h.Recordf("%s", o)
1680 : }
1681 :
1682 1 : func (o *newSnapshotOp) String() string {
1683 1 : var buf bytes.Buffer
1684 1 : fmt.Fprintf(&buf, "%s = %s.NewSnapshot(", o.snapID, o.dbID)
1685 1 : for i := range o.bounds {
1686 1 : if i > 0 {
1687 1 : fmt.Fprint(&buf, ", ")
1688 1 : }
1689 1 : fmt.Fprintf(&buf, "%q, %q", o.bounds[i].Start, o.bounds[i].End)
1690 : }
1691 1 : fmt.Fprint(&buf, ")")
1692 1 : return buf.String()
1693 : }
1694 1 : func (o *newSnapshotOp) receiver() objID { return o.dbID }
1695 1 : func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
1696 :
1697 1 : func (o *newSnapshotOp) keys() []*[]byte {
1698 1 : var res []*[]byte
1699 1 : for i := range o.bounds {
1700 1 : res = append(res, &o.bounds[i].Start, &o.bounds[i].End)
1701 1 : }
1702 1 : return res
1703 : }
1704 :
1705 1 : func (o *newSnapshotOp) diagramKeyRanges() []pebble.KeyRange {
1706 1 : return o.bounds
1707 1 : }
1708 :
1709 : type dbRatchetFormatMajorVersionOp struct {
1710 : dbID objID
1711 : vers pebble.FormatMajorVersion
1712 : }
1713 :
1714 1 : func (o *dbRatchetFormatMajorVersionOp) run(t *Test, h historyRecorder) {
1715 1 : var err error
1716 1 : // NB: We no-op the operation if we're already at or above the provided
1717 1 : // format major version. Different runs start at different format major
1718 1 : // versions, making the presence of an error and the error message itself
1719 1 : // non-deterministic if we attempt to upgrade to an older version.
1720 1 : //
1721 1 : //Regardless, subsequent operations should behave identically, which is what
1722 1 : //we're really aiming to test by including this format major version ratchet
1723 1 : //operation.
1724 1 : if t.getDB(o.dbID).FormatMajorVersion() < o.vers {
1725 1 : err = t.getDB(o.dbID).RatchetFormatMajorVersion(o.vers)
1726 1 : }
1727 1 : h.Recordf("%s // %v", o, err)
1728 : }
1729 :
1730 1 : func (o *dbRatchetFormatMajorVersionOp) String() string {
1731 1 : return fmt.Sprintf("%s.RatchetFormatMajorVersion(%s)", o.dbID, o.vers)
1732 1 : }
1733 1 : func (o *dbRatchetFormatMajorVersionOp) receiver() objID { return o.dbID }
1734 1 : func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
1735 :
1736 0 : func (o *dbRatchetFormatMajorVersionOp) keys() []*[]byte { return nil }
1737 0 : func (o *dbRatchetFormatMajorVersionOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1738 :
1739 : type dbRestartOp struct {
1740 : dbID objID
1741 :
1742 : // affectedObjects is the list of additional objects that are affected by this
1743 : // operation, and which syncObjs() must return so that we don't perform the
1744 : // restart in parallel with other operations to affected objects.
1745 : affectedObjects []objID
1746 : }
1747 :
1748 1 : func (o *dbRestartOp) run(t *Test, h historyRecorder) {
1749 1 : if err := t.restartDB(o.dbID); err != nil {
1750 0 : h.Recordf("%s // %v", o, err)
1751 0 : h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
1752 1 : } else {
1753 1 : h.Recordf("%s", o)
1754 1 : }
1755 : }
1756 :
1757 1 : func (o *dbRestartOp) String() string { return fmt.Sprintf("%s.Restart()", o.dbID) }
1758 1 : func (o *dbRestartOp) receiver() objID { return o.dbID }
1759 1 : func (o *dbRestartOp) syncObjs() objIDSlice { return o.affectedObjects }
1760 :
1761 0 : func (o *dbRestartOp) keys() []*[]byte { return nil }
1762 0 : func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
1763 :
1764 1 : func formatOps(ops []op) string {
1765 1 : var buf strings.Builder
1766 1 : for _, op := range ops {
1767 1 : fmt.Fprintf(&buf, "%s\n", op)
1768 1 : }
1769 1 : return buf.String()
1770 : }
1771 :
// replicateOp models an operation that copies the keys in a span from one DB
// to another. The copy goes through either IngestAndExcise (when shared
// replication is used) or a plain Ingest of an sstable built from the
// source's visible keys.
type replicateOp struct {
	// source is the DB keys are read from; dest is the DB they are copied into.
	source, dest objID
	// start and end bound the replicated span. run uses them as the source
	// iterator's LowerBound/UpperBound and as the span cleared or excised in
	// dest.
	start, end []byte
}
1778 :
1779 : func (r *replicateOp) runSharedReplicate(
1780 : t *Test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
1781 0 : ) {
1782 0 : var sharedSSTs []pebble.SharedSSTMeta
1783 0 : var err error
1784 0 : err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
1785 0 : func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
1786 0 : val, _, err := value.Value(nil)
1787 0 : if err != nil {
1788 0 : panic(err)
1789 : }
1790 0 : return w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
1791 : },
1792 0 : func(start, end []byte, seqNum uint64) error {
1793 0 : return w.DeleteRange(start, end)
1794 0 : },
1795 0 : func(start, end []byte, keys []keyspan.Key) error {
1796 0 : s := keyspan.Span{
1797 0 : Start: start,
1798 0 : End: end,
1799 0 : Keys: keys,
1800 0 : }
1801 0 : return rangekey.Encode(&s, w.AddRangeKey)
1802 0 : },
1803 0 : func(sst *pebble.SharedSSTMeta) error {
1804 0 : sharedSSTs = append(sharedSSTs, *sst)
1805 0 : return nil
1806 0 : },
1807 : )
1808 0 : if err != nil {
1809 0 : h.Recordf("%s // %v", r, err)
1810 0 : return
1811 0 : }
1812 :
1813 0 : err = w.Close()
1814 0 : if err != nil {
1815 0 : h.Recordf("%s // %v", r, err)
1816 0 : return
1817 0 : }
1818 0 : meta, err := w.Metadata()
1819 0 : if err != nil {
1820 0 : h.Recordf("%s // %v", r, err)
1821 0 : return
1822 0 : }
1823 0 : if len(sharedSSTs) == 0 && meta.Properties.NumEntries == 0 && meta.Properties.NumRangeKeys() == 0 {
1824 0 : // IngestAndExcise below will be a no-op. We should do a
1825 0 : // DeleteRange+RangeKeyDel to mimic the behaviour of the non-shared-replicate
1826 0 : // case.
1827 0 : //
1828 0 : // TODO(bilal): Remove this when we support excises with no matching ingests.
1829 0 : if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
1830 0 : h.Recordf("%s // %v", r, err)
1831 0 : return
1832 0 : }
1833 0 : err := dest.DeleteRange(r.start, r.end, t.writeOpts)
1834 0 : h.Recordf("%s // %v", r, err)
1835 0 : return
1836 : }
1837 :
1838 0 : _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, pebble.KeyRange{Start: r.start, End: r.end})
1839 0 : h.Recordf("%s // %v", r, err)
1840 : }
1841 :
1842 0 : func (r *replicateOp) run(t *Test, h historyRecorder) {
1843 0 : // Shared replication only works if shared storage is enabled.
1844 0 : useSharedIngest := t.testOpts.useSharedReplicate
1845 0 : if !t.testOpts.sharedStorageEnabled {
1846 0 : useSharedIngest = false
1847 0 : }
1848 :
1849 0 : source := t.getDB(r.source)
1850 0 : dest := t.getDB(r.dest)
1851 0 : sstPath := path.Join(t.tmpDir, fmt.Sprintf("ext-replicate%d.sst", t.idx))
1852 0 : f, err := t.opts.FS.Create(sstPath)
1853 0 : if err != nil {
1854 0 : h.Recordf("%s // %v", r, err)
1855 0 : return
1856 0 : }
1857 0 : w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), t.opts.MakeWriterOptions(0, dest.FormatMajorVersion().MaxTableFormat()))
1858 0 :
1859 0 : if useSharedIngest {
1860 0 : r.runSharedReplicate(t, h, source, dest, w, sstPath)
1861 0 : return
1862 0 : }
1863 :
1864 : // First, do a RangeKeyDelete and DeleteRange on the whole span.
1865 0 : if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
1866 0 : h.Recordf("%s // %v", r, err)
1867 0 : return
1868 0 : }
1869 0 : if err := dest.DeleteRange(r.start, r.end, t.writeOpts); err != nil {
1870 0 : h.Recordf("%s // %v", r, err)
1871 0 : return
1872 0 : }
1873 0 : iter, err := source.NewIter(&pebble.IterOptions{
1874 0 : LowerBound: r.start,
1875 0 : UpperBound: r.end,
1876 0 : KeyTypes: pebble.IterKeyTypePointsAndRanges,
1877 0 : })
1878 0 : if err != nil {
1879 0 : panic(err)
1880 : }
1881 0 : defer iter.Close()
1882 0 :
1883 0 : for ok := iter.SeekGE(r.start); ok && iter.Error() == nil; ok = iter.Next() {
1884 0 : hasPoint, hasRange := iter.HasPointAndRange()
1885 0 : if hasPoint {
1886 0 : val, err := iter.ValueAndErr()
1887 0 : if err != nil {
1888 0 : panic(err)
1889 : }
1890 0 : if err := w.Set(iter.Key(), val); err != nil {
1891 0 : panic(err)
1892 : }
1893 : }
1894 0 : if hasRange && iter.RangeKeyChanged() {
1895 0 : rangeKeys := iter.RangeKeys()
1896 0 : rkStart, rkEnd := iter.RangeBounds()
1897 0 :
1898 0 : span := &keyspan.Span{Start: rkStart, End: rkEnd, Keys: make([]keyspan.Key, len(rangeKeys))}
1899 0 : for i := range rangeKeys {
1900 0 : span.Keys[i] = keyspan.Key{
1901 0 : Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
1902 0 : Suffix: rangeKeys[i].Suffix,
1903 0 : Value: rangeKeys[i].Value,
1904 0 : }
1905 0 : }
1906 0 : keyspan.SortKeysByTrailer(&span.Keys)
1907 0 : if err := rangekey.Encode(span, w.AddRangeKey); err != nil {
1908 0 : panic(err)
1909 : }
1910 : }
1911 : }
1912 0 : if err := iter.Error(); err != nil {
1913 0 : h.Recordf("%s // %v", r, err)
1914 0 : return
1915 0 : }
1916 0 : if err := w.Close(); err != nil {
1917 0 : panic(err)
1918 : }
1919 :
1920 0 : err = dest.Ingest([]string{sstPath})
1921 0 : h.Recordf("%s // %v", r, err)
1922 : }
1923 :
1924 1 : func (r *replicateOp) String() string {
1925 1 : return fmt.Sprintf("%s.Replicate(%s, %q, %q)", r.source, r.dest, r.start, r.end)
1926 1 : }
1927 :
1928 0 : func (r *replicateOp) receiver() objID { return r.source }
1929 0 : func (r *replicateOp) syncObjs() objIDSlice { return objIDSlice{r.dest} }
1930 :
1931 0 : func (r *replicateOp) keys() []*[]byte {
1932 0 : return []*[]byte{&r.start, &r.end}
1933 0 : }
1934 :
1935 1 : func (r *replicateOp) diagramKeyRanges() []pebble.KeyRange {
1936 1 : return []pebble.KeyRange{{Start: r.start, End: r.end}}
1937 1 : }
|