Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : // Package metamorphic provides a testing framework for running randomized tests
6 : // over multiple Pebble databases with varying configurations. Logically
7 : // equivalent operations should result in equivalent output across all
8 : // configurations.
9 : package metamorphic
10 :
11 : import (
12 : "context"
13 : "fmt"
14 : "io"
15 : "math/rand/v2"
16 : "os"
17 : "os/exec"
18 : "path"
19 : "path/filepath"
20 : "regexp"
21 : "sort"
22 : "strconv"
23 : "testing"
24 : "time"
25 :
26 : "github.com/cockroachdb/errors"
27 : "github.com/cockroachdb/pebble/internal/base"
28 : "github.com/cockroachdb/pebble/internal/dsl"
29 : "github.com/cockroachdb/pebble/internal/randvar"
30 : "github.com/cockroachdb/pebble/vfs"
31 : "github.com/cockroachdb/pebble/vfs/errorfs"
32 : "github.com/pmezard/go-difflib/difflib"
33 : "github.com/stretchr/testify/require"
34 : "golang.org/x/sync/errgroup"
35 : )
36 :
37 : type runAndCompareOptions struct {
38 : seed uint64
39 : ops randvar.Static
40 : previousOpsPath string
41 : initialStatePath string
42 : initialStateDesc string
43 : traceFile string
44 : innerBinary string
45 : mutateTestOptions []func(*TestOptions)
46 : customRuns map[string]string
47 : numInstances int
48 : runOnceOptions
49 : }
50 :
51 : // A RunOption configures the behavior of RunAndCompare.
52 : type RunOption interface {
53 : apply(*runAndCompareOptions)
54 : }
55 :
56 : // Seed configures generation to use the provided seed. Seed may be used to
57 : // deterministically reproduce the same run.
58 : type Seed uint64
59 :
60 0 : func (s Seed) apply(ro *runAndCompareOptions) { ro.seed = uint64(s) }
61 :
62 : // ExtendPreviousRun configures RunAndCompare to use the output of a previous
63 : // metamorphic test run to seed the this run. It's used in the crossversion
64 : // metamorphic tests, in which a data directory is upgraded through multiple
65 : // versions of Pebble, exercising upgrade code paths and cross-version
66 : // compatibility.
67 : //
68 : // The opsPath should be the filesystem path to the ops file containing the
69 : // operations run within the previous iteration of the metamorphic test. It's
70 : // used to inform operation generation to prefer using keys used in the previous
71 : // run, which are therefore more likely to be "interesting."
72 : //
73 : // The initialStatePath argument should be the filesystem path to the data
74 : // directory containing the database where the previous run of the metamorphic
75 : // test left off.
76 : //
77 : // The initialStateDesc argument is presentational and should hold a
78 : // human-readable description of the initial state.
79 0 : func ExtendPreviousRun(opsPath, initialStatePath, initialStateDesc string) RunOption {
80 0 : return closureOpt(func(ro *runAndCompareOptions) {
81 0 : ro.previousOpsPath = opsPath
82 0 : ro.initialStatePath = initialStatePath
83 0 : ro.initialStateDesc = initialStateDesc
84 0 : })
85 : }
86 :
87 : var (
88 : // UseDisk configures RunAndCompare to use the physical filesystem for all
89 : // generated runs.
90 0 : UseDisk = closureOpt(func(ro *runAndCompareOptions) {
91 0 : ro.mutateTestOptions = append(ro.mutateTestOptions, func(to *TestOptions) { to.useDisk = true })
92 : })
93 : // UseInMemory configures RunAndCompare to use an in-memory virtual
94 : // filesystem for all generated runs.
95 0 : UseInMemory = closureOpt(func(ro *runAndCompareOptions) {
96 0 : ro.mutateTestOptions = append(ro.mutateTestOptions, func(to *TestOptions) { to.useDisk = false })
97 : })
98 : )
99 :
100 : // OpCount configures the random variable for the number of operations to
101 : // generate.
102 0 : func OpCount(rv randvar.Static) RunOption {
103 0 : return closureOpt(func(ro *runAndCompareOptions) { ro.ops = rv })
104 : }
105 :
106 : // RuntimeTrace configures each test run to collect a runtime trace and output
107 : // it with the provided filename.
108 0 : func RuntimeTrace(name string) RunOption {
109 0 : return closureOpt(func(ro *runAndCompareOptions) { ro.traceFile = name })
110 : }
111 :
112 : // InnerBinary configures the binary that is called for each run. If not
113 : // specified, this binary (os.Args[0]) is called.
114 0 : func InnerBinary(path string) RunOption {
115 0 : return closureOpt(func(ro *runAndCompareOptions) { ro.innerBinary = path })
116 : }
117 :
118 : // ParseCustomTestOption adds support for parsing the provided CustomOption from
119 : // OPTIONS files serialized by the metamorphic tests. This RunOption alone does
120 : // not cause the metamorphic tests to run with any variant of the provided
121 : // CustomOption set.
122 0 : func ParseCustomTestOption(name string, parseFn func(value string) (CustomOption, bool)) RunOption {
123 0 : return closureOpt(func(ro *runAndCompareOptions) { ro.customOptionParsers[name] = parseFn })
124 : }
125 :
126 : // AddCustomRun adds an additional run of the metamorphic tests, using the
127 : // provided OPTIONS file contents. The default options will be used, except
128 : // those options that are overriden by the provided OPTIONS string.
129 0 : func AddCustomRun(name string, serializedOptions string) RunOption {
130 0 : return closureOpt(func(ro *runAndCompareOptions) { ro.customRuns[name] = serializedOptions })
131 : }
132 :
133 : type closureOpt func(*runAndCompareOptions)
134 :
135 0 : func (f closureOpt) apply(ro *runAndCompareOptions) { f(ro) }
136 :
137 0 : func buildRunAndCompareOpts(rOpts []RunOption) runAndCompareOptions {
138 0 : runOpts := runAndCompareOptions{
139 0 : ops: randvar.NewUniform(1000, 10000),
140 0 : customRuns: map[string]string{},
141 0 : runOnceOptions: runOnceOptions{
142 0 : customOptionParsers: map[string]func(string) (CustomOption, bool){},
143 0 : },
144 0 : }
145 0 : for _, o := range rOpts {
146 0 : o.apply(&runOpts)
147 0 : }
148 0 : if runOpts.seed == 0 {
149 0 : runOpts.seed = uint64(time.Now().UnixNano())
150 0 : }
151 0 : return runOpts
152 : }
153 :
154 : // RunAndCompare runs the metamorphic tests, using the provided root directory
155 : // to hold test data.
156 0 : func RunAndCompare(t *testing.T, rootDir string, rOpts ...RunOption) {
157 0 : runOpts := buildRunAndCompareOpts(rOpts)
158 0 : require.NoError(t, os.MkdirAll(rootDir, 0755))
159 0 : metaDir, err := os.MkdirTemp(rootDir, time.Now().Format("060102-150405.000"))
160 0 : require.NoError(t, err)
161 0 : require.NoError(t, os.MkdirAll(metaDir, 0755))
162 0 : defer func() {
163 0 : if !t.Failed() && !runOpts.keep {
164 0 : _ = os.RemoveAll(metaDir)
165 0 : }
166 : }()
167 :
168 0 : topLevelTestName := t.Name()
169 0 : for path.Dir(topLevelTestName) != "." {
170 0 : topLevelTestName = path.Dir(topLevelTestName)
171 0 : }
172 :
173 0 : rng := rand.New(rand.NewPCG(0, runOpts.seed))
174 0 : opCount := runOpts.ops.Uint64(rng)
175 0 :
176 0 : // Generate a new set of random ops, writing them to <dir>/ops. These will be
177 0 : // read by the child processes when performing a test run.
178 0 : km := newKeyManager(runOpts.numInstances)
179 0 : cfg := presetConfigs[rng.IntN(len(presetConfigs))]
180 0 : if runOpts.numInstances > 1 {
181 0 : // The multi-instance variant does not support all operations yet.
182 0 : //
183 0 : // TODO(bilal): Address this and use the default configs.
184 0 : cfg = multiInstancePresetConfig
185 0 : cfg.numInstances = runOpts.numInstances
186 0 : }
187 0 : g := newGenerator(rng, cfg, km)
188 0 : if runOpts.previousOpsPath != "" {
189 0 : // During cross-version testing, we load keys from an `ops` file
190 0 : // produced by a metamorphic test run of an earlier Pebble version.
191 0 : // Seeding the keys ensure we generate interesting operations, including
192 0 : // ones with key shadowing, merging, etc.
193 0 : opsPath := filepath.Join(filepath.Dir(filepath.Clean(runOpts.previousOpsPath)), "ops")
194 0 : opsData, err := os.ReadFile(opsPath)
195 0 : require.NoError(t, err)
196 0 : ops, err := parse(opsData, parserOpts{})
197 0 : require.NoError(t, err)
198 0 : loadPrecedingKeys(ops, g.keyGenerator, km)
199 0 : }
200 :
201 0 : ops := g.generate(opCount)
202 0 : opsPath := filepath.Join(metaDir, "ops")
203 0 : formattedOps := formatOps(km.kf, ops)
204 0 : require.NoError(t, os.WriteFile(opsPath, []byte(formattedOps), 0644))
205 0 :
206 0 : // runOptions performs a particular test run with the specified options. The
207 0 : // options are written to <run-dir>/OPTIONS and a child process is created to
208 0 : // actually execute the test.
209 0 : runOptions := func(t *testing.T, opts *TestOptions) {
210 0 : if opts.Opts.Cache != nil {
211 0 : defer opts.Opts.Cache.Unref()
212 0 : }
213 0 : for _, fn := range runOpts.mutateTestOptions {
214 0 : fn(opts)
215 0 : }
216 0 : runDir := filepath.Join(metaDir, path.Base(t.Name()))
217 0 : require.NoError(t, os.MkdirAll(runDir, 0755))
218 0 :
219 0 : optionsPath := filepath.Join(runDir, "OPTIONS")
220 0 : optionsStr := optionsToString(opts)
221 0 : require.NoError(t, os.WriteFile(optionsPath, []byte(optionsStr), 0644))
222 0 :
223 0 : args := []string{
224 0 : "-keep=" + fmt.Sprint(runOpts.keep),
225 0 : "-run-dir=" + runDir,
226 0 : "-test.run=" + topLevelTestName + "$",
227 0 : "--op-timeout=" + runOpts.opTimeout.String(),
228 0 : }
229 0 : if runOpts.numInstances > 1 {
230 0 : args = append(args, "--num-instances="+strconv.Itoa(runOpts.numInstances))
231 0 : }
232 0 : if runOpts.traceFile != "" {
233 0 : args = append(args, "-test.trace="+filepath.Join(runDir, runOpts.traceFile))
234 0 : }
235 :
236 0 : binary := os.Args[0]
237 0 : if runOpts.innerBinary != "" {
238 0 : binary = runOpts.innerBinary
239 0 : }
240 0 : cmd := exec.Command(binary, args...)
241 0 : out, err := cmd.CombinedOutput()
242 0 : if err != nil {
243 0 : t.Fatalf(`
244 0 : ===== SEED =====
245 0 : %d
246 0 : ===== ERR =====
247 0 : %v
248 0 : ===== OUT =====
249 0 : %s
250 0 : ===== OPTIONS =====
251 0 : %s
252 0 : ===== OPS =====
253 0 : %s
254 0 : ===== HISTORY =====
255 0 : %s
256 0 : To reduce: go test ./internal/metamorphic -tags invariants -run '%s$' --run-dir %s --try-to-reduce -v`,
257 0 : runOpts.seed,
258 0 : err,
259 0 : out,
260 0 : optionsStr,
261 0 : opsPath,
262 0 : readFile(filepath.Join(runDir, "history")),
263 0 : topLevelTestName, runDir,
264 0 : )
265 0 : }
266 : }
267 :
268 : // Construct the various OPTIONS to test with.
269 0 : names, options, err := buildOptions(rng, runOpts)
270 0 : if err != nil {
271 0 : t.Fatal(err)
272 0 : }
273 :
274 : // Run the options.
275 0 : t.Run("execution", func(t *testing.T) {
276 0 : for _, name := range names {
277 0 : name := name
278 0 : t.Run(name, func(t *testing.T) {
279 0 : t.Parallel()
280 0 : runOptions(t, options[name])
281 0 : })
282 : }
283 : })
284 : // NB: The above 'execution' subtest will not complete until all of the
285 : // individual execution/ subtests have completed. The grouping within the
286 : // `execution` subtest ensures all the histories are available when we
287 : // proceed to comparing against the base history.
288 :
289 : // Don't bother comparing output if we've already failed.
290 0 : if t.Failed() {
291 0 : return
292 0 : }
293 :
294 0 : t.Run("compare", func(t *testing.T) {
295 0 : getHistoryPath := func(name string) string {
296 0 : return filepath.Join(metaDir, name, "history")
297 0 : }
298 :
299 0 : base := readHistory(t, getHistoryPath(names[0]))
300 0 : base = reorderHistory(base)
301 0 : for i := 1; i < len(names); i++ {
302 0 : t.Run(names[i], func(t *testing.T) {
303 0 : lines := readHistory(t, getHistoryPath(names[i]))
304 0 : lines = reorderHistory(lines)
305 0 : diff := difflib.UnifiedDiff{
306 0 : A: base,
307 0 : B: lines,
308 0 : Context: 5,
309 0 : }
310 0 : text, err := difflib.GetUnifiedDiffString(diff)
311 0 : require.NoError(t, err)
312 0 : if text != "" {
313 0 : // NB: We force an exit rather than using t.Fatal because the latter
314 0 : // will run another instance of the test if -count is specified, while
315 0 : // we're happy to exit on the first failure.
316 0 : optionsStrA := optionsToString(options[names[0]])
317 0 : optionsStrB := optionsToString(options[names[i]])
318 0 :
319 0 : fmt.Printf(`
320 0 : ===== SEED =====
321 0 : %d
322 0 : ===== DIFF =====
323 0 : %s/{%s,%s}
324 0 : %s
325 0 : ===== OPTIONS %s =====
326 0 : %s
327 0 : ===== OPTIONS %s =====
328 0 : %s
329 0 : ===== OPS =====
330 0 : %s
331 0 : To reduce: go test ./internal/metamorphic -tags invariants -run '%s$' --compare "%s/{%s,%s}" --try-to-reduce -v
332 0 : `,
333 0 : runOpts.seed,
334 0 : metaDir, names[0], names[i], text,
335 0 : names[0], optionsStrA,
336 0 : names[i], optionsStrB,
337 0 : opsPath,
338 0 : topLevelTestName, metaDir, names[0], names[i])
339 0 : os.Exit(1)
340 0 : }
341 : })
342 : }
343 : })
344 : }
345 :
346 : func buildOptions(
347 : rng *rand.Rand, runOpts runAndCompareOptions,
348 0 : ) (names []string, options map[string]*TestOptions, err error) {
349 0 : options = map[string]*TestOptions{}
350 0 :
351 0 : // Create the standard options.
352 0 : for i, opts := range standardOptions() {
353 0 : name := fmt.Sprintf("standard-%03d", i)
354 0 : names = append(names, name)
355 0 : options[name] = opts
356 0 : }
357 :
358 : // Create the custom option runs, if any.
359 0 : for name, customOptsStr := range runOpts.customRuns {
360 0 : options[name] = defaultTestOptions()
361 0 : if err = parseOptions(options[name], customOptsStr, runOpts.customOptionParsers); err != nil {
362 0 : return nil, nil, errors.Wrapf(err, "custom opts %q: %s", name, err)
363 0 : }
364 : }
365 : // Sort the custom options names for determinism (they're currently in
366 : // random order from map iteration).
367 0 : sort.Strings(names[len(names)-len(runOpts.customRuns):])
368 0 :
369 0 : // Create random options. We make an arbitrary choice to run with as many
370 0 : // random options as we have standard options.
371 0 : nOpts := len(options)
372 0 : for i := 0; i < nOpts; i++ {
373 0 : name := fmt.Sprintf("random-%03d", i)
374 0 : names = append(names, name)
375 0 : opts := RandomOptions(rng, runOpts.customOptionParsers)
376 0 : options[name] = opts
377 0 : }
378 :
379 : // If the user provided the path to an initial database state to use, update
380 : // all the options to pull from it.
381 0 : if runOpts.initialStatePath != "" {
382 0 : for _, o := range options {
383 0 : o.initialStatePath, err = filepath.Abs(runOpts.initialStatePath)
384 0 : if err != nil {
385 0 : return nil, nil, err
386 0 : }
387 0 : o.initialStateDesc = runOpts.initialStateDesc
388 : }
389 : }
390 0 : return names, options, nil
391 : }
392 :
393 : type runOnceOptions struct {
394 : // Note: when adding a new option, a new flag might need to be passed to the
395 : // "inner" execution of the test binary in RunAndCompare.
396 :
397 : keep bool
398 : maxThreads int
399 : // opTimeout causes the test to fail if any one op takes longer than this.
400 : opTimeout time.Duration
401 : errorRate float64
402 : failRegexp *regexp.Regexp
403 : numInstances int
404 : customOptionParsers map[string]func(string) (CustomOption, bool)
405 : }
406 :
407 : // A RunOnceOption configures the behavior of a single run of the metamorphic
408 : // tests.
409 : type RunOnceOption interface {
410 : applyOnce(*runOnceOptions)
411 : }
412 :
413 : // KeepData keeps the database directory, even on successful runs. If the test
414 : // used an in-memory filesystem, the in-memory filesystem will be persisted to
415 : // the run directory.
416 : type KeepData struct{}
417 :
418 0 : func (KeepData) apply(ro *runAndCompareOptions) { ro.keep = true }
419 0 : func (KeepData) applyOnce(ro *runOnceOptions) { ro.keep = true }
420 :
421 : // InjectErrorsRate configures the run to inject errors into read-only
422 : // filesystem operations and retry injected errors.
423 : type InjectErrorsRate float64
424 :
425 0 : func (r InjectErrorsRate) apply(ro *runAndCompareOptions) { ro.errorRate = float64(r) }
426 0 : func (r InjectErrorsRate) applyOnce(ro *runOnceOptions) { ro.errorRate = float64(r) }
427 :
428 : // MaxThreads sets an upper bound on the number of parallel execution threads
429 : // during replay.
430 : type MaxThreads int
431 :
432 0 : func (m MaxThreads) apply(ro *runAndCompareOptions) { ro.maxThreads = int(m) }
433 1 : func (m MaxThreads) applyOnce(ro *runOnceOptions) { ro.maxThreads = int(m) }
434 :
435 : // OpTimeout sets a timeout for each executed operation. A value of 0 means no
436 : // timeout.
437 : type OpTimeout time.Duration
438 :
439 0 : func (t OpTimeout) apply(ro *runAndCompareOptions) { ro.opTimeout = time.Duration(t) }
440 1 : func (t OpTimeout) applyOnce(ro *runOnceOptions) { ro.opTimeout = time.Duration(t) }
441 :
442 : // FailOnMatch configures the run to fail immediately if the history matches the
443 : // provided regular expression.
444 : type FailOnMatch struct {
445 : *regexp.Regexp
446 : }
447 :
448 0 : func (f FailOnMatch) apply(ro *runAndCompareOptions) { ro.failRegexp = f.Regexp }
449 0 : func (f FailOnMatch) applyOnce(ro *runOnceOptions) { ro.failRegexp = f.Regexp }
450 :
451 : // MultiInstance configures the number of pebble instances to create.
452 : type MultiInstance int
453 :
454 0 : func (m MultiInstance) apply(ro *runAndCompareOptions) { ro.numInstances = int(m) }
455 1 : func (m MultiInstance) applyOnce(ro *runOnceOptions) { ro.numInstances = int(m) }
456 :
457 : // RunOnce performs one run of the metamorphic tests. RunOnce expects the
458 : // directory named by `runDir` to already exist and contain an `OPTIONS` file
459 : // containing the test run's configuration. The history of the run is persisted
460 : // to a file at the path `historyPath`.
461 : //
462 : // The `seed` parameter is not functional; it's used for context in logging.
463 1 : func RunOnce(t TestingT, runDir string, seed uint64, historyPath string, rOpts ...RunOnceOption) {
464 1 : runOpts := runOnceOptions{
465 1 : customOptionParsers: map[string]func(string) (CustomOption, bool){},
466 1 : }
467 1 : for _, o := range rOpts {
468 1 : o.applyOnce(&runOpts)
469 1 : }
470 :
471 1 : opsPath := filepath.Join(filepath.Dir(filepath.Clean(runDir)), "ops")
472 1 : opsData, err := os.ReadFile(opsPath)
473 1 : require.NoError(t, err)
474 1 :
475 1 : optionsPath := filepath.Join(runDir, "OPTIONS")
476 1 : optionsData, err := os.ReadFile(optionsPath)
477 1 : require.NoError(t, err)
478 1 :
479 1 : // NB: It's important to use defaultTestOptions() here as the base into
480 1 : // which we parse the serialized options. It contains the relevant defaults,
481 1 : // like the appropriate block-property collectors.
482 1 : testOpts := defaultTestOptions()
483 1 : opts := testOpts.Opts
484 1 : require.NoError(t, parseOptions(testOpts, string(optionsData), runOpts.customOptionParsers))
485 1 :
486 1 : ops, err := parse(opsData, parserOpts{
487 1 : parseFormattedUserKey: testOpts.KeyFormat.ParseFormattedKey,
488 1 : parseFormattedUserKeySuffix: testOpts.KeyFormat.ParseFormattedKeySuffix,
489 1 : })
490 1 : require.NoError(t, err)
491 1 :
492 1 : // Set up the filesystem to use for the test. Note that by default we use an
493 1 : // in-memory FS.
494 1 : if testOpts.useDisk {
495 1 : opts.FS = vfs.Default
496 1 : require.NoError(t, os.RemoveAll(opts.FS.PathJoin(runDir, "data")))
497 1 : } else {
498 1 : opts.Cleaner = base.ArchiveCleaner{}
499 1 : }
500 : // Wrap the filesystem with a VFS that will inject random latency if the
501 : // test options require it. We cap the overlal injected latency to ten
502 : // seconds to avoid excessive test run times when paired with small target
503 : // file sizes, block sizes, etc.
504 1 : if testOpts.ioLatencyProbability > 0 {
505 1 : opts.FS = errorfs.Wrap(opts.FS, errorfs.RandomLatency(
506 1 : errorfs.Randomly(testOpts.ioLatencyProbability, testOpts.ioLatencySeed),
507 1 : testOpts.ioLatencyMean,
508 1 : testOpts.ioLatencySeed,
509 1 : 10*time.Second,
510 1 : ))
511 1 : }
512 :
513 : // Wrap the filesystem with one that will inject errors into read
514 : // operations with *errorRate probability.
515 1 : opts.FS = errorfs.Wrap(opts.FS, errorfs.ErrInjected.If(
516 1 : dsl.And(errorfs.Reads, errorfs.Randomly(runOpts.errorRate, int64(seed))),
517 1 : ))
518 1 :
519 1 : // Bound testOpts.threads to runOpts.maxThreads.
520 1 : if runOpts.maxThreads < testOpts.Threads {
521 0 : testOpts.Threads = runOpts.maxThreads
522 0 : }
523 :
524 1 : dir := opts.FS.PathJoin(runDir, "data")
525 1 : // Set up the initial database state if configured to start from a non-empty
526 1 : // database. By default tests start from an empty database, but split
527 1 : // version testing may configure a previous metamorphic tests's database
528 1 : // state as the initial state.
529 1 : if testOpts.initialStatePath != "" {
530 0 : require.NoError(t, setupInitialState(dir, testOpts))
531 0 : }
532 :
533 1 : if testOpts.Opts.WALFailover != nil {
534 1 : if runOpts.numInstances > 1 {
535 1 : // TODO(bilal,jackson): Allow opts to diverge on a per-instance
536 1 : // basis, and use that to set unique WAL dirs for all instances in
537 1 : // multi-instance mode.
538 1 : testOpts.Opts.WALFailover = nil
539 1 : } else {
540 1 : testOpts.Opts.WALFailover.Secondary.FS = opts.FS
541 1 : testOpts.Opts.WALFailover.Secondary.Dirname = opts.FS.PathJoin(
542 1 : runDir, testOpts.Opts.WALFailover.Secondary.Dirname)
543 1 : }
544 : }
545 :
546 1 : if opts.WALDir != "" {
547 1 : if runOpts.numInstances > 1 {
548 1 : // TODO(bilal): Allow opts to diverge on a per-instance basis, and use
549 1 : // that to set unique WAL dirs for all instances in multi-instance mode.
550 1 : opts.WALDir = ""
551 1 : } else {
552 1 : opts.WALDir = opts.FS.PathJoin(runDir, opts.WALDir)
553 1 : }
554 : }
555 :
556 1 : historyFile, err := os.Create(historyPath)
557 1 : require.NoError(t, err)
558 1 : defer historyFile.Close()
559 1 : writers := []io.Writer{historyFile}
560 1 :
561 1 : if testing.Verbose() {
562 0 : writers = append(writers, os.Stdout)
563 0 : }
564 1 : h := newHistory(runOpts.failRegexp, writers...)
565 1 : defer h.Close()
566 1 :
567 1 : m := newTest(ops)
568 1 : require.NoError(t, m.init(h, dir, testOpts, runOpts.numInstances, runOpts.opTimeout))
569 1 :
570 1 : if err := Execute(m); err != nil {
571 0 : fmt.Fprintf(os.Stderr, "Seed: %d\n", seed)
572 0 : fmt.Fprintln(os.Stderr, err)
573 0 : }
574 :
575 : // If we have unclosed databases, print LSM details. This will be the case
576 : // when we use --try-to-reduce.
577 1 : for i, db := range m.dbs {
578 1 : if db != nil {
579 0 : fmt.Fprintf(os.Stderr, "\ndb%d:\n%s", i+1, db.DebugString())
580 0 : fmt.Fprintf(os.Stderr, "\n%s\n", db.LSMViewURL())
581 0 : }
582 : }
583 : // Don't let the test pass if it issued a Fatalf.
584 1 : err = errors.CombineErrors(err, h.Error())
585 1 :
586 1 : if err != nil {
587 0 : m.saveInMemoryData()
588 0 : os.Exit(1)
589 0 : }
590 :
591 1 : if runOpts.keep && !testOpts.useDisk {
592 0 : m.saveInMemoryData()
593 0 : }
594 : }
595 :
596 : // Execute runs the provided test, writing the execution history into the Test's
597 : // sink.
598 1 : func Execute(m *Test) error {
599 1 : if m.testOpts.Threads <= 1 {
600 1 : for m.step(m.h, nil /* optionalRecordf */) {
601 1 : if err := m.h.Error(); err != nil {
602 0 : return err
603 0 : }
604 : }
605 1 : return nil
606 : }
607 :
608 : // Run in parallel using an errgroup.
609 1 : eg, ctx := errgroup.WithContext(context.Background())
610 1 : for t := 0; t < m.testOpts.Threads; t++ {
611 1 : t := t // bind loop var to scope
612 1 : eg.Go(func() error {
613 1 : for idx := 0; idx < len(m.ops); idx++ {
614 1 : // Skip any operations whose receiver object hashes to a
615 1 : // different thread. All operations with the same receiver
616 1 : // are performed from the same thread. This goroutine is
617 1 : // only responsible for executing operations that hash to
618 1 : // `t`.
619 1 : if hashThread(m.ops[idx].receiver(), m.testOpts.Threads) != t {
620 1 : continue
621 : }
622 :
623 : // Some operations have additional synchronization
624 : // dependencies. If this operation has any, wait for its
625 : // dependencies to complete before executing.
626 1 : for _, waitOnIdx := range m.opsWaitOn[idx] {
627 1 : select {
628 0 : case <-ctx.Done():
629 0 : // Exit if some other thread already errored out.
630 0 : return ctx.Err()
631 1 : case <-m.opsDone[waitOnIdx]:
632 : }
633 : }
634 :
635 1 : m.runOp(idx, m.h.recorder(t, idx, nil /* optionalRecordf */))
636 1 :
637 1 : // If this operation has a done channel, close it so that
638 1 : // other operations that synchronize on this operation know
639 1 : // that it's been completed.
640 1 : if ch := m.opsDone[idx]; ch != nil {
641 1 : close(ch)
642 1 : }
643 :
644 1 : if err := m.h.Error(); err != nil {
645 0 : return err
646 0 : }
647 : }
648 1 : return nil
649 : })
650 : }
651 1 : return eg.Wait()
652 : }
653 :
654 1 : func hashThread(objID objID, numThreads int) int {
655 1 : // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
656 1 : return int((11400714819323198485 * uint64(objID)) % uint64(numThreads))
657 1 : }
658 :
659 : // Compare runs the metamorphic tests in the provided runDirs and compares their
660 : // histories.
661 0 : func Compare(t TestingT, rootDir string, seed uint64, runDirs []string, rOpts ...RunOnceOption) {
662 0 : historyPaths := make([]string, len(runDirs))
663 0 : for i := 0; i < len(runDirs); i++ {
664 0 : historyPath := filepath.Join(rootDir, runDirs[i]+"-"+time.Now().Format("060102-150405.000"))
665 0 : runDirs[i] = filepath.Join(rootDir, runDirs[i])
666 0 : _ = os.Remove(historyPath)
667 0 : historyPaths[i] = historyPath
668 0 : }
669 0 : defer func() {
670 0 : for _, path := range historyPaths {
671 0 : _ = os.Remove(path)
672 0 : }
673 : }()
674 :
675 0 : for i, runDir := range runDirs {
676 0 : RunOnce(t, runDir, seed, historyPaths[i], rOpts...)
677 0 : }
678 :
679 0 : if t.Failed() {
680 0 : return
681 0 : }
682 :
683 0 : i, diff := CompareHistories(t, historyPaths)
684 0 : if i != 0 {
685 0 : fmt.Printf(`
686 0 : ===== DIFF =====
687 0 : %s/{%s,%s}
688 0 : %s
689 0 : `, rootDir, runDirs[0], runDirs[i], diff)
690 0 : os.Exit(1)
691 0 : }
692 : }
693 :
694 : // TestingT is an interface wrapper around *testing.T
695 : type TestingT interface {
696 : require.TestingT
697 : Failed() bool
698 : }
699 :
// readFile returns the contents of the file at path as a string. If the file
// cannot be read, it returns the error formatted as "err: <error>" instead of
// failing.
func readFile(path string) string {
	contents, err := os.ReadFile(path)
	if err == nil {
		return string(contents)
	}
	return fmt.Sprintf("err: %v", err)
}
|