Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/executing/executing.py: 20%
528 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-20 06:09 +0000
1"""
2MIT License
4Copyright (c) 2021 Alex Hall
6Permission is hereby granted, free of charge, to any person obtaining a copy
7of this software and associated documentation files (the "Software"), to deal
8in the Software without restriction, including without limitation the rights
9to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10copies of the Software, and to permit persons to whom the Software is
11furnished to do so, subject to the following conditions:
13The above copyright notice and this permission notice shall be included in all
14copies or substantial portions of the Software.
16THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22SOFTWARE.
23"""
25import __future__
26import ast
27import dis
28import inspect
29import io
30import linecache
31import re
32import sys
33import types
34from collections import defaultdict
35from copy import deepcopy
36from functools import lru_cache
37from itertools import islice
38from itertools import zip_longest
39from operator import attrgetter
40from pathlib import Path
41from threading import RLock
42from tokenize import detect_encoding
43from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, \
44 Type, TypeVar, Union, cast
46if TYPE_CHECKING: # pragma: no cover
47 from asttokens import ASTTokens, ASTText
48 from asttokens.asttokens import ASTTextBase
# AST node classes that define a function, synchronous or asynchronous.
function_node_types = (ast.FunctionDef, ast.AsyncFunctionDef)  # type: Tuple[Type, ...]

# Unbounded memoising decorator used by the cached helpers in this module.
cache = lru_cache(maxsize=None)
# Typing-only subclass of ast.AST declaring the extra fields this library
# attaches to nodes. It serves no purpose beyond type checking.
class EnhancedAST(ast.AST):
    parent = None  # type: EnhancedAST
class Instruction(dis.Instruction):
    # Declared for the type checker; get_instructions() stores the real
    # value on each yielded instruction.
    lineno = None  # type: int
# Typing-only subclass of Instruction declaring the `_copied` marker that
# handle_jump() sets on duplicated instructions.
# It serves no purpose beyond type checking.
class EnhancedInstruction(Instruction):
    _copied = None  # type: bool
def assert_(condition, message=""):
    # type: (Any, str) -> None
    """
    Raise AssertionError when `condition` is falsy.

    Unlike the `assert` statement, this check is still performed under -O.

    :param condition: value that is expected to be truthy
    :param message: converted with `str()` and used as the exception message
    """
    if condition:
        return
    raise AssertionError(str(message))
def get_instructions(co):
    # type: (types.CodeType) -> Iterator[EnhancedInstruction]
    """
    Yield the instructions of `co`, each given a `lineno` attribute.

    An instruction that does not start a line inherits the line number of
    the closest preceding instruction that does; before any such instruction
    the code object's `co_firstlineno` is used.
    """
    current_line = co.co_firstlineno
    for raw in dis.get_instructions(co):
        instruction = cast(EnhancedInstruction, raw)
        if instruction.starts_line:
            current_line = instruction.starts_line
        assert_(current_line)
        instruction.lineno = current_line
        yield instruction
# Debug switch: when truthy, exceptions raised while locating the executing
# node are re-raised by Source.executing instead of being suppressed.
TESTING = 0
class NotOneValueFound(Exception):
    """
    Raised by `only()` when an iterable does not hold exactly one value.

    Attributes:
        - values: the values that were found, when the caller supplied them;
          an empty list otherwise
    """

    def __init__(self, msg, values=None):
        # type: (str, Optional[Sequence]) -> None
        # Build a fresh list per instance: a mutable default argument would be
        # shared (and could be mutated) across every exception created without
        # explicit values.
        self.values = [] if values is None else values
        super(NotOneValueFound, self).__init__(msg)
# Generic type variable used in the type comments of helpers such as only().
T = TypeVar('T')
def only(it):
    # type: (Iterable[T]) -> T
    """
    Return the single value contained in `it`.

    Sized iterables are checked with `len()`; any other iterable has at most
    two items consumed from it.

    :raises NotOneValueFound: if `it` holds zero values or more than one
    """
    if isinstance(it, Sized):
        size = len(it)
        if size != 1:
            raise NotOneValueFound('Expected one value, found %s' % size)
        # noinspection PyTypeChecker
        return list(it)[0]

    head = tuple(islice(it, 2))
    if not head:
        raise NotOneValueFound('Expected one value, found 0')
    if len(head) > 1:
        raise NotOneValueFound('Expected one value, found several', head)
    return head[0]
