Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/typeguard/__init__.py: 21%
647 statements
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
« prev ^ index » next coverage.py v7.4.0, created at 2024-01-03 07:57 +0000
1__all__ = ('ForwardRefPolicy', 'TypeHintWarning', 'typechecked', 'check_return_type',
2 'check_argument_types', 'check_type', 'TypeWarning', 'TypeChecker',
3 'typeguard_ignore')
5import collections.abc
6import gc
7import inspect
8import sys
9import threading
10from collections import OrderedDict
11from enum import Enum
12from functools import partial, wraps
13from inspect import Parameter, isclass, isfunction, isgeneratorfunction
14from io import BufferedIOBase, IOBase, RawIOBase, TextIOBase
15from traceback import extract_stack, print_stack
16from types import CodeType, FunctionType
17from typing import (
18 IO, TYPE_CHECKING, AbstractSet, Any, AsyncIterable, AsyncIterator, BinaryIO, Callable, Dict,
19 Generator, Iterable, Iterator, List, NewType, Optional, Sequence, Set, TextIO, Tuple, Type,
20 TypeVar, Union, get_type_hints, overload)
21from unittest.mock import Mock
22from warnings import warn
23from weakref import WeakKeyDictionary, WeakValueDictionary
25# Python 3.8+
26try:
27 from typing_extensions import Literal
28except ImportError:
29 try:
30 from typing import Literal
31 except ImportError:
32 Literal = None
34# Python 3.5.4+ / 3.6.2+
35try:
36 from typing_extensions import NoReturn
37except ImportError:
38 try:
39 from typing import NoReturn
40 except ImportError:
41 NoReturn = None
43# Python 3.6+
44try:
45 from inspect import isasyncgen, isasyncgenfunction
46 from typing import AsyncGenerator
47except ImportError:
48 AsyncGenerator = None
50 def isasyncgen(obj):
51 return False
53 def isasyncgenfunction(func):
54 return False
56# Python 3.8+
57try:
58 from typing import ForwardRef
59 evaluate_forwardref = ForwardRef._evaluate
60except ImportError:
61 from typing import _ForwardRef as ForwardRef
62 evaluate_forwardref = ForwardRef._eval_type
64if sys.version_info >= (3, 10):
65 from typing import is_typeddict
66else:
67 _typed_dict_meta_types = ()
68 if sys.version_info >= (3, 8):
69 from typing import _TypedDictMeta
70 _typed_dict_meta_types += (_TypedDictMeta,)
72 try:
73 from typing_extensions import _TypedDictMeta
74 _typed_dict_meta_types += (_TypedDictMeta,)
75 except ImportError:
76 pass
78 def is_typeddict(tp) -> bool:
79 return isinstance(tp, _typed_dict_meta_types)
82if TYPE_CHECKING:
83 _F = TypeVar("_F")
85 def typeguard_ignore(f: _F) -> _F:
86 """This decorator is a noop during static type-checking."""
87 return f
88else:
89 from typing import no_type_check as typeguard_ignore
92_type_hints_map = WeakKeyDictionary() # type: Dict[FunctionType, Dict[str, Any]]
93_functions_map = WeakValueDictionary() # type: Dict[CodeType, FunctionType]
94_missing = object()
96T_CallableOrType = TypeVar('T_CallableOrType', bound=Callable[..., Any])
98# Lifted from mypy.sharedparse
99BINARY_MAGIC_METHODS = {
100 "__add__",
101 "__and__",
102 "__cmp__",
103 "__divmod__",
104 "__div__",
105 "__eq__",
106 "__floordiv__",
107 "__ge__",
108 "__gt__",
109 "__iadd__",
110 "__iand__",
111 "__idiv__",
112 "__ifloordiv__",
113 "__ilshift__",
114 "__imatmul__",
115 "__imod__",
116 "__imul__",
117 "__ior__",
118 "__ipow__",
119 "__irshift__",
120 "__isub__",
121 "__itruediv__",
122 "__ixor__",
123 "__le__",
124 "__lshift__",
125 "__lt__",
126 "__matmul__",
127 "__mod__",
128 "__mul__",
129 "__ne__",
130 "__or__",
131 "__pow__",
132 "__radd__",
133 "__rand__",
134 "__rdiv__",
135 "__rfloordiv__",
136 "__rlshift__",
137 "__rmatmul__",
138 "__rmod__",
139 "__rmul__",
140 "__ror__",
141 "__rpow__",
142 "__rrshift__",
143 "__rshift__",
144 "__rsub__",
145 "__rtruediv__",
146 "__rxor__",
147 "__sub__",
148 "__truediv__",
149 "__xor__",
150}
153class ForwardRefPolicy(Enum):
154 """Defines how unresolved forward references are handled."""
156 ERROR = 1 #: propagate the :exc:`NameError` from :func:`~typing.get_type_hints`
157 WARN = 2 #: remove the annotation and emit a TypeHintWarning
158 #: replace the annotation with the argument's class if the qualified name matches, else remove
159 #: the annotation
160 GUESS = 3
163class TypeHintWarning(UserWarning):
164 """
165 A warning that is emitted when a type hint in string form could not be resolved to an actual
166 type.
167 """
170class _TypeCheckMemo:
171 __slots__ = 'globals', 'locals'
173 def __init__(self, globals: Dict[str, Any], locals: Dict[str, Any]):
174 self.globals = globals
175 self.locals = locals
178def _strip_annotation(annotation):
179 if isinstance(annotation, str):
180 return annotation.strip("'")
181 else:
182 return annotation
185class _CallMemo(_TypeCheckMemo):
186 __slots__ = 'func', 'func_name', 'arguments', 'is_generator', 'type_hints'
188 def __init__(self, func: Callable, frame_locals: Optional[Dict[str, Any]] = None,
189 args: tuple = None, kwargs: Dict[str, Any] = None,
190 forward_refs_policy=ForwardRefPolicy.ERROR):
191 super().__init__(func.__globals__, frame_locals)
192 self.func = func
193 self.func_name = function_name(func)
194 self.is_generator = isgeneratorfunction(func)
195 signature = inspect.signature(func)
197 if args is not None and kwargs is not None:
198 self.arguments = signature.bind(*args, **kwargs).arguments
199 else:
200 assert frame_locals is not None, 'frame must be specified if args or kwargs is None'
201 self.arguments = frame_locals
203 self.type_hints = _type_hints_map.get(func)
204 if self.type_hints is None:
205 while True:
206 if sys.version_info < (3, 5, 3):
207 frame_locals = dict(frame_locals)
209 try:
210 hints = get_type_hints(func, localns=frame_locals)
211 except NameError as exc:
212 if forward_refs_policy is ForwardRefPolicy.ERROR:
213 raise
215 typename = str(exc).split("'", 2)[1]
216 for param in signature.parameters.values():
217 if _strip_annotation(param.annotation) == typename:
218 break
219 else:
220 raise
222 func_name = function_name(func)
223 if forward_refs_policy is ForwardRefPolicy.GUESS:
224 if param.name in self.arguments:
225 argtype = self.arguments[param.name].__class__
226 stripped = _strip_annotation(param.annotation)
227 if stripped == argtype.__qualname__:
228 func.__annotations__[param.name] = argtype
229 msg = ('Replaced forward declaration {!r} in {} with {!r}'
230 .format(stripped, func_name, argtype))
231 warn(TypeHintWarning(msg))
232 continue
234 msg = 'Could not resolve type hint {!r} on {}: {}'.format(
235 param.annotation, function_name(func), exc)
236 warn(TypeHintWarning(msg))
237 del func.__annotations__[param.name]
238 else:
239 break
241 self.type_hints = OrderedDict()
242 for name, parameter in signature.parameters.items():
243 if name in hints:
244 annotated_type = hints[name]
246 # PEP 428 discourages it by MyPy does not complain
247 if parameter.default is None:
248 annotated_type = Optional[annotated_type]
250 if parameter.kind == Parameter.VAR_POSITIONAL:
251 self.type_hints[name] = Tuple[annotated_type, ...]
252 elif parameter.kind == Parameter.VAR_KEYWORD:
253 self.type_hints[name] = Dict[str, annotated_type]
254 else:
255 self.type_hints[name] = annotated_type
257 if 'return' in hints:
258 self.type_hints['return'] = hints['return']
260 _type_hints_map[func] = self.type_hints
263def resolve_forwardref(maybe_ref, memo: _TypeCheckMemo):
264 if isinstance(maybe_ref, ForwardRef):
265 if sys.version_info < (3, 9, 0):
266 return evaluate_forwardref(maybe_ref, memo.globals, memo.locals)
267 else:
268 return evaluate_forwardref(maybe_ref, memo.globals, memo.locals, frozenset())
270 else:
271 return maybe_ref
274def get_type_name(type_):
275 name = (getattr(type_, '__name__', None) or getattr(type_, '_name', None) or
276 getattr(type_, '__forward_arg__', None))
277 if name is None:
278 origin = getattr(type_, '__origin__', None)
279 name = getattr(origin, '_name', None)
280 if name is None and not inspect.isclass(type_):
281 name = type_.__class__.__name__.strip('_')
283 args = getattr(type_, '__args__', ()) or getattr(type_, '__values__', ())
284 if args != getattr(type_, '__parameters__', ()):
285 if name == 'Literal':
286 formatted_args = ', '.join(str(arg) for arg in args)
287 else:
288 formatted_args = ', '.join(get_type_name(arg) for arg in args)
290 name = '{}[{}]'.format(name, formatted_args)
292 module = getattr(type_, '__module__', None)
293 if module not in (None, 'typing', 'typing_extensions', 'builtins'):
294 name = module + '.' + name
296 return name
299def find_function(frame) -> Optional[Callable]:
300 """
301 Return a function object from the garbage collector that matches the frame's code object.
303 This process is unreliable as several function objects could use the same code object.
304 Fortunately the likelihood of this happening with the combination of the function objects
305 having different type annotations is a very rare occurrence.
307 :param frame: a frame object
308 :return: a function object if one was found, ``None`` if not
310 """
311 func = _functions_map.get(frame.f_code)
312 if func is None:
313 for obj in gc.get_referrers(frame.f_code):
314 if inspect.isfunction(obj):
315 if func is None:
316 # The first match was found
317 func = obj
318 else:
319 # A second match was found
320 return None
322 # Cache the result for future lookups
323 if func is not None:
324 _functions_map[frame.f_code] = func
325 else:
326 raise LookupError('target function not found')
328 return func
331def qualified_name(obj) -> str:
332 """
333 Return the qualified name (e.g. package.module.Type) for the given object.
335 Builtins and types from the :mod:`typing` package get special treatment by having the module
336 name stripped from the generated name.
338 """
339 type_ = obj if inspect.isclass(obj) else type(obj)
340 module = type_.__module__
341 qualname = type_.__qualname__
342 return qualname if module in ('typing', 'builtins') else '{}.{}'.format(module, qualname)
345def function_name(func: Callable) -> str:
346 """
347 Return the qualified name of the given function.
349 Builtins and types from the :mod:`typing` package get special treatment by having the module
350 name stripped from the generated name.
352 """
353 # For partial functions and objects with __call__ defined, __qualname__ does not exist
354 # For functions run in `exec` with a custom namespace, __module__ can be None
355 module = getattr(func, '__module__', '') or ''
356 qualname = (module + '.') if module not in ('builtins', '') else ''
357 return qualname + getattr(func, '__qualname__', repr(func))
360def check_callable(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
361 if not callable(value):
362 raise TypeError('{} must be a callable'.format(argname))
364 if getattr(expected_type, "__args__", None):
365 try:
366 signature = inspect.signature(value)
367 except (TypeError, ValueError):
368 return
370 if hasattr(expected_type, '__result__'):
371 # Python 3.5
372 argument_types = expected_type.__args__
373 check_args = argument_types is not Ellipsis
374 else:
375 # Python 3.6
376 argument_types = expected_type.__args__[:-1]
377 check_args = argument_types != (Ellipsis,)
379 if check_args:
380 # The callable must not have keyword-only arguments without defaults
381 unfulfilled_kwonlyargs = [
382 param.name for param in signature.parameters.values() if
383 param.kind == Parameter.KEYWORD_ONLY and param.default == Parameter.empty]
384 if unfulfilled_kwonlyargs:
385 raise TypeError(
386 'callable passed as {} has mandatory keyword-only arguments in its '
387 'declaration: {}'.format(argname, ', '.join(unfulfilled_kwonlyargs)))
389 num_mandatory_args = len([
390 param.name for param in signature.parameters.values()
391 if param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD) and
392 param.default is Parameter.empty])
393 has_varargs = any(param for param in signature.parameters.values()
394 if param.kind == Parameter.VAR_POSITIONAL)
396 if num_mandatory_args > len(argument_types):
397 raise TypeError(
398 'callable passed as {} has too many arguments in its declaration; expected {} '
399 'but {} argument(s) declared'.format(argname, len(argument_types),
400 num_mandatory_args))
401 elif not has_varargs and num_mandatory_args < len(argument_types):
402 raise TypeError(
403 'callable passed as {} has too few arguments in its declaration; expected {} '
404 'but {} argument(s) declared'.format(argname, len(argument_types),
405 num_mandatory_args))
408def check_dict(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
409 if not isinstance(value, dict):
410 raise TypeError('type of {} must be a dict; got {} instead'.
411 format(argname, qualified_name(value)))
413 if expected_type is not dict:
414 if (hasattr(expected_type, "__args__") and
415 expected_type.__args__ not in (None, expected_type.__parameters__)):
416 key_type, value_type = expected_type.__args__
417 if key_type is not Any or value_type is not Any:
418 for k, v in value.items():
419 check_type('keys of {}'.format(argname), k, key_type, memo)
420 check_type('{}[{!r}]'.format(argname, k), v, value_type, memo)
423def check_typed_dict(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
424 declared_keys = frozenset(expected_type.__annotations__)
425 if hasattr(expected_type, '__required_keys__'):
426 required_keys = expected_type.__required_keys__
427 else: # py3.8 and lower
428 required_keys = declared_keys if expected_type.__total__ else frozenset()
430 existing_keys = frozenset(value)
431 extra_keys = existing_keys - declared_keys
432 if extra_keys:
433 keys_formatted = ', '.join('"{}"'.format(key) for key in sorted(extra_keys))
434 raise TypeError('extra key(s) ({}) in {}'.format(keys_formatted, argname))
436 missing_keys = required_keys - existing_keys
437 if missing_keys:
438 keys_formatted = ', '.join('"{}"'.format(key) for key in sorted(missing_keys))
439 raise TypeError('required key(s) ({}) missing from {}'.format(keys_formatted, argname))
441 for key, argtype in get_type_hints(expected_type).items():
442 argvalue = value.get(key, _missing)
443 if argvalue is not _missing:
444 check_type('dict item "{}" for {}'.format(key, argname), argvalue, argtype, memo)
447def check_list(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
448 if not isinstance(value, list):
449 raise TypeError('type of {} must be a list; got {} instead'.
450 format(argname, qualified_name(value)))
452 if expected_type is not list:
453 if hasattr(expected_type, "__args__") and expected_type.__args__ not in \
454 (None, expected_type.__parameters__):
455 value_type = expected_type.__args__[0]
456 if value_type is not Any:
457 for i, v in enumerate(value):
458 check_type('{}[{}]'.format(argname, i), v, value_type, memo)
461def check_sequence(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
462 if not isinstance(value, collections.abc.Sequence):
463 raise TypeError('type of {} must be a sequence; got {} instead'.
464 format(argname, qualified_name(value)))
466 if hasattr(expected_type, "__args__") and expected_type.__args__ not in \
467 (None, expected_type.__parameters__):
468 value_type = expected_type.__args__[0]
469 if value_type is not Any:
470 for i, v in enumerate(value):
471 check_type('{}[{}]'.format(argname, i), v, value_type, memo)
474def check_set(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
475 if not isinstance(value, AbstractSet):
476 raise TypeError('type of {} must be a set; got {} instead'.
477 format(argname, qualified_name(value)))
479 if expected_type is not set:
480 if hasattr(expected_type, "__args__") and expected_type.__args__ not in \
481 (None, expected_type.__parameters__):
482 value_type = expected_type.__args__[0]
483 if value_type is not Any:
484 for v in value:
485 check_type('elements of {}'.format(argname), v, value_type, memo)
488def check_tuple(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
489 # Specialized check for NamedTuples
490 is_named_tuple = False
491 if sys.version_info < (3, 8, 0):
492 is_named_tuple = hasattr(expected_type, '_field_types') # deprecated since python 3.8
493 else:
494 is_named_tuple = hasattr(expected_type, '__annotations__')
496 if is_named_tuple:
497 if not isinstance(value, expected_type):
498 raise TypeError('type of {} must be a named tuple of type {}; got {} instead'.
499 format(argname, qualified_name(expected_type), qualified_name(value)))
501 if sys.version_info < (3, 8, 0):
502 field_types = expected_type._field_types
503 else:
504 field_types = expected_type.__annotations__
506 for name, field_type in field_types.items():
507 check_type('{}.{}'.format(argname, name), getattr(value, name), field_type, memo)
509 return
510 elif not isinstance(value, tuple):
511 raise TypeError('type of {} must be a tuple; got {} instead'.
512 format(argname, qualified_name(value)))
514 if getattr(expected_type, '__tuple_params__', None):
515 # Python 3.5
516 use_ellipsis = expected_type.__tuple_use_ellipsis__
517 tuple_params = expected_type.__tuple_params__
518 elif getattr(expected_type, '__args__', None):
519 # Python 3.6+
520 use_ellipsis = expected_type.__args__[-1] is Ellipsis
521 tuple_params = expected_type.__args__[:-1 if use_ellipsis else None]
522 else:
523 # Unparametrized Tuple or plain tuple
524 return
526 if use_ellipsis:
527 element_type = tuple_params[0]
528 for i, element in enumerate(value):
529 check_type('{}[{}]'.format(argname, i), element, element_type, memo)
530 elif tuple_params == ((),):
531 if value != ():
532 raise TypeError('{} is not an empty tuple but one was expected'.format(argname))
533 else:
534 if len(value) != len(tuple_params):
535 raise TypeError('{} has wrong number of elements (expected {}, got {} instead)'
536 .format(argname, len(tuple_params), len(value)))
538 for i, (element, element_type) in enumerate(zip(value, tuple_params)):
539 check_type('{}[{}]'.format(argname, i), element, element_type, memo)
542def check_union(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
543 if hasattr(expected_type, '__union_params__'):
544 # Python 3.5
545 union_params = expected_type.__union_params__
546 else:
547 # Python 3.6+
548 union_params = expected_type.__args__
550 for type_ in union_params:
551 try:
552 check_type(argname, value, type_, memo)
553 return
554 except TypeError:
555 pass
557 typelist = ', '.join(get_type_name(t) for t in union_params)
558 raise TypeError('type of {} must be one of ({}); got {} instead'.
559 format(argname, typelist, qualified_name(value)))
562def check_class(argname: str, value, expected_type, memo: _TypeCheckMemo) -> None:
563 if not isclass(value):
564 raise TypeError('type of {} must be a type; got {} instead'.format(
565 argname, qualified_name(value)))
567 # Needed on Python 3.7+
568 if expected_type is Type:
569 return
571 if getattr(expected_type, '__origin__', None) in (Type, type):
572 expected_class = expected_type.__args__[0]
573 else:
574 expected_class = expected_type
576 if expected_class is Any:
577 return
578 elif isinstance(expected_class, TypeVar):
579 check_typevar(argname, value, expected_class, memo, True)
580 elif getattr(expected_class, '__origin__', None) is Union:
581 for arg in expected_class.__args__:
582 try:
583 check_class(argname, value, arg, memo)
584 break
585 except TypeError:
586 pass
587 else:
588 formatted_args = ', '.join(get_type_name(arg) for arg in expected_class.__args__)
589 raise TypeError('{} must match one of the following: ({}); got {} instead'.format(
590 argname, formatted_args, qualified_name(value)
591 ))
592 elif not issubclass(value, expected_class):
593 raise TypeError('{} must be a subclass of {}; got {} instead'.format(
594 argname, qualified_name(expected_class), qualified_name(value)))
597def check_typevar(argname: str, value, typevar: TypeVar, memo: _TypeCheckMemo,
598 subclass_check: bool = False) -> None:
599 value_type = value if subclass_check else type(value)
600 subject = argname if subclass_check else 'type of ' + argname
602 if typevar.__bound__ is not None:
603 bound_type = resolve_forwardref(typevar.__bound__, memo)
604 if not issubclass(value_type, bound_type):
605 raise TypeError(
606 '{} must be {} or one of its subclasses; got {} instead'
607 .format(subject, qualified_name(bound_type), qualified_name(value_type)))
608 elif typevar.__constraints__:
609 constraints = [resolve_forwardref(c, memo) for c in typevar.__constraints__]
610 for constraint in constraints:
611 try:
612 check_type(argname, value, constraint, memo)
613 except TypeError:
614 pass
615 else:
616 break
617 else:
618 formatted_constraints = ', '.join(get_type_name(constraint)
619 for constraint in constraints)
620 raise TypeError('{} must match one of the constraints ({}); got {} instead'
621 .format(subject, formatted_constraints, qualified_name(value_type)))
624def check_literal(argname: str, value, expected_type, memo: _TypeCheckMemo):
625 def get_args(literal):
626 try:
627 args = literal.__args__
628 except AttributeError:
629 # Instance of Literal from typing_extensions
630 args = literal.__values__
632 retval = []
633 for arg in args:
634 if isinstance(arg, Literal.__class__) or getattr(arg, '__origin__', None) is Literal:
635 # The first check works on py3.6 and lower, the second one on py3.7+
636 retval.extend(get_args(arg))
637 elif isinstance(arg, (int, str, bytes, bool, type(None), Enum)):
638 retval.append(arg)
639 else:
640 raise TypeError('Illegal literal value: {}'.format(arg))
642 return retval
644 final_args = tuple(get_args(expected_type))
645 if value not in final_args:
646 raise TypeError('the value of {} must be one of {}; got {} instead'.
647 format(argname, final_args, value))
650def check_number(argname: str, value, expected_type):
651 if expected_type is complex and not isinstance(value, (complex, float, int)):
652 raise TypeError('type of {} must be either complex, float or int; got {} instead'.
653 format(argname, qualified_name(value.__class__)))
654 elif expected_type is float and not isinstance(value, (float, int)):
655 raise TypeError('type of {} must be either float or int; got {} instead'.
656 format(argname, qualified_name(value.__class__)))
659def check_io(argname: str, value, expected_type):
660 if expected_type is TextIO:
661 if not isinstance(value, TextIOBase):
662 raise TypeError('type of {} must be a text based I/O object; got {} instead'.
663 format(argname, qualified_name(value.__class__)))
664 elif expected_type is BinaryIO:
665 if not isinstance(value, (RawIOBase, BufferedIOBase)):
666 raise TypeError('type of {} must be a binary I/O object; got {} instead'.
667 format(argname, qualified_name(value.__class__)))
668 elif not isinstance(value, IOBase):
669 raise TypeError('type of {} must be an I/O object; got {} instead'.
670 format(argname, qualified_name(value.__class__)))
673def check_protocol(argname: str, value, expected_type):
674 # TODO: implement proper compatibility checking and support non-runtime protocols
675 if getattr(expected_type, '_is_runtime_protocol', False):
676 if not isinstance(value, expected_type):
677 raise TypeError('type of {} ({}) is not compatible with the {} protocol'.
678 format(argname, type(value).__qualname__, expected_type.__qualname__))
681# Equality checks are applied to these
682origin_type_checkers = {
683 AbstractSet: check_set,
684 Callable: check_callable,
685 collections.abc.Callable: check_callable,
686 dict: check_dict,
687 Dict: check_dict,
688 list: check_list,
689 List: check_list,
690 Sequence: check_sequence,
691 collections.abc.Sequence: check_sequence,
692 collections.abc.Set: check_set,
693 set: check_set,
694 Set: check_set,
695 tuple: check_tuple,
696 Tuple: check_tuple,
697 type: check_class,
698 Type: check_class,
699 Union: check_union
700}
701_subclass_check_unions = hasattr(Union, '__union_set_params__')
702if Literal is not None:
703 origin_type_checkers[Literal] = check_literal
705generator_origin_types = (Generator, collections.abc.Generator,
706 Iterator, collections.abc.Iterator,
707 Iterable, collections.abc.Iterable)
708asyncgen_origin_types = (AsyncIterator, collections.abc.AsyncIterator,
709 AsyncIterable, collections.abc.AsyncIterable)
710if AsyncGenerator is not None:
711 asyncgen_origin_types += (AsyncGenerator,)
712if hasattr(collections.abc, 'AsyncGenerator'):
713 asyncgen_origin_types += (collections.abc.AsyncGenerator,)
716def check_type(argname: str, value, expected_type, memo: Optional[_TypeCheckMemo] = None, *,
717 globals: Optional[Dict[str, Any]] = None,
718 locals: Optional[Dict[str, Any]] = None) -> None:
719 """
720 Ensure that ``value`` matches ``expected_type``.
722 The types from the :mod:`typing` module do not support :func:`isinstance` or :func:`issubclass`
723 so a number of type specific checks are required. This function knows which checker to call
724 for which type.
726 :param argname: name of the argument to check; used for error messages
727 :param value: value to be checked against ``expected_type``
728 :param expected_type: a class or generic type instance
729 :param globals: dictionary of global variables to use for resolving forward references
730 (defaults to the calling frame's globals)
731 :param locals: dictionary of local variables to use for resolving forward references
732 (defaults to the calling frame's locals)
733 :raises TypeError: if there is a type mismatch
735 """
736 if expected_type is Any or isinstance(value, Mock):
737 return
739 if expected_type is None:
740 # Only happens on < 3.6
741 expected_type = type(None)
743 if memo is None:
744 frame = sys._getframe(1)
745 if globals is None:
746 globals = frame.f_globals
747 if locals is None:
748 locals = frame.f_locals
750 memo = _TypeCheckMemo(globals, locals)
752 expected_type = resolve_forwardref(expected_type, memo)
753 origin_type = getattr(expected_type, '__origin__', None)
754 if origin_type is not None:
755 checker_func = origin_type_checkers.get(origin_type)
756 if checker_func:
757 checker_func(argname, value, expected_type, memo)
758 else:
759 check_type(argname, value, origin_type, memo)
760 elif isclass(expected_type):
761 if issubclass(expected_type, Tuple):
762 check_tuple(argname, value, expected_type, memo)
763 elif issubclass(expected_type, (float, complex)):
764 check_number(argname, value, expected_type)
765 elif _subclass_check_unions and issubclass(expected_type, Union):
766 check_union(argname, value, expected_type, memo)
767 elif isinstance(expected_type, TypeVar):
768 check_typevar(argname, value, expected_type, memo)
769 elif issubclass(expected_type, IO):
770 check_io(argname, value, expected_type)
771 elif is_typeddict(expected_type):
772 check_typed_dict(argname, value, expected_type, memo)
773 elif getattr(expected_type, '_is_protocol', False):
774 check_protocol(argname, value, expected_type)
775 else:
776 expected_type = (getattr(expected_type, '__extra__', None) or origin_type or
777 expected_type)
779 if expected_type is bytes:
780 # As per https://github.com/python/typing/issues/552
781 if not isinstance(value, (bytearray, bytes, memoryview)):
782 raise TypeError('type of {} must be bytes-like; got {} instead'
783 .format(argname, qualified_name(value)))
784 elif not isinstance(value, expected_type):
785 raise TypeError(
786 'type of {} must be {}; got {} instead'.
787 format(argname, qualified_name(expected_type), qualified_name(value)))
788 elif isinstance(expected_type, TypeVar):
789 # Only happens on < 3.6
790 check_typevar(argname, value, expected_type, memo)
791 elif isinstance(expected_type, Literal.__class__):
792 # Only happens on < 3.7 when using Literal from typing_extensions
793 check_literal(argname, value, expected_type, memo)
794 elif expected_type.__class__ is NewType:
795 # typing.NewType on Python 3.10+
796 return check_type(argname, value, expected_type.__supertype__, memo)
797 elif (isfunction(expected_type) and
798 getattr(expected_type, "__module__", None) == "typing" and
799 getattr(expected_type, "__qualname__", None).startswith("NewType.") and
800 hasattr(expected_type, "__supertype__")):
801 # typing.NewType on Python 3.9 and below
802 return check_type(argname, value, expected_type.__supertype__, memo)
805def check_return_type(retval, memo: Optional[_CallMemo] = None) -> bool:
806 """
807 Check that the return value is compatible with the return value annotation in the function.
809 :param retval: the value about to be returned from the call
810 :return: ``True``
811 :raises TypeError: if there is a type mismatch
813 """
814 if memo is None:
815 # faster than inspect.currentframe(), but not officially
816 # supported in all python implementations
817 frame = sys._getframe(1)
819 try:
820 func = find_function(frame)
821 except LookupError:
822 return True # This can happen with the Pydev/PyCharm debugger extension installed
824 memo = _CallMemo(func, frame.f_locals)
826 if 'return' in memo.type_hints:
827 if memo.type_hints['return'] is NoReturn:
828 raise TypeError('{}() was declared never to return but it did'.format(memo.func_name))
830 try:
831 check_type('the return value', retval, memo.type_hints['return'], memo)
832 except TypeError as exc: # suppress unnecessarily long tracebacks
833 # Allow NotImplemented if this is a binary magic method (__eq__() et al)
834 if retval is NotImplemented and memo.type_hints['return'] is bool:
835 # This does (and cannot) not check if it's actually a method
836 func_name = memo.func_name.rsplit('.', 1)[-1]
837 if len(memo.arguments) == 2 and func_name in BINARY_MAGIC_METHODS:
838 return True
840 raise TypeError(*exc.args) from None
842 return True
845def check_argument_types(memo: Optional[_CallMemo] = None) -> bool:
846 """
847 Check that the argument values match the annotated types.
849 Unless both ``args`` and ``kwargs`` are provided, the information will be retrieved from
850 the previous stack frame (ie. from the function that called this).
852 :return: ``True``
853 :raises TypeError: if there is an argument type mismatch
855 """
856 if memo is None:
857 # faster than inspect.currentframe(), but not officially
858 # supported in all python implementations
859 frame = sys._getframe(1)
861 try:
862 func = find_function(frame)
863 except LookupError:
864 return True # This can happen with the Pydev/PyCharm debugger extension installed
866 memo = _CallMemo(func, frame.f_locals)
868 for argname, expected_type in memo.type_hints.items():
869 if argname != 'return' and argname in memo.arguments:
870 value = memo.arguments[argname]
871 description = 'argument "{}"'.format(argname)
872 try:
873 check_type(description, value, expected_type, memo)
874 except TypeError as exc: # suppress unnecessarily long tracebacks
875 raise TypeError(*exc.args) from None
877 return True
880class TypeCheckedGenerator:
881 def __init__(self, wrapped: Generator, memo: _CallMemo):
882 rtype_args = []
883 if hasattr(memo.type_hints['return'], "__args__"):
884 rtype_args = memo.type_hints['return'].__args__
886 self.__wrapped = wrapped
887 self.__memo = memo
888 self.__yield_type = rtype_args[0] if rtype_args else Any
889 self.__send_type = rtype_args[1] if len(rtype_args) > 1 else Any
890 self.__return_type = rtype_args[2] if len(rtype_args) > 2 else Any
891 self.__initialized = False
893 def __iter__(self):
894 return self
896 def __next__(self):
897 return self.send(None)
899 def __getattr__(self, name: str) -> Any:
900 return getattr(self.__wrapped, name)
902 def throw(self, *args):
903 return self.__wrapped.throw(*args)
905 def close(self):
906 self.__wrapped.close()
908 def send(self, obj):
909 if self.__initialized:
910 check_type('value sent to generator', obj, self.__send_type, memo=self.__memo)
911 else:
912 self.__initialized = True
914 try:
915 value = self.__wrapped.send(obj)
916 except StopIteration as exc:
917 check_type('return value', exc.value, self.__return_type, memo=self.__memo)
918 raise
920 check_type('value yielded from generator', value, self.__yield_type, memo=self.__memo)
921 return value
924class TypeCheckedAsyncGenerator:
925 def __init__(self, wrapped: AsyncGenerator, memo: _CallMemo):
926 rtype_args = memo.type_hints['return'].__args__
927 self.__wrapped = wrapped
928 self.__memo = memo
929 self.__yield_type = rtype_args[0]
930 self.__send_type = rtype_args[1] if len(rtype_args) > 1 else Any
931 self.__initialized = False
933 def __aiter__(self):
934 return self
936 def __anext__(self):
937 return self.asend(None)
939 def __getattr__(self, name: str) -> Any:
940 return getattr(self.__wrapped, name)
942 def athrow(self, *args):
943 return self.__wrapped.athrow(*args)
945 def aclose(self):
946 return self.__wrapped.aclose()
948 async def asend(self, obj):
949 if self.__initialized:
950 check_type('value sent to generator', obj, self.__send_type, memo=self.__memo)
951 else:
952 self.__initialized = True
954 value = await self.__wrapped.asend(obj)
955 check_type('value yielded from generator', value, self.__yield_type, memo=self.__memo)
956 return value
959@overload
960def typechecked(*, always: bool = False) -> Callable[[T_CallableOrType], T_CallableOrType]:
961 ...
964@overload
965def typechecked(func: T_CallableOrType, *, always: bool = False) -> T_CallableOrType:
966 ...
969def typechecked(func=None, *, always=False, _localns: Optional[Dict[str, Any]] = None):
970 """
971 Perform runtime type checking on the arguments that are passed to the wrapped function.
973 The return value is also checked against the return annotation if any.
975 If the ``__debug__`` global variable is set to ``False``, no wrapping and therefore no type
976 checking is done, unless ``always`` is ``True``.
978 This can also be used as a class decorator. This will wrap all type annotated methods,
979 including ``@classmethod``, ``@staticmethod``, and ``@property`` decorated methods,
980 in the class with the ``@typechecked`` decorator.
982 :param func: the function or class to enable type checking for
983 :param always: ``True`` to enable type checks even in optimized mode
985 """
986 if func is None:
987 return partial(typechecked, always=always, _localns=_localns)
989 if not __debug__ and not always: # pragma: no cover
990 return func
992 if isclass(func):
993 prefix = func.__qualname__ + '.'
994 for key, attr in func.__dict__.items():
995 if inspect.isfunction(attr) or inspect.ismethod(attr) or inspect.isclass(attr):
996 if attr.__qualname__.startswith(prefix) and getattr(attr, '__annotations__', None):
997 setattr(func, key, typechecked(attr, always=always, _localns=func.__dict__))
998 elif isinstance(attr, (classmethod, staticmethod)):
999 if getattr(attr.__func__, '__annotations__', None):
1000 wrapped = typechecked(attr.__func__, always=always, _localns=func.__dict__)
1001 setattr(func, key, type(attr)(wrapped))
1002 elif isinstance(attr, property):
1003 kwargs = dict(doc=attr.__doc__)
1004 for name in ("fset", "fget", "fdel"):
1005 property_func = kwargs[name] = getattr(attr, name)
1006 if property_func is not None and getattr(property_func, '__annotations__', ()):
1007 kwargs[name] = typechecked(
1008 property_func, always=always, _localns=func.__dict__
1009 )
1011 setattr(func, key, attr.__class__(**kwargs))
1013 return func
1015 if not getattr(func, '__annotations__', None):
1016 warn('no type annotations present -- not typechecking {}'.format(function_name(func)))
1017 return func
1019 # Find the frame in which the function was declared, for resolving forward references later
1020 if _localns is None:
1021 _localns = sys._getframe(1).f_locals
1023 # Find either the first Python wrapper or the actual function
1024 python_func = inspect.unwrap(func, stop=lambda f: hasattr(f, '__code__'))
1026 if not getattr(python_func, '__code__', None):
1027 warn('no code associated -- not typechecking {}'.format(function_name(func)))
1028 return func
1030 def wrapper(*args, **kwargs):
1031 memo = _CallMemo(python_func, _localns, args=args, kwargs=kwargs)
1032 check_argument_types(memo)
1033 retval = func(*args, **kwargs)
1034 try:
1035 check_return_type(retval, memo)
1036 except TypeError as exc:
1037 raise TypeError(*exc.args) from None
1039 # If a generator is returned, wrap it if its yield/send/return types can be checked
1040 if inspect.isgenerator(retval) or isasyncgen(retval):
1041 return_type = memo.type_hints.get('return')
1042 if return_type:
1043 origin = getattr(return_type, '__origin__', None)
1044 if origin in generator_origin_types:
1045 return TypeCheckedGenerator(retval, memo)
1046 elif origin is not None and origin in asyncgen_origin_types:
1047 return TypeCheckedAsyncGenerator(retval, memo)
1049 return retval
1051 async def async_wrapper(*args, **kwargs):
1052 memo = _CallMemo(python_func, _localns, args=args, kwargs=kwargs)
1053 check_argument_types(memo)
1054 retval = await func(*args, **kwargs)
1055 check_return_type(retval, memo)
1056 return retval
1058 if inspect.iscoroutinefunction(func):
1059 if python_func.__code__ is not async_wrapper.__code__:
1060 return wraps(func)(async_wrapper)
1061 else:
1062 if python_func.__code__ is not wrapper.__code__:
1063 return wraps(func)(wrapper)
1065 # the target callable was already wrapped
1066 return func
1069class TypeWarning(UserWarning):
1070 """
1071 A warning that is emitted when a type check fails.
1073 :ivar str event: ``call`` or ``return``
1074 :ivar Callable func: the function in which the violation occurred (the called function if event
1075 is ``call``, or the function where a value of the wrong type was returned from if event is
1076 ``return``)
1077 :ivar str error: the error message contained by the caught :class:`TypeError`
1078 :ivar frame: the frame in which the violation occurred
1079 """
1081 __slots__ = ('func', 'event', 'message', 'frame')
1083 def __init__(self, memo: Optional[_CallMemo], event: str, frame,
1084 exception: Union[str, TypeError]): # pragma: no cover
1085 self.func = memo.func
1086 self.event = event
1087 self.error = str(exception)
1088 self.frame = frame
1090 if self.event == 'call':
1091 caller_frame = self.frame.f_back
1092 event = 'call to {}() from {}:{}'.format(
1093 function_name(self.func), caller_frame.f_code.co_filename, caller_frame.f_lineno)
1094 else:
1095 event = 'return from {}() at {}:{}'.format(
1096 function_name(self.func), self.frame.f_code.co_filename, self.frame.f_lineno)
1098 super().__init__('[{thread_name}] {event}: {self.error}'.format(
1099 thread_name=threading.current_thread().name, event=event, self=self))
1101 @property
1102 def stack(self):
1103 """Return the stack where the last frame is from the target function."""
1104 return extract_stack(self.frame)
1106 def print_stack(self, file: TextIO = None, limit: int = None) -> None:
1107 """
1108 Print the traceback from the stack frame where the target function was run.
1110 :param file: an open file to print to (prints to stdout if omitted)
1111 :param limit: the maximum number of stack frames to print
1113 """
1114 print_stack(self.frame, limit, file)
1117class TypeChecker:
1118 """
1119 A type checker that collects type violations by hooking into :func:`sys.setprofile`.
1121 :param packages: list of top level modules and packages or modules to include for type checking
1122 :param all_threads: ``True`` to check types in all threads created while the checker is
1123 running, ``False`` to only check in the current one
1124 :param forward_refs_policy: how to handle unresolvable forward references in annotations
1126 .. deprecated:: 2.6
1127 Use :func:`~.importhook.install_import_hook` instead. This class will be removed in v3.0.
1128 """
1130 def __init__(self, packages: Union[str, Sequence[str]], *, all_threads: bool = True,
1131 forward_refs_policy: ForwardRefPolicy = ForwardRefPolicy.ERROR):
1132 assert check_argument_types()
1133 warn('TypeChecker has been deprecated and will be removed in v3.0. '
1134 'Use install_import_hook() or the pytest plugin instead.', DeprecationWarning)
1135 self.all_threads = all_threads
1136 self.annotation_policy = forward_refs_policy
1137 self._call_memos = {} # type: Dict[Any, _CallMemo]
1138 self._previous_profiler = None
1139 self._previous_thread_profiler = None
1140 self._active = False
1142 if isinstance(packages, str):
1143 self._packages = (packages,)
1144 else:
1145 self._packages = tuple(packages)
1147 @property
1148 def active(self) -> bool:
1149 """Return ``True`` if currently collecting type violations."""
1150 return self._active
1152 def should_check_type(self, func: Callable) -> bool:
1153 if not func.__annotations__:
1154 # No point in checking if there are no type hints
1155 return False
1156 elif isasyncgenfunction(func):
1157 # Async generators cannot be supported because the return arg is of an opaque builtin
1158 # type (async_generator_wrapped_value)
1159 return False
1160 else:
1161 # Check types if the module matches any of the package prefixes
1162 return any(func.__module__ == package or func.__module__.startswith(package + '.')
1163 for package in self._packages)
1165 def start(self):
1166 if self._active:
1167 raise RuntimeError('type checker already running')
1169 self._active = True
1171 # Install this instance as the current profiler
1172 self._previous_profiler = sys.getprofile()
1173 sys.setprofile(self)
1175 # If requested, set this instance as the default profiler for all future threads
1176 # (does not affect existing threads)
1177 if self.all_threads:
1178 self._previous_thread_profiler = threading._profile_hook
1179 threading.setprofile(self)
1181 def stop(self):
1182 if self._active:
1183 if sys.getprofile() is self:
1184 sys.setprofile(self._previous_profiler)
1185 else: # pragma: no cover
1186 warn('the system profiling hook has changed unexpectedly')
1188 if self.all_threads:
1189 if threading._profile_hook is self:
1190 threading.setprofile(self._previous_thread_profiler)
1191 else: # pragma: no cover
1192 warn('the threading profiling hook has changed unexpectedly')
1194 self._active = False
1196 def __enter__(self):
1197 self.start()
1198 return self
1200 def __exit__(self, exc_type, exc_val, exc_tb):
1201 self.stop()
1203 def __call__(self, frame, event: str, arg) -> None: # pragma: no cover
1204 if not self._active:
1205 # This happens if all_threads was enabled and a thread was created when the checker was
1206 # running but was then stopped. The thread's profiler callback can't be reset any other
1207 # way but this.
1208 sys.setprofile(self._previous_thread_profiler)
1209 return
1211 # If an actual profiler is running, don't include the type checking times in its results
1212 if event == 'call':
1213 try:
1214 func = find_function(frame)
1215 except Exception:
1216 func = None
1218 if func is not None and self.should_check_type(func):
1219 memo = self._call_memos[frame] = _CallMemo(
1220 func, frame.f_locals, forward_refs_policy=self.annotation_policy)
1221 if memo.is_generator:
1222 return_type_hint = memo.type_hints['return']
1223 if return_type_hint is not None:
1224 origin = getattr(return_type_hint, '__origin__', None)
1225 if origin in generator_origin_types:
1226 # Check the types of the yielded values
1227 memo.type_hints['return'] = return_type_hint.__args__[0]
1228 else:
1229 try:
1230 check_argument_types(memo)
1231 except TypeError as exc:
1232 warn(TypeWarning(memo, event, frame, exc))
1234 if self._previous_profiler is not None:
1235 self._previous_profiler(frame, event, arg)
1236 elif event == 'return':
1237 if self._previous_profiler is not None:
1238 self._previous_profiler(frame, event, arg)
1240 if arg is None:
1241 # a None return value might mean an exception is being raised but we have no way of
1242 # checking
1243 return
1245 memo = self._call_memos.get(frame)
1246 if memo is not None:
1247 try:
1248 if memo.is_generator:
1249 check_type('yielded value', arg, memo.type_hints['return'], memo)
1250 else:
1251 check_return_type(arg, memo)
1252 except TypeError as exc:
1253 warn(TypeWarning(memo, event, frame, exc))
1255 if not memo.is_generator:
1256 del self._call_memos[frame]
1257 elif self._previous_profiler is not None:
1258 self._previous_profiler(frame, event, arg)