Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/starlette/concurrency.py: 45%

29 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import functools 

2import sys 

3import typing 

4import warnings 

5 

6import anyio 

7 

8if sys.version_info >= (3, 10): # pragma: no cover 

9 from typing import ParamSpec 

10else: # pragma: no cover 

11 from typing_extensions import ParamSpec 

12 

13 

14T = typing.TypeVar("T") 

15P = ParamSpec("P") 

16 

17 

async def run_until_first_complete(*args: typing.Tuple[typing.Callable, dict]) -> None:
    """Run several async callables concurrently until the first one returns.

    Deprecated: a ``DeprecationWarning`` is emitted on every call.

    Each positional argument is a ``(callable, kwargs)`` pair. Every callable
    is bound to its keyword arguments and started in one shared anyio task
    group. As soon as any of them returns, the task group's cancel scope is
    cancelled.
    """
    deprecation_message = (
        "run_until_first_complete is deprecated "
        "and will be removed in a future version."
    )
    warnings.warn(deprecation_message, DeprecationWarning)

    async with anyio.create_task_group() as group:

        async def run(target: typing.Callable[[], typing.Coroutine]) -> None:
            # Whichever task gets past its await first cancels the group's scope.
            await target()
            group.cancel_scope.cancel()

        for callee, call_kwargs in args:
            bound_call = functools.partial(callee, **call_kwargs)
            group.start_soon(run, bound_call)

33 

34 

async def run_in_threadpool(
    func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> T:
    """Await ``func(*args, **kwargs)`` executed via ``anyio.to_thread.run_sync``.

    Returns whatever ``func`` returns.
    """
    # run_sync only forwards positional arguments, so any keyword arguments
    # are pre-bound onto the callable before it is handed over.
    target = functools.partial(func, **kwargs) if kwargs else func
    return await anyio.to_thread.run_sync(target, *args)

42 

43 

class _StopIteration(Exception):
    """Private stand-in for ``StopIteration``.

    ``_next`` raises this in place of ``StopIteration`` so that iterator
    exhaustion can be caught outside the threadpool call.
    """

46 

47 

def _next(iterator: typing.Iterator[T]) -> T:
    """Return the next item of ``iterator``.

    Raises ``_StopIteration`` when the iterator is exhausted. A
    ``StopIteration`` raised inside the threadpool call cannot be caught
    outside it, so exhaustion is signalled with this different exception type.
    """
    try:
        item = next(iterator)
    except StopIteration:
        raise _StopIteration()
    return item

56 

57 

async def iterate_in_threadpool(
    iterator: typing.Iterable[T],
) -> typing.AsyncIterator[T]:
    """Asynchronously yield the items of a synchronous iterable.

    Each item is fetched by running ``_next`` through
    ``anyio.to_thread.run_sync``, so a blocking ``__next__`` does not run
    directly in the calling coroutine.

    ``iterator`` may be an iterator or any other iterable (list, tuple, ...).
    It is converted with ``iter()`` once, on first iteration.

    Iteration ends when ``_next`` raises ``_StopIteration``. Any other
    exception from the underlying iterator propagates to the consumer.
    """
    # iter() hands an iterator back unchanged, so callers that already pass
    # iterators behave as before; plain iterables no longer fail in next().
    as_iterator = iter(iterator)
    while True:
        # Only the fetch is guarded; the yield stays outside the try so the
        # handler reacts solely to exhaustion of the underlying iterator.
        try:
            item = await anyio.to_thread.run_sync(_next, as_iterator)
        except _StopIteration:
            break
        yield item