Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/starlette/concurrency.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

31 statements  

1from __future__ import annotations 

2 

3import functools 

4import sys 

5import typing 

6import warnings 

7 

8import anyio.to_thread 

9 

10if sys.version_info >= (3, 10): # pragma: no cover 

11 from typing import ParamSpec 

12else: # pragma: no cover 

13 from typing_extensions import ParamSpec 

14 

15P = ParamSpec("P") 

16T = typing.TypeVar("T") 

17 

18 

19async def run_until_first_complete(*args: tuple[typing.Callable, dict]) -> None: # type: ignore[type-arg] 

20 warnings.warn( 

21 "run_until_first_complete is deprecated and will be removed in a future version.", 

22 DeprecationWarning, 

23 ) 

24 

25 async with anyio.create_task_group() as task_group: 

26 

27 async def run(func: typing.Callable[[], typing.Coroutine]) -> None: # type: ignore[type-arg] 

28 await func() 

29 task_group.cancel_scope.cancel() 

30 

31 for func, kwargs in args: 

32 task_group.start_soon(run, functools.partial(func, **kwargs)) 

33 

34 

35async def run_in_threadpool(func: typing.Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T: 

36 if kwargs: # pragma: no cover 

37 # run_sync doesn't accept 'kwargs', so bind them in here 

38 func = functools.partial(func, **kwargs) 

39 return await anyio.to_thread.run_sync(func, *args) 

40 

41 

42class _StopIteration(Exception): 

43 pass 

44 

45 

46def _next(iterator: typing.Iterator[T]) -> T: 

47 # We can't raise `StopIteration` from within the threadpool iterator 

48 # and catch it outside that context, so we coerce them into a different 

49 # exception type. 

50 try: 

51 return next(iterator) 

52 except StopIteration: 

53 raise _StopIteration 

54 

55 

56async def iterate_in_threadpool( 

57 iterator: typing.Iterable[T], 

58) -> typing.AsyncIterator[T]: 

59 as_iterator = iter(iterator) 

60 while True: 

61 try: 

62 yield await anyio.to_thread.run_sync(_next, as_iterator) 

63 except _StopIteration: 

64 break