Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/prometheus_client/decorator.py: 38%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

247 statements  

1# ######################### LICENSE ############################ # 

2 

3# Copyright (c) 2005-2016, Michele Simionato 

4# All rights reserved. 

5 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are 

8# met: 

9 

10# Redistributions of source code must retain the above copyright 

11# notice, this list of conditions and the following disclaimer. 

12# Redistributions in bytecode form must reproduce the above copyright 

13# notice, this list of conditions and the following disclaimer in 

14# the documentation and/or other materials provided with the 

15# distribution. 

16 

17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 

18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 

19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 

20# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 

21# HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 

22# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 

23# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS 

24# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 

25# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR 

26# TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE 

27# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 

28# DAMAGE. 

29 

30""" 

31Decorator module, see http://pypi.python.org/pypi/decorator 

32for the documentation. 

33""" 

34from __future__ import print_function 

35 

36import collections 

37import inspect 

38import itertools 

39import operator 

40import re 

41import sys 

42 

43__version__ = '4.0.10' 

44 

if sys.version_info < (3,):
    class getfullargspec(object):
        """Minimal Python 2 stand-in for ``inspect.getfullargspec``.

        Wraps ``inspect.getargspec`` and exposes the same attribute names
        as the Python 3 result; the keyword-only fields are always empty.
        """

        def __init__(self, f):
            (self.args, self.varargs,
             self.varkw, self.defaults) = inspect.getargspec(f)
            # Python 2 functions cannot declare keyword-only arguments.
            self.kwonlyargs = []
            self.kwonlydefaults = None

        def __iter__(self):
            # Yield the four classic fields lazily, in getargspec order.
            for field in ('args', 'varargs', 'varkw', 'defaults'):
                yield getattr(self, field)

        # NOTE(review): the extracted listing lost its indentation; this
        # assignment is kept as a class attribute -- confirm against the
        # original file.
        getargspec = inspect.getargspec

    def get_init(cls):
        """Return the plain function behind ``cls.__init__``."""
        return cls.__init__.__func__
else:
    from inspect import getfullargspec

    def get_init(cls):
        """Return ``cls.__init__``."""
        return cls.__init__

72 

# getargspec has been deprecated in Python 3.5, so an equivalent
# four-field result is rebuilt here on top of getfullargspec.
ArgSpec = collections.namedtuple(
    'ArgSpec', ['args', 'varargs', 'varkw', 'defaults'])


def getargspec(f):
    """Return an ``ArgSpec`` (args, varargs, varkw, defaults) for ``f``.

    A replacement for inspect.getargspec built from ``getfullargspec``;
    keyword-only information is dropped.
    """
    full = getfullargspec(f)
    return ArgSpec(
        args=full.args,
        varargs=full.varargs,
        varkw=full.varkw,
        defaults=full.defaults,
    )

82 

83 

# Matches the ``def <name>(`` header of a generated function and captures
# the function name.
DEF = re.compile(r'\s*def\s*([_\w][_\w\d]*)\s*\(')


