Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/face/sinter.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

100 statements  

1# -*- coding: utf-8 -*- 

2 

3from __future__ import print_function 

4 

5import sys 

6import types 

7import inspect 

8import hashlib 

9import linecache 

10 

11from boltons import iterutils 

12from boltons.strutils import camel2under 

13from boltons.funcutils import FunctionBuilder 

14 

# True when the running interpreter's major version is 3; used to pick
# the right ``exec`` spelling when running generated code.
PY3 = sys.version_info[0] == 3

# Default value for the ``verbose`` flag of the compile helpers below.
_VERBOSE = False

# Text emitted once per nesting level in generated source.
# NOTE(review): appears as a single space in this view; the extraction may
# have collapsed a wider literal -- confirm against the real file.
_INDENT = ' '

18 

19 

def get_fb(f, drop_self=True):
    """Return a ``FunctionBuilder`` describing the signature of *f*.

    *f* may be a function, method, builtin, or any other object exposing
    ``__call__``. For the latter, the object's ``__call__`` is inspected
    instead. If *f* (or its ``__call__``) carries a ``_sinter_fb``
    attribute that is already a ``FunctionBuilder``, that object is
    returned as-is without further inspection.

    When *drop_self* is true and the inspected callable is a bound method,
    the first positional argument ("self") is removed from the result.

    Raises ``TypeError`` if any argument name is not a ``str``.
    """
    # TODO: support partials
    plain_callable = (inspect.isfunction(f)
                      or inspect.ismethod(f)
                      or inspect.isbuiltin(f))
    if not plain_callable and hasattr(f, '__call__'):
        if isinstance(getattr(f, '_sinter_fb', None), FunctionBuilder):
            return f._sinter_fb
        f = f.__call__  # callable objects

    if isinstance(getattr(f, '_sinter_fb', None), FunctionBuilder):
        # we'll take your word for it; good luck, lil buddy.
        return f._sinter_fb

    builder = FunctionBuilder.from_func(f)

    has_odd_arg = any(not isinstance(arg, str) for arg in builder.args)
    if has_odd_arg:  # pragma: no cover (2 only)
        raise TypeError('does not support anonymous tuple arguments'
                        ' or any other strange args for that matter.')

    if drop_self and isinstance(f, types.MethodType):
        # discard "self" on methods
        builder.args = builder.args[1:]
    return builder

39 

40 

def get_arg_names(f, only_required=False):
    """Return the argument names of callable *f*.

    Delegates to ``get_fb(f).get_arg_names``; *only_required* is passed
    straight through to that method.
    """
    return get_fb(f).get_arg_names(only_required=only_required)

45 

46 

def inject(f, injectables):
    """Call *f* with keyword arguments drawn from *injectables*.

    The candidate keyword arguments are *f*'s own defaults (as reported by
    its ``FunctionBuilder``) overlaid with *injectables*. If *f* accepts
    ``**kwargs`` every candidate is passed; otherwise only the candidates
    whose names appear among *f*'s argument names are passed.

    Returns whatever *f* returns.
    """
    __traceback_hide__ = True  # TODO

    fb = get_fb(f)

    all_kwargs = fb.get_defaults_dict()
    all_kwargs.update(injectables)

    if fb.varkw:
        return f(**all_kwargs)

    # Look the accepted names up once, rather than once per candidate.
    accepted = frozenset(fb.get_arg_names())
    kwargs = {k: v for k, v in all_kwargs.items() if k in accepted}
    return f(**kwargs)

60 

61 

def get_callable_labels(obj):
    """Return ``(context, name, invocation_str)`` labels for callable *obj*.

    ``context`` is a dotted string made of the callable's module (when the
    ``FunctionBuilder`` reports one) followed, for bound methods, by the
    class name of the bound instance. ``name`` and ``invocation_str`` come
    from the ``FunctionBuilder`` built for the callable; for methods the
    underlying function is inspected.
    """
    ctx_parts = []
    if isinstance(obj, types.MethodType):
        # bit of 2/3 messiness below
        im_self = getattr(obj, 'im_self', getattr(obj, '__self__', None))
        # Compare against None explicitly: an instance that happens to be
        # falsy (e.g. defines __len__ or __bool__) must still be labelled.
        if im_self is not None:
            ctx_parts.append(im_self.__class__.__name__)
        obj = getattr(obj, 'im_func', getattr(obj, '__func__', None))

    fb = get_fb(obj)
    if fb.module:
        ctx_parts.insert(0, fb.module)

    return '.'.join(ctx_parts), fb.name, fb.get_invocation_str()

77 

78 

79 

# TODO: turn the following into an object (keeps inner_name easier to
# track, and gives better handling of the state the func_aliaser will
# need)

83 

