Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/jinja2/async_utils.py: 58%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

57 statements  

1import inspect 

2import typing as t 

3from functools import WRAPPER_ASSIGNMENTS 

4from functools import wraps 

5 

6from .utils import _PassArg 

7from .utils import pass_eval_context 

8 

9if t.TYPE_CHECKING: 

10 import typing_extensions as te 

11 

12V = t.TypeVar("V") 

13 

14 

15def async_variant(normal_func): # type: ignore 

16 def decorator(async_func): # type: ignore 

17 pass_arg = _PassArg.from_obj(normal_func) 

18 need_eval_context = pass_arg is None 

19 

20 if pass_arg is _PassArg.environment: 

21 

22 def is_async(args: t.Any) -> bool: 

23 return t.cast(bool, args[0].is_async) 

24 

25 else: 

26 

27 def is_async(args: t.Any) -> bool: 

28 return t.cast(bool, args[0].environment.is_async) 

29 

30 # Take the doc and annotations from the sync function, but the 

31 # name from the async function. Pallets-Sphinx-Themes 

32 # build_function_directive expects __wrapped__ to point to the 

33 # sync function. 

34 async_func_attrs = ("__module__", "__name__", "__qualname__") 

35 normal_func_attrs = tuple(set(WRAPPER_ASSIGNMENTS).difference(async_func_attrs)) 

36 

37 @wraps(normal_func, assigned=normal_func_attrs) 

38 @wraps(async_func, assigned=async_func_attrs, updated=()) 

39 def wrapper(*args, **kwargs): # type: ignore 

40 b = is_async(args) 

41 

42 if need_eval_context: 

43 args = args[1:] 

44 

45 if b: 

46 return async_func(*args, **kwargs) 

47 

48 return normal_func(*args, **kwargs) 

49 

50 if need_eval_context: 

51 wrapper = pass_eval_context(wrapper) 

52 

53 wrapper.jinja_async_variant = True # type: ignore[attr-defined] 

54 return wrapper 

55 

56 return decorator 

57 

58 

59_common_primitives = {int, float, bool, str, list, dict, tuple, type(None)} 

60 

61 

62async def auto_await(value: t.Union[t.Awaitable["V"], "V"]) -> "V": 

63 # Avoid a costly call to isawaitable 

64 if type(value) in _common_primitives: 

65 return t.cast("V", value) 

66 

67 if inspect.isawaitable(value): 

68 return await t.cast("t.Awaitable[V]", value) 

69 

70 return value 

71 

72 

73class _IteratorToAsyncIterator(t.Generic[V]): 

74 def __init__(self, iterator: "t.Iterator[V]"): 

75 self._iterator = iterator 

76 

77 def __aiter__(self) -> "te.Self": 

78 return self 

79 

80 async def __anext__(self) -> V: 

81 try: 

82 return next(self._iterator) 

83 except StopIteration as e: 

84 raise StopAsyncIteration(e.value) from e 

85 

86 

87def auto_aiter( 

88 iterable: "t.Union[t.AsyncIterable[V], t.Iterable[V]]", 

89) -> "t.AsyncIterator[V]": 

90 if hasattr(iterable, "__aiter__"): 

91 return iterable.__aiter__() 

92 else: 

93 return _IteratorToAsyncIterator(iter(iterable)) 

94 

95 

96async def auto_to_list( 

97 value: "t.Union[t.AsyncIterable[V], t.Iterable[V]]", 

98) -> t.List["V"]: 

99 return [x async for x in auto_aiter(value)]