Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "os"
12 : "path"
13 : "sort"
14 : "strings"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble"
18 : "github.com/cockroachdb/pebble/vfs"
19 : "github.com/cockroachdb/pebble/vfs/errorfs"
20 : )
21 :
22 : // New constructs a new metamorphic test that runs the provided operations
23 : // against a database using the provided TestOptions and outputs the history of
24 : // events to an io.Writer.
25 : //
26 : // dir specifies the path within opts.Opts.FS to open the database.
27 1 : func New(ops Ops, opts *TestOptions, dir string, w io.Writer) (*Test, error) {
28 1 : t := newTest(ops)
29 1 : h := newHistory(nil /* failRegexp */, w)
30 1 : if err := t.init(h, dir, opts, 1 /* num instances */); err != nil {
31 0 : return nil, err
32 0 : }
33 1 : return t, nil
34 : }
35 :
36 : // A Test configures an individual test run consisting of a set of operations,
37 : // TestOptions configuring the target database to which the operations should be
38 : // applied, and a sink for outputting test history.
39 : type Test struct {
40 : // The list of ops to execute. The ops refer to slots in the batches, iters,
41 : // and snapshots slices.
42 : ops []op
43 : opsWaitOn [][]int // op index -> op indexes
44 : opsDone []chan struct{} // op index -> done channel
45 : idx int
46 : dir string
47 : h *history
48 : opts *pebble.Options
49 : testOpts *TestOptions
50 : writeOpts *pebble.WriteOptions
51 : tmpDir string
52 : // The DBs the test is run on.
53 : dbs []*pebble.DB
54 : // The slots for the batches, iterators, and snapshots. These are read and
55 : // written by the ops to pass state from one op to another.
56 : batches []*pebble.Batch
57 : iters []*retryableIter
58 : snapshots []readerCloser
59 : }
60 :
61 1 : func newTest(ops []op) *Test {
62 1 : return &Test{
63 1 : ops: ops,
64 1 : }
65 1 : }
66 :
67 1 : func (t *Test) init(h *history, dir string, testOpts *TestOptions, numInstances int) error {
68 1 : t.dir = dir
69 1 : t.h = h
70 1 : t.testOpts = testOpts
71 1 : t.writeOpts = pebble.NoSync
72 1 : if testOpts.strictFS {
73 0 : t.writeOpts = pebble.Sync
74 1 : } else {
75 1 : t.writeOpts = pebble.NoSync
76 1 : }
77 1 : testOpts.Opts.WithFSDefaults()
78 1 : t.opts = testOpts.Opts.EnsureDefaults()
79 1 : t.opts.Logger = h
80 1 : lel := pebble.MakeLoggingEventListener(t.opts.Logger)
81 1 : t.opts.EventListener = &lel
82 1 : // If the test options set a DebugCheck func, wrap it with retrying of
83 1 : // retriable errors (according to the test's retry policy).
84 1 : if debugCheck := t.opts.DebugCheck; debugCheck != nil {
85 1 : t.opts.DebugCheck = func(db *pebble.DB) error {
86 1 : return t.withRetries(func() error { return debugCheck(db) })
87 : }
88 : }
89 1 : if numInstances < 1 {
90 1 : numInstances = 1
91 1 : }
92 :
93 1 : t.opsWaitOn, t.opsDone = computeSynchronizationPoints(t.ops)
94 1 :
95 1 : defer t.opts.Cache.Unref()
96 1 :
97 1 : // If an error occurs and we were using an in-memory FS, attempt to clone to
98 1 : // on-disk in order to allow post-mortem debugging. Note that always using
99 1 : // the on-disk FS isn't desirable because there is a large performance
100 1 : // difference between in-memory and on-disk which causes different code paths
101 1 : // and timings to be exercised.
102 1 : maybeExit := func(err error) {
103 1 : if err == nil || errors.Is(err, errorfs.ErrInjected) || errors.Is(err, pebble.ErrCancelledCompaction) {
104 1 : return
105 1 : }
106 0 : t.maybeSaveData()
107 0 : fmt.Fprintln(os.Stderr, err)
108 0 : os.Exit(1)
109 : }
110 :
111 : // Exit early on any error from a background operation.
112 1 : t.opts.EventListener.BackgroundError = func(err error) {
113 0 : t.opts.Logger.Infof("background error: %s", err)
114 0 : maybeExit(err)
115 0 : }
116 1 : t.opts.EventListener.CompactionEnd = func(info pebble.CompactionInfo) {
117 1 : t.opts.Logger.Infof("%s", info)
118 1 : maybeExit(info.Err)
119 1 : }
120 1 : t.opts.EventListener.FlushEnd = func(info pebble.FlushInfo) {
121 1 : t.opts.Logger.Infof("%s", info)
122 1 : if info.Err != nil && !strings.Contains(info.Err.Error(), "pebble: empty table") {
123 0 : maybeExit(info.Err)
124 0 : }
125 : }
126 1 : t.opts.EventListener.ManifestCreated = func(info pebble.ManifestCreateInfo) {
127 1 : t.opts.Logger.Infof("%s", info)
128 1 : maybeExit(info.Err)
129 1 : }
130 1 : t.opts.EventListener.ManifestDeleted = func(info pebble.ManifestDeleteInfo) {
131 1 : t.opts.Logger.Infof("%s", info)
132 1 : maybeExit(info.Err)
133 1 : }
134 1 : t.opts.EventListener.TableDeleted = func(info pebble.TableDeleteInfo) {
135 1 : t.opts.Logger.Infof("%s", info)
136 1 : maybeExit(info.Err)
137 1 : }
138 1 : t.opts.EventListener.TableIngested = func(info pebble.TableIngestInfo) {
139 1 : t.opts.Logger.Infof("%s", info)
140 1 : maybeExit(info.Err)
141 1 : }
142 1 : t.opts.EventListener.WALCreated = func(info pebble.WALCreateInfo) {
143 1 : t.opts.Logger.Infof("%s", info)
144 1 : maybeExit(info.Err)
145 1 : }
146 1 : t.opts.EventListener.WALDeleted = func(info pebble.WALDeleteInfo) {
147 1 : t.opts.Logger.Infof("%s", info)
148 1 : maybeExit(info.Err)
149 1 : }
150 :
151 1 : for i := range t.testOpts.CustomOpts {
152 0 : if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
153 0 : return err
154 0 : }
155 : }
156 :
157 1 : t.dbs = make([]*pebble.DB, numInstances)
158 1 : for i := range t.dbs {
159 1 : var db *pebble.DB
160 1 : var err error
161 1 : if len(t.dbs) > 1 {
162 0 : dir = path.Join(t.dir, fmt.Sprintf("db%d", i+1))
163 0 : }
164 1 : err = t.withRetries(func() error {
165 1 : db, err = pebble.Open(dir, t.opts)
166 1 : return err
167 1 : })
168 1 : if err != nil {
169 0 : return err
170 0 : }
171 1 : t.dbs[i] = db
172 1 : h.log.Printf("// db%d.Open() %v", i+1, err)
173 1 :
174 1 : if t.testOpts.sharedStorageEnabled {
175 0 : err = t.withRetries(func() error {
176 0 : return db.SetCreatorID(uint64(i + 1))
177 0 : })
178 0 : if err != nil {
179 0 : return err
180 0 : }
181 0 : h.log.Printf("// db%d.SetCreatorID() %v", i+1, err)
182 : }
183 : }
184 :
185 1 : var err error
186 1 : t.tmpDir = t.opts.FS.PathJoin(t.dir, "tmp")
187 1 : if err = t.opts.FS.MkdirAll(t.tmpDir, 0755); err != nil {
188 0 : return err
189 0 : }
190 1 : if t.testOpts.strictFS {
191 0 : // Sync the whole directory path for the tmpDir, since restartDB() is executed during
192 0 : // the test. That would reset MemFS to the synced state, which would make an unsynced
193 0 : // directory disappear in the middle of the test. It is the responsibility of the test
194 0 : // (not Pebble) to ensure that it can write the ssts that it will subsequently ingest
195 0 : // into Pebble.
196 0 : for {
197 0 : f, err := t.opts.FS.OpenDir(dir)
198 0 : if err != nil {
199 0 : return err
200 0 : }
201 0 : if err = f.Sync(); err != nil {
202 0 : return err
203 0 : }
204 0 : if err = f.Close(); err != nil {
205 0 : return err
206 0 : }
207 0 : if len(dir) == 1 {
208 0 : break
209 : }
210 0 : dir = t.opts.FS.PathDir(dir)
211 0 : // TODO(sbhola): PathDir returns ".", which OpenDir() complains about. Fix.
212 0 : if len(dir) == 1 {
213 0 : dir = "/"
214 0 : }
215 : }
216 : }
217 :
218 1 : return nil
219 : }
220 :
221 1 : func (t *Test) withRetries(fn func() error) error {
222 1 : return withRetries(fn, t.testOpts.RetryPolicy)
223 1 : }
224 :
225 0 : func (t *Test) isFMV(dbID objID, fmv pebble.FormatMajorVersion) bool {
226 0 : db := t.getDB(dbID)
227 0 : return db.FormatMajorVersion() >= fmv
228 0 : }
229 :
230 1 : func (t *Test) restartDB(dbID objID) error {
231 1 : db := t.getDB(dbID)
232 1 : if !t.testOpts.strictFS {
233 1 : return nil
234 1 : }
235 0 : t.opts.Cache.Ref()
236 0 : // The fs isn't necessarily a MemFS.
237 0 : fs, ok := vfs.Root(t.opts.FS).(*vfs.MemFS)
238 0 : if ok {
239 0 : fs.SetIgnoreSyncs(true)
240 0 : }
241 0 : if err := db.Close(); err != nil {
242 0 : return err
243 0 : }
244 : // Release any resources held by custom options. This may be used, for
245 : // example, by the encryption-at-rest custom option (within the Cockroach
246 : // repository) to close the file registry.
247 0 : for i := range t.testOpts.CustomOpts {
248 0 : if err := t.testOpts.CustomOpts[i].Close(t.opts); err != nil {
249 0 : return err
250 0 : }
251 : }
252 0 : if ok {
253 0 : fs.ResetToSyncedState()
254 0 : fs.SetIgnoreSyncs(false)
255 0 : }
256 :
257 : // TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
258 : // are well defined within the context of retries.
259 0 : err := t.withRetries(func() (err error) {
260 0 : // Reacquire any resources required by custom options. This may be used, for
261 0 : // example, by the encryption-at-rest custom option (within the Cockroach
262 0 : // repository) to reopen the file registry.
263 0 : for i := range t.testOpts.CustomOpts {
264 0 : if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
265 0 : return err
266 0 : }
267 : }
268 0 : dir := t.dir
269 0 : if len(t.dbs) > 1 {
270 0 : dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
271 0 : }
272 0 : t.dbs[dbID.slot()-1], err = pebble.Open(dir, t.opts)
273 0 : if err != nil {
274 0 : return err
275 0 : }
276 0 : return err
277 : })
278 0 : t.opts.Cache.Unref()
279 0 : return err
280 : }
281 :
282 1 : func (t *Test) maybeSaveDataInternal() error {
283 1 : rootFS := vfs.Root(t.opts.FS)
284 1 : if rootFS == vfs.Default {
285 0 : return nil
286 0 : }
287 1 : if err := os.RemoveAll(t.dir); err != nil {
288 0 : return err
289 0 : }
290 1 : if _, err := vfs.Clone(rootFS, vfs.Default, t.dir, t.dir); err != nil {
291 0 : return err
292 0 : }
293 1 : if t.testOpts.sharedStorageEnabled {
294 0 : fs := t.testOpts.sharedStorageFS
295 0 : outputDir := vfs.Default.PathJoin(t.dir, "shared", string(t.testOpts.Opts.Experimental.CreateOnSharedLocator))
296 0 : vfs.Default.MkdirAll(outputDir, 0755)
297 0 : objs, err := fs.List("", "")
298 0 : if err != nil {
299 0 : return err
300 0 : }
301 0 : for i := range objs {
302 0 : reader, readSize, err := fs.ReadObject(context.TODO(), objs[i])
303 0 : if err != nil {
304 0 : return err
305 0 : }
306 0 : buf := make([]byte, readSize)
307 0 : if err := reader.ReadAt(context.TODO(), buf, 0); err != nil {
308 0 : return err
309 0 : }
310 0 : outputPath := vfs.Default.PathJoin(outputDir, objs[i])
311 0 : outputFile, err := vfs.Default.Create(outputPath)
312 0 : if err != nil {
313 0 : return err
314 0 : }
315 0 : if _, err := outputFile.Write(buf); err != nil {
316 0 : outputFile.Close()
317 0 : return err
318 0 : }
319 0 : if err := outputFile.Close(); err != nil {
320 0 : return err
321 0 : }
322 : }
323 : }
324 1 : return nil
325 : }
326 :
327 : // If an in-memory FS is being used, save the contents to disk.
328 1 : func (t *Test) maybeSaveData() {
329 1 : if err := t.maybeSaveDataInternal(); err != nil {
330 0 : t.opts.Logger.Infof("unable to save data: %s: %v", t.dir, err)
331 0 : }
332 : }
333 :
334 : // Step runs one single operation, returning whether or not there are additional
335 : // operations, the operation's output and an error if any occurred while running
336 : // the operation.
337 : //
338 : // Step may be used in stead of Execute to advance a test one operation at a
339 : // time.
340 1 : func (t *Test) Step() (more bool, operationOutput string, err error) {
341 1 : more = t.step(t.h, func(format string, args ...interface{}) {
342 1 : operationOutput = fmt.Sprintf(format, args...)
343 1 : })
344 1 : err = t.h.Error()
345 1 : return more, operationOutput, err
346 : }
347 :
348 1 : func (t *Test) step(h *history, optionalRecordf func(string, ...interface{})) bool {
349 1 : if t.idx >= len(t.ops) {
350 1 : return false
351 1 : }
352 1 : t.ops[t.idx].run(t, h.recorder(-1 /* thread */, t.idx, optionalRecordf))
353 1 : t.idx++
354 1 : return true
355 : }
356 :
357 1 : func (t *Test) setBatch(id objID, b *pebble.Batch) {
358 1 : if id.tag() != batchTag {
359 0 : panic(fmt.Sprintf("invalid batch ID: %s", id))
360 : }
361 1 : t.batches[id.slot()] = b
362 : }
363 :
364 1 : func (t *Test) setIter(id objID, i *pebble.Iterator) {
365 1 : if id.tag() != iterTag {
366 0 : panic(fmt.Sprintf("invalid iter ID: %s", id))
367 : }
368 1 : t.iters[id.slot()] = &retryableIter{
369 1 : iter: i,
370 1 : lastKey: nil,
371 1 : needRetry: t.testOpts.RetryPolicy,
372 1 : }
373 : }
374 :
375 : type readerCloser interface {
376 : pebble.Reader
377 : io.Closer
378 : }
379 :
380 1 : func (t *Test) setSnapshot(id objID, s readerCloser) {
381 1 : if id.tag() != snapTag {
382 0 : panic(fmt.Sprintf("invalid snapshot ID: %s", id))
383 : }
384 1 : t.snapshots[id.slot()] = s
385 : }
386 :
387 1 : func (t *Test) clearObj(id objID) {
388 1 : switch id.tag() {
389 1 : case dbTag:
390 1 : t.dbs[id.slot()-1] = nil
391 1 : case batchTag:
392 1 : t.batches[id.slot()] = nil
393 1 : case iterTag:
394 1 : t.iters[id.slot()] = nil
395 1 : case snapTag:
396 1 : t.snapshots[id.slot()] = nil
397 : }
398 : }
399 :
400 1 : func (t *Test) getBatch(id objID) *pebble.Batch {
401 1 : if id.tag() != batchTag || t.batches[id.slot()] == nil {
402 0 : panic(fmt.Sprintf("metamorphic test internal error: invalid batch ID: %s", id))
403 : }
404 1 : return t.batches[id.slot()]
405 : }
406 :
407 1 : func (t *Test) getCloser(id objID) io.Closer {
408 1 : switch id.tag() {
409 1 : case dbTag:
410 1 : return t.dbs[id.slot()-1]
411 1 : case batchTag:
412 1 : return t.batches[id.slot()]
413 1 : case iterTag:
414 1 : return t.iters[id.slot()]
415 1 : case snapTag:
416 1 : return t.snapshots[id.slot()]
417 : }
418 0 : panic(fmt.Sprintf("cannot close ID: %s", id))
419 : }
420 :
421 1 : func (t *Test) getIter(id objID) *retryableIter {
422 1 : if id.tag() != iterTag {
423 0 : panic(fmt.Sprintf("invalid iter ID: %s", id))
424 : }
425 1 : return t.iters[id.slot()]
426 : }
427 :
428 1 : func (t *Test) getReader(id objID) pebble.Reader {
429 1 : switch id.tag() {
430 1 : case dbTag:
431 1 : return t.dbs[id.slot()-1]
432 1 : case batchTag:
433 1 : return t.batches[id.slot()]
434 1 : case snapTag:
435 1 : return t.snapshots[id.slot()]
436 : }
437 0 : panic(fmt.Sprintf("invalid reader ID: %s", id))
438 : }
439 :
440 1 : func (t *Test) getWriter(id objID) pebble.Writer {
441 1 : switch id.tag() {
442 1 : case dbTag:
443 1 : return t.dbs[id.slot()-1]
444 1 : case batchTag:
445 1 : return t.batches[id.slot()]
446 : }
447 0 : panic(fmt.Sprintf("invalid writer ID: %s", id))
448 : }
449 :
450 1 : func (t *Test) getDB(id objID) *pebble.DB {
451 1 : switch id.tag() {
452 1 : case dbTag:
453 1 : return t.dbs[id.slot()-1]
454 0 : default:
455 0 : panic(fmt.Sprintf("invalid writer tag: %v", id.tag()))
456 : }
457 : }
458 :
459 : // Compute the synchronization points between operations. When operating
460 : // with more than 1 thread, operations must synchronize access to shared
461 : // objects. Compute two slices the same length as ops.
462 : //
463 : // opsWaitOn: the value v at index i indicates that operation i must wait
464 : // for the operation at index v to finish before it may run. NB: v < i
465 : //
466 : // opsDone: the channel at index i must be closed when the operation at index i
467 : // completes. This slice is sparse. Operations that are never used as
468 : // synchronization points may have a nil channel.
469 1 : func computeSynchronizationPoints(ops []op) (opsWaitOn [][]int, opsDone []chan struct{}) {
470 1 : opsDone = make([]chan struct{}, len(ops)) // operation index -> done channel
471 1 : opsWaitOn = make([][]int, len(ops)) // operation index -> operation index
472 1 : lastOpReference := make(map[objID]int) // objID -> operation index
473 1 : for i, o := range ops {
474 1 : // Find the last operation that involved the same receiver object. We at
475 1 : // least need to wait on that operation.
476 1 : receiver := o.receiver()
477 1 : waitIndex, ok := lastOpReference[receiver]
478 1 : lastOpReference[receiver] = i
479 1 : if !ok {
480 1 : // Only valid for i=0. For all other operations, the receiver should
481 1 : // have been referenced by some other operation before it's used as
482 1 : // a receiver.
483 1 : if i != 0 && receiver.tag() != dbTag {
484 0 : panic(fmt.Sprintf("op %s on receiver %s; first reference of %s", ops[i].String(), receiver, receiver))
485 : }
486 : // The initOp is a little special. We do want to store the objects it's
487 : // syncing on, in `lastOpReference`.
488 1 : if i != 0 {
489 0 : continue
490 : }
491 : }
492 :
493 : // The last operation that referenced `receiver` is the one at index
494 : // `waitIndex`. All operations with the same receiver are performed on
495 : // the same thread. We only need to synchronize on the operation at
496 : // `waitIndex` if `receiver` isn't also the receiver on that operation
497 : // too.
498 1 : if ops[waitIndex].receiver() != receiver {
499 1 : opsWaitOn[i] = append(opsWaitOn[i], waitIndex)
500 1 : }
501 :
502 : // In additional to synchronizing on the operation's receiver operation,
503 : // we may need to synchronize on additional objects. For example,
504 : // batch0.Commit() must synchronize its receiver, batch0, but also on
505 : // the DB since it mutates database state.
506 1 : for _, syncObjID := range o.syncObjs() {
507 1 : if vi, vok := lastOpReference[syncObjID]; vok {
508 1 : opsWaitOn[i] = append(opsWaitOn[i], vi)
509 1 : }
510 1 : lastOpReference[syncObjID] = i
511 : }
512 :
513 1 : waitIndexes := opsWaitOn[i]
514 1 : sort.Ints(waitIndexes)
515 1 : for _, waitIndex := range waitIndexes {
516 1 : // If this is the first operation that must wait on the operation at
517 1 : // `waitIndex`, then there will be no channel for the operation yet.
518 1 : // Create one.
519 1 : if opsDone[waitIndex] == nil {
520 1 : opsDone[waitIndex] = make(chan struct{})
521 1 : }
522 : }
523 : }
524 1 : return opsWaitOn, opsDone
525 : }
|