Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/face/sinter.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

96 statements  

1import sys 

2import types 

3import inspect 

4import hashlib 

5import linecache 

6 

7from boltons import iterutils 

8from boltons.strutils import camel2under 

9from boltons.funcutils import FunctionBuilder 

10 

11 

12_VERBOSE = False 

13_INDENT = ' ' 

14 

15 

16def get_fb(f, drop_self=True): 

17 # TODO: support partials 

18 if not (inspect.isfunction(f) or inspect.ismethod(f) or \ 

19 inspect.isbuiltin(f)) and hasattr(f, '__call__'): 

20 if isinstance(getattr(f, '_sinter_fb', None), FunctionBuilder): 

21 return f._sinter_fb 

22 f = f.__call__ # callable objects 

23 

24 if isinstance(getattr(f, '_sinter_fb', None), FunctionBuilder): 

25 return f._sinter_fb # we'll take your word for it; good luck, lil buddy. 

26 

27 ret = FunctionBuilder.from_func(f) 

28 

29 if not all([isinstance(a, str) for a in ret.args]): # pragma: no cover (2 only) 

30 raise TypeError('does not support anonymous tuple arguments' 

31 ' or any other strange args for that matter.') 

32 if drop_self and isinstance(f, types.MethodType): 

33 ret.args = ret.args[1:] # discard "self" on methods 

34 return ret 

35 

36 

37def get_arg_names(f, only_required=False): 

38 fb = get_fb(f) 

39 

40 return fb.get_arg_names(only_required=only_required) 

41 

42 

43def inject(f, injectables): 

44 __traceback_hide__ = True # TODO 

45 

46 fb = get_fb(f) 

47 

48 all_kwargs = fb.get_defaults_dict() 

49 all_kwargs.update(injectables) 

50 

51 if fb.varkw: 

52 return f(**all_kwargs) 

53 

54 kwargs = {k: v for k, v in all_kwargs.items() if k in fb.get_arg_names()} 

55 return f(**kwargs) 

56 

57 

58def get_callable_labels(obj): 

59 ctx_parts = [] 

60 if isinstance(obj, types.MethodType): 

61 # bit of 2/3 messiness below 

62 im_self = getattr(obj, 'im_self', getattr(obj, '__self__', None)) 

63 if im_self: 

64 ctx_parts.append(im_self.__class__.__name__) 

65 obj = getattr(obj, 'im_func', getattr(obj, '__func__', None)) 

66 

67 fb = get_fb(obj) 

68 if fb.module: 

69 ctx_parts.insert(0, fb.module) 

70 

71 

72 return '.'.join(ctx_parts), fb.name, fb.get_invocation_str() 

73 

74 

75 

76# TODO: turn the following into an object (keeps inner_name easier to 

77# track, as well as better handling of state the func_aliaser will 

78# need 

79 

80def chain_argspec(func_list, provides, inner_name): 

81 provided_sofar = {inner_name} # the inner function name is an extremely special case 

82 optional_sofar = set() 

83 required_sofar = set() 

84 for f, p in zip(func_list, provides): 

85 # middlewares can default the same parameter to different values; 

86 # can't properly keep track of default values 

87 fb = get_fb(f) 

88 arg_names = fb.get_arg_names() 

89 defaults_dict = fb.get_defaults_dict() 

90 

91 defaulted, undefaulted = iterutils.partition(arg_names, key=defaults_dict.__contains__) 

92 

93 optional_sofar.update(defaulted) 

94 # keep track of defaults so that e.g. endpoint default param 

95 # can pick up request injected/provided param 

96 required_sofar |= set(undefaulted) - provided_sofar 

97 provided_sofar.update(p) 

98 

99 return required_sofar, optional_sofar 

100 

101 

102#funcs[0] = function to call 

103#params[0] = parameters to take 

104def build_chain_str(funcs, params, inner_name, params_sofar=None, level=0, 

105 func_aliaser=None, func_names=None): 

106 if not funcs: 

107 return '' # stopping case 

108 if params_sofar is None: 

109 params_sofar = {inner_name} 

110 

111 params_sofar.update(params[0]) 

112 inner_args = get_fb(funcs[0]).args 

113 inner_arg_dict = {a: a for a in inner_args} 

114 inner_arg_items = sorted(inner_arg_dict.items()) 

115 inner_args = ', '.join(['%s=%s' % kv for kv in inner_arg_items 

116 if kv[0] in params_sofar]) 

117 outer_indent = _INDENT * level 

118 inner_indent = outer_indent + _INDENT 

119 outer_arg_str = ', '.join(params[0]) 

120 def_str = f'{outer_indent}def {inner_name}({outer_arg_str}):\n' 

121 body_str = build_chain_str(funcs[1:], params[1:], inner_name, params_sofar, level + 1) 

122 #func_name = get_func_name(funcs[0]) 

123 #func_alias = get_inner_func_alias(funcs[0]) 

124 htb_str = f'{inner_indent}__traceback_hide__ = True\n' 

125 return_str = f'{inner_indent}return funcs[{level}]({inner_args})\n' 

126 return ''.join([def_str, body_str, htb_str + return_str]) 

127 

128 

129def compile_chain(funcs, params, inner_name, verbose=_VERBOSE): 

130 call_str = build_chain_str(funcs, params, inner_name) 

131 return compile_code(call_str, inner_name, {'funcs': funcs}, verbose=verbose) 

132 

133 

134def compile_code(code_str, name, env=None, verbose=_VERBOSE): 

135 env = {} if env is None else env 

136 code_hash = hashlib.sha1(code_str.encode('utf8')).hexdigest()[:16] 

137 unique_filename = f"<sinter generated {name} {code_hash}>" 

138 code = compile(code_str, unique_filename, 'single') 

139 if verbose: 

140 print(code_str) # pragma: no cover 

141 

142 exec(code, env) 

143 

144 linecache.cache[unique_filename] = ( 

145 len(code_str), 

146 None, 

147 code_str.splitlines(True), 

148 unique_filename, 

149 ) 

150 return env[name] 

151 

152 

153def make_chain(funcs, provides, final_func, preprovided, inner_name): 

154 funcs = list(funcs) 

155 provides = list(provides) 

156 preprovided = set(preprovided) 

157 reqs, opts = chain_argspec(funcs + [final_func], 

158 provides + [()], inner_name) 

159 

160 unresolved = tuple(reqs - preprovided) 

161 args = reqs | (preprovided & opts) 

162 chain = compile_chain(funcs + [final_func], 

163 [args] + provides, inner_name) 

164 return chain, set(args), set(unresolved)