Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/executing/executing.py: 19%

591 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:07 +0000

1""" 

2MIT License 

3 

4Copyright (c) 2021 Alex Hall 

5 

6Permission is hereby granted, free of charge, to any person obtaining a copy 

7of this software and associated documentation files (the "Software"), to deal 

8in the Software without restriction, including without limitation the rights 

9to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 

10copies of the Software, and to permit persons to whom the Software is 

11furnished to do so, subject to the following conditions: 

12 

13The above copyright notice and this permission notice shall be included in all 

14copies or substantial portions of the Software. 

15 

16THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

17IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

18FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

19AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

20LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

21OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

22SOFTWARE. 

23""" 

24 

25import __future__ 

26import ast 

27import dis 

28import functools 

29import inspect 

30import io 

31import linecache 

32import re 

33import sys 

34import types 

35from collections import defaultdict, namedtuple 

36from copy import deepcopy 

37from itertools import islice 

38from operator import attrgetter 

39from threading import RLock 

40from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, Type, TypeVar, Union, cast 

41 

42if TYPE_CHECKING: # pragma: no cover 

43 from asttokens import ASTTokens, ASTText 

44 from asttokens.asttokens import ASTTextBase 

45 

46function_node_types = (ast.FunctionDef,) # type: Tuple[Type, ...] 

47if sys.version_info[0] == 3: 

48 function_node_types += (ast.AsyncFunctionDef,) 

49 

50 

51if sys.version_info[0] == 3: 

52 # noinspection PyUnresolvedReferences 

53 from functools import lru_cache 

54 # noinspection PyUnresolvedReferences 

55 from tokenize import detect_encoding 

56 from itertools import zip_longest 

57 # noinspection PyUnresolvedReferences,PyCompatibility 

58 from pathlib import Path 

59 

60 cache = lru_cache(maxsize=None) 

61 text_type = str 

62else: 

63 from lib2to3.pgen2.tokenize import detect_encoding, cookie_re as encoding_pattern # type: ignore[attr-defined] 

64 from itertools import izip_longest as zip_longest 

65 

66 class Path(object): 

67 pass 

68 

69 

70 def cache(func): 

71 # type: (Callable) -> Callable 

72 d = {} # type: Dict[Tuple, Callable]  

73 

74 @functools.wraps(func) 

75 def wrapper(*args): 

76 # type: (Any) -> Any 

77 if args in d: 

78 return d[args] 

79 result = d[args] = func(*args) 

80 return result 

81 

82 return wrapper 

83 

84 

85 # noinspection PyUnresolvedReferences 

86 text_type = unicode 

87 

88# Type class used to expand out the definition of AST to include fields added by this library 

89# It's not actually used for anything other than type checking though! 

90class EnhancedAST(ast.AST): 

91 parent = None # type: EnhancedAST 

92 

93if sys.version_info >= (3, 4): 

94 # noinspection PyUnresolvedReferences 

95 _get_instructions = dis.get_instructions 

96 from dis import Instruction as _Instruction 

97 

98 class Instruction(_Instruction): 

99 lineno = None # type: int 

100else: 

101 class Instruction(namedtuple('Instruction', 'offset argval opname starts_line')): 

102 lineno = None # type: int 

103 

104 from dis import HAVE_ARGUMENT, EXTENDED_ARG, hasconst, opname, findlinestarts, hasname 

105 

106 # Based on dis.disassemble from 2.7 

107 # Left as similar as possible for easy diff 

108 

109 def _get_instructions(co): 

110 # type: (types.CodeType) -> Iterator[Instruction] 

111 code = co.co_code 

112 linestarts = dict(findlinestarts(co)) 

113 n = len(code) 

114 i = 0 

115 extended_arg = 0 

116 while i < n: 

117 offset = i 

118 c = code[i] 

119 op = ord(c) 

120 lineno = linestarts.get(i) 

121 argval = None 

122 i = i + 1 

123 if op >= HAVE_ARGUMENT: 

124 oparg = ord(code[i]) + ord(code[i + 1]) * 256 + extended_arg 

125 extended_arg = 0 

126 i = i + 2 

127 if op == EXTENDED_ARG: 

128 extended_arg = oparg * 65536 

129 

130 if op in hasconst: 

131 argval = co.co_consts[oparg] 

132 elif op in hasname: 

133 argval = co.co_names[oparg] 

134 elif opname[op] == 'LOAD_FAST': 

135 argval = co.co_varnames[oparg] 

136 yield Instruction(offset, argval, opname[op], lineno) 

137 

138 

139# Type class used to expand out the definition of AST to include fields added by this library 

140# It's not actually used for anything other than type checking though! 

141class EnhancedInstruction(Instruction): 

142 _copied = None # type: bool 

143 

144 

145 

146def assert_(condition, message=""): 

147 # type: (Any, str) -> None 

148 """ 

149 Like an assert statement, but unaffected by -O 

150 :param condition: value that is expected to be truthy 

151 :type message: Any 

152 """ 

153 if not condition: 

154 raise AssertionError(str(message)) 

155 

156 

157def get_instructions(co): 

158 # type: (types.CodeType) -> Iterator[EnhancedInstruction] 

159 lineno = co.co_firstlineno 

160 for inst in _get_instructions(co): 

161 inst = cast(EnhancedInstruction, inst) 

162 lineno = inst.starts_line or lineno 

163 assert_(lineno) 

164 inst.lineno = lineno 

165 yield inst 

166 

167 

168TESTING = 0 

169 

170 

171class NotOneValueFound(Exception): 

172 def __init__(self,msg,values=[]): 

173 # type: (str, Sequence) -> None 

174 self.values=values 

175 super(NotOneValueFound,self).__init__(msg) 

176 

177T = TypeVar('T') 

178 

179 

180def only(it): 

181 # type: (Iterable[T]) -> T 

182 if isinstance(it, Sized): 

183 if len(it) != 1: 