class Source(object):
    """
    The text of one source file plus metadata derived from it.

    The classmethod `executing(frame)` is the main entry point.

    Do not instantiate this class yourself. Obtain instances through the
    classmethod `for_frame(frame)` or, when no frame is available, through
    `for_filename(filename [, module_globals])`. Instances are cached per
    (filename, lines) pair, so looking up an unchanged file again returns
    the same object.

    Attributes:
        - filename
        - text
        - lines
        - tree: AST parsed from text, or None if text is not valid Python
            Every node below the root has an extra `parent` attribute

    Other methods of interest:
        - statements_at_line
        - asttokens
        - code_qualname
    """

    def __init__(self, filename, lines):
        # type: (str, Sequence[str]) -> None
        """
        Don't call this constructor, see the class docstring.
        """
        self.filename = filename
        self.text = ''.join(lines)
        self.lines = [line.rstrip('\r\n') for line in lines]

        self._nodes_by_line = defaultdict(list)
        self.tree = None
        self._qualnames = {}
        self._asttokens = None  # type: Optional[ASTTokens]
        self._asttext = None  # type: Optional[ASTText]

        try:
            parsed = ast.parse(self.text, filename=filename)
        except (SyntaxError, ValueError):
            # Not parseable: `tree` stays None and the indexes stay empty.
            return
        self.tree = parsed

        # Link every child to its parent and index each node by the lines
        # that node_linenos() reports for it.
        for parent_node in ast.walk(parsed):
            for child_node in ast.iter_child_nodes(parent_node):
                cast(EnhancedAST, child_node).parent = cast(EnhancedAST, parent_node)
            for lineno in node_linenos(parent_node):
                self._nodes_by_line[lineno].append(parent_node)

        qualname_visitor = QualnameVisitor()
        qualname_visitor.visit(parsed)
        self._qualnames = qualname_visitor.qualnames

    @classmethod
    def for_frame(cls, frame, use_cache=True):
        # type: (types.FrameType, bool) -> "Source"
        """
        Return the `Source` for the file whose code `frame` is executing.
        """
        code_filename = frame.f_code.co_filename
        module_globals = frame.f_globals or {}
        return cls.for_filename(code_filename, module_globals, use_cache)

    @classmethod
    def for_filename(
        cls,
        filename,
        module_globals=None,
        use_cache=True,  # noqa no longer used
    ):
        # type: (Union[str, Path], Optional[Dict[str, Any]], bool) -> "Source"
        """
        Return the `Source` for `filename`, reading its lines through `linecache`.
        """
        if isinstance(filename, Path):
            filename = str(filename)
        name = cast(str, filename)

        # Remember the existing linecache entry before refreshing the cache.
        previous_entry = linecache.cache.get(filename)  # type: ignore[attr-defined]
        linecache.checkcache(filename)
        lines = linecache.getlines(name, module_globals)
        if previous_entry is not None and not lines:
            # checkcache dropped the entry and nothing took its place.
            # A changed file would have produced lines, so the file most likely
            # does not exist (e.g. `filename` is fake).
            # Put the old entry back so that there is still something to use.
            linecache.cache[filename] = previous_entry  # type: ignore[attr-defined]
            lines = linecache.getlines(name, module_globals)

        return cls._for_filename_and_lines(filename, tuple(lines))

    @classmethod
    def _for_filename_and_lines(cls, filename, lines):
        # type: (str, Sequence[str]) -> "Source"
        """
        Return the cached instance for (filename, lines), creating it on first use.
        """
        source_cache = cls._class_local('__source_cache_with_lines', {})  # type: Dict[Tuple[str, Sequence[str]], Source]
        cache_key = (filename, lines)
        if cache_key not in source_cache:
            source_cache[cache_key] = cls(filename, lines)
        return source_cache[cache_key]

    @classmethod
    def lazycache(cls, frame):
        # type: (types.FrameType) -> None
        """
        Register the frame's file with `linecache.lazycache`.
        """
        code_filename = frame.f_code.co_filename
        linecache.lazycache(code_filename, frame.f_globals)

    @classmethod
    def executing(cls, frame_or_tb):
        # type: (Union[types.TracebackType, types.FrameType]) -> "Executing"
        """
        Return an `Executing` object describing the operation that the given
        frame or traceback object is currently executing.
        """
        if isinstance(frame_or_tb, types.TracebackType):
            # https://docs.python.org/3/reference/datamodel.html#traceback-objects
            # "tb_lineno gives the line number where the exception occurred;
            #  tb_lasti indicates the precise instruction.
            #  The line number and last instruction in the traceback may differ
            #  from the line number of its frame object
            #  if the exception occurred in a try statement with no matching except clause
            #  or with a finally clause."
            frame = frame_or_tb.tb_frame
            lineno = frame_or_tb.tb_lineno
            lasti = frame_or_tb.tb_lasti
        else:
            frame = frame_or_tb
            lineno = frame.f_lineno
            lasti = frame.f_lasti

        code = frame.f_code
        key = (code, id(code), lasti)
        executing_cache = cls._class_local('__executing_cache', {})  # type: Dict[Tuple[types.CodeType, int, int], Any]

        args = executing_cache.get(key)
        if not args:
            node = stmts = decorator = None
            source = cls.for_frame(frame)
            tree = source.tree
            if tree:
                try:
                    stmts = source.statements_at_line(lineno)
                    if stmts:
                        if is_ipython_cell_code(code):
                            decorator, node = find_node_ipython(frame, lasti, stmts, source)
                        else:
                            finder = NodeFinder(frame, stmts, tree, lasti, source)
                            node = finder.result
                            decorator = finder.decorator
                except Exception:
                    # Best effort: failures are only surfaced when TESTING is set.
                    if TESTING:
                        raise

                assert stmts is not None
                if node:
                    # Narrow the statements down to the one holding the node.
                    narrowed = {statement_containing_node(node)}
                    assert_(narrowed <= stmts)
                    stmts = narrowed

            executing_cache[key] = args = source, node, stmts, decorator

        return Executing(frame, *args)

    @classmethod
    def _class_local(cls, name, default):
        # type: (str, T) -> T
        """
        Return the attribute `name` stored directly on this class
        (not inherited from a base class), storing `default` first if absent.
        """
        # vars(cls) is a read-only mappingproxy, so setdefault is unavailable
        value = vars(cls).get(name, default)
        setattr(cls, name, value)
        return value

    @cache
    def statements_at_line(self, lineno):
        # type: (int) -> Set[EnhancedAST]
        """
        Return the statement nodes that overlap the given line.

        At most one statement is returned unless semicolons are present.

        If the `text` attribute is not valid python, meaning
        `tree` is None, an empty set is returned.

        Otherwise, `Source.for_frame(frame).statements_at_line(frame.f_lineno)`
        should return at least one statement.
        """
        nodes = self._nodes_by_line[lineno]
        return set(map(statement_containing_node, nodes))

    def asttext(self):
        # type: () -> ASTText
        """
        Return an ASTText object for getting the source of specific AST nodes.
        The object is created on first use and then reused.

        See http://asttokens.readthedocs.io/en/latest/api-index.html
        """
        from asttokens import ASTText  # must be installed separately

        if self._asttext is not None:
            return self._asttext
        self._asttext = ASTText(self.text, tree=self.tree, filename=self.filename)
        return self._asttext

    def asttokens(self):
        # type: () -> ASTTokens
        """
        Return an ASTTokens object for getting the source of specific AST nodes.
        The object is created on first use and then reused.

        See http://asttokens.readthedocs.io/en/latest/api-index.html
        """
        import asttokens  # must be installed separately

        if self._asttokens is not None:
            return self._asttokens
        if hasattr(asttokens, 'ASTText'):
            self._asttokens = self.asttext().asttokens
        else:  # pragma: no cover
            self._asttokens = asttokens.ASTTokens(self.text, tree=self.tree, filename=self.filename)
        return self._asttokens

    def _asttext_base(self):
        # type: () -> ASTTextBase
        """
        Return `asttext()` when the installed asttokens provides `ASTText`,
        otherwise `asttokens()`.
        """
        import asttokens  # must be installed separately

        if not hasattr(asttokens, 'ASTText'):  # pragma: no cover
            return self.asttokens()
        return self.asttext()

    @staticmethod
    def decode_source(source):
        # type: (Union[str, bytes]) -> str
        """
        Decode `source` with its detected encoding if it is bytes;
        return it unchanged otherwise.
        """
        if not isinstance(source, bytes):
            return source
        return source.decode(Source.detect_encoding(source))

    @staticmethod
    def detect_encoding(source):
        # type: (bytes) -> str
        """
        Return the encoding name that `tokenize.detect_encoding` reports for `source`.
        """
        readline = io.BytesIO(source).readline
        encoding, _lines_read = detect_encoding(readline)
        return encoding

    def code_qualname(self, code):
        # type: (types.CodeType) -> str
        """
        Imitates the __qualname__ attribute of functions for code objects.
        Given:

            - A function `func`
            - A frame `frame` for an execution of `func`, meaning:
                `frame.f_code is func.__code__`

        `Source.for_frame(frame).code_qualname(frame.f_code)`
        will be equal to `func.__qualname__`*. Works for Python 2 as well,
        where of course no `__qualname__` attribute exists.

        Falls back to `code.co_name` if there is no appropriate qualname.

        Based on https://github.com/wbolster/qualname

        (* unless `func` is a lambda
        nested inside another lambda on the same line, in which case
        the outer lambda's qualname will be returned for the codes
        of both lambdas)
        """
        assert_(code.co_filename == self.filename)
        lookup_key = (code.co_name, code.co_firstlineno)
        return self._qualnames.get(lookup_key, code.co_name)
