Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_testing.py: 48%

29 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:38 +0000

1from __future__ import annotations 

2 

3from collections.abc import Awaitable, Generator 

4from typing import Any 

5 

6from ._eventloop import get_async_backend 

7 

8 

class TaskInfo:
    """
    Describes a single asynchronous task.

    Two instances compare equal, and hash identically, when their ``id``
    values match; the remaining attributes take no part in comparison.

    :ivar int id: the unique identifier of the task
    :ivar parent_id: the identifier of the parent task, if any
    :vartype parent_id: Optional[int]
    :ivar str name: the description of the task (if any)
    :ivar ~collections.abc.Coroutine coro: the coroutine object of the task
    """

    __slots__ = ("_name", "id", "parent_id", "name", "coro")

    def __init__(
        self,
        id: int,
        parent_id: int | None,
        name: str | None,
        coro: Generator[Any, Any, Any] | Awaitable[Any],
    ):
        # Dotted path of get_current_task(); stored on the instance but not
        # read anywhere within this class.
        self._name = (
            f"{get_current_task.__module__}.{get_current_task.__qualname__}"
        )
        self.id: int = id
        self.parent_id: int | None = parent_id
        self.name: str | None = name
        self.coro: Generator[Any, Any, Any] | Awaitable[Any] = coro

    def __eq__(self, other: object) -> bool:
        """Compare by task ``id``; defer to the other operand for foreign types."""
        if not isinstance(other, TaskInfo):
            return NotImplemented

        return self.id == other.id

    def __hash__(self) -> int:
        """Hash on the task ``id`` so that hashing agrees with ``__eq__``."""
        return hash(self.id)

    def __repr__(self) -> str:
        """Return ``ClassName(id=..., name=...)`` for debugging output."""
        cls_name = self.__class__.__name__
        return f"{cls_name}(id={self.id!r}, name={self.name!r})"

    def _unwrap(self) -> TaskInfo:
        """Return this very instance."""
        return self

50 

51 

def get_current_task() -> TaskInfo:
    """
    Return the current task.

    The lookup is delegated to the backend object returned by
    ``get_async_backend()``.

    :return: a representation of the current task

    """
    backend = get_async_backend()
    return backend.get_current_task()

60 

61 

def get_running_tasks() -> list[TaskInfo]:
    """
    Return a list of running tasks in the current event loop.

    The lookup is delegated to the backend object returned by
    ``get_async_backend()``.

    :return: a list of task info objects

    """
    backend = get_async_backend()
    return backend.get_running_tasks()

70 

71 

async def wait_all_tasks_blocked() -> None:
    """
    Wait until all other tasks are waiting for something.

    The wait itself is delegated to the backend object returned by
    ``get_async_backend()``.
    """
    backend = get_async_backend()
    await backend.wait_all_tasks_blocked()