184 raise NotOneValueFound('Expected one value, found %s' % len(it)) 

185 # noinspection PyTypeChecker 

186 return list(it)[0] 

187 

188 lst = tuple(islice(it, 2)) 

189 if len(lst) == 0: 

190 raise NotOneValueFound('Expected one value, found 0') 

191 if len(lst) > 1: 

192 raise NotOneValueFound('Expected one value, found several',lst) 

193 return lst[0] 

194 

195 

196class Source(object): 

197 """ 

198 The source code of a single file and associated metadata. 

199 

200 The main method of interest is the classmethod `executing(frame)`. 

201 

202 If you want an instance of this class, don't construct it. 

203 Ideally use the classmethod `for_frame(frame)`. 

204 If you don't have a frame, use `for_filename(filename [, module_globals])`. 

205 These methods cache instances by filename, so at most one instance exists per filename. 

206 

207 Attributes: 

208 - filename 

209 - text 

210 - lines 

211 - tree: AST parsed from text, or None if text is not valid Python 

212 All nodes in the tree have an extra `parent` attribute 

213 

214 Other methods of interest: 

215 - statements_at_line 

216 - asttokens 

217 - code_qualname 

218 """ 

219 

220 def __init__(self, filename, lines): 

221 # type: (str, Sequence[str]) -> None 

222 """ 

223 Don't call this constructor, see the class docstring. 

224 """ 

225 

226 self.filename = filename 

227 text = ''.join(lines) 

228 

229 if not isinstance(text, text_type): 

230 encoding = self.detect_encoding(text) 

231 # noinspection PyUnresolvedReferences 

232 text = text.decode(encoding) 

233 lines = [line.decode(encoding) for line in lines] 

234 

235 self.text = text 

236 self.lines = [line.rstrip('\r\n') for line in lines] 

237 

238 if sys.version_info[0] == 3: 

239 ast_text = text 

240 else: 

241 # In python 2 it's a syntax error to parse unicode 

242 # with an encoding declaration, so we remove it but 

243 # leave empty lines in its place to keep line numbers the same 

244 ast_text = ''.join([ 

245 '\n' if i < 2 and encoding_pattern.match(line) 

246 else line 

247 for i, line in enumerate(lines) 

248 ]) 

249 

250 self._nodes_by_line = defaultdict(list) 

251 self.tree = None 

252 self._qualnames = {} 

253 self._asttokens = None # type: Optional[ASTTokens] 

254 self._asttext = None # type: Optional[ASTText] 

255 

256 try: 

257 self.tree = ast.parse(ast_text, filename=filename) 

258 except (SyntaxError, ValueError): 

259 pass 

260 else: 

261 for node in ast.walk(self.tree): 

262 for child in ast.iter_child_nodes(node): 

263 cast(EnhancedAST, child).parent = cast(EnhancedAST, node) 

264 for lineno in node_linenos(node): 

265 self._nodes_by_line[lineno].append(node) 

266 

267 visitor = QualnameVisitor() 

268 visitor.visit(self.tree) 

269 self._qualnames = visitor.qualnames 

270 

271 @classmethod 

272 def for_frame(cls, frame, use_cache=True): 

273 # type: (types.FrameType, bool) -> "Source" 

274 """ 

275 Returns the `Source` object corresponding to the file the frame is executing in. 

276 """ 

277 return cls.for_filename(frame.f_code.co_filename, frame.f_globals or {}, use_cache) 

278 

279 @classmethod 

280 def for_filename( 

281 cls, 

282 filename, 

283 module_globals=None, 

284 use_cache=True, # noqa no longer used 

285 ): 

286 # type: (Union[str, Path], Optional[Dict[str, Any]], bool) -> "Source" 

287 if isinstance(filename, Path): 

288 filename = str(filename) 

289 

290 def get_lines(): 

291 # type: () -> List[str] 

292 return linecache.getlines(cast(text_type, filename), module_globals) 

293 

294 # Save the current linecache entry, then ensure the cache is up to date. 

295 entry = linecache.cache.get(filename) # type: ignore[attr-defined] 

296 linecache.checkcache(filename) 

297 lines = get_lines() 

298 if entry is not None and not lines: 

299 # There was an entry, checkcache removed it, and nothing replaced it. 

300 # This means the file wasn't simply changed (because the `lines` wouldn't be empty) 

301 # but rather the file was found not to exist, probably because `filename` was fake. 

302 # Restore the original entry so that we still have something. 

303 linecache.cache[filename] = entry # type: ignore[attr-defined] 

304 lines = get_lines() 

305 

306 return cls._for_filename_and_lines(filename, tuple(lines)) 

307 

308 @classmethod 

309 def _for_filename_and_lines(cls, filename, lines): 

310 # type: (str, Sequence[str]) -> "Source" 

311 source_cache = cls._class_local('__source_cache_with_lines', {}) # type: Dict[Tuple[str, Sequence[str]], Source] 

312 try: 

313 return source_cache[(filename, lines)] 

314 except KeyError: 

315 pass 

316 

317 result = source_cache[(filename, lines)] = cls(filename, lines) 

318 return result 

319 

320 @classmethod 

321 def lazycache(cls, frame): 

322 # type: (types.FrameType) -> None 

323 if sys.version_info >= (3, 5): 

324 linecache.lazycache(frame.f_code.co_filename, frame.f_globals) 

325 

326 @classmethod 

327 def executing(cls, frame_or_tb): 

328 # type: (Union[types.TracebackType, types.FrameType]) -> "Executing" 

329 """ 

330 Returns an `Executing` object representing the operation 

331 currently executing in the given frame or traceback object. 

332 """ 

333 if isinstance(frame_or_tb, types.TracebackType): 

334 # https://docs.python.org/3/reference/datamodel.html#traceback-objects 

335 # "tb_lineno gives the line number where the exception occurred; 

336 # tb_lasti indicates the precise instruction. 

