Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/stack_data/utils.py: 25%

110 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-20 06:09 +0000

1import ast 

2import itertools 

3import types 

4from collections import OrderedDict, Counter, defaultdict 

5from types import FrameType, TracebackType 

6from typing import ( 

7 Iterator, List, Tuple, Iterable, Callable, Union, 

8 TypeVar, Mapping, 

9) 

10 

11from asttokens import ASTText 

12 

13T = TypeVar('T') 

14R = TypeVar('R') 

15 

16 

17def truncate(seq, max_length: int, middle): 

18 if len(seq) > max_length: 

19 right = (max_length - len(middle)) // 2 

20 left = max_length - len(middle) - right 

21 seq = seq[:left] + middle + seq[-right:] 

22 return seq 

23 

24 

25def unique_in_order(it: Iterable[T]) -> List[T]: 

26 return list(OrderedDict.fromkeys(it)) 

27 

28 

29def line_range(atok: ASTText, node: ast.AST) -> Tuple[int, int]: 

30 """ 

31 Returns a pair of numbers representing a half open range 

32 (i.e. suitable as arguments to the `range()` builtin) 

33 of line numbers of the given AST nodes. 

34 """ 

35 if isinstance(node, getattr(ast, "match_case", ())): 

36 start, _end = line_range(atok, node.pattern) 

37 _start, end = line_range(atok, node.body[-1]) 

38 return start, end 

39 else: 

40 (start, _), (end, _) = atok.get_text_positions(node, padded=False) 

41 return start, end + 1 

42 

43 

44def highlight_unique(lst: List[T]) -> Iterator[Tuple[T, bool]]: 

45 counts = Counter(lst) 

46 

47 for is_common, group in itertools.groupby(lst, key=lambda x: counts[x] > 3): 

48 if is_common: 

49 group = list(group) 

50 highlighted = [False] * len(group) 

51 

52 def highlight_index(f): 

53 try: 

54 i = f() 

55 except ValueError: 

56 return None 

57 highlighted[i] = True 

58 return i 

59 

60 for item in set(group): 

61 first = highlight_index(lambda: group.index(item)) 

62 if first is not None: 

63 highlight_index(lambda: group.index(item, first + 1)) 

64 highlight_index(lambda: -1 - group[::-1].index(item)) 

65 else: 

66 highlighted = itertools.repeat(True) 

67 

68 yield from zip(group, highlighted) 

69 

70 

71def identity(x: T) -> T: 

72 return x 

73 

74 

75def collapse_repeated(lst, *, collapser, mapper=identity, key=identity): 

76 keyed = list(map(key, lst)) 

77 for is_highlighted, group in itertools.groupby( 

78 zip(lst, highlight_unique(keyed)), 

79 key=lambda t: t[1][1], 

80 ): 

81 original_group, highlighted_group = zip(*group) 

82 if is_highlighted: 

83 yield from map(mapper, original_group) 

84 else: 

85 keyed_group, _ = zip(*highlighted_group) 

86 yield collapser(list(original_group), list(keyed_group)) 

87 

88 

89def is_frame(frame_or_tb: Union[FrameType, TracebackType]) -> bool: 

90 assert_(isinstance(frame_or_tb, (types.FrameType, types.TracebackType))) 

91 return isinstance(frame_or_tb, (types.FrameType,)) 

92 

93 

94def iter_stack(frame_or_tb: Union[FrameType, TracebackType]) -> Iterator[Union[FrameType, TracebackType]]: 

95 current: Union[FrameType, TracebackType, None] = frame_or_tb 

96 while current: 

97 yield current 

98 if is_frame(current): 

99 current = current.f_back 

100 else: 

101 current = current.tb_next 

102 

103 

104def frame_and_lineno(frame_or_tb: Union[FrameType, TracebackType]) -> Tuple[FrameType, int]: 

105 if is_frame(frame_or_tb): 

106 return frame_or_tb, frame_or_tb.f_lineno 

107 else: 

108 return frame_or_tb.tb_frame, frame_or_tb.tb_lineno 

109 

110 

111def group_by_key_func(iterable: Iterable[T], key_func: Callable[[T], R]) -> Mapping[R, List[T]]: 

112 # noinspection PyUnresolvedReferences 

113 """ 

114 Create a dictionary from an iterable such that the keys are the result of evaluating a key function on elements 

115 of the iterable and the values are lists of elements all of which correspond to the key. 

116 

117 >>> def si(d): return sorted(d.items()) 

118 >>> si(group_by_key_func("a bb ccc d ee fff".split(), len)) 

119 [(1, ['a', 'd']), (2, ['bb', 'ee']), (3, ['ccc', 'fff'])] 

120 >>> si(group_by_key_func([-1, 0, 1, 3, 6, 8, 9, 2], lambda x: x % 2)) 

121 [(0, [0, 6, 8, 2]), (1, [-1, 1, 3, 9])] 

122 """ 

123 result = defaultdict(list) 

124 for item in iterable: 

125 result[key_func(item)].append(item) 

126 return result 

127 

128 

129class cached_property(object): 

130 """ 

131 A property that is only computed once per instance and then replaces itself 

132 with an ordinary attribute. Deleting the attribute resets the property. 

133 

134 Based on https://github.com/pydanny/cached-property/blob/master/cached_property.py 

135 """ 

136 

137 def __init__(self, func): 

138 self.__doc__ = func.__doc__ 

139 self.func = func 

140 

141 def cached_property_wrapper(self, obj, _cls): 

142 if obj is None: 

143 return self 

144 

145 value = obj.__dict__[self.func.__name__] = self.func(obj) 

146 return value 

147 

148 __get__ = cached_property_wrapper 

149 

150 

151def _pygmented_with_ranges(formatter, code, ranges): 

152 import pygments 

153 from pygments.lexers import get_lexer_by_name 

154 

155 class MyLexer(type(get_lexer_by_name("python3"))): 

156 def get_tokens(self, text): 

157 length = 0 

158 for ttype, value in super().get_tokens(text): 

159 if any(start <= length < end for start, end in ranges): 

160 ttype = ttype.ExecutingNode 

161 length += len(value) 

162 yield ttype, value 

163 

164 lexer = MyLexer(stripnl=False) 

165 try: 

166 highlighted = pygments.highlight(code, lexer, formatter) 

167 except Exception: 

168 # When pygments fails, prefer code without highlighting over crashing 

169 highlighted = code 

170 return highlighted.splitlines() 

171 

172 

173def assert_(condition, error=""): 

174 if not condition: 

175 if isinstance(error, str): 

176 error = AssertionError(error) 

177 raise error 

178 

179 

180# Copied from the standard traceback module pre-3.11 

181def some_str(value): 

182 try: 

183 return str(value) 

184 except: 

185 return '<unprintable %s object>' % type(value).__name__