# basic functionality
class FunctionMaker(object):
    """
    An object with the ability to create functions with a given signature.
    It has attributes name, doc, module, signature, defaults, dict and
    methods update and make.
    """

    # Atomic get-and-increment provided by the GIL
    _compile_count = itertools.count()

    def __init__(self, func=None, name=None, signature=None,
                 defaults=None, doc=None, module=None, funcdict=None):
        """
        Collect name, signature and metadata either from ``func`` or from
        the explicit keyword arguments (which override what was read from
        ``func`` when they are truthy / not None).

        Raises TypeError when no signature could be determined, i.e. when
        ``func`` is not a plain function and ``signature`` is None.
        """
        self.shortsignature = signature
        if func:
            # func can be a class or a callable, but not an instance method
            self.name = func.__name__
            if self.name == '<lambda>':  # small hack for lambda functions
                self.name = '_lambda_'
            self.doc = func.__doc__
            self.module = func.__module__
            if inspect.isfunction(func):
                argspec = getfullargspec(func)
                self.annotations = getattr(func, '__annotations__', {})
                for a in ('args', 'varargs', 'varkw', 'defaults', 'kwonlyargs',
                          'kwonlydefaults'):
                    setattr(self, a, getattr(argspec, a))
                # expose positional names as arg0, arg1, ... for templates
                for i, arg in enumerate(self.args):
                    setattr(self, 'arg%d' % i, arg)
                if sys.version_info < (3,):  # easy way
                    self.shortsignature = self.signature = (
                        inspect.formatargspec(
                            formatvalue=lambda val: "", *argspec)[1:-1])
                else:  # Python 3 way
                    # ``signature`` is used in the generated ``def`` line,
                    # ``shortsignature`` in the forwarding call.
                    allargs = list(self.args)
                    allshortargs = list(self.args)
                    if self.varargs:
                        allargs.append('*' + self.varargs)
                        allshortargs.append('*' + self.varargs)
                    elif self.kwonlyargs:
                        allargs.append('*')  # single star syntax
                    for a in self.kwonlyargs:
                        allargs.append('%s=None' % a)
                        allshortargs.append('%s=%s' % (a, a))
                    if self.varkw:
                        allargs.append('**' + self.varkw)
                        allshortargs.append('**' + self.varkw)
                    self.signature = ', '.join(allargs)
                    self.shortsignature = ', '.join(allshortargs)
                self.dict = func.__dict__.copy()
        # func=None happens when decorating a caller
        if name:
            self.name = name
        if signature is not None:
            self.signature = signature
        if defaults:
            self.defaults = defaults
        if doc:
            self.doc = doc
        if module:
            self.module = module
        if funcdict:
            self.dict = funcdict
        # check existence required attributes
        assert hasattr(self, 'name')
        if not hasattr(self, 'signature'):
            raise TypeError('You are decorating a non function: %s' % func)

    def update(self, func, **kw):
        """
        Update the signature of func with the data in self.

        Copies name, doc, attribute dict, defaults, keyword-only defaults,
        annotations and module onto ``func``; the extra keyword arguments
        are added to ``func.__dict__``.
        """
        func.__name__ = self.name
        func.__doc__ = getattr(self, 'doc', None)
        # Give each generated function its own attribute dict. Assigning
        # self.dict directly would make every function built from this
        # maker (and a caller-supplied funcdict) share one dict that the
        # ``update(kw)`` call below mutates.
        func.__dict__ = dict(getattr(self, 'dict', {}))
        func.__defaults__ = getattr(self, 'defaults', ())
        func.__kwdefaults__ = getattr(self, 'kwonlydefaults', None)
        func.__annotations__ = getattr(self, 'annotations', None)
        try:
            frame = sys._getframe(3)
        except (AttributeError, ValueError):
            # AttributeError: IronPython and similar implementations
            # without sys._getframe. ValueError: the call stack is
            # shallower than three frames (e.g. make() called directly
            # from module level).
            callermodule = '?'
        else:
            callermodule = frame.f_globals.get('__name__', '?')
        func.__module__ = getattr(self, 'module', callermodule)
        func.__dict__.update(kw)

    def make(self, src_templ, evaldict=None, addsource=False, **attrs):
        """
        Make a new function from a given template and update the signature.

        ``src_templ`` is %-expanded with the attributes of self, compiled
        and executed in ``evaldict``; the resulting function is returned.
        If ``addsource`` is true the generated source is stored in the
        ``__source__`` attribute. Raises SyntaxError when the expanded
        template does not start with a ``def`` header and NameError when
        the function or one of its arguments is named ``_func_``/``_call_``.
        """
        src = src_templ % vars(self)  # expand name and signature
        evaldict = evaldict or {}
        mo = DEF.match(src)
        if mo is None:
            raise SyntaxError('not a valid function template\n%s' % src)
        name = mo.group(1)  # extract the function name
        names = set([name] + [arg.strip(' *') for arg in
                              self.shortsignature.split(',')])
        for n in names:
            if n in ('_func_', '_call_'):
                raise NameError('%s is overridden in\n%s' % (n, src))

        if not src.endswith('\n'):  # add a newline for old Pythons
            src += '\n'

        # Ensure each generated function has a unique filename for profilers
        # (such as cProfile) that depend on the tuple of (<filename>,
        # <definition line>, <function name>) being unique.
        filename = '<decorator-gen-%d>' % (next(self._compile_count),)
        try:
            code = compile(src, filename, 'single')
            exec(code, evaldict)
        except Exception:
            # show the offending generated source, then let the error out
            print('Error in generated code:', file=sys.stderr)
            print(src, file=sys.stderr)
            raise
        func = evaldict[name]
        if addsource:
            attrs['__source__'] = src
        self.update(func, **attrs)
        return func

    @classmethod
    def create(cls, obj, body, evaldict, defaults=None,
               doc=None, module=None, addsource=True, **attrs):
        """
        Create a function from the strings name, signature and body.
        evaldict is the evaluation dictionary. If addsource is true an
        attribute __source__ is added to the result. The attributes attrs
        are added, if any.

        ``obj`` is either a ``"name(signature)"`` string or a function
        whose name and signature are reused.
        """
        if isinstance(obj, str):  # "name(signature)"
            name, rest = obj.strip().split('(', 1)
            signature = rest[:-1]  # strip a right parens
            func = None
        else:  # a function
            name = None
            signature = None
            func = obj
        self = cls(func, name, signature, defaults, doc, module)
        # indent the body so it sits under the generated ``def`` line
        ibody = '\n'.join('    ' + line for line in body.splitlines())
        return self.make('def %(name)s(%(signature)s):\n' + ibody,
                         evaldict, addsource, **attrs)