337 # The line number and last instruction in the traceback may differ 

338 # from the line number of its frame object 

339 # if the exception occurred in a try statement with no matching except clause 

340 # or with a finally clause." 

341 tb = frame_or_tb 

342 frame = tb.tb_frame 

343 lineno = tb.tb_lineno 

344 lasti = tb.tb_lasti 

345 else: 

346 frame = frame_or_tb 

347 lineno = frame.f_lineno 

348 lasti = frame.f_lasti 

349 

350 

351 

352 code = frame.f_code 

353 key = (code, id(code), lasti) 

354 executing_cache = cls._class_local('__executing_cache', {}) # type: Dict[Tuple[types.CodeType, int, int], Any] 

355 

356 args = executing_cache.get(key) 

357 if not args: 

358 node = stmts = decorator = None 

359 source = cls.for_frame(frame) 

360 tree = source.tree 

361 if tree: 

362 try: 

363 stmts = source.statements_at_line(lineno) 

364 if stmts: 

365 if is_ipython_cell_code(code): 

366 decorator, node = find_node_ipython(frame, lasti, stmts, source) 

367 else: 

368 node_finder = NodeFinder(frame, stmts, tree, lasti, source) 

369 node = node_finder.result 

370 decorator = node_finder.decorator 

371 except Exception: 

372 if TESTING: 

373 raise 

374 

375 assert stmts is not None 

376 if node: 

377 new_stmts = {statement_containing_node(node)} 

378 assert_(new_stmts <= stmts) 

379 stmts = new_stmts 

380 

381 executing_cache[key] = args = source, node, stmts, decorator 

382 

383 return Executing(frame, *args) 

384 

385 @classmethod 

386 def _class_local(cls, name, default): 

387 # type: (str, T) -> T 

388 """ 

389 Returns an attribute directly associated with this class 

390 (as opposed to subclasses), setting default if necessary 

391 """ 

392 # classes have a mappingproxy preventing us from using setdefault 

393 result = cls.__dict__.get(name, default) 

394 setattr(cls, name, result) 

395 return result 

396 

397 @cache 

398 def statements_at_line(self, lineno): 

399 # type: (int) -> Set[EnhancedAST] 

400 """ 

401 Returns the statement nodes overlapping the given line. 

402 

403 Returns at most one statement unless semicolons are present. 

404 

405 If the `text` attribute is not valid python, meaning 

406 `tree` is None, returns an empty set. 

407 

408 Otherwise, `Source.for_frame(frame).statements_at_line(frame.f_lineno)` 

409 should return at least one statement. 

410 """ 

411 

412 return { 

413 statement_containing_node(node) 

414 for node in 

415 self._nodes_by_line[lineno] 

416 } 

417 

418 def asttext(self): 

419 # type: () -> ASTText 

420 """ 

421 Returns an ASTText object for getting the source of specific AST nodes. 

422 

423 See http://asttokens.readthedocs.io/en/latest/api-index.html 

424 """ 

425 from asttokens import ASTText # must be installed separately 

426 

427 if self._asttext is None: 

428 self._asttext = ASTText(self.text, tree=self.tree, filename=self.filename) 

429 

430 return self._asttext 

431 

432 def asttokens(self): 

433 # type: () -> ASTTokens 

434 """ 

435 Returns an ASTTokens object for getting the source of specific AST nodes. 

436 

437 See http://asttokens.readthedocs.io/en/latest/api-index.html 

438 """ 

439 import asttokens # must be installed separately 

440 

441 if self._asttokens is None: 

442 if hasattr(asttokens, 'ASTText'): 

443 self._asttokens = self.asttext().asttokens 

444 else: # pragma: no cover 

445 self._asttokens = asttokens.ASTTokens(self.text, tree=self.tree, filename=self.filename) 

446 return self._asttokens 

447 

448 def _asttext_base(self): 

449 # type: () -> ASTTextBase 

450 import asttokens # must be installed separately 

451 

452 if hasattr(asttokens, 'ASTText'): 

453 return self.asttext() 

454 else: # pragma: no cover 

455 return self.asttokens() 

456 

457 @staticmethod 

458 def decode_source(source): 

459 # type: (Union[str, bytes]) -> text_type 

460 if isinstance(source, bytes): 

461 encoding = Source.detect_encoding(source) 

462 return source.decode(encoding) 

463 else: 

464 return source 

465 

466 @staticmethod 

467 def detect_encoding(source): 

468 # type: (bytes) -> str 

469 return detect_encoding(io.BytesIO(source).readline)[0] 

470 

471 def code_qualname(self, code): 

472 # type: (types.CodeType) -> str 

473 """ 

474 Imitates the __qualname__ attribute of functions for code objects. 

475 Given: 

476 

477 - A function `func` 

478 - A frame `frame` for an execution of `func`, meaning: 

479 `frame.f_code is func.__code__` 

480 

481 `Source.for_frame(frame).code_qualname(frame.f_code)` 

482 will be equal to `func.__qualname__`*. Works for Python 2 as well, 

483 where of course no `__qualname__` attribute exists. 

484 

485 Falls back to `code.co_name` if there is no appropriate qualname. 

486 

487 Based on https://github.com/wbolster/qualname 

488 

489 (* unless `func` is a lambda 

490 nested inside another lambda on the same line, in which case 

491 the outer lambda's qualname will be returned for the codes 

492 of both lambdas) 

493 """ 

494 assert_(code.co_filename == self.filename) 

495 return self._qualnames.get((code.co_name, code.co_firstlineno), code.co_name) 

496 

497 

498class Executing(object): 

499 """ 

500 Information about the operation a frame is currently executing. 

501 

502 Generally you will just want `node`, which is the AST node being executed, 

503 or None if it's unknown. 

504 

505 If a decorator is currently being called, then: 

506 - `node` is a function or class definition 

507 - `decorator` is the expression in `node.decorator_list` being called 

508 - `statements == {node}` 

509 """ 

