Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/executing/executing.py: 20%

528 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1""" 

2MIT License 

3 

4Copyright (c) 2021 Alex Hall 

5 

6Permission is hereby granted, free of charge, to any person obtaining a copy 

7of this software and associated documentation files (the "Software"), to deal 

8in the Software without restriction, including without limitation the rights 

9to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 

10copies of the Software, and to permit persons to whom the Software is 

11furnished to do so, subject to the following conditions: 

12 

13The above copyright notice and this permission notice shall be included in all 

14copies or substantial portions of the Software. 

15 

16THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

17IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

18FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

19AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

20LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 

21OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 

22SOFTWARE. 

23""" 

24 

25import __future__ 

26import ast 

27import dis 

28import inspect 

29import io 

30import linecache 

31import re 

32import sys 

33import types 

34from collections import defaultdict 

35from copy import deepcopy 

36from functools import lru_cache 

37from itertools import islice 

38from itertools import zip_longest 

39from operator import attrgetter 

40from pathlib import Path 

41from threading import RLock 

42from tokenize import detect_encoding 

43from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, \ 

44 Type, TypeVar, Union, cast 

45 

46if TYPE_CHECKING: # pragma: no cover 

47 from asttokens import ASTTokens, ASTText 

48 from asttokens.asttokens import ASTTextBase 

49 

50 

51function_node_types = (ast.FunctionDef, ast.AsyncFunctionDef) # type: Tuple[Type, ...] 

52 

53cache = lru_cache(maxsize=None) 

54 

55# Type class used to expand out the definition of AST to include fields added by this library 

56# It's not actually used for anything other than type checking though! 

57class EnhancedAST(ast.AST): 

58 parent = None # type: EnhancedAST 

59 

60 

61class Instruction(dis.Instruction): 

62 lineno = None # type: int 

63 

64 

65# Type class used to expand out the definition of AST to include fields added by this library 

66# It's not actually used for anything other than type checking though! 

67class EnhancedInstruction(Instruction): 

68 _copied = None # type: bool 

69 

70 

71 

72def assert_(condition, message=""): 

73 # type: (Any, str) -> None 

74 """ 

75 Like an assert statement, but unaffected by -O 

76 :param condition: value that is expected to be truthy 

77 :type message: Any 

78 """ 

79 if not condition: 

80 raise AssertionError(str(message)) 

81 

82 

83def get_instructions(co): 

84 # type: (types.CodeType) -> Iterator[EnhancedInstruction] 

85 lineno = co.co_firstlineno 

86 for inst in dis.get_instructions(co): 

87 inst = cast(EnhancedInstruction, inst) 

88 lineno = inst.starts_line or lineno 

89 assert_(lineno) 

90 inst.lineno = lineno 

91 yield inst 

92 

93 

94TESTING = 0 

95 

96 

97class NotOneValueFound(Exception): 

98 def __init__(self,msg,values=[]): 

99 # type: (str, Sequence) -> None 

100 self.values=values 

101 super(NotOneValueFound,self).__init__(msg) 

102 

103T = TypeVar('T') 

104 

105 

106def only(it): 

107 # type: (Iterable[T]) -> T 

108 if isinstance(it, Sized): 

109 if len(it) != 1: 

110 raise NotOneValueFound('Expected one value, found %s' % len(it)) 

111 # noinspection PyTypeChecker 

112 return list(it)[0] 

113 

114 lst = tuple(islice(it, 2)) 

115 if len(lst) == 0: 

116 raise NotOneValueFound('Expected one value, found 0') 

117 if len(lst) > 1: 

118 raise NotOneValueFound('Expected one value, found several',lst) 

119 return lst[0] 

120 

121 

122class Source(object): 

123 """ 

124 The source code of a single file and associated metadata. 

125 

126 The main method of interest is the classmethod `executing(frame)`. 

127 

128 If you want an instance of this class, don't construct it. 

129 Ideally use the classmethod `for_frame(frame)`. 

130 If you don't have a frame, use `for_filename(filename [, module_globals])`. 

131 These methods cache instances by filename, so at most one instance exists per filename. 

132 

133 Attributes: 

134 - filename 

135 - text 

136 - lines 

137 - tree: AST parsed from text, or None if text is not valid Python 

138 All nodes in the tree have an extra `parent` attribute 

139 

140 Other methods of interest: 

141 - statements_at_line 

142 - asttokens 

143 - code_qualname 

144 """ 

145 

146 def __init__(self, filename, lines): 

147 # type: (str, Sequence[str]) -> None 

148 """ 

149 Don't call this constructor, see the class docstring. 

150 """ 

151 

152 self.filename = filename 

153 self.text = ''.join(lines) 

154 self.lines = [line.rstrip('\r\n') for line in lines] 

155 

156 self._nodes_by_line = defaultdict(list) 

157 self.tree = None 

158 self._qualnames = {} 

159 self._asttokens = None # type: Optional[ASTTokens] 

160 self._asttext = None # type: Optional[ASTText] 

161 

162 try: 

163 self.tree = ast.parse(self.text, filename=filename) 

164 except (SyntaxError, ValueError): 

165 pass 

166 else: 

167 for node in ast.walk(self.tree): 

168 for child in ast.iter_child_nodes(node): 

169 cast(EnhancedAST, child).parent = cast(EnhancedAST, node) 

170 for lineno in node_linenos(node): 

171 self._nodes_by_line[lineno].append(node) 

