Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package keyspan
6 :
7 : import (
8 : "bytes"
9 : "context"
10 : "fmt"
11 : "go/token"
12 : "io"
13 : "reflect"
14 : "strconv"
15 : "strings"
16 :
17 : "github.com/cockroachdb/errors"
18 : "github.com/cockroachdb/pebble/internal/dsl"
19 : "github.com/cockroachdb/pebble/internal/treeprinter"
20 : )
21 :
22 : // This file contains testing facilities for Spans and FragmentIterators. It's
23 : // defined here so that it may be used by the keyspan package to test its
24 : // various FragmentIterator implementations.
25 : //
26 : // TODO(jackson): Move keyspan.{Span,Key,FragmentIterator} into internal/base,
27 : // and then move the testing facilities to an independent package, eg
28 : // internal/itertest. Alternatively, make all tests that use it use the
29 : // keyspan_test package, which can then import a separate itertest package.
30 :
31 : // probe defines an interface for probes that may inspect or mutate internal
32 : // span iterator behavior.
33 : type probe interface {
34 : // probe inspects, and possibly manipulates, iterator operations' results.
35 : probe(*probeContext)
36 : }
37 :
38 1 : func parseProbes(probeDSLs ...string) []probe {
39 1 : probes := make([]probe, len(probeDSLs))
40 1 : var err error
41 1 : for i := range probeDSLs {
42 1 : probes[i], err = probeParser.Parse(probeDSLs[i])
43 1 : if err != nil {
44 0 : panic(err)
45 : }
46 : }
47 1 : return probes
48 : }
49 :
50 1 : func attachProbes(iter FragmentIterator, pctx probeContext, probes ...probe) FragmentIterator {
51 1 : if pctx.log == nil {
52 0 : pctx.log = io.Discard
53 0 : }
54 1 : for i := range probes {
55 1 : iter = &probeIterator{
56 1 : iter: iter,
57 1 : probe: probes[i],
58 1 : probeCtx: pctx,
59 1 : }
60 1 : }
61 1 : return iter
62 : }
63 :
64 : // ParseAndAttachProbes parses DSL probes and attaches them to an iterator.
65 : func ParseAndAttachProbes(
66 : iter FragmentIterator, log io.Writer, probeDSLs ...string,
67 1 : ) FragmentIterator {
68 1 : pctx := probeContext{log: log}
69 1 : return attachProbes(iter, pctx, parseProbes(probeDSLs...)...)
70 1 : }
71 :
72 : // probeContext provides the context within which a probe is run. It includes
73 : // information about the iterator operation in progress.
74 : type probeContext struct {
75 : op
76 : log io.Writer
77 : }
78 :
79 : type op struct {
80 : Kind OpKind
81 : SeekKey []byte
82 : Span *Span
83 : Err error
84 : }
85 :
86 : // ErrInjected is an error artificially injected for testing.
87 : var ErrInjected = &errorProbe{name: "ErrInjected", err: errors.New("injected error")}
88 :
89 1 : var probeParser = func() *dsl.Parser[probe] {
90 1 : valuerParser := dsl.NewParser[valuer]()
91 1 : valuerParser.DefineConstant("StartKey", func() valuer { return startKey{} })
92 1 : valuerParser.DefineFunc("Bytes",
93 1 : func(p *dsl.Parser[valuer], s *dsl.Scanner) valuer {
94 1 : v := bytesConstant{bytes: []byte(s.ConsumeString())}
95 1 : s.Consume(token.RPAREN)
96 1 : return v
97 1 : })
98 :
99 1 : predicateParser := dsl.NewPredicateParser[*probeContext]()
100 1 : predicateParser.DefineFunc("Equal",
101 1 : func(p *dsl.Parser[dsl.Predicate[*probeContext]], s *dsl.Scanner) dsl.Predicate[*probeContext] {
102 1 : eq := equal{
103 1 : valuerParser.ParseFromPos(s, s.Scan()),
104 1 : valuerParser.ParseFromPos(s, s.Scan()),
105 1 : }
106 1 : s.Consume(token.RPAREN)
107 1 : return eq
108 1 : })
109 1 : for i, name := range opNames {
110 1 : opKind := OpKind(i)
111 1 : predicateParser.DefineConstant(name, func() dsl.Predicate[*probeContext] {
112 1 : // An OpKind implements dsl.Predicate[*probeContext].
113 1 : return opKind
114 1 : })
115 : }
116 1 : probeParser := dsl.NewParser[probe]()
117 1 : probeParser.DefineConstant("ErrInjected", func() probe { return ErrInjected })
118 1 : probeParser.DefineConstant("noop", func() probe { return noop{} })
119 1 : probeParser.DefineFunc("If",
120 1 : func(p *dsl.Parser[probe], s *dsl.Scanner) probe {
121 1 : probe := ifProbe{
122 1 : predicateParser.ParseFromPos(s, s.Scan()),
123 1 : probeParser.ParseFromPos(s, s.Scan()),
124 1 : probeParser.ParseFromPos(s, s.Scan()),
125 1 : }
126 1 : s.Consume(token.RPAREN)
127 1 : return probe
128 1 : })
129 1 : probeParser.DefineFunc("Return",
130 1 : func(p *dsl.Parser[probe], s *dsl.Scanner) (ret probe) {
131 0 : switch tok := s.Scan(); tok.Kind {
132 0 : case token.STRING:
133 0 : str, err := strconv.Unquote(tok.Lit)
134 0 : if err != nil {
135 0 : panic(err)
136 : }
137 0 : span := ParseSpan(str)
138 0 : ret = returnSpan{s: &span}
139 0 : case token.IDENT:
140 0 : switch tok.Lit {
141 0 : case "nil":
142 0 : ret = returnSpan{s: nil}
143 0 : default:
144 0 : panic(errors.Newf("unrecognized return value %q", tok.Lit))
145 : }
146 : }
147 0 : s.Consume(token.RPAREN)
148 0 : return ret
149 : })
150 1 : probeParser.DefineFunc("Log",
151 1 : func(p *dsl.Parser[probe], s *dsl.Scanner) (ret probe) {
152 1 : ret = loggingProbe{prefix: s.ConsumeString()}
153 1 : s.Consume(token.RPAREN)
154 1 : return ret
155 1 : })
156 1 : return probeParser
157 : }()
158 :
159 : // probe implementations
160 :
161 : type errorProbe struct {
162 : name string
163 : err error
164 : }
165 :
166 0 : func (p *errorProbe) String() string { return p.name }
167 0 : func (p *errorProbe) Error() error { return p.err }
168 1 : func (p *errorProbe) probe(pctx *probeContext) {
169 1 : pctx.op.Err = p.err
170 1 : pctx.op.Span = nil
171 1 : }
172 :
173 : // ifProbe is a conditional probe. If its predicate evaluates to true, it probes
174 : // using its Then probe. If its predicate evalutes to false, it probes using its
175 : // Else probe.
176 : type ifProbe struct {
177 : Predicate dsl.Predicate[*probeContext]
178 : Then probe
179 : Else probe
180 : }
181 :
182 0 : func (p ifProbe) String() string { return fmt.Sprintf("(If %s %s %s)", p.Predicate, p.Then, p.Else) }
183 1 : func (p ifProbe) probe(pctx *probeContext) {
184 1 : if p.Predicate.Evaluate(pctx) {
185 1 : p.Then.probe(pctx)
186 1 : } else {
187 1 : p.Else.probe(pctx)
188 1 : }
189 : }
190 :
191 : type returnSpan struct {
192 : s *Span
193 : }
194 :
195 0 : func (p returnSpan) String() string {
196 0 : if p.s == nil {
197 0 : return "(Return nil)"
198 0 : }
199 0 : return fmt.Sprintf("(Return %q)", p.s.String())
200 : }
201 :
202 0 : func (p returnSpan) probe(pctx *probeContext) {
203 0 : pctx.op.Span = p.s
204 0 : pctx.op.Err = nil
205 0 : }
206 :
207 : type noop struct{}
208 :
209 0 : func (noop) String() string { return "Noop" }
210 1 : func (noop) probe(pctx *probeContext) {}
211 :
212 : type loggingProbe struct {
213 : prefix string
214 : }
215 :
216 0 : func (lp loggingProbe) String() string { return fmt.Sprintf("(Log %q)", lp.prefix) }
217 1 : func (lp loggingProbe) probe(pctx *probeContext) {
218 1 : opStr := strings.TrimPrefix(pctx.op.Kind.String(), "Op")
219 1 : fmt.Fprintf(pctx.log, "%s%s(", lp.prefix, opStr)
220 1 : if pctx.op.SeekKey != nil {
221 1 : fmt.Fprintf(pctx.log, "%q", pctx.op.SeekKey)
222 1 : }
223 1 : fmt.Fprint(pctx.log, ") = ")
224 1 : if pctx.op.Span == nil {
225 1 : fmt.Fprint(pctx.log, "nil")
226 1 : if pctx.op.Err != nil {
227 1 : fmt.Fprintf(pctx.log, " <err=%q>", pctx.op.Err)
228 1 : }
229 1 : } else {
230 1 : fmt.Fprint(pctx.log, pctx.op.Span.String())
231 1 : }
232 1 : fmt.Fprintln(pctx.log)
233 : }
234 :
235 : // dsl.Predicate[*probeContext] implementations.
236 :
237 : type equal struct {
238 : a, b valuer
239 : }
240 :
241 0 : func (e equal) String() string { return fmt.Sprintf("(Equal %s %s)", e.a, e.b) }
242 1 : func (e equal) Evaluate(pctx *probeContext) bool {
243 1 : return reflect.DeepEqual(e.a.value(pctx), e.b.value(pctx))
244 1 : }
245 :
246 : // OpKind indicates the type of iterator operation being performed.
247 : type OpKind int8
248 :
249 : // OpKind values.
250 : const (
251 : OpSeekGE OpKind = iota
252 : OpSeekLT
253 : OpFirst
254 : OpLast
255 : OpNext
256 : OpPrev
257 : OpClose
258 : numOpKinds
259 : )
260 :
261 1 : func (o OpKind) String() string { return opNames[o] }
262 :
263 : // Evaluate implements dsl.Predicate.
264 1 : func (o OpKind) Evaluate(pctx *probeContext) bool { return pctx.op.Kind == o }
265 :
266 : var opNames = [numOpKinds]string{
267 : OpSeekGE: "OpSeekGE",
268 : OpSeekLT: "OpSeekLT",
269 : OpFirst: "OpFirst",
270 : OpLast: "OpLast",
271 : OpNext: "OpNext",
272 : OpPrev: "OpPrev",
273 : OpClose: "OpClose",
274 : }
275 :
276 : // valuer implementations
277 :
278 : type valuer interface {
279 : fmt.Stringer
280 : value(pctx *probeContext) any
281 : }
282 :
283 : type bytesConstant struct {
284 : bytes []byte
285 : }
286 :
287 0 : func (b bytesConstant) String() string { return fmt.Sprintf("%q", string(b.bytes)) }
288 1 : func (b bytesConstant) value(pctx *probeContext) any { return b.bytes }
289 :
290 : type startKey struct{}
291 :
292 0 : func (s startKey) String() string { return "StartKey" }
293 1 : func (s startKey) value(pctx *probeContext) any {
294 1 : if pctx.op.Span == nil {
295 0 : return nil
296 0 : }
297 1 : return pctx.op.Span.Start
298 : }
299 :
300 : type probeIterator struct {
301 : iter FragmentIterator
302 : probe probe
303 : probeCtx probeContext
304 : }
305 :
306 : // Assert that probeIterator implements the fragment iterator interface.
307 : var _ FragmentIterator = (*probeIterator)(nil)
308 :
309 1 : func (p *probeIterator) handleOp(preProbeOp op) (*Span, error) {
310 1 : p.probeCtx.op = preProbeOp
311 1 : p.probe.probe(&p.probeCtx)
312 1 : return p.probeCtx.op.Span, p.probeCtx.op.Err
313 1 : }
314 :
315 1 : func (p *probeIterator) SeekGE(key []byte) (*Span, error) {
316 1 : op := op{
317 1 : Kind: OpSeekGE,
318 1 : SeekKey: key,
319 1 : }
320 1 : if p.iter != nil {
321 1 : op.Span, op.Err = p.iter.SeekGE(key)
322 1 : }
323 1 : return p.handleOp(op)
324 : }
325 :
326 1 : func (p *probeIterator) SeekLT(key []byte) (*Span, error) {
327 1 : op := op{
328 1 : Kind: OpSeekLT,
329 1 : SeekKey: key,
330 1 : }
331 1 : if p.iter != nil {
332 1 : op.Span, op.Err = p.iter.SeekLT(key)
333 1 : }
334 1 : return p.handleOp(op)
335 : }
336 :
337 1 : func (p *probeIterator) First() (*Span, error) {
338 1 : op := op{Kind: OpFirst}
339 1 : if p.iter != nil {
340 1 : op.Span, op.Err = p.iter.First()
341 1 : }
342 1 : return p.handleOp(op)
343 : }
344 :
345 1 : func (p *probeIterator) Last() (*Span, error) {
346 1 : op := op{Kind: OpLast}
347 1 : if p.iter != nil {
348 1 : op.Span, op.Err = p.iter.Last()
349 1 : }
350 1 : return p.handleOp(op)
351 : }
352 :
353 1 : func (p *probeIterator) Next() (*Span, error) {
354 1 : op := op{Kind: OpNext}
355 1 : if p.iter != nil {
356 1 : op.Span, op.Err = p.iter.Next()
357 1 : }
358 1 : return p.handleOp(op)
359 : }
360 :
361 1 : func (p *probeIterator) Prev() (*Span, error) {
362 1 : op := op{Kind: OpPrev}
363 1 : if p.iter != nil {
364 1 : op.Span, op.Err = p.iter.Prev()
365 1 : }
366 1 : return p.handleOp(op)
367 : }
368 :
369 : // SetContext is part of the FragmentIterator interface.
370 0 : func (p *probeIterator) SetContext(ctx context.Context) {
371 0 : p.iter.SetContext(ctx)
372 0 : }
373 :
374 0 : func (p *probeIterator) Close() {
375 0 : op := op{Kind: OpClose}
376 0 : if p.iter != nil {
377 0 : p.iter.Close()
378 0 : }
379 0 : _, _ = p.handleOp(op)
380 : }
381 :
382 0 : func (p *probeIterator) WrapChildren(wrap WrapFn) {
383 0 : p.iter = wrap(p.iter)
384 0 : }
385 :
386 : // DebugTree is part of the FragmentIterator interface.
387 0 : func (p *probeIterator) DebugTree(tp treeprinter.Node) {
388 0 : n := tp.Childf("%T(%p)", p, p)
389 0 : if p.iter != nil {
390 0 : p.iter.DebugTree(n)
391 0 : }
392 : }
393 :
394 : // RunIterCmd evaluates a datadriven command controlling an internal
395 : // keyspan.FragmentIterator, writing the results of the iterator operations to
396 : // the provided writer.
397 1 : func RunIterCmd(tdInput string, iter FragmentIterator, w io.Writer) {
398 1 : lines := strings.Split(strings.TrimSpace(tdInput), "\n")
399 1 : for i, line := range lines {
400 1 : if i > 0 {
401 1 : fmt.Fprintln(w)
402 1 : }
403 1 : line = strings.TrimSpace(line)
404 1 : i := strings.IndexByte(line, '#')
405 1 : iterCmd := line
406 1 : if i > 0 {
407 0 : iterCmd = string(line[:i])
408 0 : }
409 1 : runIterOp(w, iter, iterCmd)
410 : }
411 : }
412 :
413 : var iterDelim = map[rune]bool{',': true, ' ': true, '(': true, ')': true, '"': true}
414 :
415 1 : func runIterOp(w io.Writer, it FragmentIterator, op string) {
416 1 : fields := strings.FieldsFunc(op, func(r rune) bool { return iterDelim[r] })
417 1 : var s *Span
418 1 : var err error
419 1 : switch strings.ToLower(fields[0]) {
420 1 : case "first":
421 1 : s, err = it.First()
422 1 : case "last":
423 1 : s, err = it.Last()
424 1 : case "seekge", "seek-ge":
425 1 : if len(fields) == 1 {
426 0 : panic(fmt.Sprintf("unable to parse iter op %q", op))
427 : }
428 1 : s, err = it.SeekGE([]byte(fields[1]))
429 1 : case "seeklt", "seek-lt":
430 1 : if len(fields) == 1 {
431 0 : panic(fmt.Sprintf("unable to parse iter op %q", op))
432 : }
433 1 : s, err = it.SeekLT([]byte(fields[1]))
434 1 : case "next":
435 1 : s, err = it.Next()
436 1 : case "prev":
437 1 : s, err = it.Prev()
438 0 : default:
439 0 : panic(fmt.Sprintf("unrecognized iter op %q", fields[0]))
440 : }
441 1 : switch {
442 1 : case err != nil:
443 1 : fmt.Fprintf(w, "<nil> err=<%s>", err)
444 1 : case s == nil:
445 1 : fmt.Fprint(w, "<nil>")
446 1 : default:
447 1 : fmt.Fprint(w, s)
448 : }
449 : }
450 :
451 : // RunFragmentIteratorCmd runs a command on an iterator; intended for testing.
452 1 : func RunFragmentIteratorCmd(iter FragmentIterator, input string, extraInfo func() string) string {
453 1 : var b bytes.Buffer
454 1 : for _, line := range strings.Split(input, "\n") {
455 1 : parts := strings.Fields(line)
456 1 : if len(parts) == 0 {
457 0 : continue
458 : }
459 1 : var span *Span
460 1 : var err error
461 1 : switch parts[0] {
462 1 : case "seek-ge":
463 1 : if len(parts) != 2 {
464 0 : return "seek-ge <key>\n"
465 0 : }
466 1 : span, err = iter.SeekGE([]byte(strings.TrimSpace(parts[1])))
467 1 : case "seek-lt":
468 1 : if len(parts) != 2 {
469 0 : return "seek-lt <key>\n"
470 0 : }
471 1 : span, err = iter.SeekLT([]byte(strings.TrimSpace(parts[1])))
472 1 : case "first":
473 1 : span, err = iter.First()
474 1 : case "last":
475 1 : span, err = iter.Last()
476 1 : case "next":
477 1 : span, err = iter.Next()
478 1 : case "prev":
479 1 : span, err = iter.Prev()
480 0 : default:
481 0 : return fmt.Sprintf("unknown op: %s", parts[0])
482 : }
483 1 : switch {
484 0 : case err != nil:
485 0 : fmt.Fprintf(&b, "err=%v\n", err)
486 1 : case span == nil:
487 1 : fmt.Fprintf(&b, ".\n")
488 1 : default:
489 1 : fmt.Fprintf(&b, "%s", span)
490 1 : if extraInfo != nil {
491 1 : fmt.Fprintf(&b, " (%s)", extraInfo())
492 1 : }
493 1 : b.WriteByte('\n')
494 : }
495 : }
496 1 : return b.String()
497 : }
498 :
499 : // NewInvalidatingIter wraps a FragmentIterator; spans surfaced by the inner
500 : // iterator are copied to buffers that are zeroed by subsequent iterator
501 : // positioning calls. This is intended to help surface bugs in improper lifetime
502 : // expectations of Spans.
503 1 : func NewInvalidatingIter(iter FragmentIterator) FragmentIterator {
504 1 : return &invalidatingIter{
505 1 : iter: iter,
506 1 : }
507 1 : }
508 :
509 : type invalidatingIter struct {
510 : iter FragmentIterator
511 : bufs [][]byte
512 : keys []Key
513 : span Span
514 : }
515 :
516 : // invalidatingIter implements FragmentIterator.
517 : var _ FragmentIterator = (*invalidatingIter)(nil)
518 :
519 1 : func (i *invalidatingIter) invalidate(s *Span, err error) (*Span, error) {
520 1 : // Mangle the entirety of the byte bufs and the keys slice.
521 1 : for j := range i.bufs {
522 1 : for k := range i.bufs[j] {
523 1 : i.bufs[j][k] = 0xff
524 1 : }
525 1 : i.bufs[j] = nil
526 : }
527 1 : for j := range i.keys {
528 1 : i.keys[j] = Key{}
529 1 : }
530 1 : if s == nil {
531 1 : return nil, err
532 1 : }
533 :
534 : // Copy all of the span's slices into slices owned by the invalidating iter
535 : // that we can invalidate on a subsequent positioning method.
536 1 : i.bufs = i.bufs[:0]
537 1 : i.keys = i.keys[:0]
538 1 : i.span = Span{
539 1 : Start: i.saveBytes(s.Start),
540 1 : End: i.saveBytes(s.End),
541 1 : KeysOrder: s.KeysOrder,
542 1 : }
543 1 : for j := range s.Keys {
544 1 : i.keys = append(i.keys, Key{
545 1 : Trailer: s.Keys[j].Trailer,
546 1 : Suffix: i.saveBytes(s.Keys[j].Suffix),
547 1 : Value: i.saveBytes(s.Keys[j].Value),
548 1 : })
549 1 : }
550 1 : i.span.Keys = i.keys
551 1 : return &i.span, err
552 : }
553 :
554 1 : func (i *invalidatingIter) saveBytes(b []byte) []byte {
555 1 : if b == nil {
556 1 : return nil
557 1 : }
558 1 : saved := append([]byte(nil), b...)
559 1 : i.bufs = append(i.bufs, saved)
560 1 : return saved
561 : }
562 :
563 1 : func (i *invalidatingIter) SeekGE(key []byte) (*Span, error) { return i.invalidate(i.iter.SeekGE(key)) }
564 1 : func (i *invalidatingIter) SeekLT(key []byte) (*Span, error) { return i.invalidate(i.iter.SeekLT(key)) }
565 1 : func (i *invalidatingIter) First() (*Span, error) { return i.invalidate(i.iter.First()) }
566 1 : func (i *invalidatingIter) Last() (*Span, error) { return i.invalidate(i.iter.Last()) }
567 1 : func (i *invalidatingIter) Next() (*Span, error) { return i.invalidate(i.iter.Next()) }
568 1 : func (i *invalidatingIter) Prev() (*Span, error) { return i.invalidate(i.iter.Prev()) }
569 :
570 : // SetContext is part of the FragmentIterator interface.
571 0 : func (i *invalidatingIter) SetContext(ctx context.Context) {
572 0 : i.iter.SetContext(ctx)
573 0 : }
574 :
575 1 : func (i *invalidatingIter) Close() {
576 1 : _, _ = i.invalidate(nil, nil)
577 1 : i.iter.Close()
578 1 : }
579 :
580 0 : func (i *invalidatingIter) WrapChildren(wrap WrapFn) {
581 0 : i.iter = wrap(i.iter)
582 0 : }
583 :
584 : // DebugTree is part of the FragmentIterator interface.
585 0 : func (i *invalidatingIter) DebugTree(tp treeprinter.Node) {
586 0 : n := tp.Childf("%T(%p)", i, i)
587 0 : if i.iter != nil {
588 0 : i.iter.DebugTree(n)
589 0 : }
590 : }
|