Line data Source code
1 : // Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package metamorphic
6 :
7 : import (
8 : "fmt"
9 : "go/scanner"
10 : "go/token"
11 : "reflect"
12 : "strconv"
13 : "strings"
14 :
15 : "github.com/cockroachdb/errors"
16 : "github.com/cockroachdb/pebble"
17 : "golang.org/x/exp/slices"
18 : )
19 :
20 : type methodInfo struct {
21 : constructor func() op
22 : validTags uint32
23 : }
24 :
25 2 : func makeMethod(i interface{}, tags ...objTag) *methodInfo {
26 2 : var validTags uint32
27 2 : for _, tag := range tags {
28 2 : validTags |= 1 << tag
29 2 : }
30 :
31 2 : t := reflect.TypeOf(i)
32 2 : return &methodInfo{
33 2 : constructor: func() op {
34 2 : return reflect.New(t).Interface().(op)
35 2 : },
36 : validTags: validTags,
37 : }
38 : }
39 :
40 : // args returns the receiverID, targetID and arguments for the op. The
41 : // receiverID is the ID of the object the op will be applied to. The targetID
42 : // is the ID of the object for assignment. If the method does not return a new
43 : // object, then targetID will be nil. The argument list is just what it sounds
44 : // like: the list of arguments for the operation.
45 2 : func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) {
46 2 : switch t := op.(type) {
47 2 : case *applyOp:
48 2 : return &t.writerID, nil, []interface{}{&t.batchID}
49 2 : case *checkpointOp:
50 2 : return &t.dbID, nil, []interface{}{&t.spans}
51 2 : case *closeOp:
52 2 : return &t.objID, nil, nil
53 2 : case *compactOp:
54 2 : return &t.dbID, nil, []interface{}{&t.start, &t.end, &t.parallelize}
55 2 : case *batchCommitOp:
56 2 : return &t.batchID, nil, nil
57 2 : case *dbRatchetFormatMajorVersionOp:
58 2 : return &t.dbID, nil, []interface{}{&t.vers}
59 2 : case *dbRestartOp:
60 2 : return &t.dbID, nil, nil
61 2 : case *deleteOp:
62 2 : return &t.writerID, nil, []interface{}{&t.key}
63 2 : case *deleteRangeOp:
64 2 : return &t.writerID, nil, []interface{}{&t.start, &t.end}
65 2 : case *iterFirstOp:
66 2 : return &t.iterID, nil, nil
67 2 : case *flushOp:
68 2 : return &t.db, nil, nil
69 2 : case *getOp:
70 2 : return &t.readerID, nil, []interface{}{&t.key}
71 2 : case *ingestOp:
72 2 : return &t.dbID, nil, []interface{}{&t.batchIDs}
73 2 : case *ingestAndExciseOp:
74 2 : return &t.dbID, nil, []interface{}{&t.batchID, &t.exciseStart, &t.exciseEnd}
75 2 : case *initOp:
76 2 : return nil, nil, []interface{}{&t.dbSlots, &t.batchSlots, &t.iterSlots, &t.snapshotSlots}
77 2 : case *iterLastOp:
78 2 : return &t.iterID, nil, nil
79 2 : case *mergeOp:
80 2 : return &t.writerID, nil, []interface{}{&t.key, &t.value}
81 2 : case *newBatchOp:
82 2 : return &t.dbID, &t.batchID, nil
83 2 : case *newIndexedBatchOp:
84 2 : return &t.dbID, &t.batchID, nil
85 2 : case *newIterOp:
86 2 : return &t.readerID, &t.iterID, []interface{}{&t.lower, &t.upper, &t.keyTypes, &t.filterMin, &t.filterMax, &t.useL6Filters, &t.maskSuffix}
87 2 : case *newIterUsingCloneOp:
88 2 : return &t.existingIterID, &t.iterID, []interface{}{&t.refreshBatch, &t.lower, &t.upper, &t.keyTypes, &t.filterMin, &t.filterMax, &t.useL6Filters, &t.maskSuffix}
89 2 : case *newSnapshotOp:
90 2 : return &t.dbID, &t.snapID, []interface{}{&t.bounds}
91 2 : case *iterNextOp:
92 2 : return &t.iterID, nil, []interface{}{&t.limit}
93 2 : case *iterNextPrefixOp:
94 2 : return &t.iterID, nil, nil
95 2 : case *iterCanSingleDelOp:
96 2 : return &t.iterID, nil, []interface{}{}
97 2 : case *iterPrevOp:
98 2 : return &t.iterID, nil, []interface{}{&t.limit}
99 2 : case *iterSeekLTOp:
100 2 : return &t.iterID, nil, []interface{}{&t.key, &t.limit}
101 2 : case *iterSeekGEOp:
102 2 : return &t.iterID, nil, []interface{}{&t.key, &t.limit}
103 2 : case *iterSeekPrefixGEOp:
104 2 : return &t.iterID, nil, []interface{}{&t.key}
105 2 : case *setOp:
106 2 : return &t.writerID, nil, []interface{}{&t.key, &t.value}
107 2 : case *iterSetBoundsOp:
108 2 : return &t.iterID, nil, []interface{}{&t.lower, &t.upper}
109 2 : case *iterSetOptionsOp:
110 2 : return &t.iterID, nil, []interface{}{&t.lower, &t.upper, &t.keyTypes, &t.filterMin, &t.filterMax, &t.useL6Filters, &t.maskSuffix}
111 2 : case *singleDeleteOp:
112 2 : return &t.writerID, nil, []interface{}{&t.key, &t.maybeReplaceDelete}
113 2 : case *rangeKeyDeleteOp:
114 2 : return &t.writerID, nil, []interface{}{&t.start, &t.end}
115 2 : case *rangeKeySetOp:
116 2 : return &t.writerID, nil, []interface{}{&t.start, &t.end, &t.suffix, &t.value}
117 2 : case *rangeKeyUnsetOp:
118 2 : return &t.writerID, nil, []interface{}{&t.start, &t.end, &t.suffix}
119 2 : case *replicateOp:
120 2 : return &t.source, nil, []interface{}{&t.dest, &t.start, &t.end}
121 : }
122 0 : panic(fmt.Sprintf("unsupported op type: %T", op))
123 : }
124 :
125 : var methods = map[string]*methodInfo{
126 : "Apply": makeMethod(applyOp{}, dbTag, batchTag),
127 : "Checkpoint": makeMethod(checkpointOp{}, dbTag),
128 : "Clone": makeMethod(newIterUsingCloneOp{}, iterTag),
129 : "Close": makeMethod(closeOp{}, dbTag, batchTag, iterTag, snapTag),
130 : "Commit": makeMethod(batchCommitOp{}, batchTag),
131 : "Compact": makeMethod(compactOp{}, dbTag),
132 : "Delete": makeMethod(deleteOp{}, dbTag, batchTag),
133 : "DeleteRange": makeMethod(deleteRangeOp{}, dbTag, batchTag),
134 : "First": makeMethod(iterFirstOp{}, iterTag),
135 : "Flush": makeMethod(flushOp{}, dbTag),
136 : "Get": makeMethod(getOp{}, dbTag, batchTag, snapTag),
137 : "Ingest": makeMethod(ingestOp{}, dbTag),
138 : "IngestAndExcise": makeMethod(ingestAndExciseOp{}, dbTag),
139 : "Init": makeMethod(initOp{}, dbTag),
140 : "Last": makeMethod(iterLastOp{}, iterTag),
141 : "Merge": makeMethod(mergeOp{}, dbTag, batchTag),
142 : "NewBatch": makeMethod(newBatchOp{}, dbTag),
143 : "NewIndexedBatch": makeMethod(newIndexedBatchOp{}, dbTag),
144 : "NewIter": makeMethod(newIterOp{}, dbTag, batchTag, snapTag),
145 : "NewSnapshot": makeMethod(newSnapshotOp{}, dbTag),
146 : "Next": makeMethod(iterNextOp{}, iterTag),
147 : "NextPrefix": makeMethod(iterNextPrefixOp{}, iterTag),
148 : "InternalNext": makeMethod(iterCanSingleDelOp{}, iterTag),
149 : "Prev": makeMethod(iterPrevOp{}, iterTag),
150 : "RangeKeyDelete": makeMethod(rangeKeyDeleteOp{}, dbTag, batchTag),
151 : "RangeKeySet": makeMethod(rangeKeySetOp{}, dbTag, batchTag),
152 : "RangeKeyUnset": makeMethod(rangeKeyUnsetOp{}, dbTag, batchTag),
153 : "RatchetFormatMajorVersion": makeMethod(dbRatchetFormatMajorVersionOp{}, dbTag),
154 : "Replicate": makeMethod(replicateOp{}, dbTag),
155 : "Restart": makeMethod(dbRestartOp{}, dbTag),
156 : "SeekGE": makeMethod(iterSeekGEOp{}, iterTag),
157 : "SeekLT": makeMethod(iterSeekLTOp{}, iterTag),
158 : "SeekPrefixGE": makeMethod(iterSeekPrefixGEOp{}, iterTag),
159 : "Set": makeMethod(setOp{}, dbTag, batchTag),
160 : "SetBounds": makeMethod(iterSetBoundsOp{}, iterTag),
161 : "SetOptions": makeMethod(iterSetOptionsOp{}, iterTag),
162 : "SingleDelete": makeMethod(singleDeleteOp{}, dbTag, batchTag),
163 : }
164 :
165 : type parser struct {
166 : opts parserOpts
167 : fset *token.FileSet
168 : s scanner.Scanner
169 : objs map[objID]bool
170 : }
171 :
172 : type parserOpts struct {
173 : allowUndefinedObjs bool
174 : }
175 :
176 2 : func parse(src []byte, opts parserOpts) (_ []op, err error) {
177 2 : // Various bits of magic incantation to set up a scanner for Go compatible
178 2 : // syntax. We arranged for the textual format of ops (e.g. op.String()) to
179 2 : // look like Go which allows us to use the Go scanner for parsing.
180 2 : p := &parser{
181 2 : opts: opts,
182 2 : fset: token.NewFileSet(),
183 2 : objs: map[objID]bool{makeObjID(dbTag, 1): true, makeObjID(dbTag, 2): true},
184 2 : }
185 2 : file := p.fset.AddFile("", -1, len(src))
186 2 : p.s.Init(file, src, nil /* no error handler */, 0)
187 2 : return p.parse()
188 2 : }
189 :
190 2 : func (p *parser) parse() (_ []op, err error) {
191 2 : defer func() {
192 2 : if r := recover(); r != nil {
193 1 : var ok bool
194 1 : if err, ok = r.(error); ok {
195 1 : return
196 1 : }
197 0 : err = errors.Errorf("%v", r)
198 : }
199 : }()
200 :
201 2 : var ops []op
202 2 : for {
203 2 : op := p.parseOp()
204 2 : if op == nil {
205 2 : computeDerivedFields(ops)
206 2 : return ops, nil
207 2 : }
208 2 : ops = append(ops, op)
209 : }
210 : }
211 :
212 2 : func (p *parser) parseOp() op {
213 2 : destPos, destTok, destLit := p.s.Scan()
214 2 : if destTok == token.EOF {
215 2 : return nil
216 2 : }
217 2 : if destTok != token.IDENT {
218 1 : panic(p.errorf(destPos, "unexpected token: %s %q", destTok, destLit))
219 : }
220 2 : if destLit == "Init" {
221 2 : // <op>(<args>)
222 2 : return p.makeOp(destLit, makeObjID(dbTag, 1), 0, destPos)
223 2 : }
224 :
225 2 : destID := p.parseObjID(destPos, destLit)
226 2 :
227 2 : pos, tok, lit := p.s.Scan()
228 2 : switch tok {
229 2 : case token.PERIOD:
230 2 : // <obj>.<op>(<args>)
231 2 : if !p.objs[destID] {
232 1 : if p.opts.allowUndefinedObjs {
233 1 : p.objs[destID] = true
234 1 : } else {
235 0 : panic(p.errorf(destPos, "unknown object: %s", destID))
236 : }
237 : }
238 2 : _, methodLit := p.scanToken(token.IDENT)
239 2 : return p.makeOp(methodLit, destID, 0, destPos)
240 :
241 2 : case token.ASSIGN:
242 2 : // <obj> = <obj>.<op>(<args>)
243 2 : srcPos, srcLit := p.scanToken(token.IDENT)
244 2 : srcID := p.parseObjID(srcPos, srcLit)
245 2 : if !p.objs[srcID] {
246 0 : if p.opts.allowUndefinedObjs {
247 0 : p.objs[srcID] = true
248 0 : } else {
249 0 : panic(p.errorf(srcPos, "unknown object %q", srcLit))
250 : }
251 : }
252 2 : p.scanToken(token.PERIOD)
253 2 : _, methodLit := p.scanToken(token.IDENT)
254 2 : p.objs[destID] = true
255 2 : return p.makeOp(methodLit, srcID, destID, srcPos)
256 : }
257 0 : panic(p.errorf(pos, "unexpected token: %q", p.tokenf(tok, lit)))
258 : }
259 :
260 2 : func parseObjID(str string) (objID, error) {
261 2 : var tag objTag
262 2 : switch {
263 2 : case strings.HasPrefix(str, "db"):
264 2 : tag, str = dbTag, str[2:]
265 2 : if str == "" {
266 1 : str = "1"
267 1 : }
268 2 : case strings.HasPrefix(str, "batch"):
269 2 : tag, str = batchTag, str[5:]
270 2 : case strings.HasPrefix(str, "iter"):
271 2 : tag, str = iterTag, str[4:]
272 2 : case strings.HasPrefix(str, "snap"):
273 2 : tag, str = snapTag, str[4:]
274 1 : default:
275 1 : return 0, errors.Newf("unable to parse objectID: %q", str)
276 : }
277 2 : id, err := strconv.ParseInt(str, 10, 32)
278 2 : if err != nil {
279 0 : return 0, err
280 0 : }
281 2 : return makeObjID(tag, uint32(id)), nil
282 : }
283 :
284 2 : func (p *parser) parseObjID(pos token.Pos, str string) objID {
285 2 : id, err := parseObjID(str)
286 2 : if err != nil {
287 1 : panic(p.errorf(pos, "%s", err))
288 : }
289 2 : return id
290 : }
291 :
292 2 : func unquoteBytes(lit string) []byte {
293 2 : s, err := strconv.Unquote(lit)
294 2 : if err != nil {
295 0 : panic(err)
296 : }
297 2 : if len(s) == 0 {
298 2 : return nil
299 2 : }
300 2 : return []byte(s)
301 : }
302 :
303 2 : func (p *parser) parseArgs(op op, methodName string, args []interface{}) {
304 2 : pos, _ := p.scanToken(token.LPAREN)
305 2 : for i := range args {
306 2 : if i > 0 {
307 2 : pos, _ = p.scanToken(token.COMMA)
308 2 : }
309 :
310 2 : switch t := args[i].(type) {
311 2 : case *uint32:
312 2 : _, lit := p.scanToken(token.INT)
313 2 : val, err := strconv.ParseUint(lit, 10, 32)
314 2 : if err != nil {
315 0 : panic(err)
316 : }
317 2 : *t = uint32(val)
318 :
319 2 : case *uint64:
320 2 : _, lit := p.scanToken(token.INT)
321 2 : val, err := strconv.ParseUint(lit, 10, 64)
322 2 : if err != nil {
323 0 : panic(err)
324 : }
325 2 : *t = uint64(val)
326 :
327 2 : case *[]byte:
328 2 : _, lit := p.scanToken(token.STRING)
329 2 : *t = unquoteBytes(lit)
330 :
331 2 : case *bool:
332 2 : _, lit := p.scanToken(token.IDENT)
333 2 : b, err := strconv.ParseBool(lit)
334 2 : if err != nil {
335 0 : panic(err)
336 : }
337 2 : *t = b
338 :
339 2 : case *objID:
340 2 : pos, lit := p.scanToken(token.IDENT)
341 2 : *t = p.parseObjID(pos, lit)
342 :
343 2 : case *[]pebble.KeyRange:
344 2 : var pending pebble.KeyRange
345 2 : for {
346 2 : pos, tok, lit := p.s.Scan()
347 2 : switch tok {
348 2 : case token.STRING:
349 2 : x := unquoteBytes(lit)
350 2 : if pending.Start == nil {
351 2 : pending.Start = x
352 2 : } else {
353 2 : pending.End = x
354 2 : *t = append(*t, pending)
355 2 : pending = pebble.KeyRange{}
356 2 : }
357 2 : pos, tok, lit := p.s.Scan()
358 2 : switch tok {
359 2 : case token.COMMA:
360 2 : continue
361 2 : case token.RPAREN:
362 2 : p.scanToken(token.SEMICOLON)
363 2 : return
364 0 : default:
365 0 : panic(p.errorf(pos, "unexpected token: %q", p.tokenf(tok, lit)))
366 : }
367 0 : case token.RPAREN:
368 0 : p.scanToken(token.SEMICOLON)
369 0 : return
370 0 : default:
371 0 : panic(p.errorf(pos, "unexpected token: %q", p.tokenf(tok, lit)))
372 : }
373 : }
374 :
375 2 : case *[]objID:
376 2 : for {
377 2 : pos, tok, lit := p.s.Scan()
378 2 : switch tok {
379 2 : case token.IDENT:
380 2 : *t = append(*t, p.parseObjID(pos, lit))
381 2 : pos, tok, lit := p.s.Scan()
382 2 : switch tok {
383 2 : case token.COMMA:
384 2 : continue
385 2 : case token.RPAREN:
386 2 : p.scanToken(token.SEMICOLON)
387 2 : return
388 0 : default:
389 0 : panic(p.errorf(pos, "unexpected token: %q", p.tokenf(tok, lit)))
390 : }
391 0 : case token.RPAREN:
392 0 : p.scanToken(token.SEMICOLON)
393 0 : return
394 0 : default:
395 0 : panic(p.errorf(pos, "unexpected token: %q", p.tokenf(tok, lit)))
396 : }
397 : }
398 :
399 2 : case *[]pebble.CheckpointSpan:
400 2 : pos, tok, lit := p.s.Scan()
401 2 : switch tok {
402 2 : case token.RPAREN:
403 2 : // No spans.
404 2 : *t = nil
405 2 : p.scanToken(token.SEMICOLON)
406 2 : return
407 :
408 2 : case token.STRING:
409 2 : var keys [][]byte
410 2 : for {
411 2 : s, err := strconv.Unquote(lit)
412 2 : if err != nil {
413 0 : panic(p.errorf(pos, "unquoting %q: %v", lit, err))
414 : }
415 2 : keys = append(keys, []byte(s))
416 2 :
417 2 : pos, tok, lit = p.s.Scan()
418 2 : switch tok {
419 2 : case token.COMMA:
420 2 : pos, tok, lit = p.s.Scan()
421 2 : if tok != token.STRING {
422 0 : panic(p.errorf(pos, "unexpected token: %q", p.tokenf(tok, lit)))
423 : }
424 2 : continue
425 :
426 2 : case token.RPAREN:
427 2 : p.scanToken(token.SEMICOLON)
428 2 : if len(keys)%2 == 1 {
429 0 : panic(p.errorf(pos, "expected even number of keys"))
430 : }
431 2 : *t = make([]pebble.CheckpointSpan, len(keys)/2)
432 2 : for i := range *t {
433 2 : (*t)[i] = pebble.CheckpointSpan{
434 2 : Start: keys[i*2],
435 2 : End: keys[i*2+1],
436 2 : }
437 2 : }
438 2 : return
439 :
440 0 : default:
441 0 : panic(p.errorf(pos, "unexpected token: %q", p.tokenf(tok, lit)))
442 : }
443 : }
444 :
445 0 : default:
446 0 : panic(p.errorf(pos, "unexpected token: %q", p.tokenf(tok, lit)))
447 : }
448 :
449 2 : case *pebble.FormatMajorVersion:
450 2 : _, lit := p.scanToken(token.INT)
451 2 : val, err := strconv.ParseUint(lit, 10, 64)
452 2 : if err != nil {
453 0 : panic(err)
454 : }
455 2 : *t = pebble.FormatMajorVersion(val)
456 :
457 0 : default:
458 0 : panic(p.errorf(pos, "%s: unsupported arg[%d] type: %T", methodName, i, args[i]))
459 : }
460 : }
461 2 : p.scanToken(token.RPAREN)
462 2 : p.scanToken(token.SEMICOLON)
463 : }
464 :
465 2 : func (p *parser) scanToken(expected token.Token) (pos token.Pos, lit string) {
466 2 : pos, tok, lit := p.s.Scan()
467 2 : if tok != expected {
468 1 : panic(p.errorf(pos, "unexpected token: %q", p.tokenf(tok, lit)))
469 : }
470 2 : return pos, lit
471 : }
472 :
473 2 : func (p *parser) makeOp(methodName string, receiverID, targetID objID, pos token.Pos) op {
474 2 : info := methods[methodName]
475 2 : if info == nil {
476 1 : panic(p.errorf(pos, "unknown op %s.%s", receiverID, methodName))
477 : }
478 2 : if info.validTags&(1<<receiverID.tag()) == 0 {
479 1 : panic(p.errorf(pos, "%s.%s: %s is not a method on %s",
480 1 : receiverID, methodName, methodName, receiverID))
481 : }
482 :
483 2 : op := info.constructor()
484 2 : receiver, target, args := opArgs(op)
485 2 :
486 2 : // The form of an operation is:
487 2 : // [target =] receiver.method(args)
488 2 : //
489 2 : // The receiver is the object the operation will be called on, which can be
490 2 : // any valid ID. Certain operations such as Ingest are only valid on the DB
491 2 : // object. That is indicated by opArgs returning a nil receiver.
492 2 : if receiver != nil {
493 2 : *receiver = receiverID
494 2 : } else if receiverID.tag() != dbTag {
495 0 : panic(p.errorf(pos, "unknown op %s.%s", receiverID, methodName))
496 : }
497 :
498 : // The target is the object that will be assigned the result of an object
499 : // creation operation such as newBatchOp or newIterOp.
500 2 : if target != nil {
501 2 : // It is invalid to not have a targetID for a method which generates a new
502 2 : // object.
503 2 : if targetID == 0 {
504 1 : panic(p.errorf(pos, "assignment expected for %s.%s", receiverID, methodName))
505 : }
506 : // It is invalid to try to assign to the DB object.
507 2 : if targetID.tag() == dbTag {
508 0 : panic(p.errorf(pos, "cannot use %s as target of assignment", targetID))
509 : }
510 2 : *target = targetID
511 2 : } else if targetID != 0 {
512 1 : panic(p.errorf(pos, "cannot use %s.%s in assignment", receiverID, methodName))
513 : }
514 :
515 2 : p.parseArgs(op, methodName, args)
516 2 : return op
517 : }
518 :
519 1 : func (p *parser) tokenf(tok token.Token, lit string) string {
520 1 : if tok.IsLiteral() {
521 0 : return lit
522 0 : }
523 1 : return tok.String()
524 : }
525 :
526 1 : func (p *parser) errorf(pos token.Pos, format string, args ...interface{}) error {
527 1 : return errors.New("metamorphic test internal error: " + p.fset.Position(pos).String() + ": " + fmt.Sprintf(format, args...))
528 1 : }
529 :
530 : // computeDerivedFields makes one pass through the provided operations, filling
531 : // any derived fields. This pass must happen before execution because concurrent
532 : // execution depends on these fields.
533 2 : func computeDerivedFields(ops []op) {
534 2 : iterToReader := make(map[objID]objID)
535 2 : objToDB := make(map[objID]objID)
536 2 : for i := range ops {
537 2 : switch v := ops[i].(type) {
538 2 : case *newSnapshotOp:
539 2 : objToDB[v.snapID] = v.dbID
540 2 : case *newIterOp:
541 2 : iterToReader[v.iterID] = v.readerID
542 2 : dbReaderID := v.readerID
543 2 : if dbReaderID.tag() != dbTag {
544 2 : dbReaderID = objToDB[dbReaderID]
545 2 : }
546 2 : objToDB[v.iterID] = dbReaderID
547 2 : v.derivedDBID = dbReaderID
548 2 : case *newIterUsingCloneOp:
549 2 : v.derivedReaderID = iterToReader[v.existingIterID]
550 2 : iterToReader[v.iterID] = v.derivedReaderID
551 2 : objToDB[v.iterID] = objToDB[v.existingIterID]
552 2 : case *iterSetOptionsOp:
553 2 : v.derivedReaderID = iterToReader[v.iterID]
554 2 : case *iterFirstOp:
555 2 : v.derivedReaderID = iterToReader[v.iterID]
556 2 : case *iterLastOp:
557 2 : v.derivedReaderID = iterToReader[v.iterID]
558 2 : case *iterSeekGEOp:
559 2 : v.derivedReaderID = iterToReader[v.iterID]
560 2 : case *iterSeekPrefixGEOp:
561 2 : v.derivedReaderID = iterToReader[v.iterID]
562 2 : case *iterSeekLTOp:
563 2 : v.derivedReaderID = iterToReader[v.iterID]
564 2 : case *iterNextOp:
565 2 : v.derivedReaderID = iterToReader[v.iterID]
566 2 : case *iterNextPrefixOp:
567 2 : v.derivedReaderID = iterToReader[v.iterID]
568 2 : case *iterCanSingleDelOp:
569 2 : v.derivedReaderID = iterToReader[v.iterID]
570 2 : case *iterPrevOp:
571 2 : v.derivedReaderID = iterToReader[v.iterID]
572 2 : case *newBatchOp:
573 2 : objToDB[v.batchID] = v.dbID
574 2 : case *newIndexedBatchOp:
575 2 : objToDB[v.batchID] = v.dbID
576 2 : case *applyOp:
577 2 : if derivedDBID, ok := objToDB[v.batchID]; ok && v.writerID.tag() != dbTag {
578 2 : objToDB[v.writerID] = derivedDBID
579 2 : }
580 2 : case *getOp:
581 2 : if derivedDBID, ok := objToDB[v.readerID]; ok {
582 2 : v.derivedDBID = derivedDBID
583 2 : }
584 2 : case *batchCommitOp:
585 2 : v.dbID = objToDB[v.batchID]
586 2 : case *closeOp:
587 2 : if v.objID.tag() == dbTag {
588 2 : // Find all objects that use this db.
589 2 : v.affectedObjects = nil
590 2 : for obj, db := range objToDB {
591 2 : if db == v.objID {
592 2 : v.affectedObjects = append(v.affectedObjects, obj)
593 2 : }
594 : }
595 : // Sort so the output is deterministic.
596 2 : slices.Sort(v.affectedObjects)
597 2 : } else if dbID, ok := objToDB[v.objID]; ok {
598 2 : v.affectedObjects = objIDSlice{dbID}
599 2 : }
600 2 : case *dbRestartOp:
601 2 : // Find all objects that use this db.
602 2 : v.affectedObjects = nil
603 2 : for obj, db := range objToDB {
604 2 : if db == v.dbID {
605 2 : v.affectedObjects = append(v.affectedObjects, obj)
606 2 : }
607 : }
608 : // Sort so the output is deterministic.
609 2 : slices.Sort(v.affectedObjects)
610 2 : case *ingestOp:
611 2 : v.derivedDBIDs = make([]objID, len(v.batchIDs))
612 2 : for i := range v.batchIDs {
613 2 : v.derivedDBIDs[i] = objToDB[v.batchIDs[i]]
614 2 : }
615 2 : case *ingestAndExciseOp:
616 2 : v.derivedDBID = objToDB[v.batchID]
617 2 : case *deleteOp:
618 2 : derivedDBID := v.writerID
619 2 : if v.writerID.tag() != dbTag {
620 2 : derivedDBID = objToDB[v.writerID]
621 2 : }
622 2 : v.derivedDBID = derivedDBID
623 : }
624 : }
625 : }
|