Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/httpcore/_backends/auto.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

16 statements  

1from __future__ import annotations 

2 

3import typing 

4 

5from .._synchronization import current_async_library 

6from .base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream 

7 

8 

9class AutoBackend(AsyncNetworkBackend): 

10 async def _init_backend(self) -> None: 

11 if not (hasattr(self, "_backend")): 

12 backend = current_async_library() 

13 if backend == "trio": 

14 from .trio import TrioBackend 

15 

16 self._backend: AsyncNetworkBackend = TrioBackend() 

17 else: 

18 from .anyio import AnyIOBackend 

19 

20 self._backend = AnyIOBackend() 

21 

22 async def connect_tcp( 

23 self, 

24 host: str, 

25 port: int, 

26 timeout: float | None = None, 

27 local_address: str | None = None, 

28 socket_options: typing.Iterable[SOCKET_OPTION] | None = None, 

29 ) -> AsyncNetworkStream: 

30 await self._init_backend() 

31 return await self._backend.connect_tcp( 

32 host, 

33 port, 

34 timeout=timeout, 

35 local_address=local_address, 

36 socket_options=socket_options, 

37 ) 

38 

39 async def connect_unix_socket( 

40 self, 

41 path: str, 

42 timeout: float | None = None, 

43 socket_options: typing.Iterable[SOCKET_OPTION] | None = None, 

44 ) -> AsyncNetworkStream: # pragma: nocover 

45 await self._init_backend() 

46 return await self._backend.connect_unix_socket( 

47 path, timeout=timeout, socket_options=socket_options 

48 ) 

49 

50 async def sleep(self, seconds: float) -> None: # pragma: nocover 

51 await self._init_backend() 

52 return await self._backend.sleep(seconds)