510 

511 def __init__(self, frame, source, node, stmts, decorator): 

512 # type: (types.FrameType, Source, EnhancedAST, Set[ast.stmt], Optional[EnhancedAST]) -> None 

513 self.frame = frame 

514 self.source = source 

515 self.node = node 

516 self.statements = stmts 

517 self.decorator = decorator 

518 

519 def code_qualname(self): 

520 # type: () -> str 

521 return self.source.code_qualname(self.frame.f_code) 

522 

523 def text(self): 

524 # type: () -> str 

525 return self.source._asttext_base().get_text(self.node) 

526 

527 def text_range(self): 

528 # type: () -> Tuple[int, int] 

529 return self.source._asttext_base().get_text_range(self.node) 

530 

531 

532class QualnameVisitor(ast.NodeVisitor): 

533 def __init__(self): 

534 # type: () -> None 

535 super(QualnameVisitor, self).__init__() 

536 self.stack = [] # type: List[str] 

537 self.qualnames = {} # type: Dict[Tuple[str, int], str] 

538 

539 def add_qualname(self, node, name=None): 

540 # type: (ast.AST, Optional[str]) -> None 

541 name = name or node.name # type: ignore[attr-defined] 

542 self.stack.append(name) 

543 if getattr(node, 'decorator_list', ()): 

544 lineno = node.decorator_list[0].lineno # type: ignore[attr-defined] 

545 else: 

546 lineno = node.lineno # type: ignore[attr-defined] 

547 self.qualnames.setdefault((name, lineno), ".".join(self.stack)) 

548 

549 def visit_FunctionDef(self, node, name=None): 

550 # type: (ast.AST, Optional[str]) -> None 

551 if sys.version_info[0] == 3: 

552 assert isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)), node 

553 else: 

554 assert isinstance(node, (ast.FunctionDef, ast.Lambda)), node 

555 self.add_qualname(node, name) 

556 self.stack.append('<locals>') 

557 children = [] # type: Sequence[ast.AST] 

558 if isinstance(node, ast.Lambda): 

559 children = [node.body] 

560 else: 

561 children = node.body 

562 for child in children: 

563 self.visit(child) 

564 self.stack.pop() 

565 self.stack.pop() 

566 

567 # Find lambdas in the function definition outside the body, 

568 # e.g. decorators or default arguments 

569 # Based on iter_child_nodes 

570 for field, child in ast.iter_fields(node): 

571 if field == 'body': 

572 continue 

573 if isinstance(child, ast.AST): 

574 self.visit(child) 

575 elif isinstance(child, list): 

576 for grandchild in child: 

577 if isinstance(grandchild, ast.AST): 

578 self.visit(grandchild) 

579 

580 visit_AsyncFunctionDef = visit_FunctionDef 

581 

582 def visit_Lambda(self, node): 

583 # type: (ast.AST) -> None 

584 assert isinstance(node, ast.Lambda) 

585 self.visit_FunctionDef(node, '<lambda>') 

586 

587 def visit_ClassDef(self, node): 

588 # type: (ast.AST) -> None 

589 assert isinstance(node, ast.ClassDef) 

590 self.add_qualname(node) 

591 self.generic_visit(node) 

592 self.stack.pop() 

593 

594 

595 

596 

597 

598future_flags = sum( 

599 getattr(__future__, fname).compiler_flag for fname in __future__.all_feature_names 

600) 

601 

602 

603def compile_similar_to(source, matching_code): 

604 # type: (ast.Module, types.CodeType) -> Any 

605 return compile( 

606 source, 

607 matching_code.co_filename, 

608 'exec', 

609 flags=future_flags & matching_code.co_flags, 

610 dont_inherit=True, 

611 ) 

612 

613 

614sentinel = 'io8urthglkjdghvljusketgIYRFYUVGHFRTBGVHKGF78678957647698' 

615 

616def is_rewritten_by_pytest(code): 

617 # type: (types.CodeType) -> bool 

618 return any( 

619 bc.opname != "LOAD_CONST" and isinstance(bc.argval,str) and bc.argval.startswith("@py") 

620 for bc in get_instructions(code) 

621 ) 

622 

623 

624class SentinelNodeFinder(object): 

625 result = None # type: EnhancedAST 

626 

627 def __init__(self, frame, stmts, tree, lasti, source): 

628 # type: (types.FrameType, Set[EnhancedAST], ast.Module, int, Source) -> None 

629 assert_(stmts) 

630 self.frame = frame 

631 self.tree = tree 

632 self.code = code = frame.f_code 

633 self.is_pytest = is_rewritten_by_pytest(code) 

634 

635 if self.is_pytest: 

636 self.ignore_linenos = frozenset(assert_linenos(tree)) 

637 else: 

638 self.ignore_linenos = frozenset() 

639 

640 self.decorator = None 

641 

642 self.instruction = instruction = self.get_actual_current_instruction(lasti) 

643 op_name = instruction.opname 

644 extra_filter = lambda e: True # type: Callable[[Any], bool] 

645 ctx = type(None) # type: Type 

646 

647 typ = type(None) # type: Type 

648 if op_name.startswith('CALL_'): 

649 typ = ast.Call 

650 elif op_name.startswith(('BINARY_SUBSCR', 'SLICE+')): 

651 typ = ast.Subscript 

652 ctx = ast.Load 

653 elif op_name.startswith('BINARY_'): 

654 typ = ast.BinOp 

655 op_type = dict( 

656 BINARY_POWER=ast.Pow, 

657 BINARY_MULTIPLY=ast.Mult, 

658 BINARY_MATRIX_MULTIPLY=getattr(ast, "MatMult", ()), 

659 BINARY_FLOOR_DIVIDE=ast.FloorDiv, 

660 BINARY_TRUE_DIVIDE=ast.Div, 

661 BINARY_MODULO=ast.Mod, 

662 BINARY_ADD=ast.Add, 

663 BINARY_SUBTRACT=ast.Sub, 

664 BINARY_LSHIFT=ast.LShift, 

665 BINARY_RSHIFT=ast.RShift, 

666 BINARY_AND=ast.BitAnd, 

667 BINARY_XOR=ast.BitXor, 

668 BINARY_OR=ast.BitOr, 

669 )[op_name] 

