1from __future__ import annotations
2
3import functools
4import sys
5import warnings
6from collections.abc import AsyncIterator, Coroutine, Iterable, Iterator
7from typing import Callable, TypeVar
8
9import anyio.to_thread
10
11if sys.version_info >= (3, 10): # pragma: no cover
12 from typing import ParamSpec
13else: # pragma: no cover
14 from typing_extensions import ParamSpec
15
16P = ParamSpec("P")
17T = TypeVar("T")
18
19
async def run_until_first_complete(*args: tuple[Callable, dict]) -> None:  # type: ignore[type-arg]
    """Run several coroutine functions concurrently until the first one returns.

    Deprecated: a ``DeprecationWarning`` is emitted on every call.

    Args:
        *args: ``(func, kwargs)`` pairs. Each ``func`` is invoked as
            ``func(**kwargs)`` inside its own task of a shared anyio task
            group, and the awaitable it returns is awaited there.

    Once any of the calls returns, the task group's cancel scope is
    cancelled so that the function does not keep waiting on the others.
    """
    warnings.warn(
        "run_until_first_complete is deprecated and will be removed in a future version.",
        DeprecationWarning,
        # Point the warning at the code that called this function rather than
        # at this module, so the caller can locate (and see) the deprecated use.
        stacklevel=2,
    )

    async with anyio.create_task_group() as task_group:

        async def run(func: Callable[[], Coroutine]) -> None:  # type: ignore[type-arg]
            await func()
            # First finisher wins: cancel the scope shared by all tasks.
            task_group.cancel_scope.cancel()

        for func, kwargs in args:
            # Bind the keyword arguments now so `run` receives a zero-argument callable.
            task_group.start_soon(run, functools.partial(func, **kwargs))
34
35
async def run_in_threadpool(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
    """Call ``func(*args, **kwargs)`` through ``anyio.to_thread.run_sync``.

    Returns whatever the awaited ``run_sync`` call produces.
    """
    # Bind every argument up front so run_sync is handed a single
    # zero-argument callable.
    return await anyio.to_thread.run_sync(functools.partial(func, *args, **kwargs))
39
40
41class _StopIteration(Exception):
42 pass
43
44
45def _next(iterator: Iterator[T]) -> T:
46 # We can't raise `StopIteration` from within the threadpool iterator
47 # and catch it outside that context, so we coerce them into a different
48 # exception type.
49 try:
50 return next(iterator)
51 except StopIteration:
52 raise _StopIteration
53
54
async def iterate_in_threadpool(
    iterator: Iterable[T],
) -> AsyncIterator[T]:
    """Asynchronously yield the items of ``iterator``.

    Each item is fetched by passing ``_next`` to ``anyio.to_thread.run_sync``;
    the generator finishes when ``_next`` signals exhaustion.
    """
    source = iter(iterator)
    try:
        while True:
            yield await anyio.to_thread.run_sync(_next, source)
    except _StopIteration:
        # `_next` translates StopIteration into this private type; it marks
        # the end of the stream.
        return