class Executing(object):
    """
    Describes the operation that a frame is currently executing.

    Usually only `node` is of interest: the AST node being executed,
    or None when it could not be determined.

    While a decorator is being called:
        - `node` is a function or class definition
        - `decorator` is the expression in `node.decorator_list` being called
        - `statements == {node}`
    """

    def __init__(self, frame, source, node, stmts, decorator):
        # type: (types.FrameType, Source, EnhancedAST, Set[ast.stmt], Optional[EnhancedAST]) -> None
        self.frame, self.source = frame, source
        self.node, self.decorator = node, decorator
        self.statements = stmts

    def code_qualname(self):
        # type: () -> str
        """Return the qualified name that `source` computes for the frame's code object."""
        code = self.frame.f_code
        return self.source.code_qualname(code)

    def text(self):
        # type: () -> str
        """Return the source text of `node`, obtained through asttokens."""
        text_provider = self.source._asttext_base()
        return text_provider.get_text(self.node)

    def text_range(self):
        # type: () -> Tuple[int, int]
        """Return the text range of `node`, obtained through asttokens."""
        text_provider = self.source._asttext_base()
        return text_provider.get_text_range(self.node)
class QualnameVisitor(ast.NodeVisitor):
    """
    Collects qualified names for the functions, lambdas and classes of a tree.

    After `visit(tree)`, `qualnames` maps `(name, lineno)` to the dotted
    qualified name, where `lineno` is the line of the first decorator if the
    definition has any and the line of the definition itself otherwise.
    """

    def __init__(self):
        # type: () -> None
        super(QualnameVisitor, self).__init__()
        self.stack = []  # type: List[str]
        self.qualnames = {}  # type: Dict[Tuple[str, int], str]

    def add_qualname(self, node, name=None):
        # type: (ast.AST, Optional[str]) -> None
        """
        Push the name of `node` onto the stack and record its qualified name.
        The first name recorded for a given (name, lineno) key is kept.
        """
        name = name or node.name  # type: ignore[attr-defined]
        self.stack.append(name)
        decorators = getattr(node, 'decorator_list', ())
        # The key uses the first decorator's line when the node is decorated.
        anchor = decorators[0] if decorators else node
        lineno = anchor.lineno  # type: ignore[attr-defined]
        self.qualnames.setdefault((name, lineno), ".".join(self.stack))

    def visit_FunctionDef(self, node, name=None):
        # type: (ast.AST, Optional[str]) -> None
        assert isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)), node
        self.add_qualname(node, name)
        self.stack.append('<locals>')
        # A lambda body is a single expression; other function bodies are lists.
        body = [node.body] if isinstance(node, ast.Lambda) else node.body  # type: Sequence[ast.AST]
        for statement in body:
            self.visit(statement)
        # Drop '<locals>' and the function name pushed above.
        del self.stack[-2:]

        # Find lambdas in the function definition outside the body,
        # e.g. decorators or default arguments
        # Based on iter_child_nodes
        for field_name, value in ast.iter_fields(node):
            if field_name == 'body':
                continue
            candidates = value if isinstance(value, list) else [value]
            for candidate in candidates:
                if isinstance(candidate, ast.AST):
                    self.visit(candidate)

    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_Lambda(self, node):
        # type: (ast.AST) -> None
        assert isinstance(node, ast.Lambda)
        self.visit_FunctionDef(node, '<lambda>')

    def visit_ClassDef(self, node):
        # type: (ast.AST) -> None
        assert isinstance(node, ast.ClassDef)
        self.add_qualname(node)
        self.generic_visit(node)
        self.stack.pop()