172 

173 visitor = QualnameVisitor() 

174 visitor.visit(self.tree) 

175 self._qualnames = visitor.qualnames 

176 

177 @classmethod 

178 def for_frame(cls, frame, use_cache=True): 

179 # type: (types.FrameType, bool) -> "Source" 

180 """ 

181 Returns the `Source` object corresponding to the file the frame is executing in. 

182 """ 

183 return cls.for_filename(frame.f_code.co_filename, frame.f_globals or {}, use_cache) 

184 

185 @classmethod 

186 def for_filename( 

187 cls, 

188 filename, 

189 module_globals=None, 

190 use_cache=True, # noqa no longer used 

191 ): 

192 # type: (Union[str, Path], Optional[Dict[str, Any]], bool) -> "Source" 

193 if isinstance(filename, Path): 

194 filename = str(filename) 

195 

196 def get_lines(): 

197 # type: () -> List[str] 

198 return linecache.getlines(cast(str, filename), module_globals) 

199 

200 # Save the current linecache entry, then ensure the cache is up to date. 

201 entry = linecache.cache.get(filename) # type: ignore[attr-defined] 

202 linecache.checkcache(filename) 

203 lines = get_lines() 

204 if entry is not None and not lines: 

205 # There was an entry, checkcache removed it, and nothing replaced it. 

206 # This means the file wasn't simply changed (because the `lines` wouldn't be empty) 

207 # but rather the file was found not to exist, probably because `filename` was fake. 

208 # Restore the original entry so that we still have something. 

209 linecache.cache[filename] = entry # type: ignore[attr-defined] 

210 lines = get_lines() 

211 

212 return cls._for_filename_and_lines(filename, tuple(lines)) 

213 

214 @classmethod 

215 def _for_filename_and_lines(cls, filename, lines): 

216 # type: (str, Sequence[str]) -> "Source" 

217 source_cache = cls._class_local('__source_cache_with_lines', {}) # type: Dict[Tuple[str, Sequence[str]], Source] 

218 try: 

219 return source_cache[(filename, lines)] 

220 except KeyError: 

221 pass 

222 

223 result = source_cache[(filename, lines)] = cls(filename, lines) 

224 return result 

225 

226 @classmethod 

227 def lazycache(cls, frame): 

228 # type: (types.FrameType) -> None 

229 linecache.lazycache(frame.f_code.co_filename, frame.f_globals) 

230 

231 @classmethod 

232 def executing(cls, frame_or_tb): 

233 # type: (Union[types.TracebackType, types.FrameType]) -> "Executing" 

234 """ 

235 Returns an `Executing` object representing the operation 

236 currently executing in the given frame or traceback object. 

237 """ 

238 if isinstance(frame_or_tb, types.TracebackType): 

239 # https://docs.python.org/3/reference/datamodel.html#traceback-objects 

240 # "tb_lineno gives the line number where the exception occurred; 

241 # tb_lasti indicates the precise instruction. 

242 # The line number and last instruction in the traceback may differ 

243 # from the line number of its frame object 

244 # if the exception occurred in a try statement with no matching except clause 

245 # or with a finally clause." 

246 tb = frame_or_tb 

247 frame = tb.tb_frame 

248 lineno = tb.tb_lineno 

249 lasti = tb.tb_lasti 

250 else: 

251 frame = frame_or_tb 

252 lineno = frame.f_lineno 

253 lasti = frame.f_lasti 

254 

255 

256 

257 code = frame.f_code 

258 key = (code, id(code), lasti) 

259 executing_cache = cls._class_local('__executing_cache', {}) # type: Dict[Tuple[types.CodeType, int, int], Any] 

260 

261 args = executing_cache.get(key) 

262 if not args: 

263 node = stmts = decorator = None 

264 source = cls.for_frame(frame) 

265 tree = source.tree 

266 if tree: 

267 try: 

268 stmts = source.statements_at_line(lineno) 

269 if stmts: 

270 if is_ipython_cell_code(code): 

271 decorator, node = find_node_ipython(frame, lasti, stmts, source) 

272 else: 

273 node_finder = NodeFinder(frame, stmts, tree, lasti, source) 

274 node = node_finder.result 

275 decorator = node_finder.decorator 

276 except Exception: 

277 if TESTING: 

278 raise 

279 

280 assert stmts is not None 

281 if node: 

282 new_stmts = {statement_containing_node(node)} 

283 assert_(new_stmts <= stmts) 

284 stmts = new_stmts 

285 

286 executing_cache[key] = args = source, node, stmts, decorator 

287 

288 return Executing(frame, *args) 

289 

290 @classmethod 

291 def _class_local(cls, name, default): 

292 # type: (str, T) -> T 

293 """ 

294 Returns an attribute directly associated with this class 

295 (as opposed to subclasses), setting default if necessary 

296 """ 

297 # classes have a mappingproxy preventing us from using setdefault 

298 result = cls.__dict__.get(name, default) 

299 setattr(cls, name, result) 

300 return result 

301 

302 @cache 

303 def statements_at_line(self, lineno): 

304 # type: (int) -> Set[EnhancedAST] 

305 """ 

306 Returns the statement nodes overlapping the given line. 

307 

308 Returns at most one statement unless semicolons are present. 

309 

310 If the `text` attribute is not valid python, meaning 

311 `tree` is None, returns an empty set. 

312 

313 Otherwise, `Source.for_frame(frame).statements_at_line(frame.f_lineno)` 

314 should return at least one statement. 

315 """ 

