Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tomlkit/source.py: 98%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3from typing import Any
5from tomlkit.exceptions import ParseError
6from tomlkit.exceptions import UnexpectedCharError
9class _State:
10 def __init__(
11 self,
12 source: Source,
13 save_marker: bool | None = False,
14 restore: bool | None = False,
15 ) -> None:
16 self._source = source
17 self._save_marker = save_marker
18 self.restore = restore
20 def __enter__(self) -> _State:
21 # Entering this context manager - save the state
22 # PERF: snapshot only the integer index + current char + marker.
23 # We no longer carry an iterator (`_chars`) so there's no `copy(...)`
24 # to do here — saving 3 attribute reads vs the original iter copy.
25 self._idx = self._source._idx
26 self._current = self._source._current
27 self._marker = self._source._marker
29 return self
31 def __exit__(
32 self,
33 exception_type: type[BaseException] | None,
34 exception_val: BaseException | None,
35 trace: Any,
36 ) -> None:
37 # Exiting this context manager - restore the prior state
38 if self.restore or exception_type:
39 self._source._idx = self._idx
40 self._source._current = self._current
41 if self._save_marker:
42 self._source._marker = self._marker
45class _StateHandler:
46 """
47 State preserver for the Parser.
48 """
50 def __init__(self, source: Source) -> None:
51 self._source = source
52 self._states: list[_State] = []
54 def __call__(
55 self,
56 save_marker: bool | None = False,
57 restore: bool | None = False,
58 ) -> _State:
59 return _State(self._source, save_marker, restore)
61 def __enter__(self) -> _State:
62 state = self()
63 self._states.append(state)
64 return state.__enter__()
66 def __exit__(
67 self,
68 exception_type: type[BaseException] | None,
69 exception_val: BaseException | None,
70 trace: Any,
71 ) -> None:
72 state = self._states.pop()
73 state.__exit__(exception_type, exception_val, trace)
76class Source(str):
77 # EOF is a placeholder value for `current` past the end of input. End-of-input
78 # is detected positionally (`end()` / `_idx >= len`), never by comparing to this
79 # value, so a real NUL byte in the input is not mistaken for EOF.
80 EOF = "\0"
82 def __init__(self, _: str) -> None:
83 super().__init__()
85 # Track an integer index over the underlying str (Source subclasses str):
86 # init is O(1) and `inc()` just bumps the index and reads the next char,
87 # instead of materializing a list of (index, char) pairs up front.
88 self._idx = -1 # pre-start sentinel; first inc() will land on 0
89 self._marker = 0
90 self._current: str = ""
92 self._state = _StateHandler(self)
94 self.inc()
96 def reset(self) -> None:
97 # initialize both idx and current
98 self.inc()
100 # reset marker
101 self.mark()
103 @property
104 def state(self) -> _StateHandler:
105 return self._state
107 @property
108 def idx(self) -> int:
109 return self._idx
111 @property
112 def current(self) -> str:
113 return self._current
115 @property
116 def marker(self) -> int:
117 return self._marker
119 def extract(self) -> str:
120 """
121 Extracts the value between marker and index
122 """
123 return self[self._marker : self._idx]
125 def inc(self, exception: type[ParseError] | None = None) -> bool:
126 """
127 Increments the parser if the end of the input has not been reached.
128 Returns whether or not it was able to advance.
129 """
130 # Integer increment + a single str index, no iterator / StopIteration triage.
131 next_idx = self._idx + 1
132 if next_idx < len(self):
133 self._idx = next_idx
134 self._current = self[next_idx]
135 return True
137 # Past end : pin to len, switch current to EOF, raise if asked.
138 self._idx = len(self)
139 self._current = self.EOF
140 if exception:
141 raise self.parse_error(exception) from None
142 return False
144 def advance_while(self, charset: frozenset) -> bool:
145 """Advance while the current character is in ``charset``.
147 Equivalent to ``while self.current in charset and self.inc(): pass`` but
148 it scans the underlying string in a single pass and updates the index
149 and current character only once, instead of paying a per-character
150 ``inc()`` call. On return ``current`` is the first character NOT in
151 ``charset`` (or EOF). Returns ``True`` if it stopped on a real
152 character, ``False`` at EOF — the same value contract as the loop.
153 """
154 i = self._idx
155 n = len(self)
156 while i < n and self[i] in charset:
157 i += 1
158 if i < n:
159 self._idx = i
160 self._current = self[i]
161 return True
162 self._idx = n
163 self._current = self.EOF
164 return False
166 def advance_until(self, stopset: frozenset) -> bool:
167 """Advance while the current character is NOT in ``stopset``.
169 The mirror of :meth:`advance_while`: equivalent to
170 ``while self.current not in stopset and self.inc(): pass`` in a single
171 scan. On return ``current`` is the first character IN ``stopset`` (or
172 EOF), with the same return-value contract.
173 """
174 i = self._idx
175 n = len(self)
176 while i < n and self[i] not in stopset:
177 i += 1
178 if i < n:
179 self._idx = i
180 self._current = self[i]
181 return True
182 self._idx = n
183 self._current = self.EOF
184 return False
186 def inc_n(self, n: int, exception: type[ParseError] | None = None) -> bool:
187 """
188 Increments the parser by n characters
189 if the end of the input has not been reached.
190 """
191 return all(self.inc(exception=exception) for _ in range(n))
193 def consume(self, chars: str, min: int = 0, max: int = -1) -> None:
194 """
195 Consume chars until min/max is satisfied is valid.
196 """
197 while self.current in chars and max != 0:
198 min -= 1
199 max -= 1
200 if not self.inc():
201 break
203 # failed to consume minimum number of characters
204 if min > 0:
205 raise self.parse_error(UnexpectedCharError, self.current)
207 def end(self) -> bool:
208 """
209 Returns True if the parser has reached the end of the input.
210 """
211 return self._idx >= len(self)
213 def mark(self) -> None:
214 """
215 Sets the marker to the index's current position
216 """
217 self._marker = self._idx
219 def parse_error(
220 self,
221 exception: type[ParseError] = ParseError,
222 *args: Any,
223 **kwargs: Any,
224 ) -> ParseError:
225 """
226 Creates a generic "parse error" at the current position.
227 """
228 line, col = self._to_linecol()
230 return exception(line, col, *args, **kwargs)
232 def _to_linecol(self) -> tuple[int, int]:
233 cur = 0
234 for i, line in enumerate(self.splitlines()):
235 if cur + len(line) + 1 > self.idx:
236 return (i + 1, self.idx - cur)
238 cur += len(line) + 1
240 return len(self.splitlines()), 0