670 extra_filter = lambda e: isinstance(e.op, op_type) 

671 elif op_name.startswith('UNARY_'): 

672 typ = ast.UnaryOp 

673 op_type = dict( 

674 UNARY_POSITIVE=ast.UAdd, 

675 UNARY_NEGATIVE=ast.USub, 

676 UNARY_NOT=ast.Not, 

677 UNARY_INVERT=ast.Invert, 

678 )[op_name] 

679 extra_filter = lambda e: isinstance(e.op, op_type) 

680 elif op_name in ('LOAD_ATTR', 'LOAD_METHOD', 'LOOKUP_METHOD'): 

681 typ = ast.Attribute 

682 ctx = ast.Load 

683 extra_filter = lambda e: attr_names_match(e.attr, instruction.argval) 

684 elif op_name in ('LOAD_NAME', 'LOAD_GLOBAL', 'LOAD_FAST', 'LOAD_DEREF', 'LOAD_CLASSDEREF'): 

685 typ = ast.Name 

686 ctx = ast.Load 

687 if sys.version_info[0] == 3 or instruction.argval: 

688 extra_filter = lambda e: e.id == instruction.argval 

689 elif op_name in ('COMPARE_OP', 'IS_OP', 'CONTAINS_OP'): 

690 typ = ast.Compare 

691 extra_filter = lambda e: len(e.ops) == 1 

692 elif op_name.startswith(('STORE_SLICE', 'STORE_SUBSCR')): 

693 ctx = ast.Store 

694 typ = ast.Subscript 

695 elif op_name.startswith('STORE_ATTR'): 

696 ctx = ast.Store 

697 typ = ast.Attribute 

698 extra_filter = lambda e: attr_names_match(e.attr, instruction.argval) 

699 else: 

700 raise RuntimeError(op_name) 

701 

702 with lock: 

703 exprs = { 

704 cast(EnhancedAST, node) 

705 for stmt in stmts 

706 for node in ast.walk(stmt) 

707 if isinstance(node, typ) 

708 if isinstance(getattr(node, "ctx", None), ctx) 

709 if extra_filter(node) 

710 if statement_containing_node(node) == stmt 

711 } 

712 

713 if ctx == ast.Store: 

714 # No special bytecode tricks here. 

715 # We can handle multiple assigned attributes with different names, 

716 # but only one assigned subscript. 

717 self.result = only(exprs) 

718 return 

719 

720 matching = list(self.matching_nodes(exprs)) 

721 if not matching and typ == ast.Call: 

722 self.find_decorator(stmts) 

723 else: 

724 self.result = only(matching) 

725 

726 def find_decorator(self, stmts): 

727 # type: (Union[List[EnhancedAST], Set[EnhancedAST]]) -> None 

728 stmt = only(stmts) 

729 assert_(isinstance(stmt, (ast.ClassDef, function_node_types))) 

730 decorators = stmt.decorator_list # type: ignore[attr-defined] 

731 assert_(decorators) 

732 line_instructions = [ 

733 inst 

734 for inst in self.clean_instructions(self.code) 

735 if inst.lineno == self.frame.f_lineno 

736 ] 

737 last_decorator_instruction_index = [ 

738 i 

739 for i, inst in enumerate(line_instructions) 

740 if inst.opname == "CALL_FUNCTION" 

741 ][-1] 

742 assert_( 

743 line_instructions[last_decorator_instruction_index + 1].opname.startswith( 

744 "STORE_" 

745 ) 

746 ) 

747 decorator_instructions = line_instructions[ 

748 last_decorator_instruction_index 

749 - len(decorators) 

750 + 1 : last_decorator_instruction_index 

751 + 1 

752 ] 

753 assert_({inst.opname for inst in decorator_instructions} == {"CALL_FUNCTION"}) 

754 decorator_index = decorator_instructions.index(self.instruction) 

755 decorator = decorators[::-1][decorator_index] 

756 self.decorator = decorator 

757 self.result = stmt 

758 

759 def clean_instructions(self, code): 

760 # type: (types.CodeType) -> List[EnhancedInstruction] 

761 return [ 

762 inst 

763 for inst in get_instructions(code) 

764 if inst.opname not in ("EXTENDED_ARG", "NOP") 

765 if inst.lineno not in self.ignore_linenos 

766 ] 

767 

768 def get_original_clean_instructions(self): 

769 # type: () -> List[EnhancedInstruction] 

770 result = self.clean_instructions(self.code) 

771 

772 # pypy sometimes (when is not clear) 

773 # inserts JUMP_IF_NOT_DEBUG instructions in bytecode 

774 # If they're not present in our compiled instructions, 

775 # ignore them in the original bytecode 

776 if not any( 

777 inst.opname == "JUMP_IF_NOT_DEBUG" 

778 for inst in self.compile_instructions() 

779 ): 

780 result = [ 

781 inst for inst in result 

782 if inst.opname != "JUMP_IF_NOT_DEBUG" 

783 ] 

784 

785 return result 

786 

787 def matching_nodes(self, exprs): 

788 # type: (Set[EnhancedAST]) -> Iterator[EnhancedAST] 

789 original_instructions = self.get_original_clean_instructions() 

790 original_index = only( 

791 i 

792 for i, inst in enumerate(original_instructions) 

793 if inst == self.instruction 

794 ) 

795 for expr_index, expr in enumerate(exprs): 

796 setter = get_setter(expr) 

797 assert setter is not None 

798 # noinspection PyArgumentList 

