Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_testing.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3from collections.abc import Awaitable, Generator
4from typing import Any, cast
6from ._eventloop import get_async_backend
class TaskInfo:
    """
    Describes a single asynchronous task.

    Two instances compare equal (and hash alike) when their ``id`` values match;
    the other attributes do not take part in the comparison.

    :ivar int id: the unique identifier of the task
    :ivar parent_id: the identifier of the parent task, if any
    :vartype parent_id: Optional[int]
    :ivar str name: the description of the task (if any)
    :ivar ~collections.abc.Coroutine coro: the coroutine object of the task
    """

    __slots__ = ("_name", "id", "parent_id", "name", "coro")

    def __init__(
        self,
        id: int,
        parent_id: int | None,
        name: str | None,
        coro: Generator[Any, Any, Any] | Awaitable[Any],
    ):
        # ``_name`` stores the dotted path of the module-level
        # ``get_current_task`` function; it is not read anywhere in this class.
        accessor = get_current_task
        self._name = f"{accessor.__module__}.{accessor.__qualname__}"

        self.id: int = id
        self.parent_id: int | None = parent_id
        self.name: str | None = name
        self.coro: Generator[Any, Any, Any] | Awaitable[Any] = coro

    def __eq__(self, other: object) -> bool:
        # Defer to the other operand for anything that is not a TaskInfo.
        if not isinstance(other, TaskInfo):
            return NotImplemented

        return self.id == other.id

    def __hash__(self) -> int:
        # Kept consistent with __eq__: only the task id matters.
        return hash(self.id)

    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return f"{cls_name}(id={self.id!r}, name={self.name!r})"

    def has_pending_cancellation(self) -> bool:
        """
        Tell whether a cancellation is pending for the task.

        This implementation unconditionally returns ``False``.

        :return: ``True`` if the task has a cancellation pending, ``False``
            otherwise

        """
        return False
def get_current_task() -> TaskInfo:
    """
    Look up the task that is currently running.

    The lookup is delegated to the active async backend.

    :return: a representation of the current task

    """
    backend = get_async_backend()
    return backend.get_current_task()
def get_running_tasks() -> list[TaskInfo]:
    """
    Collect the tasks running in the current event loop.

    The result comes straight from the active async backend; ``cast`` only
    narrows the static type and does not copy or convert the value.

    :return: a list of task info objects

    """
    backend = get_async_backend()
    running = backend.get_running_tasks()
    return cast("list[TaskInfo]", running)
async def wait_all_tasks_blocked() -> None:
    """
    Wait until all other tasks are waiting for something.

    The wait itself is carried out by the active async backend.
    """
    backend = get_async_backend()
    await backend.wait_all_tasks_blocked()