Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/stack_data/utils.py: 26%
109 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:07 +0000
1import ast
2import itertools
3import types
4from collections import OrderedDict, Counter, defaultdict
5from types import FrameType, TracebackType
6from typing import (
7 Iterator, List, Tuple, Iterable, Callable, Union,
8 TypeVar, Mapping,
9)
11from asttokens import ASTText
13T = TypeVar('T')
14R = TypeVar('R')
def truncate(seq, max_length: int, middle):
    """
    Shorten `seq` to at most `max_length` items by replacing its centre with `middle`.

    `seq` and `middle` must be sliceable sequences of the same kind that
    support `+` (e.g. two strings or two lists). If `seq` already fits,
    it is returned unchanged. Otherwise the result is a prefix of `seq`,
    then `middle`, then a suffix of `seq`; the prefix gets the extra item
    when the remaining space is odd.

    If `middle` alone is at least `max_length` long, nothing of `seq` is
    kept and the result is just `middle`.
    """
    if len(seq) > max_length:
        # Space left for pieces of `seq` once `middle` is accounted for.
        # Clamped so an oversized `middle` cannot produce negative slice sizes.
        budget = max(max_length - len(middle), 0)
        right = budget // 2
        left = budget - right
        # `seq[-right:]` would return the *whole* sequence when right == 0,
        # so the suffix is sliced from an explicit start index instead.
        seq = seq[:left] + middle + seq[len(seq) - right:]
    return seq
def unique_in_order(it: Iterable[T]) -> List[T]:
    """
    Return the distinct items of `it` as a list, keeping only the first
    occurrence of each and preserving the order in which they first appear.
    """
    first_seen = OrderedDict.fromkeys(it)
    return list(first_seen.keys())
def line_range(atok: ASTText, node: ast.AST) -> Tuple[int, int]:
    """
    Returns a pair of numbers representing a half open range
    (i.e. suitable as arguments to the `range()` builtin)
    of line numbers of the given AST node.

    For `ast.match_case` nodes (when that class exists in this Python
    version) the range runs from the start of the case's pattern to the
    end of the last statement in its body.
    """
    # On Pythons without `ast.match_case` the empty tuple makes isinstance() False.
    match_case_type = getattr(ast, "match_case", ())

    if not isinstance(node, match_case_type):
        (first_line, _), (last_line, _) = atok.get_text_positions(node, padded=False)
        # The reported end line is inclusive; add one to make the range half open.
        return first_line, last_line + 1

    first_line, _ = line_range(atok, node.pattern)
    _, stop_line = line_range(atok, node.body[-1])
    return first_line, stop_line
def highlight_unique(lst: List[T]) -> Iterator[Tuple[T, bool]]:
    """
    Yield `(item, highlighted)` pairs for every item of `lst`, in order.

    A value is "common" when it occurs more than 3 times in `lst`.
    Within each consecutive run of common values, only the first, second
    and last occurrence of every distinct value are flagged True; the
    other items of the run are flagged False. Items outside such runs
    are always flagged True.
    """
    occurrences = Counter(lst)

    def is_common(value):
        return occurrences[value] > 3

    for common, run in itertools.groupby(lst, key=is_common):
        if not common:
            for value in run:
                yield value, True
            continue

        run = list(run)

        # Positions of each distinct value inside this run, in ascending order.
        positions = {}
        for index, value in enumerate(run):
            positions.setdefault(value, []).append(index)

        flagged = set()
        for indices in positions.values():
            flagged.update(indices[:2])  # first and (if present) second occurrence
            flagged.add(indices[-1])  # last occurrence

        for index, value in enumerate(run):
            yield value, (index in flagged)
def identity(x: T) -> T:
    """
    Return the argument unchanged.
    """
    return x
def collapse_repeated(lst, *, collapser, mapper=identity, key=identity):
    """
    Yield the items of `lst`, replacing runs of non-highlighted items
    with a single collapsed value.

    Each item is turned into a key with `key`, and `highlight_unique`
    decides which keys are highlighted. Consecutive highlighted items are
    yielded one by one after passing through `mapper`. Each consecutive
    run of non-highlighted items is replaced by a single value:
    `collapser(items, keys)`, called with the list of original items and
    the list of their keys.

    `lst` is iterated twice, so it must be re-iterable (e.g. a list).
    """
    keys = [key(item) for item in lst]
    tagged = zip(lst, highlight_unique(keys))

    def highlighted_flag(pair):
        # pair is (original_item, (key, highlighted))
        return pair[1][1]

    for is_highlighted, chunk in itertools.groupby(tagged, key=highlighted_flag):
        chunk = list(chunk)
        originals = [original for original, _ in chunk]
        if is_highlighted:
            for original in originals:
                yield mapper(original)
        else:
            chunk_keys = [item_key for _, (item_key, _flag) in chunk]
            yield collapser(originals, chunk_keys)
