Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tomlkit/source.py: 98%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

118 statements  

1from __future__ import annotations 

2 

3from typing import Any 

4 

5from tomlkit.exceptions import ParseError 

6from tomlkit.exceptions import UnexpectedCharError 

7 

8 

9class _State: 

10 def __init__( 

11 self, 

12 source: Source, 

13 save_marker: bool | None = False, 

14 restore: bool | None = False, 

15 ) -> None: 

16 self._source = source 

17 self._save_marker = save_marker 

18 self.restore = restore 

19 

20 def __enter__(self) -> _State: 

21 # Entering this context manager - save the state 

22 # PERF: snapshot only the integer index + current char + marker. 

23 # We no longer carry an iterator (`_chars`) so there's no `copy(...)` 

24 # to do here — saving 3 attribute reads vs the original iter copy. 

25 self._idx = self._source._idx 

26 self._current = self._source._current 

27 self._marker = self._source._marker 

28 

29 return self 

30 

31 def __exit__( 

32 self, 

33 exception_type: type[BaseException] | None, 

34 exception_val: BaseException | None, 

35 trace: Any, 

36 ) -> None: 

37 # Exiting this context manager - restore the prior state 

38 if self.restore or exception_type: 

39 self._source._idx = self._idx 

40 self._source._current = self._current 

41 if self._save_marker: 

42 self._source._marker = self._marker 

43 

44 

45class _StateHandler: 

46 """ 

47 State preserver for the Parser. 

48 """ 

49 

50 def __init__(self, source: Source) -> None: 

51 self._source = source 

52 self._states: list[_State] = [] 

53 

54 def __call__( 

55 self, 

56 save_marker: bool | None = False, 

57 restore: bool | None = False, 

58 ) -> _State: 

59 return _State(self._source, save_marker, restore) 

60 

61 def __enter__(self) -> _State: 

62 state = self() 

63 self._states.append(state) 

64 return state.__enter__() 

65 

66 def __exit__( 

67 self, 

68 exception_type: type[BaseException] | None, 

69 exception_val: BaseException | None, 

70 trace: Any, 

71 ) -> None: 

72 state = self._states.pop() 

73 state.__exit__(exception_type, exception_val, trace) 

74 

75 

76class Source(str): 

77 # EOF is a placeholder value for `current` past the end of input. End-of-input 

78 # is detected positionally (`end()` / `_idx >= len`), never by comparing to this 

79 # value, so a real NUL byte in the input is not mistaken for EOF. 

80 EOF = "\0" 

81 

82 def __init__(self, _: str) -> None: 

83 super().__init__() 

84 

85 # Track an integer index over the underlying str (Source subclasses str): 

86 # init is O(1) and `inc()` just bumps the index and reads the next char, 

87 # instead of materializing a list of (index, char) pairs up front. 

88 self._idx = -1 # pre-start sentinel; first inc() will land on 0 

89 self._marker = 0 

90 self._current: str = "" 

91 

92 self._state = _StateHandler(self) 

93 

94 self.inc() 

95 

96 def reset(self) -> None: 

97 # initialize both idx and current 

98 self.inc() 

99 

100 # reset marker 

101 self.mark() 

102 

103 @property 

104 def state(self) -> _StateHandler: 

105 return self._state 

106 

107 @property 

108 def idx(self) -> int: 

109 return self._idx 

110 

111 @property 

112 def current(self) -> str: 

113 return self._current 

114 

115 @property 

116 def marker(self) -> int: 

117 return self._marker 

118 

119 def extract(self) -> str: 

120 """ 

121 Extracts the value between marker and index 

122 """ 

123 return self[self._marker : self._idx] 

124 

125 def inc(self, exception: type[ParseError] | None = None) -> bool: 

126 """ 

127 Increments the parser if the end of the input has not been reached. 

128 Returns whether or not it was able to advance. 

129 """ 

130 # Integer increment + a single str index, no iterator / StopIteration triage. 

131 next_idx = self._idx + 1 

132 if next_idx < len(self): 

133 self._idx = next_idx 

134 self._current = self[next_idx] 

135 return True 

136 

137 # Past end : pin to len, switch current to EOF, raise if asked. 

138 self._idx = len(self) 

139 self._current = self.EOF 

140 if exception: 

141 raise self.parse_error(exception) from None 

142 return False 

143 

144 def advance_while(self, charset: frozenset) -> bool: 

145 """Advance while the current character is in ``charset``. 

146 

147 Equivalent to ``while self.current in charset and self.inc(): pass`` but 

148 it scans the underlying string in a single pass and updates the index 

149 and current character only once, instead of paying a per-character 

150 ``inc()`` call. On return ``current`` is the first character NOT in 

151 ``charset`` (or EOF). Returns ``True`` if it stopped on a real 

152 character, ``False`` at EOF — the same value contract as the loop. 

153 """ 

154 i = self._idx 

155 n = len(self) 

156 while i < n and self[i] in charset: 

157 i += 1 

158 if i < n: 

159 self._idx = i 

160 self._current = self[i] 

161 return True 

162 self._idx = n 

163 self._current = self.EOF 

164 return False 

165 

166 def advance_until(self, stopset: frozenset) -> bool: 

167 """Advance while the current character is NOT in ``stopset``. 

168 

169 The mirror of :meth:`advance_while`: equivalent to 

170 ``while self.current not in stopset and self.inc(): pass`` in a single 

171 scan. On return ``current`` is the first character IN ``stopset`` (or 

172 EOF), with the same return-value contract. 

173 """ 

174 i = self._idx 

175 n = len(self) 

176 while i < n and self[i] not in stopset: 

177 i += 1 

178 if i < n: 

179 self._idx = i 

180 self._current = self[i] 

181 return True 

182 self._idx = n 

183 self._current = self.EOF 

184 return False 

185 

186 def inc_n(self, n: int, exception: type[ParseError] | None = None) -> bool: 

187 """ 

188 Increments the parser by n characters 

189 if the end of the input has not been reached. 

190 """ 

191 return all(self.inc(exception=exception) for _ in range(n)) 

192 

193 def consume(self, chars: str, min: int = 0, max: int = -1) -> None: 

194 """ 

195 Consume chars until min/max is satisfied is valid. 

196 """ 

197 while self.current in chars and max != 0: 

198 min -= 1 

199 max -= 1 

200 if not self.inc(): 

201 break 

202 

203 # failed to consume minimum number of characters 

204 if min > 0: 

205 raise self.parse_error(UnexpectedCharError, self.current) 

206 

207 def end(self) -> bool: 

208 """ 

209 Returns True if the parser has reached the end of the input. 

210 """ 

211 return self._idx >= len(self) 

212 

213 def mark(self) -> None: 

214 """ 

215 Sets the marker to the index's current position 

216 """ 

217 self._marker = self._idx 

218 

219 def parse_error( 

220 self, 

221 exception: type[ParseError] = ParseError, 

222 *args: Any, 

223 **kwargs: Any, 

224 ) -> ParseError: 

225 """ 

226 Creates a generic "parse error" at the current position. 

227 """ 

228 line, col = self._to_linecol() 

229 

230 return exception(line, col, *args, **kwargs) 

231 

232 def _to_linecol(self) -> tuple[int, int]: 

233 cur = 0 

234 for i, line in enumerate(self.splitlines()): 

235 if cur + len(line) + 1 > self.idx: 

236 return (i + 1, self.idx - cur) 

237 

238 cur += len(line) + 1 

239 

240 return len(self.splitlines()), 0