Coverage for /pythoncovmergedfiles/medio/medio/src/black/src/blib2to3/pgen2/driver.py: 78% (205 statements)


# Copyright 2004-2005 Elemental Security, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

# Modifications:
# Copyright 2006 Google, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Parser driver.

This provides a high-level interface to parse a file into a syntax tree.

"""

__author__ = "Guido van Rossum <guido@python.org>"

__all__ = ["Driver", "load_grammar"]

# Python imports
import io
import logging
import os
import pkgutil
import sys
from collections.abc import Iterable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass, field
from logging import Logger
from typing import IO, Any, Optional, Union, cast

from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.tokenize import TokenInfo
from blib2to3.pytree import NL

# Pgen imports
from . import grammar, parse, pgen, token, tokenize

Path = Union[str, "os.PathLike[str]"]


@dataclass
class ReleaseRange:
    start: int
    end: Optional[int] = None
    tokens: list[Any] = field(default_factory=list)

    def lock(self) -> None:
        total_eaten = len(self.tokens)
        self.end = self.start + total_eaten


class TokenProxy:
    def __init__(self, generator: Any) -> None:
        self._tokens = generator
        self._counter = 0
        self._release_ranges: list[ReleaseRange] = []

    @contextmanager
    def release(self) -> Iterator["TokenProxy"]:
        release_range = ReleaseRange(self._counter)
        self._release_ranges.append(release_range)
        try:
            yield self
        finally:
            # Lock the last release range to the final position that
            # has been eaten.
            release_range.lock()

    def eat(self, point: int) -> Any:
        eaten_tokens = self._release_ranges[-1].tokens
        if point < len(eaten_tokens):
            return eaten_tokens[point]
        else:
            while point >= len(eaten_tokens):
                token = next(self._tokens)
                eaten_tokens.append(token)
            return token

    def __iter__(self) -> "TokenProxy":
        return self


    def __next__(self) -> Any:
        # If the current position has already been looked ahead (eaten),
        # return the cached token; otherwise pull the next one from the
        # underlying token producer.

        for release_range in self._release_ranges:
            assert release_range.end is not None

            start, end = release_range.start, release_range.end
            if start <= self._counter < end:
                token = release_range.tokens[self._counter - start]
                break
        else:
            token = next(self._tokens)
        self._counter += 1
        return token


    def can_advance(self, to: int) -> bool:
        # Try to eat, fail if it can't. The eat operation is cached
        # so there won't be any additional cost of eating here
        try:
            self.eat(to)
        except StopIteration:
            return False
        else:
            return True


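# Example (editor's sketch, not part of the upstream module): the lookahead
# contract of TokenProxy.  Tokens pulled inside a release() block are cached,
# then replayed in order when the proxy is iterated again afterwards.
def _example_token_proxy_lookahead() -> None:
    proxy = TokenProxy(iter(["tok0", "tok1", "tok2"]))
    with proxy.release() as lookahead:
        assert lookahead.eat(0) == "tok0"  # peek without losing the token
        assert lookahead.can_advance(1)  # "tok1" exists, so this succeeds
    # Iterating now replays the cached tokens before pulling fresh ones.
    assert list(proxy) == ["tok0", "tok1", "tok2"]

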

class Driver:
    def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
        self.grammar = grammar
        if logger is None:
            logger = logging.getLogger(__name__)
        self.logger = logger

    def parse_tokens(self, tokens: Iterable[TokenInfo], debug: bool = False) -> NL:
        """Parse a series of tokens and return the syntax tree."""
        # XXX Move the prefix computation into a wrapper around tokenize.
        proxy = TokenProxy(tokens)

        p = parse.Parser(self.grammar)
        p.setup(proxy=proxy)

        lineno = 1
        column = 0
        indent_columns: list[int] = []
        type = value = start = end = line_text = None
        prefix = ""

        for quintuple in proxy:
            type, value, start, end, line_text = quintuple
            if start != (lineno, column):
                assert (lineno, column) <= start, ((lineno, column), start)
                s_lineno, s_column = start
                if lineno < s_lineno:
                    prefix += "\n" * (s_lineno - lineno)
                    lineno = s_lineno
                    column = 0
                if column < s_column:
                    prefix += line_text[column:s_column]
                    column = s_column
            if type in (tokenize.COMMENT, tokenize.NL):
                prefix += value
                lineno, column = end
                if value.endswith("\n"):
                    lineno += 1
                    column = 0
                continue
            if type == token.OP:
                type = grammar.opmap[value]
            if debug:
                assert type is not None
                self.logger.debug(
                    "%s %r (prefix=%r)", token.tok_name[type], value, prefix
                )
            if type == token.INDENT:
                indent_columns.append(len(value))
                _prefix = prefix + value
                prefix = ""
                value = ""
            elif type == token.DEDENT:
                _indent_col = indent_columns.pop()
                prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
            if p.addtoken(cast(int, type), value, (prefix, start)):
                if debug:
                    self.logger.debug("Stop.")
                break
            prefix = ""
            if type in {token.INDENT, token.DEDENT}:
                prefix = _prefix
            lineno, column = end
            # FSTRING_MIDDLE is the only token that can end with a newline, and
            # `end` will point to the next line. For that case, don't increment lineno.
            if value.endswith("\n") and type != token.FSTRING_MIDDLE:
                lineno += 1
                column = 0
        else:
            # We never broke out -- EOF is too soon (how can this happen???)
            assert start is not None
            raise parse.ParseError("incomplete input", type, value, (prefix, start))

        assert p.rootnode is not None
        return p.rootnode

    def parse_file(
        self, filename: Path, encoding: Optional[str] = None, debug: bool = False
    ) -> NL:
        """Parse a file and return the syntax tree."""
        with open(filename, encoding=encoding) as stream:
            text = stream.read()
        return self.parse_string(text, debug)

    def parse_string(self, text: str, debug: bool = False) -> NL:
        """Parse a string and return the syntax tree."""
        tokens = tokenize.tokenize(text, grammar=self.grammar)
        return self.parse_tokens(tokens, debug)


    def _partially_consume_prefix(self, prefix: str, column: int) -> tuple[str, str]:
        # Split the whitespace/comment prefix at the first non-blank line that
        # is indented at less than `column`: the part before it stays with the
        # DEDENT token, the remainder is carried over to the next token.
        lines: list[str] = []
        current_line = ""
        current_column = 0
        wait_for_nl = False
        for char in prefix:
            current_line += char
            if wait_for_nl:
                if char == "\n":
                    if current_line.strip() and current_column < column:
                        res = "".join(lines)
                        return res, prefix[len(res) :]

                    lines.append(current_line)
                    current_line = ""
                    current_column = 0
                    wait_for_nl = False
            elif char in " \t":
                current_column += 1
            elif char == "\n":
                # unexpected empty line
                current_column = 0
            elif char == "\f":
                current_column = 0
            else:
                # indent is finished
                wait_for_nl = True
        return "".join(lines), current_line


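# Example (editor's sketch, not part of the upstream module): end-to-end use of
# Driver on a string.  "my_grammar.txt" is a hypothetical grammar file; black
# itself prepares its Grammar objects elsewhere.
def _example_parse_string(source: str = "x = 1\n") -> NL:
    g = load_grammar("my_grammar.txt", save=False)  # hypothetical grammar file
    d = Driver(g, logger=logging.getLogger(__name__))
    return d.parse_string(source)  # returns the root of the syntax tree

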

def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> str:
    head, tail = os.path.splitext(gt)
    if tail == ".txt":
        tail = ""
    name = head + tail + ".".join(map(str, sys.version_info)) + ".pickle"
    if cache_dir:
        return os.path.join(cache_dir, os.path.basename(name))
    else:
        return name


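# Editor's note (not in the upstream source): _generate_pickle_name() embeds the
# interpreter version in the cache file name, e.g. "Grammar.txt" becomes
# "Grammar3.12.4.final.0.pickle" under CPython 3.12.4, so each Python version
# gets its own cached grammar pickle.

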

def load_grammar(
    gt: str = "Grammar.txt",
    gp: Optional[str] = None,
    save: bool = True,
    force: bool = False,
    logger: Optional[Logger] = None,
) -> Grammar:
    """Load the grammar (maybe from a pickle)."""
    if logger is None:
        logger = logging.getLogger(__name__)
    gp = _generate_pickle_name(gt) if gp is None else gp
    if force or not _newer(gp, gt):
        g: grammar.Grammar = pgen.generate_grammar(gt)
        if save:
            try:
                g.dump(gp)
            except OSError:
                # Ignore error, caching is not vital.
                pass
    else:
        g = grammar.Grammar()
        g.load(gp)
    return g


def _newer(a: str, b: str) -> bool:
    """Inquire whether file a was written since file b."""
    if not os.path.exists(a):
        return False
    if not os.path.exists(b):
        return True
    return os.path.getmtime(a) >= os.path.getmtime(b)


def load_packaged_grammar(
    package: str, grammar_source: str, cache_dir: Optional[Path] = None
) -> grammar.Grammar:
    """Normally, loads a pickled grammar by doing
        pkgutil.get_data(package, pickled_grammar)
    where *pickled_grammar* is computed from *grammar_source* by adding the
    Python version and using a ``.pickle`` extension.

    However, if *grammar_source* is an extant file, load_grammar(grammar_source)
    is called instead. This facilitates using a packaged grammar file when needed
    but preserves load_grammar's automatic regeneration behavior when possible.

    """
    if os.path.isfile(grammar_source):
        gp = _generate_pickle_name(grammar_source, cache_dir) if cache_dir else None
        return load_grammar(grammar_source, gp=gp)
    pickled_name = _generate_pickle_name(os.path.basename(grammar_source), cache_dir)
    data = pkgutil.get_data(package, pickled_name)
    assert data is not None
    g = grammar.Grammar()
    g.loads(data)
    return g


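# Example (editor's sketch, not part of the upstream module): obtaining the
# grammar shipped inside a package.  The argument values are illustrative; if
# "Grammar.txt" exists as a real file the grammar is (re)generated through
# load_grammar(), otherwise the pre-built pickle is read with pkgutil.get_data().
def _example_load_packaged_grammar() -> grammar.Grammar:
    return load_packaged_grammar("blib2to3", "Grammar.txt")

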

def main(*args: str) -> bool:
    """Main program, when run as a script: produce grammar pickle files.

    Calls load_grammar for each argument, a path to a grammar text file.
    """
    if not args:
        args = tuple(sys.argv[1:])
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
    for gt in args:
        load_grammar(gt, save=True, force=True)
    return True


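# Editor's note: run as a script (e.g. "python driver.py Grammar.txt"), the block
# below regenerates and pickles the grammar for each grammar file given on the
# command line.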

if __name__ == "__main__":
    sys.exit(int(not main()))