def is_frame(frame_or_tb: Union[FrameType, TracebackType]) -> bool:
    """
    Return True if the argument is a frame object and False if it is a
    traceback object.

    Anything else fails the `assert_` check, which raises an AssertionError.
    """
    accepted_types = (types.FrameType, types.TracebackType)
    assert_(isinstance(frame_or_tb, accepted_types))
    return isinstance(frame_or_tb, types.FrameType)
def iter_stack(frame_or_tb: Union[FrameType, TracebackType]) -> Iterator[Union[FrameType, TracebackType]]:
    """
    Yield the given frame or traceback followed by the rest of its chain.

    Frames are followed through `f_back`, tracebacks through `tb_next`;
    iteration stops when the link is falsy (e.g. None).
    """
    current = frame_or_tb
    while current:
        yield current
        current = current.f_back if is_frame(current) else current.tb_next
def frame_and_lineno(frame_or_tb: Union[FrameType, TracebackType]) -> Tuple[FrameType, int]:
    """
    Return a `(frame, line number)` pair for a frame or a traceback.

    For a frame this is the frame itself and its `f_lineno`; for a
    traceback it is its `tb_frame` and `tb_lineno`.
    """
    if not is_frame(frame_or_tb):
        traceback = frame_or_tb
        return traceback.tb_frame, traceback.tb_lineno

    frame = frame_or_tb
    return frame, frame.f_lineno
def group_by_key_func(iterable: Iterable[T], key_func: Callable[[T], R]) -> Mapping[R, List[T]]:
    # noinspection PyUnresolvedReferences
    """
    Group the elements of an iterable by the value of a key function.

    The result maps each value returned by `key_func` to the list of elements
    that produced it, with elements kept in their original order.

    >>> def si(d): return sorted(d.items())
    >>> si(group_by_key_func("a bb ccc d ee fff".split(), len))
    [(1, ['a', 'd']), (2, ['bb', 'ee']), (3, ['ccc', 'fff'])]
    >>> si(group_by_key_func([-1, 0, 1, 3, 6, 8, 9, 2], lambda x: x % 2))
    [(0, [0, 6, 8, 2]), (1, [-1, 1, 3, 9])]
    """
    groups = defaultdict(list)
    for element in iterable:
        group_key = key_func(element)
        groups[group_key].append(element)
    return groups
class cached_property(object):
    """
    Descriptor that computes a value once per instance and caches it.

    On first access the wrapped function is called and its result is stored
    in the instance's `__dict__` under the function's name, so later lookups
    find the plain attribute instead. Deleting that attribute resets the
    property, and the value is recomputed on the next access.

    Based on https://github.com/pydanny/cached-property/blob/master/cached_property.py
    """

    def __init__(self, func):
        self.__doc__ = func.__doc__
        self.func = func

    def cached_property_wrapper(self, obj, _cls):
        # Accessed on the class rather than an instance: expose the descriptor itself.
        if obj is None:
            return self

        computed = self.func(obj)
        # Store under the function's name so the instance attribute shadows this descriptor.
        obj.__dict__[self.func.__name__] = computed
        return computed

    __get__ = cached_property_wrapper
def _pygmented_with_ranges(formatter, code, ranges):
    """
    Highlight `code` with pygments and return the result as a list of lines.

    `ranges` is a re-iterable collection of `(start, end)` pairs. A token
    whose starting offset (counted in characters of the tokens emitted
    before it) satisfies `start <= offset < end` for any pair has its token
    type replaced by that type's `ExecutingNode` subtype, so the formatter
    can style it differently.

    If pygments raises while highlighting, the lines of the plain `code`
    are returned instead.
    """
    import pygments
    from pygments.lexers import get_lexer_by_name

    python_lexer_class = type(get_lexer_by_name("python3"))

    class MyLexer(python_lexer_class):
        def get_tokens(self, text):
            offset = 0
            for token_type, token_text in super().get_tokens(text):
                inside = any(start <= offset < end for start, end in ranges)
                emitted_type = token_type.ExecutingNode if inside else token_type
                offset += len(token_text)
                yield emitted_type, token_text

    lexer = MyLexer(stripnl=False)
    try:
        rendered = pygments.highlight(code, lexer, formatter)
    except Exception:
        # When pygments fails, prefer code without highlighting over crashing
        rendered = code
    return rendered.splitlines()
def assert_(condition, error=""):
    """
    Raise an error if `condition` is falsy; otherwise do nothing.

    `error` may be a string, which becomes the message of an
    AssertionError, or anything else, which is raised as is.
    """
    if condition:
        return
    raise AssertionError(error) if isinstance(error, str) else error
# Copied from the standard traceback module pre-3.11
def some_str(value):
    """
    Return `str(value)`, or a placeholder naming the value's type if
    converting it to a string raises anything at all.
    """
    try:
        text = str(value)
    except BaseException:
        # Deliberately as broad as a bare `except:`: a broken __str__ must
        # never prevent the value from being described.
        text = '<unprintable %s object>' % type(value).__name__
    return text