316 

317 return { 

318 statement_containing_node(node) 

319 for node in 

320 self._nodes_by_line[lineno] 

321 } 

322 

323 def asttext(self): 

324 # type: () -> ASTText 

325 """ 

326 Returns an ASTText object for getting the source of specific AST nodes. 

327 

328 See http://asttokens.readthedocs.io/en/latest/api-index.html 

329 """ 

330 from asttokens import ASTText # must be installed separately 

331 

332 if self._asttext is None: 

333 self._asttext = ASTText(self.text, tree=self.tree, filename=self.filename) 

334 

335 return self._asttext 

336 

337 def asttokens(self): 

338 # type: () -> ASTTokens 

339 """ 

340 Returns an ASTTokens object for getting the source of specific AST nodes. 

341 

342 See http://asttokens.readthedocs.io/en/latest/api-index.html 

343 """ 

344 import asttokens # must be installed separately 

345 

346 if self._asttokens is None: 

347 if hasattr(asttokens, 'ASTText'): 

348 self._asttokens = self.asttext().asttokens 

349 else: # pragma: no cover 

350 self._asttokens = asttokens.ASTTokens(self.text, tree=self.tree, filename=self.filename) 

351 return self._asttokens 

352 

353 def _asttext_base(self): 

354 # type: () -> ASTTextBase 

355 import asttokens # must be installed separately 

356 

357 if hasattr(asttokens, 'ASTText'): 

358 return self.asttext() 

359 else: # pragma: no cover 

360 return self.asttokens() 

361 

362 @staticmethod 

363 def decode_source(source): 

364 # type: (Union[str, bytes]) -> str 

365 if isinstance(source, bytes): 

366 encoding = Source.detect_encoding(source) 

367 return source.decode(encoding) 

368 else: 

369 return source 

370 

371 @staticmethod 

372 def detect_encoding(source): 

373 # type: (bytes) -> str 

374 return detect_encoding(io.BytesIO(source).readline)[0] 

375 

376 def code_qualname(self, code): 

377 # type: (types.CodeType) -> str 

378 """ 

379 Imitates the __qualname__ attribute of functions for code objects. 

380 Given: 

381 

382 - A function `func` 

383 - A frame `frame` for an execution of `func`, meaning: 

384 `frame.f_code is func.__code__` 

385 

386 `Source.for_frame(frame).code_qualname(frame.f_code)` 

387 will be equal to `func.__qualname__`*. Works for Python 2 as well, 

388 where of course no `__qualname__` attribute exists. 

389 

390 Falls back to `code.co_name` if there is no appropriate qualname. 

391 

392 Based on https://github.com/wbolster/qualname 

393 

394 (* unless `func` is a lambda 

395 nested inside another lambda on the same line, in which case 

396 the outer lambda's qualname will be returned for the codes 

397 of both lambdas) 

398 """ 

399 assert_(code.co_filename == self.filename) 

400 return self._qualnames.get((code.co_name, code.co_firstlineno), code.co_name) 

401 

402 

403class Executing(object): 

404 """ 

405 Information about the operation a frame is currently executing. 

406 

407 Generally you will just want `node`, which is the AST node being executed, 

408 or None if it's unknown. 

409 

410 If a decorator is currently being called, then: 

411 - `node` is a function or class definition 

412 - `decorator` is the expression in `node.decorator_list` being called 

413 - `statements == {node}` 

414 """ 

415 

416 def __init__(self, frame, source, node, stmts, decorator): 

417 # type: (types.FrameType, Source, EnhancedAST, Set[ast.stmt], Optional[EnhancedAST]) -> None 

418 self.frame = frame 

419 self.source = source 

420 self.node = node 

421 self.statements = stmts 

422 self.decorator = decorator 

423 

424 def code_qualname(self): 

425 # type: () -> str 

426 return self.source.code_qualname(self.frame.f_code) 

427 

428 def text(self): 

429 # type: () -> str 

430 return self.source._asttext_base().get_text(self.node) 

431 

432 def text_range(self): 

433 # type: () -> Tuple[int, int] 

434 return self.source._asttext_base().get_text_range(self.node) 

435 

436 

437class QualnameVisitor(ast.NodeVisitor): 

438 def __init__(self): 

439 # type: () -> None 

440 super(QualnameVisitor, self).__init__() 

441 self.stack = [] # type: List[str] 

442 self.qualnames = {} # type: Dict[Tuple[str, int], str] 

443 

444 def add_qualname(self, node, name=None): 

445 # type: (ast.AST, Optional[str]) -> None 

446 name = name or node.name # type: ignore[attr-defined] 

447 self.stack.append(name) 

448 if getattr(node, 'decorator_list', ()): 

449 lineno = node.decorator_list[0].lineno # type: ignore[attr-defined] 

450 else: 

451 lineno = node.lineno # type: ignore[attr-defined] 

452 self.qualnames.setdefault((name, lineno), ".".join(self.stack)) 

453 

454 def visit_FunctionDef(self, node, name=None): 

455 # type: (ast.AST, Optional[str]) -> None 

456 assert isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)), node 

457 self.add_qualname(node, name) 

458 self.stack.append('<locals>') 

459 children = [] # type: Sequence[ast.AST] 

460 if isinstance(node, ast.Lambda): 

461 children = [node.body] 

462 else: 

463 children = node.body 

464 for child in children: 

