Coverage for /pythoncovmergedfiles/medio/medio/src/black/src/blib2to3/pgen2/driver.py: 78%

205 statements  

# Copyright 2004-2005 Elemental Security, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

# Modifications:
# Copyright 2006 Google, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Parser driver.

This provides a high-level interface to parse a file into a syntax tree.

"""

__author__ = "Guido van Rossum <guido@python.org>"

__all__ = ["Driver", "load_grammar"]

# Python imports
import io
import logging
import os
import pkgutil
import sys
from collections.abc import Iterable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass, field
from logging import Logger
from typing import IO, Any, Optional, Union, cast

from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.tokenize import TokenInfo
from blib2to3.pytree import NL

# Pgen imports
from . import grammar, parse, pgen, token, tokenize

Path = Union[str, "os.PathLike[str]"]


@dataclass
class ReleaseRange:
    start: int
    end: Optional[int] = None
    tokens: list[Any] = field(default_factory=list)

    def lock(self) -> None:
        total_eaten = len(self.tokens)
        self.end = self.start + total_eaten

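# TokenProxy wraps the raw token generator so the parser can look ahead
# speculatively: tokens consumed inside a `release()` block are buffered in the
# active ReleaseRange and replayed by normal iteration once the block ends.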

class TokenProxy:
    def __init__(self, generator: Any) -> None:
        self._tokens = generator
        self._counter = 0
        self._release_ranges: list[ReleaseRange] = []

    @contextmanager
    def release(self) -> Iterator["TokenProxy"]:
        release_range = ReleaseRange(self._counter)
        self._release_ranges.append(release_range)
        try:
            yield self
        finally:
            # Lock the last release range to the final position that
            # has been eaten.
            release_range.lock()

    def eat(self, point: int) -> Any:
        eaten_tokens = self._release_ranges[-1].tokens
        if point < len(eaten_tokens):
            return eaten_tokens[point]
        else:
            while point >= len(eaten_tokens):
                token = next(self._tokens)
                eaten_tokens.append(token)
            return token

    def __iter__(self) -> "TokenProxy":
        return self

    def __next__(self) -> Any:
        # If the current position has already been looked up, return the eaten
        # token; if not, just advance the underlying token producer.
        for release_range in self._release_ranges:
            assert release_range.end is not None

            start, end = release_range.start, release_range.end
            if start <= self._counter < end:
                token = release_range.tokens[self._counter - start]
                break
        else:
            token = next(self._tokens)
        self._counter += 1
        return token

    def can_advance(self, to: int) -> bool:
        # Try to eat; fail if we can't. The eat operation is cached, so there is
        # no additional cost to eating here.
        try:
            self.eat(to)
        except StopIteration:
            return False
        else:
            return True

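# Intended lookahead protocol (an illustrative sketch, not code from this
# module): the parser opens `with proxy.release():`, probes upcoming tokens via
# `proxy.eat(n)` or `proxy.can_advance(n)`, and after the block exits those same
# buffered tokens are yielded again by iterating over the proxy as usual.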

class Driver:
    def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
        self.grammar = grammar
        if logger is None:
            logger = logging.getLogger(__name__)
        self.logger = logger

    def parse_tokens(self, tokens: Iterable[TokenInfo], debug: bool = False) -> NL:
        """Parse a series of tokens and return the syntax tree."""
        # XXX Move the prefix computation into a wrapper around tokenize.
        proxy = TokenProxy(tokens)

        p = parse.Parser(self.grammar)
        p.setup(proxy=proxy)

        lineno = 1
        column = 0
        indent_columns: list[int] = []
        type = value = start = end = line_text = None
        prefix = ""

        for quintuple in proxy:
            type, value, start, end, line_text = quintuple
            if start != (lineno, column):
                assert (lineno, column) <= start, ((lineno, column), start)
                s_lineno, s_column = start
                if lineno < s_lineno:
                    prefix += "\n" * (s_lineno - lineno)
                    lineno = s_lineno
                    column = 0
                if column < s_column:
                    prefix += line_text[column:s_column]
                    column = s_column
            if type in (tokenize.COMMENT, tokenize.NL):
                prefix += value
                lineno, column = end
                if value.endswith("\n"):
                    lineno += 1
                    column = 0
                continue
            if type == token.OP:
                type = grammar.opmap[value]
            if debug:
                assert type is not None
                self.logger.debug(
                    "%s %r (prefix=%r)", token.tok_name[type], value, prefix
                )
            if type == token.INDENT:
                indent_columns.append(len(value))
                _prefix = prefix + value
                prefix = ""
                value = ""
            elif type == token.DEDENT:
                _indent_col = indent_columns.pop()
                prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
            if p.addtoken(cast(int, type), value, (prefix, start)):
                if debug:
                    self.logger.debug("Stop.")
                break
            prefix = ""
            if type in {token.INDENT, token.DEDENT}:
                prefix = _prefix
            lineno, column = end
            # FSTRING_MIDDLE and TSTRING_MIDDLE are the only tokens that can end
            # with a newline, and `end` will point to the next line. For that
            # case, don't increment lineno.
            if value.endswith("\n") and type not in (
                token.FSTRING_MIDDLE,
                token.TSTRING_MIDDLE,
            ):
                lineno += 1
                column = 0
        else:
            # We never broke out -- EOF is too soon (how can this happen???)
            assert start is not None
            raise parse.ParseError("incomplete input", type, value, (prefix, start))
        assert p.rootnode is not None
        return p.rootnode

    def parse_file(
        self, filename: Path, encoding: Optional[str] = None, debug: bool = False
    ) -> NL:
        """Parse a file and return the syntax tree."""
        with open(filename, encoding=encoding) as stream:
            text = stream.read()
        return self.parse_string(text, debug)

    def parse_string(self, text: str, debug: bool = False) -> NL:
        """Parse a string and return the syntax tree."""
        tokens = tokenize.tokenize(text, grammar=self.grammar)
        return self.parse_tokens(tokens, debug)

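    # _partially_consume_prefix splits a DEDENT's pending prefix (whitespace,
    # comments, and newlines) at the given indentation column, returning the
    # part that belongs to the DEDENT and the remainder carried over to the
    # following token.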

    def _partially_consume_prefix(self, prefix: str, column: int) -> tuple[str, str]:
        lines: list[str] = []
        current_line = ""
        current_column = 0
        wait_for_nl = False
        for char in prefix:
            current_line += char
            if wait_for_nl:
                if char == "\n":
                    if current_line.strip() and current_column < column:
                        res = "".join(lines)
                        return res, prefix[len(res) :]

                    lines.append(current_line)
                    current_line = ""
                    current_column = 0
                    wait_for_nl = False
            elif char in " \t":
                current_column += 1
            elif char == "\n":
                # unexpected empty line
                current_column = 0
            elif char == "\f":
                current_column = 0
            else:
                # indent is finished
                wait_for_nl = True
        return "".join(lines), current_line


def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> str:
    head, tail = os.path.splitext(gt)
    if tail == ".txt":
        tail = ""
    name = head + tail + ".".join(map(str, sys.version_info)) + ".pickle"
    if cache_dir:
        return os.path.join(cache_dir, os.path.basename(name))
    else:
        return name
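# For example (illustrative): with gt="Grammar.txt" on CPython 3.11.4, splitext
# gives ("Grammar", ".txt"), the ".txt" tail is dropped, and the result is
# "Grammar3.11.4.final.0.pickle" (or that basename inside cache_dir when one is
# given).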

def load_grammar(
    gt: str = "Grammar.txt",
    gp: Optional[str] = None,
    save: bool = True,
    force: bool = False,
    logger: Optional[Logger] = None,
) -> Grammar:
    """Load the grammar (maybe from a pickle)."""
    if logger is None:
        logger = logging.getLogger(__name__)
    gp = _generate_pickle_name(gt) if gp is None else gp
    if force or not _newer(gp, gt):
        g: grammar.Grammar = pgen.generate_grammar(gt)
        if save:
            try:
                g.dump(gp)
            except OSError:
                # Ignore error, caching is not vital.
                pass
    else:
        g = grammar.Grammar()
        g.load(gp)
    return g


def _newer(a: str, b: str) -> bool:
    """Inquire whether file a was written since file b."""
    if not os.path.exists(a):
        return False
    if not os.path.exists(b):
        return True
    return os.path.getmtime(a) >= os.path.getmtime(b)


def load_packaged_grammar(
    package: str, grammar_source: str, cache_dir: Optional[Path] = None
) -> grammar.Grammar:
    """Normally, loads a pickled grammar by doing
        pkgutil.get_data(package, pickled_grammar)
    where *pickled_grammar* is computed from *grammar_source* by adding the
    Python version and using a ``.pickle`` extension.

    However, if *grammar_source* is an extant file, load_grammar(grammar_source)
    is called instead. This facilitates using a packaged grammar file when needed
    but preserves load_grammar's automatic regeneration behavior when possible.

    """
    if os.path.isfile(grammar_source):
        gp = _generate_pickle_name(grammar_source, cache_dir) if cache_dir else None
        return load_grammar(grammar_source, gp=gp)
    pickled_name = _generate_pickle_name(os.path.basename(grammar_source), cache_dir)
    data = pkgutil.get_data(package, pickled_name)
    assert data is not None
    g = grammar.Grammar()
    g.loads(data)
    return g


def main(*args: str) -> bool:
    """Main program, when run as a script: produce grammar pickle files.

    Calls load_grammar for each argument, a path to a grammar text file.
    """
    if not args:
        args = tuple(sys.argv[1:])
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
    for gt in args:
        load_grammar(gt, save=True, force=True)
    return True


if __name__ == "__main__":
    sys.exit(int(not main()))
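# A minimal usage sketch (illustrative only; it assumes a Grammar.txt file is
# reachable from the working directory, which depends on the installation):
#
#     from blib2to3.pgen2 import driver
#
#     g = driver.load_grammar("Grammar.txt")
#     d = driver.Driver(g)
#     tree = d.parse_string("x = 1\n")
#
# To regenerate pickle caches from the command line, the script form above can
# be invoked directly, e.g. `python driver.py Grammar.txt` (hypothetical path).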