# Sum of the compiler flags of every feature listed in `__future__`.
future_flags = sum(
    feature.compiler_flag
    for feature in (
        getattr(__future__, fname) for fname in __future__.all_feature_names
    )
)
def compile_similar_to(source, matching_code):
    # type: (ast.Module, types.CodeType) -> Any
    """
    Compile `source` in 'exec' mode with the filename of `matching_code`
    and with whichever `__future__` flags `matching_code` was compiled with.
    """
    inherited_flags = future_flags & matching_code.co_flags
    return compile(
        source,
        matching_code.co_filename,
        'exec',
        flags=inherited_flags,
        dont_inherit=True,
    )
# Marker constant that SentinelNodeFinder inserts into a temporary copy of the
# code so that the resulting LOAD_CONST can be located in the bytecode.
sentinel = 'io8urthglkjdghvljusketgIYRFYUVGHFRTBGVHKGF78678957647698'
def is_rewritten_by_pytest(code):
    # type: (types.CodeType) -> bool
    """
    Return True if any instruction of `code` other than a LOAD_CONST has a
    string argument starting with "@py".
    """
    for bc in get_instructions(code):
        if bc.opname == "LOAD_CONST":
            continue
        argument = bc.argval
        if isinstance(argument, str) and argument.startswith("@py"):
            return True
    return False
