# Copyright 2004-2005 Elemental Security, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

# Modifications:
# Copyright 2006 Google, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Parser driver.

This provides a high-level interface to parse a file, stream, or string
into a syntax tree.
"""
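
# A minimal usage sketch (illustrative; assumes a grammar file such as
# "Grammar.txt" is available on disk):
#
#     driver = Driver(load_grammar("Grammar.txt"))
#     tree = driver.parse_string("x = 1\n")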

__author__ = "Guido van Rossum <guido@python.org>"

__all__ = ["Driver", "load_grammar"]

# Python imports
import io
import os
import logging
import pkgutil
import sys
from typing import (
    Any,
    cast,
    IO,
    Iterable,
    List,
    Optional,
    Text,
    Iterator,
    Tuple,
    TypeVar,
    Generic,
    Union,
)
from contextlib import contextmanager
from dataclasses import dataclass, field

# Pgen imports
from . import grammar, parse, token, tokenize, pgen
from logging import Logger
from blib2to3.pytree import NL
from blib2to3.pgen2.grammar import Grammar
from blib2to3.pgen2.tokenize import GoodTokenInfo

Path = Union[str, "os.PathLike[str]"]


@dataclass
class ReleaseRange:
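    """A contiguous range of tokens cached during speculative lookahead.

    ``start`` is the position of the first cached token; ``end`` stays
    None until ``lock()`` fixes it to ``start`` plus the number of
    tokens eaten.
    """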

    start: int
    end: Optional[int] = None
    tokens: List[Any] = field(default_factory=list)

    def lock(self) -> None:
        total_eaten = len(self.tokens)
        self.end = self.start + total_eaten


class TokenProxy:
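    """Wrap a token generator so the parser can look ahead speculatively.

    Inside a ``release()`` block, tokens obtained through ``eat()`` are
    cached in a ``ReleaseRange``; iterating past the same positions later
    replays the cached tokens instead of pulling new ones from the
    generator.
    """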

    def __init__(self, generator: Any) -> None:
        self._tokens = generator
        self._counter = 0
        self._release_ranges: List[ReleaseRange] = []

    @contextmanager
    def release(self) -> Iterator["TokenProxy"]:
        release_range = ReleaseRange(self._counter)
        self._release_ranges.append(release_range)
        try:
            yield self
        finally:
            # Lock the last release range to the final position that
            # has been eaten.
            release_range.lock()

    def eat(self, point: int) -> Any:
        eaten_tokens = self._release_ranges[-1].tokens
        if point < len(eaten_tokens):
            return eaten_tokens[point]
        else:
            while point >= len(eaten_tokens):
                token = next(self._tokens)
                eaten_tokens.append(token)
            return token

    def __iter__(self) -> "TokenProxy":
        return self

    def __next__(self) -> Any:
        # If the current position has already been eaten (looked ahead),
        # replay the cached token; otherwise advance the underlying
        # token producer.
        for release_range in self._release_ranges:
            assert release_range.end is not None

            start, end = release_range.start, release_range.end
            if start <= self._counter < end:
                token = release_range.tokens[self._counter - start]
                break
        else:
            token = next(self._tokens)
        self._counter += 1
        return token

    def can_advance(self, to: int) -> bool:
        # Try to eat the token at position `to`; fail if the underlying
        # stream is exhausted. eat() caches its results, so there is no
        # extra cost when the same tokens are consumed again later.
        try:
            self.eat(to)
        except StopIteration:
            return False
        else:
            return True
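

# A rough sketch of the lookahead pattern TokenProxy supports (the token
# stream here is hypothetical):
#
#     proxy = TokenProxy(iter(tokens))
#     with proxy.release():
#         if proxy.can_advance(1):  # eats and caches tokens 0 and 1
#             peeked = proxy.eat(0)
#     first = next(proxy)  # replays cached token 0 rather than advancing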


class Driver(object):
    def __init__(self, grammar: Grammar, logger: Optional[Logger] = None) -> None:
        self.grammar = grammar
        if logger is None:
            logger = logging.getLogger(__name__)
        self.logger = logger

    def parse_tokens(self, tokens: Iterable[GoodTokenInfo], debug: bool = False) -> NL:
        """Parse a series of tokens and return the syntax tree."""
        # XXX Move the prefix computation into a wrapper around tokenize.
        proxy = TokenProxy(tokens)

        p = parse.Parser(self.grammar)
        p.setup(proxy=proxy)

        lineno = 1
        column = 0
        indent_columns: List[int] = []
        type = value = start = end = line_text = None
        prefix = ""
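        # ``prefix`` collects the whitespace and comments seen between
        # tokens; the loop below attaches them to the next significant
        # token, which is what keeps the parse tree lossless.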

        for quintuple in proxy:
            type, value, start, end, line_text = quintuple
            if start != (lineno, column):
                assert (lineno, column) <= start, ((lineno, column), start)
                s_lineno, s_column = start
                if lineno < s_lineno:
                    prefix += "\n" * (s_lineno - lineno)
                    lineno = s_lineno
                    column = 0
                if column < s_column:
                    prefix += line_text[column:s_column]
                    column = s_column
            if type in (tokenize.COMMENT, tokenize.NL):
                prefix += value
                lineno, column = end
                if value.endswith("\n"):
                    lineno += 1
                    column = 0
                continue
            if type == token.OP:
                type = grammar.opmap[value]
            if debug:
                assert type is not None
                self.logger.debug(
                    "%s %r (prefix=%r)", token.tok_name[type], value, prefix
                )
            if type == token.INDENT:
                indent_columns.append(len(value))
                _prefix = prefix + value
                prefix = ""
                value = ""
            elif type == token.DEDENT:
                _indent_col = indent_columns.pop()
                prefix, _prefix = self._partially_consume_prefix(prefix, _indent_col)
            if p.addtoken(cast(int, type), value, (prefix, start)):
                if debug:
                    self.logger.debug("Stop.")
                break
            prefix = ""
            if type in {token.INDENT, token.DEDENT}:
                prefix = _prefix
            lineno, column = end
            if value.endswith("\n"):
                lineno += 1
                column = 0
        else:
            # We never broke out of the loop: the token stream was
            # exhausted before the parser accepted the input.
            assert start is not None
            raise parse.ParseError("incomplete input", type, value, (prefix, start))
        assert p.rootnode is not None
        return p.rootnode


    def parse_stream_raw(self, stream: IO[Text], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree."""
        tokens = tokenize.generate_tokens(stream.readline, grammar=self.grammar)
        return self.parse_tokens(tokens, debug)

    def parse_stream(self, stream: IO[Text], debug: bool = False) -> NL:
        """Parse a stream and return the syntax tree."""
        return self.parse_stream_raw(stream, debug)

    def parse_file(
        self, filename: Path, encoding: Optional[Text] = None, debug: bool = False
    ) -> NL:
        """Parse a file and return the syntax tree."""
        with io.open(filename, "r", encoding=encoding) as stream:
            return self.parse_stream(stream, debug)

    def parse_string(self, text: Text, debug: bool = False) -> NL:
        """Parse a string and return the syntax tree."""
        tokens = tokenize.generate_tokens(
            io.StringIO(text).readline, grammar=self.grammar
        )
        return self.parse_tokens(tokens, debug)

    def _partially_consume_prefix(self, prefix: Text, column: int) -> Tuple[Text, Text]:
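        """Split ``prefix`` in two at indentation ``column``.

        Walks the prefix line by line and stops just before the first
        non-blank line indented by fewer than ``column`` characters,
        returning the consumed part and the remainder.
        """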

        lines: List[str] = []
        current_line = ""
        current_column = 0
        wait_for_nl = False
        for char in prefix:
            current_line += char
            if wait_for_nl:
                if char == "\n":
                    if current_line.strip() and current_column < column:
                        res = "".join(lines)
                        return res, prefix[len(res) :]

                    lines.append(current_line)
                    current_line = ""
                    current_column = 0
                    wait_for_nl = False
            elif char in " \t":
                current_column += 1
            elif char == "\n":
                # unexpected empty line
                current_column = 0
            else:
                # indent is finished
                wait_for_nl = True
        return "".join(lines), current_line


def _generate_pickle_name(gt: Path, cache_dir: Optional[Path] = None) -> Text:
    head, tail = os.path.splitext(gt)
    if tail == ".txt":
        tail = ""
    name = head + tail + ".".join(map(str, sys.version_info)) + ".pickle"
    if cache_dir:
        return os.path.join(cache_dir, os.path.basename(name))
    else:
        return name
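

# For example, under CPython 3.11.4 (final), _generate_pickle_name("Grammar.txt")
# returns "Grammar3.11.4.final.0.pickle": the ".txt" suffix is dropped and the
# full sys.version_info is joined with dots before the ".pickle" extension.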


def load_grammar(
    gt: Text = "Grammar.txt",
    gp: Optional[Text] = None,
    save: bool = True,
    force: bool = False,
    logger: Optional[Logger] = None,
) -> Grammar:
    """Load the grammar (maybe from a pickle)."""
    if logger is None:
        logger = logging.getLogger(__name__)
    gp = _generate_pickle_name(gt) if gp is None else gp
    if force or not _newer(gp, gt):
        g: grammar.Grammar = pgen.generate_grammar(gt)
        if save:
            try:
                g.dump(gp)
            except OSError:
                # Ignore error, caching is not vital.
                pass
    else:
        g = grammar.Grammar()
        g.load(gp)
    return g
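

# Note: load_grammar("Grammar.txt") regenerates the pickle whenever the grammar
# text file is newer (or when force=True) and otherwise reuses the cached pickle.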


def _newer(a: Text, b: Text) -> bool:
    """Return whether file a is at least as new as file b.

    Missing files sort oldest: the result is False if a does not exist
    and True if only b does not exist.
    """
    if not os.path.exists(a):
        return False
    if not os.path.exists(b):
        return True
    return os.path.getmtime(a) >= os.path.getmtime(b)


def load_packaged_grammar(
    package: str, grammar_source: Text, cache_dir: Optional[Path] = None
) -> grammar.Grammar:
    """Normally, loads a pickled grammar by doing
        pkgutil.get_data(package, pickled_grammar)
    where *pickled_grammar* is computed from *grammar_source* by adding the
    Python version and using a ``.pickle`` extension.

    However, if *grammar_source* is an extant file, load_grammar(grammar_source)
    is called instead. This facilitates using a packaged grammar file when needed
    but preserves load_grammar's automatic regeneration behavior when possible.
    """
    if os.path.isfile(grammar_source):
        gp = _generate_pickle_name(grammar_source, cache_dir) if cache_dir else None
        return load_grammar(grammar_source, gp=gp)
    pickled_name = _generate_pickle_name(os.path.basename(grammar_source), cache_dir)
    data = pkgutil.get_data(package, pickled_name)
    assert data is not None
    g = grammar.Grammar()
    g.loads(data)
    return g


def main(*args: Text) -> bool:
    """Main program, when run as a script: produce grammar pickle files.

    Calls load_grammar for each argument, a path to a grammar text file.
    """
    if not args:
        args = tuple(sys.argv[1:])
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s")
    for gt in args:
        load_grammar(gt, save=True, force=True)
    return True


if __name__ == "__main__":
    sys.exit(int(not main()))