Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/face/sinter.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import sys
2import types
3import inspect
4import hashlib
5import linecache
7from boltons import iterutils
8from boltons.strutils import camel2under
9from boltons.funcutils import FunctionBuilder
# Default for the ``verbose`` flag of compile_chain/compile_code; when true,
# compile_code prints the generated source before executing it.
_VERBOSE = False
# One level of indentation for the nested ``def`` statements emitted by
# build_chain_str (repeated ``level`` times).
# NOTE(review): whitespace in this listing looks collapsed -- confirm the
# literal's real width against the upstream file.
_INDENT = ' '
def get_fb(f, drop_self=True):
    """Return a ``FunctionBuilder`` describing the callable *f*.

    If *f* is not a function, method or builtin but has ``__call__``, it is
    treated as a callable object and ``f.__call__`` is inspected instead.
    At either stage, a ``_sinter_fb`` attribute that already is a
    ``FunctionBuilder`` is returned directly without further inspection.

    Args:
        f: The callable to describe.
        drop_self: When true and the inspected callable is a bound method,
            the first argument is removed from the builder's ``args``.

    Returns:
        The ``FunctionBuilder`` for the callable.

    Raises:
        TypeError: If any argument name on the builder is not a ``str``.
    """
    # TODO: support partials
    is_plain = (inspect.isfunction(f) or inspect.ismethod(f)
                or inspect.isbuiltin(f))
    if not is_plain and hasattr(f, '__call__'):
        preset = getattr(f, '_sinter_fb', None)
        if isinstance(preset, FunctionBuilder):
            return preset
        f = f.__call__  # callable objects

    preset = getattr(f, '_sinter_fb', None)
    if isinstance(preset, FunctionBuilder):
        # we'll take your word for it; good luck, lil buddy.
        return preset

    builder = FunctionBuilder.from_func(f)

    for arg in builder.args:  # pragma: no cover (2 only)
        if not isinstance(arg, str):
            raise TypeError('does not support anonymous tuple arguments'
                            ' or any other strange args for that matter.')

    if drop_self and isinstance(f, types.MethodType):
        builder.args = builder.args[1:]  # discard "self" on methods
    return builder
def get_arg_names(f, only_required=False):
    """Return the argument names of *f* as reported by its FunctionBuilder.

    Args:
        f: The callable to inspect (resolved through ``get_fb``).
        only_required: Forwarded unchanged to
            ``FunctionBuilder.get_arg_names``.
    """
    return get_fb(f).get_arg_names(only_required=only_required)
def inject(f, injectables):
    """Call *f* with keyword arguments drawn from its defaults and *injectables*.

    The defaults of *f* are overlaid with *injectables* (injectables win).
    If *f* accepts ``**kwargs`` the whole merged mapping is passed;
    otherwise only the entries whose key is one of *f*'s argument names are.

    Args:
        f: The callable to invoke.
        injectables: Mapping of name -> value available for injection.

    Returns:
        Whatever *f* returns.
    """
    __traceback_hide__ = True  # TODO

    fb = get_fb(f)

    all_kwargs = fb.get_defaults_dict()
    all_kwargs.update(injectables)

    if fb.varkw:
        return f(**all_kwargs)

    # Look the accepted names up once rather than once per candidate kwarg.
    accepted = frozenset(fb.get_arg_names())
    kwargs = {k: v for k, v in all_kwargs.items() if k in accepted}
    return f(**kwargs)
def get_callable_labels(obj):
    """Return ``(context, name, invocation_str)`` labels for a callable.

    ``context`` is a dotted string made of the callable's module (when the
    FunctionBuilder reports one) followed, for bound methods, by the class
    name of the object the method is bound to. ``name`` and
    ``invocation_str`` come straight from the FunctionBuilder.

    Args:
        obj: A function, bound method, or other callable ``get_fb`` accepts.
    """
    ctx_parts = []
    if isinstance(obj, types.MethodType):
        # bit of 2/3 messiness below
        im_self = getattr(obj, 'im_self', getattr(obj, '__self__', None))
        # Compare against None explicitly: an instance that defines
        # __len__/__bool__ may be falsy yet still be a valid binding.
        if im_self is not None:
            ctx_parts.append(im_self.__class__.__name__)
        obj = getattr(obj, 'im_func', getattr(obj, '__func__', None))

    fb = get_fb(obj)
    if fb.module:
        ctx_parts.insert(0, fb.module)

    return '.'.join(ctx_parts), fb.name, fb.get_invocation_str()
# TODO: turn the following into an object (keeps inner_name easier to
# track, as well as better handling of state the func_aliaser will
# need)
def chain_argspec(func_list, provides, inner_name):
    """Compute the required and optional argument names of a function chain.

    Walks *func_list* in order, paired with *provides* (the names each
    function makes available to the ones after it). An argument with a
    default is recorded as optional; an argument without one is recorded as
    required unless an earlier function already provided it.

    Args:
        func_list: Callables in call order.
        provides: One iterable of provided names per callable.
        inner_name: Name of the inner function; always treated as provided.

    Returns:
        A ``(required, optional)`` pair of sets of argument names.
    """
    # the inner function name is an extremely special case
    available = {inner_name}
    optional = set()
    required = set()
    for func, newly_provided in zip(func_list, provides):
        # middlewares can default the same parameter to different values;
        # can't properly keep track of default values
        fb = get_fb(func)
        names = fb.get_arg_names()
        defaults = fb.get_defaults_dict()

        for name in names:
            if name in defaults:
                # keep track of defaults so that e.g. endpoint default param
                # can pick up request injected/provided param
                optional.add(name)
            elif name not in available:
                required.add(name)
        # Only functions later in the chain can rely on these names.
        available.update(newly_provided)

    return required, optional
