Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/starlette/concurrency.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

33 statements  

1from __future__ import annotations 

2 

3import functools 

4import warnings 

5from collections.abc import AsyncIterator, Callable, Coroutine, Iterable, Iterator 

6from typing import ParamSpec, TypeVar 

7 

8import anyio.to_thread 

9 

10P = ParamSpec("P") 

11T = TypeVar("T") 

12 

13 

async def run_until_first_complete(*args: tuple[Callable, dict]) -> None:  # type: ignore[type-arg]
    """Run several coroutine functions concurrently and stop once one of them returns.

    Deprecated: a ``DeprecationWarning`` is emitted on every call.

    Args:
        *args: ``(func, kwargs)`` pairs. Each ``func`` is called as
            ``func(**kwargs)`` and the result is awaited in its own task.
    """
    warnings.warn(
        "run_until_first_complete is deprecated and will be removed in a future version.",
        DeprecationWarning,
        # Attribute the warning to the caller instead of this module.
        stacklevel=2,
    )

    async with anyio.create_task_group() as task_group:

        async def run(func: Callable[[], Coroutine]) -> None:  # type: ignore[type-arg]
            await func()
            # The first function to return cancels the task group's scope,
            # which stops the tasks that are still running.
            task_group.cancel_scope.cancel()

        for func, kwargs in args:
            # Bind the keyword arguments so `run` receives a zero-argument callable.
            task_group.start_soon(run, functools.partial(func, **kwargs))

28 

29 

async def run_in_threadpool(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
    """Call ``func(*args, **kwargs)`` via ``anyio.to_thread.run_sync`` and return its result.

    Args:
        func: The synchronous callable to execute.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns.
    """
    # Bind all arguments up front so a single zero-argument callable is handed
    # to run_sync; a separate local keeps `func`'s declared type intact.
    bound = functools.partial(func, *args, **kwargs)
    return await anyio.to_thread.run_sync(bound)

33 

34 

class _StopIteration(Exception):
    """Private sentinel raised by ``_next`` in place of ``StopIteration``.

    It deliberately derives from ``Exception`` rather than ``StopIteration``
    so it can be raised in one context and caught in another.
    """

37 

38 

def _next(iterator: Iterator[T]) -> T:
    """Return the next item of *iterator*.

    Raises:
        _StopIteration: When *iterator* is exhausted.
    """
    # We can't raise `StopIteration` from within the threadpool iterator
    # and catch it outside that context, so we coerce them into a different
    # exception type.
    try:
        return next(iterator)
    except StopIteration:
        # The original StopIteration adds nothing useful; keep it out of the chain.
        raise _StopIteration from None

47 

48 

async def iterate_in_threadpool(
    iterator: Iterable[T],
) -> AsyncIterator[T]:
    """Asynchronously yield the items of *iterator*.

    Each item is fetched by calling ``_next`` through
    ``anyio.to_thread.run_sync``; iteration ends when ``_next`` signals
    exhaustion with ``_StopIteration``.

    Args:
        iterator: Any synchronous iterable.

    Yields:
        The items of *iterator*, in order.
    """
    as_iterator = iter(iterator)
    while True:
        # Guard only the fetch: a `_StopIteration` thrown into this generator
        # at the yield point must not be mistaken for normal exhaustion.
        try:
            item = await anyio.to_thread.run_sync(_next, as_iterator)
        except _StopIteration:
            break
        yield item