Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "fmt"
9 : "io"
10 : "os"
11 : "path"
12 : "sort"
13 : "strings"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble"
17 : "github.com/cockroachdb/pebble/vfs"
18 : "github.com/cockroachdb/pebble/vfs/errorfs"
19 : )
20 :
21 : type test struct {
22 : // The list of ops to execute. The ops refer to slots in the batches, iters,
23 : // and snapshots slices.
24 : ops []op
25 : opsWaitOn [][]int // op index -> op indexes
26 : opsDone []chan struct{} // op index -> done channel
27 : idx int
28 : dir string
29 : opts *pebble.Options
30 : testOpts *TestOptions
31 : writeOpts *pebble.WriteOptions
32 : tmpDir string
33 : // The DBs the test is run on.
34 : dbs []*pebble.DB
35 : // The slots for the batches, iterators, and snapshots. These are read and
36 : // written by the ops to pass state from one op to another.
37 : batches []*pebble.Batch
38 : iters []*retryableIter
39 : snapshots []readerCloser
40 : }
41 :
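 : // newTest constructs a test that will run the provided sequence of ops.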
42 2 : func newTest(ops []op) *test {
43 2 : return &test{
44 2 : ops: ops,
45 2 : }
46 2 : }
47 :
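 : // init prepares the test for execution: it finalizes the Pebble options,
 : // installs event listeners that exit the process on unexpected background
 : // errors, opens numInstances DB instances (retrying around injected errors),
 : // and creates the tmp directory used for ssts that are later ingested.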
48 2 : func (t *test) init(h *history, dir string, testOpts *TestOptions, numInstances int) error {
49 2 : t.dir = dir
50 2 : t.testOpts = testOpts
51 2 : t.writeOpts = pebble.NoSync
52 2 : if testOpts.strictFS {
53 1 : t.writeOpts = pebble.Sync
54 1 : }
55 2 : t.opts = testOpts.Opts.EnsureDefaults()
56 2 : t.opts.Logger = h
57 2 : lel := pebble.MakeLoggingEventListener(t.opts.Logger)
58 2 : t.opts.EventListener = &lel
59 2 : t.opts.DebugCheck = func(db *pebble.DB) error {
60 2 : // Wrap the ordinary DebugCheckLevels with retrying
61 2 : // of injected errors.
62 2 : return withRetries(func() error {
63 2 : return pebble.DebugCheckLevels(db)
64 2 : })
65 : }
66 2 : if numInstances < 1 {
67 2 : numInstances = 1
68 2 : }
69 :
70 2 : t.opsWaitOn, t.opsDone = computeSynchronizationPoints(t.ops)
71 2 :
72 2 : defer t.opts.Cache.Unref()
73 2 :
74 2 : // If an error occurs and we were using an in-memory FS, attempt to clone to
75 2 : // on-disk in order to allow post-mortem debugging. Note that always using
76 2 : // the on-disk FS isn't desirable because there is a large performance
77 2 : // difference between in-memory and on-disk, which causes different code paths
78 2 : // and timings to be exercised.
79 2 : maybeExit := func(err error) {
80 2 : if err == nil || errors.Is(err, errorfs.ErrInjected) || errors.Is(err, pebble.ErrCancelledCompaction) {
81 2 : return
82 2 : }
83 0 : t.maybeSaveData()
84 0 : fmt.Fprintln(os.Stderr, err)
85 0 : os.Exit(1)
86 : }
87 :
88 : // Exit early on any error from a background operation.
89 2 : t.opts.EventListener.BackgroundError = func(err error) {
90 1 : t.opts.Logger.Infof("background error: %s", err)
91 1 : maybeExit(err)
92 1 : }
93 2 : t.opts.EventListener.CompactionEnd = func(info pebble.CompactionInfo) {
94 2 : t.opts.Logger.Infof("%s", info)
95 2 : maybeExit(info.Err)
96 2 : }
97 2 : t.opts.EventListener.FlushEnd = func(info pebble.FlushInfo) {
98 2 : t.opts.Logger.Infof("%s", info)
99 2 : if info.Err != nil && !strings.Contains(info.Err.Error(), "pebble: empty table") {
100 0 : maybeExit(info.Err)
101 0 : }
102 : }
103 2 : t.opts.EventListener.ManifestCreated = func(info pebble.ManifestCreateInfo) {
104 2 : t.opts.Logger.Infof("%s", info)
105 2 : maybeExit(info.Err)
106 2 : }
107 2 : t.opts.EventListener.ManifestDeleted = func(info pebble.ManifestDeleteInfo) {
108 1 : t.opts.Logger.Infof("%s", info)
109 1 : maybeExit(info.Err)
110 1 : }
111 2 : t.opts.EventListener.TableDeleted = func(info pebble.TableDeleteInfo) {
112 2 : t.opts.Logger.Infof("%s", info)
113 2 : maybeExit(info.Err)
114 2 : }
115 2 : t.opts.EventListener.TableIngested = func(info pebble.TableIngestInfo) {
116 2 : t.opts.Logger.Infof("%s", info)
117 2 : maybeExit(info.Err)
118 2 : }
119 2 : t.opts.EventListener.WALCreated = func(info pebble.WALCreateInfo) {
120 2 : t.opts.Logger.Infof("%s", info)
121 2 : maybeExit(info.Err)
122 2 : }
123 2 : t.opts.EventListener.WALDeleted = func(info pebble.WALDeleteInfo) {
124 2 : t.opts.Logger.Infof("%s", info)
125 2 : maybeExit(info.Err)
126 2 : }
127 :
128 2 : for i := range t.testOpts.CustomOpts {
129 0 : if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
130 0 : return err
131 0 : }
132 : }
133 :
134 2 : t.dbs = make([]*pebble.DB, numInstances)
135 2 : for i := range t.dbs {
136 2 : var db *pebble.DB
137 2 : var err error
138 2 : if len(t.dbs) > 1 {
139 1 : dir = path.Join(t.dir, fmt.Sprintf("db%d", i+1))
140 1 : }
141 2 : err = withRetries(func() error {
142 2 : db, err = pebble.Open(dir, t.opts)
143 2 : return err
144 2 : })
145 2 : if err != nil {
146 0 : return err
147 0 : }
148 2 : t.dbs[i] = db
149 2 : h.log.Printf("// db%d.Open() %v", i+1, err)
150 2 :
151 2 : if t.testOpts.sharedStorageEnabled {
152 1 : err = withRetries(func() error {
153 1 : return db.SetCreatorID(uint64(i + 1))
154 1 : })
155 1 : if err != nil {
156 0 : return err
157 0 : }
158 1 : h.log.Printf("// db%d.SetCreatorID() %v", i+1, err)
159 : }
160 : }
161 :
162 2 : var err error
163 2 : t.tmpDir = t.opts.FS.PathJoin(t.dir, "tmp")
164 2 : if err = t.opts.FS.MkdirAll(t.tmpDir, 0755); err != nil {
165 0 : return err
166 0 : }
167 2 : if t.testOpts.strictFS {
168 1 : // Sync the whole directory path for the tmpDir, since restartDB() is executed
169 1 : // during the test and would reset the MemFS to its synced state, making an
170 1 : // unsynced directory disappear in the middle of the test. It is the
171 1 : // responsibility of the test (not Pebble) to ensure that it can write the ssts
172 1 : // that it will subsequently ingest into Pebble.
173 1 : for {
174 1 : f, err := t.opts.FS.OpenDir(dir)
175 1 : if err != nil {
176 0 : return err
177 0 : }
178 1 : if err = f.Sync(); err != nil {
179 0 : return err
180 0 : }
181 1 : if err = f.Close(); err != nil {
182 0 : return err
183 0 : }
184 1 : if len(dir) == 1 {
185 1 : break
186 : }
187 1 : dir = t.opts.FS.PathDir(dir)
188 1 : // TODO(sbhola): PathDir returns ".", which OpenDir() complains about. Fix.
189 1 : if len(dir) == 1 {
190 1 : dir = "/"
191 1 : }
192 : }
193 : }
194 :
195 2 : return nil
196 : }
197 :
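 : // isFMV reports whether the DB identified by dbID is running format major
 : // version fmv or later.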
198 1 : func (t *test) isFMV(dbID objID, fmv pebble.FormatMajorVersion) bool {
199 1 : db := t.getDB(dbID)
200 1 : return db.FormatMajorVersion() >= fmv
201 1 : }
202 :
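 : // restartDB simulates a crash and recovery of the DB identified by dbID.
 : // It is a no-op unless the test is running with strictFS. Otherwise it
 : // closes the DB, resets the MemFS (if in use) to its last synced state to
 : // discard unsynced writes, and reopens the DB.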
203 2 : func (t *test) restartDB(dbID objID) error {
204 2 : db := t.getDB(dbID)
205 2 : if !t.testOpts.strictFS {
206 2 : return nil
207 2 : }
208 1 : t.opts.Cache.Ref()
209 1 : // The fs isn't necessarily a MemFS.
210 1 : fs, ok := vfs.Root(t.opts.FS).(*vfs.MemFS)
211 1 : if ok {
212 0 : fs.SetIgnoreSyncs(true)
213 0 : }
214 1 : if err := db.Close(); err != nil {
215 0 : return err
216 0 : }
217 : // Release any resources held by custom options. This may be used, for
218 : // example, by the encryption-at-rest custom option (within the Cockroach
219 : // repository) to close the file registry.
220 1 : for i := range t.testOpts.CustomOpts {
221 0 : if err := t.testOpts.CustomOpts[i].Close(t.opts); err != nil {
222 0 : return err
223 0 : }
224 : }
225 1 : if ok {
226 0 : fs.ResetToSyncedState()
227 0 : fs.SetIgnoreSyncs(false)
228 0 : }
229 :
230 : // TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
231 : // are well defined within the context of retries.
232 1 : err := withRetries(func() (err error) {
233 1 : // Reacquire any resources required by custom options. This may be used, for
234 1 : // example, by the encryption-at-rest custom option (within the Cockroach
235 1 : // repository) to reopen the file registry.
236 1 : for i := range t.testOpts.CustomOpts {
237 0 : if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
238 0 : return err
239 0 : }
240 : }
241 1 : dir := t.dir
242 1 : if len(t.dbs) > 1 {
243 1 : dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
244 1 : }
245 1 : t.dbs[dbID.slot()-1], err = pebble.Open(dir, t.opts)
246 1 : if err != nil {
247 0 : return err
248 0 : }
249 1 : return err
250 : })
251 1 : t.opts.Cache.Unref()
252 1 : return err
253 : }
254 :
255 : // maybeSaveData clones the contents of an in-memory FS to disk to allow post-mortem debugging.
256 1 : func (t *test) maybeSaveData() {
257 1 : rootFS := vfs.Root(t.opts.FS)
258 1 : if rootFS == vfs.Default {
259 0 : return
260 0 : }
261 1 : _ = os.RemoveAll(t.dir)
262 1 : if _, err := vfs.Clone(rootFS, vfs.Default, t.dir, t.dir); err != nil {
263 0 : t.opts.Logger.Infof("unable to clone: %s: %v", t.dir, err)
264 0 : }
265 : }
266 :
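 : // step runs the next op in the sequence against the test state, recording
 : // its output in the history. It returns false once all ops have executed.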
267 2 : func (t *test) step(h *history) bool {
268 2 : if t.idx >= len(t.ops) {
269 2 : return false
270 2 : }
271 2 : t.ops[t.idx].run(t, h.recorder(-1 /* thread */, t.idx))
272 2 : t.idx++
273 2 : return true
274 : }
275 :
276 2 : func (t *test) setBatch(id objID, b *pebble.Batch) {
277 2 : if id.tag() != batchTag {
278 0 : panic(fmt.Sprintf("invalid batch ID: %s", id))
279 : }
280 2 : t.batches[id.slot()] = b
281 : }
282 :
283 2 : func (t *test) setIter(id objID, i *pebble.Iterator) {
284 2 : if id.tag() != iterTag {
285 0 : panic(fmt.Sprintf("invalid iter ID: %s", id))
286 : }
287 2 : t.iters[id.slot()] = &retryableIter{
288 2 : iter: i,
289 2 : lastKey: nil,
290 2 : }
291 : }
292 :
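 : // readerCloser combines pebble.Reader with io.Closer. Snapshot slots hold
 : // their objects behind this interface.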
293 : type readerCloser interface {
294 : pebble.Reader
295 : io.Closer
296 : }
297 :
298 2 : func (t *test) setSnapshot(id objID, s readerCloser) {
299 2 : if id.tag() != snapTag {
300 0 : panic(fmt.Sprintf("invalid snapshot ID: %s", id))
301 : }
302 2 : t.snapshots[id.slot()] = s
303 : }
304 :
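 : // clearObj nils out the slot associated with id, allowing the underlying
 : // object to be garbage collected.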
305 2 : func (t *test) clearObj(id objID) {
306 2 : switch id.tag() {
307 2 : case dbTag:
308 2 : t.dbs[id.slot()-1] = nil
309 2 : case batchTag:
310 2 : t.batches[id.slot()] = nil
311 2 : case iterTag:
312 2 : t.iters[id.slot()] = nil
313 2 : case snapTag:
314 2 : t.snapshots[id.slot()] = nil
315 : }
316 : }
317 :
318 2 : func (t *test) getBatch(id objID) *pebble.Batch {
319 2 : if id.tag() != batchTag {
320 0 : panic(fmt.Sprintf("invalid batch ID: %s", id))
321 : }
322 2 : return t.batches[id.slot()]
323 : }
324 :
325 2 : func (t *test) getCloser(id objID) io.Closer {
326 2 : switch id.tag() {
327 2 : case dbTag:
328 2 : return t.dbs[id.slot()-1]
329 2 : case batchTag:
330 2 : return t.batches[id.slot()]
331 2 : case iterTag:
332 2 : return t.iters[id.slot()]
333 2 : case snapTag:
334 2 : return t.snapshots[id.slot()]
335 : }
336 0 : panic(fmt.Sprintf("cannot close ID: %s", id))
337 : }
338 :
339 2 : func (t *test) getIter(id objID) *retryableIter {
340 2 : if id.tag() != iterTag {
341 0 : panic(fmt.Sprintf("invalid iter ID: %s", id))
342 : }
343 2 : return t.iters[id.slot()]
344 : }
345 :
346 2 : func (t *test) getReader(id objID) pebble.Reader {
347 2 : switch id.tag() {
348 2 : case dbTag:
349 2 : return t.dbs[id.slot()-1]
350 2 : case batchTag:
351 2 : return t.batches[id.slot()]
352 2 : case snapTag:
353 2 : return t.snapshots[id.slot()]
354 : }
355 0 : panic(fmt.Sprintf("invalid reader ID: %s", id))
356 : }
357 :
358 2 : func (t *test) getWriter(id objID) pebble.Writer {
359 2 : switch id.tag() {
360 2 : case dbTag:
361 2 : return t.dbs[id.slot()-1]
362 2 : case batchTag:
363 2 : return t.batches[id.slot()]
364 : }
365 0 : panic(fmt.Sprintf("invalid writer ID: %s", id))
366 : }
367 :
368 2 : func (t *test) getDB(id objID) *pebble.DB {
369 2 : switch id.tag() {
370 2 : case dbTag:
371 2 : return t.dbs[id.slot()-1]
372 0 : default:
373 0 : panic(fmt.Sprintf("invalid DB ID: %s", id))
374 : }
375 : }
376 :
377 : // Compute the synchronization points between operations. When operating
378 : // with more than 1 thread, operations must synchronize access to shared
379 : // objects. Compute two slices, each the same length as ops.
380 : //
381 : // opsWaitOn: the value v at index i indicates that operation i must wait
382 : // for the operation at index v to finish before it may run. NB: v < i
383 : //
384 : // opsDone: the channel at index i must be closed when the operation at index i
385 : // completes. This slice is sparse. Operations that are never used as
386 : // synchronization points may have a nil channel.
387 2 : func computeSynchronizationPoints(ops []op) (opsWaitOn [][]int, opsDone []chan struct{}) {
388 2 : opsDone = make([]chan struct{}, len(ops)) // operation index -> done channel
389 2 : opsWaitOn = make([][]int, len(ops)) // operation index -> operation index
390 2 : lastOpReference := make(map[objID]int) // objID -> operation index
391 2 : for i, o := range ops {
392 2 : // Find the last operation that involved the same receiver object. We at
393 2 : // least need to wait on that operation.
394 2 : receiver := o.receiver()
395 2 : waitIndex, ok := lastOpReference[receiver]
396 2 : lastOpReference[receiver] = i
397 2 : if !ok {
398 2 : // This is expected only for i=0 or when the receiver is a DB. Any
399 2 : // other receiver should have been referenced by some earlier
400 2 : // operation before it's used as a receiver.
401 2 : if i != 0 && receiver.tag() != dbTag {
402 0 : panic(fmt.Sprintf("op %s on receiver %s; first reference of %s", ops[i].String(), receiver, receiver))
403 : }
404 : // The initOp is a little special. We do want to store the objects it's
405 : // syncing on, in `lastOpReference`.
406 2 : if i != 0 {
407 0 : continue
408 : }
409 : }
410 :
411 : // The last operation that referenced `receiver` is the one at index
412 : // `waitIndex`. All operations with the same receiver are performed on
413 : // the same thread. We only need to synchronize on the operation at
414 2 : // `waitIndex` if `receiver` isn't also the receiver of that
415 2 : // operation.
416 2 : if ops[waitIndex].receiver() != receiver {
417 2 : opsWaitOn[i] = append(opsWaitOn[i], waitIndex)
418 2 : }
419 :
420 2 : // In addition to synchronizing on the last operation on the receiver,
421 2 : // we may need to synchronize on additional objects. For example,
422 2 : // batch0.Commit() must synchronize on its receiver, batch0, but also
423 2 : // on the DB since it mutates database state.
424 2 : for _, syncObjID := range o.syncObjs() {
425 2 : if vi, vok := lastOpReference[syncObjID]; vok {
426 2 : opsWaitOn[i] = append(opsWaitOn[i], vi)
427 2 : }
428 2 : lastOpReference[syncObjID] = i
429 : }
430 :
431 2 : waitIndexes := opsWaitOn[i]
432 2 : sort.Ints(waitIndexes)
433 2 : for _, waitIndex := range waitIndexes {
434 2 : // If this is the first operation that must wait on the operation at
435 2 : // `waitIndex`, then there will be no channel for the operation yet.
436 2 : // Create one.
437 2 : if opsDone[waitIndex] == nil {
438 2 : opsDone[waitIndex] = make(chan struct{})
439 2 : }
440 : }
441 : }
442 2 : return opsWaitOn, opsDone
443 : }