465 self.visit(child) 

466 self.stack.pop() 

467 self.stack.pop() 

468 

469 # Find lambdas in the function definition outside the body, 

470 # e.g. decorators or default arguments 

471 # Based on iter_child_nodes 

472 for field, child in ast.iter_fields(node): 

473 if field == 'body': 

474 continue 

475 if isinstance(child, ast.AST): 

476 self.visit(child) 

477 elif isinstance(child, list): 

478 for grandchild in child: 

479 if isinstance(grandchild, ast.AST): 

480 self.visit(grandchild) 

481 

482 visit_AsyncFunctionDef = visit_FunctionDef 

483 

484 def visit_Lambda(self, node): 

485 # type: (ast.AST) -> None 

486 assert isinstance(node, ast.Lambda) 

487 self.visit_FunctionDef(node, '<lambda>') 

488 

489 def visit_ClassDef(self, node): 

490 # type: (ast.AST) -> None 

491 assert isinstance(node, ast.ClassDef) 

492 self.add_qualname(node) 

493 self.generic_visit(node) 

494 self.stack.pop() 

495 

496 

497 

498 

499 

500future_flags = sum( 

501 getattr(__future__, fname).compiler_flag for fname in __future__.all_feature_names 

502) 

503 

504 

505def compile_similar_to(source, matching_code): 

506 # type: (ast.Module, types.CodeType) -> Any 

507 return compile( 

508 source, 

509 matching_code.co_filename, 

510 'exec', 

511 flags=future_flags & matching_code.co_flags, 

512 dont_inherit=True, 

513 ) 

514 

515 

516sentinel = 'io8urthglkjdghvljusketgIYRFYUVGHFRTBGVHKGF78678957647698' 

517 

518def is_rewritten_by_pytest(code): 

519 # type: (types.CodeType) -> bool 

520 return any( 

521 bc.opname != "LOAD_CONST" and isinstance(bc.argval,str) and bc.argval.startswith("@py") 

522 for bc in get_instructions(code) 

523 ) 

524 

525 

526class SentinelNodeFinder(object): 

527 result = None # type: EnhancedAST 

528 

529 def __init__(self, frame, stmts, tree, lasti, source): 

530 # type: (types.FrameType, Set[EnhancedAST], ast.Module, int, Source) -> None 

531 assert_(stmts) 

532 self.frame = frame 

533 self.tree = tree 

534 self.code = code = frame.f_code 

535 self.is_pytest = is_rewritten_by_pytest(code) 

536 

537 if self.is_pytest: 

538 self.ignore_linenos = frozenset(assert_linenos(tree)) 

539 else: 

540 self.ignore_linenos = frozenset() 

541 

542 self.decorator = None 

543 

544 self.instruction = instruction = self.get_actual_current_instruction(lasti) 

545 op_name = instruction.opname 

546 extra_filter = lambda e: True # type: Callable[[Any], bool] 

547 ctx = type(None) # type: Type 

548 

549 typ = type(None) # type: Type 

550 if op_name.startswith('CALL_'): 

551 typ = ast.Call 

552 elif op_name.startswith(('BINARY_SUBSCR', 'SLICE+')): 

553 typ = ast.Subscript 

554 ctx = ast.Load 

555 elif op_name.startswith('BINARY_'): 

556 typ = ast.BinOp 

557 op_type = dict( 

558 BINARY_POWER=ast.Pow, 

559 BINARY_MULTIPLY=ast.Mult, 

560 BINARY_MATRIX_MULTIPLY=getattr(ast, "MatMult", ()), 

561 BINARY_FLOOR_DIVIDE=ast.FloorDiv, 

562 BINARY_TRUE_DIVIDE=ast.Div, 

563 BINARY_MODULO=ast.Mod, 

564 BINARY_ADD=ast.Add, 

565 BINARY_SUBTRACT=ast.Sub, 

566 BINARY_LSHIFT=ast.LShift, 

567 BINARY_RSHIFT=ast.RShift, 

568 BINARY_AND=ast.BitAnd, 

569 BINARY_XOR=ast.BitXor, 

570 BINARY_OR=ast.BitOr, 

571 )[op_name] 

572 extra_filter = lambda e: isinstance(e.op, op_type) 

573 elif op_name.startswith('UNARY_'): 

574 typ = ast.UnaryOp 

575 op_type = dict( 

576 UNARY_POSITIVE=ast.UAdd, 

577 UNARY_NEGATIVE=ast.USub, 

578 UNARY_NOT=ast.Not, 

579 UNARY_INVERT=ast.Invert, 

580 )[op_name] 

581 extra_filter = lambda e: isinstance(e.op, op_type) 

582 elif op_name in ('LOAD_ATTR', 'LOAD_METHOD', 'LOOKUP_METHOD'): 

583 typ = ast.Attribute 

584 ctx = ast.Load 

585 extra_filter = lambda e: attr_names_match(e.attr, instruction.argval) 

586 elif op_name in ('LOAD_NAME', 'LOAD_GLOBAL', 'LOAD_FAST', 'LOAD_DEREF', 'LOAD_CLASSDEREF'): 

587 typ = ast.Name 

588 ctx = ast.Load 

589 extra_filter = lambda e: e.id == instruction.argval 

590 elif op_name in ('COMPARE_OP', 'IS_OP', 'CONTAINS_OP'): 

591 typ = ast.Compare 

592 extra_filter = lambda e: len(e.ops) == 1 