class SentinelNodeFinder(object):
    """
    Finds the AST node that corresponds to the instruction a frame is executing.

    Candidate expressions of the right kind are collected from the given
    statements. Each candidate is then temporarily replaced in the tree by
    `<candidate> ** <sentinel>`, the tree is recompiled, and the candidate is
    accepted if the sentinel's LOAD_CONST lands directly after the instruction
    at the position of the frame's current instruction.

    Attributes set by the constructor:
        - result: the node that was found
        - decorator: the decorator expression being called, or None
    """
    result = None  # type: EnhancedAST

    def __init__(self, frame, stmts, tree, lasti, source):
        # type: (types.FrameType, Set[EnhancedAST], ast.Module, int, Source) -> None
        """
        Locate the node for the instruction at offset `lasti` of `frame`.

        :raises RuntimeError: if the current instruction is of an unsupported kind
        :raises NotOneValueFound: if the candidates cannot be narrowed to one node
        """
        assert_(stmts)
        self.frame = frame
        self.tree = tree
        self.code = code = frame.f_code
        self.is_pytest = is_rewritten_by_pytest(code)

        if self.is_pytest:
            self.ignore_linenos = frozenset(assert_linenos(tree))
        else:
            self.ignore_linenos = frozenset()

        self.decorator = None

        self.instruction = instruction = self.get_actual_current_instruction(lasti)
        op_name = instruction.opname
        extra_filter = lambda e: True  # type: Callable[[Any], bool]
        # With these defaults, nodes without a `ctx` attribute pass the ctx check below.
        ctx = type(None)  # type: Type

        typ = type(None)  # type: Type
        if op_name.startswith('CALL_'):
            typ = ast.Call
        elif op_name.startswith(('BINARY_SUBSCR', 'SLICE+')):
            typ = ast.Subscript
            ctx = ast.Load
        elif op_name.startswith('BINARY_'):
            typ = ast.BinOp
            op_type = dict(
                BINARY_POWER=ast.Pow,
                BINARY_MULTIPLY=ast.Mult,
                BINARY_MATRIX_MULTIPLY=getattr(ast, "MatMult", ()),
                BINARY_FLOOR_DIVIDE=ast.FloorDiv,
                BINARY_TRUE_DIVIDE=ast.Div,
                BINARY_MODULO=ast.Mod,
                BINARY_ADD=ast.Add,
                BINARY_SUBTRACT=ast.Sub,
                BINARY_LSHIFT=ast.LShift,
                BINARY_RSHIFT=ast.RShift,
                BINARY_AND=ast.BitAnd,
                BINARY_XOR=ast.BitXor,
                BINARY_OR=ast.BitOr,
            )[op_name]
            extra_filter = lambda e: isinstance(e.op, op_type)
        elif op_name.startswith('UNARY_'):
            typ = ast.UnaryOp
            op_type = dict(
                UNARY_POSITIVE=ast.UAdd,
                UNARY_NEGATIVE=ast.USub,
                UNARY_NOT=ast.Not,
                UNARY_INVERT=ast.Invert,
            )[op_name]
            extra_filter = lambda e: isinstance(e.op, op_type)
        elif op_name in ('LOAD_ATTR', 'LOAD_METHOD', 'LOOKUP_METHOD'):
            typ = ast.Attribute
            ctx = ast.Load
            extra_filter = lambda e: attr_names_match(e.attr, instruction.argval)
        elif op_name in ('LOAD_NAME', 'LOAD_GLOBAL', 'LOAD_FAST', 'LOAD_DEREF', 'LOAD_CLASSDEREF'):
            typ = ast.Name
            ctx = ast.Load
            extra_filter = lambda e: e.id == instruction.argval
        elif op_name in ('COMPARE_OP', 'IS_OP', 'CONTAINS_OP'):
            typ = ast.Compare
            extra_filter = lambda e: len(e.ops) == 1
        elif op_name.startswith(('STORE_SLICE', 'STORE_SUBSCR')):
            ctx = ast.Store
            typ = ast.Subscript
        elif op_name.startswith('STORE_ATTR'):
            ctx = ast.Store
            typ = ast.Attribute
            extra_filter = lambda e: attr_names_match(e.attr, instruction.argval)
        else:
            raise RuntimeError(op_name)

        # matching_nodes() temporarily modifies the shared tree, so the whole
        # search runs while holding the module-level lock.
        with lock:
            exprs = {
                cast(EnhancedAST, node)
                for stmt in stmts
                for node in ast.walk(stmt)
                if isinstance(node, typ)
                if isinstance(getattr(node, "ctx", None), ctx)
                if extra_filter(node)
                if statement_containing_node(node) == stmt
            }

            if ctx == ast.Store:
                # No special bytecode tricks here.
                # We can handle multiple assigned attributes with different names,
                # but only one assigned subscript.
                self.result = only(exprs)
                return

            matching = list(self.matching_nodes(exprs))
            if not matching and typ == ast.Call:
                self.find_decorator(stmts)
            else:
                self.result = only(matching)

    def find_decorator(self, stmts):
        # type: (Union[List[EnhancedAST], Set[EnhancedAST]]) -> None
        """
        Treat the current call instruction as the call of a decorator.

        Expects `stmts` to hold a single class or function definition with
        decorators. Sets `result` to that definition and `decorator` to the
        decorator expression whose call is the current instruction.
        """
        stmt = only(stmts)
        assert_(isinstance(stmt, (ast.ClassDef, function_node_types)))
        decorators = stmt.decorator_list  # type: ignore[attr-defined]
        assert_(decorators)
        line_instructions = [
            inst
            for inst in self.clean_instructions(self.code)
            if inst.lineno == self.frame.f_lineno
        ]
        last_decorator_instruction_index = [
            i
            for i, inst in enumerate(line_instructions)
            if inst.opname == "CALL_FUNCTION"
        ][-1]
        assert_(
            line_instructions[last_decorator_instruction_index + 1].opname.startswith(
                "STORE_"
            )
        )
        # The last len(decorators) call instructions, ending at the one found above.
        decorator_instructions = line_instructions[
            last_decorator_instruction_index
            - len(decorators)
            + 1 : last_decorator_instruction_index
            + 1
        ]
        assert_({inst.opname for inst in decorator_instructions} == {"CALL_FUNCTION"})
        decorator_index = decorator_instructions.index(self.instruction)
        # The call instructions are matched against the decorator list in reverse order.
        decorator = decorators[::-1][decorator_index]
        self.decorator = decorator
        self.result = stmt

    def clean_instructions(self, code):
        # type: (types.CodeType) -> List[EnhancedInstruction]
        """
        Return the instructions of `code` without EXTENDED_ARG and NOP
        instructions and without instructions on ignored lines.
        """
        return [
            inst
            for inst in get_instructions(code)
            if inst.opname not in ("EXTENDED_ARG", "NOP")
            if inst.lineno not in self.ignore_linenos
        ]

    def get_original_clean_instructions(self):
        # type: () -> List[EnhancedInstruction]
        """
        Return the cleaned instructions of the frame's own code object.
        """
        result = self.clean_instructions(self.code)

        # pypy sometimes (when is not clear)
        # inserts JUMP_IF_NOT_DEBUG instructions in bytecode
        # If they're not present in our compiled instructions,
        # ignore them in the original bytecode
        if not any(
            inst.opname == "JUMP_IF_NOT_DEBUG"
            for inst in self.compile_instructions()
        ):
            result = [
                inst for inst in result
                if inst.opname != "JUMP_IF_NOT_DEBUG"
            ]

        return result

    def matching_nodes(self, exprs):
        # type: (Set[EnhancedAST]) -> Iterator[EnhancedAST]
        """
        Yield the candidates in `exprs` whose sentinel-marked recompilation
        places the sentinel right after the position of the current instruction.
        """
        original_instructions = self.get_original_clean_instructions()
        original_index = only(
            i
            for i, inst in enumerate(original_instructions)
            if inst == self.instruction
        )
        for expr_index, expr in enumerate(exprs):
            setter = get_setter(expr)
            assert setter is not None
            # ast.Str is a deprecated alias (since Python 3.8) that newer
            # interpreters no longer provide; build the constant with
            # ast.Constant wherever that class exists.
            if hasattr(ast, "Constant"):
                sentinel_node = ast.Constant(value=sentinel)  # type: ast.expr
            else:  # pragma: no cover
                sentinel_node = ast.Str(s=sentinel)
            # noinspection PyArgumentList
            replacement = ast.BinOp(
                left=expr,
                op=ast.Pow(),
                right=sentinel_node,
            )
            ast.fix_missing_locations(replacement)
            setter(replacement)
            try:
                instructions = self.compile_instructions()
            finally:
                # Always restore the tree, even if compilation fails.
                setter(expr)

            if sys.version_info >= (3, 10):
                try:
                    handle_jumps(instructions, original_instructions)
                except Exception:
                    # Give other candidates a chance
                    if TESTING or expr_index < len(exprs) - 1:
                        continue
                    raise

            indices = [
                i
                for i, instruction in enumerate(instructions)
                if instruction.argval == sentinel
            ]

            # There can be several indices when the bytecode is duplicated,
            # as happens in a finally block in 3.9+
            # First we remove the opcodes caused by our modifications
            for index_num, sentinel_index in enumerate(indices):
                # Adjustment for removing sentinel instructions below
                # in past iterations
                sentinel_index -= index_num * 2

                assert_(instructions.pop(sentinel_index).opname == 'LOAD_CONST')
                assert_(instructions.pop(sentinel_index).opname == 'BINARY_POWER')

            # Then we see if any of the instruction indices match
            for index_num, sentinel_index in enumerate(indices):
                sentinel_index -= index_num * 2
                new_index = sentinel_index - 1

                if new_index != original_index:
                    continue

                original_inst = original_instructions[original_index]
                new_inst = instructions[new_index]

                # In Python 3.9+, changing 'not x in y' to 'not sentinel_transformation(x in y)'
                # changes a CONTAINS_OP(invert=1) to CONTAINS_OP(invert=0),<sentinel stuff>,UNARY_NOT
                if (
                    original_inst.opname == new_inst.opname in ('CONTAINS_OP', 'IS_OP')
                    and original_inst.arg != new_inst.arg  # type: ignore[attr-defined]
                    and (
                        original_instructions[original_index + 1].opname
                        != instructions[new_index + 1].opname == 'UNARY_NOT'
                    )
                ):
                    # Remove the difference for the upcoming assert
                    instructions.pop(new_index + 1)

                # Check that the modified instructions don't have anything unexpected
                # 3.10 is a bit too weird to assert this in all cases but things still work
                if sys.version_info < (3, 10):
                    for inst1, inst2 in zip_longest(
                        original_instructions, instructions
                    ):
                        assert_(inst1 and inst2 and opnames_match(inst1, inst2))

                yield expr

    def compile_instructions(self):
        # type: () -> List[EnhancedInstruction]
        """
        Compile the current state of the tree and return the cleaned
        instructions of the one code object matching the frame's code.
        """
        module_code = compile_similar_to(self.tree, self.code)
        code = only(self.find_codes(module_code))
        return self.clean_instructions(code)

    def find_codes(self, root_code):
        # type: (types.CodeType) -> list
        """
        Return every code object in `root_code` (itself and, recursively, the
        code objects among its constants) that looks like the frame's code.
        """
        checks = [
            attrgetter('co_firstlineno'),
            attrgetter('co_freevars'),
            attrgetter('co_cellvars'),
            lambda c: is_ipython_cell_code_name(c.co_name) or c.co_name,
        ]  # type: List[Callable]
        if not self.is_pytest:
            # These two checks are skipped for code identified as rewritten by pytest.
            checks += [
                attrgetter('co_names'),
                attrgetter('co_varnames'),
            ]

        def matches(c):
            # type: (types.CodeType) -> bool
            return all(
                f(c) == f(self.code)
                for f in checks
            )

        code_options = []
        if matches(root_code):
            code_options.append(root_code)

        def finder(code):
            # type: (types.CodeType) -> None
            for const in code.co_consts:
                if not inspect.iscode(const):
                    continue

                if matches(const):
                    code_options.append(const)
                finder(const)

        finder(root_code)
        return code_options

    def get_actual_current_instruction(self, lasti):
        # type: (int) -> EnhancedInstruction
        """
        Get the instruction corresponding to the current
        frame offset, skipping EXTENDED_ARG instructions
        """
        # Don't use get_original_clean_instructions
        # because we need the actual instructions including
        # EXTENDED_ARG
        instructions = list(get_instructions(self.code))
        index = only(
            i
            for i, inst in enumerate(instructions)
            if inst.offset == lasti
        )

        while True:
            instruction = instructions[index]
            if instruction.opname != "EXTENDED_ARG":
                return instruction
            index += 1