def chain_argspec(func_list, provides, inner_name):
    """Compute the combined argument requirements of a chain of callables.

    *func_list* and *provides* are walked in lockstep: each callable is
    paired with the names it makes available to the callables after it.

    Returns ``(required, optional)``, both sets of argument names:
    ``required`` holds undefaulted arguments not provided earlier in the
    chain, ``optional`` holds every argument that has a default.
    """
    # the inner function name is an extremely special case
    available = {inner_name}
    optional = set()
    required = set()

    for func, provided in zip(func_list, provides):
        # middlewares can default the same parameter to different values;
        # can't properly keep track of default values
        builder = get_fb(func)
        names = builder.get_arg_names()
        defaults = builder.get_defaults_dict()

        with_default = [n for n in names if n in defaults]
        without_default = [n for n in names if n not in defaults]

        optional.update(with_default)
        # keep track of defaults so that e.g. endpoint default param
        # can pick up request injected/provided param
        required |= set(without_default) - available
        available.update(provided)

    return required, optional

104 

105 

106#funcs[0] = function to call 

107#params[0] = parameters to take 

def build_chain_str(funcs, params, inner_name, params_sofar=None, level=0,
                    func_aliaser=None, func_names=None):
    """Build the source text of nested ``def``s that call *funcs* in order.

    ``funcs[0]`` is the function to call at this level and ``params[0]``
    the parameters the generated ``def`` at this level takes. Each level
    emits a ``def <inner_name>(...)``, then the nested levels, then a
    ``return funcs[<level>](...)`` passing, by keyword, those arguments of
    the function that are available so far.

    *params_sofar* is updated in place with the names seen at each level.
    *func_aliaser* and *func_names* are accepted but unused here.

    Returns ``''`` when *funcs* is empty.
    """
    if not funcs:
        return ''  # stopping case

    head_func, head_params = funcs[0], params[0]

    if params_sofar is None:
        params_sofar = {inner_name}
    params_sofar.update(head_params)

    # Work out the call arguments *before* recursing: the recursion adds
    # deeper levels' names to params_sofar, which must not leak into here.
    arg_names = sorted(set(get_fb(head_func).args))
    call_args = ', '.join(['%s=%s' % (arg, arg) for arg in arg_names
                           if arg in params_sofar])

    indent = _INDENT * level
    body_indent = indent + _INDENT
    signature = ', '.join(head_params)

    header = '%sdef %s(%s):\n' % (indent, inner_name, signature)
    nested = build_chain_str(funcs[1:], params[1:], inner_name,
                             params_sofar, level + 1)
    hide_tb = '%s__traceback_hide__ = True\n' % (body_indent,)
    call = '%sreturn funcs[%s](%s)\n' % (body_indent, level, call_args)
    return header + nested + hide_tb + call

131 

132 

def compile_chain(funcs, params, inner_name, verbose=_VERBOSE):
    """Generate and compile the chained function for *funcs*.

    The source comes from ``build_chain_str`` and is compiled by
    ``compile_code`` in a namespace that exposes *funcs* under the name
    ``funcs``. Returns the object bound to *inner_name* in that namespace.
    """
    source = build_chain_str(funcs, params, inner_name)
    namespace = {'funcs': funcs}
    return compile_code(source, inner_name, namespace, verbose=verbose)

136 

137 

def compile_code(code_str, name, env=None, verbose=_VERBOSE):
    """Compile and execute *code_str*, returning the object it binds to *name*.

    The code is compiled under a synthetic filename derived from *name* and
    a SHA-1 prefix of the source, executed with *env* as its namespace, and
    its source lines are registered in ``linecache`` under that filename.

    *env* is the namespace dict to execute in; a fresh dict is used when it
    is omitted. When *verbose* is true the source is printed.

    Raises ``KeyError`` if executing the code does not define *name*.
    """
    if env is None:
        # Without a real dict the code would execute in this function's
        # scope and the final lookup would fail on ``None[name]``.
        env = {}

    code_hash = hashlib.sha1(code_str.encode('utf8')).hexdigest()[:16]
    unique_filename = "<sinter generated %s %s>" % (name, code_hash)
    code = compile(code_str, unique_filename, 'single')
    if verbose:
        print(code_str)  # pragma: no cover
    if PY3:
        exec(code, env)
    else:
        # Python 2 statement form, kept in a string so this file still
        # parses under Python 3.
        exec("exec code in env")

    # Register the generated source under the synthetic filename.
    linecache.cache[unique_filename] = (
        len(code_str),
        None,
        code_str.splitlines(True),
        unique_filename,
    )
    return env[name]

156 

157 

def make_chain(funcs, provides, final_func, preprovided, inner_name):
    """Compile *funcs* followed by *final_func* into one chained callable.

    *provides* lists, per entry of *funcs*, the names that entry makes
    available to the callables after it; *final_func* provides nothing.
    *preprovided* names are those the caller will supply to the chain.

    Returns ``(chain, args, unresolved)``: the compiled callable, the set
    of argument names it takes (all required names plus any preprovided
    name that some callable has a default for), and the set of required
    names not covered by *preprovided*.
    """
    all_funcs = list(funcs) + [final_func]
    provides = list(provides)
    preprovided = set(preprovided)
    reqs, opts = chain_argspec(all_funcs, provides + [()], inner_name)

    unresolved = reqs - preprovided
    args = reqs | (preprovided & opts)
    # Sort the outermost parameters so the generated signature (and hence
    # the generated source and its hash-based filename) does not depend on
    # set iteration order.
    chain = compile_chain(all_funcs, [sorted(args)] + provides, inner_name)
    return chain, set(args), set(unresolved)