593 elif op_name.startswith(('STORE_SLICE', 'STORE_SUBSCR')): 

594 ctx = ast.Store 

595 typ = ast.Subscript 

596 elif op_name.startswith('STORE_ATTR'): 

597 ctx = ast.Store 

598 typ = ast.Attribute 

599 extra_filter = lambda e: attr_names_match(e.attr, instruction.argval) 

600 else: 

601 raise RuntimeError(op_name) 

602 

603 with lock: 

604 exprs = { 

605 cast(EnhancedAST, node) 

606 for stmt in stmts 

607 for node in ast.walk(stmt) 

608 if isinstance(node, typ) 

609 if isinstance(getattr(node, "ctx", None), ctx) 

610 if extra_filter(node) 

611 if statement_containing_node(node) == stmt 

612 } 

613 

614 if ctx == ast.Store: 

615 # No special bytecode tricks here. 

616 # We can handle multiple assigned attributes with different names, 

617 # but only one assigned subscript. 

618 self.result = only(exprs) 

619 return 

620 

621 matching = list(self.matching_nodes(exprs)) 

622 if not matching and typ == ast.Call: 

623 self.find_decorator(stmts) 

624 else: 

625 self.result = only(matching) 

626 

627 def find_decorator(self, stmts): 

628 # type: (Union[List[EnhancedAST], Set[EnhancedAST]]) -> None 

629 stmt = only(stmts) 

630 assert_(isinstance(stmt, (ast.ClassDef, function_node_types))) 

631 decorators = stmt.decorator_list # type: ignore[attr-defined] 

632 assert_(decorators) 

633 line_instructions = [ 

634 inst 

635 for inst in self.clean_instructions(self.code) 

636 if inst.lineno == self.frame.f_lineno 

637 ] 

638 last_decorator_instruction_index = [ 

639 i 

640 for i, inst in enumerate(line_instructions) 

641 if inst.opname == "CALL_FUNCTION" 

642 ][-1] 

643 assert_( 

644 line_instructions[last_decorator_instruction_index + 1].opname.startswith( 

645 "STORE_" 

646 ) 

647 ) 

648 decorator_instructions = line_instructions[ 

649 last_decorator_instruction_index 

650 - len(decorators) 

651 + 1 : last_decorator_instruction_index 

652 + 1 

653 ] 

654 assert_({inst.opname for inst in decorator_instructions} == {"CALL_FUNCTION"}) 

655 decorator_index = decorator_instructions.index(self.instruction) 

656 decorator = decorators[::-1][decorator_index] 

657 self.decorator = decorator 

658 self.result = stmt 

659 

660 def clean_instructions(self, code): 

661 # type: (types.CodeType) -> List[EnhancedInstruction] 

662 return [ 

663 inst 

664 for inst in get_instructions(code) 

665 if inst.opname not in ("EXTENDED_ARG", "NOP") 

666 if inst.lineno not in self.ignore_linenos 

667 ] 

668 

669 def get_original_clean_instructions(self): 

670 # type: () -> List[EnhancedInstruction] 

671 result = self.clean_instructions(self.code) 

672 

673 # pypy sometimes (when is not clear) 

674 # inserts JUMP_IF_NOT_DEBUG instructions in bytecode 

675 # If they're not present in our compiled instructions, 

676 # ignore them in the original bytecode 

677 if not any( 

678 inst.opname == "JUMP_IF_NOT_DEBUG" 

679 for inst in self.compile_instructions() 

680 ): 

681 result = [ 

682 inst for inst in result 

683 if inst.opname != "JUMP_IF_NOT_DEBUG" 

684 ] 

685 

686 return result 

687 

688 def matching_nodes(self, exprs): 

689 # type: (Set[EnhancedAST]) -> Iterator[EnhancedAST] 

690 original_instructions = self.get_original_clean_instructions() 

691 original_index = only( 

692 i 

693 for i, inst in enumerate(original_instructions) 

694 if inst == self.instruction 

695 ) 

696 for expr_index, expr in enumerate(exprs): 

697 setter = get_setter(expr) 

698 assert setter is not None 

699 # noinspection PyArgumentList 

700 replacement = ast.BinOp( 

701 left=expr, 

702 op=ast.Pow(), 

703 right=ast.Str(s=sentinel), 

704 ) 

705 ast.fix_missing_locations(replacement) 

706 setter(replacement) 

707 try: 

708 instructions = self.compile_instructions() 

709 finally: 

710 setter(expr) 

711 

712 if sys.version_info >= (3, 10): 

713 try: 

714 handle_jumps(instructions, original_instructions) 

715 except Exception: 

716 # Give other candidates a chance 

717 if TESTING or expr_index < len(exprs) - 1: 

718 continue 

719 raise 

720 

721 indices = [ 

722 i 

723 for i, instruction in enumerate(instructions) 

724 if instruction.argval == sentinel 

725 ] 

726 

727 # There can be several indices when the bytecode is duplicated, 

728 # as happens in a finally block in 3.9+ 

729 # First we remove the opcodes caused by our modifications 

730 for index_num, sentinel_index in enumerate(indices): 

731 # Adjustment for removing sentinel instructions below 

732 # in past iterations 

733 sentinel_index -= index_num * 2 

734 

735 assert_(instructions.pop(sentinel_index).opname == 'LOAD_CONST') 

736 assert_(instructions.pop(sentinel_index).opname == 'BINARY_POWER') 

