Line data Source code
1 : // Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2 : // of this source code is governed by a BSD-style license that can be found in
3 : // the LICENSE file.
4 :
5 : package keyspan
6 :
7 : import (
8 : "bytes"
9 : "fmt"
10 : "go/token"
11 : "io"
12 : "reflect"
13 : "strconv"
14 : "strings"
15 :
16 : "github.com/cockroachdb/errors"
17 : "github.com/cockroachdb/pebble/internal/dsl"
18 : )
19 :
20 : // This file contains testing facilities for Spans and FragmentIterators. It's
21 : // defined here so that it may be used by the keyspan package to test its
22 : // various FragmentIterator implementations.
23 : //
24 : // TODO(jackson): Move keyspan.{Span,Key,FragmentIterator} into internal/base,
25 : // and then move the testing facilities to an independent package, eg
26 : // internal/itertest. Alternatively, make all tests that use it use the
27 : // keyspan_test package, which can then import a separate itertest package.
28 :
29 : // probe defines an interface for probes that may inspect or mutate internal
30 : // span iterator behavior.
31 : type probe interface {
32 : // probe inspects, and possibly manipulates, iterator operations' results.
33 : probe(*probeContext)
34 : }
35 :
36 0 : func parseProbes(probeDSLs ...string) []probe {
37 0 : probes := make([]probe, len(probeDSLs))
38 0 : var err error
39 0 : for i := range probeDSLs {
40 0 : probes[i], err = probeParser.Parse(probeDSLs[i])
41 0 : if err != nil {
42 0 : panic(err)
43 : }
44 : }
45 0 : return probes
46 : }
47 :
48 0 : func attachProbes(iter FragmentIterator, pctx probeContext, probes ...probe) FragmentIterator {
49 0 : if pctx.log == nil {
50 0 : pctx.log = io.Discard
51 0 : }
52 0 : for i := range probes {
53 0 : iter = &probeIterator{
54 0 : iter: iter,
55 0 : probe: probes[i],
56 0 : probeCtx: pctx,
57 0 : }
58 0 : }
59 0 : return iter
60 : }
61 :
62 : // ParseAndAttachProbes parses DSL probes and attaches them to an iterator.
63 : func ParseAndAttachProbes(
64 : iter FragmentIterator, log io.Writer, probeDSLs ...string,
65 0 : ) FragmentIterator {
66 0 : pctx := probeContext{log: log}
67 0 : return attachProbes(iter, pctx, parseProbes(probeDSLs...)...)
68 0 : }
69 :
70 : // probeContext provides the context within which a probe is run. It includes
71 : // information about the iterator operation in progress.
72 : type probeContext struct {
73 : op
74 : log io.Writer
75 : }
76 :
77 : type op struct {
78 : Kind OpKind
79 : SeekKey []byte
80 : Span *Span
81 : Err error
82 : }
83 :
84 : // ErrInjected is an error artificially injected for testing.
85 : var ErrInjected = &errorProbe{name: "ErrInjected", err: errors.New("injected error")}
86 :
87 1 : var probeParser = func() *dsl.Parser[probe] {
88 1 : valuerParser := dsl.NewParser[valuer]()
89 1 : valuerParser.DefineConstant("StartKey", func() valuer { return startKey{} })
90 1 : valuerParser.DefineFunc("Bytes",
91 1 : func(p *dsl.Parser[valuer], s *dsl.Scanner) valuer {
92 0 : v := bytesConstant{bytes: []byte(s.ConsumeString())}
93 0 : s.Consume(token.RPAREN)
94 0 : return v
95 0 : })
96 :
97 1 : predicateParser := dsl.NewPredicateParser[*probeContext]()
98 1 : predicateParser.DefineFunc("Equal",
99 1 : func(p *dsl.Parser[dsl.Predicate[*probeContext]], s *dsl.Scanner) dsl.Predicate[*probeContext] {
100 0 : eq := equal{
101 0 : valuerParser.ParseFromPos(s, s.Scan()),
102 0 : valuerParser.ParseFromPos(s, s.Scan()),
103 0 : }
104 0 : s.Consume(token.RPAREN)
105 0 : return eq
106 0 : })
107 1 : for i, name := range opNames {
108 1 : opKind := OpKind(i)
109 1 : predicateParser.DefineConstant(name, func() dsl.Predicate[*probeContext] {
110 0 : // An OpKind implements dsl.Predicate[*probeContext].
111 0 : return opKind
112 0 : })
113 : }
114 1 : probeParser := dsl.NewParser[probe]()
115 1 : probeParser.DefineConstant("ErrInjected", func() probe { return ErrInjected })
116 1 : probeParser.DefineConstant("noop", func() probe { return noop{} })
117 1 : probeParser.DefineFunc("If",
118 1 : func(p *dsl.Parser[probe], s *dsl.Scanner) probe {
119 0 : probe := ifProbe{
120 0 : predicateParser.ParseFromPos(s, s.Scan()),
121 0 : probeParser.ParseFromPos(s, s.Scan()),
122 0 : probeParser.ParseFromPos(s, s.Scan()),
123 0 : }
124 0 : s.Consume(token.RPAREN)
125 0 : return probe
126 0 : })
127 1 : probeParser.DefineFunc("Return",
128 1 : func(p *dsl.Parser[probe], s *dsl.Scanner) (ret probe) {
129 0 : switch tok := s.Scan(); tok.Kind {
130 0 : case token.STRING:
131 0 : str, err := strconv.Unquote(tok.Lit)
132 0 : if err != nil {
133 0 : panic(err)
134 : }
135 0 : span := ParseSpan(str)
136 0 : ret = returnSpan{s: &span}
137 0 : case token.IDENT:
138 0 : switch tok.Lit {
139 0 : case "nil":
140 0 : ret = returnSpan{s: nil}
141 0 : default:
142 0 : panic(errors.Newf("unrecognized return value %q", tok.Lit))
143 : }
144 : }
145 0 : s.Consume(token.RPAREN)
146 0 : return ret
147 : })
148 1 : probeParser.DefineFunc("Log",
149 1 : func(p *dsl.Parser[probe], s *dsl.Scanner) (ret probe) {
150 0 : ret = loggingProbe{prefix: s.ConsumeString()}
151 0 : s.Consume(token.RPAREN)
152 0 : return ret
153 0 : })
154 1 : return probeParser
155 : }()
156 :
157 : // probe implementations
158 :
159 : type errorProbe struct {
160 : name string
161 : err error
162 : }
163 :
164 0 : func (p *errorProbe) String() string { return p.name }
165 0 : func (p *errorProbe) Error() error { return p.err }
166 0 : func (p *errorProbe) probe(pctx *probeContext) {
167 0 : pctx.op.Err = p.err
168 0 : pctx.op.Span = nil
169 0 : }
170 :
171 : // ifProbe is a conditional probe. If its predicate evaluates to true, it probes
172 : // using its Then probe. If its predicate evalutes to false, it probes using its
173 : // Else probe.
174 : type ifProbe struct {
175 : Predicate dsl.Predicate[*probeContext]
176 : Then probe
177 : Else probe
178 : }
179 :
180 0 : func (p ifProbe) String() string { return fmt.Sprintf("(If %s %s %s)", p.Predicate, p.Then, p.Else) }
181 0 : func (p ifProbe) probe(pctx *probeContext) {
182 0 : if p.Predicate.Evaluate(pctx) {
183 0 : p.Then.probe(pctx)
184 0 : } else {
185 0 : p.Else.probe(pctx)
186 0 : }
187 : }
188 :
189 : type returnSpan struct {
190 : s *Span
191 : }
192 :
193 0 : func (p returnSpan) String() string {
194 0 : if p.s == nil {
195 0 : return "(Return nil)"
196 0 : }
197 0 : return fmt.Sprintf("(Return %q)", p.s.String())
198 : }
199 :
200 0 : func (p returnSpan) probe(pctx *probeContext) {
201 0 : pctx.op.Span = p.s
202 0 : pctx.op.Err = nil
203 0 : }
204 :
205 : type noop struct{}
206 :
207 0 : func (noop) String() string { return "Noop" }
208 0 : func (noop) probe(pctx *probeContext) {}
209 :
210 : type loggingProbe struct {
211 : prefix string
212 : }
213 :
214 0 : func (lp loggingProbe) String() string { return fmt.Sprintf("(Log %q)", lp.prefix) }
215 0 : func (lp loggingProbe) probe(pctx *probeContext) {
216 0 : opStr := strings.TrimPrefix(pctx.op.Kind.String(), "Op")
217 0 : fmt.Fprintf(pctx.log, "%s%s(", lp.prefix, opStr)
218 0 : if pctx.op.SeekKey != nil {
219 0 : fmt.Fprintf(pctx.log, "%q", pctx.op.SeekKey)
220 0 : }
221 0 : fmt.Fprint(pctx.log, ") = ")
222 0 : if pctx.op.Span == nil {
223 0 : fmt.Fprint(pctx.log, "nil")
224 0 : if pctx.op.Err != nil {
225 0 : fmt.Fprintf(pctx.log, " <err=%q>", pctx.op.Err)
226 0 : }
227 0 : } else {
228 0 : fmt.Fprint(pctx.log, pctx.op.Span.String())
229 0 : }
230 0 : fmt.Fprintln(pctx.log)
231 : }
232 :
233 : // dsl.Predicate[*probeContext] implementations.
234 :
235 : type equal struct {
236 : a, b valuer
237 : }
238 :
239 0 : func (e equal) String() string { return fmt.Sprintf("(Equal %s %s)", e.a, e.b) }
240 0 : func (e equal) Evaluate(pctx *probeContext) bool {
241 0 : return reflect.DeepEqual(e.a.value(pctx), e.b.value(pctx))
242 0 : }
243 :
244 : // OpKind indicates the type of iterator operation being performed.
245 : type OpKind int8
246 :
247 : // OpKind values.
248 : const (
249 : OpSeekGE OpKind = iota
250 : OpSeekLT
251 : OpFirst
252 : OpLast
253 : OpNext
254 : OpPrev
255 : OpClose
256 : numOpKinds
257 : )
258 :
259 0 : func (o OpKind) String() string { return opNames[o] }
260 :
261 : // Evaluate implements dsl.Predicate.
262 0 : func (o OpKind) Evaluate(pctx *probeContext) bool { return pctx.op.Kind == o }
263 :
264 : var opNames = [numOpKinds]string{
265 : OpSeekGE: "OpSeekGE",
266 : OpSeekLT: "OpSeekLT",
267 : OpFirst: "OpFirst",
268 : OpLast: "OpLast",
269 : OpNext: "OpNext",
270 : OpPrev: "OpPrev",
271 : OpClose: "OpClose",
272 : }
273 :
274 : // valuer implementations
275 :
276 : type valuer interface {
277 : fmt.Stringer
278 : value(pctx *probeContext) any
279 : }
280 :
281 : type bytesConstant struct {
282 : bytes []byte
283 : }
284 :
285 0 : func (b bytesConstant) String() string { return fmt.Sprintf("%q", string(b.bytes)) }
286 0 : func (b bytesConstant) value(pctx *probeContext) any { return b.bytes }
287 :
288 : type startKey struct{}
289 :
290 0 : func (s startKey) String() string { return "StartKey" }
291 0 : func (s startKey) value(pctx *probeContext) any {
292 0 : if pctx.op.Span == nil {
293 0 : return nil
294 0 : }
295 0 : return pctx.op.Span.Start
296 : }
297 :
298 : type probeIterator struct {
299 : iter FragmentIterator
300 : probe probe
301 : probeCtx probeContext
302 : }
303 :
304 : // Assert that probeIterator implements the fragment iterator interface.
305 : var _ FragmentIterator = (*probeIterator)(nil)
306 :
307 0 : func (p *probeIterator) handleOp(preProbeOp op) (*Span, error) {
308 0 : p.probeCtx.op = preProbeOp
309 0 : p.probe.probe(&p.probeCtx)
310 0 : return p.probeCtx.op.Span, p.probeCtx.op.Err
311 0 : }
312 :
313 0 : func (p *probeIterator) SeekGE(key []byte) (*Span, error) {
314 0 : op := op{
315 0 : Kind: OpSeekGE,
316 0 : SeekKey: key,
317 0 : }
318 0 : if p.iter != nil {
319 0 : op.Span, op.Err = p.iter.SeekGE(key)
320 0 : }
321 0 : return p.handleOp(op)
322 : }
323 :
324 0 : func (p *probeIterator) SeekLT(key []byte) (*Span, error) {
325 0 : op := op{
326 0 : Kind: OpSeekLT,
327 0 : SeekKey: key,
328 0 : }
329 0 : if p.iter != nil {
330 0 : op.Span, op.Err = p.iter.SeekLT(key)
331 0 : }
332 0 : return p.handleOp(op)
333 : }
334 :
335 0 : func (p *probeIterator) First() (*Span, error) {
336 0 : op := op{Kind: OpFirst}
337 0 : if p.iter != nil {
338 0 : op.Span, op.Err = p.iter.First()
339 0 : }
340 0 : return p.handleOp(op)
341 : }
342 :
343 0 : func (p *probeIterator) Last() (*Span, error) {
344 0 : op := op{Kind: OpLast}
345 0 : if p.iter != nil {
346 0 : op.Span, op.Err = p.iter.Last()
347 0 : }
348 0 : return p.handleOp(op)
349 : }
350 :
351 0 : func (p *probeIterator) Next() (*Span, error) {
352 0 : op := op{Kind: OpNext}
353 0 : if p.iter != nil {
354 0 : op.Span, op.Err = p.iter.Next()
355 0 : }
356 0 : return p.handleOp(op)
357 : }
358 :
359 0 : func (p *probeIterator) Prev() (*Span, error) {
360 0 : op := op{Kind: OpPrev}
361 0 : if p.iter != nil {
362 0 : op.Span, op.Err = p.iter.Prev()
363 0 : }
364 0 : return p.handleOp(op)
365 : }
366 :
367 0 : func (p *probeIterator) Close() error {
368 0 : op := op{Kind: OpClose}
369 0 : if p.iter != nil {
370 0 : op.Err = p.iter.Close()
371 0 : }
372 0 : _, err := p.handleOp(op)
373 0 : return err
374 : }
375 :
376 0 : func (p *probeIterator) WrapChildren(wrap WrapFn) {
377 0 : p.iter = wrap(p.iter)
378 0 : }
379 :
380 : // RunIterCmd evaluates a datadriven command controlling an internal
381 : // keyspan.FragmentIterator, writing the results of the iterator operations to
382 : // the provided writer.
383 0 : func RunIterCmd(tdInput string, iter FragmentIterator, w io.Writer) {
384 0 : lines := strings.Split(strings.TrimSpace(tdInput), "\n")
385 0 : for i, line := range lines {
386 0 : if i > 0 {
387 0 : fmt.Fprintln(w)
388 0 : }
389 0 : line = strings.TrimSpace(line)
390 0 : i := strings.IndexByte(line, '#')
391 0 : iterCmd := line
392 0 : if i > 0 {
393 0 : iterCmd = string(line[:i])
394 0 : }
395 0 : runIterOp(w, iter, iterCmd)
396 : }
397 : }
398 :
399 : var iterDelim = map[rune]bool{',': true, ' ': true, '(': true, ')': true, '"': true}
400 :
401 0 : func runIterOp(w io.Writer, it FragmentIterator, op string) {
402 0 : fields := strings.FieldsFunc(op, func(r rune) bool { return iterDelim[r] })
403 0 : var s *Span
404 0 : var err error
405 0 : switch strings.ToLower(fields[0]) {
406 0 : case "first":
407 0 : s, err = it.First()
408 0 : case "last":
409 0 : s, err = it.Last()
410 0 : case "seekge", "seek-ge":
411 0 : if len(fields) == 1 {
412 0 : panic(fmt.Sprintf("unable to parse iter op %q", op))
413 : }
414 0 : s, err = it.SeekGE([]byte(fields[1]))
415 0 : case "seeklt", "seek-lt":
416 0 : if len(fields) == 1 {
417 0 : panic(fmt.Sprintf("unable to parse iter op %q", op))
418 : }
419 0 : s, err = it.SeekLT([]byte(fields[1]))
420 0 : case "next":
421 0 : s, err = it.Next()
422 0 : case "prev":
423 0 : s, err = it.Prev()
424 0 : default:
425 0 : panic(fmt.Sprintf("unrecognized iter op %q", fields[0]))
426 : }
427 0 : switch {
428 0 : case err != nil:
429 0 : fmt.Fprintf(w, "<nil> err=<%s>", err)
430 0 : case s == nil:
431 0 : fmt.Fprint(w, "<nil>")
432 0 : default:
433 0 : fmt.Fprint(w, s)
434 : }
435 : }
436 :
437 : // RunFragmentIteratorCmd runs a command on an iterator; intended for testing.
438 0 : func RunFragmentIteratorCmd(iter FragmentIterator, input string, extraInfo func() string) string {
439 0 : var b bytes.Buffer
440 0 : for _, line := range strings.Split(input, "\n") {
441 0 : parts := strings.Fields(line)
442 0 : if len(parts) == 0 {
443 0 : continue
444 : }
445 0 : var span *Span
446 0 : var err error
447 0 : switch parts[0] {
448 0 : case "seek-ge":
449 0 : if len(parts) != 2 {
450 0 : return "seek-ge <key>\n"
451 0 : }
452 0 : span, err = iter.SeekGE([]byte(strings.TrimSpace(parts[1])))
453 0 : case "seek-lt":
454 0 : if len(parts) != 2 {
455 0 : return "seek-lt <key>\n"
456 0 : }
457 0 : span, err = iter.SeekLT([]byte(strings.TrimSpace(parts[1])))
458 0 : case "first":
459 0 : span, err = iter.First()
460 0 : case "last":
461 0 : span, err = iter.Last()
462 0 : case "next":
463 0 : span, err = iter.Next()
464 0 : case "prev":
465 0 : span, err = iter.Prev()
466 0 : default:
467 0 : return fmt.Sprintf("unknown op: %s", parts[0])
468 : }
469 0 : switch {
470 0 : case err != nil:
471 0 : fmt.Fprintf(&b, "err=%v\n", err)
472 0 : case span == nil:
473 0 : fmt.Fprintf(&b, ".\n")
474 0 : default:
475 0 : fmt.Fprintf(&b, "%s", span)
476 0 : if extraInfo != nil {
477 0 : fmt.Fprintf(&b, " (%s)", extraInfo())
478 0 : }
479 0 : b.WriteByte('\n')
480 : }
481 : }
482 0 : return b.String()
483 : }
484 :
485 : // NewInvalidatingIter wraps a FragmentIterator; spans surfaced by the inner
486 : // iterator are copied to buffers that are zeroed by subsequent iterator
487 : // positioning calls. This is intended to help surface bugs in improper lifetime
488 : // expectations of Spans.
489 0 : func NewInvalidatingIter(iter FragmentIterator) FragmentIterator {
490 0 : return &invalidatingIter{
491 0 : iter: iter,
492 0 : }
493 0 : }
494 :
495 : type invalidatingIter struct {
496 : iter FragmentIterator
497 : bufs [][]byte
498 : keys []Key
499 : span Span
500 : }
501 :
502 : // invalidatingIter implements FragmentIterator.
503 : var _ FragmentIterator = (*invalidatingIter)(nil)
504 :
505 0 : func (i *invalidatingIter) invalidate(s *Span, err error) (*Span, error) {
506 0 : // Zero the entirety of the byte bufs and the keys slice.
507 0 : for j := range i.bufs {
508 0 : for k := range i.bufs[j] {
509 0 : i.bufs[j][k] = 0x00
510 0 : }
511 0 : i.bufs[j] = nil
512 : }
513 0 : for j := range i.keys {
514 0 : i.keys[j] = Key{}
515 0 : }
516 0 : if s == nil {
517 0 : return nil, err
518 0 : }
519 :
520 : // Copy all of the span's slices into slices owned by the invalidating iter
521 : // that we can invalidate on a subsequent positioning method.
522 0 : i.bufs = i.bufs[:0]
523 0 : i.keys = i.keys[:0]
524 0 : i.span = Span{
525 0 : Start: i.saveBytes(s.Start),
526 0 : End: i.saveBytes(s.End),
527 0 : }
528 0 : for j := range s.Keys {
529 0 : i.keys = append(i.keys, Key{
530 0 : Trailer: s.Keys[j].Trailer,
531 0 : Suffix: i.saveBytes(s.Keys[j].Suffix),
532 0 : Value: i.saveBytes(s.Keys[j].Value),
533 0 : })
534 0 : }
535 0 : i.span.Keys = i.keys
536 0 : return &i.span, err
537 : }
538 :
539 0 : func (i *invalidatingIter) saveBytes(b []byte) []byte {
540 0 : if b == nil {
541 0 : return nil
542 0 : }
543 0 : saved := append([]byte(nil), b...)
544 0 : i.bufs = append(i.bufs, saved)
545 0 : return saved
546 : }
547 :
548 0 : func (i *invalidatingIter) SeekGE(key []byte) (*Span, error) { return i.invalidate(i.iter.SeekGE(key)) }
549 0 : func (i *invalidatingIter) SeekLT(key []byte) (*Span, error) { return i.invalidate(i.iter.SeekLT(key)) }
550 0 : func (i *invalidatingIter) First() (*Span, error) { return i.invalidate(i.iter.First()) }
551 0 : func (i *invalidatingIter) Last() (*Span, error) { return i.invalidate(i.iter.Last()) }
552 0 : func (i *invalidatingIter) Next() (*Span, error) { return i.invalidate(i.iter.Next()) }
553 0 : func (i *invalidatingIter) Prev() (*Span, error) { return i.invalidate(i.iter.Prev()) }
554 0 : func (i *invalidatingIter) Close() error { return i.iter.Close() }
555 0 : func (i *invalidatingIter) WrapChildren(wrap WrapFn) {
556 0 : i.iter = wrap(i.iter)
557 0 : }
|