def non_sentinel_instructions(instructions, start):
    # type: (List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction]]
    """
    Yield (index, instruction) pairs from position `start` onwards, leaving
    out the LOAD_CONST / BINARY_POWER pairs added by the sentinel transformation.
    """
    expecting_power = False
    for index, inst in islice(enumerate(instructions), start, None):
        if inst.argval == sentinel:
            assert_(inst.opname == "LOAD_CONST")
            expecting_power = True
        elif expecting_power:
            # The instruction right after the sentinel constant must be the power operation.
            assert_(inst.opname == "BINARY_POWER")
            expecting_power = False
        else:
            yield index, inst
def walk_both_instructions(original_instructions, original_start, instructions, start):
    # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction, int, EnhancedInstruction]]
    """
    Walk the original and the new instructions in parallel, starting at the
    given positions, and yield
    (original index, original instruction, new index, new instruction).
    Changes made by the sentinel transformation are left out.
    Stops as soon as either sequence is exhausted.
    """
    originals = islice(enumerate(original_instructions), original_start, None)
    news = non_sentinel_instructions(instructions, start)
    previous_was_inverted = False
    for (original_i, original_inst), (new_i, new_inst) in zip(originals, news):
        if (
            previous_was_inverted
            and original_inst.opname != new_inst.opname == "UNARY_NOT"
        ):
            # Skip the UNARY_NOT that only the new instructions contain.
            # NOTE(review): if `news` is exhausted here, the StopIteration is
            # not caught and surfaces as a RuntimeError on Python 3.7+.
            new_i, new_inst = next(news)
        previous_was_inverted = (
            original_inst.opname == new_inst.opname in ("CONTAINS_OP", "IS_OP")
            and original_inst.arg != new_inst.arg  # type: ignore[attr-defined]
        )
        yield original_i, original_inst, new_i, new_inst