737 

738 # Then we see if any of the instruction indices match 

739 for index_num, sentinel_index in enumerate(indices): 

740 sentinel_index -= index_num * 2 

741 new_index = sentinel_index - 1 

742 

743 if new_index != original_index: 

744 continue 

745 

746 original_inst = original_instructions[original_index] 

747 new_inst = instructions[new_index] 

748 

749 # In Python 3.9+, changing 'not x in y' to 'not sentinel_transformation(x in y)' 

750 # changes a CONTAINS_OP(invert=1) to CONTAINS_OP(invert=0),<sentinel stuff>,UNARY_NOT 

751 if ( 

752 original_inst.opname == new_inst.opname in ('CONTAINS_OP', 'IS_OP') 

753 and original_inst.arg != new_inst.arg # type: ignore[attr-defined] 

754 and ( 

755 original_instructions[original_index + 1].opname 

756 != instructions[new_index + 1].opname == 'UNARY_NOT' 

757 )): 

758 # Remove the difference for the upcoming assert 

759 instructions.pop(new_index + 1) 

760 

761 # Check that the modified instructions don't have anything unexpected 

762 # 3.10 is a bit too weird to assert this in all cases but things still work 

763 if sys.version_info < (3, 10): 

764 for inst1, inst2 in zip_longest( 

765 original_instructions, instructions 

766 ): 

767 assert_(inst1 and inst2 and opnames_match(inst1, inst2)) 

768 

769 yield expr 

770 

771 def compile_instructions(self): 

772 # type: () -> List[EnhancedInstruction] 

773 module_code = compile_similar_to(self.tree, self.code) 

774 code = only(self.find_codes(module_code)) 

775 return self.clean_instructions(code) 

776 

777 def find_codes(self, root_code): 

778 # type: (types.CodeType) -> list 

779 checks = [ 

780 attrgetter('co_firstlineno'), 

781 attrgetter('co_freevars'), 

782 attrgetter('co_cellvars'), 

783 lambda c: is_ipython_cell_code_name(c.co_name) or c.co_name, 

784 ] # type: List[Callable] 

785 if not self.is_pytest: 

786 checks += [ 

787 attrgetter('co_names'), 

788 attrgetter('co_varnames'), 

789 ] 

790 

791 def matches(c): 

792 # type: (types.CodeType) -> bool 

793 return all( 

794 f(c) == f(self.code) 

795 for f in checks 

796 ) 

797 

798 code_options = [] 

799 if matches(root_code): 

800 code_options.append(root_code) 

801 

802 def finder(code): 

803 # type: (types.CodeType) -> None 

804 for const in code.co_consts: 

805 if not inspect.iscode(const): 

806 continue 

807 

808 if matches(const): 

809 code_options.append(const) 

810 finder(const) 

811 

812 finder(root_code) 

813 return code_options 

814 

815 def get_actual_current_instruction(self, lasti): 

816 # type: (int) -> EnhancedInstruction 

817 """ 

818 Get the instruction corresponding to the current 

819 frame offset, skipping EXTENDED_ARG instructions 

820 """ 

821 # Don't use get_original_clean_instructions 

822 # because we need the actual instructions including 

823 # EXTENDED_ARG 

824 instructions = list(get_instructions(self.code)) 

825 index = only( 

826 i 

827 for i, inst in enumerate(instructions) 

828 if inst.offset == lasti 

829 ) 

830 

831 while True: 

832 instruction = instructions[index] 

833 if instruction.opname != "EXTENDED_ARG": 

834 return instruction 

835 index += 1 

836 

837 

838 

839def non_sentinel_instructions(instructions, start): 

840 # type: (List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction]] 

841 """ 

842 Yields (index, instruction) pairs excluding the basic 

843 instructions introduced by the sentinel transformation 

844 """ 

845 skip_power = False 

846 for i, inst in islice(enumerate(instructions), start, None): 

847 if inst.argval == sentinel: 

848 assert_(inst.opname == "LOAD_CONST") 

849 skip_power = True 

850 continue 

851 elif skip_power: 

852 assert_(inst.opname == "BINARY_POWER") 

853 skip_power = False 

854 continue 

855 yield i, inst 

856 

857 

858def walk_both_instructions(original_instructions, original_start, instructions, start): 

859 # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction, int, EnhancedInstruction]] 

860 """ 

861 Yields matching indices and instructions from the new and original instructions, 

862 leaving out changes made by the sentinel transformation. 

863 """ 

864 original_iter = islice(enumerate(original_instructions), original_start, None) 

865 new_iter = non_sentinel_instructions(instructions, start) 

866 inverted_comparison = False 

867 while True: 

868 try: 

869 original_i, original_inst = next(original_iter) 

870 new_i, new_inst = next(new_iter) 

871 except StopIteration: 

872 return 

873 if ( 

874 inverted_comparison 

875 and original_inst.opname != new_inst.opname == "UNARY_NOT" 

876 ): 

877 new_i, new_inst = next(new_iter) 

878 inverted_comparison = ( 

879 original_inst.opname == new_inst.opname in ("CONTAINS_OP", "IS_OP") 

880 and original_inst.arg != new_inst.arg # type: ignore[attr-defined] 

881 ) 

882 yield original_i, original_inst, new_i, new_inst 

883 

884 

885def handle_jumps(instructions, original_instructions): 

886 # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> None 

