Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "context"
9 : "fmt"
10 : "io"
11 : "os"
12 : "path"
13 : "path/filepath"
14 : "sort"
15 : "strings"
16 : "time"
17 :
18 : "github.com/cockroachdb/errors"
19 : "github.com/cockroachdb/pebble"
20 : "github.com/cockroachdb/pebble/objstorage/remote"
21 : "github.com/cockroachdb/pebble/sstable"
22 : "github.com/cockroachdb/pebble/vfs"
23 : "github.com/cockroachdb/pebble/vfs/errorfs"
24 : )
25 :
26 : // New constructs a new metamorphic test that runs the provided operations
27 : // against a database using the provided TestOptions and outputs the history of
28 : // events to an io.Writer.
29 : //
30 : // dir specifies the path within opts.Opts.FS to open the database.
31 1 : func New(ops Ops, opts *TestOptions, dir string, w io.Writer) (*Test, error) {
32 1 : t := newTest(ops)
33 1 : h := newHistory(nil /* failRegexp */, w)
34 1 : if err := t.init(h, dir, opts, 1 /* numInstances */, 0 /* opTimeout */); err != nil {
35 0 : return nil, err
36 0 : }
37 1 : return t, nil
38 : }
39 :
40 : // A Test configures an individual test run consisting of a set of operations,
41 : // TestOptions configuring the target database to which the operations should be
42 : // applied, and a sink for outputting test history.
43 : type Test struct {
44 : // The list of ops to execute. The ops refer to slots in the batches, iters,
45 : // and snapshots slices.
46 : ops []op
47 : opsWaitOn [][]int // op index -> op indexes
48 : opsDone []chan struct{} // op index -> done channel
49 : idx int
50 : dir string
51 : h *history
52 : opTimeout time.Duration
53 : opts *pebble.Options
54 : testOpts *TestOptions
55 : writeOpts *pebble.WriteOptions
56 : tmpDir string
57 : // The DBs the test is run on.
58 : dbs []*pebble.DB
59 : // The slots for the batches, iterators, and snapshots. These are read and
60 : // written by the ops to pass state from one op to another.
61 : batches []*pebble.Batch
62 : iters []*retryableIter
63 : snapshots []readerCloser
64 : externalObjs []externalObjMeta
65 :
66 : // externalStorage is used to write external objects. If external storage is
67 : // enabled, this is the same with testOpts.externalStorageFS; otherwise, this
68 : // is an in-memory implementation used only by the test.
69 : externalStorage remote.Storage
70 : }
71 :
72 : type externalObjMeta struct {
73 : sstMeta *sstable.WriterMetadata
74 : }
75 :
76 1 : func newTest(ops []op) *Test {
77 1 : return &Test{
78 1 : ops: ops,
79 1 : }
80 1 : }
81 :
82 : func (t *Test) init(
83 : h *history, dir string, testOpts *TestOptions, numInstances int, opTimeout time.Duration,
84 1 : ) error {
85 1 : t.dir = dir
86 1 : t.h = h
87 1 : t.opTimeout = opTimeout
88 1 : t.testOpts = testOpts
89 1 : t.writeOpts = pebble.NoSync
90 1 : if testOpts.strictFS {
91 1 : t.writeOpts = pebble.Sync
92 1 : } else {
93 1 : t.writeOpts = pebble.NoSync
94 1 : }
95 1 : testOpts.Opts.WithFSDefaults()
96 1 : t.opts = testOpts.Opts.EnsureDefaults()
97 1 : t.opts.Logger = h
98 1 : lel := pebble.MakeLoggingEventListener(t.opts.Logger)
99 1 : t.opts.EventListener = &lel
100 1 : // If the test options set a DebugCheck func, wrap it with retrying of
101 1 : // retriable errors (according to the test's retry policy).
102 1 : if debugCheck := t.opts.DebugCheck; debugCheck != nil {
103 1 : t.opts.DebugCheck = func(db *pebble.DB) error {
104 1 : return t.withRetries(func() error { return debugCheck(db) })
105 : }
106 : }
107 1 : if numInstances < 1 {
108 1 : numInstances = 1
109 1 : }
110 1 : if t.testOpts.externalStorageEnabled {
111 1 : t.externalStorage = t.testOpts.externalStorageFS
112 1 : } else {
113 1 : t.externalStorage = remote.NewInMem()
114 1 : }
115 :
116 1 : t.opsWaitOn, t.opsDone = computeSynchronizationPoints(t.ops)
117 1 :
118 1 : defer t.opts.Cache.Unref()
119 1 :
120 1 : // If an error occurs and we were using an in-memory FS, attempt to clone to
121 1 : // on-disk in order to allow post-mortem debugging. Note that always using
122 1 : // the on-disk FS isn't desirable because there is a large performance
123 1 : // difference between in-memory and on-disk which causes different code paths
124 1 : // and timings to be exercised.
125 1 : maybeExit := func(err error) {
126 1 : if err == nil || errors.Is(err, errorfs.ErrInjected) || errors.Is(err, pebble.ErrCancelledCompaction) {
127 1 : return
128 1 : }
129 0 : t.saveInMemoryData()
130 0 : fmt.Fprintln(os.Stderr, err)
131 0 : os.Exit(1)
132 : }
133 :
134 : // Exit early on any error from a background operation.
135 1 : t.opts.EventListener.BackgroundError = func(err error) {
136 1 : t.opts.Logger.Infof("background error: %s", err)
137 1 : maybeExit(err)
138 1 : }
139 1 : t.opts.EventListener.CompactionEnd = func(info pebble.CompactionInfo) {
140 1 : t.opts.Logger.Infof("%s", info)
141 1 : maybeExit(info.Err)
142 1 : }
143 1 : t.opts.EventListener.FlushEnd = func(info pebble.FlushInfo) {
144 1 : t.opts.Logger.Infof("%s", info)
145 1 : if info.Err != nil && !strings.Contains(info.Err.Error(), "pebble: empty table") {
146 0 : maybeExit(info.Err)
147 0 : }
148 : }
149 1 : t.opts.EventListener.DownloadEnd = func(info pebble.DownloadInfo) {
150 1 : t.opts.Logger.Infof("%s", info)
151 1 : maybeExit(info.Err)
152 1 : }
153 1 : t.opts.EventListener.ManifestCreated = func(info pebble.ManifestCreateInfo) {
154 1 : t.opts.Logger.Infof("%s", info)
155 1 : maybeExit(info.Err)
156 1 : }
157 1 : t.opts.EventListener.ManifestDeleted = func(info pebble.ManifestDeleteInfo) {
158 1 : t.opts.Logger.Infof("%s", info)
159 1 : maybeExit(info.Err)
160 1 : }
161 1 : t.opts.EventListener.TableDeleted = func(info pebble.TableDeleteInfo) {
162 1 : t.opts.Logger.Infof("%s", info)
163 1 : maybeExit(info.Err)
164 1 : }
165 1 : t.opts.EventListener.TableIngested = func(info pebble.TableIngestInfo) {
166 1 : t.opts.Logger.Infof("%s", info)
167 1 : maybeExit(info.Err)
168 1 : }
169 1 : t.opts.EventListener.WALCreated = func(info pebble.WALCreateInfo) {
170 1 : t.opts.Logger.Infof("%s", info)
171 1 : maybeExit(info.Err)
172 1 : }
173 1 : t.opts.EventListener.WALDeleted = func(info pebble.WALDeleteInfo) {
174 1 : t.opts.Logger.Infof("%s", info)
175 1 : maybeExit(info.Err)
176 1 : }
177 :
178 1 : for i := range t.testOpts.CustomOpts {
179 0 : if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
180 0 : return err
181 0 : }
182 : }
183 :
184 1 : t.dbs = make([]*pebble.DB, numInstances)
185 1 : for i := range t.dbs {
186 1 : var db *pebble.DB
187 1 : var err error
188 1 : if len(t.dbs) > 1 {
189 0 : dir = path.Join(t.dir, fmt.Sprintf("db%d", i+1))
190 0 : }
191 1 : err = t.withRetries(func() error {
192 1 : db, err = pebble.Open(dir, t.opts)
193 1 : return err
194 1 : })
195 1 : if err != nil {
196 0 : return err
197 0 : }
198 1 : t.dbs[i] = db
199 1 : h.log.Printf("// db%d.Open() %v", i+1, err)
200 1 :
201 1 : if t.testOpts.sharedStorageEnabled {
202 1 : err = t.withRetries(func() error {
203 1 : return db.SetCreatorID(uint64(i + 1))
204 1 : })
205 1 : if err != nil {
206 0 : return err
207 0 : }
208 1 : h.log.Printf("// db%d.SetCreatorID() %v", i+1, err)
209 : }
210 : }
211 :
212 1 : var err error
213 1 : t.tmpDir = t.opts.FS.PathJoin(t.dir, "tmp")
214 1 : if err = t.opts.FS.MkdirAll(t.tmpDir, 0755); err != nil {
215 0 : return err
216 0 : }
217 1 : if t.testOpts.strictFS {
218 1 : // Sync the whole directory path for the tmpDir, since restartDB() is executed during
219 1 : // the test. That would reset MemFS to the synced state, which would make an unsynced
220 1 : // directory disappear in the middle of the test. It is the responsibility of the test
221 1 : // (not Pebble) to ensure that it can write the ssts that it will subsequently ingest
222 1 : // into Pebble.
223 1 : for {
224 1 : f, err := t.opts.FS.OpenDir(dir)
225 1 : if err != nil {
226 0 : return err
227 0 : }
228 1 : if err = f.Sync(); err != nil {
229 0 : return err
230 0 : }
231 1 : if err = f.Close(); err != nil {
232 0 : return err
233 0 : }
234 1 : if len(dir) == 1 {
235 1 : break
236 : }
237 1 : dir = t.opts.FS.PathDir(dir)
238 1 : // TODO(sbhola): PathDir returns ".", which OpenDir() complains about. Fix.
239 1 : if len(dir) == 1 {
240 1 : dir = "/"
241 1 : }
242 : }
243 : }
244 :
245 1 : return nil
246 : }
247 :
248 1 : func (t *Test) withRetries(fn func() error) error {
249 1 : return withRetries(fn, t.testOpts.RetryPolicy)
250 1 : }
251 :
252 1 : func (t *Test) isFMV(dbID objID, fmv pebble.FormatMajorVersion) bool {
253 1 : db := t.getDB(dbID)
254 1 : return db.FormatMajorVersion() >= fmv
255 1 : }
256 :
257 : // minFMV returns the minimum FormatMajorVersion between all databases.
258 1 : func (t *Test) minFMV() pebble.FormatMajorVersion {
259 1 : minVersion := pebble.FormatNewest
260 1 : for _, db := range t.dbs {
261 1 : if db != nil {
262 1 : minVersion = min(minVersion, db.FormatMajorVersion())
263 1 : }
264 : }
265 1 : return minVersion
266 : }
267 :
268 1 : func (t *Test) restartDB(dbID objID) error {
269 1 : db := t.getDB(dbID)
270 1 : // If strictFS is not used, we use pebble.NoSync for writeOpts, so we can't
271 1 : // restart the database (even if we don't revert to synced data).
272 1 : if !t.testOpts.strictFS {
273 1 : return nil
274 1 : }
275 1 : if t.testOpts.sharedStorageEnabled {
276 0 : // We simulate a crash by essentially ignoring writes to disk after a
277 0 : // certain point. However, we cannot prevent the process (which didn't
278 0 : // actually crash) from deleting an external object before we call Close().
279 0 : // TODO(radu): perhaps we want all syncs to fail after the "crash" point?
280 0 : return nil
281 0 : }
282 : // We can't do this if we have more than one database since they share the
283 : // same FS (and we only close/reopen one of them).
284 : // TODO(radu): have each database use its own MemFS.
285 1 : if len(t.dbs) > 1 {
286 0 : return nil
287 0 : }
288 1 : t.opts.Cache.Ref()
289 1 : fs := vfs.Root(t.opts.FS).(*vfs.MemFS)
290 1 : crashFS := fs.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})
291 1 : if err := db.Close(); err != nil {
292 0 : return err
293 0 : }
294 : // Release any resources held by custom options. This may be used, for
295 : // example, by the encryption-at-rest custom option (within the Cockroach
296 : // repository) to close the file registry.
297 1 : for i := range t.testOpts.CustomOpts {
298 0 : if err := t.testOpts.CustomOpts[i].Close(t.opts); err != nil {
299 0 : return err
300 0 : }
301 : }
302 1 : t.opts.FS = crashFS
303 1 : t.opts.WithFSDefaults()
304 1 : if t.opts.WALFailover != nil {
305 1 : t.opts.WALFailover.Secondary.FS = t.opts.FS
306 1 : }
307 :
308 : // TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
309 : // are well defined within the context of retries.
310 1 : err := t.withRetries(func() (err error) {
311 1 : // Reacquire any resources required by custom options. This may be used, for
312 1 : // example, by the encryption-at-rest custom option (within the Cockroach
313 1 : // repository) to reopen the file registry.
314 1 : for i := range t.testOpts.CustomOpts {
315 0 : if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
316 0 : return err
317 0 : }
318 : }
319 1 : dir := t.dir
320 1 : if len(t.dbs) > 1 {
321 0 : dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
322 0 : }
323 1 : t.dbs[dbID.slot()-1], err = pebble.Open(dir, t.opts)
324 1 : if err != nil {
325 0 : return err
326 0 : }
327 1 : return err
328 : })
329 1 : t.opts.Cache.Unref()
330 1 : return err
331 : }
332 :
333 1 : func (t *Test) saveInMemoryDataInternal() error {
334 1 : if rootFS := vfs.Root(t.opts.FS); rootFS != vfs.Default {
335 1 : // t.opts.FS is an in-memory system; copy it to disk.
336 1 : if err := os.RemoveAll(t.dir); err != nil {
337 0 : return err
338 0 : }
339 1 : if _, err := vfs.Clone(rootFS, vfs.Default, t.dir, t.dir); err != nil {
340 0 : return err
341 0 : }
342 : }
343 1 : if t.testOpts.sharedStorageEnabled {
344 0 : if err := copyRemoteStorage(t.testOpts.sharedStorageFS, filepath.Join(t.dir, "shared")); err != nil {
345 0 : return err
346 0 : }
347 : }
348 1 : if t.testOpts.externalStorageEnabled {
349 0 : if err := copyRemoteStorage(t.testOpts.externalStorageFS, filepath.Join(t.dir, "external")); err != nil {
350 0 : return err
351 0 : }
352 : }
353 1 : return nil
354 : }
355 :
356 0 : func copyRemoteStorage(fs remote.Storage, outputDir string) error {
357 0 : if err := vfs.Default.MkdirAll(outputDir, 0755); err != nil {
358 0 : return err
359 0 : }
360 0 : objs, err := fs.List("", "")
361 0 : if err != nil {
362 0 : return err
363 0 : }
364 0 : for i := range objs {
365 0 : reader, readSize, err := fs.ReadObject(context.TODO(), objs[i])
366 0 : if err != nil {
367 0 : return err
368 0 : }
369 0 : buf := make([]byte, readSize)
370 0 : if err := reader.ReadAt(context.TODO(), buf, 0); err != nil {
371 0 : return err
372 0 : }
373 0 : outputPath := vfs.Default.PathJoin(outputDir, objs[i])
374 0 : outputFile, err := vfs.Default.Create(outputPath, vfs.WriteCategoryUnspecified)
375 0 : if err != nil {
376 0 : return err
377 0 : }
378 0 : if _, err := outputFile.Write(buf); err != nil {
379 0 : outputFile.Close()
380 0 : return err
381 0 : }
382 0 : if err := outputFile.Close(); err != nil {
383 0 : return err
384 0 : }
385 : }
386 0 : return nil
387 : }
388 :
389 : // If an in-memory FS is being used, save the contents to disk.
390 1 : func (t *Test) saveInMemoryData() {
391 1 : if err := t.saveInMemoryDataInternal(); err != nil {
392 0 : t.opts.Logger.Infof("unable to save data: %s: %v", t.dir, err)
393 0 : }
394 : }
395 :
396 : // Step runs one single operation, returning: whether there are additional
397 : // operations remaining; the operation's output; and an error if any occurred
398 : // while running the operation.
399 : //
400 : // Step may be used instead of Execute to advance a test one operation at a
401 : // time.
402 1 : func (t *Test) Step() (more bool, operationOutput string, err error) {
403 1 : more = t.step(t.h, func(format string, args ...interface{}) {
404 1 : operationOutput = fmt.Sprintf(format, args...)
405 1 : })
406 1 : err = t.h.Error()
407 1 : return more, operationOutput, err
408 : }
409 :
410 1 : func (t *Test) step(h *history, optionalRecordf func(string, ...interface{})) bool {
411 1 : if t.idx >= len(t.ops) {
412 1 : return false
413 1 : }
414 1 : t.runOp(t.idx, h.recorder(-1 /* thread */, t.idx, optionalRecordf))
415 1 : t.idx++
416 1 : return true
417 : }
418 :
419 : // runOp runs t.ops[idx] with t.opTimeout.
420 1 : func (t *Test) runOp(idx int, h historyRecorder) {
421 1 : op := t.ops[idx]
422 1 : var timer *time.Timer
423 1 : if t.opTimeout > 0 {
424 0 : opTimeout := t.opTimeout
425 0 : switch op.(type) {
426 0 : case *compactOp, *newSnapshotOp, *ingestOp, *ingestAndExciseOp, *ingestExternalFilesOp:
427 0 : // These ops can be very slow, especially if we end up with many tiny
428 0 : // tables. Bump up the timout by a factor.
429 0 : opTimeout *= 4
430 0 : case *downloadOp:
431 0 : opTimeout *= 8
432 : }
433 0 : timer = time.AfterFunc(opTimeout, func() {
434 0 : panic(fmt.Sprintf("operation took longer than %s: %s",
435 0 : opTimeout, op.formattedString(t.testOpts.KeyFormat)))
436 : })
437 : }
438 1 : op.run(t, h)
439 1 : if timer != nil {
440 0 : timer.Stop()
441 0 : }
442 : }
443 :
444 1 : func (t *Test) setBatch(id objID, b *pebble.Batch) {
445 1 : if id.tag() != batchTag {
446 0 : panic(fmt.Sprintf("invalid batch ID: %s", id))
447 : }
448 1 : t.batches[id.slot()] = b
449 : }
450 :
451 1 : func (t *Test) setIter(id objID, i *pebble.Iterator) {
452 1 : if id.tag() != iterTag {
453 0 : panic(fmt.Sprintf("invalid iter ID: %s", id))
454 : }
455 1 : t.iters[id.slot()] = &retryableIter{
456 1 : iter: i,
457 1 : lastKey: nil,
458 1 : needRetry: t.testOpts.RetryPolicy,
459 1 : }
460 : }
461 :
462 : type readerCloser interface {
463 : pebble.Reader
464 : io.Closer
465 : }
466 :
467 1 : func (t *Test) setSnapshot(id objID, s readerCloser) {
468 1 : if id.tag() != snapTag {
469 0 : panic(fmt.Sprintf("invalid snapshot ID: %s", id))
470 : }
471 1 : t.snapshots[id.slot()] = s
472 : }
473 :
474 1 : func (t *Test) setExternalObj(id objID, meta externalObjMeta) {
475 1 : if id.tag() != externalObjTag {
476 0 : panic(fmt.Sprintf("invalid external object ID: %s", id))
477 : }
478 1 : t.externalObjs[id.slot()] = meta
479 : }
480 :
481 1 : func (t *Test) getExternalObj(id objID) externalObjMeta {
482 1 : if id.tag() != externalObjTag || t.externalObjs[id.slot()].sstMeta == nil {
483 0 : panic(fmt.Sprintf("metamorphic test internal error: invalid external object ID: %s", id))
484 : }
485 1 : return t.externalObjs[id.slot()]
486 : }
487 :
488 1 : func (t *Test) clearObj(id objID) {
489 1 : switch id.tag() {
490 1 : case dbTag:
491 1 : t.dbs[id.slot()-1] = nil
492 1 : case batchTag:
493 1 : t.batches[id.slot()] = nil
494 1 : case iterTag:
495 1 : t.iters[id.slot()] = nil
496 1 : case snapTag:
497 1 : t.snapshots[id.slot()] = nil
498 0 : default:
499 0 : panic(fmt.Sprintf("cannot clear ID: %s", id))
500 : }
501 : }
502 :
503 1 : func (t *Test) getBatch(id objID) *pebble.Batch {
504 1 : if id.tag() != batchTag || t.batches[id.slot()] == nil {
505 0 : panic(fmt.Sprintf("metamorphic test internal error: invalid batch ID: %s", id))
506 : }
507 1 : return t.batches[id.slot()]
508 : }
509 :
510 1 : func (t *Test) getCloser(id objID) io.Closer {
511 1 : switch id.tag() {
512 1 : case dbTag:
513 1 : return t.dbs[id.slot()-1]
514 1 : case batchTag:
515 1 : return t.batches[id.slot()]
516 1 : case iterTag:
517 1 : return t.iters[id.slot()]
518 1 : case snapTag:
519 1 : return t.snapshots[id.slot()]
520 0 : default:
521 0 : panic(fmt.Sprintf("cannot close ID: %s", id))
522 : }
523 : }
524 :
525 1 : func (t *Test) getIter(id objID) *retryableIter {
526 1 : if id.tag() != iterTag {
527 0 : panic(fmt.Sprintf("invalid iter ID: %s", id))
528 : }
529 1 : return t.iters[id.slot()]
530 : }
531 :
532 1 : func (t *Test) getReader(id objID) pebble.Reader {
533 1 : switch id.tag() {
534 1 : case dbTag:
535 1 : return t.dbs[id.slot()-1]
536 1 : case batchTag:
537 1 : return t.batches[id.slot()]
538 1 : case snapTag:
539 1 : return t.snapshots[id.slot()]
540 0 : default:
541 0 : panic(fmt.Sprintf("invalid reader ID: %s", id))
542 : }
543 : }
544 :
545 1 : func (t *Test) getWriter(id objID) pebble.Writer {
546 1 : switch id.tag() {
547 1 : case dbTag:
548 1 : return t.dbs[id.slot()-1]
549 1 : case batchTag:
550 1 : return t.batches[id.slot()]
551 0 : default:
552 0 : panic(fmt.Sprintf("invalid writer ID: %s", id))
553 : }
554 : }
555 :
556 1 : func (t *Test) getDB(id objID) *pebble.DB {
557 1 : switch id.tag() {
558 1 : case dbTag:
559 1 : return t.dbs[id.slot()-1]
560 0 : default:
561 0 : panic(fmt.Sprintf("invalid writer tag: %v", id.tag()))
562 : }
563 : }
564 :
565 : // Compute the synchronization points between operations. When operating
566 : // with more than 1 thread, operations must synchronize access to shared
567 : // objects. Compute two slices the same length as ops.
568 : //
569 : // opsWaitOn: the value v at index i indicates that operation i must wait
570 : // for the operation at index v to finish before it may run. NB: v < i
571 : //
572 : // opsDone: the channel at index i must be closed when the operation at index i
573 : // completes. This slice is sparse. Operations that are never used as
574 : // synchronization points may have a nil channel.
575 1 : func computeSynchronizationPoints(ops []op) (opsWaitOn [][]int, opsDone []chan struct{}) {
576 1 : opsDone = make([]chan struct{}, len(ops)) // operation index -> done channel
577 1 : opsWaitOn = make([][]int, len(ops)) // operation index -> operation index
578 1 : lastOpReference := make(map[objID]int) // objID -> operation index
579 1 : for i, o := range ops {
580 1 : // Find the last operation that involved the same receiver object. We at
581 1 : // least need to wait on that operation.
582 1 : receiver := o.receiver()
583 1 : waitIndex, ok := lastOpReference[receiver]
584 1 : lastOpReference[receiver] = i
585 1 : if !ok {
586 1 : // Only valid for i=0. For all other operations, the receiver should
587 1 : // have been referenced by some other operation before it's used as
588 1 : // a receiver.
589 1 : if i != 0 && receiver.tag() != dbTag {
590 0 : panic(fmt.Sprintf("op %T on receiver %s; first reference of %s",
591 0 : ops[i], receiver, receiver))
592 : }
593 : // The initOp is a little special. We do want to store the objects it's
594 : // syncing on, in `lastOpReference`.
595 1 : if i != 0 {
596 0 : continue
597 : }
598 : }
599 :
600 : // The last operation that referenced `receiver` is the one at index
601 : // `waitIndex`. All operations with the same receiver are performed on
602 : // the same thread. We only need to synchronize on the operation at
603 : // `waitIndex` if `receiver` isn't also the receiver on that operation
604 : // too.
605 1 : if ops[waitIndex].receiver() != receiver {
606 1 : opsWaitOn[i] = append(opsWaitOn[i], waitIndex)
607 1 : }
608 :
609 : // In additional to synchronizing on the operation's receiver operation,
610 : // we may need to synchronize on additional objects. For example,
611 : // batch0.Commit() must synchronize its receiver, batch0, but also on
612 : // the DB since it mutates database state.
613 1 : for _, syncObjID := range o.syncObjs() {
614 1 : if vi, vok := lastOpReference[syncObjID]; vok {
615 1 : if vi == i {
616 0 : panic(fmt.Sprintf("%T has %s as syncObj multiple times", ops[i], syncObjID))
617 : }
618 1 : opsWaitOn[i] = append(opsWaitOn[i], vi)
619 : }
620 1 : lastOpReference[syncObjID] = i
621 : }
622 :
623 1 : waitIndexes := opsWaitOn[i]
624 1 : sort.Ints(waitIndexes)
625 1 : for _, waitIndex := range waitIndexes {
626 1 : // If this is the first operation that must wait on the operation at
627 1 : // `waitIndex`, then there will be no channel for the operation yet.
628 1 : // Create one.
629 1 : if opsDone[waitIndex] == nil {
630 1 : opsDone[waitIndex] = make(chan struct{})
631 1 : }
632 : }
633 : }
634 1 : return opsWaitOn, opsDone
635 : }
|