102#funcs[0] = function to call
103#params[0] = parameters to take
def build_chain_str(funcs, params, inner_name, params_sofar=None, level=0,
                    func_aliaser=None, func_names=None):
    """Generate source for nested ``def``s that call *funcs* in sequence.

    Each level emits a ``def <inner_name>(<params[0]>)`` whose body is the
    next level's definition followed by ``return funcs[<level>](...)``.
    Only arguments of ``funcs[0]`` that have been made available so far are
    passed, as ``name=name`` keywords in sorted order.

    Args:
        funcs: Remaining callables; ``funcs[0]`` is called at this level.
        params: Remaining parameter collections; ``params[0]`` is the
            signature of this level's ``def``.
        inner_name: Name given to every generated function.
        params_sofar: Set of names available so far; updated in place.
        level: Nesting depth; selects the indent and the ``funcs`` index.
        func_aliaser: Unused here.
        func_names: Unused here.

    Returns:
        The generated source text, or ``''`` when *funcs* is empty.
    """
    if not funcs:
        return ''  # stopping case
    if params_sofar is None:
        params_sofar = {inner_name}

    level_params = params[0]
    params_sofar.update(level_params)

    # Unique argument names in sorted order, limited to what is available.
    call_kwargs = ', '.join(
        [f'{arg}={arg}'
         for arg in sorted(set(get_fb(funcs[0]).args))
         if arg in params_sofar])

    outer_indent = _INDENT * level
    inner_indent = outer_indent + _INDENT
    signature = ', '.join(level_params)

    def_str = f'{outer_indent}def {inner_name}({signature}):\n'
    body_str = build_chain_str(funcs[1:], params[1:], inner_name,
                               params_sofar, level + 1)
    htb_str = f'{inner_indent}__traceback_hide__ = True\n'
    return_str = f'{inner_indent}return funcs[{level}]({call_kwargs})\n'
    return def_str + body_str + htb_str + return_str
def compile_chain(funcs, params, inner_name, verbose=_VERBOSE):
    """Build the chain source for *funcs* and compile it into a callable.

    The generated code refers to the callables through a ``funcs`` name,
    which is supplied as the execution environment.

    Args:
        funcs: Callables in call order.
        params: Parameter collections, one per nesting level.
        inner_name: Name of the generated function to return.
        verbose: Forwarded to ``compile_code``.
    """
    source = build_chain_str(funcs, params, inner_name)
    namespace = {'funcs': funcs}
    return compile_code(source, inner_name, namespace, verbose=verbose)
def compile_code(code_str, name, env=None, verbose=_VERBOSE):
    """Compile and execute *code_str*, returning the object it binds to *name*.

    Args:
        code_str: Source text to compile.
        name: Key to look up in the environment after execution.
        env: Namespace to execute in; a fresh dict is used when ``None``.
        verbose: When true, print the source before executing it.

    Returns:
        ``env[name]`` after the code has run.
    """
    if env is None:
        env = {}

    # A digest of the source makes the synthetic filename content-specific.
    digest = hashlib.sha1(code_str.encode('utf8')).hexdigest()
    unique_filename = f"<sinter generated {name} {digest[:16]}>"
    code = compile(code_str, unique_filename, 'single')
    if verbose:
        print(code_str)  # pragma: no cover

    exec(code, env)

    # Register the generated source with linecache under the synthetic
    # filename (presumably so tracebacks can show the generated lines).
    source_lines = code_str.splitlines(True)
    linecache.cache[unique_filename] = (
        len(code_str),
        None,
        source_lines,
        unique_filename,
    )
    return env[name]
def make_chain(funcs, provides, final_func, preprovided, inner_name):
    """Compile *funcs* plus *final_func* into one chained callable.

    Args:
        funcs: Callables run before *final_func*, in order.
        provides: One iterable of provided names per entry in *funcs*.
        final_func: The last callable in the chain; it provides nothing.
        preprovided: Names available before the chain starts.
        inner_name: Name used for the generated functions.

    Returns:
        A ``(chain, args, unresolved)`` triple: the compiled callable, the
        set of argument names the chain takes, and the set of required
        names not covered by *preprovided*.
    """
    all_funcs = list(funcs) + [final_func]
    provides = list(provides)
    preprovided = set(preprovided)

    reqs, opts = chain_argspec(all_funcs, provides + [()], inner_name)

    unresolved = reqs - preprovided
    # Everything required, plus optional arguments that are pre-provided.
    args = reqs | (preprovided & opts)
    chain = compile_chain(all_funcs, [args] + provides, inner_name)
    return chain, set(args), set(unresolved)