887 """ 

888 Transforms instructions in place until it looks more like original_instructions. 

889 This is only needed in 3.10+ where optimisations lead to more drastic changes 

890 after the sentinel transformation. 

891 Replaces JUMP instructions that aren't also present in original_instructions 

892 with the sections that they jump to until a raise or return. 

893 In some other cases duplication found in `original_instructions` 

894 is replicated in `instructions`. 

895 """ 

896 while True: 

897 for original_i, original_inst, new_i, new_inst in walk_both_instructions( 

898 original_instructions, 0, instructions, 0 

899 ): 

900 if opnames_match(original_inst, new_inst): 

901 continue 

902 

903 if "JUMP" in new_inst.opname and "JUMP" not in original_inst.opname: 

904 # Find where the new instruction is jumping to, ignoring 

905 # instructions which have been copied in previous iterations 

906 start = only( 

907 i 

908 for i, inst in enumerate(instructions) 

909 if inst.offset == new_inst.argval 

910 and not getattr(inst, "_copied", False) 

911 ) 

912 # Replace the jump instruction with the jumped to section of instructions 

913 # That section may also be deleted if it's not similarly duplicated 

914 # in original_instructions 

915 new_instructions = handle_jump( 

916 original_instructions, original_i, instructions, start 

917 ) 

918 assert new_instructions is not None 

919 instructions[new_i : new_i + 1] = new_instructions 

920 else: 

921 # Extract a section of original_instructions from original_i to return/raise 

922 orig_section = [] 

923 for section_inst in original_instructions[original_i:]: 

924 orig_section.append(section_inst) 

925 if section_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"): 

926 break 

927 else: 

928 # No return/raise - this is just a mismatch we can't handle 

929 raise AssertionError 

930 

931 instructions[new_i:new_i] = only(find_new_matching(orig_section, instructions)) 

932 

933 # instructions has been modified, the for loop can't sensibly continue 

934 # Restart it from the beginning, checking for other issues 

935 break 

936 

937 else: # No mismatched jumps found, we're done 

938 return 

939 

940 

941def find_new_matching(orig_section, instructions): 

942 # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> Iterator[List[EnhancedInstruction]] 

943 """ 

944 Yields sections of `instructions` which match `orig_section`. 

945 The yielded sections include sentinel instructions, but these 

946 are ignored when checking for matches. 

947 """ 

948 for start in range(len(instructions) - len(orig_section)): 

949 indices, dup_section = zip( 

950 *islice( 

951 non_sentinel_instructions(instructions, start), 

952 len(orig_section), 

953 ) 

954 ) 

955 if len(dup_section) < len(orig_section): 

956 return 

957 if sections_match(orig_section, dup_section): 

958 yield instructions[start:indices[-1] + 1] 

959 

960 

961def handle_jump(original_instructions, original_start, instructions, start): 

962 # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Optional[List[EnhancedInstruction]] 

963 """ 

964 Returns the section of instructions starting at `start` and ending 

965 with a RETURN_VALUE or RAISE_VARARGS instruction. 

966 There should be a matching section in original_instructions starting at original_start. 

967 If that section doesn't appear elsewhere in original_instructions, 

968 then also delete the returned section of instructions. 

969 """ 

970 for original_j, original_inst, new_j, new_inst in walk_both_instructions( 

971 original_instructions, original_start, instructions, start 

972 ): 

973 assert_(opnames_match(original_inst, new_inst)) 

974 if original_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"): 

975 inlined = deepcopy(instructions[start : new_j + 1]) 

976 for inl in inlined: 

977 inl._copied = True 

978 orig_section = original_instructions[original_start : original_j + 1] 

979 if not check_duplicates( 

980 original_start, orig_section, original_instructions 

981 ): 

982 instructions[start : new_j + 1] = [] 

983 return inlined 

984 

985 return None 

986 

987 

988def check_duplicates(original_i, orig_section, original_instructions): 

989 # type: (int, List[EnhancedInstruction], List[EnhancedInstruction]) -> bool 

990 """ 

991 Returns True if a section of original_instructions starting somewhere other 

992 than original_i and matching orig_section is found, i.e. orig_section is duplicated. 

993 """ 

994 for dup_start in range(len(original_instructions)): 

995 if dup_start == original_i: 

996 continue 

997 dup_section = original_instructions[dup_start : dup_start + len(orig_section)] 

998 if len(dup_section) < len(orig_section): 

999 return False 

1000 if sections_match(orig_section, dup_section): 

1001 return True 

1002 

1003 return False 

1004 

1005def sections_match(orig_section, dup_section): 

1006 # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> bool 

1007 """ 

1008 Returns True if the given lists of instructions have matching linenos and opnames. 

1009 """ 

1010 return all( 

1011 ( 

1012 orig_inst.lineno == dup_inst.lineno 

1013 # POP_BLOCKs have been found to have differing linenos in innocent cases 

1014 or "POP_BLOCK" == orig_inst.opname == dup_inst.opname 

1015 ) 

1016 and opnames_match(orig_inst, dup_inst) 

1017 for orig_inst, dup_inst in zip(orig_section, dup_section) 

1018 ) 

1019 

1020 

1021def opnames_match(inst1, inst2): 

1022 # type: (Instruction, Instruction) -> bool 

