1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "crypto/rand"
11 : "encoding/binary"
12 : "fmt"
13 : "io"
14 : "path"
15 : "path/filepath"
16 : "strings"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble"
20 : "github.com/cockroachdb/pebble/internal/base"
21 : "github.com/cockroachdb/pebble/internal/keyspan"
22 : "github.com/cockroachdb/pebble/internal/private"
23 : "github.com/cockroachdb/pebble/internal/rangekey"
24 : "github.com/cockroachdb/pebble/internal/testkeys"
25 : "github.com/cockroachdb/pebble/objstorage/objstorageprovider"
26 : "github.com/cockroachdb/pebble/sstable"
27 : "github.com/cockroachdb/pebble/vfs/errorfs"
28 : )
29 :
30 : // op defines the interface for a single operation, such as creating a batch,
31 : // or advancing an iterator.
32 : type op interface {
33 : String() string
34 : run(t *test, h historyRecorder)
35 :
36 : // receiver returns the object ID of the object the operation is performed
37 : // on. Every operation has a receiver (e.g., batch0.Set(...) has `batch0` as
38 : // its receiver). Receivers are used for synchronization when running with
39 : // concurrency.
40 : receiver() objID
41 :
42 : // syncObjs returns an additional set of object IDs—excluding the
43 : // receiver—that the operation must synchronize with. At execution time,
44 : // the operation will run serially with respect to all other operations
45 : // that return these objects from their own syncObjs or receiver methods.
46 : syncObjs() objIDSlice
47 : }
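
// Illustrative sketch (not part of the original harness): a concurrent
// runner could serialize conflicting operations by locking the receiver and
// every object returned by syncObjs before invoking run; locks are acquired
// in a stable order to avoid deadlock. The names below are hypothetical:
//
//	func runSerialized(t *test, h historyRecorder, o op, locks map[objID]*sync.Mutex) {
//		ids := append(objIDSlice{o.receiver()}, o.syncObjs()...)
//		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
//		for _, id := range ids {
//			locks[id].Lock()
//			defer locks[id].Unlock()
//		}
//		o.run(t, h)
//	}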
48 :
49 : // initOp performs test initialization.
50 : type initOp struct {
51 : dbSlots uint32
52 : batchSlots uint32
53 : iterSlots uint32
54 : snapshotSlots uint32
55 : }
56 :
57 2 : func (o *initOp) run(t *test, h historyRecorder) {
58 2 : t.batches = make([]*pebble.Batch, o.batchSlots)
59 2 : t.iters = make([]*retryableIter, o.iterSlots)
60 2 : t.snapshots = make([]readerCloser, o.snapshotSlots)
61 2 : h.Recordf("%s", o)
62 2 : }
63 :
64 2 : func (o *initOp) String() string {
65 2 : return fmt.Sprintf("Init(%d /* dbs */, %d /* batches */, %d /* iters */, %d /* snapshots */)",
66 2 : o.dbSlots, o.batchSlots, o.iterSlots, o.snapshotSlots)
67 2 : }
68 :
69 2 : func (o *initOp) receiver() objID { return makeObjID(dbTag, 1) }
70 2 : func (o *initOp) syncObjs() objIDSlice {
71 2 : syncObjs := make([]objID, 0)
72 2 : // Add any additional DBs to syncObjs.
73 2 : for i := uint32(2); i < o.dbSlots+1; i++ {
74 1 : syncObjs = append(syncObjs, makeObjID(dbTag, i))
75 1 : }
76 2 : return syncObjs
77 : }
78 :
79 : // applyOp models a Writer.Apply operation.
80 : type applyOp struct {
81 : writerID objID
82 : batchID objID
83 : }
84 :
85 2 : func (o *applyOp) run(t *test, h historyRecorder) {
86 2 : b := t.getBatch(o.batchID)
87 2 : w := t.getWriter(o.writerID)
88 2 : var err error
89 2 : if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
90 1 : err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
91 1 : if err == nil {
92 1 : err = b.SyncWait()
93 1 : }
94 2 : } else {
95 2 : err = w.Apply(b, t.writeOpts)
96 2 : }
97 2 : h.Recordf("%s // %v", o, err)
98 : // The batch will be closed by a closeOp, which is guaranteed to be generated.
99 : }
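
// Illustrative note on the asynchronous path above (a sketch, not a spec):
// ApplyNoSyncWait splits a synchronous Apply in two so the caller can overlap
// work with the WAL fsync; it requires WriteOptions.Sync to be true:
//
//	err := db.ApplyNoSyncWait(b, pebble.Sync) // publish the batch, queue the sync
//	// ... other work can proceed while the WAL write is in flight ...
//	if err == nil {
//		err = b.SyncWait() // block until the WAL write is durable
//	}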
100 :
101 2 : func (o *applyOp) String() string { return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID) }
102 2 : func (o *applyOp) receiver() objID { return o.writerID }
103 2 : func (o *applyOp) syncObjs() objIDSlice {
104 2 : // Apply should not be concurrent with operations that are mutating the
105 2 : // batch.
106 2 : return []objID{o.batchID}
107 2 : }
108 :
109 : // checkpointOp models a DB.Checkpoint operation.
110 : type checkpointOp struct {
111 : dbID objID
112 : // If non-empty, the checkpoint is restricted to these spans.
113 : spans []pebble.CheckpointSpan
114 : }
115 :
116 2 : func (o *checkpointOp) run(t *test, h historyRecorder) {
117 2 : // TODO(josh): db.Checkpoint does not work with shared storage yet.
118 2 : // It would be better to filter out ahead of calling run on the op,
119 2 : // by setting the weight that generator.go uses to zero, or similar.
120 2 : // But IIUC the ops are shared for ALL the metamorphic test runs, so
121 2 : // not sure how to do that easily:
122 2 : // https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
123 2 : if t.testOpts.sharedStorageEnabled {
124 1 : h.Recordf("%s // %v", o, nil)
125 1 : return
126 1 : }
127 2 : var opts []pebble.CheckpointOption
128 2 : if len(o.spans) > 0 {
129 2 : opts = append(opts, pebble.WithRestrictToSpans(o.spans))
130 2 : }
131 2 : db := t.getDB(o.dbID)
132 2 : err := withRetries(func() error {
133 2 : return db.Checkpoint(o.dir(t.dir, h.op), opts...)
134 2 : })
135 2 : h.Recordf("%s // %v", o, err)
136 : }
137 :
138 2 : func (o *checkpointOp) dir(dataDir string, idx int) string {
139 2 : return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
140 2 : }
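
// For example (illustrative): op index 42 under data directory "meta" yields
// "meta/checkpoints/op-000042".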
141 :
142 2 : func (o *checkpointOp) String() string {
143 2 : var spanStr bytes.Buffer
144 2 : for i, span := range o.spans {
145 2 : if i > 0 {
146 2 : spanStr.WriteString(",")
147 2 : }
148 2 : fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
149 : }
150 2 : return fmt.Sprintf("%s.Checkpoint(%s)", o.dbID, spanStr.String())
151 : }
152 :
153 2 : func (o *checkpointOp) receiver() objID { return o.dbID }
154 2 : func (o *checkpointOp) syncObjs() objIDSlice { return nil }
155 :
156 : // closeOp models a {Batch,Iterator,Snapshot}.Close operation.
157 : type closeOp struct {
158 : objID objID
159 : derivedDBID objID
160 : }
161 :
162 2 : func (o *closeOp) run(t *test, h historyRecorder) {
163 2 : c := t.getCloser(o.objID)
164 2 : if o.objID.tag() == dbTag && t.opts.DisableWAL {
165 1 : // Special case: If WAL is disabled, do a flush right before DB Close. This
166 1 : // allows us to reuse this run's data directory as initial state for
167 1 : // future runs without losing any mutations.
168 1 : _ = t.getDB(o.objID).Flush()
169 1 : }
170 2 : t.clearObj(o.objID)
171 2 : err := c.Close()
172 2 : h.Recordf("%s // %v", o, err)
173 : }
174 :
175 2 : func (o *closeOp) String() string { return fmt.Sprintf("%s.Close()", o.objID) }
176 2 : func (o *closeOp) receiver() objID { return o.objID }
177 2 : func (o *closeOp) syncObjs() objIDSlice {
178 2 : // Synchronize on the database so that we don't close the database before
179 2 : // all its iterators, snapshots and batches are closed.
180 2 : // TODO(jackson): It would be nice to relax this so that Close calls can
181 2 : // execute in parallel.
182 2 : if o.objID.tag() == dbTag {
183 2 : return nil
184 2 : }
185 2 : if o.derivedDBID != 0 {
186 2 : return []objID{o.derivedDBID}
187 2 : }
188 0 : return nil
189 : }
190 :
191 : // compactOp models a DB.Compact operation.
192 : type compactOp struct {
193 : dbID objID
194 : start []byte
195 : end []byte
196 : parallelize bool
197 : }
198 :
199 2 : func (o *compactOp) run(t *test, h historyRecorder) {
200 2 : err := withRetries(func() error {
201 2 : return t.getDB(o.dbID).Compact(o.start, o.end, o.parallelize)
202 2 : })
203 2 : h.Recordf("%s // %v", o, err)
204 : }
205 :
206 2 : func (o *compactOp) String() string {
207 2 : return fmt.Sprintf("%s.Compact(%q, %q, %t /* parallelize */)", o.dbID, o.start, o.end, o.parallelize)
208 2 : }
209 :
210 2 : func (o *compactOp) receiver() objID { return o.dbID }
211 2 : func (o *compactOp) syncObjs() objIDSlice { return nil }
212 :
213 : // deleteOp models a Writer.Delete operation.
214 : type deleteOp struct {
215 : writerID objID
216 : key []byte
217 :
218 : derivedDBID objID
219 : }
220 :
221 2 : func (o *deleteOp) run(t *test, h historyRecorder) {
222 2 : w := t.getWriter(o.writerID)
223 2 : var err error
224 2 : if t.testOpts.deleteSized && t.isFMV(o.derivedDBID, pebble.FormatDeleteSizedAndObsolete) {
225 1 : // Call DeleteSized with a deterministic size derived from the index.
226 1 : // The size does not need to be accurate for correctness.
227 1 : err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
228 2 : } else {
229 2 : err = w.Delete(o.key, t.writeOpts)
230 2 : }
231 2 : h.Recordf("%s // %v", o, err)
232 : }
233 :
234 1 : func hashSize(index int) uint32 {
235 1 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
236 1 : return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
237 1 : }
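
// Illustrative arithmetic: 11400714819323198485 is 2^64 divided by the golden
// ratio, rounded, so products for consecutive indices spread well across the
// uint64 range (wrapping mod 2^64) before the reduction mod maxValueSize:
//
//	hashSize(0) == 0
//	hashSize(1) == 11400714819323198485 % maxValueSize
//	hashSize(2) == (2 * 11400714819323198485 /* mod 2^64 */) % maxValueSize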
238 :
239 2 : func (o *deleteOp) String() string {
240 2 : return fmt.Sprintf("%s.Delete(%q)", o.writerID, o.key)
241 2 : }
242 2 : func (o *deleteOp) receiver() objID { return o.writerID }
243 2 : func (o *deleteOp) syncObjs() objIDSlice { return nil }
244 :
245 : // singleDeleteOp models a Writer.SingleDelete operation.
246 : type singleDeleteOp struct {
247 : writerID objID
248 : key []byte
249 : maybeReplaceDelete bool
250 : }
251 :
252 2 : func (o *singleDeleteOp) run(t *test, h historyRecorder) {
253 2 : w := t.getWriter(o.writerID)
254 2 : var err error
255 2 : if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
256 1 : err = w.Delete(o.key, t.writeOpts)
257 2 : } else {
258 2 : err = w.SingleDelete(o.key, t.writeOpts)
259 2 : }
260 : // NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
261 : // write the former to the history log. The log line will indicate whether
262 : // or not the delete *could* have been replaced. The OPTIONS file should
263 : // also be consulted to determine what happened at runtime (i.e. by taking
264 : // the logical AND).
265 2 : h.Recordf("%s // %v", o, err)
266 : }
267 :
268 2 : func (o *singleDeleteOp) String() string {
269 2 : return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)", o.writerID, o.key, o.maybeReplaceDelete)
270 2 : }
271 :
272 2 : func (o *singleDeleteOp) receiver() objID { return o.writerID }
273 2 : func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
274 :
275 : // deleteRangeOp models a Writer.DeleteRange operation.
276 : type deleteRangeOp struct {
277 : writerID objID
278 : start []byte
279 : end []byte
280 : }
281 :
282 2 : func (o *deleteRangeOp) run(t *test, h historyRecorder) {
283 2 : w := t.getWriter(o.writerID)
284 2 : err := w.DeleteRange(o.start, o.end, t.writeOpts)
285 2 : h.Recordf("%s // %v", o, err)
286 2 : }
287 :
288 2 : func (o *deleteRangeOp) String() string {
289 2 : return fmt.Sprintf("%s.DeleteRange(%q, %q)", o.writerID, o.start, o.end)
290 2 : }
291 :
292 2 : func (o *deleteRangeOp) receiver() objID { return o.writerID }
293 2 : func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
294 :
295 : // flushOp models a DB.Flush operation.
296 : type flushOp struct {
297 : db objID
298 : }
299 :
300 2 : func (o *flushOp) run(t *test, h historyRecorder) {
301 2 : db := t.getDB(o.db)
302 2 : err := db.Flush()
303 2 : h.Recordf("%s // %v", o, err)
304 2 : }
305 :
306 2 : func (o *flushOp) String() string { return fmt.Sprintf("%s.Flush()", o.db) }
307 2 : func (o *flushOp) receiver() objID { return o.db }
308 2 : func (o *flushOp) syncObjs() objIDSlice { return nil }
309 :
310 : // mergeOp models a Writer.Merge operation.
311 : type mergeOp struct {
312 : writerID objID
313 : key []byte
314 : value []byte
315 : }
316 :
317 2 : func (o *mergeOp) run(t *test, h historyRecorder) {
318 2 : w := t.getWriter(o.writerID)
319 2 : err := w.Merge(o.key, o.value, t.writeOpts)
320 2 : h.Recordf("%s // %v", o, err)
321 2 : }
322 :
323 2 : func (o *mergeOp) String() string { return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, o.key, o.value) }
324 2 : func (o *mergeOp) receiver() objID { return o.writerID }
325 2 : func (o *mergeOp) syncObjs() objIDSlice { return nil }
326 :
327 : // setOp models a Writer.Set operation.
328 : type setOp struct {
329 : writerID objID
330 : key []byte
331 : value []byte
332 : }
333 :
334 2 : func (o *setOp) run(t *test, h historyRecorder) {
335 2 : w := t.getWriter(o.writerID)
336 2 : err := w.Set(o.key, o.value, t.writeOpts)
337 2 : h.Recordf("%s // %v", o, err)
338 2 : }
339 :
340 2 : func (o *setOp) String() string { return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, o.key, o.value) }
341 2 : func (o *setOp) receiver() objID { return o.writerID }
342 2 : func (o *setOp) syncObjs() objIDSlice { return nil }
343 :
344 : // rangeKeyDeleteOp models a Writer.RangeKeyDelete operation.
345 : type rangeKeyDeleteOp struct {
346 : writerID objID
347 : start []byte
348 : end []byte
349 : }
350 :
351 2 : func (o *rangeKeyDeleteOp) run(t *test, h historyRecorder) {
352 2 : w := t.getWriter(o.writerID)
353 2 : err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
354 2 : h.Recordf("%s // %v", o, err)
355 2 : }
356 :
357 2 : func (o *rangeKeyDeleteOp) String() string {
358 2 : return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)", o.writerID, o.start, o.end)
359 2 : }
360 :
361 2 : func (o *rangeKeyDeleteOp) receiver() objID { return o.writerID }
362 2 : func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
363 :
364 : // rangeKeySetOp models a Writer.RangeKeySet operation.
365 : type rangeKeySetOp struct {
366 : writerID objID
367 : start []byte
368 : end []byte
369 : suffix []byte
370 : value []byte
371 : }
372 :
373 2 : func (o *rangeKeySetOp) run(t *test, h historyRecorder) {
374 2 : w := t.getWriter(o.writerID)
375 2 : err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
376 2 : h.Recordf("%s // %v", o, err)
377 2 : }
378 :
379 2 : func (o *rangeKeySetOp) String() string {
380 2 : return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
381 2 : o.writerID, o.start, o.end, o.suffix, o.value)
382 2 : }
383 :
384 2 : func (o *rangeKeySetOp) receiver() objID { return o.writerID }
385 2 : func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
386 :
387 : // rangeKeyUnsetOp models a Writer.RangeKeyUnset operation.
388 : type rangeKeyUnsetOp struct {
389 : writerID objID
390 : start []byte
391 : end []byte
392 : suffix []byte
393 : }
394 :
395 2 : func (o *rangeKeyUnsetOp) run(t *test, h historyRecorder) {
396 2 : w := t.getWriter(o.writerID)
397 2 : err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
398 2 : h.Recordf("%s // %v", o, err)
399 2 : }
400 :
401 2 : func (o *rangeKeyUnsetOp) String() string {
402 2 : return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
403 2 : o.writerID, o.start, o.end, o.suffix)
404 2 : }
405 :
406 2 : func (o *rangeKeyUnsetOp) receiver() objID { return o.writerID }
407 2 : func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
408 :
409 : // newBatchOp models a DB.NewBatch operation.
410 : type newBatchOp struct {
411 : dbID objID
412 : batchID objID
413 : }
414 :
415 2 : func (o *newBatchOp) run(t *test, h historyRecorder) {
416 2 : b := t.getDB(o.dbID).NewBatch()
417 2 : t.setBatch(o.batchID, b)
418 2 : h.Recordf("%s", o)
419 2 : }
420 :
421 2 : func (o *newBatchOp) String() string { return fmt.Sprintf("%s = %s.NewBatch()", o.batchID, o.dbID) }
422 2 : func (o *newBatchOp) receiver() objID { return o.dbID }
423 2 : func (o *newBatchOp) syncObjs() objIDSlice {
424 2 : // NewBatch should not be concurrent with operations that interact with that
425 2 : // same batch.
426 2 : return []objID{o.batchID}
427 2 : }
428 :
429 : // newIndexedBatchOp models a DB.NewIndexedBatch operation.
430 : type newIndexedBatchOp struct {
431 : dbID objID
432 : batchID objID
433 : }
434 :
435 2 : func (o *newIndexedBatchOp) run(t *test, h historyRecorder) {
436 2 : b := t.getDB(o.dbID).NewIndexedBatch()
437 2 : t.setBatch(o.batchID, b)
438 2 : h.Recordf("%s", o)
439 2 : }
440 :
441 2 : func (o *newIndexedBatchOp) String() string {
442 2 : return fmt.Sprintf("%s = %s.NewIndexedBatch()", o.batchID, o.dbID)
443 2 : }
444 2 : func (o *newIndexedBatchOp) receiver() objID { return o.dbID }
445 2 : func (o *newIndexedBatchOp) syncObjs() objIDSlice {
446 2 : // NewIndexedBatch should not be concurrent with operations that interact
447 2 : // with that same batch.
448 2 : return []objID{o.batchID}
449 2 : }
450 :
451 : // batchCommitOp models a Batch.Commit operation.
452 : type batchCommitOp struct {
453 : dbID objID
454 : batchID objID
455 : }
456 :
457 2 : func (o *batchCommitOp) run(t *test, h historyRecorder) {
458 2 : b := t.getBatch(o.batchID)
459 2 : err := b.Commit(t.writeOpts)
460 2 : h.Recordf("%s // %v", o, err)
461 2 : }
462 :
463 2 : func (o *batchCommitOp) String() string { return fmt.Sprintf("%s.Commit()", o.batchID) }
464 2 : func (o *batchCommitOp) receiver() objID { return o.batchID }
465 2 : func (o *batchCommitOp) syncObjs() objIDSlice {
466 2 : // Synchronize on the database so that NewIters wait for the commit.
467 2 : return []objID{o.dbID}
468 2 : }
469 :
470 : // ingestOp models a DB.Ingest operation.
471 : type ingestOp struct {
472 : dbID objID
473 : batchIDs []objID
474 :
475 : derivedDBIDs []objID
476 : }
477 :
478 2 : func (o *ingestOp) run(t *test, h historyRecorder) {
479 2 : // We can only use apply as an alternative for ingestion if we are ingesting
480 2 : // a single batch. If we are ingesting multiple batches, the batches may
481 2 : // overlap, which would cause ingestion to fail while apply would succeed.
482 2 : if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 && o.derivedDBIDs[0] == o.dbID {
483 1 : id := o.batchIDs[0]
484 1 : b := t.getBatch(id)
485 1 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
486 1 : db := t.getDB(o.dbID)
487 1 : c, err := o.collapseBatch(t, db, iter, rangeDelIter, rangeKeyIter, b)
488 1 : if err == nil {
489 1 : err = db.Apply(c, t.writeOpts)
490 1 : }
491 1 : _ = b.Close()
492 1 : _ = c.Close()
493 1 : t.clearObj(id)
494 1 : h.Recordf("%s // %v", o, err)
495 1 : return
496 : }
497 :
498 2 : var paths []string
499 2 : var err error
500 2 : for i, id := range o.batchIDs {
501 2 : b := t.getBatch(id)
502 2 : t.clearObj(id)
503 2 : path, err2 := o.build(t, h, b, i)
504 2 : if err2 != nil {
505 0 : h.Recordf("Build(%s) // %v", id, err2)
506 0 : }
507 2 : err = firstError(err, err2)
508 2 : if err2 == nil {
509 2 : paths = append(paths, path)
510 2 : }
511 2 : err = firstError(err, b.Close())
512 : }
513 :
514 2 : err = firstError(err, withRetries(func() error {
515 2 : return t.getDB(o.dbID).Ingest(paths)
516 2 : }))
517 :
518 2 : h.Recordf("%s // %v", o, err)
519 : }
520 :
521 2 : func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
522 2 : path := t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", o.dbID.slot(), i))
523 2 : f, err := t.opts.FS.Create(path)
524 2 : if err != nil {
525 0 : return "", err
526 0 : }
527 2 : db := t.getDB(o.dbID)
528 2 :
529 2 : iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
530 2 : defer closeIters(iter, rangeDelIter, rangeKeyIter)
531 2 :
532 2 : equal := t.opts.Comparer.Equal
533 2 : tableFormat := db.FormatMajorVersion().MaxTableFormat()
534 2 : w := sstable.NewWriter(
535 2 : objstorageprovider.NewFileWritable(f),
536 2 : t.opts.MakeWriterOptions(0, tableFormat),
537 2 : )
538 2 :
539 2 : var lastUserKey []byte
540 2 : for key, value := iter.First(); key != nil; key, value = iter.Next() {
541 2 : // Ignore duplicate keys.
542 2 : if equal(lastUserKey, key.UserKey) {
543 2 : continue
544 : }
545 : // NB: We don't have to copy the key or value since we're reading from a
546 : // batch which doesn't do prefix compression.
547 2 : lastUserKey = key.UserKey
548 2 :
549 2 : key.SetSeqNum(base.SeqNumZero)
550 2 : // It's possible that we wrote the key to a batch from a db that supported
551 2 : // DeleteSized, but are now ingesting into a db that does not. Detect
552 2 : // this case and translate the key to an InternalKeyKindDelete.
553 2 : if key.Kind() == pebble.InternalKeyKindDeleteSized && !t.isFMV(o.dbID, pebble.FormatDeleteSizedAndObsolete) {
554 1 : value = pebble.LazyValue{}
555 1 : key.SetKind(pebble.InternalKeyKindDelete)
556 1 : }
557 2 : if err := w.Add(*key, value.InPlaceValue()); err != nil {
558 0 : return "", err
559 0 : }
560 : }
561 2 : if err := iter.Close(); err != nil {
562 0 : return "", err
563 0 : }
564 2 : iter = nil
565 2 :
566 2 : if rangeDelIter != nil {
567 2 : // NB: The range tombstones have already been fragmented by the Batch.
568 2 : for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
569 2 : // NB: We don't have to copy the key or value since we're reading from a
570 2 : // batch which doesn't do prefix compression.
571 2 : if err := w.DeleteRange(t.Start, t.End); err != nil {
572 0 : return "", err
573 0 : }
574 : }
575 2 : if err := rangeDelIter.Close(); err != nil {
576 0 : return "", err
577 0 : }
578 2 : rangeDelIter = nil
579 : }
580 :
581 2 : if rangeKeyIter != nil {
582 2 : for span := rangeKeyIter.First(); span != nil; span = rangeKeyIter.Next() {
583 2 : // Coalesce the keys of this span and then zero the sequence
584 2 : // numbers. This is necessary in order to make the range keys within
585 2 : // the ingested sstable internally consistent at the sequence number
586 2 : // it's ingested at. The individual keys within a batch are
587 2 : // committed at unique sequence numbers, whereas all the keys of an
588 2 : // ingested sstable are given the same sequence number. A span
589 2 : // contaning keys that both set and unset the same suffix at the
590 2 : // same sequence number is nonsensical, so we "coalesce" or collapse
591 2 : // the keys.
592 2 : collapsed := keyspan.Span{
593 2 : Start: span.Start,
594 2 : End: span.End,
595 2 : Keys: make([]keyspan.Key, 0, len(span.Keys)),
596 2 : }
597 2 : err = rangekey.Coalesce(t.opts.Comparer.Compare, equal, span.Keys, &collapsed.Keys)
598 2 : if err != nil {
599 0 : return "", err
600 0 : }
601 2 : for i := range collapsed.Keys {
602 2 : collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
603 2 : }
604 2 : keyspan.SortKeysByTrailer(&collapsed.Keys)
605 2 : if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil {
606 0 : return "", err
607 0 : }
608 : }
609 2 : if err := rangeKeyIter.Error(); err != nil {
610 0 : return "", err
611 0 : }
612 2 : if err := rangeKeyIter.Close(); err != nil {
613 0 : return "", err
614 0 : }
615 2 : rangeKeyIter = nil
616 : }
617 :
618 2 : if err := w.Close(); err != nil {
619 0 : return "", err
620 0 : }
621 2 : return path, nil
622 : }
623 :
624 2 : func (o *ingestOp) receiver() objID { return o.dbID }
625 2 : func (o *ingestOp) syncObjs() objIDSlice {
626 2 : // Ingest should not be concurrent with mutating the batches that will be
627 2 : // ingested as sstables.
628 2 : objs := make([]objID, 0, len(o.batchIDs)+1)
629 2 : objs = append(objs, o.batchIDs...)
630 2 : addedDBs := make(map[objID]struct{})
631 2 : for i := range o.derivedDBIDs {
632 2 : _, ok := addedDBs[o.derivedDBIDs[i]]
633 2 : if !ok && o.derivedDBIDs[i] != o.dbID {
634 1 : objs = append(objs, o.derivedDBIDs[i])
635 1 : addedDBs[o.derivedDBIDs[i]] = struct{}{}
636 1 : }
637 : }
638 2 : return objs
639 : }
640 :
641 : func closeIters(
642 : pointIter base.InternalIterator,
643 : rangeDelIter keyspan.FragmentIterator,
644 : rangeKeyIter keyspan.FragmentIterator,
645 2 : ) {
646 2 : if pointIter != nil {
647 2 : pointIter.Close()
648 2 : }
649 2 : if rangeDelIter != nil {
650 2 : rangeDelIter.Close()
651 2 : }
652 2 : if rangeKeyIter != nil {
653 2 : rangeKeyIter.Close()
654 2 : }
655 : }
656 :
657 : // collapseBatch collapses the mutations in a batch to be equivalent to an
658 : // sstable ingesting those mutations. Duplicate updates to a key are collapsed
659 : // so that only the latest update is performed. All range deletions are
660 : // performed first in the batch to match the semantics of ingestion, where a
661 : // range deletion does not delete point records contained in the same sstable.
662 : func (o *ingestOp) collapseBatch(
663 : t *test,
664 : db *pebble.DB,
665 : pointIter base.InternalIterator,
666 : rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
667 : b *pebble.Batch,
668 1 : ) (*pebble.Batch, error) {
669 1 : defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
670 1 : equal := t.opts.Comparer.Equal
671 1 : collapsed := db.NewBatch()
672 1 :
673 1 : if rangeDelIter != nil {
674 1 : // NB: The range tombstones have already been fragmented by the Batch.
675 1 : for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
676 1 : // NB: We don't have to copy the key or value since we're reading from a
677 1 : // batch which doesn't do prefix compression.
678 1 : if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
679 0 : return nil, err
680 0 : }
681 : }
682 1 : if err := rangeDelIter.Close(); err != nil {
683 0 : return nil, err
684 0 : }
685 1 : rangeDelIter = nil
686 : }
687 :
688 1 : if pointIter != nil {
689 1 : var lastUserKey []byte
690 1 : for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
691 1 : // Ignore duplicate keys.
692 1 : //
693 1 : // Note: this is necessary due to MERGE keys; otherwise it would be
694 1 : // fine to include all the keys in the batch and let the normal
695 1 : // sequence number precedence determine which of the keys "wins".
696 1 : // But the code to build the ingested sstable will only keep the
697 1 : // most recent internal key and will not merge across internal keys.
698 1 : if equal(lastUserKey, key.UserKey) {
699 1 : continue
700 : }
701 : // NB: We don't have to copy the key or value since we're reading from a
702 : // batch which doesn't do prefix compression.
703 1 : lastUserKey = key.UserKey
704 1 :
705 1 : var err error
706 1 : switch key.Kind() {
707 1 : case pebble.InternalKeyKindDelete:
708 1 : err = collapsed.Delete(key.UserKey, nil)
709 1 : case pebble.InternalKeyKindDeleteSized:
710 1 : v, _ := binary.Uvarint(value.InPlaceValue())
711 1 : // Batch.DeleteSized takes just the length of the value being
712 1 : // deleted and adds the key's length to derive the overall entry
713 1 : // size of the value being deleted. This has already been done
714 1 : // to the key we're reading from the batch, so we must subtract
715 1 : // the key length from the encoded value before calling
716 1 : // collapsed.DeleteSized, which will again add the key length
717 1 : // before encoding.
718 1 : err = collapsed.DeleteSized(key.UserKey, uint32(v-uint64(len(key.UserKey))), nil)
719 1 : case pebble.InternalKeyKindSingleDelete:
720 1 : err = collapsed.SingleDelete(key.UserKey, nil)
721 1 : case pebble.InternalKeyKindSet:
722 1 : err = collapsed.Set(key.UserKey, value.InPlaceValue(), nil)
723 1 : case pebble.InternalKeyKindMerge:
724 1 : err = collapsed.Merge(key.UserKey, value.InPlaceValue(), nil)
725 0 : case pebble.InternalKeyKindLogData:
726 0 : err = collapsed.LogData(key.UserKey, nil)
727 0 : default:
728 0 : err = errors.Errorf("unknown batch record kind: %d", key.Kind())
729 : }
730 1 : if err != nil {
731 0 : return nil, err
732 0 : }
733 : }
734 1 : if err := pointIter.Close(); err != nil {
735 0 : return nil, err
736 0 : }
737 1 : pointIter = nil
738 : }
739 :
740 : // There's no equivalent of a MERGE operator for range keys, so there's no
741 : // need to collapse the range keys here. Rather than reading the range keys
742 : // from `rangeKeyIter`, which will already be fragmented, read the range
743 : // keys from the batch and copy them verbatim. This marginally improves our
744 : // test coverage over the alternative approach of pre-fragmenting and
745 : // pre-coalescing before writing to the batch.
746 : //
747 : // The `rangeKeyIter` is used only to determine if there are any range keys
748 : // in the batch at all, and only because we already have it handy from
749 : // private.BatchSort.
750 1 : if rangeKeyIter != nil {
751 1 : for r := b.Reader(); ; {
752 1 : kind, key, value, ok := r.Next()
753 1 : if !ok {
754 1 : break
755 1 : } else if !rangekey.IsRangeKey(kind) {
756 1 : continue
757 : }
758 1 : ik := base.MakeInternalKey(key, 0, kind)
759 1 : if err := collapsed.AddInternalKey(&ik, value, nil); err != nil {
760 0 : return nil, err
761 0 : }
762 : }
763 1 : if err := rangeKeyIter.Close(); err != nil {
764 0 : return nil, err
765 0 : }
766 1 : rangeKeyIter = nil
767 : }
768 :
769 1 : return collapsed, nil
770 : }
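
// Worked example (illustrative) for the DeleteSized accounting above:
// Batch.DeleteSized("foo", 10) stores varint(10 + len("foo")) = varint(13)
// as the entry's value. When collapsing, Uvarint decodes v = 13, so we pass
// v - len("foo") = 10 to collapsed.DeleteSized, which re-adds the key length
// while encoding; without the subtraction the key length would be counted
// twice.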
771 :
772 2 : func (o *ingestOp) String() string {
773 2 : var buf strings.Builder
774 2 : buf.WriteString(o.dbID.String())
775 2 : buf.WriteString(".Ingest(")
776 2 : for i, id := range o.batchIDs {
777 2 : if i > 0 {
778 2 : buf.WriteString(", ")
779 2 : }
780 2 : buf.WriteString(id.String())
781 : }
782 2 : buf.WriteString(")")
783 2 : return buf.String()
784 : }
785 :
786 : // getOp models a Reader.Get operation.
787 : type getOp struct {
788 : readerID objID
789 : key []byte
790 : derivedDBID objID
791 : }
792 :
793 2 : func (o *getOp) run(t *test, h historyRecorder) {
794 2 : r := t.getReader(o.readerID)
795 2 : var val []byte
796 2 : var closer io.Closer
797 2 : err := withRetries(func() (err error) {
798 2 : val, closer, err = r.Get(o.key)
799 2 : return err
800 2 : })
801 2 : h.Recordf("%s // [%q] %v", o, val, err)
802 2 : if closer != nil {
803 2 : closer.Close()
804 2 : }
805 : }
806 :
807 2 : func (o *getOp) String() string { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
808 2 : func (o *getOp) receiver() objID { return o.readerID }
809 2 : func (o *getOp) syncObjs() objIDSlice {
810 2 : if o.readerID.tag() == dbTag {
811 2 : return nil
812 2 : }
813 : // batch.Get reads through to the current database state.
814 2 : if o.derivedDBID != 0 {
815 2 : return []objID{o.derivedDBID}
816 2 : }
817 0 : return nil
818 : }
819 :
820 : // newIterOp models a Reader.NewIter operation.
821 : type newIterOp struct {
822 : readerID objID
823 : iterID objID
824 : iterOpts
825 : derivedDBID objID
826 : }
827 :
828 2 : func (o *newIterOp) run(t *test, h historyRecorder) {
829 2 : r := t.getReader(o.readerID)
830 2 : opts := iterOptions(o.iterOpts)
831 2 :
832 2 : var i *pebble.Iterator
833 2 : for {
834 2 : i, _ = r.NewIter(opts)
835 2 : if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
836 2 : break
837 : }
838 : // Close this iter and retry NewIter.
839 0 : _ = i.Close()
840 : }
841 2 : t.setIter(o.iterID, i)
842 2 :
843 2 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
844 2 : // the user-provided bounds.
845 2 : if opts != nil {
846 2 : rand.Read(opts.LowerBound[:])
847 2 : rand.Read(opts.UpperBound[:])
848 2 : }
849 2 : h.Recordf("%s // %v", o, i.Error())
850 : }
851 :
852 2 : func (o *newIterOp) String() string {
853 2 : return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
854 2 : o.iterID, o.readerID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
855 2 : }
856 :
857 2 : func (o *newIterOp) receiver() objID { return o.readerID }
858 2 : func (o *newIterOp) syncObjs() objIDSlice {
859 2 : // Prevent o.iterID ops from running before it exists.
860 2 : objs := []objID{o.iterID}
861 2 : // If reading through a batch or snapshot, the new iterator will also observe database
862 2 : // state, and we must synchronize on the database state for a consistent
863 2 : // view.
864 2 : if o.readerID.tag() == batchTag || o.readerID.tag() == snapTag {
865 2 : objs = append(objs, o.derivedDBID)
866 2 : }
867 2 : return objs
868 : }
869 :
870 : // newIterUsingCloneOp models an Iterator.Clone operation.
871 : type newIterUsingCloneOp struct {
872 : existingIterID objID
873 : iterID objID
874 : refreshBatch bool
875 : iterOpts
876 :
877 : // derivedReaderID is the ID of the underlying reader that backs both the
878 : // existing iterator and the new iterator. The derivedReaderID is NOT
879 : // serialized by String and is derived from other operations during parse.
880 : derivedReaderID objID
881 : }
882 :
883 2 : func (o *newIterUsingCloneOp) run(t *test, h historyRecorder) {
884 2 : iter := t.getIter(o.existingIterID)
885 2 : cloneOpts := pebble.CloneOptions{
886 2 : IterOptions: iterOptions(o.iterOpts),
887 2 : RefreshBatchView: o.refreshBatch,
888 2 : }
889 2 : i, err := iter.iter.Clone(cloneOpts)
890 2 : if err != nil {
891 0 : panic(err)
892 : }
893 2 : t.setIter(o.iterID, i)
894 2 : h.Recordf("%s // %v", o, i.Error())
895 : }
896 :
897 2 : func (o *newIterUsingCloneOp) String() string {
898 2 : return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
899 2 : o.iterID, o.existingIterID, o.refreshBatch, o.lower, o.upper,
900 2 : o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
901 2 : }
902 :
903 2 : func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
904 :
905 2 : func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
906 2 : objIDs := []objID{o.iterID}
907 2 : // If the underlying reader is a batch, we must synchronize with the batch.
908 2 : // If refreshBatch=true, synchronizing is necessary to observe all the
909 2 : // mutations up until this op and no more. Even when refreshBatch=false,
910 2 : // we must synchronize because iterator construction may access state cached
911 2 : // on the indexed batch to avoid refragmenting range tombstones or range
912 2 : // keys.
913 2 : if o.derivedReaderID.tag() == batchTag {
914 1 : objIDs = append(objIDs, o.derivedReaderID)
915 1 : }
916 2 : return objIDs
917 : }
918 :
919 : // iterSetBoundsOp models an Iterator.SetBounds operation.
920 : type iterSetBoundsOp struct {
921 : iterID objID
922 : lower []byte
923 : upper []byte
924 : }
925 :
926 2 : func (o *iterSetBoundsOp) run(t *test, h historyRecorder) {
927 2 : i := t.getIter(o.iterID)
928 2 : var lower, upper []byte
929 2 : if o.lower != nil {
930 2 : lower = append(lower, o.lower...)
931 2 : }
932 2 : if o.upper != nil {
933 2 : upper = append(upper, o.upper...)
934 2 : }
935 2 : i.SetBounds(lower, upper)
936 2 :
937 2 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
938 2 : // the user-provided bounds.
939 2 : rand.Read(lower[:])
940 2 : rand.Read(upper[:])
941 2 :
942 2 : h.Recordf("%s // %v", o, i.Error())
943 : }
944 :
945 2 : func (o *iterSetBoundsOp) String() string {
946 2 : return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID, o.lower, o.upper)
947 2 : }
948 :
949 2 : func (o *iterSetBoundsOp) receiver() objID { return o.iterID }
950 2 : func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
951 :
952 : // iterSetOptionsOp models an Iterator.SetOptions operation.
953 : type iterSetOptionsOp struct {
954 : iterID objID
955 : iterOpts
956 :
957 : // derivedReaderID is the ID of the underlying reader that backs the
958 : // iterator. The derivedReaderID is NOT serialized by String and is derived
959 : // from other operations during parse.
960 : derivedReaderID objID
961 : }
962 :
963 2 : func (o *iterSetOptionsOp) run(t *test, h historyRecorder) {
964 2 : i := t.getIter(o.iterID)
965 2 :
966 2 : opts := iterOptions(o.iterOpts)
967 2 : if opts == nil {
968 1 : opts = &pebble.IterOptions{}
969 1 : }
970 2 : i.SetOptions(opts)
971 2 :
972 2 : // Trash the bounds to ensure that Pebble doesn't rely on the stability of
973 2 : // the user-provided bounds.
974 2 : rand.Read(opts.LowerBound[:])
975 2 : rand.Read(opts.UpperBound[:])
976 2 :
977 2 : h.Recordf("%s // %v", o, i.Error())
978 : }
979 :
980 2 : func (o *iterSetOptionsOp) String() string {
981 2 : return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
982 2 : o.iterID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
983 2 : }
984 :
985 2 : func iterOptions(o iterOpts) *pebble.IterOptions {
986 2 : if o.IsZero() {
987 2 : return nil
988 2 : }
989 2 : var lower, upper []byte
990 2 : if o.lower != nil {
991 2 : lower = append(lower, o.lower...)
992 2 : }
993 2 : if o.upper != nil {
994 2 : upper = append(upper, o.upper...)
995 2 : }
996 2 : opts := &pebble.IterOptions{
997 2 : LowerBound: lower,
998 2 : UpperBound: upper,
999 2 : KeyTypes: pebble.IterKeyType(o.keyTypes),
1000 2 : RangeKeyMasking: pebble.RangeKeyMasking{
1001 2 : Suffix: o.maskSuffix,
1002 2 : },
1003 2 : UseL6Filters: o.useL6Filters,
1004 2 : }
1005 2 : if opts.RangeKeyMasking.Suffix != nil {
1006 2 : opts.RangeKeyMasking.Filter = func() pebble.BlockPropertyFilterMask {
1007 2 : return sstable.NewTestKeysMaskingFilter()
1008 2 : }
1009 : }
1010 2 : if o.filterMax > 0 {
1011 2 : opts.PointKeyFilters = []pebble.BlockPropertyFilter{
1012 2 : sstable.NewTestKeysBlockPropertyFilter(o.filterMin, o.filterMax),
1013 2 : }
1014 2 : // Enforce the timestamp bounds in SkipPoint, so that the iterator never
1015 2 : // returns a key outside the filterMin, filterMax bounds. This provides
1016 2 : // deterministic iteration.
1017 2 : opts.SkipPoint = func(k []byte) (skip bool) {
1018 2 : n := testkeys.Comparer.Split(k)
1019 2 : if n == len(k) {
1020 2 : // No suffix, don't skip it.
1021 2 : return false
1022 2 : }
1023 2 : v, err := testkeys.ParseSuffix(k[n:])
1024 2 : if err != nil {
1025 0 : panic(err)
1026 : }
1027 2 : ts := uint64(v)
1028 2 : return ts < o.filterMin || ts >= o.filterMax
1029 : }
1030 : }
1031 2 : return opts
1032 : }
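
// Illustrative example (assuming testkeys-formatted keys "prefix@ts"): with
// filterMin=10 and filterMax=20, the SkipPoint closure above behaves as:
//
//	SkipPoint([]byte("a@5"))  == true  // ts 5 < filterMin
//	SkipPoint([]byte("a@15")) == false // within [10, 20)
//	SkipPoint([]byte("a@20")) == true  // ts 20 >= filterMax
//	SkipPoint([]byte("a"))    == false // suffix-less keys are never skipped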
1033 :
1034 2 : func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
1035 :
1036 2 : func (o *iterSetOptionsOp) syncObjs() objIDSlice {
1037 2 : if o.derivedReaderID.tag() == batchTag {
1038 1 : // If the underlying reader is a batch, we must synchronize with the
1039 1 : // batch so that we observe all the mutations up until this operation
1040 1 : // and no more.
1041 1 : return []objID{o.derivedReaderID}
1042 1 : }
1043 2 : return nil
1044 : }
1045 :
1046 : // iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
1047 : type iterSeekGEOp struct {
1048 : iterID objID
1049 : key []byte
1050 : limit []byte
1051 :
1052 : derivedReaderID objID
1053 : }
1054 :
1055 2 : func iteratorPos(i *retryableIter) string {
1056 2 : var buf bytes.Buffer
1057 2 : fmt.Fprintf(&buf, "%q", i.Key())
1058 2 : hasPoint, hasRange := i.HasPointAndRange()
1059 2 : if hasPoint {
1060 2 : fmt.Fprintf(&buf, ",%q", i.Value())
1061 2 : } else {
1062 2 : fmt.Fprint(&buf, ",<no point>")
1063 2 : }
1064 2 : if hasRange {
1065 2 : start, end := i.RangeBounds()
1066 2 : fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
1067 2 : for i, rk := range i.RangeKeys() {
1068 2 : if i > 0 {
1069 2 : fmt.Fprint(&buf, ",")
1070 2 : }
1071 2 : fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
1072 : }
1073 2 : fmt.Fprint(&buf, "}")
1074 2 : } else {
1075 2 : fmt.Fprint(&buf, ",<no range>")
1076 2 : }
1077 2 : if i.RangeKeyChanged() {
1078 2 : fmt.Fprint(&buf, "*")
1079 2 : }
1080 2 : return buf.String()
1081 : }
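
// Example rendering (illustrative): an iterator positioned at point key
// "a@3" with value "v1", within range-key span ["a","c") carrying suffix
// "@5" => "x", and with RangeKeyChanged() == true, formats as:
//
//	"a@3","v1",["a","c")=>{"@5"="x"}*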
1082 :
1083 2 : func validBoolToStr(valid bool) string {
1084 2 : return fmt.Sprintf("%t", valid)
1085 2 : }
1086 :
1087 2 : func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
1088 2 : // We can't distinguish between IterExhausted and IterAtLimit in a
1089 2 : // deterministic manner.
1090 2 : switch validity {
1091 2 : case pebble.IterExhausted, pebble.IterAtLimit:
1092 2 : return false, "invalid"
1093 2 : case pebble.IterValid:
1094 2 : return true, "valid"
1095 0 : default:
1096 0 : panic("unknown validity")
1097 : }
1098 : }
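
// Illustrative rationale (an assumption, not stated in the original):
// whether a limited positioning op pauses at the limit (IterAtLimit) or runs
// off the end (IterExhausted) can depend on physical state, e.g. tombstones
// one run has compacted away and another has not, so both states must map to
// the same "invalid" string for histories to be comparable across runs.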
1099 :
1100 2 : func (o *iterSeekGEOp) run(t *test, h historyRecorder) {
1101 2 : i := t.getIter(o.iterID)
1102 2 : var valid bool
1103 2 : var validStr string
1104 2 : if o.limit == nil {
1105 2 : valid = i.SeekGE(o.key)
1106 2 : validStr = validBoolToStr(valid)
1107 2 : } else {
1108 2 : valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
1109 2 : }
1110 2 : if valid {
1111 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1112 2 : } else {
1113 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1114 2 : }
1115 : }
1116 :
1117 2 : func (o *iterSeekGEOp) String() string {
1118 2 : return fmt.Sprintf("%s.SeekGE(%q, %q)", o.iterID, o.key, o.limit)
1119 2 : }
1120 2 : func (o *iterSeekGEOp) receiver() objID { return o.iterID }
1121 2 : func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1122 :
1123 2 : func onlyBatchIDs(ids ...objID) objIDSlice {
1124 2 : var ret objIDSlice
1125 2 : for _, id := range ids {
1126 2 : if id.tag() == batchTag {
1127 2 : ret = append(ret, id)
1128 2 : }
1129 : }
1130 2 : return ret
1131 : }
1132 :
1133 : // iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
1134 : type iterSeekPrefixGEOp struct {
1135 : iterID objID
1136 : key []byte
1137 :
1138 : derivedReaderID objID
1139 : }
1140 :
1141 2 : func (o *iterSeekPrefixGEOp) run(t *test, h historyRecorder) {
1142 2 : i := t.getIter(o.iterID)
1143 2 : valid := i.SeekPrefixGE(o.key)
1144 2 : if valid {
1145 2 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1146 2 : } else {
1147 2 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1148 2 : }
1149 : }
1150 :
1151 2 : func (o *iterSeekPrefixGEOp) String() string {
1152 2 : return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, o.key)
1153 2 : }
1154 2 : func (o *iterSeekPrefixGEOp) receiver() objID { return o.iterID }
1155 2 : func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1156 :
1157 : // iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
1158 : type iterSeekLTOp struct {
1159 : iterID objID
1160 : key []byte
1161 : limit []byte
1162 :
1163 : derivedReaderID objID
1164 : }
1165 :
1166 2 : func (o *iterSeekLTOp) run(t *test, h historyRecorder) {
1167 2 : i := t.getIter(o.iterID)
1168 2 : var valid bool
1169 2 : var validStr string
1170 2 : if o.limit == nil {
1171 2 : valid = i.SeekLT(o.key)
1172 2 : validStr = validBoolToStr(valid)
1173 2 : } else {
1174 2 : valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
1175 2 : }
1176 2 : if valid {
1177 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1178 2 : } else {
1179 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1180 2 : }
1181 : }
1182 :
1183 2 : func (o *iterSeekLTOp) String() string {
1184 2 : return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID, o.key, o.limit)
1185 2 : }
1186 :
1187 2 : func (o *iterSeekLTOp) receiver() objID { return o.iterID }
1188 2 : func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1189 :
1190 : // iterFirstOp models an Iterator.First operation.
1191 : type iterFirstOp struct {
1192 : iterID objID
1193 :
1194 : derivedReaderID objID
1195 : }
1196 :
1197 2 : func (o *iterFirstOp) run(t *test, h historyRecorder) {
1198 2 : i := t.getIter(o.iterID)
1199 2 : valid := i.First()
1200 2 : if valid {
1201 2 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1202 2 : } else {
1203 2 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1204 2 : }
1205 : }
1206 :
1207 2 : func (o *iterFirstOp) String() string { return fmt.Sprintf("%s.First()", o.iterID) }
1208 2 : func (o *iterFirstOp) receiver() objID { return o.iterID }
1209 2 : func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1210 :
1211 : // iterLastOp models an Iterator.Last operation.
1212 : type iterLastOp struct {
1213 : iterID objID
1214 :
1215 : derivedReaderID objID
1216 : }
1217 :
1218 2 : func (o *iterLastOp) run(t *test, h historyRecorder) {
1219 2 : i := t.getIter(o.iterID)
1220 2 : valid := i.Last()
1221 2 : if valid {
1222 2 : h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
1223 2 : } else {
1224 2 : h.Recordf("%s // [%t] %v", o, valid, i.Error())
1225 2 : }
1226 : }
1227 :
1228 2 : func (o *iterLastOp) String() string { return fmt.Sprintf("%s.Last()", o.iterID) }
1229 2 : func (o *iterLastOp) receiver() objID { return o.iterID }
1230 2 : func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1231 :
1232 : // iterNextOp models an Iterator.Next[WithLimit] operation.
1233 : type iterNextOp struct {
1234 : iterID objID
1235 : limit []byte
1236 :
1237 : derivedReaderID objID
1238 : }
1239 :
1240 2 : func (o *iterNextOp) run(t *test, h historyRecorder) {
1241 2 : i := t.getIter(o.iterID)
1242 2 : var valid bool
1243 2 : var validStr string
1244 2 : if o.limit == nil {
1245 2 : valid = i.Next()
1246 2 : validStr = validBoolToStr(valid)
1247 2 : } else {
1248 2 : valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
1249 2 : }
1250 2 : if valid {
1251 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1252 2 : } else {
1253 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1254 2 : }
1255 : }
1256 :
1257 2 : func (o *iterNextOp) String() string { return fmt.Sprintf("%s.Next(%q)", o.iterID, o.limit) }
1258 2 : func (o *iterNextOp) receiver() objID { return o.iterID }
1259 2 : func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1260 :
1261 : // iterNextPrefixOp models an Iterator.NextPrefix operation.
1262 : type iterNextPrefixOp struct {
1263 : iterID objID
1264 :
1265 : derivedReaderID objID
1266 : }
1267 :
1268 2 : func (o *iterNextPrefixOp) run(t *test, h historyRecorder) {
1269 2 : i := t.getIter(o.iterID)
1270 2 : valid := i.NextPrefix()
1271 2 : validStr := validBoolToStr(valid)
1272 2 : if valid {
1273 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1274 2 : } else {
1275 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1276 2 : }
1277 : }
1278 :
1279 2 : func (o *iterNextPrefixOp) String() string { return fmt.Sprintf("%s.NextPrefix()", o.iterID) }
1280 2 : func (o *iterNextPrefixOp) receiver() objID { return o.iterID }
1281 2 : func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1282 :
1283 : // iterPrevOp models an Iterator.Prev[WithLimit] operation.
1284 : type iterPrevOp struct {
1285 : iterID objID
1286 : limit []byte
1287 :
1288 : derivedReaderID objID
1289 : }
1290 :
1291 2 : func (o *iterPrevOp) run(t *test, h historyRecorder) {
1292 2 : i := t.getIter(o.iterID)
1293 2 : var valid bool
1294 2 : var validStr string
1295 2 : if o.limit == nil {
1296 2 : valid = i.Prev()
1297 2 : validStr = validBoolToStr(valid)
1298 2 : } else {
1299 2 : valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
1300 2 : }
1301 2 : if valid {
1302 2 : h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
1303 2 : } else {
1304 2 : h.Recordf("%s // [%s] %v", o, validStr, i.Error())
1305 2 : }
1306 : }
1307 :
1308 2 : func (o *iterPrevOp) String() string { return fmt.Sprintf("%s.Prev(%q)", o.iterID, o.limit) }
1309 2 : func (o *iterPrevOp) receiver() objID { return o.iterID }
1310 2 : func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
1311 :
1312 : // newSnapshotOp models a DB.NewSnapshot operation.
1313 : type newSnapshotOp struct {
1314 : dbID objID
1315 : snapID objID
1316 : // If nonempty, this snapshot must not be used to read any keys outside of
1317 : // the provided bounds. This allows some implementations to use eventually
1318 : // file-only snapshots (EFOS), which require bounds.
1319 : bounds []pebble.KeyRange
1320 : }
1321 :
1322 2 : func (o *newSnapshotOp) run(t *test, h historyRecorder) {
1323 2 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
1324 2 : if len(t.dbs) > 1 || (len(o.bounds) > 0 && ((11400714819323198485*uint64(t.idx)*t.testOpts.seedEFOS)>>63) == 1) {
1325 1 : s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(o.bounds)
1326 1 : t.setSnapshot(o.snapID, s)
1327 2 : } else {
1328 2 : s := t.getDB(o.dbID).NewSnapshot()
1329 2 : t.setSnapshot(o.snapID, s)
1330 2 : }
1331 2 : h.Recordf("%s", o)
1332 : }
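
// Illustrative restatement of the branch above: multi-DB runs always take
// the EFOS path; otherwise the top bit of a Fibonacci hash of idx*seedEFOS
// serves as a deterministic coin flip, stable for a given seed:
//
//	useEFOS := len(dbs) > 1 || (len(bounds) > 0 &&
//		(11400714819323198485*uint64(idx)*seed)>>63 == 1)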
1333 :
1334 2 : func (o *newSnapshotOp) String() string {
1335 2 : var buf bytes.Buffer
1336 2 : fmt.Fprintf(&buf, "%s = %s.NewSnapshot(", o.snapID, o.dbID)
1337 2 : for i := range o.bounds {
1338 2 : if i > 0 {
1339 2 : fmt.Fprint(&buf, ", ")
1340 2 : }
1341 2 : fmt.Fprintf(&buf, "%q, %q", o.bounds[i].Start, o.bounds[i].End)
1342 : }
1343 2 : fmt.Fprint(&buf, ")")
1344 2 : return buf.String()
1345 : }
1346 2 : func (o *newSnapshotOp) receiver() objID { return o.dbID }
1347 2 : func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
1348 :
1349 : type dbRatchetFormatMajorVersionOp struct {
1350 : dbID objID
1351 : vers pebble.FormatMajorVersion
1352 : }
1353 :
1354 2 : func (o *dbRatchetFormatMajorVersionOp) run(t *test, h historyRecorder) {
1355 2 : var err error
1356 2 : // NB: We no-op the operation if we're already at or above the provided
1357 2 : // format major version. Different runs start at different format major
1358 2 : // versions, making the presence of an error and the error message itself
1359 2 : // non-deterministic if we attempt to upgrade to an older version.
1360 2 : //
1361 2 : // Regardless, subsequent operations should behave identically, which is what
1362 2 : // we're really aiming to test by including this format major version ratchet
1363 2 : // operation.
1364 2 : if t.getDB(o.dbID).FormatMajorVersion() < o.vers {
1365 2 : err = t.getDB(o.dbID).RatchetFormatMajorVersion(o.vers)
1366 2 : }
1367 2 : h.Recordf("%s // %v", o, err)
1368 : }
1369 :
1370 2 : func (o *dbRatchetFormatMajorVersionOp) String() string {
1371 2 : return fmt.Sprintf("%s.RatchetFormatMajorVersion(%s)", o.dbID, o.vers)
1372 2 : }
1373 2 : func (o *dbRatchetFormatMajorVersionOp) receiver() objID { return o.dbID }
1374 2 : func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
1375 :
1376 : type dbRestartOp struct {
1377 : dbID objID
1378 : }
1379 :
1380 2 : func (o *dbRestartOp) run(t *test, h historyRecorder) {
1381 2 : if err := t.restartDB(o.dbID); err != nil {
1382 0 : h.Recordf("%s // %v", o, err)
1383 0 : h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
1384 2 : } else {
1385 2 : h.Recordf("%s", o)
1386 2 : }
1387 : }
1388 :
1389 2 : func (o *dbRestartOp) String() string { return fmt.Sprintf("%s.Restart()", o.dbID) }
1390 2 : func (o *dbRestartOp) receiver() objID { return o.dbID }
1391 2 : func (o *dbRestartOp) syncObjs() objIDSlice { return nil }
1392 :
1393 1 : func formatOps(ops []op) string {
1394 1 : var buf strings.Builder
1395 1 : for _, op := range ops {
1396 1 : fmt.Fprintf(&buf, "%s\n", op)
1397 1 : }
1398 1 : return buf.String()
1399 : }
1400 :
1401 : // replicateOp models an operation that could copy keys from one db to
1402 : // another through either an IngestAndExcise or an Ingest.
1403 : type replicateOp struct {
1404 : source, dest objID
1405 : start, end []byte
1406 : }
1407 :
1408 : func (r *replicateOp) runSharedReplicate(
1409 : t *test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
1410 0 : ) {
1411 0 : var sharedSSTs []pebble.SharedSSTMeta
1412 0 : var err error
1413 0 : err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
1414 0 : func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
1415 0 : val, _, err := value.Value(nil)
1416 0 : if err != nil {
1417 0 : panic(err)
1418 : }
1419 0 : return w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
1420 : },
1421 0 : func(start, end []byte, seqNum uint64) error {
1422 0 : return w.DeleteRange(start, end)
1423 0 : },
1424 0 : func(start, end []byte, keys []keyspan.Key) error {
1425 0 : s := keyspan.Span{
1426 0 : Start: start,
1427 0 : End: end,
1428 0 : Keys: keys,
1429 0 : KeysOrder: 0,
1430 0 : }
1431 0 : return rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
1432 0 : return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
1433 0 : })
1434 : },
1435 0 : func(sst *pebble.SharedSSTMeta) error {
1436 0 : sharedSSTs = append(sharedSSTs, *sst)
1437 0 : return nil
1438 0 : },
1439 : )
1440 0 : if err != nil {
1441 0 : h.Recordf("%s // %v", r, err)
1442 0 : return
1443 0 : }
1444 :
1445 0 : _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, pebble.KeyRange{Start: r.start, End: r.end})
1446 0 : h.Recordf("%s // %v", r, err)
1447 : }
1448 :
1449 1 : func (r *replicateOp) run(t *test, h historyRecorder) {
1450 1 : // Shared replication only works if shared storage is enabled.
1451 1 : useSharedIngest := t.testOpts.useSharedReplicate
1452 1 : if !t.testOpts.sharedStorageEnabled {
1453 1 : useSharedIngest = false
1454 1 : }
1455 :
1456 1 : source := t.getDB(r.source)
1457 1 : dest := t.getDB(r.dest)
1458 1 : sstPath := path.Join(t.tmpDir, fmt.Sprintf("ext-replicate%d.sst", t.idx))
1459 1 : f, err := t.opts.FS.Create(sstPath)
1460 1 : if err != nil {
1461 0 : h.Recordf("%s // %v", r, err)
1462 0 : return
1463 0 : }
1464 1 : w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), t.opts.MakeWriterOptions(0, dest.FormatMajorVersion().MaxTableFormat()))
1465 1 :
1466 1 : if useSharedIngest {
1467 0 : r.runSharedReplicate(t, h, source, dest, w, sstPath)
1468 0 : return
1469 0 : }
1470 :
1471 1 : iter, err := source.NewIter(&pebble.IterOptions{
1472 1 : LowerBound: r.start,
1473 1 : UpperBound: r.end,
1474 1 : KeyTypes: pebble.IterKeyTypePointsAndRanges,
1475 1 : })
1476 1 : if err != nil {
1477 0 : panic(err)
1478 : }
1479 1 : defer iter.Close()
1480 1 :
1481 1 : // Write rangedels and rangekeydels for the range. This mimics the Excise
1482 1 : // that runSharedReplicate would do.
1483 1 : if err := w.DeleteRange(r.start, r.end); err != nil {
1484 0 : panic(err)
1485 : }
1486 1 : if err := w.RangeKeyDelete(r.start, r.end); err != nil {
1487 0 : panic(err)
1488 : }
1489 :
1490 1 : for ok := iter.SeekGE(r.start); ok && iter.Error() == nil; ok = iter.Next() {
1491 0 : hasPoint, hasRange := iter.HasPointAndRange()
1492 0 : if hasPoint {
1493 0 : val, err := iter.ValueAndErr()
1494 0 : if err != nil {
1495 0 : panic(err)
1496 : }
1497 0 : if err := w.Set(iter.Key(), val); err != nil {
1498 0 : panic(err)
1499 : }
1500 : }
1501 0 : if hasRange && iter.RangeKeyChanged() {
1502 0 : rangeKeys := iter.RangeKeys()
1503 0 : rkStart, rkEnd := iter.RangeBounds()
1504 0 : for i := range rangeKeys {
1505 0 : if err := w.RangeKeySet(rkStart, rkEnd, rangeKeys[i].Suffix, rangeKeys[i].Value); err != nil {
1506 0 : panic(err)
1507 : }
1508 : }
1509 : }
1510 : }
1511 1 : if err := w.Close(); err != nil {
1512 0 : panic(err)
1513 : }
1514 :
1515 1 : err = dest.Ingest([]string{sstPath})
1516 1 : h.Recordf("%s // %v", r, err)
1517 : }
1518 :
1519 2 : func (r *replicateOp) String() string {
1520 2 : return fmt.Sprintf("%s.Replicate(%s, %q, %q)", r.source, r.dest, r.start, r.end)
1521 2 : }
1522 :
1523 1 : func (r *replicateOp) receiver() objID { return r.source }
1524 1 : func (r *replicateOp) syncObjs() objIDSlice { return objIDSlice{r.dest} }