227 

228 

def decorate(func, caller):
    """
    decorate(func, caller) decorates a function using a caller.

    The returned function has the signature of ``func`` and forwards its
    arguments as ``caller(func, *args, **kw)``; ``func`` is kept in the
    ``__wrapped__`` attribute.
    """
    namespace = {'_call_': caller, '_func_': func}
    body = "return _call_(_func_, %(shortsignature)s)"
    wrapper = FunctionMaker.create(func, body, namespace, __wrapped__=func)
    if hasattr(func, '__qualname__'):
        # keep the qualified name of the decorated function
        wrapper.__qualname__ = func.__qualname__
    return wrapper

240 

241 

def decorator(caller, _func=None):
    """
    decorator(caller) converts a caller function into a decorator.

    ``caller`` may be a class, a function or any object with a
    ``__call__`` method. When ``_func`` is given the decorated function is
    returned directly (obsolete usage, equivalent to ``decorate``).
    """
    if _func is not None:  # return a decorated function
        # this is obsolete behavior; you should use decorate instead
        return decorate(_func, caller)

    # else return a decorator function
    if inspect.isclass(caller):
        cls_name = caller.__name__
        name = cls_name.lower()
        doc = ('decorator(%s) converts functions/generators into '
               'factories of %s objects' % (cls_name, cls_name))
    elif inspect.isfunction(caller):
        # lambdas have no usable identifier of their own
        name = ('_lambda_' if caller.__name__ == '<lambda>'
                else caller.__name__)
        doc = caller.__doc__
    else:  # assume caller is an object with a __call__ method
        name = caller.__class__.__name__.lower()
        doc = caller.__call__.__doc__

    namespace = {'_call_': caller, '_decorate_': decorate}
    return FunctionMaker.create(
        '%s(func)' % name, 'return _decorate_(func, _call_)',
        namespace, doc=doc, module=caller.__module__,
        __wrapped__=caller)

266 

267 

268# ####################### contextmanager ####################### # 

269 

try:  # Python >= 3.2
    from contextlib import _GeneratorContextManager
except ImportError:  # Python >= 2.5
    from contextlib import GeneratorContextManager as _GeneratorContextManager


class ContextManager(_GeneratorContextManager):
    # A generator-based context manager whose instances can also be
    # applied to a function as a decorator.

    def __call__(self, func):
        """Return ``func`` wrapped so that its body runs inside ``self``."""
        template = "with _self_: return _func_(%(shortsignature)s)"
        namespace = {'_self_': self, '_func_': func}
        return FunctionMaker.create(
            func, template, namespace, __wrapped__=func)

282 

283 

# Adapt ContextManager.__init__ to the signature that this interpreter's
# _GeneratorContextManager.__init__ expects, so that ContextManager can
# always be built as ContextManager(genfunc, *args, **kwds).
init = getfullargspec(_GeneratorContextManager.__init__)
n_args = len(init.args)
if n_args == 4:  # (self, gen, args, kwds) Python 3.5
    def __init__(self, g, *a, **k):
        return _GeneratorContextManager.__init__(self, g, a, k)

    ContextManager.__init__ = __init__
elif n_args == 2:
    if not init.varargs:  # (self, genobj) Python 2.7
        def __init__(self, g, *a, **k):
            return _GeneratorContextManager.__init__(self, g(*a, **k))

        ContextManager.__init__ = __init__
    # otherwise: (self, gen, *a, **k) Python 3.4 -- nothing to patch

contextmanager = decorator(ContextManager)

302 

303 

304# ############################ dispatch_on ############################ # 

305 