def handle_jumps(instructions, original_instructions):
    # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> None
    """
    Modify `instructions` in place until it resembles `original_instructions`.
    This is only needed in 3.10+ where optimisations lead to more drastic changes
    after the sentinel transformation.
    A JUMP instruction that has no counterpart in `original_instructions`
    is replaced by the section it jumps to, up to a return or raise.
    For other mismatches, a section that is duplicated in
    `original_instructions` is duplicated in `instructions` as well.
    """
    terminators = ("RETURN_VALUE", "RAISE_VARARGS")
    while True:
        paired = walk_both_instructions(original_instructions, 0, instructions, 0)
        for original_i, original_inst, new_i, new_inst in paired:
            if opnames_match(original_inst, new_inst):
                continue

            jump_only_in_new = (
                "JUMP" in new_inst.opname and "JUMP" not in original_inst.opname
            )
            if jump_only_in_new:
                # Locate the jump target, skipping instructions that were
                # copied in by earlier passes
                target = only(
                    i
                    for i, inst in enumerate(instructions)
                    if inst.offset == new_inst.argval
                    and not getattr(inst, "_copied", False)
                )
                # Put the jumped-to section in place of the jump instruction.
                # handle_jump may also delete that section if
                # original_instructions does not duplicate it
                replacement = handle_jump(
                    original_instructions, original_i, instructions, target
                )
                assert replacement is not None
                instructions[new_i : new_i + 1] = replacement
            else:
                # Take the part of original_instructions from original_i
                # up to and including the first return/raise
                tail = original_instructions[original_i:]
                end = next(
                    (
                        position
                        for position, section_inst in enumerate(tail)
                        if section_inst.opname in terminators
                    ),
                    None,
                )
                if end is None:
                    # No return/raise - this is just a mismatch we can't handle
                    raise AssertionError
                orig_section = tail[: end + 1]

                instructions[new_i:new_i] = only(find_new_matching(orig_section, instructions))

            # `instructions` changed, so this walk is no longer valid:
            # start over from the beginning and look for further mismatches
            break

        else:  # The walk finished without a mismatch, we're done
            return
def find_new_matching(orig_section, instructions):
    # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> Iterator[List[EnhancedInstruction]]
    """
    Yield each slice of `instructions` that matches `orig_section`.
    Sentinel instructions are skipped while comparing but are included
    in the yielded slices.
    """
    wanted = len(orig_section)
    for start in range(len(instructions) - wanted):
        window = islice(non_sentinel_instructions(instructions, start), wanted)
        # Split the (index, instruction) pairs into two parallel tuples.
        indices, dup_section = zip(*window)
        if len(dup_section) < wanted:
            return
        if sections_match(orig_section, dup_section):
            last_index = indices[-1]
            yield instructions[start:last_index + 1]
def handle_jump(original_instructions, original_start, instructions, start):
    # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Optional[List[EnhancedInstruction]]
    """
    Return a copy of the section of `instructions` that starts at `start` and
    ends with a RETURN_VALUE or RAISE_VARARGS instruction, with every copied
    instruction marked `_copied`.
    The section is expected to match original_instructions from original_start.
    Unless that original section is duplicated elsewhere in original_instructions,
    the section is also removed from `instructions`.
    Returns None if no RETURN_VALUE or RAISE_VARARGS is reached.
    """
    paired = walk_both_instructions(
        original_instructions, original_start, instructions, start
    )
    for original_j, original_inst, new_j, new_inst in paired:
        assert_(opnames_match(original_inst, new_inst))
        if original_inst.opname not in ("RETURN_VALUE", "RAISE_VARARGS"):
            continue

        section_end = new_j + 1
        inlined = deepcopy(instructions[start:section_end])
        for copied_inst in inlined:
            copied_inst._copied = True
        orig_section = original_instructions[original_start : original_j + 1]
        is_duplicated = check_duplicates(
            original_start, orig_section, original_instructions
        )
        if not is_duplicated:
            instructions[start:section_end] = []
        return inlined

    return None
def check_duplicates(original_i, orig_section, original_instructions):
    # type: (int, List[EnhancedInstruction], List[EnhancedInstruction]) -> bool
    """
    Tell whether orig_section is duplicated: return True if a section of
    original_instructions that starts anywhere except original_i matches it.
    """
    section_length = len(orig_section)
    for dup_start in range(len(original_instructions)):
        if dup_start == original_i:
            continue
        candidate = original_instructions[dup_start : dup_start + section_length]
        if len(candidate) < section_length:
            # Too close to the end: no later start can hold a full section either.
            return False
        if sections_match(orig_section, candidate):
            return True

    return False
def sections_match(orig_section, dup_section):
    # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> bool
    """
    Compares the two instruction lists pairwise and returns True only if
    every pair agrees on lineno and has matching opnames.
    """
    for orig_inst, dup_inst in zip(orig_section, dup_section):
        if orig_inst.lineno != dup_inst.lineno:
            # POP_BLOCKs have been found to have differing linenos in innocent cases
            both_pop_block = "POP_BLOCK" == orig_inst.opname == dup_inst.opname
            if not both_pop_block:
                return False
        if not opnames_match(orig_inst, dup_inst):
            return False
    return True
def opnames_match(inst1, inst2):
    # type: (Instruction, Instruction) -> bool
    """
    Returns True if the opnames of the two instructions are equal, or form
    one of the accepted pairs: any two jump opnames, PRINT_EXPR/POP_TOP,
    LOAD_METHOD or LOOKUP_METHOD/LOAD_ATTR, CALL_METHOD/CALL_FUNCTION
    (with `inst1` supplying the first opname of each pair).
    """
    first = inst1.opname
    second = inst2.opname

    if first == second:
        return True
    if "JUMP" in first and "JUMP" in second:
        return True
    if first == "PRINT_EXPR" and second == "POP_TOP":
        return True
    if first in ("LOAD_METHOD", "LOOKUP_METHOD") and second == "LOAD_ATTR":
        return True
    return first == "CALL_METHOD" and second == "CALL_FUNCTION"
