Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/lark/lexer.py: 70%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Lexer Implementation
3from abc import abstractmethod, ABC
4import re
5from contextlib import suppress
6from typing import (
7 TypeVar, Type, Dict, Iterator, Collection, Callable, Optional, FrozenSet, Any,
8 ClassVar, TYPE_CHECKING, overload
9)
10from types import ModuleType
11import warnings
12try:
13 import interegular
14except ImportError:
15 pass
16if TYPE_CHECKING:
17 from .common import LexerConf
18 from .parsers.lalr_parser_state import ParserState
20from .utils import classify, get_regexp_width, Serialize, logger, TextSlice, TextOrSlice
21from .exceptions import UnexpectedCharacters, LexError, UnexpectedToken
22from .grammar import TOKEN_DEFAULT_PRIORITY
25###{standalone
26from copy import copy
28try: # For the standalone parser, we need to make sure that has_interegular is False to avoid NameErrors later on
29 has_interegular = bool(interegular)
30except NameError:
31 has_interegular = False
33class Pattern(Serialize, ABC):
34 "An abstraction over regular expressions."
36 value: str
37 flags: Collection[str]
38 raw: Optional[str]
39 type: ClassVar[str]
41 def __init__(self, value: str, flags: Collection[str] = (), raw: Optional[str] = None) -> None:
42 self.value = value
43 self.flags = frozenset(flags)
44 self.raw = raw
46 def __repr__(self):
47 return repr(self.to_regexp())
49 # Pattern Hashing assumes all subclasses have a different priority!
50 def __hash__(self):
51 return hash((type(self), self.value, self.flags))
53 def __eq__(self, other):
54 return type(self) == type(other) and self.value == other.value and self.flags == other.flags
56 @abstractmethod
57 def to_regexp(self) -> str:
58 raise NotImplementedError()
60 @property
61 @abstractmethod
62 def min_width(self) -> int:
63 raise NotImplementedError()
65 @property
66 @abstractmethod
67 def max_width(self) -> int:
68 raise NotImplementedError()
70 def _get_flags(self, value):
71 for f in self.flags:
72 value = ('(?%s:%s)' % (f, value))
73 return value
76class PatternStr(Pattern):
77 __serialize_fields__ = 'value', 'flags', 'raw'
79 type: ClassVar[str] = "str"
81 def to_regexp(self) -> str:
82 return self._get_flags(re.escape(self.value))
84 @property
85 def min_width(self) -> int:
86 return len(self.value)
88 @property
89 def max_width(self) -> int:
90 return len(self.value)
93class PatternRE(Pattern):
94 __serialize_fields__ = 'value', 'flags', 'raw', '_width'
96 type: ClassVar[str] = "re"
98 def to_regexp(self) -> str:
99 return self._get_flags(self.value)
101 _width = None
102 def _get_width(self):
103 if self._width is None:
104 self._width = get_regexp_width(self.to_regexp())
105 return self._width
107 @property
108 def min_width(self) -> int:
109 return self._get_width()[0]
111 @property
112 def max_width(self) -> int:
113 return self._get_width()[1]
116class TerminalDef(Serialize):
117 "A definition of a terminal"
118 __serialize_fields__ = 'name', 'pattern', 'priority'
119 __serialize_namespace__ = PatternStr, PatternRE
121 name: str
122 pattern: Pattern
123 priority: int
125 def __init__(self, name: str, pattern: Pattern, priority: int = TOKEN_DEFAULT_PRIORITY) -> None:
126 assert isinstance(pattern, Pattern), pattern
127 self.name = name
128 self.pattern = pattern
129 self.priority = priority
131 def __repr__(self):
132 return '%s(%r, %r)' % (type(self).__name__, self.name, self.pattern)
134 def user_repr(self) -> str:
135 if self.name.startswith('__'): # We represent a generated terminal
136 return self.pattern.raw or self.name
137 else:
138 return self.name
140_T = TypeVar('_T', bound="Token")
142class Token(str):
143 """A string with meta-information, that is produced by the lexer.
145 When parsing text, the resulting chunks of the input that haven't been discarded,
146 will end up in the tree as Token instances. The Token class inherits from Python's ``str``,
147 so normal string comparisons and operations will work as expected.
149 Attributes:
150 type: Name of the token (as specified in grammar)
151 value: Value of the token (redundant, as ``token.value == token`` will always be true)
152 start_pos: The index of the token in the text
153 line: The line of the token in the text (starting with 1)
154 column: The column of the token in the text (starting with 1)
155 end_line: The line where the token ends
156 end_column: The next column after the end of the token. For example,
157 if the token is a single character with a column value of 4,
158 end_column will be 5.
159 end_pos: the index where the token ends (basically ``start_pos + len(token)``)
160 """
161 __slots__ = ('type', 'start_pos', 'value', 'line', 'column', 'end_line', 'end_column', 'end_pos')
163 __match_args__ = ('type', 'value')
165 type: str
166 start_pos: Optional[int]
167 value: Any
168 line: Optional[int]
169 column: Optional[int]
170 end_line: Optional[int]
171 end_column: Optional[int]
172 end_pos: Optional[int]
175 @overload
176 def __new__(
177 cls,
178 type: str,
179 value: Any,
180 start_pos: Optional[int] = None,
181 line: Optional[int] = None,
182 column: Optional[int] = None,
183 end_line: Optional[int] = None,
184 end_column: Optional[int] = None,
185 end_pos: Optional[int] = None
186 ) -> 'Token':
187 ...
189 @overload
190 def __new__(
191 cls,
192 type_: str,
193 value: Any,
194 start_pos: Optional[int] = None,
195 line: Optional[int] = None,
196 column: Optional[int] = None,
197 end_line: Optional[int] = None,
198 end_column: Optional[int] = None,
199 end_pos: Optional[int] = None
200 ) -> 'Token': ...
202 def __new__(cls, *args, **kwargs):
203 if "type_" in kwargs:
204 warnings.warn("`type_` is deprecated use `type` instead", DeprecationWarning)
206 if "type" in kwargs:
207 raise TypeError("Error: using both 'type' and the deprecated 'type_' as arguments.")
208 kwargs["type"] = kwargs.pop("type_")
210 return cls._future_new(*args, **kwargs)
213 @classmethod
214 def _future_new(cls, type, value, start_pos=None, line=None, column=None, end_line=None, end_column=None, end_pos=None):
215 inst = super(Token, cls).__new__(cls, value)
217 inst.type = type
218 inst.start_pos = start_pos
219 inst.value = value
220 inst.line = line
221 inst.column = column
222 inst.end_line = end_line
223 inst.end_column = end_column
224 inst.end_pos = end_pos
225 return inst
227 @overload
228 def update(self, type: Optional[str] = None, value: Optional[Any] = None) -> 'Token':
229 ...
231 @overload
232 def update(self, type_: Optional[str] = None, value: Optional[Any] = None) -> 'Token':
233 ...
235 def update(self, *args, **kwargs):
236 if "type_" in kwargs:
237 warnings.warn("`type_` is deprecated use `type` instead", DeprecationWarning)
239 if "type" in kwargs:
240 raise TypeError("Error: using both 'type' and the deprecated 'type_' as arguments.")
241 kwargs["type"] = kwargs.pop("type_")
243 return self._future_update(*args, **kwargs)
245 def _future_update(self, type: Optional[str] = None, value: Optional[Any] = None) -> 'Token':
246 return Token.new_borrow_pos(
247 type if type is not None else self.type,
248 value if value is not None else self.value,
249 self
250 )
252 @classmethod
253 def new_borrow_pos(cls: Type[_T], type_: str, value: Any, borrow_t: 'Token') -> _T:
254 return cls(type_, value, borrow_t.start_pos, borrow_t.line, borrow_t.column, borrow_t.end_line, borrow_t.end_column, borrow_t.end_pos)
256 def __reduce__(self):
257 return (self.__class__, (self.type, self.value, self.start_pos, self.line, self.column))
259 def __repr__(self):
260 return 'Token(%r, %r)' % (self.type, self.value)
262 def __deepcopy__(self, memo):
263 return Token(self.type, self.value, self.start_pos, self.line, self.column)
265 def __eq__(self, other):
266 if isinstance(other, Token) and self.type != other.type:
267 return False
269 return str.__eq__(self, other)
271 __hash__ = str.__hash__
274class LineCounter:
275 "A utility class for keeping track of line & column information"
277 __slots__ = 'char_pos', 'line', 'column', 'line_start_pos', 'newline_char'
279 def __init__(self, newline_char):
280 self.newline_char = newline_char
281 self.char_pos = 0
282 self.line = 1
283 self.column = 1
284 self.line_start_pos = 0
286 def __eq__(self, other):
287 if not isinstance(other, LineCounter):
288 return NotImplemented
290 return self.char_pos == other.char_pos and self.newline_char == other.newline_char
292 def feed(self, token: TextOrSlice, test_newline=True):
293 """Consume a token and calculate the new line & column.
295 As an optional optimization, set test_newline=False if token doesn't contain a newline.
296 """
297 if test_newline:
298 newlines = token.count(self.newline_char)
299 if newlines:
300 self.line += newlines
301 self.line_start_pos = self.char_pos + token.rindex(self.newline_char) + 1
303 self.char_pos += len(token)
304 self.column = self.char_pos - self.line_start_pos + 1
307class UnlessCallback:
308 def __init__(self, scanner: 'Scanner'):
309 self.scanner = scanner
311 def __call__(self, t: Token):
312 res = self.scanner.fullmatch(t.value)
313 if res is not None:
314 t.type = res
315 return t
318class CallChain:
319 def __init__(self, callback1, callback2, cond):
320 self.callback1 = callback1
321 self.callback2 = callback2
322 self.cond = cond
324 def __call__(self, t):
325 t2 = self.callback1(t)
326 return self.callback2(t) if self.cond(t2) else t2
329def _get_match(re_, regexp, s, flags):
330 m = re_.match(regexp, s, flags)
331 if m:
332 return m.group(0)
334def _create_unless(terminals, g_regex_flags, re_, use_bytes):
335 tokens_by_type = classify(terminals, lambda t: type(t.pattern))
336 assert len(tokens_by_type) <= 2, tokens_by_type.keys()
337 embedded_strs = set()
338 callback = {}
339 for retok in tokens_by_type.get(PatternRE, []):
340 unless = []
341 for strtok in tokens_by_type.get(PatternStr, []):
342 if strtok.priority != retok.priority:
343 continue
344 s = strtok.pattern.value
345 if s == _get_match(re_, retok.pattern.to_regexp(), s, g_regex_flags):
346 unless.append(strtok)
347 if strtok.pattern.flags <= retok.pattern.flags:
348 embedded_strs.add(strtok)
349 if unless:
350 callback[retok.name] = UnlessCallback(Scanner(unless, g_regex_flags, re_, use_bytes=use_bytes))
352 new_terminals = [t for t in terminals if t not in embedded_strs]
353 return new_terminals, callback
356class Scanner:
357 def __init__(self, terminals, g_regex_flags, re_, use_bytes):
358 self.terminals = terminals
359 self.g_regex_flags = g_regex_flags
360 self.re_ = re_
361 self.use_bytes = use_bytes
363 self.allowed_types = {t.name for t in self.terminals}
365 self._mres = self._build_mres(terminals, len(terminals))
367 def _build_mres(self, terminals, max_size):
368 # Python sets an unreasonable group limit (currently 100) in its re module
369 # Worse, the only way to know we reached it is by catching an AssertionError!
370 # This function recursively tries less and less groups until it's successful.
371 mres = []
372 while terminals:
373 pattern = u'|'.join(u'(?P<%s>%s)' % (t.name, t.pattern.to_regexp()) for t in terminals[:max_size])
374 if self.use_bytes:
375 pattern = pattern.encode('latin-1')
376 try:
377 mre = self.re_.compile(pattern, self.g_regex_flags)
378 except AssertionError: # Yes, this is what Python provides us.. :/
379 return self._build_mres(terminals, max_size // 2)
381 mres.append(mre)
382 terminals = terminals[max_size:]
383 return mres
385 def match(self, text: TextSlice, pos):
386 for mre in self._mres:
387 m = mre.match(text.text, pos, text.end)
388 if m:
389 return m.group(0), m.lastgroup
392 def fullmatch(self, text: str) -> Optional[str]:
393 for mre in self._mres:
394 m = mre.fullmatch(text)
395 if m:
396 return m.lastgroup
397 return None
399def _regexp_has_newline(r: str):
400 r"""Expressions that may indicate newlines in a regexp:
401 - newlines (\n)
402 - escaped newline (\\n)
403 - anything but ([^...])
404 - any-char (.) when the flag (?s) exists
405 - spaces (\s)
406 """
407 return '\n' in r or '\\n' in r or '\\s' in r or '[^' in r or ('(?s' in r and '.' in r)
410class LexerState:
411 """Represents the current state of the lexer as it scans the text
412 (Lexer objects are only instantiated per grammar, not per text)
413 """
415 __slots__ = 'text', 'line_ctr', 'last_token'
417 text: TextSlice
418 line_ctr: LineCounter
419 last_token: Optional[Token]
421 def __init__(self, text: TextSlice, line_ctr: Optional[LineCounter] = None, last_token: Optional[Token]=None):
422 if line_ctr is None:
423 line_ctr = LineCounter(b'\n' if isinstance(text.text, bytes) else '\n')
425 if text.start > 0:
426 # Advance the line-count until line_ctr.char_pos == text.start
427 line_ctr.feed(TextSlice(text.text, 0, text.start))
429 if not (text.start <= line_ctr.char_pos <= text.end):
430 raise ValueError("LineCounter.char_pos is out of bounds")
432 self.text = text
433 self.line_ctr = line_ctr
434 self.last_token = last_token
437 def __eq__(self, other):
438 if not isinstance(other, LexerState):
439 return NotImplemented
441 return self.text == other.text and self.line_ctr == other.line_ctr and self.last_token == other.last_token
443 def __copy__(self):
444 return type(self)(self.text, copy(self.line_ctr), self.last_token)
447class LexerThread:
448 """A thread that ties a lexer instance and a lexer state, to be used by the parser
449 """
451 def __init__(self, lexer: 'Lexer', lexer_state: Optional[LexerState]):
452 self.lexer = lexer
453 self.state = lexer_state
455 @classmethod
456 def from_text(cls, lexer: 'Lexer', text_or_slice: TextOrSlice) -> 'LexerThread':
457 text = TextSlice.cast_from(text_or_slice)
458 return cls(lexer, LexerState(text))
460 def lex(self, parser_state):
461 if self.state is None:
462 raise TypeError("Cannot lex: No text assigned to lexer state")
463 return self.lexer.lex(self.state, parser_state)
465 def __copy__(self):
466 return type(self)(self.lexer, copy(self.state))
468 _Token = Token
471_Callback = Callable[[Token], Token]
473class Lexer(ABC):
474 """Lexer interface
476 Method Signatures:
477 lex(self, lexer_state, parser_state) -> Iterator[Token]
478 """
479 @abstractmethod
480 def lex(self, lexer_state: LexerState, parser_state: Any) -> Iterator[Token]:
481 return NotImplemented
483 def make_lexer_state(self, text: str):
484 "Deprecated"
485 return LexerState(TextSlice.cast_from(text))
488def _check_regex_collisions(terminal_to_regexp: Dict[TerminalDef, str], comparator, strict_mode, max_collisions_to_show=8):
489 if not comparator:
490 comparator = interegular.Comparator.from_regexes(terminal_to_regexp)
492 # When in strict mode, we only ever try to provide one example, so taking
493 # a long time for that should be fine
494 max_time = 2 if strict_mode else 0.2
496 # We don't want to show too many collisions.
497 if comparator.count_marked_pairs() >= max_collisions_to_show:
498 return
499 for group in classify(terminal_to_regexp, lambda t: t.priority).values():
500 for a, b in comparator.check(group, skip_marked=True):
501 assert a.priority == b.priority
502 # Mark this pair to not repeat warnings when multiple different BasicLexers see the same collision
503 comparator.mark(a, b)
505 # Notify the user
506 message = f"Collision between Terminals {a.name} and {b.name}. "
507 try:
508 example = comparator.get_example_overlap(a, b, max_time).format_multiline()
509 except ValueError:
510 # Couldn't find an example within max_time steps.
511 example = "No example could be found fast enough. However, the collision does still exists"
512 if strict_mode:
513 raise LexError(f"{message}\n{example}")
514 logger.warning("%s The lexer will choose between them arbitrarily.\n%s", message, example)
515 if comparator.count_marked_pairs() >= max_collisions_to_show:
516 logger.warning("Found 8 regex collisions, will not check for more.")
517 return
520class AbstractBasicLexer(Lexer):
521 terminals_by_name: Dict[str, TerminalDef]
523 @abstractmethod
524 def __init__(self, conf: 'LexerConf', comparator=None) -> None:
525 ...
527 @abstractmethod
528 def next_token(self, lex_state: LexerState, parser_state: Any = None) -> Token:
529 ...
531 def lex(self, state: LexerState, parser_state: Any) -> Iterator[Token]:
532 with suppress(EOFError):
533 while True:
534 yield self.next_token(state, parser_state)
537class BasicLexer(AbstractBasicLexer):
538 terminals: Collection[TerminalDef]
539 ignore_types: FrozenSet[str]
540 newline_types: FrozenSet[str]
541 user_callbacks: Dict[str, _Callback]
542 callback: Dict[str, _Callback]
543 re: ModuleType
545 def __init__(self, conf: 'LexerConf', comparator=None) -> None:
546 terminals = list(conf.terminals)
547 assert all(isinstance(t, TerminalDef) for t in terminals), terminals
549 self.re = conf.re_module
551 if not conf.skip_validation:
552 # Sanitization
553 terminal_to_regexp = {}
554 for t in terminals:
555 regexp = t.pattern.to_regexp()
556 try:
557 self.re.compile(regexp, conf.g_regex_flags)
558 except self.re.error:
559 raise LexError("Cannot compile token %s: %s" % (t.name, t.pattern))
561 if t.pattern.min_width == 0:
562 raise LexError("Lexer does not allow zero-width terminals. (%s: %s)" % (t.name, t.pattern))
563 if t.pattern.type == "re":
564 terminal_to_regexp[t] = regexp
566 if not (set(conf.ignore) <= {t.name for t in terminals}):
567 raise LexError("Ignore terminals are not defined: %s" % (set(conf.ignore) - {t.name for t in terminals}))
569 if has_interegular:
570 _check_regex_collisions(terminal_to_regexp, comparator, conf.strict)
571 elif conf.strict:
572 raise LexError("interegular must be installed for strict mode. Use `pip install 'lark[interegular]'`.")
574 # Init
575 self.newline_types = frozenset(t.name for t in terminals if _regexp_has_newline(t.pattern.to_regexp()))
576 self.ignore_types = frozenset(conf.ignore)
578 terminals.sort(key=lambda x: (-x.priority, -x.pattern.max_width, -len(x.pattern.value), x.name))
579 self.terminals = terminals
580 self.user_callbacks = conf.callbacks
581 self.g_regex_flags = conf.g_regex_flags
582 self.use_bytes = conf.use_bytes
583 self.terminals_by_name = conf.terminals_by_name
585 self._scanner: Optional[Scanner] = None
587 def _build_scanner(self) -> Scanner:
588 terminals, self.callback = _create_unless(self.terminals, self.g_regex_flags, self.re, self.use_bytes)
589 assert all(self.callback.values())
591 for type_, f in self.user_callbacks.items():
592 if type_ in self.callback:
593 # Already a callback there, probably UnlessCallback
594 self.callback[type_] = CallChain(self.callback[type_], f, lambda t: t.type == type_)
595 else:
596 self.callback[type_] = f
598 return Scanner(terminals, self.g_regex_flags, self.re, self.use_bytes)
600 @property
601 def scanner(self) -> Scanner:
602 if self._scanner is None:
603 self._scanner = self._build_scanner()
604 return self._scanner
606 def match(self, text, pos):
607 return self.scanner.match(text, pos)
609 def next_token(self, lex_state: LexerState, parser_state: Any = None) -> Token:
610 line_ctr = lex_state.line_ctr
611 while line_ctr.char_pos < lex_state.text.end:
612 res = self.match(lex_state.text, line_ctr.char_pos)
613 if not res:
614 allowed = self.scanner.allowed_types - self.ignore_types
615 if not allowed:
616 allowed = {"<END-OF-FILE>"}
617 raise UnexpectedCharacters(lex_state.text.text, line_ctr.char_pos, line_ctr.line, line_ctr.column,
618 allowed=allowed, token_history=lex_state.last_token and [lex_state.last_token],
619 state=parser_state, terminals_by_name=self.terminals_by_name)
621 value, type_ = res
623 ignored = type_ in self.ignore_types
624 t = None
625 if not ignored or type_ in self.callback:
626 t = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
627 line_ctr.feed(value, type_ in self.newline_types)
628 if t is not None:
629 t.end_line = line_ctr.line
630 t.end_column = line_ctr.column
631 t.end_pos = line_ctr.char_pos
632 if t.type in self.callback:
633 t = self.callback[t.type](t)
634 if not ignored:
635 if not isinstance(t, Token):
636 raise LexError("Callbacks must return a token (returned %r)" % t)
637 lex_state.last_token = t
638 return t
640 # EOF
641 raise EOFError(self)
644class ContextualLexer(Lexer):
645 lexers: Dict[int, AbstractBasicLexer]
646 root_lexer: AbstractBasicLexer
648 BasicLexer: Type[AbstractBasicLexer] = BasicLexer
650 def __init__(self, conf: 'LexerConf', states: Dict[int, Collection[str]], always_accept: Collection[str]=()) -> None:
651 terminals = list(conf.terminals)
652 terminals_by_name = conf.terminals_by_name
654 trad_conf = copy(conf)
655 trad_conf.terminals = terminals
657 if has_interegular and not conf.skip_validation:
658 comparator = interegular.Comparator.from_regexes({t: t.pattern.to_regexp() for t in terminals})
659 else:
660 comparator = None
661 lexer_by_tokens: Dict[FrozenSet[str], AbstractBasicLexer] = {}
662 self.lexers = {}
663 for state, accepts in states.items():
664 key = frozenset(accepts)
665 try:
666 lexer = lexer_by_tokens[key]
667 except KeyError:
668 accepts = set(accepts) | set(conf.ignore) | set(always_accept)
669 lexer_conf = copy(trad_conf)
670 lexer_conf.terminals = [terminals_by_name[n] for n in accepts if n in terminals_by_name]
671 lexer = self.BasicLexer(lexer_conf, comparator)
672 lexer_by_tokens[key] = lexer
674 self.lexers[state] = lexer
676 assert trad_conf.terminals is terminals
677 trad_conf.skip_validation = True # We don't need to verify all terminals again
678 self.root_lexer = self.BasicLexer(trad_conf, comparator)
680 def lex(self, lexer_state: LexerState, parser_state: 'ParserState') -> Iterator[Token]:
681 try:
682 while True:
683 lexer = self.lexers[parser_state.position]
684 yield lexer.next_token(lexer_state, parser_state)
685 except EOFError:
686 pass
687 except UnexpectedCharacters as e:
688 # In the contextual lexer, UnexpectedCharacters can mean that the terminal is defined, but not in the current context.
689 # This tests the input against the global context, to provide a nicer error.
690 try:
691 last_token = lexer_state.last_token # Save last_token. Calling root_lexer.next_token will change this to the wrong token
692 token = self.root_lexer.next_token(lexer_state, parser_state)
693 raise UnexpectedToken(token, e.allowed, state=parser_state, token_history=[last_token], terminals_by_name=self.root_lexer.terminals_by_name)
694 except UnexpectedCharacters:
695 raise e # Raise the original UnexpectedCharacters. The root lexer raises it with the wrong expected set.
697###}