Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/face/sinter.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
3from __future__ import print_function
5import sys
6import types
7import inspect
8import hashlib
9import linecache
11from boltons import iterutils
12from boltons.strutils import camel2under
13from boltons.funcutils import FunctionBuilder
15PY3 = (sys.version_info[0] == 3)
16_VERBOSE = False
17_INDENT = ' '
20def get_fb(f, drop_self=True):
21 # TODO: support partials
22 if not (inspect.isfunction(f) or inspect.ismethod(f) or \
23 inspect.isbuiltin(f)) and hasattr(f, '__call__'):
24 if isinstance(getattr(f, '_sinter_fb', None), FunctionBuilder):
25 return f._sinter_fb
26 f = f.__call__ # callable objects
28 if isinstance(getattr(f, '_sinter_fb', None), FunctionBuilder):
29 return f._sinter_fb # we'll take your word for it; good luck, lil buddy.
31 ret = FunctionBuilder.from_func(f)
33 if not all([isinstance(a, str) for a in ret.args]): # pragma: no cover (2 only)
34 raise TypeError('does not support anonymous tuple arguments'
35 ' or any other strange args for that matter.')
36 if drop_self and isinstance(f, types.MethodType):
37 ret.args = ret.args[1:] # discard "self" on methods
38 return ret
41def get_arg_names(f, only_required=False):
42 fb = get_fb(f)
44 return fb.get_arg_names(only_required=only_required)
47def inject(f, injectables):
48 __traceback_hide__ = True # TODO
50 fb = get_fb(f)
52 all_kwargs = fb.get_defaults_dict()
53 all_kwargs.update(injectables)
55 if fb.varkw:
56 return f(**all_kwargs)
58 kwargs = dict([(k, v) for k, v in all_kwargs.items() if k in fb.get_arg_names()])
59 return f(**kwargs)
62def get_callable_labels(obj):
63 ctx_parts = []
64 if isinstance(obj, types.MethodType):
65 # bit of 2/3 messiness below
66 im_self = getattr(obj, 'im_self', getattr(obj, '__self__', None))
67 if im_self:
68 ctx_parts.append(im_self.__class__.__name__)
69 obj = getattr(obj, 'im_func', getattr(obj, '__func__', None))
71 fb = get_fb(obj)
72 if fb.module:
73 ctx_parts.insert(0, fb.module)
76 return '.'.join(ctx_parts), fb.name, fb.get_invocation_str()
80# TODO: turn the following into an object (keeps inner_name easier to
81# track, as well as better handling of state the func_aliaser will
82# need
84def chain_argspec(func_list, provides, inner_name):
85 provided_sofar = set([inner_name]) # the inner function name is an extremely special case
86 optional_sofar = set()
87 required_sofar = set()
88 for f, p in zip(func_list, provides):
89 # middlewares can default the same parameter to different values;
90 # can't properly keep track of default values
91 fb = get_fb(f)
92 arg_names = fb.get_arg_names()
93 defaults_dict = fb.get_defaults_dict()
95 defaulted, undefaulted = iterutils.partition(arg_names, key=defaults_dict.__contains__)
97 optional_sofar.update(defaulted)
98 # keep track of defaults so that e.g. endpoint default param
99 # can pick up request injected/provided param
100 required_sofar |= set(undefaulted) - provided_sofar
101 provided_sofar.update(p)
103 return required_sofar, optional_sofar
106#funcs[0] = function to call
107#params[0] = parameters to take
108def build_chain_str(funcs, params, inner_name, params_sofar=None, level=0,
109 func_aliaser=None, func_names=None):
110 if not funcs:
111 return '' # stopping case
112 if params_sofar is None:
113 params_sofar = set([inner_name])
115 params_sofar.update(params[0])
116 inner_args = get_fb(funcs[0]).args
117 inner_arg_dict = dict([(a, a) for a in inner_args])
118 inner_arg_items = sorted(inner_arg_dict.items())
119 inner_args = ', '.join(['%s=%s' % kv for kv in inner_arg_items
120 if kv[0] in params_sofar])
121 outer_indent = _INDENT * level
122 inner_indent = outer_indent + _INDENT
123 outer_arg_str = ', '.join(params[0])
124 def_str = '%sdef %s(%s):\n' % (outer_indent, inner_name, outer_arg_str)
125 body_str = build_chain_str(funcs[1:], params[1:], inner_name, params_sofar, level + 1)
126 #func_name = get_func_name(funcs[0])
127 #func_alias = get_inner_func_alias(funcs[0])
128 htb_str = '%s__traceback_hide__ = True\n' % (inner_indent,)
129 return_str = '%sreturn funcs[%s](%s)\n' % (inner_indent, level, inner_args)
130 return ''.join([def_str, body_str, htb_str + return_str])
133def compile_chain(funcs, params, inner_name, verbose=_VERBOSE):
134 call_str = build_chain_str(funcs, params, inner_name)
135 return compile_code(call_str, inner_name, {'funcs': funcs}, verbose=verbose)
138def compile_code(code_str, name, env=None, verbose=_VERBOSE):
139 code_hash = hashlib.sha1(code_str.encode('utf8')).hexdigest()[:16]
140 unique_filename = "<sinter generated %s %s>" % (name, code_hash)
141 code = compile(code_str, unique_filename, 'single')
142 if verbose:
143 print(code_str) # pragma: no cover
144 if PY3:
145 exec(code, env)
146 else:
147 exec("exec code in env")
149 linecache.cache[unique_filename] = (
150 len(code_str),
151 None,
152 code_str.splitlines(True),
153 unique_filename,
154 )
155 return env[name]
158def make_chain(funcs, provides, final_func, preprovided, inner_name):
159 funcs = list(funcs)
160 provides = list(provides)
161 preprovided = set(preprovided)
162 reqs, opts = chain_argspec(funcs + [final_func],
163 provides + [()], inner_name)
165 unresolved = tuple(reqs - preprovided)
166 args = reqs | (preprovided & opts)
167 chain = compile_chain(funcs + [final_func],
168 [args] + provides, inner_name)
169 return chain, set(args), set(unresolved)