def append(a, vancestors):
    """
    Add the class ``a`` to the list of virtual ancestors in place.

    Nothing happens if an entry of the list is already a subclass of
    ``a``; entries that ``a`` is a subclass of are replaced by ``a``;
    otherwise ``a`` is appended at the end.
    """
    replaced = False
    for idx, known in enumerate(vancestors):
        if issubclass(known, a):
            # a more specific (or identical) class is already listed
            return
        if issubclass(a, known):
            vancestors[idx] = a
            replaced = True
    if not replaced:
        vancestors.append(a)

321 

322 

323# inspired from simplegeneric by P.J. Eby and functools.singledispatch 

def dispatch_on(*dispatch_args):
    """
    Factory of decorators turning a function into a generic function
    dispatching on the given arguments.

    ``dispatch_args`` are the names of the arguments whose types select
    the implementation. The decorated function gains the attributes
    ``register``, ``default``, ``typemap``, ``vancestors``, ``ancestors``
    and ``dispatch_info``.
    """
    assert dispatch_args, 'No dispatch args passed'
    dispatch_str = '(%s,)' % ', '.join(dispatch_args)

    def check(arguments, wrong=operator.ne, msg=''):
        """Make sure one passes the expected number of arguments"""
        expected, got = len(dispatch_args), len(arguments)
        if wrong(got, expected):
            raise TypeError('Expected %d arguments, got %d%s' %
                            (expected, got, msg))

    def gen_func_dec(func):
        """Decorator turning a function into a generic function"""

        # first check the dispatch arguments
        argset = set(getfullargspec(func).args)
        if not set(dispatch_args) <= argset:
            raise NameError('Unknown dispatch arguments %s' % dispatch_str)

        # maps a tuple of types to the implementation registered for it
        typemap = {}

        def vancestors(*types):
            """
            Get a list of sets of virtual ancestors for the given types
            """
            check(types)
            ras = [[] for _ in range(len(dispatch_args))]
            for types_ in typemap:
                for t, type_, ra in zip(types, types_, ras):
                    # virtual ancestor: accepted by issubclass but not
                    # part of the real MRO
                    if issubclass(t, type_) and type_ not in t.__mro__:
                        append(type_, ra)
            return [set(ra) for ra in ras]

        def ancestors(*types):
            """
            Get a list of virtual MROs, one for each type
            """
            check(types)
            lists = []
            for t, vas in zip(types, vancestors(*types)):
                n_vas = len(vas)
                if n_vas > 1:
                    raise RuntimeError(
                        'Ambiguous dispatch for %s: %s' % (t, vas))
                elif n_vas == 1:
                    va, = vas
                    # build a throwaway subclass to get an MRO that
                    # includes the virtual ancestor, then drop that class
                    mro = type('t', (t, va), {}).__mro__[1:]
                else:
                    mro = t.__mro__
                lists.append(mro[:-1])  # discard t and object
            return lists

        def register(*types):
            """
            Decorator to register an implementation for the given types
            """
            check(types)

            def dec(f):
                check(getfullargspec(f).args, operator.lt, ' in ' + f.__name__)
                typemap[types] = f
                return f

            return dec

        def dispatch_info(*types):
            """
            An utility to introspect the dispatch algorithm
            """
            check(types)
            return [tuple(a.__name__ for a in anc)
                    for anc in itertools.product(*ancestors(*types))]

        def _dispatch(dispatch_args, *args, **kw):
            # Pick the implementation for the runtime types of the dispatch
            # arguments and call it with the original arguments.
            types = tuple(type(arg) for arg in dispatch_args)
            try:  # fast path
                f = typemap[types]
            except KeyError:
                pass
            else:
                return f(*args, **kw)
            combinations = itertools.product(*ancestors(*types))
            # The first combination has been already tried. The product is
            # empty when a dispatch argument is a plain ``object`` instance
            # (its ancestor tuple is empty), so use a default instead of
            # letting StopIteration escape to the caller.
            next(combinations, None)
            for types_ in combinations:
                f = typemap.get(types_)
                if f is not None:
                    return f(*args, **kw)

            # else call the default implementation
            return func(*args, **kw)

        return FunctionMaker.create(
            func, 'return _f_(%s, %%(shortsignature)s)' % dispatch_str,
            dict(_f_=_dispatch), register=register, default=func,
            typemap=typemap, vancestors=vancestors, ancestors=ancestors,
            dispatch_info=dispatch_info, __wrapped__=func)

    gen_func_dec.__name__ = 'dispatch_on' + dispatch_str
    return gen_func_dec