Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "math"
11 : "os"
12 : "path/filepath"
13 : "runtime"
14 : "strconv"
15 : "strings"
16 : "sync/atomic"
17 : "time"
18 :
19 : "github.com/cockroachdb/errors"
20 : "github.com/cockroachdb/pebble"
21 : "github.com/cockroachdb/pebble/bloom"
22 : "github.com/cockroachdb/pebble/internal/base"
23 : "github.com/cockroachdb/pebble/internal/cache"
24 : "github.com/cockroachdb/pebble/internal/testkeys"
25 : "github.com/cockroachdb/pebble/objstorage/remote"
26 : "github.com/cockroachdb/pebble/sstable"
27 : "github.com/cockroachdb/pebble/vfs"
28 : "github.com/cockroachdb/pebble/wal"
29 : "golang.org/x/exp/rand"
30 : )
31 :
32 : const (
33 : minimumFormatMajorVersion = pebble.FormatMinSupported
34 : // The format major version to use in the default options configurations. We
35 : // default to the minimum supported format so we exercise the runtime version
36 : // ratcheting that a cluster upgrading would experience. The randomized
37 : // options may still use format major versions that are less than
38 : // defaultFormatMajorVersion but are at least minimumFormatMajorVersion.
39 : defaultFormatMajorVersion = pebble.FormatMinSupported
40 : // newestFormatMajorVersionToTest is the most recent format major version
41 : // the metamorphic tests should use. This may be greater than
42 : // pebble.FormatNewest when some format major versions are marked as
43 : // experimental.
44 : newestFormatMajorVersionToTest = pebble.FormatNewest
45 : )
46 :
47 : func parseOptions(
48 : opts *TestOptions, data string, customOptionParsers map[string]func(string) (CustomOption, bool),
49 2 : ) error {
50 2 : hooks := &pebble.ParseHooks{
51 2 : NewCache: pebble.NewCache,
52 2 : NewFilterPolicy: filterPolicyFromName,
53 2 : SkipUnknown: func(name, value string) bool {
54 2 : switch name {
55 0 : case "TestOptions":
56 0 : return true
57 2 : case "TestOptions.strictfs":
58 2 : opts.strictFS = true
59 2 : opts.Opts.FS = vfs.NewStrictMem()
60 2 : return true
61 2 : case "TestOptions.ingest_using_apply":
62 2 : opts.ingestUsingApply = true
63 2 : return true
64 2 : case "TestOptions.delete_sized":
65 2 : opts.deleteSized = true
66 2 : return true
67 2 : case "TestOptions.replace_single_delete":
68 2 : opts.replaceSingleDelete = true
69 2 : return true
70 2 : case "TestOptions.use_disk":
71 2 : opts.useDisk = true
72 2 : opts.Opts.FS = vfs.Default
73 2 : return true
74 0 : case "TestOptions.initial_state_desc":
75 0 : opts.initialStateDesc = value
76 0 : return true
77 0 : case "TestOptions.initial_state_path":
78 0 : opts.initialStatePath = value
79 0 : return true
80 2 : case "TestOptions.threads":
81 2 : v, err := strconv.Atoi(value)
82 2 : if err != nil {
83 0 : panic(err)
84 : }
85 2 : opts.Threads = v
86 2 : return true
87 2 : case "TestOptions.disable_block_property_collector":
88 2 : v, err := strconv.ParseBool(value)
89 2 : if err != nil {
90 0 : panic(err)
91 : }
92 2 : opts.disableBlockPropertyCollector = v
93 2 : if v {
94 2 : opts.Opts.BlockPropertyCollectors = nil
95 2 : }
96 2 : return true
97 2 : case "TestOptions.enable_value_blocks":
98 2 : opts.enableValueBlocks = true
99 2 : opts.Opts.Experimental.EnableValueBlocks = func() bool { return true }
100 2 : return true
101 2 : case "TestOptions.disable_value_blocks_for_ingest_sstables":
102 2 : opts.disableValueBlocksForIngestSSTables = true
103 2 : return true
104 2 : case "TestOptions.async_apply_to_db":
105 2 : opts.asyncApplyToDB = true
106 2 : return true
107 2 : case "TestOptions.shared_storage_enabled":
108 2 : opts.sharedStorageEnabled = true
109 2 : opts.sharedStorageFS = remote.NewInMem()
110 2 : if opts.Opts.Experimental.CreateOnShared == remote.CreateOnSharedNone {
111 1 : opts.Opts.Experimental.CreateOnShared = remote.CreateOnSharedAll
112 1 : }
113 2 : return true
114 2 : case "TestOptions.external_storage_enabled":
115 2 : opts.externalStorageEnabled = true
116 2 : opts.externalStorageFS = remote.NewInMem()
117 2 : return true
118 2 : case "TestOptions.secondary_cache_enabled":
119 2 : opts.secondaryCacheEnabled = true
120 2 : opts.Opts.Experimental.SecondaryCacheSizeBytes = 1024 * 1024 * 32 // 32 MBs
121 2 : return true
122 2 : case "TestOptions.seed_efos":
123 2 : v, err := strconv.ParseUint(value, 10, 64)
124 2 : if err != nil {
125 0 : panic(err)
126 : }
127 2 : opts.seedEFOS = v
128 2 : return true
129 2 : case "TestOptions.io_latency_mean":
130 2 : v, err := time.ParseDuration(value)
131 2 : if err != nil {
132 0 : panic(err)
133 : }
134 2 : opts.ioLatencyMean = v
135 2 : return true
136 2 : case "TestOptions.io_latency_probability":
137 2 : v, err := strconv.ParseFloat(value, 64)
138 2 : if err != nil {
139 0 : panic(err)
140 : }
141 2 : opts.ioLatencyProbability = v
142 2 : return true
143 2 : case "TestOptions.io_latency_seed":
144 2 : v, err := strconv.ParseInt(value, 10, 64)
145 2 : if err != nil {
146 0 : panic(err)
147 : }
148 2 : opts.ioLatencySeed = v
149 2 : return true
150 2 : case "TestOptions.ingest_split":
151 2 : opts.ingestSplit = true
152 2 : opts.Opts.Experimental.IngestSplit = func() bool {
153 2 : return true
154 2 : }
155 2 : return true
156 2 : case "TestOptions.use_shared_replicate":
157 2 : opts.useSharedReplicate = true
158 2 : return true
159 0 : case "TestOptions.use_external_replicate":
160 0 : opts.useExternalReplicate = true
161 0 : return true
162 2 : case "TestOptions.use_excise":
163 2 : opts.useExcise = true
164 2 : return true
165 1 : default:
166 1 : if customOptionParsers == nil {
167 0 : return false
168 0 : }
169 1 : name = strings.TrimPrefix(name, "TestOptions.")
170 1 : if p, ok := customOptionParsers[name]; ok {
171 1 : if customOpt, ok := p(value); ok {
172 1 : opts.CustomOpts = append(opts.CustomOpts, customOpt)
173 1 : return true
174 1 : }
175 : }
176 0 : return false
177 : }
178 : },
179 : }
180 2 : err := opts.Opts.Parse(data, hooks)
181 2 : // Ensure that the WAL failover FS agrees with the primary FS. They're
182 2 : // separate options, but in the metamorphic tests we keep them in sync.
183 2 : if opts.Opts.WALFailover != nil {
184 2 : opts.Opts.WALFailover.Secondary.FS = opts.Opts.FS
185 2 : }
186 2 : opts.InitRemoteStorageFactory()
187 2 : opts.Opts.EnsureDefaults()
188 2 : return err
189 : }
190 :
191 1 : func optionsToString(opts *TestOptions) string {
192 1 : var buf bytes.Buffer
193 1 : if opts.strictFS {
194 1 : fmt.Fprint(&buf, " strictfs=true\n")
195 1 : }
196 1 : if opts.ingestUsingApply {
197 1 : fmt.Fprint(&buf, " ingest_using_apply=true\n")
198 1 : }
199 1 : if opts.deleteSized {
200 1 : fmt.Fprint(&buf, " delete_sized=true\n")
201 1 : }
202 1 : if opts.replaceSingleDelete {
203 1 : fmt.Fprint(&buf, " replace_single_delete=true\n")
204 1 : }
205 1 : if opts.useDisk {
206 1 : fmt.Fprint(&buf, " use_disk=true\n")
207 1 : }
208 1 : if opts.initialStatePath != "" {
209 0 : fmt.Fprintf(&buf, " initial_state_path=%s\n", opts.initialStatePath)
210 0 : }
211 1 : if opts.initialStateDesc != "" {
212 0 : fmt.Fprintf(&buf, " initial_state_desc=%s\n", opts.initialStateDesc)
213 0 : }
214 1 : if opts.Threads != 0 {
215 1 : fmt.Fprintf(&buf, " threads=%d\n", opts.Threads)
216 1 : }
217 1 : if opts.disableBlockPropertyCollector {
218 1 : fmt.Fprintf(&buf, " disable_block_property_collector=%t\n", opts.disableBlockPropertyCollector)
219 1 : }
220 1 : if opts.enableValueBlocks {
221 1 : fmt.Fprintf(&buf, " enable_value_blocks=%t\n", opts.enableValueBlocks)
222 1 : }
223 1 : if opts.disableValueBlocksForIngestSSTables {
224 1 : fmt.Fprintf(&buf, " disable_value_blocks_for_ingest_sstables=%t\n", opts.disableValueBlocksForIngestSSTables)
225 1 : }
226 1 : if opts.asyncApplyToDB {
227 1 : fmt.Fprint(&buf, " async_apply_to_db=true\n")
228 1 : }
229 1 : if opts.sharedStorageEnabled {
230 1 : fmt.Fprint(&buf, " shared_storage_enabled=true\n")
231 1 : }
232 1 : if opts.externalStorageEnabled {
233 1 : fmt.Fprint(&buf, " external_storage_enabled=true\n")
234 1 : }
235 1 : if opts.secondaryCacheEnabled {
236 1 : fmt.Fprint(&buf, " secondary_cache_enabled=true\n")
237 1 : }
238 1 : if opts.seedEFOS != 0 {
239 1 : fmt.Fprintf(&buf, " seed_efos=%d\n", opts.seedEFOS)
240 1 : }
241 1 : if opts.ingestSplit {
242 1 : fmt.Fprintf(&buf, " ingest_split=%v\n", opts.ingestSplit)
243 1 : }
244 1 : if opts.ioLatencyProbability > 0 {
245 1 : fmt.Fprintf(&buf, " io_latency_mean=%s\n", opts.ioLatencyMean)
246 1 : fmt.Fprintf(&buf, " io_latency_probability=%.10f\n", opts.ioLatencyProbability)
247 1 : fmt.Fprintf(&buf, " io_latency_seed=%d\n", opts.ioLatencySeed)
248 1 : }
249 1 : if opts.useSharedReplicate {
250 1 : fmt.Fprintf(&buf, " use_shared_replicate=%v\n", opts.useSharedReplicate)
251 1 : }
252 1 : if opts.useExternalReplicate {
253 0 : fmt.Fprintf(&buf, " use_external_replicate=%v\n", opts.useExternalReplicate)
254 0 : }
255 1 : if opts.useExcise {
256 1 : fmt.Fprintf(&buf, " use_excise=%v\n", opts.useExcise)
257 1 : }
258 1 : for _, customOpt := range opts.CustomOpts {
259 1 : fmt.Fprintf(&buf, " %s=%s\n", customOpt.Name(), customOpt.Value())
260 1 : }
261 :
262 1 : s := opts.Opts.String()
263 1 : if buf.Len() == 0 {
264 0 : return s
265 0 : }
266 1 : return s + "\n[TestOptions]\n" + buf.String()
267 : }
268 :
269 2 : func defaultTestOptions() *TestOptions {
270 2 : return &TestOptions{
271 2 : Opts: defaultOptions(),
272 2 : Threads: 16,
273 2 : RetryPolicy: NeverRetry,
274 2 : }
275 2 : }
276 :
277 2 : func defaultOptions() *pebble.Options {
278 2 : opts := &pebble.Options{
279 2 : // Use an archive cleaner to ease post-mortem debugging.
280 2 : Cleaner: base.ArchiveCleaner{},
281 2 : // Always use our custom comparer which provides a Split method,
282 2 : // splitting keys at the trailing '@'.
283 2 : Comparer: testkeys.Comparer,
284 2 : FS: vfs.NewMem(),
285 2 : FormatMajorVersion: defaultFormatMajorVersion,
286 2 : Levels: []pebble.LevelOptions{{
287 2 : FilterPolicy: bloom.FilterPolicy(10),
288 2 : }},
289 2 : BlockPropertyCollectors: blockPropertyCollectorConstructors,
290 2 : }
291 2 :
292 2 : // We don't want to run the level checker every time because it can slow down
293 2 : // downloads and background compactions too much.
294 2 : //
295 2 : // We aim to run it once every 500ms (on average). To do this with some
296 2 : // randomization, each time we get a callback we see how much time passed
297 2 : // since the last call and run the check with a proportional probability.
298 2 : const meanTimeBetweenChecks = 500 * time.Millisecond
299 2 : startTime := time.Now()
300 2 : // lastCallTime stores the time of the last DebugCheck call, as the duration
301 2 : // since startTime.
302 2 : var lastCallTime atomic.Uint64
303 2 : opts.DebugCheck = func(db *pebble.DB) error {
304 2 : now := time.Since(startTime)
305 2 : last := time.Duration(lastCallTime.Swap(uint64(now)))
306 2 : // Run the check with probability equal to the time (as a fraction of
307 2 : // meanTimeBetweenChecks) passed since the last time we had a chance, as a
308 2 : // fraction of meanTimeBetweenChecks.
309 2 : if rand.Float64() < float64(now-last)/float64(meanTimeBetweenChecks) {
310 2 : return pebble.DebugCheckLevels(db)
311 2 : }
312 2 : return nil
313 : }
314 :
315 2 : return opts
316 : }
317 :
318 : // TestOptions describes the options configuring an individual run of the
319 : // metamorphic tests.
320 : type TestOptions struct {
321 : // Opts holds the *pebble.Options for the test.
322 : Opts *pebble.Options
323 : // Threads configures the parallelism of the test. Each thread will run in
324 : // an independent goroutine and be responsible for executing operations
325 : // against an independent set of objects. The outcome of any individual
326 : // operation will still be deterministic, with the metamorphic test
327 : // inserting synchronization where necessary.
328 : Threads int
329 : // RetryPolicy configures which errors should be retried.
330 : RetryPolicy RetryPolicy
331 : // CustomOptions holds custom test options that are defined outside of this
332 : // package.
333 : CustomOpts []CustomOption
334 :
335 : // internal
336 :
337 : useDisk bool
338 : strictFS bool
339 : // Use Batch.Apply rather than DB.Ingest.
340 : ingestUsingApply bool
341 : // Use Batch.DeleteSized rather than Batch.Delete.
342 : deleteSized bool
343 : // Replace a SINGLEDEL with a DELETE.
344 : replaceSingleDelete bool
345 : // The path on the local filesystem where the initial state of the database
346 : // exists. Empty if the test run begins from an empty database state.
347 : initialStatePath string
348 : // A human-readable string describing the initial state of the database.
349 : // Empty if the test run begins from an empty database state.
350 : initialStateDesc string
351 : // Disable the block property collector, which may be used by block property
352 : // filters.
353 : disableBlockPropertyCollector bool
354 : // Enable the use of value blocks.
355 : enableValueBlocks bool
356 : // Disables value blocks in the sstables written for ingest.
357 : disableValueBlocksForIngestSSTables bool
358 : // Use DB.ApplyNoSyncWait for applies that want to sync the WAL.
359 : asyncApplyToDB bool
360 : // Enable the use of shared storage.
361 : sharedStorageEnabled bool
362 : sharedStorageFS remote.Storage
363 : // Enable the use of shared storage for external file ingestion.
364 : externalStorageEnabled bool
365 : externalStorageFS remote.Storage
366 : // Enables the use of shared replication in TestOptions.
367 : useSharedReplicate bool
368 : // Enables the use of external replication in TestOptions.
369 : useExternalReplicate bool
370 : // Enable the secondary cache. Only effective if sharedStorageEnabled is
371 : // also true.
372 : secondaryCacheEnabled bool
373 : // If nonzero, enables the use of EventuallyFileOnlySnapshots for
374 : // newSnapshotOps that are keyspan-bounded. The set of which newSnapshotOps
375 : // are actually created as EventuallyFileOnlySnapshots is deterministically
376 : // derived from the seed and the operation index.
377 : seedEFOS uint64
378 : // If nonzero, enables the injection of random IO latency. The mechanics of
379 : // a Pebble operation can be very timing dependent, so artificial latency
380 : // can ensure a wide variety of mechanics are exercised. Additionally,
381 : // exercising some mechanics such as WAL failover require IO latency.
382 : ioLatencyProbability float64
383 : ioLatencySeed int64
384 : ioLatencyMean time.Duration
385 : // Enables ingest splits. Saved here for serialization as Options does not
386 : // serialize this.
387 : ingestSplit bool
388 : // Enables operations that do excises. Note that a false value for this does
389 : // not guarantee the lack of excises, as useSharedReplicate can also cause
390 : // excises. However !useExcise && !useSharedReplicate can be used to guarantee
391 : // lack of excises.
392 : useExcise bool
393 : }
394 :
395 : // InitRemoteStorageFactory initializes Opts.Experimental.RemoteStorage.
396 2 : func (testOpts *TestOptions) InitRemoteStorageFactory() {
397 2 : if testOpts.sharedStorageEnabled || testOpts.externalStorageEnabled {
398 2 : m := make(map[remote.Locator]remote.Storage)
399 2 : if testOpts.sharedStorageEnabled {
400 2 : m[""] = testOpts.sharedStorageFS
401 2 : }
402 2 : if testOpts.externalStorageEnabled {
403 2 : m["external"] = testOpts.externalStorageFS
404 2 : }
405 2 : testOpts.Opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(m)
406 : }
407 : }
408 :
409 : // CustomOption defines a custom option that configures the behavior of an
410 : // individual test run. Like all test options, custom options are serialized to
411 : // the OPTIONS file even if they're not options ordinarily understood by Pebble.
412 : type CustomOption interface {
413 : // Name returns the name of the custom option. This is the key under which
414 : // the option appears in the OPTIONS file, within the [TestOptions] stanza.
415 : Name() string
416 : // Value returns the value of the custom option, serialized as it should
417 : // appear within the OPTIONS file.
418 : Value() string
419 : // Close is run after the test database has been closed at the end of the
420 : // test as well as during restart operations within the test sequence. It's
421 : // passed a copy of the *pebble.Options. If the custom options hold on to
422 : // any resources outside, Close should release them.
423 : Close(*pebble.Options) error
424 : // Open is run before the test runs and during a restart operation after the
425 : // test database has been closed and Close has been called. It's passed a
426 : // copy of the *pebble.Options. If the custom options must acquire any
427 : // resources before the test continues, it should reacquire them.
428 : Open(*pebble.Options) error
429 :
430 : // TODO(jackson): provide additional hooks for custom options changing the
431 : // behavior of a run.
432 : }
433 :
434 1 : func standardOptions() []*TestOptions {
435 1 : // The index labels are not strictly necessary, but they make it easier to
436 1 : // find which options correspond to a failure.
437 1 : stdOpts := []string{
438 1 : 0: "", // default options
439 1 : 1: `
440 1 : [Options]
441 1 : cache_size=1
442 1 : `,
443 1 : 2: `
444 1 : [Options]
445 1 : disable_wal=true
446 1 : `,
447 1 : 3: `
448 1 : [Options]
449 1 : l0_compaction_threshold=1
450 1 : `,
451 1 : 4: `
452 1 : [Options]
453 1 : l0_compaction_threshold=1
454 1 : l0_stop_writes_threshold=1
455 1 : `,
456 1 : 5: `
457 1 : [Options]
458 1 : lbase_max_bytes=1
459 1 : `,
460 1 : 6: `
461 1 : [Options]
462 1 : max_manifest_file_size=1
463 1 : `,
464 1 : 7: `
465 1 : [Options]
466 1 : max_open_files=1
467 1 : `,
468 1 : 8: `
469 1 : [Options]
470 1 : mem_table_size=2000
471 1 : `,
472 1 : 9: `
473 1 : [Options]
474 1 : mem_table_stop_writes_threshold=2
475 1 : `,
476 1 : 10: `
477 1 : [Options]
478 1 : wal_dir=data/wal
479 1 : `,
480 1 : 11: `
481 1 : [Level "0"]
482 1 : block_restart_interval=1
483 1 : `,
484 1 : 12: `
485 1 : [Level "0"]
486 1 : block_size=1
487 1 : `,
488 1 : 13: `
489 1 : [Level "0"]
490 1 : compression=NoCompression
491 1 : `,
492 1 : 14: `
493 1 : [Level "0"]
494 1 : index_block_size=1
495 1 : `,
496 1 : 15: `
497 1 : [Level "0"]
498 1 : target_file_size=1
499 1 : `,
500 1 : 16: `
501 1 : [Level "0"]
502 1 : filter_policy=none
503 1 : `,
504 1 : // 1GB
505 1 : 17: `
506 1 : [Options]
507 1 : bytes_per_sync=1073741824
508 1 : [TestOptions]
509 1 : strictfs=true
510 1 : `,
511 1 : 18: `
512 1 : [Options]
513 1 : max_concurrent_compactions=2
514 1 : `,
515 1 : 19: `
516 1 : [TestOptions]
517 1 : ingest_using_apply=true
518 1 : `,
519 1 : 20: `
520 1 : [TestOptions]
521 1 : replace_single_delete=true
522 1 : `,
523 1 : 21: `
524 1 : [TestOptions]
525 1 : use_disk=true
526 1 : `,
527 1 : 22: `
528 1 : [Options]
529 1 : max_writer_concurrency=2
530 1 : force_writer_parallelism=true
531 1 : `,
532 1 : 23: `
533 1 : [TestOptions]
534 1 : disable_block_property_collector=true
535 1 : `,
536 1 : 24: `
537 1 : [TestOptions]
538 1 : threads=1
539 1 : `,
540 1 : 25: `
541 1 : [TestOptions]
542 1 : enable_value_blocks=true
543 1 : `,
544 1 : 26: fmt.Sprintf(`
545 1 : [Options]
546 1 : format_major_version=%s
547 1 : `, newestFormatMajorVersionToTest),
548 1 : 27: fmt.Sprintf(`
549 1 : [Options]
550 1 : format_major_version=%s
551 1 : [TestOptions]
552 1 : shared_storage_enabled=true
553 1 : secondary_cache_enabled=true
554 1 : `, pebble.FormatMinForSharedObjects),
555 1 : 28: fmt.Sprintf(`
556 1 : [Options]
557 1 : format_major_version=%s
558 1 : [TestOptions]
559 1 : external_storage_enabled=true
560 1 : `, pebble.FormatSyntheticPrefixSuffix),
561 1 : 29: fmt.Sprintf(`
562 1 : [Options]
563 1 : format_major_version=%s
564 1 : max_concurrent_downloads=2
565 1 : [TestOptions]
566 1 : shared_storage_enabled=true
567 1 : external_storage_enabled=true
568 1 : secondary_cache_enabled=false
569 1 : `, pebble.FormatSyntheticPrefixSuffix),
570 1 : }
571 1 :
572 1 : opts := make([]*TestOptions, len(stdOpts))
573 1 : for i := range opts {
574 1 : opts[i] = defaultTestOptions()
575 1 : // NB: The standard options by definition can never include custom
576 1 : // options, so no need to propagate custom option parsers.
577 1 : if err := parseOptions(opts[i], stdOpts[i], nil /* custom option parsers */); err != nil {
578 0 : panic(err)
579 : }
580 : }
581 1 : return opts
582 : }
583 :
584 : // RandomOptions generates a random set of operations, drawing randomness from
585 : // rng.
586 : func RandomOptions(
587 : rng *rand.Rand, customOptionParsers map[string]func(string) (CustomOption, bool),
588 1 : ) *TestOptions {
589 1 : testOpts := defaultTestOptions()
590 1 : opts := testOpts.Opts
591 1 :
592 1 : // There are some private options, which we don't want users to fiddle with.
593 1 : // There's no way to set it through the public interface. The only method is
594 1 : // through Parse.
595 1 : {
596 1 : var privateOpts bytes.Buffer
597 1 : fmt.Fprintln(&privateOpts, `[Options]`)
598 1 : if rng.Intn(3) == 0 /* 33% */ {
599 1 : fmt.Fprintln(&privateOpts, ` disable_delete_only_compactions=true`)
600 1 : }
601 1 : if rng.Intn(3) == 0 /* 33% */ {
602 1 : fmt.Fprintln(&privateOpts, ` disable_elision_only_compactions=true`)
603 1 : }
604 1 : if rng.Intn(5) == 0 /* 20% */ {
605 1 : fmt.Fprintln(&privateOpts, ` disable_lazy_combined_iteration=true`)
606 1 : }
607 1 : if privateOptsStr := privateOpts.String(); privateOptsStr != `[Options]\n` {
608 1 : parseOptions(testOpts, privateOptsStr, customOptionParsers)
609 1 : }
610 : }
611 :
612 1 : opts.BytesPerSync = 1 << uint(rng.Intn(28)) // 1B - 256MB
613 1 : opts.Cache = cache.New(1 << uint(rng.Intn(30))) // 1B - 1GB
614 1 : opts.DisableWAL = rng.Intn(2) == 0
615 1 : opts.FlushDelayDeleteRange = time.Millisecond * time.Duration(5*rng.Intn(245)) // 5-250ms
616 1 : opts.FlushDelayRangeKey = time.Millisecond * time.Duration(5*rng.Intn(245)) // 5-250ms
617 1 : opts.FlushSplitBytes = 1 << rng.Intn(20) // 1B - 1MB
618 1 : opts.FormatMajorVersion = minimumFormatMajorVersion
619 1 : n := int(newestFormatMajorVersionToTest - opts.FormatMajorVersion)
620 1 : opts.FormatMajorVersion += pebble.FormatMajorVersion(rng.Intn(n + 1))
621 1 : opts.Experimental.L0CompactionConcurrency = 1 + rng.Intn(4) // 1-4
622 1 : opts.Experimental.LevelMultiplier = 5 << rng.Intn(7) // 5 - 320
623 1 : opts.TargetByteDeletionRate = 1 << uint(20+rng.Intn(10)) // 1MB - 1GB
624 1 : opts.Experimental.ValidateOnIngest = rng.Intn(2) != 0
625 1 : opts.L0CompactionThreshold = 1 + rng.Intn(100) // 1 - 100
626 1 : opts.L0CompactionFileThreshold = 1 << rng.Intn(11) // 1 - 1024
627 1 : opts.L0StopWritesThreshold = 1 + rng.Intn(100) // 1 - 100
628 1 : if opts.L0StopWritesThreshold < opts.L0CompactionThreshold {
629 1 : opts.L0StopWritesThreshold = opts.L0CompactionThreshold
630 1 : }
631 1 : opts.LBaseMaxBytes = 1 << uint(rng.Intn(30)) // 1B - 1GB
632 1 : maxConcurrentCompactions := rng.Intn(3) + 1 // 1-3
633 1 : opts.MaxConcurrentCompactions = func() int {
634 1 : return maxConcurrentCompactions
635 1 : }
636 1 : maxConcurrentDownloads := rng.Intn(3) + 1 // 1-3
637 1 : opts.MaxConcurrentDownloads = func() int {
638 1 : return maxConcurrentDownloads
639 1 : }
640 1 : opts.MaxManifestFileSize = 1 << uint(rng.Intn(30)) // 1B - 1GB
641 1 : opts.MemTableSize = 2 << (10 + uint(rng.Intn(16))) // 2KB - 256MB
642 1 : opts.MemTableStopWritesThreshold = 2 + rng.Intn(5) // 2 - 5
643 1 : if rng.Intn(2) == 0 {
644 1 : opts.WALDir = "data/wal"
645 1 : }
646 :
647 : // Half the time enable WAL failover.
648 1 : if rng.Intn(2) == 0 {
649 1 : // Use 10x longer durations when writing directly to FS; we don't want
650 1 : // WAL failover to trigger excessively frequently.
651 1 : referenceDur := time.Millisecond
652 1 : if testOpts.useDisk {
653 0 : referenceDur *= 10
654 0 : }
655 :
656 1 : scaleDuration := func(d time.Duration, minFactor, maxFactor float64) time.Duration {
657 1 : return time.Duration(float64(d) * (minFactor + rng.Float64()*(maxFactor-minFactor)))
658 1 : }
659 1 : unhealthyThreshold := expRandDuration(rng, 3*referenceDur, time.Second)
660 1 : healthyThreshold := expRandDuration(rng, 3*referenceDur, time.Second)
661 1 : healthyInterval := scaleDuration(healthyThreshold, 1.0, 10.0) // Between 1-10x the healthy threshold
662 1 : opts.WALFailover = &pebble.WALFailoverOptions{
663 1 : Secondary: wal.Dir{FS: vfs.Default, Dirname: "data/wal_secondary"},
664 1 : FailoverOptions: wal.FailoverOptions{
665 1 : PrimaryDirProbeInterval: scaleDuration(healthyThreshold, 0.10, 0.50), // Between 10-50% of the healthy threshold
666 1 : HealthyProbeLatencyThreshold: healthyThreshold,
667 1 : HealthyInterval: healthyInterval,
668 1 : UnhealthySamplingInterval: scaleDuration(unhealthyThreshold, 0.10, 0.50), // Between 10-50% of the unhealthy threshold
669 1 : UnhealthyOperationLatencyThreshold: func() (time.Duration, bool) {
670 1 : return unhealthyThreshold, true
671 1 : },
672 : ElevatedWriteStallThresholdLag: expRandDuration(rng, 5*referenceDur, 2*time.Second),
673 : },
674 : }
675 : }
676 1 : if rng.Intn(4) == 0 {
677 1 : // Enable Writer parallelism for 25% of the random options. Setting
678 1 : // MaxWriterConcurrency to any value greater than or equal to 1 has the
679 1 : // same effect currently.
680 1 : opts.Experimental.MaxWriterConcurrency = 2
681 1 : opts.Experimental.ForceWriterParallelism = true
682 1 : }
683 1 : if rng.Intn(2) == 0 {
684 1 : opts.Experimental.DisableIngestAsFlushable = func() bool { return true }
685 : }
686 :
687 : // We either use no multilevel compactions, multilevel compactions with the
688 : // default (zero) additional propensity, or multilevel compactions with an
689 : // additional propensity to encourage more multilevel compactions than we
690 : // ohterwise would.
691 1 : switch rng.Intn(3) {
692 1 : case 0:
693 1 : opts.Experimental.MultiLevelCompactionHeuristic = pebble.NoMultiLevel{}
694 1 : case 1:
695 1 : opts.Experimental.MultiLevelCompactionHeuristic = pebble.WriteAmpHeuristic{}
696 1 : default:
697 1 : opts.Experimental.MultiLevelCompactionHeuristic = pebble.WriteAmpHeuristic{
698 1 : AddPropensity: rng.Float64() * float64(rng.Intn(3)), // [0,3.0)
699 1 : AllowL0: rng.Intn(4) == 1, // 25% of the time
700 1 : }
701 : }
702 :
703 1 : var lopts pebble.LevelOptions
704 1 : lopts.BlockRestartInterval = 1 + rng.Intn(64) // 1 - 64
705 1 : lopts.BlockSize = 1 << uint(rng.Intn(24)) // 1 - 16MB
706 1 : lopts.BlockSizeThreshold = 50 + rng.Intn(50) // 50 - 100
707 1 : lopts.IndexBlockSize = 1 << uint(rng.Intn(24)) // 1 - 16MB
708 1 : lopts.TargetFileSize = 1 << uint(rng.Intn(28)) // 1 - 256MB
709 1 :
710 1 : // We either use no bloom filter, the default filter, or a filter with
711 1 : // randomized bits-per-key setting. We zero out the Filters map. It'll get
712 1 : // repopulated on EnsureDefaults accordingly.
713 1 : opts.Filters = nil
714 1 : switch rng.Intn(3) {
715 1 : case 0:
716 1 : lopts.FilterPolicy = nil
717 1 : case 1:
718 1 : lopts.FilterPolicy = bloom.FilterPolicy(10)
719 1 : default:
720 1 : lopts.FilterPolicy = newTestingFilterPolicy(1 << rng.Intn(5))
721 : }
722 :
723 : // We use either no compression, snappy compression or zstd compression.
724 1 : switch rng.Intn(3) {
725 1 : case 0:
726 1 : lopts.Compression = func() sstable.Compression { return pebble.NoCompression }
727 1 : case 1:
728 1 : lopts.Compression = func() sstable.Compression { return pebble.ZstdCompression }
729 1 : default:
730 1 : lopts.Compression = func() sstable.Compression { return pebble.SnappyCompression }
731 : }
732 1 : opts.Levels = []pebble.LevelOptions{lopts}
733 1 :
734 1 : // Explicitly disable disk-backed FS's for the random configurations. The
735 1 : // single standard test configuration that uses a disk-backed FS is
736 1 : // sufficient.
737 1 : testOpts.useDisk = false
738 1 : testOpts.strictFS = rng.Intn(2) != 0 // Only relevant for MemFS.
739 1 : // 50% of the time, enable IO latency injection.
740 1 : if rng.Intn(2) == 0 {
741 1 : // Note: we want ioLatencyProbability to be at least 1e-10, otherwise it
742 1 : // might print as 0 when we stringify options.
743 1 : testOpts.ioLatencyProbability = 1e-10 + 0.01*rng.Float64() // 0-1%
744 1 : testOpts.ioLatencyMean = expRandDuration(rng, 3*time.Millisecond, time.Second)
745 1 : testOpts.ioLatencySeed = rng.Int63()
746 1 : }
747 1 : testOpts.Threads = rng.Intn(runtime.GOMAXPROCS(0)) + 1
748 1 : if testOpts.strictFS {
749 1 : opts.DisableWAL = false
750 1 : opts.FS = vfs.NewStrictMem()
751 1 : } else if !testOpts.useDisk {
752 1 : opts.FS = vfs.NewMem()
753 1 : }
754 : // Update the WALFailover's secondary to use the same FS. This isn't
755 : // strictly necessary (the WALFailover could use a separate FS), but it
756 : // ensures when we save a copy of the test state to disk, we include the
757 : // secondary's WALs.
758 1 : if opts.WALFailover != nil {
759 1 : opts.WALFailover.Secondary.FS = opts.FS
760 1 : }
761 1 : testOpts.ingestUsingApply = rng.Intn(2) != 0
762 1 : testOpts.deleteSized = rng.Intn(2) != 0
763 1 : testOpts.replaceSingleDelete = rng.Intn(2) != 0
764 1 : testOpts.disableBlockPropertyCollector = rng.Intn(2) == 1
765 1 : if testOpts.disableBlockPropertyCollector {
766 1 : testOpts.Opts.BlockPropertyCollectors = nil
767 1 : }
768 1 : testOpts.enableValueBlocks = rng.Intn(2) != 0
769 1 : if testOpts.enableValueBlocks {
770 1 : testOpts.Opts.Experimental.EnableValueBlocks = func() bool { return true }
771 : }
772 1 : testOpts.disableValueBlocksForIngestSSTables = rng.Intn(2) == 0
773 1 : testOpts.asyncApplyToDB = rng.Intn(2) != 0
774 1 : // 20% of time, enable shared storage.
775 1 : if rng.Intn(5) == 0 {
776 1 : testOpts.sharedStorageEnabled = true
777 1 : if testOpts.Opts.FormatMajorVersion < pebble.FormatMinForSharedObjects {
778 1 : testOpts.Opts.FormatMajorVersion = pebble.FormatMinForSharedObjects
779 1 : }
780 1 : testOpts.sharedStorageFS = remote.NewInMem()
781 1 : // If shared storage is enabled, pick between writing all files on shared
782 1 : // vs. lower levels only, 50% of the time.
783 1 : testOpts.Opts.Experimental.CreateOnShared = remote.CreateOnSharedAll
784 1 : if rng.Intn(2) == 0 {
785 1 : testOpts.Opts.Experimental.CreateOnShared = remote.CreateOnSharedLower
786 1 : }
787 : // If shared storage is enabled, enable secondary cache 50% of time.
788 1 : if rng.Intn(2) == 0 {
789 1 : testOpts.secondaryCacheEnabled = true
790 1 : // TODO(josh): Randomize various secondary cache settings.
791 1 : testOpts.Opts.Experimental.SecondaryCacheSizeBytes = 1024 * 1024 * 32 // 32 MBs
792 1 : }
793 : // 50% of the time, enable shared replication.
794 1 : testOpts.useSharedReplicate = rng.Intn(2) == 0
795 : }
796 :
797 : // 50% of time, enable external storage.
798 1 : if rng.Intn(2) == 0 {
799 1 : testOpts.externalStorageEnabled = true
800 1 : if testOpts.Opts.FormatMajorVersion < pebble.FormatSyntheticPrefixSuffix {
801 1 : testOpts.Opts.FormatMajorVersion = pebble.FormatSyntheticPrefixSuffix
802 1 : }
803 1 : testOpts.externalStorageFS = remote.NewInMem()
804 : }
805 :
806 1 : testOpts.seedEFOS = rng.Uint64()
807 1 : testOpts.ingestSplit = rng.Intn(2) == 0
808 1 : opts.Experimental.IngestSplit = func() bool { return testOpts.ingestSplit }
809 1 : testOpts.useExcise = rng.Intn(2) == 0
810 1 : if testOpts.useExcise {
811 1 : if testOpts.Opts.FormatMajorVersion < pebble.FormatVirtualSSTables {
812 1 : testOpts.Opts.FormatMajorVersion = pebble.FormatVirtualSSTables
813 1 : }
814 : }
815 1 : testOpts.InitRemoteStorageFactory()
816 1 : testOpts.Opts.EnsureDefaults()
817 1 : return testOpts
818 : }
819 :
820 1 : func expRandDuration(rng *rand.Rand, meanDur, maxDur time.Duration) time.Duration {
821 1 : return min(maxDur, time.Duration(math.Round(rng.ExpFloat64()*float64(meanDur))))
822 1 : }
823 :
824 1 : func setupInitialState(dataDir string, testOpts *TestOptions) error {
825 1 : // Copy (vfs.Default,<initialStatePath>/data) to (testOpts.opts.FS,<dataDir>).
826 1 : ok, err := vfs.Clone(
827 1 : vfs.Default,
828 1 : testOpts.Opts.FS,
829 1 : vfs.Default.PathJoin(testOpts.initialStatePath, "data"),
830 1 : dataDir,
831 1 : vfs.CloneSync,
832 1 : vfs.CloneSkip(func(filename string) bool {
833 1 : // Skip the archive of historical files, any checkpoints created by
834 1 : // operations and files staged for ingest in tmp.
835 1 : b := filepath.Base(filename)
836 1 : return b == "archive" || b == "checkpoints" || b == "tmp"
837 1 : }))
838 1 : if err != nil {
839 0 : return err
840 1 : } else if !ok {
841 0 : return os.ErrNotExist
842 0 : }
843 :
844 : // Tests with wal_dir set store their WALs in a `wal` directory. The source
845 : // database (initialStatePath) could've had wal_dir set, or the current test
846 : // options (testOpts) could have wal_dir set, or both.
847 : //
848 : // If the test opts are not configured to use a WAL dir, we add the WAL dir
849 : // as a 'WAL recovery dir' so that we'll read any WALs in the directory in
850 : // Open.
851 1 : walRecoveryPath := testOpts.Opts.FS.PathJoin(dataDir, "wal")
852 1 : if testOpts.Opts.WALDir != "" {
853 0 : // If the test opts are configured to use a WAL dir, we add the data
854 0 : // directory itself as a 'WAL recovery dir' so that we'll read any WALs if
855 0 : // the previous test was writing them to the data directory.
856 0 : walRecoveryPath = dataDir
857 0 : }
858 1 : testOpts.Opts.WALRecoveryDirs = append(testOpts.Opts.WALRecoveryDirs, wal.Dir{
859 1 : FS: testOpts.Opts.FS,
860 1 : Dirname: walRecoveryPath,
861 1 : })
862 1 :
863 1 : // If the failover dir exists and the test opts are not configured to use
864 1 : // WAL failover, add the failover directory as a 'WAL recovery dir' in case
865 1 : // the previous test was configured to use failover.
866 1 : failoverDir := testOpts.Opts.FS.PathJoin(dataDir, "wal_secondary")
867 1 : if _, err := testOpts.Opts.FS.Stat(failoverDir); err == nil && testOpts.Opts.WALFailover == nil {
868 0 : testOpts.Opts.WALRecoveryDirs = append(testOpts.Opts.WALRecoveryDirs, wal.Dir{
869 0 : FS: testOpts.Opts.FS,
870 0 : Dirname: failoverDir,
871 0 : })
872 0 : }
873 1 : return nil
874 : }
875 :
876 : var blockPropertyCollectorConstructors = []func() pebble.BlockPropertyCollector{
877 : sstable.NewTestKeysBlockPropertyCollector,
878 : }
879 :
880 : // testingFilterPolicy is used to allow bloom filter policies with non-default
881 : // bits-per-key setting. It is necessary because the name of the production
882 : // filter policy is fixed (see bloom.FilterPolicy.Name()); we need to output a
883 : // custom policy name to the OPTIONS file that the test can then parse.
884 : type testingFilterPolicy struct {
885 : bloom.FilterPolicy
886 : }
887 :
888 : var _ pebble.FilterPolicy = (*testingFilterPolicy)(nil)
889 :
890 2 : func newTestingFilterPolicy(bitsPerKey int) *testingFilterPolicy {
891 2 : return &testingFilterPolicy{
892 2 : FilterPolicy: bloom.FilterPolicy(bitsPerKey),
893 2 : }
894 2 : }
895 :
896 : const testingFilterPolicyFmt = "testing_bloom_filter/bits_per_key=%d"
897 :
898 : // Name implements the pebble.FilterPolicy interface.
899 2 : func (t *testingFilterPolicy) Name() string {
900 2 : if t.FilterPolicy == 10 {
901 0 : return "rocksdb.BuiltinBloomFilter"
902 0 : }
903 2 : return fmt.Sprintf(testingFilterPolicyFmt, t.FilterPolicy)
904 : }
905 :
906 2 : func filterPolicyFromName(name string) (pebble.FilterPolicy, error) {
907 2 : switch name {
908 2 : case "none":
909 2 : return nil, nil
910 2 : case "rocksdb.BuiltinBloomFilter":
911 2 : return bloom.FilterPolicy(10), nil
912 : }
913 2 : var bitsPerKey int
914 2 : if _, err := fmt.Sscanf(name, testingFilterPolicyFmt, &bitsPerKey); err != nil {
915 0 : return nil, errors.Errorf("Invalid filter policy name '%s'", name)
916 0 : }
917 2 : return newTestingFilterPolicy(bitsPerKey), nil
918 : }
|