799 replacement = ast.BinOp( 

800 left=expr, 

801 op=ast.Pow(), 

802 right=ast.Str(s=sentinel), 

803 ) 

804 ast.fix_missing_locations(replacement) 

805 setter(replacement) 

806 try: 

807 instructions = self.compile_instructions() 

808 finally: 

809 setter(expr) 

810 

811 if sys.version_info >= (3, 10): 

812 try: 

813 handle_jumps(instructions, original_instructions) 

814 except Exception: 

815 # Give other candidates a chance 

816 if TESTING or expr_index < len(exprs) - 1: 

817 continue 

818 raise 

819 

820 indices = [ 

821 i 

822 for i, instruction in enumerate(instructions) 

823 if instruction.argval == sentinel 

824 ] 

825 

826 # There can be several indices when the bytecode is duplicated, 

827 # as happens in a finally block in 3.9+ 

828 # First we remove the opcodes caused by our modifications 

829 for index_num, sentinel_index in enumerate(indices): 

830 # Adjustment for removing sentinel instructions below 

831 # in past iterations 

832 sentinel_index -= index_num * 2 

833 

834 assert_(instructions.pop(sentinel_index).opname == 'LOAD_CONST') 

835 assert_(instructions.pop(sentinel_index).opname == 'BINARY_POWER') 

836 

837 # Then we see if any of the instruction indices match 

838 for index_num, sentinel_index in enumerate(indices): 

839 sentinel_index -= index_num * 2 

840 new_index = sentinel_index - 1 

841 

842 if new_index != original_index: 

843 continue 

844 

845 original_inst = original_instructions[original_index] 

846 new_inst = instructions[new_index] 

847 

848 # In Python 3.9+, changing 'not x in y' to 'not sentinel_transformation(x in y)' 

849 # changes a CONTAINS_OP(invert=1) to CONTAINS_OP(invert=0),<sentinel stuff>,UNARY_NOT 

850 if ( 

851 original_inst.opname == new_inst.opname in ('CONTAINS_OP', 'IS_OP') 

852 and original_inst.arg != new_inst.arg # type: ignore[attr-defined] 

853 and ( 

854 original_instructions[original_index + 1].opname 

855 != instructions[new_index + 1].opname == 'UNARY_NOT' 

856 )): 

857 # Remove the difference for the upcoming assert 

858 instructions.pop(new_index + 1) 

859 

860 # Check that the modified instructions don't have anything unexpected 

861 # 3.10 is a bit too weird to assert this in all cases but things still work 

862 if sys.version_info < (3, 10): 

863 for inst1, inst2 in zip_longest( 

864 original_instructions, instructions 

865 ): 

866 assert_(inst1 and inst2 and opnames_match(inst1, inst2)) 

867 

868 yield expr 

869 

870 def compile_instructions(self): 

871 # type: () -> List[EnhancedInstruction] 

872 module_code = compile_similar_to(self.tree, self.code) 

873 code = only(self.find_codes(module_code)) 

874 return self.clean_instructions(code) 

875 

876 def find_codes(self, root_code): 

877 # type: (types.CodeType) -> list 

878 checks = [ 

879 attrgetter('co_firstlineno'), 

880 attrgetter('co_freevars'), 

881 attrgetter('co_cellvars'), 

882 lambda c: is_ipython_cell_code_name(c.co_name) or c.co_name, 

883 ] # type: List[Callable] 

884 if not self.is_pytest: 

885 checks += [ 

886 attrgetter('co_names'), 

887 attrgetter('co_varnames'), 

888 ] 

889 

890 def matches(c): 

891 # type: (types.CodeType) -> bool 

892 return all( 

893 f(c) == f(self.code) 

894 for f in checks 

895 ) 

896 

897 code_options = [] 

898 if matches(root_code): 

899 code_options.append(root_code) 

900 

901 def finder(code): 

902 # type: (types.CodeType) -> None 

903 for const in code.co_consts: 

904 if not inspect.iscode(const): 

905 continue 

906 

907 if matches(const): 

908 code_options.append(const) 

909 finder(const) 

910 

911 finder(root_code) 

912 return code_options 

913 

914 def get_actual_current_instruction(self, lasti): 

915 # type: (int) -> EnhancedInstruction 

916 """ 

917 Get the instruction corresponding to the current 

918 frame offset, skipping EXTENDED_ARG instructions 

919 """ 

920 # Don't use get_original_clean_instructions 

921 # because we need the actual instructions including 

922 # EXTENDED_ARG 

923 instructions = list(get_instructions(self.code)) 

924 index = only( 

925 i 

926 for i, inst in enumerate(instructions) 

927 if inst.offset == lasti 

928 ) 

929 

930 while True: 

931 instruction = instructions[index] 

932 if instruction.opname != "EXTENDED_ARG": 

933 return instruction 

934 index += 1 

935 

936 

937 

938def non_sentinel_instructions(instructions, start): 

939 # type: (List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction]] 

940 """ 

941 Yields (index, instruction) pairs excluding the basic 

942 instructions introduced by the sentinel transformation 

943 """ 

944 skip_power = False 

945 for i, inst in islice(enumerate(instructions), start, None): 

946 if inst.argval == sentinel: 

947 assert_(inst.opname == "LOAD_CONST") 

948 skip_power = True 

949 continue 

950 elif skip_power: 

951 assert_(inst.opname == "BINARY_POWER") 

952 skip_power = False 

953 continue 

954 yield i, inst 

955 

956 

957def walk_both_instructions(original_instructions, original_start, instructions, start): 

958 # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction, int, EnhancedInstruction]] 

959 """ 

960 Yields matching indices and instructions from the new and original instructions, 

961 leaving out changes made by the sentinel transformation. 

962 """ 

963 original_iter = islice(enumerate(original_instructions), original_start, None) 

964 new_iter = non_sentinel_instructions(instructions, start) 

965 inverted_comparison = False 

966 while True: 

967 try: 

968 original_i, original_inst = next(original_iter) 

