# Lexer Implementation

from abc import abstractmethod, ABC
import re
from contextlib import suppress
from typing import (
    TypeVar, Type, Dict, Iterator, Collection, Callable, Optional, FrozenSet, Any,
    ClassVar, TYPE_CHECKING, overload
)
from types import ModuleType
import warnings
try:
    import interegular
except ImportError:
    pass
if TYPE_CHECKING:
    from .common import LexerConf

from .utils import classify, get_regexp_width, Serialize, logger
from .exceptions import UnexpectedCharacters, LexError, UnexpectedToken
from .grammar import TOKEN_DEFAULT_PRIORITY


###{standalone
from copy import copy

try:  # For the standalone parser, we need to make sure that has_interegular is False to avoid NameErrors later on
    has_interegular = bool(interegular)
except NameError:
    has_interegular = False

class Pattern(Serialize, ABC):
    "An abstraction over regular expressions."

    value: str
    flags: Collection[str]
    raw: Optional[str]
    type: ClassVar[str]

    def __init__(self, value: str, flags: Collection[str] = (), raw: Optional[str] = None) -> None:
        self.value = value
        self.flags = frozenset(flags)
        self.raw = raw

    def __repr__(self):
        return repr(self.to_regexp())

    # Pattern Hashing assumes all subclasses have a different priority!
    def __hash__(self):
        return hash((type(self), self.value, self.flags))

    def __eq__(self, other):
        return type(self) == type(other) and self.value == other.value and self.flags == other.flags

    @abstractmethod
    def to_regexp(self) -> str:
        raise NotImplementedError()

    @property
    @abstractmethod
    def min_width(self) -> int:
        raise NotImplementedError()

    @property
    @abstractmethod
    def max_width(self) -> int:
        raise NotImplementedError()

    def _get_flags(self, value):
        for f in self.flags:
            value = ('(?%s:%s)' % (f, value))
        return value

class PatternStr(Pattern):
    __serialize_fields__ = 'value', 'flags', 'raw'

    type: ClassVar[str] = "str"

    def to_regexp(self) -> str:
        return self._get_flags(re.escape(self.value))

    @property
    def min_width(self) -> int:
        return len(self.value)

    @property
    def max_width(self) -> int:
        return len(self.value)
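# Illustrative usage (not part of the original module), doctest-style: flags are
# applied as scoped inline groups by Pattern._get_flags, e.g.
#     >>> PatternStr('select', ['i']).to_regexp()
#     '(?i:select)'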

class PatternRE(Pattern):
    __serialize_fields__ = 'value', 'flags', 'raw', '_width'

    type: ClassVar[str] = "re"

    def to_regexp(self) -> str:
        return self._get_flags(self.value)

    _width = None
    def _get_width(self):
        if self._width is None:
            self._width = get_regexp_width(self.to_regexp())
        return self._width

    @property
    def min_width(self) -> int:
        return self._get_width()[0]

    @property
    def max_width(self) -> int:
        return self._get_width()[1]

class TerminalDef(Serialize):
    "A definition of a terminal"
    __serialize_fields__ = 'name', 'pattern', 'priority'
    __serialize_namespace__ = PatternStr, PatternRE

    name: str
    pattern: Pattern
    priority: int

    def __init__(self, name: str, pattern: Pattern, priority: int = TOKEN_DEFAULT_PRIORITY) -> None:
        assert isinstance(pattern, Pattern), pattern
        self.name = name
        self.pattern = pattern
        self.priority = priority

    def __repr__(self):
        return '%s(%r, %r)' % (type(self).__name__, self.name, self.pattern)

    def user_repr(self) -> str:
        if self.name.startswith('__'):  # We represent a generated terminal
            return self.pattern.raw or self.name
        else:
            return self.name

_T = TypeVar('_T', bound="Token")

class Token(str):
    """A string with meta-information, that is produced by the lexer.

    When parsing text, the resulting chunks of the input that haven't been discarded
    will end up in the tree as Token instances. The Token class inherits from Python's ``str``,
    so normal string comparisons and operations will work as expected.

    Attributes:
        type: Name of the token (as specified in grammar)
        value: Value of the token (redundant, as ``token.value == token`` will always be true)
        start_pos: The index of the token in the text
        line: The line of the token in the text (starting with 1)
        column: The column of the token in the text (starting with 1)
        end_line: The line where the token ends
        end_column: The next column after the end of the token. For example,
            if the token is a single character with a column value of 4,
            end_column will be 5.
        end_pos: the index where the token ends (basically ``start_pos + len(token)``)
    """
    __slots__ = ('type', 'start_pos', 'value', 'line', 'column', 'end_line', 'end_column', 'end_pos')

    __match_args__ = ('type', 'value')

    type: str
    start_pos: Optional[int]
    value: Any
    line: Optional[int]
    column: Optional[int]
    end_line: Optional[int]
    end_column: Optional[int]
    end_pos: Optional[int]

    @overload
    def __new__(
        cls,
        type: str,
        value: Any,
        start_pos: Optional[int] = None,
        line: Optional[int] = None,
        column: Optional[int] = None,
        end_line: Optional[int] = None,
        end_column: Optional[int] = None,
        end_pos: Optional[int] = None
    ) -> 'Token':
        ...

    @overload
    def __new__(
        cls,
        type_: str,
        value: Any,
        start_pos: Optional[int] = None,
        line: Optional[int] = None,
        column: Optional[int] = None,
        end_line: Optional[int] = None,
        end_column: Optional[int] = None,
        end_pos: Optional[int] = None
    ) -> 'Token':
        ...

    def __new__(cls, *args, **kwargs):
        if "type_" in kwargs:
            warnings.warn("`type_` is deprecated; use `type` instead", DeprecationWarning)

            if "type" in kwargs:
                raise TypeError("Error: using both 'type' and the deprecated 'type_' as arguments.")
            kwargs["type"] = kwargs.pop("type_")

        return cls._future_new(*args, **kwargs)

    @classmethod
    def _future_new(cls, type, value, start_pos=None, line=None, column=None, end_line=None, end_column=None, end_pos=None):
        inst = super(Token, cls).__new__(cls, value)

        inst.type = type
        inst.start_pos = start_pos
        inst.value = value
        inst.line = line
        inst.column = column
        inst.end_line = end_line
        inst.end_column = end_column
        inst.end_pos = end_pos
        return inst

    @overload
    def update(self, type: Optional[str] = None, value: Optional[Any] = None) -> 'Token':
        ...

    @overload
    def update(self, type_: Optional[str] = None, value: Optional[Any] = None) -> 'Token':
        ...

    def update(self, *args, **kwargs):
        if "type_" in kwargs:
            warnings.warn("`type_` is deprecated; use `type` instead", DeprecationWarning)

            if "type" in kwargs:
                raise TypeError("Error: using both 'type' and the deprecated 'type_' as arguments.")
            kwargs["type"] = kwargs.pop("type_")

        return self._future_update(*args, **kwargs)

    def _future_update(self, type: Optional[str] = None, value: Optional[Any] = None) -> 'Token':
        return Token.new_borrow_pos(
            type if type is not None else self.type,
            value if value is not None else self.value,
            self
        )

    @classmethod
    def new_borrow_pos(cls: Type[_T], type_: str, value: Any, borrow_t: 'Token') -> _T:
        return cls(type_, value, borrow_t.start_pos, borrow_t.line, borrow_t.column, borrow_t.end_line, borrow_t.end_column, borrow_t.end_pos)

    def __reduce__(self):
        return (self.__class__, (self.type, self.value, self.start_pos, self.line, self.column))

    def __repr__(self):
        return 'Token(%r, %r)' % (self.type, self.value)

    def __deepcopy__(self, memo):
        return Token(self.type, self.value, self.start_pos, self.line, self.column)

    def __eq__(self, other):
        if isinstance(other, Token) and self.type != other.type:
            return False

        return str.__eq__(self, other)

    __hash__ = str.__hash__
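# Illustrative usage (not part of the original module), doctest-style:
#     >>> tok = Token('NUMBER', '42', start_pos=0, line=1, column=1)
#     >>> tok == '42' and tok.type == 'NUMBER'
#     True
#     >>> tok.update(value='7')     # same type and position info, new value
#     Token('NUMBER', '7')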

class LineCounter:
    "A utility class for keeping track of line & column information"

    __slots__ = 'char_pos', 'line', 'column', 'line_start_pos', 'newline_char'

    def __init__(self, newline_char):
        self.newline_char = newline_char
        self.char_pos = 0
        self.line = 1
        self.column = 1
        self.line_start_pos = 0

    def __eq__(self, other):
        if not isinstance(other, LineCounter):
            return NotImplemented

        return self.char_pos == other.char_pos and self.newline_char == other.newline_char

    def feed(self, token: Token, test_newline=True):
        """Consume a token and calculate the new line & column.

        As an optional optimization, set test_newline=False if token doesn't contain a newline.
        """
        if test_newline:
            newlines = token.count(self.newline_char)
            if newlines:
                self.line += newlines
                self.line_start_pos = self.char_pos + token.rindex(self.newline_char) + 1

        self.char_pos += len(token)
        self.column = self.char_pos - self.line_start_pos + 1
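# Illustrative usage (not part of the original module), doctest-style:
#     >>> lc = LineCounter('\n')
#     >>> lc.feed('ab\nc')
#     >>> (lc.char_pos, lc.line, lc.column)
#     (4, 2, 2)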

class UnlessCallback:
    def __init__(self, scanner):
        self.scanner = scanner

    def __call__(self, t):
        res = self.scanner.match(t.value, 0)
        if res:
            _value, t.type = res
        return t


class CallChain:
    def __init__(self, callback1, callback2, cond):
        self.callback1 = callback1
        self.callback2 = callback2
        self.cond = cond

    def __call__(self, t):
        t2 = self.callback1(t)
        return self.callback2(t) if self.cond(t2) else t2

def _get_match(re_, regexp, s, flags):
    m = re_.match(regexp, s, flags)
    if m:
        return m.group(0)

def _create_unless(terminals, g_regex_flags, re_, use_bytes):
    tokens_by_type = classify(terminals, lambda t: type(t.pattern))
    assert len(tokens_by_type) <= 2, tokens_by_type.keys()
    embedded_strs = set()
    callback = {}
    for retok in tokens_by_type.get(PatternRE, []):
        unless = []
        for strtok in tokens_by_type.get(PatternStr, []):
            if strtok.priority != retok.priority:
                continue
            s = strtok.pattern.value
            if s == _get_match(re_, retok.pattern.to_regexp(), s, g_regex_flags):
                unless.append(strtok)
                if strtok.pattern.flags <= retok.pattern.flags:
                    embedded_strs.add(strtok)
        if unless:
            callback[retok.name] = UnlessCallback(Scanner(unless, g_regex_flags, re_, match_whole=True, use_bytes=use_bytes))

    new_terminals = [t for t in terminals if t not in embedded_strs]
    return new_terminals, callback
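# Illustrative note (not part of the original module): this implements the "unless"
# disambiguation between string and regexp terminals of equal priority. For example,
# given a keyword terminal IF: "if" and a regexp terminal NAME: /\w+/, the string
# "if" also matches NAME, so NAME receives an UnlessCallback that re-labels a matched
# "if" as IF, and IF itself is dropped from the main scanner.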

class Scanner:
    def __init__(self, terminals, g_regex_flags, re_, use_bytes, match_whole=False):
        self.terminals = terminals
        self.g_regex_flags = g_regex_flags
        self.re_ = re_
        self.use_bytes = use_bytes
        self.match_whole = match_whole

        self.allowed_types = {t.name for t in self.terminals}

        self._mres = self._build_mres(terminals, len(terminals))

    def _build_mres(self, terminals, max_size):
        # Python sets an unreasonable group limit (currently 100) in its re module.
        # Worse, the only way to know we reached it is by catching an AssertionError!
        # This function recursively tries fewer and fewer groups until it succeeds.
        postfix = '$' if self.match_whole else ''
        mres = []
        while terminals:
            pattern = u'|'.join(u'(?P<%s>%s)' % (t.name, t.pattern.to_regexp() + postfix) for t in terminals[:max_size])
            if self.use_bytes:
                pattern = pattern.encode('latin-1')
            try:
                mre = self.re_.compile(pattern, self.g_regex_flags)
            except AssertionError:  # Yes, this is what Python provides us.. :/
                return self._build_mres(terminals, max_size // 2)

            mres.append(mre)
            terminals = terminals[max_size:]
        return mres

    def match(self, text, pos):
        for mre in self._mres:
            m = mre.match(text, pos)
            if m:
                return m.group(0), m.lastgroup
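# Illustrative note (not part of the original module): the Scanner joins all terminal
# regexps into one alternation of named groups, so a single match() call yields both
# the matched text and the terminal name, e.g. for the combined pattern
# (?P<NUMBER>\d+)|(?P<PLUS>\+), match("12+3", 0) returns ('12', 'NUMBER').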

def _regexp_has_newline(r: str):
    r"""Expressions that may indicate newlines in a regexp:
        - newlines (\n)
        - escaped newline (\\n)
        - anything but ([^...])
        - any-char (.) when the flag (?s) exists
        - spaces (\s)
    """
    return '\n' in r or '\\n' in r or '\\s' in r or '[^' in r or ('(?s' in r and '.' in r)
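# Illustrative examples (not part of the original module): _regexp_has_newline(r'\s+')
# and _regexp_has_newline(r'[^"]*') return True (they may span lines), while
# _regexp_has_newline(r'\d+') returns False.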

class LexerState:
    """Represents the current state of the lexer as it scans the text
    (Lexer objects are only instantiated per grammar, not per text)
    """

    __slots__ = 'text', 'line_ctr', 'last_token'

    text: str
    line_ctr: LineCounter
    last_token: Optional[Token]

    def __init__(self, text: str, line_ctr: Optional[LineCounter] = None, last_token: Optional[Token] = None):
        self.text = text
        self.line_ctr = line_ctr or LineCounter(b'\n' if isinstance(text, bytes) else '\n')
        self.last_token = last_token

    def __eq__(self, other):
        if not isinstance(other, LexerState):
            return NotImplemented

        return self.text is other.text and self.line_ctr == other.line_ctr and self.last_token == other.last_token

    def __copy__(self):
        return type(self)(self.text, copy(self.line_ctr), self.last_token)

class LexerThread:
    """A thread that ties a lexer instance and a lexer state, to be used by the parser
    """

    def __init__(self, lexer: 'Lexer', lexer_state: LexerState):
        self.lexer = lexer
        self.state = lexer_state

    @classmethod
    def from_text(cls, lexer: 'Lexer', text: str):
        return cls(lexer, LexerState(text))

    def lex(self, parser_state):
        return self.lexer.lex(self.state, parser_state)

    def __copy__(self):
        return type(self)(self.lexer, copy(self.state))

    _Token = Token
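# Illustrative note (not part of the original module): the parser typically creates a
# thread via LexerThread.from_text(lexer, text); copying the thread copies its mutable
# LexerState while sharing the per-grammar Lexer instance.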

_Callback = Callable[[Token], Token]

class Lexer(ABC):
    """Lexer interface

    Method Signatures:
        lex(self, lexer_state, parser_state) -> Iterator[Token]
    """
    @abstractmethod
    def lex(self, lexer_state: LexerState, parser_state: Any) -> Iterator[Token]:
        return NotImplemented

    def make_lexer_state(self, text):
        "Deprecated"
        return LexerState(text)

def _check_regex_collisions(terminal_to_regexp: Dict[TerminalDef, str], comparator, strict_mode, max_collisions_to_show=8):
    if not comparator:
        comparator = interegular.Comparator.from_regexes(terminal_to_regexp)

    # When in strict mode, we only ever try to provide one example, so taking
    # a long time for that should be fine
    max_time = 2 if strict_mode else 0.2

    # We don't want to show too many collisions.
    if comparator.count_marked_pairs() >= max_collisions_to_show:
        return
    for group in classify(terminal_to_regexp, lambda t: t.priority).values():
        for a, b in comparator.check(group, skip_marked=True):
            assert a.priority == b.priority
            # Mark this pair to not repeat warnings when multiple different BasicLexers see the same collision
            comparator.mark(a, b)

            # Notify the user
            message = f"Collision between Terminals {a.name} and {b.name}. "
            try:
                example = comparator.get_example_overlap(a, b, max_time).format_multiline()
            except ValueError:
                # Couldn't find an example within max_time steps.
                example = "No example could be found fast enough. However, the collision does still exist"
            if strict_mode:
                raise LexError(f"{message}\n{example}")
            logger.warning("%s The lexer will choose between them arbitrarily.\n%s", message, example)

            if comparator.count_marked_pairs() >= max_collisions_to_show:
                logger.warning("Found 8 regex collisions, will not check for more.")
                return
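# Illustrative example (not part of the original module): terminals such as
# NAME: /\w+/ and NUMBER: /\d+/ collide, since an input like "123" matches both;
# with interegular installed, the lexer logs a warning (or raises LexError in strict
# mode) together with an example of the overlap.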

class BasicLexer(Lexer):
    terminals: Collection[TerminalDef]
    ignore_types: FrozenSet[str]
    newline_types: FrozenSet[str]
    user_callbacks: Dict[str, _Callback]
    callback: Dict[str, _Callback]
    re: ModuleType

    def __init__(self, conf: 'LexerConf', comparator=None) -> None:
        terminals = list(conf.terminals)
        assert all(isinstance(t, TerminalDef) for t in terminals), terminals

        self.re = conf.re_module

        if not conf.skip_validation:
            # Sanitization
            terminal_to_regexp = {}
            for t in terminals:
                regexp = t.pattern.to_regexp()
                try:
                    self.re.compile(regexp, conf.g_regex_flags)
                except self.re.error:
                    raise LexError("Cannot compile token %s: %s" % (t.name, t.pattern))

                if t.pattern.min_width == 0:
                    raise LexError("Lexer does not allow zero-width terminals. (%s: %s)" % (t.name, t.pattern))
                if t.pattern.type == "re":
                    terminal_to_regexp[t] = regexp

            if not (set(conf.ignore) <= {t.name for t in terminals}):
                raise LexError("Ignore terminals are not defined: %s" % (set(conf.ignore) - {t.name for t in terminals}))

            if has_interegular:
                _check_regex_collisions(terminal_to_regexp, comparator, conf.strict)
            elif conf.strict:
                raise LexError("interegular must be installed for strict mode. Use `pip install 'lark[interegular]'`.")

        # Init
        self.newline_types = frozenset(t.name for t in terminals if _regexp_has_newline(t.pattern.to_regexp()))
        self.ignore_types = frozenset(conf.ignore)

        terminals.sort(key=lambda x: (-x.priority, -x.pattern.max_width, -len(x.pattern.value), x.name))
        self.terminals = terminals
        self.user_callbacks = conf.callbacks
        self.g_regex_flags = conf.g_regex_flags
        self.use_bytes = conf.use_bytes
        self.terminals_by_name = conf.terminals_by_name

        self._scanner = None

    def _build_scanner(self):
        terminals, self.callback = _create_unless(self.terminals, self.g_regex_flags, self.re, self.use_bytes)
        assert all(self.callback.values())

        for type_, f in self.user_callbacks.items():
            if type_ in self.callback:
                # Already a callback there, probably UnlessCallback
                self.callback[type_] = CallChain(self.callback[type_], f, lambda t: t.type == type_)
            else:
                self.callback[type_] = f

        self._scanner = Scanner(terminals, self.g_regex_flags, self.re, self.use_bytes)

    @property
    def scanner(self):
        if self._scanner is None:
            self._build_scanner()
        return self._scanner

    def match(self, text, pos):
        return self.scanner.match(text, pos)

    def lex(self, state: LexerState, parser_state: Any) -> Iterator[Token]:
        with suppress(EOFError):
            while True:
                yield self.next_token(state, parser_state)

    def next_token(self, lex_state: LexerState, parser_state: Any = None) -> Token:
        line_ctr = lex_state.line_ctr
        while line_ctr.char_pos < len(lex_state.text):
            res = self.match(lex_state.text, line_ctr.char_pos)
            if not res:
                allowed = self.scanner.allowed_types - self.ignore_types
                if not allowed:
                    allowed = {"<END-OF-FILE>"}
                raise UnexpectedCharacters(lex_state.text, line_ctr.char_pos, line_ctr.line, line_ctr.column,
                                           allowed=allowed, token_history=lex_state.last_token and [lex_state.last_token],
                                           state=parser_state, terminals_by_name=self.terminals_by_name)

            value, type_ = res

            ignored = type_ in self.ignore_types
            t = None
            if not ignored or type_ in self.callback:
                t = Token(type_, value, line_ctr.char_pos, line_ctr.line, line_ctr.column)
            line_ctr.feed(value, type_ in self.newline_types)
            if t is not None:
                t.end_line = line_ctr.line
                t.end_column = line_ctr.column
                t.end_pos = line_ctr.char_pos
                if t.type in self.callback:
                    t = self.callback[t.type](t)
                if not ignored:
                    if not isinstance(t, Token):
                        raise LexError("Callbacks must return a token (returned %r)" % t)
                    lex_state.last_token = t
                    return t

        # EOF
        raise EOFError(self)
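# Illustrative note (not part of the original module): BasicLexer is normally created
# by Lark from a LexerConf (lexer='basic') rather than constructed by hand. Its lex()
# loop repeatedly calls next_token(), raising UnexpectedCharacters when nothing
# matches and EOFError (suppressed by lex()) at end of input.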

class ContextualLexer(Lexer):

    lexers: Dict[str, BasicLexer]
    root_lexer: BasicLexer

    def __init__(self, conf: 'LexerConf', states: Dict[str, Collection[str]], always_accept: Collection[str] = ()) -> None:
        terminals = list(conf.terminals)
        terminals_by_name = conf.terminals_by_name

        trad_conf = copy(conf)
        trad_conf.terminals = terminals

        if has_interegular and not conf.skip_validation:
            comparator = interegular.Comparator.from_regexes({t: t.pattern.to_regexp() for t in terminals})
        else:
            comparator = None
        lexer_by_tokens: Dict[FrozenSet[str], BasicLexer] = {}
        self.lexers = {}
        for state, accepts in states.items():
            key = frozenset(accepts)
            try:
                lexer = lexer_by_tokens[key]
            except KeyError:
                accepts = set(accepts) | set(conf.ignore) | set(always_accept)
                lexer_conf = copy(trad_conf)
                lexer_conf.terminals = [terminals_by_name[n] for n in accepts if n in terminals_by_name]
                lexer = BasicLexer(lexer_conf, comparator)
                lexer_by_tokens[key] = lexer

            self.lexers[state] = lexer

        assert trad_conf.terminals is terminals
        trad_conf.skip_validation = True  # We don't need to verify all terminals again
        self.root_lexer = BasicLexer(trad_conf, comparator)

    def lex(self, lexer_state: LexerState, parser_state: Any) -> Iterator[Token]:
        try:
            while True:
                lexer = self.lexers[parser_state.position]
                yield lexer.next_token(lexer_state, parser_state)
        except EOFError:
            pass
        except UnexpectedCharacters as e:
            # In the contextual lexer, UnexpectedCharacters can mean that the terminal is defined, but not in the current context.
            # This tests the input against the global context, to provide a nicer error.
            try:
                last_token = lexer_state.last_token  # Save last_token. Calling root_lexer.next_token will change this to the wrong token
                token = self.root_lexer.next_token(lexer_state, parser_state)
                raise UnexpectedToken(token, e.allowed, state=parser_state, token_history=[last_token], terminals_by_name=self.root_lexer.terminals_by_name)
            except UnexpectedCharacters:
                raise e  # Raise the original UnexpectedCharacters. The root lexer raises it with the wrong expected set.

###}