def get_setter(node):
    # type: (EnhancedAST) -> Optional[Callable[[ast.AST], None]]
    """
    Finds where `node` is stored on `node.parent` (either directly as a
    field value or as an item of a list-valued field) and returns a function
    that stores a replacement node in that same place.
    Returns None if `node` is not found among the parent's fields.
    """
    owner = node.parent
    for field_name, value in ast.iter_fields(owner):
        if value is node:
            def set_field(new_node):
                # type: (ast.AST) -> None
                return setattr(owner, field_name, new_node)
            return set_field

        if not isinstance(value, list):
            continue

        for position, child in enumerate(value):
            if child is node:
                def set_item(new_node):
                    # type: (ast.AST) -> None
                    value[position] = new_node

                return set_item
    return None
# Module-level re-entrant lock (threading.RLock); the code that acquires it
# lies outside this section of the file.
lock = RLock()
@cache
def statement_containing_node(node):
    # type: (ast.AST) -> EnhancedAST
    """
    Returns `node` itself if it is a statement, otherwise the closest
    ancestor (reached by following `.parent` links) that is an `ast.stmt`.
    """
    current = node
    while True:
        if isinstance(current, ast.stmt):
            return cast(EnhancedAST, current)
        current = cast(EnhancedAST, current).parent
def assert_linenos(tree):
    # type: (ast.AST) -> Iterator[int]
    """
    Yields the line numbers (as given by `node_linenos`) of every node in
    `tree` that has a `parent` attribute and whose containing statement
    is an `assert`.
    """
    for candidate in ast.walk(tree):
        if not hasattr(candidate, 'parent'):
            continue
        if isinstance(statement_containing_node(candidate), ast.Assert):
            yield from node_linenos(candidate)
def _extract_ipython_statement(stmt):
    # type: (EnhancedAST) -> ast.Module
    """
    Climbs from `stmt` to its ancestor that sits directly under the
    `ast.Module` and returns a fresh module whose body is just that node.
    """
    # IPython separates each statement in a cell to be executed separately
    # So NodeFinder should only compile one statement at a time or it
    # will find a code mismatch.
    top_level = stmt
    while True:
        if isinstance(top_level.parent, ast.Module):
            break
        top_level = top_level.parent

    # use `ast.parse` instead of `ast.Module` for better portability
    # python3.8 changes the signature of `ast.Module`
    # Inspired by https://github.com/pallets/werkzeug/pull/1552/files
    module = ast.parse("")
    module.body = [cast(ast.stmt, top_level)]
    ast.copy_location(module, top_level)
    return module
def is_ipython_cell_code_name(code_name):
    # type: (str) -> bool
    """
    Returns True if `code_name` is `<module>` or has the form
    `<cell line: N>` for some decimal number N.
    """
    found = re.match(r"(<module>|<cell line: \d+>)$", code_name)
    return found is not None
def is_ipython_cell_filename(filename):
    # type: (str) -> bool
    """
    Returns True if `filename` contains `<ipython-input-` or a path
    component of the form `ipykernel_<digits>` enclosed in path separators.
    """
    found = re.search(r"<ipython-input-|[/\\]ipykernel_\d+[/\\]", filename)
    return found is not None
def is_ipython_cell_code(code_obj):
    # type: (types.CodeType) -> bool
    """
    Returns True if both the filename and the name of `code_obj` pass the
    IPython cell checks (`is_ipython_cell_filename` and
    `is_ipython_cell_code_name`).
    """
    if not is_ipython_cell_filename(code_obj.co_filename):
        return False
    return is_ipython_cell_code_name(code_obj.co_name)
def find_node_ipython(frame, lasti, stmts, source):
    # type: (types.FrameType, int, Set[EnhancedAST], Source) -> Tuple[Optional[Any], Optional[Any]]
    """
    Runs a NodeFinder separately on each statement of `stmts` (wrapped in its
    own module by `_extract_ipython_statement`) and returns a
    `(decorator, node)` pair taken from the finder that succeeded.

    Statements for which the finder raises are ignored. If more than one
    statement produces a candidate, the result is ambiguous and
    `(None, None)` is returned.
    """
    found_node = found_decorator = None
    for stmt in stmts:
        single_stmt_tree = _extract_ipython_statement(stmt)
        try:
            finder = NodeFinder(frame, stmts, single_stmt_tree, lasti, source)
            already_found = found_node or found_decorator
            if already_found and (finder.result or finder.decorator):
                # Found potential nodes in separate statements,
                # cannot resolve ambiguity, give up here
                return None, None

            found_node = finder.result
            found_decorator = finder.decorator
        except Exception:
            # Best effort: a failing finder just means "not in this statement".
            continue
    return found_decorator, found_node
def attr_names_match(attr, argval):
    # type: (str, str) -> bool
    """
    Decides whether `attr`, the attribute name as written in the source (ast),
    can correspond to `argval`, the attribute name actually used by the
    bytecode, which may have been mangled if the attribute is private.
    """
    return attr == argval or (
        # Only names starting with a double underscore are candidates for mangling.
        attr.startswith("__")
        and re.match(r"^_\w+%s$" % attr, argval) is not None
    )
def node_linenos(node):
    # type: (ast.AST) -> Iterator[int]
    """
    Yields the line numbers occupied by `node`: every line from `lineno` to
    `end_lineno` for expressions that have an `end_lineno`, just `lineno`
    for other nodes, and nothing for nodes without a `lineno`.
    """
    if not hasattr(node, "lineno"):
        return

    if hasattr(node, "end_lineno") and isinstance(node, ast.expr):
        assert node.end_lineno is not None  # type: ignore[attr-defined]
        yield from range(node.lineno, node.end_lineno + 1)  # type: ignore[attr-defined]
    else:
        yield node.lineno  # type: ignore[attr-defined]
# Select the NodeFinder implementation according to the running Python version.
if sys.version_info < (3, 11):
    NodeFinder = SentinelNodeFinder
else:
    from ._position_node_finder import PositionNodeFinder as NodeFinder