969 new_i, new_inst = next(new_iter) 

970 except StopIteration: 

971 return 

972 if ( 

973 inverted_comparison 

974 and original_inst.opname != new_inst.opname == "UNARY_NOT" 

975 ): 

976 new_i, new_inst = next(new_iter) 

977 inverted_comparison = ( 

978 original_inst.opname == new_inst.opname in ("CONTAINS_OP", "IS_OP") 

979 and original_inst.arg != new_inst.arg # type: ignore[attr-defined] 

980 ) 

981 yield original_i, original_inst, new_i, new_inst 

982 

983 

984def handle_jumps(instructions, original_instructions): 

985 # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> None 

986 """ 

987 Transforms instructions in place until it looks more like original_instructions. 

988 This is only needed in 3.10+ where optimisations lead to more drastic changes 

989 after the sentinel transformation. 

990 Replaces JUMP instructions that aren't also present in original_instructions 

991 with the sections that they jump to until a raise or return. 

992 In some other cases duplication found in `original_instructions` 

993 is replicated in `instructions`. 

994 """ 

995 while True: 

996 for original_i, original_inst, new_i, new_inst in walk_both_instructions( 

997 original_instructions, 0, instructions, 0 

998 ): 

999 if opnames_match(original_inst, new_inst): 

1000 continue 

1001 

1002 if "JUMP" in new_inst.opname and "JUMP" not in original_inst.opname: 

1003 # Find where the new instruction is jumping to, ignoring 

1004 # instructions which have been copied in previous iterations 

1005 start = only( 

1006 i 

1007 for i, inst in enumerate(instructions) 

1008 if inst.offset == new_inst.argval 

1009 and not getattr(inst, "_copied", False) 

1010 ) 

1011 # Replace the jump instruction with the jumped to section of instructions 

1012 # That section may also be deleted if it's not similarly duplicated 

1013 # in original_instructions 

1014 new_instructions = handle_jump( 

1015 original_instructions, original_i, instructions, start 

1016 ) 

1017 assert new_instructions is not None 

1018 instructions[new_i : new_i + 1] = new_instructions 

1019 else: 

1020 # Extract a section of original_instructions from original_i to return/raise 

1021 orig_section = [] 

1022 for section_inst in original_instructions[original_i:]: 

1023 orig_section.append(section_inst) 

1024 if section_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"): 

1025 break 

1026 else: 

1027 # No return/raise - this is just a mismatch we can't handle 

1028 raise AssertionError 

1029 

1030 instructions[new_i:new_i] = only(find_new_matching(orig_section, instructions)) 

1031 

1032 # instructions has been modified, the for loop can't sensibly continue 

1033 # Restart it from the beginning, checking for other issues 

1034 break 

1035 

1036 else: # No mismatched jumps found, we're done 

1037 return 

1038 

1039 

1040def find_new_matching(orig_section, instructions): 

1041 # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> Iterator[List[EnhancedInstruction]] 

1042 """ 

1043 Yields sections of `instructions` which match `orig_section`. 

1044 The yielded sections include sentinel instructions, but these 

1045 are ignored when checking for matches. 

1046 """ 

1047 for start in range(len(instructions) - len(orig_section)): 

1048 indices, dup_section = zip( 

1049 *islice( 

1050 non_sentinel_instructions(instructions, start), 

1051 len(orig_section), 

1052 ) 

1053 ) 

1054 if len(dup_section) < len(orig_section): 

1055 return 

1056 if sections_match(orig_section, dup_section): 

1057 yield instructions[start:indices[-1] + 1] 

1058 

1059 

1060def handle_jump(original_instructions, original_start, instructions, start): 

1061 # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Optional[List[EnhancedInstruction]] 

1062 """ 

1063 Returns the section of instructions starting at `start` and ending 

1064 with a RETURN_VALUE or RAISE_VARARGS instruction. 

1065 There should be a matching section in original_instructions starting at original_start. 

1066 If that section doesn't appear elsewhere in original_instructions, 

1067 then also delete the returned section of instructions. 

1068 """ 

1069 for original_j, original_inst, new_j, new_inst in walk_both_instructions( 

1070 original_instructions, original_start, instructions, start 

1071 ): 

1072 assert_(opnames_match(original_inst, new_inst)) 

1073 if original_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"): 

1074 inlined = deepcopy(instructions[start : new_j + 1]) 

1075 for inl in inlined: 

1076 inl._copied = True 

1077 orig_section = original_instructions[original_start : original_j + 1] 

1078 if not check_duplicates( 

1079 original_start, orig_section, original_instructions 

1080 ): 

1081 instructions[start : new_j + 1] = [] 

1082 return inlined 

1083 

1084 return None 

1085 

1086 

1087def check_duplicates(original_i, orig_section, original_instructions): 

1088 # type: (int, List[EnhancedInstruction], List[EnhancedInstruction]) -> bool 

1089 """ 

1090 Returns True if a section of original_instructions starting somewhere other 

1091 than original_i and matching orig_section is found, i.e. orig_section is duplicated. 

1092 """ 

1093 for dup_start in range(len(original_instructions)): 

1094 if dup_start == original_i: 

1095 continue 

1096 dup_section = original_instructions[dup_start : dup_start + len(orig_section)] 

1097 if len(dup_section) < len(orig_section): 

1098 return False 

1099 if sections_match(orig_section, dup_section): 

1100 return True 

1101 

1102 return False 

1103 

1104def sections_match(orig_section, dup_section): 

1105 # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> bool 

1106 """ 

1107 Returns True if the given lists of instructions have matching linenos and opnames. 

1108 """ 

1109 return all( 

1110 ( 

1111 orig_inst.lineno == dup_inst.lineno 

1112 # POP_BLOCKs have been found to have differing linenos in innocent cases 

1113 or "POP_BLOCK" == orig_inst.opname == dup_inst.opname 

1114 ) 

1115 and opnames_match(orig_inst, dup_inst) 

1116 for orig_inst, dup_inst in zip(orig_section, dup_section) 

1117 ) 

