Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "os"
11 : "path"
12 : "sort"
13 : "strings"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/cockroachdb/pebble/vfs/errorfs"
19 : )
20 :
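    : // test tracks the state of a single metamorphic test run: the sequence of
    : // ops to execute, the synchronization points between them, the DB
    : // instance(s) under test, and the slots through which ops hand batches,
    : // iterators, and snapshots to one another.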
21 : type test struct {
22 : // The list of ops to execute. The ops refer to slots in the batches, iters,
23 : // and snapshots slices.
24 : ops []op
25 : opsWaitOn [][]int // op index -> op indexes
26 : opsDone []chan struct{} // op index -> done channel
27 : idx int
28 : dir string
29 : opts *pebble.Options
30 : testOpts *TestOptions
31 : writeOpts *pebble.WriteOptions
32 : tmpDir string
33 : // The DBs the test is run on.
34 : dbs []*pebble.DB
35 : // The slots for the batches, iterators, and snapshots. These are read and
36 : // written by the ops to pass state from one op to another.
37 : batches []*pebble.Batch
38 : iters []*retryableIter
39 : snapshots []readerCloser
40 : }
41 :
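    : // newTest constructs a test that will execute the given ops. The returned
    : // test must be initialized with init before it is stepped.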
42 1 : func newTest(ops []op) *test {
43 1 : return &test{
44 1 : ops: ops,
45 1 : }
46 1 : }
47 :
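    : // init prepares the test for execution: it applies testOpts, installs
    : // event listeners that log background activity and abort the process on
    : // unexpected errors, computes the synchronization points between ops,
    : // opens the DB instance(s), and creates the tmp directory that ops use
    : // for building ssts to ingest.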
48 1 : func (t *test) init(h *history, dir string, testOpts *TestOptions, numInstances int) error {
49 1 : t.dir = dir
50 1 : t.testOpts = testOpts
51 1 : t.writeOpts = pebble.NoSync
52 1 : if testOpts.strictFS {
53 1 : t.writeOpts = pebble.Sync
54 1 : }
55 1 : t.opts = testOpts.Opts.EnsureDefaults()
56 1 : t.opts.Logger = h
57 1 : lel := pebble.MakeLoggingEventListener(t.opts.Logger)
58 1 : t.opts.EventListener = &lel
59 1 : t.opts.DebugCheck = func(db *pebble.DB) error {
60 1 : // Wrap the ordinary DebugCheckLevels with retrying
61 1 : // of injected errors.
62 1 : return withRetries(func() error {
63 1 : return pebble.DebugCheckLevels(db)
64 1 : })
65 : }
66 1 : if numInstances < 1 {
67 1 : numInstances = 1
68 1 : }
69 :
70 1 : t.opsWaitOn, t.opsDone = computeSynchronizationPoints(t.ops)
71 1 :
72 1 : defer t.opts.Cache.Unref()
73 1 :
74 1 : // If an error occurs and we were using an in-memory FS, attempt to clone to
75 1 : // on-disk in order to allow post-mortem debugging. Note that always using
76 1 : // the on-disk FS isn't desirable because there is a large performance
77 1 : // difference between in-memory and on-disk which causes different code paths
78 1 : // and timings to be exercised.
79 1 : maybeExit := func(err error) {
80 1 : if err == nil || errors.Is(err, errorfs.ErrInjected) || errors.Is(err, pebble.ErrCancelledCompaction) {
81 1 : return
82 1 : }
83 0 : t.maybeSaveData()
84 0 : fmt.Fprintln(os.Stderr, err)
85 0 : os.Exit(1)
86 : }
87 :
88 : // Exit early on any error from a background operation.
89 1 : t.opts.EventListener.BackgroundError = func(err error) {
90 1 : t.opts.Logger.Infof("background error: %s", err)
91 1 : maybeExit(err)
92 1 : }
93 1 : t.opts.EventListener.CompactionEnd = func(info pebble.CompactionInfo) {
94 1 : t.opts.Logger.Infof("%s", info)
95 1 : maybeExit(info.Err)
96 1 : }
97 1 : t.opts.EventListener.FlushEnd = func(info pebble.FlushInfo) {
98 1 : t.opts.Logger.Infof("%s", info)
99 1 : if info.Err != nil && !strings.Contains(info.Err.Error(), "pebble: empty table") {
100 0 : maybeExit(info.Err)
101 0 : }
102 : }
103 1 : t.opts.EventListener.ManifestCreated = func(info pebble.ManifestCreateInfo) {
104 1 : t.opts.Logger.Infof("%s", info)
105 1 : maybeExit(info.Err)
106 1 : }
107 1 : t.opts.EventListener.ManifestDeleted = func(info pebble.ManifestDeleteInfo) {
108 1 : t.opts.Logger.Infof("%s", info)
109 1 : maybeExit(info.Err)
110 1 : }
111 1 : t.opts.EventListener.TableDeleted = func(info pebble.TableDeleteInfo) {
112 1 : t.opts.Logger.Infof("%s", info)
113 1 : maybeExit(info.Err)
114 1 : }
115 1 : t.opts.EventListener.TableIngested = func(info pebble.TableIngestInfo) {
116 1 : t.opts.Logger.Infof("%s", info)
117 1 : maybeExit(info.Err)
118 1 : }
119 1 : t.opts.EventListener.WALCreated = func(info pebble.WALCreateInfo) {
120 1 : t.opts.Logger.Infof("%s", info)
121 1 : maybeExit(info.Err)
122 1 : }
123 1 : t.opts.EventListener.WALDeleted = func(info pebble.WALDeleteInfo) {
124 1 : t.opts.Logger.Infof("%s", info)
125 1 : maybeExit(info.Err)
126 1 : }
127 :
128 1 : for i := range t.testOpts.CustomOpts {
129 0 : if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
130 0 : return err
131 0 : }
132 : }
133 :
134 1 : t.dbs = make([]*pebble.DB, numInstances)
135 1 : for i := range t.dbs {
136 1 : var db *pebble.DB
137 1 : var err error
138 1 : if len(t.dbs) > 1 {
139 1 : dir = path.Join(t.dir, fmt.Sprintf("db%d", i+1))
140 1 : }
141 1 : err = withRetries(func() error {
142 1 : db, err = pebble.Open(dir, t.opts)
143 1 : return err
144 1 : })
145 1 : if err != nil {
146 0 : return err
147 0 : }
148 1 : t.dbs[i] = db
149 1 : h.log.Printf("// db%d.Open() %v", i+1, err)
150 1 :
151 1 : if t.testOpts.sharedStorageEnabled {
152 1 : err = withRetries(func() error {
153 1 : return db.SetCreatorID(uint64(i + 1))
154 1 : })
155 1 : if err != nil {
156 0 : return err
157 0 : }
158 1 : h.log.Printf("// db%d.SetCreatorID() %v", i+1, err)
159 : }
160 : }
161 :
162 1 : var err error
163 1 : t.tmpDir = t.opts.FS.PathJoin(t.dir, "tmp")
164 1 : if err = t.opts.FS.MkdirAll(t.tmpDir, 0755); err != nil {
165 0 : return err
166 0 : }
167 1 : if t.testOpts.strictFS {
168 1 : // Sync every directory on the path to tmpDir. restartDB() may run during
169 1 : // the test, and it resets MemFS to the synced state, so an unsynced
170 1 : // directory would disappear in the middle of the test. It is the
171 1 : // responsibility of the test (not Pebble) to ensure that it can write the
172 1 : // ssts that it will subsequently ingest into Pebble.
173 1 : for {
174 1 : f, err := t.opts.FS.OpenDir(dir)
175 1 : if err != nil {
176 0 : return err
177 0 : }
178 1 : if err = f.Sync(); err != nil {
179 0 : return err
180 0 : }
181 1 : if err = f.Close(); err != nil {
182 0 : return err
183 0 : }
184 1 : if len(dir) == 1 {
185 1 : break
186 : }
187 1 : dir = t.opts.FS.PathDir(dir)
188 1 : // TODO(sbhola): PathDir returns ".", which OpenDir() complains about. Fix.
189 1 : if len(dir) == 1 {
190 1 : dir = "/"
191 1 : }
192 : }
193 : }
194 :
195 1 : return nil
196 : }
197 :
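    : // isFMV reports whether the DB identified by dbID is running format major
    : // version fmv or later.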
198 1 : func (t *test) isFMV(dbID objID, fmv pebble.FormatMajorVersion) bool {
199 1 : db := t.getDB(dbID)
200 1 : return db.FormatMajorVersion() >= fmv
201 1 : }
202 :
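    : // restartDB simulates a crash and restart of the DB identified by dbID.
    : // It is a no-op unless strictFS is in use; otherwise it closes the DB,
    : // discards unsynced state from the MemFS, and reopens the DB (retrying
    : // around injected errors).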
203 1 : func (t *test) restartDB(dbID objID) error {
204 1 : db := t.getDB(dbID)
205 1 : if !t.testOpts.strictFS {
206 1 : return nil
207 1 : }
208 1 : t.opts.Cache.Ref()
209 1 : // The fs isn't necessarily a MemFS.
210 1 : fs, ok := vfs.Root(t.opts.FS).(*vfs.MemFS)
211 1 : if ok {
212 0 : fs.SetIgnoreSyncs(true)
213 0 : }
214 1 : if err := db.Close(); err != nil {
215 0 : return err
216 0 : }
217 : // Release any resources held by custom options. This may be used, for
218 : // example, by the encryption-at-rest custom option (within the Cockroach
219 : // repository) to close the file registry.
220 1 : for i := range t.testOpts.CustomOpts {
221 0 : if err := t.testOpts.CustomOpts[i].Close(t.opts); err != nil {
222 0 : return err
223 0 : }
224 : }
225 1 : if ok {
226 0 : fs.ResetToSyncedState()
227 0 : fs.SetIgnoreSyncs(false)
228 0 : }
229 :
230 : // TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
231 : // are well defined within the context of retries.
232 1 : err := withRetries(func() (err error) {
233 1 : // Reacquire any resources required by custom options. This may be used, for
234 1 : // example, by the encryption-at-rest custom option (within the Cockroach
235 1 : // repository) to reopen the file registry.
236 1 : for i := range t.testOpts.CustomOpts {
237 0 : if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
238 0 : return err
239 0 : }
240 : }
241 1 : dir := t.dir
242 1 : if len(t.dbs) > 1 {
243 1 : dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
244 1 : }
245 1 : t.dbs[dbID.slot()-1], err = pebble.Open(dir, t.opts)
246 1 : if err != nil {
247 0 : return err
248 0 : }
249 1 : return err
250 : })
251 1 : t.opts.Cache.Unref()
252 1 : return err
253 : }
254 :
255 : // If an in-memory FS is being used, save the contents to disk.
256 0 : func (t *test) maybeSaveData() {
257 0 : rootFS := vfs.Root(t.opts.FS)
258 0 : if rootFS == vfs.Default {
259 0 : return
260 0 : }
261 0 : _ = os.RemoveAll(t.dir)
262 0 : if _, err := vfs.Clone(rootFS, vfs.Default, t.dir, t.dir); err != nil {
263 0 : t.opts.Logger.Infof("unable to clone: %s: %v", t.dir, err)
264 0 : }
265 : }
266 :
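    : // step runs the next op, if any, and reports whether an op was run. A
    : // minimal single-threaded driver (a sketch; not necessarily how this
    : // package's own runner is structured) would look like:
    : //
    : //	t := newTest(ops)
    : //	err := t.init(h, dir, testOpts, 1 /* numInstances */)
    : //	// ... handle err ...
    : //	for t.step(h) {
    : //	}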
267 1 : func (t *test) step(h *history) bool {
268 1 : if t.idx >= len(t.ops) {
269 1 : return false
270 1 : }
271 1 : t.ops[t.idx].run(t, h.recorder(-1 /* thread */, t.idx))
272 1 : t.idx++
273 1 : return true
274 : }
275 :
276 1 : func (t *test) setBatch(id objID, b *pebble.Batch) {
277 1 : if id.tag() != batchTag {
278 0 : panic(fmt.Sprintf("invalid batch ID: %s", id))
279 : }
280 1 : t.batches[id.slot()] = b
281 : }
282 :
283 1 : func (t *test) setIter(id objID, i *pebble.Iterator) {
284 1 : if id.tag() != iterTag {
285 0 : panic(fmt.Sprintf("invalid iter ID: %s", id))
286 : }
287 1 : t.iters[id.slot()] = &retryableIter{
288 1 : iter: i,
289 1 : lastKey: nil,
290 1 : }
291 : }
292 :
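    : // readerCloser is the interface satisfied by the objects stored in the
    : // snapshot slots: a pebble.Reader that must also be closed when the test
    : // is done with it.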
293 : type readerCloser interface {
294 : pebble.Reader
295 : io.Closer
296 : }
297 :
298 1 : func (t *test) setSnapshot(id objID, s readerCloser) {
299 1 : if id.tag() != snapTag {
300 0 : panic(fmt.Sprintf("invalid snapshot ID: %s", id))
301 : }
302 1 : t.snapshots[id.slot()] = s
303 : }
304 :
305 1 : func (t *test) clearObj(id objID) {
306 1 : switch id.tag() {
307 1 : case dbTag:
308 1 : t.dbs[id.slot()-1] = nil
309 1 : case batchTag:
310 1 : t.batches[id.slot()] = nil
311 1 : case iterTag:
312 1 : t.iters[id.slot()] = nil
313 1 : case snapTag:
314 1 : t.snapshots[id.slot()] = nil
315 : }
316 : }
317 :
318 1 : func (t *test) getBatch(id objID) *pebble.Batch {
319 1 : if id.tag() != batchTag {
320 0 : panic(fmt.Sprintf("invalid batch ID: %s", id))
321 : }
322 1 : return t.batches[id.slot()]
323 : }
324 :
325 1 : func (t *test) getCloser(id objID) io.Closer {
326 1 : switch id.tag() {
327 1 : case dbTag:
328 1 : return t.dbs[id.slot()-1]
329 1 : case batchTag:
330 1 : return t.batches[id.slot()]
331 1 : case iterTag:
332 1 : return t.iters[id.slot()]
333 1 : case snapTag:
334 1 : return t.snapshots[id.slot()]
335 : }
336 0 : panic(fmt.Sprintf("cannot close ID: %s", id))
337 : }
338 :
339 1 : func (t *test) getIter(id objID) *retryableIter {
340 1 : if id.tag() != iterTag {
341 0 : panic(fmt.Sprintf("invalid iter ID: %s", id))
342 : }
343 1 : return t.iters[id.slot()]
344 : }
345 :
346 1 : func (t *test) getReader(id objID) pebble.Reader {
347 1 : switch id.tag() {
348 1 : case dbTag:
349 1 : return t.dbs[id.slot()-1]
350 1 : case batchTag:
351 1 : return t.batches[id.slot()]
352 1 : case snapTag:
353 1 : return t.snapshots[id.slot()]
354 : }
355 0 : panic(fmt.Sprintf("invalid reader ID: %s", id))
356 : }
357 :
358 1 : func (t *test) getWriter(id objID) pebble.Writer {
359 1 : switch id.tag() {
360 1 : case dbTag:
361 1 : return t.dbs[id.slot()-1]
362 1 : case batchTag:
363 1 : return t.batches[id.slot()]
364 : }
365 0 : panic(fmt.Sprintf("invalid writer ID: %s", id))
366 : }
367 :
368 1 : func (t *test) getDB(id objID) *pebble.DB {
369 1 : switch id.tag() {
370 1 : case dbTag:
371 1 : return t.dbs[id.slot()-1]
372 0 : default:
373 0 : panic(fmt.Sprintf("invalid DB tag: %v", id.tag()))
374 : }
375 : }
376 :
377 : // Compute the synchronization points between operations. When operating
378 : // with more than 1 thread, operations must synchronize access to shared
379 : // objects. Compute two slices the same length as ops.
380 : //
381 : // opsWaitOn: each value v in the slice at index i indicates that operation i
382 : // must wait for the operation at index v to finish before it may run. NB: v < i
383 : //
384 : // opsDone: the channel at index i must be closed when the operation at index i
385 : // completes. This slice is sparse. Operations that are never used as
386 : // synchronization points may have a nil channel.
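    : //
    : // Illustrative example (a hypothetical op sequence; it assumes that
    : // NewBatch's receiver is the DB and that it syncs on the batch it
    : // creates, and that Commit syncs on the DB):
    : //
    : //	0: init                    (receiver db1)
    : //	1: batch1 = db1.NewBatch() (receiver db1, syncs batch1)
    : //	2: batch1.Set(...)         (receiver batch1)
    : //	3: db1.Get(...)            (receiver db1)
    : //	4: batch1.Commit()         (receiver batch1, syncs db1)
    : //
    : // Op 2 waits on op 1, the last op to reference batch1, which ran with a
    : // different receiver. Op 3 waits on nothing extra: the last op to
    : // reference db1 (op 1) shares its receiver, so thread order already
    : // serializes them. Op 4 waits on op 3 via its sync object db1, since the
    : // commit mutates database state. opsDone gets channels at indexes 1 and
    : // 3 only; every other entry stays nil.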
387 1 : func computeSynchronizationPoints(ops []op) (opsWaitOn [][]int, opsDone []chan struct{}) {
388 1 : opsDone = make([]chan struct{}, len(ops)) // operation index -> done channel
389 1 : opsWaitOn = make([][]int, len(ops)) // operation index -> operation index
390 1 : lastOpReference := make(map[objID]int) // objID -> operation index
391 1 : for i, o := range ops {
392 1 : // Find the last operation that involved the same receiver object. We at
393 1 : // least need to wait on that operation.
394 1 : receiver := o.receiver()
395 1 : waitIndex, ok := lastOpReference[receiver]
396 1 : lastOpReference[receiver] = i
397 1 : if !ok {
398 1 : // A missing reference is expected only for i=0 or for a DB receiver
399 1 : // (the DBs exist from the start). Any other receiver should have been
400 1 : // referenced by an earlier operation before it's used as a receiver.
401 1 : if i != 0 && receiver.tag() != dbTag {
402 0 : panic(fmt.Sprintf("op %s on receiver %s; first reference of %s", ops[i].String(), receiver, receiver))
403 : }
404 : // The initOp is a little special. We do want to store the objects it's
405 : // syncing on, in `lastOpReference`.
406 1 : if i != 0 {
407 0 : continue
408 : }
409 : }
410 :
411 : // The last operation that referenced `receiver` is the one at index
412 : // `waitIndex`. All operations with the same receiver are performed on
413 : // the same thread. We only need to synchronize on the operation at
414 : // `waitIndex` if `receiver` isn't also the receiver of that operation
415 : // (in which case thread order already serializes the two).
416 1 : if ops[waitIndex].receiver() != receiver {
417 1 : opsWaitOn[i] = append(opsWaitOn[i], waitIndex)
418 1 : }
419 :
420 : // In addition to synchronizing on the operation that last referenced its
421 : // receiver, an operation may need to synchronize on additional objects.
422 : // For example, batch0.Commit() must synchronize on its receiver, batch0,
423 : // but also on the DB, since it mutates database state.
424 1 : for _, syncObjID := range o.syncObjs() {
425 1 : if vi, vok := lastOpReference[syncObjID]; vok {
426 1 : opsWaitOn[i] = append(opsWaitOn[i], vi)
427 1 : }
428 1 : lastOpReference[syncObjID] = i
429 : }
430 :
431 1 : waitIndexes := opsWaitOn[i]
432 1 : sort.Ints(waitIndexes)
433 1 : for _, waitIndex := range waitIndexes {
434 1 : // If this is the first operation that must wait on the operation at
435 1 : // `waitIndex`, then there will be no channel for the operation yet.
436 1 : // Create one.
437 1 : if opsDone[waitIndex] == nil {
438 1 : opsDone[waitIndex] = make(chan struct{})
439 1 : }
440 : }
441 : }
442 1 : return opsWaitOn, opsDone
443 : }