Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/starlette/concurrency.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

31 statements  

1from __future__ import annotations 

2 

3import functools 

4import sys 

5import typing 

6import warnings 

7 

8import anyio.to_thread 

9 

10if sys.version_info >= (3, 10): # pragma: no cover 

11 from typing import ParamSpec 

12else: # pragma: no cover 

13 from typing_extensions import ParamSpec 

14 

15P = ParamSpec("P") 

16T = typing.TypeVar("T") 

17 

18 

async def run_until_first_complete(*args: tuple[typing.Callable, dict]) -> None:  # type: ignore[type-arg]  # noqa: E501
    """Run several coroutine functions concurrently; stop when one finishes.

    Deprecated: a ``DeprecationWarning`` is issued when this is called.

    Args:
        *args: ``(func, kwargs)`` pairs. Each ``func`` is called with
            ``**kwargs`` and awaited inside one shared anyio task group.

    As soon as any of the awaited calls returns, the task group's cancel
    scope is cancelled.
    """
    warnings.warn(
        "run_until_first_complete is deprecated "
        "and will be removed in a future version.",
        DeprecationWarning,
        # Attribute the warning to the frame driving this coroutine (normally
        # the awaiting caller) rather than to this module, so the report
        # points at the code that needs to change.
        stacklevel=2,
    )

    async with anyio.create_task_group() as task_group:

        async def run(func: typing.Callable[[], typing.Coroutine]) -> None:  # type: ignore[type-arg]  # noqa: E501
            await func()
            # First one to get here cancels the whole group.
            task_group.cancel_scope.cancel()

        for func, kwargs in args:
            # Pre-bind kwargs so `run` can call the function with no arguments.
            task_group.start_soon(run, functools.partial(func, **kwargs))

34 

35 

async def run_in_threadpool(
    func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> T:
    """Await ``func(*args, **kwargs)`` via ``anyio.to_thread.run_sync``.

    Returns whatever the awaited ``run_sync`` call returns.
    """
    target: typing.Callable[..., T] = func
    if kwargs:  # pragma: no cover
        # run_sync doesn't accept 'kwargs', so they are pre-bound onto the
        # callable before it is handed over.
        target = functools.partial(func, **kwargs)
    return await anyio.to_thread.run_sync(target, *args)

43 

44 

45class _StopIteration(Exception): 

46 pass 

47 

48 

49def _next(iterator: typing.Iterator[T]) -> T: 

50 # We can't raise `StopIteration` from within the threadpool iterator 

51 # and catch it outside that context, so we coerce them into a different 

52 # exception type. 

53 try: 

54 return next(iterator) 

55 except StopIteration: 

56 raise _StopIteration 

57 

58 

async def iterate_in_threadpool(
    iterator: typing.Iterable[T],
) -> typing.AsyncIterator[T]:
    """Asynchronously yield the items of ``iterator``.

    Each item is fetched by awaiting ``anyio.to_thread.run_sync(_next, ...)``;
    iteration ends when ``_next`` raises ``_StopIteration``.
    """
    source = iter(iterator)
    while True:
        try:
            yield await anyio.to_thread.run_sync(_next, source)
        except _StopIteration:
            # Exhaustion signal coerced by `_next`; finish the generator.
            return