1023 return ( 

1024 inst1.opname == inst2.opname 

1025 or "JUMP" in inst1.opname 

1026 and "JUMP" in inst2.opname 

1027 or (inst1.opname == "PRINT_EXPR" and inst2.opname == "POP_TOP") 

1028 or ( 

1029 inst1.opname in ("LOAD_METHOD", "LOOKUP_METHOD") 

1030 and inst2.opname == "LOAD_ATTR" 

1031 ) 

1032 or (inst1.opname == "CALL_METHOD" and inst2.opname == "CALL_FUNCTION") 

1033 ) 

1034 

1035 

1036def get_setter(node): 

1037 # type: (EnhancedAST) -> Optional[Callable[[ast.AST], None]] 

1038 parent = node.parent 

1039 for name, field in ast.iter_fields(parent): 

1040 if field is node: 

1041 def setter(new_node): 

1042 # type: (ast.AST) -> None 

1043 return setattr(parent, name, new_node) 

1044 return setter 

1045 elif isinstance(field, list): 

1046 for i, item in enumerate(field): 

1047 if item is node: 

1048 def setter(new_node): 

1049 # type: (ast.AST) -> None 

1050 field[i] = new_node 

1051 

1052 return setter 

1053 return None 

1054 

1055lock = RLock() 

1056 

1057 

1058@cache 

1059def statement_containing_node(node): 

1060 # type: (ast.AST) -> EnhancedAST 

1061 while not isinstance(node, ast.stmt): 

1062 node = cast(EnhancedAST, node).parent 

1063 return cast(EnhancedAST, node) 

1064 

1065 

1066def assert_linenos(tree): 

1067 # type: (ast.AST) -> Iterator[int] 

1068 for node in ast.walk(tree): 

1069 if ( 

1070 hasattr(node, 'parent') and 

1071 isinstance(statement_containing_node(node), ast.Assert) 

1072 ): 

1073 for lineno in node_linenos(node): 

1074 yield lineno 

1075 

1076 

1077def _extract_ipython_statement(stmt): 

1078 # type: (EnhancedAST) -> ast.Module 

1079 # IPython separates each statement in a cell to be executed separately 

1080 # So NodeFinder should only compile one statement at a time or it 

1081 # will find a code mismatch. 

1082 while not isinstance(stmt.parent, ast.Module): 

1083 stmt = stmt.parent 

1084 # use `ast.parse` instead of `ast.Module` for better portability 

1085 # python3.8 changes the signature of `ast.Module` 

1086 # Inspired by https://github.com/pallets/werkzeug/pull/1552/files 

1087 tree = ast.parse("") 

1088 tree.body = [cast(ast.stmt, stmt)] 

1089 ast.copy_location(tree, stmt) 

1090 return tree 

1091 

1092 

1093def is_ipython_cell_code_name(code_name): 

1094 # type: (str) -> bool 

1095 return bool(re.match(r"(<module>|<cell line: \d+>)$", code_name)) 

1096 

1097 

1098def is_ipython_cell_filename(filename): 

1099 # type: (str) -> bool 

1100 return bool(re.search(r"<ipython-input-|[/\\]ipykernel_\d+[/\\]", filename)) 

1101 

1102 

1103def is_ipython_cell_code(code_obj): 

1104 # type: (types.CodeType) -> bool 

1105 return ( 

1106 is_ipython_cell_filename(code_obj.co_filename) and 

1107 is_ipython_cell_code_name(code_obj.co_name) 

1108 ) 

1109 

1110 

1111def find_node_ipython(frame, lasti, stmts, source): 

1112 # type: (types.FrameType, int, Set[EnhancedAST], Source) -> Tuple[Optional[Any], Optional[Any]] 

1113 node = decorator = None 

1114 for stmt in stmts: 

1115 tree = _extract_ipython_statement(stmt) 

1116 try: 

1117 node_finder = NodeFinder(frame, stmts, tree, lasti, source) 

1118 if (node or decorator) and (node_finder.result or node_finder.decorator): 

1119 # Found potential nodes in separate statements, 

1120 # cannot resolve ambiguity, give up here 

1121 return None, None 

1122 

1123 node = node_finder.result 

1124 decorator = node_finder.decorator 

1125 except Exception: 

1126 pass 

1127 return decorator, node 

1128 

1129 

1130def attr_names_match(attr, argval): 

1131 # type: (str, str) -> bool 

1132 """ 

1133 Checks that the user-visible attr (from ast) can correspond to 

1134 the argval in the bytecode, i.e. the real attribute fetched internally, 

1135 which may be mangled for private attributes. 

1136 """ 

1137 if attr == argval: 

1138 return True 

1139 if not attr.startswith("__"): 

1140 return False 

1141 return bool(re.match(r"^_\w+%s$" % attr, argval)) 

1142 

1143 

1144def node_linenos(node): 

1145 # type: (ast.AST) -> Iterator[int] 

1146 if hasattr(node, "lineno"): 

1147 linenos = [] # type: Sequence[int] 

1148 if hasattr(node, "end_lineno") and isinstance(node, ast.expr): 

1149 assert node.end_lineno is not None # type: ignore[attr-defined] 

1150 linenos = range(node.lineno, node.end_lineno + 1) # type: ignore[attr-defined] 

1151 else: 

1152 linenos = [node.lineno] # type: ignore[attr-defined] 

1153 for lineno in linenos: 

1154 yield lineno 

1155 

1156 

1157if sys.version_info >= (3, 11): 

1158 from ._position_node_finder import PositionNodeFinder as NodeFinder 

1159else: 

1160 NodeFinder = SentinelNodeFinder 

1161