Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/executing/executing.py: 19%
591 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
1"""
2MIT License
4Copyright (c) 2021 Alex Hall
6Permission is hereby granted, free of charge, to any person obtaining a copy
7of this software and associated documentation files (the "Software"), to deal
8in the Software without restriction, including without limitation the rights
9to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10copies of the Software, and to permit persons to whom the Software is
11furnished to do so, subject to the following conditions:
13The above copyright notice and this permission notice shall be included in all
14copies or substantial portions of the Software.
16THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22SOFTWARE.
23"""
25import __future__
26import ast
27import dis
28import functools
29import inspect
30import io
31import linecache
32import re
33import sys
34import types
35from collections import defaultdict, namedtuple
36from copy import deepcopy
37from itertools import islice
38from operator import attrgetter
39from threading import RLock
40from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, Type, TypeVar, Union, cast
42if TYPE_CHECKING: # pragma: no cover
43 from asttokens import ASTTokens, ASTText
44 from asttokens.asttokens import ASTTextBase
46function_node_types = (ast.FunctionDef,) # type: Tuple[Type, ...]
47if sys.version_info[0] == 3:
48 function_node_types += (ast.AsyncFunctionDef,)
51if sys.version_info[0] == 3:
52 # noinspection PyUnresolvedReferences
53 from functools import lru_cache
54 # noinspection PyUnresolvedReferences
55 from tokenize import detect_encoding
56 from itertools import zip_longest
57 # noinspection PyUnresolvedReferences,PyCompatibility
58 from pathlib import Path
60 cache = lru_cache(maxsize=None)
61 text_type = str
62else:
63 from lib2to3.pgen2.tokenize import detect_encoding, cookie_re as encoding_pattern # type: ignore[attr-defined]
64 from itertools import izip_longest as zip_longest
66 class Path(object):
67 pass
70 def cache(func):
71 # type: (Callable) -> Callable
72 d = {} # type: Dict[Tuple, Callable]
74 @functools.wraps(func)
75 def wrapper(*args):
76 # type: (Any) -> Any
77 if args in d:
78 return d[args]
79 result = d[args] = func(*args)
80 return result
82 return wrapper
85 # noinspection PyUnresolvedReferences
86 text_type = unicode
88# Type class used to expand out the definition of AST to include fields added by this library
89# It's not actually used for anything other than type checking though!
90class EnhancedAST(ast.AST):
91 parent = None # type: EnhancedAST
93if sys.version_info >= (3, 4):
94 # noinspection PyUnresolvedReferences
95 _get_instructions = dis.get_instructions
96 from dis import Instruction as _Instruction
98 class Instruction(_Instruction):
99 lineno = None # type: int
100else:
101 class Instruction(namedtuple('Instruction', 'offset argval opname starts_line')):
102 lineno = None # type: int
104 from dis import HAVE_ARGUMENT, EXTENDED_ARG, hasconst, opname, findlinestarts, hasname
106 # Based on dis.disassemble from 2.7
107 # Left as similar as possible for easy diff
109 def _get_instructions(co):
110 # type: (types.CodeType) -> Iterator[Instruction]
111 code = co.co_code
112 linestarts = dict(findlinestarts(co))
113 n = len(code)
114 i = 0
115 extended_arg = 0
116 while i < n:
117 offset = i
118 c = code[i]
119 op = ord(c)
120 lineno = linestarts.get(i)
121 argval = None
122 i = i + 1
123 if op >= HAVE_ARGUMENT:
124 oparg = ord(code[i]) + ord(code[i + 1]) * 256 + extended_arg
125 extended_arg = 0
126 i = i + 2
127 if op == EXTENDED_ARG:
128 extended_arg = oparg * 65536
130 if op in hasconst:
131 argval = co.co_consts[oparg]
132 elif op in hasname:
133 argval = co.co_names[oparg]
134 elif opname[op] == 'LOAD_FAST':
135 argval = co.co_varnames[oparg]
136 yield Instruction(offset, argval, opname[op], lineno)
139# Type class used to expand out the definition of AST to include fields added by this library
140# It's not actually used for anything other than type checking though!
141class EnhancedInstruction(Instruction):
142 _copied = None # type: bool
146def assert_(condition, message=""):
147 # type: (Any, str) -> None
148 """
149 Like an assert statement, but unaffected by -O
150 :param condition: value that is expected to be truthy
151 :type message: Any
152 """
153 if not condition:
154 raise AssertionError(str(message))
157def get_instructions(co):
158 # type: (types.CodeType) -> Iterator[EnhancedInstruction]
159 lineno = co.co_firstlineno
160 for inst in _get_instructions(co):
161 inst = cast(EnhancedInstruction, inst)
162 lineno = inst.starts_line or lineno
163 assert_(lineno)
164 inst.lineno = lineno
165 yield inst
168TESTING = 0
171class NotOneValueFound(Exception):
172 def __init__(self,msg,values=[]):
173 # type: (str, Sequence) -> None
174 self.values=values
175 super(NotOneValueFound,self).__init__(msg)
177T = TypeVar('T')
180def only(it):
181 # type: (Iterable[T]) -> T
182 if isinstance(it, Sized):
183 if len(it) != 1:
184 raise NotOneValueFound('Expected one value, found %s' % len(it))
185 # noinspection PyTypeChecker
186 return list(it)[0]
188 lst = tuple(islice(it, 2))
189 if len(lst) == 0:
190 raise NotOneValueFound('Expected one value, found 0')
191 if len(lst) > 1:
192 raise NotOneValueFound('Expected one value, found several',lst)
193 return lst[0]
196class Source(object):
197 """
198 The source code of a single file and associated metadata.
200 The main method of interest is the classmethod `executing(frame)`.
202 If you want an instance of this class, don't construct it.
203 Ideally use the classmethod `for_frame(frame)`.
204 If you don't have a frame, use `for_filename(filename [, module_globals])`.
205 These methods cache instances by filename, so at most one instance exists per filename.
207 Attributes:
208 - filename
209 - text
210 - lines
211 - tree: AST parsed from text, or None if text is not valid Python
212 All nodes in the tree have an extra `parent` attribute
214 Other methods of interest:
215 - statements_at_line
216 - asttokens
217 - code_qualname
218 """
220 def __init__(self, filename, lines):
221 # type: (str, Sequence[str]) -> None
222 """
223 Don't call this constructor, see the class docstring.
224 """
226 self.filename = filename
227 text = ''.join(lines)
229 if not isinstance(text, text_type):
230 encoding = self.detect_encoding(text)
231 # noinspection PyUnresolvedReferences
232 text = text.decode(encoding)
233 lines = [line.decode(encoding) for line in lines]
235 self.text = text
236 self.lines = [line.rstrip('\r\n') for line in lines]
238 if sys.version_info[0] == 3:
239 ast_text = text
240 else:
241 # In python 2 it's a syntax error to parse unicode
242 # with an encoding declaration, so we remove it but
243 # leave empty lines in its place to keep line numbers the same
244 ast_text = ''.join([
245 '\n' if i < 2 and encoding_pattern.match(line)
246 else line
247 for i, line in enumerate(lines)
248 ])
250 self._nodes_by_line = defaultdict(list)
251 self.tree = None
252 self._qualnames = {}
253 self._asttokens = None # type: Optional[ASTTokens]
254 self._asttext = None # type: Optional[ASTText]
256 try:
257 self.tree = ast.parse(ast_text, filename=filename)
258 except (SyntaxError, ValueError):
259 pass
260 else:
261 for node in ast.walk(self.tree):
262 for child in ast.iter_child_nodes(node):
263 cast(EnhancedAST, child).parent = cast(EnhancedAST, node)
264 for lineno in node_linenos(node):
265 self._nodes_by_line[lineno].append(node)
267 visitor = QualnameVisitor()
268 visitor.visit(self.tree)
269 self._qualnames = visitor.qualnames
271 @classmethod
272 def for_frame(cls, frame, use_cache=True):
273 # type: (types.FrameType, bool) -> "Source"
274 """
275 Returns the `Source` object corresponding to the file the frame is executing in.
276 """
277 return cls.for_filename(frame.f_code.co_filename, frame.f_globals or {}, use_cache)
279 @classmethod
280 def for_filename(
281 cls,
282 filename,
283 module_globals=None,
284 use_cache=True, # noqa no longer used
285 ):
286 # type: (Union[str, Path], Optional[Dict[str, Any]], bool) -> "Source"
287 if isinstance(filename, Path):
288 filename = str(filename)
290 def get_lines():
291 # type: () -> List[str]
292 return linecache.getlines(cast(text_type, filename), module_globals)
294 # Save the current linecache entry, then ensure the cache is up to date.
295 entry = linecache.cache.get(filename) # type: ignore[attr-defined]
296 linecache.checkcache(filename)
297 lines = get_lines()
298 if entry is not None and not lines:
299 # There was an entry, checkcache removed it, and nothing replaced it.
300 # This means the file wasn't simply changed (because the `lines` wouldn't be empty)
301 # but rather the file was found not to exist, probably because `filename` was fake.
302 # Restore the original entry so that we still have something.
303 linecache.cache[filename] = entry # type: ignore[attr-defined]
304 lines = get_lines()
306 return cls._for_filename_and_lines(filename, tuple(lines))
308 @classmethod
309 def _for_filename_and_lines(cls, filename, lines):
310 # type: (str, Sequence[str]) -> "Source"
311 source_cache = cls._class_local('__source_cache_with_lines', {}) # type: Dict[Tuple[str, Sequence[str]], Source]
312 try:
313 return source_cache[(filename, lines)]
314 except KeyError:
315 pass
317 result = source_cache[(filename, lines)] = cls(filename, lines)
318 return result
320 @classmethod
321 def lazycache(cls, frame):
322 # type: (types.FrameType) -> None
323 if sys.version_info >= (3, 5):
324 linecache.lazycache(frame.f_code.co_filename, frame.f_globals)
326 @classmethod
327 def executing(cls, frame_or_tb):
328 # type: (Union[types.TracebackType, types.FrameType]) -> "Executing"
329 """
330 Returns an `Executing` object representing the operation
331 currently executing in the given frame or traceback object.
332 """
333 if isinstance(frame_or_tb, types.TracebackType):
334 # https://docs.python.org/3/reference/datamodel.html#traceback-objects
335 # "tb_lineno gives the line number where the exception occurred;
336 # tb_lasti indicates the precise instruction.
337 # The line number and last instruction in the traceback may differ
338 # from the line number of its frame object
339 # if the exception occurred in a try statement with no matching except clause
340 # or with a finally clause."
341 tb = frame_or_tb
342 frame = tb.tb_frame
343 lineno = tb.tb_lineno
344 lasti = tb.tb_lasti
345 else:
346 frame = frame_or_tb
347 lineno = frame.f_lineno
348 lasti = frame.f_lasti
352 code = frame.f_code
353 key = (code, id(code), lasti)
354 executing_cache = cls._class_local('__executing_cache', {}) # type: Dict[Tuple[types.CodeType, int, int], Any]
356 args = executing_cache.get(key)
357 if not args:
358 node = stmts = decorator = None
359 source = cls.for_frame(frame)
360 tree = source.tree
361 if tree:
362 try:
363 stmts = source.statements_at_line(lineno)
364 if stmts:
365 if is_ipython_cell_code(code):
366 decorator, node = find_node_ipython(frame, lasti, stmts, source)
367 else:
368 node_finder = NodeFinder(frame, stmts, tree, lasti, source)
369 node = node_finder.result
370 decorator = node_finder.decorator
371 except Exception:
372 if TESTING:
373 raise
375 assert stmts is not None
376 if node:
377 new_stmts = {statement_containing_node(node)}
378 assert_(new_stmts <= stmts)
379 stmts = new_stmts
381 executing_cache[key] = args = source, node, stmts, decorator
383 return Executing(frame, *args)
385 @classmethod
386 def _class_local(cls, name, default):
387 # type: (str, T) -> T
388 """
389 Returns an attribute directly associated with this class
390 (as opposed to subclasses), setting default if necessary
391 """
392 # classes have a mappingproxy preventing us from using setdefault
393 result = cls.__dict__.get(name, default)
394 setattr(cls, name, result)
395 return result
397 @cache
398 def statements_at_line(self, lineno):
399 # type: (int) -> Set[EnhancedAST]
400 """
401 Returns the statement nodes overlapping the given line.
403 Returns at most one statement unless semicolons are present.
405 If the `text` attribute is not valid python, meaning
406 `tree` is None, returns an empty set.
408 Otherwise, `Source.for_frame(frame).statements_at_line(frame.f_lineno)`
409 should return at least one statement.
410 """
412 return {
413 statement_containing_node(node)
414 for node in
415 self._nodes_by_line[lineno]
416 }
418 def asttext(self):
419 # type: () -> ASTText
420 """
421 Returns an ASTText object for getting the source of specific AST nodes.
423 See http://asttokens.readthedocs.io/en/latest/api-index.html
424 """
425 from asttokens import ASTText # must be installed separately
427 if self._asttext is None:
428 self._asttext = ASTText(self.text, tree=self.tree, filename=self.filename)
430 return self._asttext
432 def asttokens(self):
433 # type: () -> ASTTokens
434 """
435 Returns an ASTTokens object for getting the source of specific AST nodes.
437 See http://asttokens.readthedocs.io/en/latest/api-index.html
438 """
439 import asttokens # must be installed separately
441 if self._asttokens is None:
442 if hasattr(asttokens, 'ASTText'):
443 self._asttokens = self.asttext().asttokens
444 else: # pragma: no cover
445 self._asttokens = asttokens.ASTTokens(self.text, tree=self.tree, filename=self.filename)
446 return self._asttokens
448 def _asttext_base(self):
449 # type: () -> ASTTextBase
450 import asttokens # must be installed separately
452 if hasattr(asttokens, 'ASTText'):
453 return self.asttext()
454 else: # pragma: no cover
455 return self.asttokens()
457 @staticmethod
458 def decode_source(source):
459 # type: (Union[str, bytes]) -> text_type
460 if isinstance(source, bytes):
461 encoding = Source.detect_encoding(source)
462 return source.decode(encoding)
463 else:
464 return source
466 @staticmethod
467 def detect_encoding(source):
468 # type: (bytes) -> str
469 return detect_encoding(io.BytesIO(source).readline)[0]
471 def code_qualname(self, code):
472 # type: (types.CodeType) -> str
473 """
474 Imitates the __qualname__ attribute of functions for code objects.
475 Given:
477 - A function `func`
478 - A frame `frame` for an execution of `func`, meaning:
479 `frame.f_code is func.__code__`
481 `Source.for_frame(frame).code_qualname(frame.f_code)`
482 will be equal to `func.__qualname__`*. Works for Python 2 as well,
483 where of course no `__qualname__` attribute exists.
485 Falls back to `code.co_name` if there is no appropriate qualname.
487 Based on https://github.com/wbolster/qualname
489 (* unless `func` is a lambda
490 nested inside another lambda on the same line, in which case
491 the outer lambda's qualname will be returned for the codes
492 of both lambdas)
493 """
494 assert_(code.co_filename == self.filename)
495 return self._qualnames.get((code.co_name, code.co_firstlineno), code.co_name)
498class Executing(object):
499 """
500 Information about the operation a frame is currently executing.
502 Generally you will just want `node`, which is the AST node being executed,
503 or None if it's unknown.
505 If a decorator is currently being called, then:
506 - `node` is a function or class definition
507 - `decorator` is the expression in `node.decorator_list` being called
508 - `statements == {node}`
509 """
511 def __init__(self, frame, source, node, stmts, decorator):
512 # type: (types.FrameType, Source, EnhancedAST, Set[ast.stmt], Optional[EnhancedAST]) -> None
513 self.frame = frame
514 self.source = source
515 self.node = node
516 self.statements = stmts
517 self.decorator = decorator
519 def code_qualname(self):
520 # type: () -> str
521 return self.source.code_qualname(self.frame.f_code)
523 def text(self):
524 # type: () -> str
525 return self.source._asttext_base().get_text(self.node)
527 def text_range(self):
528 # type: () -> Tuple[int, int]
529 return self.source._asttext_base().get_text_range(self.node)
532class QualnameVisitor(ast.NodeVisitor):
533 def __init__(self):
534 # type: () -> None
535 super(QualnameVisitor, self).__init__()
536 self.stack = [] # type: List[str]
537 self.qualnames = {} # type: Dict[Tuple[str, int], str]
539 def add_qualname(self, node, name=None):
540 # type: (ast.AST, Optional[str]) -> None
541 name = name or node.name # type: ignore[attr-defined]
542 self.stack.append(name)
543 if getattr(node, 'decorator_list', ()):
544 lineno = node.decorator_list[0].lineno # type: ignore[attr-defined]
545 else:
546 lineno = node.lineno # type: ignore[attr-defined]
547 self.qualnames.setdefault((name, lineno), ".".join(self.stack))
549 def visit_FunctionDef(self, node, name=None):
550 # type: (ast.AST, Optional[str]) -> None
551 if sys.version_info[0] == 3:
552 assert isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)), node
553 else:
554 assert isinstance(node, (ast.FunctionDef, ast.Lambda)), node
555 self.add_qualname(node, name)
556 self.stack.append('<locals>')
557 children = [] # type: Sequence[ast.AST]
558 if isinstance(node, ast.Lambda):
559 children = [node.body]
560 else:
561 children = node.body
562 for child in children:
563 self.visit(child)
564 self.stack.pop()
565 self.stack.pop()
567 # Find lambdas in the function definition outside the body,
568 # e.g. decorators or default arguments
569 # Based on iter_child_nodes
570 for field, child in ast.iter_fields(node):
571 if field == 'body':
572 continue
573 if isinstance(child, ast.AST):
574 self.visit(child)
575 elif isinstance(child, list):
576 for grandchild in child:
577 if isinstance(grandchild, ast.AST):
578 self.visit(grandchild)
580 visit_AsyncFunctionDef = visit_FunctionDef
582 def visit_Lambda(self, node):
583 # type: (ast.AST) -> None
584 assert isinstance(node, ast.Lambda)
585 self.visit_FunctionDef(node, '<lambda>')
587 def visit_ClassDef(self, node):
588 # type: (ast.AST) -> None
589 assert isinstance(node, ast.ClassDef)
590 self.add_qualname(node)
591 self.generic_visit(node)
592 self.stack.pop()
598future_flags = sum(
599 getattr(__future__, fname).compiler_flag for fname in __future__.all_feature_names
600)
603def compile_similar_to(source, matching_code):
604 # type: (ast.Module, types.CodeType) -> Any
605 return compile(
606 source,
607 matching_code.co_filename,
608 'exec',
609 flags=future_flags & matching_code.co_flags,
610 dont_inherit=True,
611 )
614sentinel = 'io8urthglkjdghvljusketgIYRFYUVGHFRTBGVHKGF78678957647698'
616def is_rewritten_by_pytest(code):
617 # type: (types.CodeType) -> bool
618 return any(
619 bc.opname != "LOAD_CONST" and isinstance(bc.argval,str) and bc.argval.startswith("@py")
620 for bc in get_instructions(code)
621 )
624class SentinelNodeFinder(object):
625 result = None # type: EnhancedAST
627 def __init__(self, frame, stmts, tree, lasti, source):
628 # type: (types.FrameType, Set[EnhancedAST], ast.Module, int, Source) -> None
629 assert_(stmts)
630 self.frame = frame
631 self.tree = tree
632 self.code = code = frame.f_code
633 self.is_pytest = is_rewritten_by_pytest(code)
635 if self.is_pytest:
636 self.ignore_linenos = frozenset(assert_linenos(tree))
637 else:
638 self.ignore_linenos = frozenset()
640 self.decorator = None
642 self.instruction = instruction = self.get_actual_current_instruction(lasti)
643 op_name = instruction.opname
644 extra_filter = lambda e: True # type: Callable[[Any], bool]
645 ctx = type(None) # type: Type
647 typ = type(None) # type: Type
648 if op_name.startswith('CALL_'):
649 typ = ast.Call
650 elif op_name.startswith(('BINARY_SUBSCR', 'SLICE+')):
651 typ = ast.Subscript
652 ctx = ast.Load
653 elif op_name.startswith('BINARY_'):
654 typ = ast.BinOp
655 op_type = dict(
656 BINARY_POWER=ast.Pow,
657 BINARY_MULTIPLY=ast.Mult,
658 BINARY_MATRIX_MULTIPLY=getattr(ast, "MatMult", ()),
659 BINARY_FLOOR_DIVIDE=ast.FloorDiv,
660 BINARY_TRUE_DIVIDE=ast.Div,
661 BINARY_MODULO=ast.Mod,
662 BINARY_ADD=ast.Add,
663 BINARY_SUBTRACT=ast.Sub,
664 BINARY_LSHIFT=ast.LShift,
665 BINARY_RSHIFT=ast.RShift,
666 BINARY_AND=ast.BitAnd,
667 BINARY_XOR=ast.BitXor,
668 BINARY_OR=ast.BitOr,
669 )[op_name]
670 extra_filter = lambda e: isinstance(e.op, op_type)
671 elif op_name.startswith('UNARY_'):
672 typ = ast.UnaryOp
673 op_type = dict(
674 UNARY_POSITIVE=ast.UAdd,
675 UNARY_NEGATIVE=ast.USub,
676 UNARY_NOT=ast.Not,
677 UNARY_INVERT=ast.Invert,
678 )[op_name]
679 extra_filter = lambda e: isinstance(e.op, op_type)
680 elif op_name in ('LOAD_ATTR', 'LOAD_METHOD', 'LOOKUP_METHOD'):
681 typ = ast.Attribute
682 ctx = ast.Load
683 extra_filter = lambda e: attr_names_match(e.attr, instruction.argval)
684 elif op_name in ('LOAD_NAME', 'LOAD_GLOBAL', 'LOAD_FAST', 'LOAD_DEREF', 'LOAD_CLASSDEREF'):
685 typ = ast.Name
686 ctx = ast.Load
687 if sys.version_info[0] == 3 or instruction.argval:
688 extra_filter = lambda e: e.id == instruction.argval
689 elif op_name in ('COMPARE_OP', 'IS_OP', 'CONTAINS_OP'):
690 typ = ast.Compare
691 extra_filter = lambda e: len(e.ops) == 1
692 elif op_name.startswith(('STORE_SLICE', 'STORE_SUBSCR')):
693 ctx = ast.Store
694 typ = ast.Subscript
695 elif op_name.startswith('STORE_ATTR'):
696 ctx = ast.Store
697 typ = ast.Attribute
698 extra_filter = lambda e: attr_names_match(e.attr, instruction.argval)
699 else:
700 raise RuntimeError(op_name)
702 with lock:
703 exprs = {
704 cast(EnhancedAST, node)
705 for stmt in stmts
706 for node in ast.walk(stmt)
707 if isinstance(node, typ)
708 if isinstance(getattr(node, "ctx", None), ctx)
709 if extra_filter(node)
710 if statement_containing_node(node) == stmt
711 }
713 if ctx == ast.Store:
714 # No special bytecode tricks here.
715 # We can handle multiple assigned attributes with different names,
716 # but only one assigned subscript.
717 self.result = only(exprs)
718 return
720 matching = list(self.matching_nodes(exprs))
721 if not matching and typ == ast.Call:
722 self.find_decorator(stmts)
723 else:
724 self.result = only(matching)
726 def find_decorator(self, stmts):
727 # type: (Union[List[EnhancedAST], Set[EnhancedAST]]) -> None
728 stmt = only(stmts)
729 assert_(isinstance(stmt, (ast.ClassDef, function_node_types)))
730 decorators = stmt.decorator_list # type: ignore[attr-defined]
731 assert_(decorators)
732 line_instructions = [
733 inst
734 for inst in self.clean_instructions(self.code)
735 if inst.lineno == self.frame.f_lineno
736 ]
737 last_decorator_instruction_index = [
738 i
739 for i, inst in enumerate(line_instructions)
740 if inst.opname == "CALL_FUNCTION"
741 ][-1]
742 assert_(
743 line_instructions[last_decorator_instruction_index + 1].opname.startswith(
744 "STORE_"
745 )
746 )
747 decorator_instructions = line_instructions[
748 last_decorator_instruction_index
749 - len(decorators)
750 + 1 : last_decorator_instruction_index
751 + 1
752 ]
753 assert_({inst.opname for inst in decorator_instructions} == {"CALL_FUNCTION"})
754 decorator_index = decorator_instructions.index(self.instruction)
755 decorator = decorators[::-1][decorator_index]
756 self.decorator = decorator
757 self.result = stmt
759 def clean_instructions(self, code):
760 # type: (types.CodeType) -> List[EnhancedInstruction]
761 return [
762 inst
763 for inst in get_instructions(code)
764 if inst.opname not in ("EXTENDED_ARG", "NOP")
765 if inst.lineno not in self.ignore_linenos
766 ]
768 def get_original_clean_instructions(self):
769 # type: () -> List[EnhancedInstruction]
770 result = self.clean_instructions(self.code)
772 # pypy sometimes (when is not clear)
773 # inserts JUMP_IF_NOT_DEBUG instructions in bytecode
774 # If they're not present in our compiled instructions,
775 # ignore them in the original bytecode
776 if not any(
777 inst.opname == "JUMP_IF_NOT_DEBUG"
778 for inst in self.compile_instructions()
779 ):
780 result = [
781 inst for inst in result
782 if inst.opname != "JUMP_IF_NOT_DEBUG"
783 ]
785 return result
787 def matching_nodes(self, exprs):
788 # type: (Set[EnhancedAST]) -> Iterator[EnhancedAST]
789 original_instructions = self.get_original_clean_instructions()
790 original_index = only(
791 i
792 for i, inst in enumerate(original_instructions)
793 if inst == self.instruction
794 )
795 for expr_index, expr in enumerate(exprs):
796 setter = get_setter(expr)
797 assert setter is not None
798 # noinspection PyArgumentList
799 replacement = ast.BinOp(
800 left=expr,
801 op=ast.Pow(),
802 right=ast.Str(s=sentinel),
803 )
804 ast.fix_missing_locations(replacement)
805 setter(replacement)
806 try:
807 instructions = self.compile_instructions()
808 finally:
809 setter(expr)
811 if sys.version_info >= (3, 10):
812 try:
813 handle_jumps(instructions, original_instructions)
814 except Exception:
815 # Give other candidates a chance
816 if TESTING or expr_index < len(exprs) - 1:
817 continue
818 raise
820 indices = [
821 i
822 for i, instruction in enumerate(instructions)
823 if instruction.argval == sentinel
824 ]
826 # There can be several indices when the bytecode is duplicated,
827 # as happens in a finally block in 3.9+
828 # First we remove the opcodes caused by our modifications
829 for index_num, sentinel_index in enumerate(indices):
830 # Adjustment for removing sentinel instructions below
831 # in past iterations
832 sentinel_index -= index_num * 2
834 assert_(instructions.pop(sentinel_index).opname == 'LOAD_CONST')
835 assert_(instructions.pop(sentinel_index).opname == 'BINARY_POWER')
837 # Then we see if any of the instruction indices match
838 for index_num, sentinel_index in enumerate(indices):
839 sentinel_index -= index_num * 2
840 new_index = sentinel_index - 1
842 if new_index != original_index:
843 continue
845 original_inst = original_instructions[original_index]
846 new_inst = instructions[new_index]
848 # In Python 3.9+, changing 'not x in y' to 'not sentinel_transformation(x in y)'
849 # changes a CONTAINS_OP(invert=1) to CONTAINS_OP(invert=0),<sentinel stuff>,UNARY_NOT
850 if (
851 original_inst.opname == new_inst.opname in ('CONTAINS_OP', 'IS_OP')
852 and original_inst.arg != new_inst.arg # type: ignore[attr-defined]
853 and (
854 original_instructions[original_index + 1].opname
855 != instructions[new_index + 1].opname == 'UNARY_NOT'
856 )):
857 # Remove the difference for the upcoming assert
858 instructions.pop(new_index + 1)
860 # Check that the modified instructions don't have anything unexpected
861 # 3.10 is a bit too weird to assert this in all cases but things still work
862 if sys.version_info < (3, 10):
863 for inst1, inst2 in zip_longest(
864 original_instructions, instructions
865 ):
866 assert_(inst1 and inst2 and opnames_match(inst1, inst2))
868 yield expr
870 def compile_instructions(self):
871 # type: () -> List[EnhancedInstruction]
872 module_code = compile_similar_to(self.tree, self.code)
873 code = only(self.find_codes(module_code))
874 return self.clean_instructions(code)
876 def find_codes(self, root_code):
877 # type: (types.CodeType) -> list
878 checks = [
879 attrgetter('co_firstlineno'),
880 attrgetter('co_freevars'),
881 attrgetter('co_cellvars'),
882 lambda c: is_ipython_cell_code_name(c.co_name) or c.co_name,
883 ] # type: List[Callable]
884 if not self.is_pytest:
885 checks += [
886 attrgetter('co_names'),
887 attrgetter('co_varnames'),
888 ]
890 def matches(c):
891 # type: (types.CodeType) -> bool
892 return all(
893 f(c) == f(self.code)
894 for f in checks
895 )
897 code_options = []
898 if matches(root_code):
899 code_options.append(root_code)
901 def finder(code):
902 # type: (types.CodeType) -> None
903 for const in code.co_consts:
904 if not inspect.iscode(const):
905 continue
907 if matches(const):
908 code_options.append(const)
909 finder(const)
911 finder(root_code)
912 return code_options
914 def get_actual_current_instruction(self, lasti):
915 # type: (int) -> EnhancedInstruction
916 """
917 Get the instruction corresponding to the current
918 frame offset, skipping EXTENDED_ARG instructions
919 """
920 # Don't use get_original_clean_instructions
921 # because we need the actual instructions including
922 # EXTENDED_ARG
923 instructions = list(get_instructions(self.code))
924 index = only(
925 i
926 for i, inst in enumerate(instructions)
927 if inst.offset == lasti
928 )
930 while True:
931 instruction = instructions[index]
932 if instruction.opname != "EXTENDED_ARG":
933 return instruction
934 index += 1
938def non_sentinel_instructions(instructions, start):
939 # type: (List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction]]
940 """
941 Yields (index, instruction) pairs excluding the basic
942 instructions introduced by the sentinel transformation
943 """
944 skip_power = False
945 for i, inst in islice(enumerate(instructions), start, None):
946 if inst.argval == sentinel:
947 assert_(inst.opname == "LOAD_CONST")
948 skip_power = True
949 continue
950 elif skip_power:
951 assert_(inst.opname == "BINARY_POWER")
952 skip_power = False
953 continue
954 yield i, inst
957def walk_both_instructions(original_instructions, original_start, instructions, start):
958 # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction, int, EnhancedInstruction]]
959 """
960 Yields matching indices and instructions from the new and original instructions,
961 leaving out changes made by the sentinel transformation.
962 """
963 original_iter = islice(enumerate(original_instructions), original_start, None)
964 new_iter = non_sentinel_instructions(instructions, start)
965 inverted_comparison = False
966 while True:
967 try:
968 original_i, original_inst = next(original_iter)
969 new_i, new_inst = next(new_iter)
970 except StopIteration:
971 return
972 if (
973 inverted_comparison
974 and original_inst.opname != new_inst.opname == "UNARY_NOT"
975 ):
976 new_i, new_inst = next(new_iter)
977 inverted_comparison = (
978 original_inst.opname == new_inst.opname in ("CONTAINS_OP", "IS_OP")
979 and original_inst.arg != new_inst.arg # type: ignore[attr-defined]
980 )
981 yield original_i, original_inst, new_i, new_inst
984def handle_jumps(instructions, original_instructions):
985 # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> None
986 """
987 Transforms instructions in place until it looks more like original_instructions.
988 This is only needed in 3.10+ where optimisations lead to more drastic changes
989 after the sentinel transformation.
990 Replaces JUMP instructions that aren't also present in original_instructions
991 with the sections that they jump to until a raise or return.
992 In some other cases duplication found in `original_instructions`
993 is replicated in `instructions`.
994 """
995 while True:
996 for original_i, original_inst, new_i, new_inst in walk_both_instructions(
997 original_instructions, 0, instructions, 0
998 ):
999 if opnames_match(original_inst, new_inst):
1000 continue
1002 if "JUMP" in new_inst.opname and "JUMP" not in original_inst.opname:
1003 # Find where the new instruction is jumping to, ignoring
1004 # instructions which have been copied in previous iterations
1005 start = only(
1006 i
1007 for i, inst in enumerate(instructions)
1008 if inst.offset == new_inst.argval
1009 and not getattr(inst, "_copied", False)
1010 )
1011 # Replace the jump instruction with the jumped to section of instructions
1012 # That section may also be deleted if it's not similarly duplicated
1013 # in original_instructions
1014 new_instructions = handle_jump(
1015 original_instructions, original_i, instructions, start
1016 )
1017 assert new_instructions is not None
1018 instructions[new_i : new_i + 1] = new_instructions
1019 else:
1020 # Extract a section of original_instructions from original_i to return/raise
1021 orig_section = []
1022 for section_inst in original_instructions[original_i:]:
1023 orig_section.append(section_inst)
1024 if section_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"):
1025 break
1026 else:
1027 # No return/raise - this is just a mismatch we can't handle
1028 raise AssertionError
1030 instructions[new_i:new_i] = only(find_new_matching(orig_section, instructions))
1032 # instructions has been modified, the for loop can't sensibly continue
1033 # Restart it from the beginning, checking for other issues
1034 break
1036 else: # No mismatched jumps found, we're done
1037 return
1040def find_new_matching(orig_section, instructions):
1041 # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> Iterator[List[EnhancedInstruction]]
1042 """
1043 Yields sections of `instructions` which match `orig_section`.
1044 The yielded sections include sentinel instructions, but these
1045 are ignored when checking for matches.
1046 """
1047 for start in range(len(instructions) - len(orig_section)):
1048 indices, dup_section = zip(
1049 *islice(
1050 non_sentinel_instructions(instructions, start),
1051 len(orig_section),
1052 )
1053 )
1054 if len(dup_section) < len(orig_section):
1055 return
1056 if sections_match(orig_section, dup_section):
1057 yield instructions[start:indices[-1] + 1]
1060def handle_jump(original_instructions, original_start, instructions, start):
1061 # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Optional[List[EnhancedInstruction]]
1062 """
1063 Returns the section of instructions starting at `start` and ending
1064 with a RETURN_VALUE or RAISE_VARARGS instruction.
1065 There should be a matching section in original_instructions starting at original_start.
1066 If that section doesn't appear elsewhere in original_instructions,
1067 then also delete the returned section of instructions.
1068 """
1069 for original_j, original_inst, new_j, new_inst in walk_both_instructions(
1070 original_instructions, original_start, instructions, start
1071 ):
1072 assert_(opnames_match(original_inst, new_inst))
1073 if original_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"):
1074 inlined = deepcopy(instructions[start : new_j + 1])
1075 for inl in inlined:
1076 inl._copied = True
1077 orig_section = original_instructions[original_start : original_j + 1]
1078 if not check_duplicates(
1079 original_start, orig_section, original_instructions
1080 ):
1081 instructions[start : new_j + 1] = []
1082 return inlined
1084 return None
1087def check_duplicates(original_i, orig_section, original_instructions):
1088 # type: (int, List[EnhancedInstruction], List[EnhancedInstruction]) -> bool
1089 """
1090 Returns True if a section of original_instructions starting somewhere other
1091 than original_i and matching orig_section is found, i.e. orig_section is duplicated.
1092 """
1093 for dup_start in range(len(original_instructions)):
1094 if dup_start == original_i:
1095 continue
1096 dup_section = original_instructions[dup_start : dup_start + len(orig_section)]
1097 if len(dup_section) < len(orig_section):
1098 return False
1099 if sections_match(orig_section, dup_section):
1100 return True
1102 return False
1104def sections_match(orig_section, dup_section):
1105 # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> bool
1106 """
1107 Returns True if the given lists of instructions have matching linenos and opnames.
1108 """
1109 return all(
1110 (
1111 orig_inst.lineno == dup_inst.lineno
1112 # POP_BLOCKs have been found to have differing linenos in innocent cases
1113 or "POP_BLOCK" == orig_inst.opname == dup_inst.opname
1114 )
1115 and opnames_match(orig_inst, dup_inst)
1116 for orig_inst, dup_inst in zip(orig_section, dup_section)
1117 )
1120def opnames_match(inst1, inst2):
1121 # type: (Instruction, Instruction) -> bool
1122 return (
1123 inst1.opname == inst2.opname
1124 or "JUMP" in inst1.opname
1125 and "JUMP" in inst2.opname
1126 or (inst1.opname == "PRINT_EXPR" and inst2.opname == "POP_TOP")
1127 or (
1128 inst1.opname in ("LOAD_METHOD", "LOOKUP_METHOD")
1129 and inst2.opname == "LOAD_ATTR"
1130 )
1131 or (inst1.opname == "CALL_METHOD" and inst2.opname == "CALL_FUNCTION")
1132 )
1135def get_setter(node):
1136 # type: (EnhancedAST) -> Optional[Callable[[ast.AST], None]]
1137 parent = node.parent
1138 for name, field in ast.iter_fields(parent):
1139 if field is node:
1140 def setter(new_node):
1141 # type: (ast.AST) -> None
1142 return setattr(parent, name, new_node)
1143 return setter
1144 elif isinstance(field, list):
1145 for i, item in enumerate(field):
1146 if item is node:
1147 def setter(new_node):
1148 # type: (ast.AST) -> None
1149 field[i] = new_node
1151 return setter
1152 return None
1154lock = RLock()
1157@cache
1158def statement_containing_node(node):
1159 # type: (ast.AST) -> EnhancedAST
1160 while not isinstance(node, ast.stmt):
1161 node = cast(EnhancedAST, node).parent
1162 return cast(EnhancedAST, node)
1165def assert_linenos(tree):
1166 # type: (ast.AST) -> Iterator[int]
1167 for node in ast.walk(tree):
1168 if (
1169 hasattr(node, 'parent') and
1170 isinstance(statement_containing_node(node), ast.Assert)
1171 ):
1172 for lineno in node_linenos(node):
1173 yield lineno
1176def _extract_ipython_statement(stmt):
1177 # type: (EnhancedAST) -> ast.Module
1178 # IPython separates each statement in a cell to be executed separately
1179 # So NodeFinder should only compile one statement at a time or it
1180 # will find a code mismatch.
1181 while not isinstance(stmt.parent, ast.Module):
1182 stmt = stmt.parent
1183 # use `ast.parse` instead of `ast.Module` for better portability
1184 # python3.8 changes the signature of `ast.Module`
1185 # Inspired by https://github.com/pallets/werkzeug/pull/1552/files
1186 tree = ast.parse("")
1187 tree.body = [cast(ast.stmt, stmt)]
1188 ast.copy_location(tree, stmt)
1189 return tree
1192def is_ipython_cell_code_name(code_name):
1193 # type: (str) -> bool
1194 return bool(re.match(r"(<module>|<cell line: \d+>)$", code_name))
1197def is_ipython_cell_filename(filename):
1198 # type: (str) -> bool
1199 return bool(re.search(r"<ipython-input-|[/\\]ipykernel_\d+[/\\]", filename))
1202def is_ipython_cell_code(code_obj):
1203 # type: (types.CodeType) -> bool
1204 return (
1205 is_ipython_cell_filename(code_obj.co_filename) and
1206 is_ipython_cell_code_name(code_obj.co_name)
1207 )
1210def find_node_ipython(frame, lasti, stmts, source):
1211 # type: (types.FrameType, int, Set[EnhancedAST], Source) -> Tuple[Optional[Any], Optional[Any]]
1212 node = decorator = None
1213 for stmt in stmts:
1214 tree = _extract_ipython_statement(stmt)
1215 try:
1216 node_finder = NodeFinder(frame, stmts, tree, lasti, source)
1217 if (node or decorator) and (node_finder.result or node_finder.decorator):
1218 # Found potential nodes in separate statements,
1219 # cannot resolve ambiguity, give up here
1220 return None, None
1222 node = node_finder.result
1223 decorator = node_finder.decorator
1224 except Exception:
1225 pass
1226 return decorator, node
1229def attr_names_match(attr, argval):
1230 # type: (str, str) -> bool
1231 """
1232 Checks that the user-visible attr (from ast) can correspond to
1233 the argval in the bytecode, i.e. the real attribute fetched internally,
1234 which may be mangled for private attributes.
1235 """
1236 if attr == argval:
1237 return True
1238 if not attr.startswith("__"):
1239 return False
1240 return bool(re.match(r"^_\w+%s$" % attr, argval))
1243def node_linenos(node):
1244 # type: (ast.AST) -> Iterator[int]
1245 if hasattr(node, "lineno"):
1246 linenos = [] # type: Sequence[int]
1247 if hasattr(node, "end_lineno") and isinstance(node, ast.expr):
1248 assert node.end_lineno is not None # type: ignore[attr-defined]
1249 linenos = range(node.lineno, node.end_lineno + 1) # type: ignore[attr-defined]
1250 else:
1251 linenos = [node.lineno] # type: ignore[attr-defined]
1252 for lineno in linenos:
1253 yield lineno
1256if sys.version_info >= (3, 11):
1257 from ._position_node_finder import PositionNodeFinder as NodeFinder
1258else:
1259 NodeFinder = SentinelNodeFinder