1118 

1119 

1120def opnames_match(inst1, inst2): 

1121 # type: (Instruction, Instruction) -> bool 

1122 return ( 

1123 inst1.opname == inst2.opname 

1124 or "JUMP" in inst1.opname 

1125 and "JUMP" in inst2.opname 

1126 or (inst1.opname == "PRINT_EXPR" and inst2.opname == "POP_TOP") 

1127 or ( 

1128 inst1.opname in ("LOAD_METHOD", "LOOKUP_METHOD") 

1129 and inst2.opname == "LOAD_ATTR" 

1130 ) 

1131 or (inst1.opname == "CALL_METHOD" and inst2.opname == "CALL_FUNCTION") 

1132 ) 

1133 

1134 

1135def get_setter(node): 

1136 # type: (EnhancedAST) -> Optional[Callable[[ast.AST], None]] 

1137 parent = node.parent 

1138 for name, field in ast.iter_fields(parent): 

1139 if field is node: 

1140 def setter(new_node): 

1141 # type: (ast.AST) -> None 

1142 return setattr(parent, name, new_node) 

1143 return setter 

1144 elif isinstance(field, list): 

1145 for i, item in enumerate(field): 

1146 if item is node: 

1147 def setter(new_node): 

1148 # type: (ast.AST) -> None 

1149 field[i] = new_node 

1150 

1151 return setter 

1152 return None 

1153 

1154lock = RLock() 

1155 

1156 

1157@cache 

1158def statement_containing_node(node): 

1159 # type: (ast.AST) -> EnhancedAST 

1160 while not isinstance(node, ast.stmt): 

1161 node = cast(EnhancedAST, node).parent 

1162 return cast(EnhancedAST, node) 

1163 

1164 

1165def assert_linenos(tree): 

1166 # type: (ast.AST) -> Iterator[int] 

1167 for node in ast.walk(tree): 

1168 if ( 

1169 hasattr(node, 'parent') and 

1170 isinstance(statement_containing_node(node), ast.Assert) 

1171 ): 

1172 for lineno in node_linenos(node): 

1173 yield lineno 

1174 

1175 

1176def _extract_ipython_statement(stmt): 

1177 # type: (EnhancedAST) -> ast.Module 

1178 # IPython separates each statement in a cell to be executed separately 

1179 # So NodeFinder should only compile one statement at a time or it 

1180 # will find a code mismatch. 

1181 while not isinstance(stmt.parent, ast.Module): 

1182 stmt = stmt.parent 

1183 # use `ast.parse` instead of `ast.Module` for better portability 

1184 # python3.8 changes the signature of `ast.Module` 

1185 # Inspired by https://github.com/pallets/werkzeug/pull/1552/files 

1186 tree = ast.parse("") 

1187 tree.body = [cast(ast.stmt, stmt)] 

1188 ast.copy_location(tree, stmt) 

1189 return tree 

1190 

1191 

1192def is_ipython_cell_code_name(code_name): 

1193 # type: (str) -> bool 

1194 return bool(re.match(r"(<module>|<cell line: \d+>)$", code_name)) 

1195 

1196 

1197def is_ipython_cell_filename(filename): 

1198 # type: (str) -> bool 

1199 return bool(re.search(r"<ipython-input-|[/\\]ipykernel_\d+[/\\]", filename)) 

1200 

1201 

1202def is_ipython_cell_code(code_obj): 

1203 # type: (types.CodeType) -> bool 

1204 return ( 

1205 is_ipython_cell_filename(code_obj.co_filename) and 

1206 is_ipython_cell_code_name(code_obj.co_name) 

1207 ) 

1208 

1209 

1210def find_node_ipython(frame, lasti, stmts, source): 

1211 # type: (types.FrameType, int, Set[EnhancedAST], Source) -> Tuple[Optional[Any], Optional[Any]] 

1212 node = decorator = None 

1213 for stmt in stmts: 

1214 tree = _extract_ipython_statement(stmt) 

1215 try: 

1216 node_finder = NodeFinder(frame, stmts, tree, lasti, source) 

1217 if (node or decorator) and (node_finder.result or node_finder.decorator): 

1218 # Found potential nodes in separate statements, 

1219 # cannot resolve ambiguity, give up here 

1220 return None, None 

1221 

1222 node = node_finder.result 

1223 decorator = node_finder.decorator 

1224 except Exception: 

1225 pass 

1226 return decorator, node 

1227 

1228 

1229def attr_names_match(attr, argval): 

1230 # type: (str, str) -> bool 

1231 """ 

1232 Checks that the user-visible attr (from ast) can correspond to 

1233 the argval in the bytecode, i.e. the real attribute fetched internally, 

1234 which may be mangled for private attributes. 

1235 """ 

1236 if attr == argval: 

1237 return True 

1238 if not attr.startswith("__"): 

1239 return False 

1240 return bool(re.match(r"^_\w+%s$" % attr, argval)) 

1241 

1242 

1243def node_linenos(node): 

1244 # type: (ast.AST) -> Iterator[int] 

1245 if hasattr(node, "lineno"): 

1246 linenos = [] # type: Sequence[int] 

1247 if hasattr(node, "end_lineno") and isinstance(node, ast.expr): 

1248 assert node.end_lineno is not None # type: ignore[attr-defined] 

1249 linenos = range(node.lineno, node.end_lineno + 1) # type: ignore[attr-defined] 

1250 else: 

1251 linenos = [node.lineno] # type: ignore[attr-defined] 

1252 for lineno in linenos: 

1253 yield lineno 

1254 

1255 

1256if sys.version_info >= (3, 11): 

1257 from ._position_node_finder import PositionNodeFinder as NodeFinder 

1258else: 

1259 NodeFinder = SentinelNodeFinder 

1260